In [None]:
# installing spark in colab and creating spark session

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

import findspark
findspark.init()

findspark.find()

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

sc = spark.sparkContext

In [None]:
from pyspark import SparkFiles
from graphframes import GraphFrame
from pyspark.sql.functions import count, desc, asc
 
class Graphframes:
    """Build a subreddit hyperlink graph from a TSV file and display graph statistics.

    Uses the notebook-level ``spark`` session. Constructing an instance loads
    the data, builds the vertex/edge DataFrames and immediately runs and
    displays every analysis in :meth:`solutions`.
    """

    # Directory where Spark stores checkpoint data for connected components.
    CHECKPOINT_DIR = '/content/checkpoints'

    def __init__(self, inp):
        """Load the hyperlink TSV and run the whole analysis.

        Args:
            inp: Path or URL of the tab-separated file (with a header row)
                containing SOURCE_SUBREDDIT, TARGET_SUBREDDIT and
                LINK_SENTIMENT columns.

        Raises:
            ValueError: If no file name can be derived from ``inp``.
        """
        # SparkFiles.get looks files up by their base name, so derive it from
        # the input instead of hard-coding one particular data set.
        file_name = os.path.basename(inp)
        if not file_name:
            raise ValueError("Cannot derive a file name from input: %r" % (inp,))

        spark.sparkContext.addFile(inp)
        self.reddit = spark.read.csv(
            "file://" + SparkFiles.get(file_name), sep="\t", header=True
        )

        self.createGraph()
        self.solutions()

    def createGraph(self):
        """Derive ``self.vertices`` (column ``id``) and ``self.edges``
        (columns ``src``, ``dst``, ``sentiment``) from ``self.reddit``."""
        src = self.reddit.select("SOURCE_SUBREDDIT").distinct()
        dst = self.reddit.select("TARGET_SUBREDDIT").distinct()
        # Spark DataFrame union retains duplicates, hence the final distinct().
        # The union is positional, so the result keeps the first frame's column name.
        self.vertices = src.union(dst).distinct().withColumnRenamed("SOURCE_SUBREDDIT", "id")

        self.edges = (
            self.reddit.select("SOURCE_SUBREDDIT", "TARGET_SUBREDDIT", "LINK_SENTIMENT")
            .withColumnRenamed("SOURCE_SUBREDDIT", "src")
            .withColumnRenamed("TARGET_SUBREDDIT", "dst")
            .withColumnRenamed("LINK_SENTIMENT", "sentiment")
        )

    def solutions(self, top_n=5):
        """Run the graph queries and display the ``top_n`` rows of each result.

        Args:
            top_n: Number of rows to show per query (defaults to 5).
        """
        g = GraphFrame(self.vertices, self.edges)  # step that creates the graph
        g.cache()

        # Nodes with the highest out-degree and their outgoing edge counts.
        display(g.outDegrees.orderBy(desc("outDegree")).head(top_n))

        # Nodes with the highest in-degree and their incoming edge counts.
        display(g.inDegrees.orderBy(desc("inDegree")).head(top_n))

        # PageRank for every node; show the nodes with the highest values.
        ranks = g.pageRank(maxIter=10)
        ranks.cache()
        display(ranks.vertices.orderBy(desc("pagerank")).head(top_n))

        # Connected components needs a checkpoint directory for the temporary
        # data it saves while running. makedirs(exist_ok=True) keeps this step
        # re-runnable and valid Python outside IPython (unlike `!mkdir`).
        os.makedirs(self.CHECKPOINT_DIR, exist_ok=True)
        spark.sparkContext.setCheckpointDir(self.CHECKPOINT_DIR)

        # Largest connected components: count the vertices per component id
        # and sort by that size (sorting by the id itself says nothing about size).
        component_sizes = g.connectedComponents().groupBy("component").count()
        display(component_sizes.orderBy(desc("count")).head(top_n))

        # Vertices with the largest triangle count.
        display(g.triangleCount().orderBy(desc("count")).head(top_n))
 
# Download the Reddit hyperlink data set, build the graph and display all results.
Graphframes(inp="https://snap.stanford.edu/data/soc-redditHyperlinks-body.tsv")