In [None]:
#Shell commands used to set up the environment for the rest of the project

#installing packages
#sudo python3.7 -m pip install IPython
#sudo python3.7 -m pip install pandas
#sudo python3.7 -m pip install boto3
#sudo python3.7 -m pip install networkx

#setting up for graphframes
#spark-shell --jars scala-logging_2.12-3.5.0.jar
#pyspark --packages graphframes:graphframes:0.8.1-spark2.4-s_2.11
#nano ~/.bashrc
#export SPARK_OPTS="--packages graphframes:graphframes:0.8.1-spark2.4-s_2.11"
#source .bashrc
#sc.addPyFile('/home/hadoop/.ivy2/jars/graphframes_graphframes-0.8.1-spark2.4-s_2.11.jar')

#sudo cp "/home/hadoop/.ivy2/jars/graphframes_graphframes-0.8.1-spark2.4-s_2.11.jar" "/usr/lib/spark/jars"
#pyspark --packages graphframes:graphframes:0.8.1-spark2.4-s_2.11 --jars graphframes:0.8.1-spark2.4-s_2.11.jar

#Kill YARN/Hadoop jobs that are no longer needed
#yarn application -kill application_1605965195748_0042
#hadoop job -kill RUNNING

In [None]:
# Ship the GraphFrames jar to the SparkContext so `graphframes` can be imported.
# The pip-installed graphframes package is not the latest release built for
# Spark 2.4, so the jar copied into /usr/lib/spark/jars is used instead.
graphframes_jar = '/usr/lib/spark/jars/graphframes_graphframes-0.8.1-spark2.4-s_2.11.jar'
sc.addPyFile(graphframes_jar)

In [None]:
# Display the running Spark version; the GraphFrames jar registered earlier is built for Spark 2.4.
sc.version

In [None]:
from graphframes import *

In [None]:
#import all the other packages
import pandas as pd

In [None]:
from pyspark.sql.functions import *
from pyspark.sql import *

In [None]:
from functools import reduce

In [None]:
from IPython.display import display

In [None]:
from io import StringIO
import boto3

In [None]:
# Load the edge list from S3. The header row supplies the column names (src, dst, ...)
# and inferSchema makes Spark scan the file to guess each column's type.
relations = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("s3://smokeeveryday/data420/routes.csv")
)

In [None]:
# Inspect the column names and the types chosen by inferSchema.
relations.printSchema()

In [None]:
# Number of partitions produced by the initial CSV read (compare with the count after repartitioning).
relations.rdd.getNumPartitions()

In [None]:
# Hash-partition the edges by source node so that edges sharing a `src` end up in
# the same partition; no partition count is given, so Spark's default is used.
relations = relations.repartition("src")

In [None]:
# Number of partitions after repartitioning the edges by `src`.
relations.rdd.getNumPartitions()

In [None]:
# Total number of rows (edges) in the edge list.
relations.count()

In [None]:
# Distinct source ids; these are combined with the distinct destination ids to
# form the complete vertex list.
node = relations.select(col("src")).distinct()

In [None]:
# Distinct destination ids (some nodes may only ever appear as a destination).
tmp = relations.select(col("dst")).distinct()

In [None]:
def unionAll(*dfs):
    """Stack DataFrames vertically (SQL ``UNION ALL``), matching columns by position.

    Duplicate rows are kept; call ``.distinct()`` on the result to deduplicate.
    Raises TypeError when called with no DataFrames (reduce of an empty sequence).
    """
    # DataFrame.unionAll simply delegates to DataFrame.union, so call it directly.
    return reduce(lambda left, right: left.union(right), dfs)

# `node` holds the src ids and `tmp` the dst ids; the positional union keeps the
# first frame's column name (`src`), and distinct() leaves each vertex id once.
node = unionAll(node, tmp).distinct()

In [None]:
# GraphFrames expects the vertex DataFrame's key column to be named `id`.
node = node.withColumnRenamed(existing="src", new="id")

In [None]:
# Degree calculation: give every edge a constant weight of 1 in a `cnt` column,
# so summing `cnt` per node counts that node's edges.
one_per_edge = lit(1)
relations = relations.withColumn("cnt", one_per_edge)

In [None]:
# Out-degree: number of edges leaving each node, keyed as `node`.
# Only the `cnt` column is aggregated. A bare `.sum()` would also sum every other
# numeric column of `relations` (e.g. src/dst if inferSchema typed them as numbers),
# adding stray `sum(...)` columns to the result table and duplicate column names
# after the in/out-degree join.
# NOTE: `sum` here is pyspark.sql.functions.sum, brought in by the
# `from pyspark.sql.functions import *` cell; it shadows the Python builtin.
centrality = (
    relations.groupBy("src")
    .agg(sum("cnt").alias("outDegree"))
    .withColumnRenamed("src", "node")
)

In [None]:
# In-degree: number of edges arriving at each node, keyed by `dst`.
# Only the per-edge `cnt` weight is summed. A bare `.sum()` would also sum every
# other numeric column of `relations` and add stray `sum(...)` columns.
# NOTE: `sum` is pyspark.sql.functions.sum, brought in by the star import.
tmp = relations.groupBy("dst").agg(sum("cnt").alias("inDegree"))

In [None]:
# Combine out-degree and in-degree into one row per node.
# A full outer join keeps nodes that only appear as a destination (no outgoing
# edges). A left join from the out-degree table would silently drop them, and
# with them their in-degree. For those rows `node` is null after the join, so
# fill it from `dst` before discarding the `dst` key column. Missing degrees stay
# null here and are filled with 0 later.
centrality = centrality.join(tmp, centrality.node == tmp.dst, "outer")
centrality = centrality.withColumn("node", coalesce(col("node"), col("dst")))

centrality = centrality.drop("dst")

In [None]:
#Below is a manual PageRank implementation written without GraphFrames. It ran slower
#than GraphFrames' built-in pageRank, so we use GraphFrames instead.

#lines = sc.textFile("s3://smokeeveryday/data420/Relation.csv")
#header = lines.first()
#lines = lines.filter(lambda line: line != header)

In [None]:
#links = lines.map(lambda nodes: nodes.split(",")).groupByKey()
#def computeContribs(nodes, rank):
#    num_nodes = len(nodes)
#    for node in nodes:
#        yield (node, rank / num_nodes)
#from operator import *

In [None]:
#ranks = links.map(lambda nodes: (nodes[0], 1.0))
#for iteration in range(50):
#    contribs = links.join(ranks).flatMap(lambda nodes:
#                                         computeContribs(nodes[1][0], nodes[1][1]))
#    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank*0.85 + 0.15)

In [None]:
#tmp = spark.createDataFrame(ranks.collect())

In [None]:
#Establish a GraphFrame from the vertex list `node` (a single `id` column) and the
#edge list `relations` (`src`, `dst`, the other CSV columns, and `cnt`).
g = GraphFrame(node, relations)

In [None]:
# Run PageRank for a fixed 50 iterations with reset probability 0.15
# (i.e. damping factor 0.85); the result is a new GraphFrame.
tmp = g.pageRank(maxIter=50, resetProbability=0.15)

In [None]:
# Keep only the vertex DataFrame of the PageRank result: the `id` column plus the
# `pagerank` score column that GraphFrames adds.
tmp = tmp.vertices

In [None]:
# Attach each node's PageRank score to the degree table. The join keys are `node`
# (left) and `id` (right); the left join keeps every existing row of `centrality`.
# The redundant `id` key column is dropped in a later cell.
centrality = centrality.join(tmp, on=centrality.node == tmp.id, how="left")

In [None]:
#page_range = g.pageRank(resetProbability=0.15, tol=0.1, maxIter=10)

In [None]:
#Our function for computing closeness centrality from shortest paths.
#It works on smaller datasets but failed on the large edge list used here.
#Repartitioning the vertices did not help. Note that it passes every vertex as a landmark,
#so shortestPaths computes all-pairs distances, which likely explains the failure at scale.

#def closeness(g):
#    tmp = g.vertices
#    tmp = tmp.repartition(col("id"))
#    shortestPaths = g.shortestPaths(landmarks = tmp.rdd.map(lambda x: x.id).collect())
#    pathLength = shortestPaths.select('id', explode('distances'))
#    groupedKey = pathLength.groupBy('key')
#    sumOfGroupedDistances = groupedKey.agg(sum('value').alias('c'))
#    return sumOfGroupedDistances.selectExpr('key as id','1/c as closeness')

In [None]:
#close = closeness(g)

In [None]:
#centrality = centrality.join(close, centrality.node == close.id, "left")

In [None]:
# Rows with no match on one side of the joins carry nulls in numeric columns;
# treat those as 0. Then discard the duplicate `id` key carried over from the
# PageRank vertices and preview the table.
centrality = centrality.fillna(0).drop("id")
centrality.show()

In [None]:
#Collect the Spark result into a pandas DataFrame on the driver for export.
#NOTE(review): toPandas() pulls every row into driver memory; fine for a per-node
#table of moderate size, but confirm it fits for larger graphs.
central = centrality.toPandas()

In [None]:
# Serialise the pandas result to CSV in memory and upload it to S3 as central.csv.
bucket = 'smokeeveryday'
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
# to_csv's defaults also write the pandas row index as the first column.
central.to_csv(csv_buffer)
payload = csv_buffer.getvalue()
s3_resource.Object(bucket, 'central.csv').put(Body=payload)