In [1]:
#Load the processed trip data (uploaded into databricks as data.csv).
#The first row supplies the column names and Spark infers each column's type.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/FileStore/tables/data.csv")
)
#Preview the first five rows
df.show(5)

In [2]:
#Print the column names and the types Spark inferred for the loaded trip data
df.printSchema()

In [3]:
#Confine the trip network to high fare trips only.
#NOTE(review): Fare == 2 is presumably the code for the "high" fare class - confirm against the preprocessing step.
df_high = df.filter(df["Fare"] == 2)
#Number of trips that remain after the filter
df_high.count()

In [4]:
#Rename the trip endpoints to the edge column names required by graphframes
#and keep only those two columns.
#withColumnRenamed is a no-op when the column is absent, so re-running this
#cell after df_high already holds src/dst leaves it unchanged instead of
#failing on a missing Pickup column.
df_high = (
    df_high
    .withColumnRenamed("Pickup", "src")
    .withColumnRenamed("Dropoff", "dst")
    .select("src", "dst")
)

#Extract the list of vertices from the trip data: every distinct location that
#appears as either end of a high fare trip, exposed as the "id" column that
#graphframes requires.
#Uses the DataFrame API (union + distinct) rather than the deprecated unionAll
#and temporary SQL views.
df5 = (
    df_high.selectExpr("src as id")
    .union(df_high.selectExpr("dst as id"))
    .distinct()
)

In [5]:
#Create the trip graph: df5 supplies the vertex ids, df_high the src/dst edges.
#Import the one name that is needed explicitly instead of using a wildcard import.
from graphframes import GraphFrame
g = GraphFrame(df5, df_high)

In [6]:
#Run label propagation (capped at 10 iterations) on the trip graph to assign
#every location a community label.
result = g.labelPropagation(maxIter=10)
#Show each location id next to the label it was assigned
community_labels = result.select("id", "label")
display(community_labels)

In [7]:
#Load the file holding the label of each node from label propagation.
#NOTE(review): labelHigh.csv is presumably an export of the label propagation output shown above - confirm how it was produced.
#No schema inference is requested here, so the columns are read as strings.
df10 = (
    spark.read
    .option("header", True)
    .csv("/FileStore/tables/labelHigh.csv")
)
df10.show(5)

In [8]:
#Attach the community label to every vertex by matching ids, then drop the
#label file's own id column so that a single id column remains.
id_matches = df5["id"] == df10["id"]
df11 = df5.join(df10, on=id_matches, how="inner").drop(df10["id"])
df11.show(5)

In [9]:
#Distribution of labels among locations: number of rows per label, ordered by label.
#The label table is exposed to SQL as the temporary view "label".
df10.createOrReplaceTempView("label")
label_counts = spark.sql("select label, count(*) from label group by label order by label")
label_counts.show()

In [10]:
#Triangle count in community 0 for transitivity score
#Import only desc: a wildcard import of pyspark.sql.functions replaces Python
#builtins such as sum, min and max with Spark column functions.
from pyspark.sql.functions import desc

#Vertices whose label is '0' (the label is compared as a string)
df11a = df11.where(df11.label == '0')
#NOTE(review): df_high is passed unfiltered, so the edge list still contains trips whose
#endpoints lie outside community 0 - confirm triangleCount ignores those edges.
g3 = GraphFrame(df11a,df_high)
tri0 = g3.triangleCount()
#List the locations from the most to the fewest triangles
display(tri0.select("id","count").orderBy(desc("count")))

In [11]:
#Reverse pagerank in community 0
#Explicit imports instead of wildcards: only GraphFrame and desc are used here,
#and a wildcard import of pyspark.sql.functions shadows Python builtins.
from graphframes import GraphFrame
from pyspark.sql.functions import desc

#Vertices whose label is '0' (the label is compared as a string)
df11a = df11.where(df11.label == '0')
#Swap src and dst so that every edge points in the opposite direction
df7 = df_high.selectExpr("src as dst","dst as src")
#NOTE(review): df7 is built from all of df_high, so it still contains edges whose
#endpoints lie outside community 0 - confirm whether they influence the ranks.
g3b = GraphFrame(df11a, df7)

#PageRank on the reversed graph: reset probability 0.15, 10 iterations
ranks0 = g3b.pageRank(resetProbability=0.15, maxIter=10)
#List the locations from the highest to the lowest pagerank
display(ranks0.vertices.select("id","pagerank").orderBy(desc("pagerank")))

In [12]:
#Triangle count in community 1 for transitivity score.
#Vertices are the locations labelled '1'; the edge list is df_high as-is.
df11b = df11.filter(df11["label"] == '1')
g4 = GraphFrame(df11b, df_high)
tri1 = g4.triangleCount()
#List the locations from the most to the fewest triangles
tri1_by_count = tri1.select("id", "count").orderBy("count", ascending=False)
display(tri1_by_count)

In [13]:
#Reverse pagerank for community 1
#Explicit imports instead of wildcards: only GraphFrame and desc are used here,
#and a wildcard import of pyspark.sql.functions shadows Python builtins.
from graphframes import GraphFrame
from pyspark.sql.functions import desc

#Vertices whose label is '1' (the label is compared as a string)
df11b = df11.where(df11.label == '1')
#Swap src and dst so that every edge points in the opposite direction
df7 = df_high.selectExpr("src as dst","dst as src")
#NOTE(review): df7 is built from all of df_high, so it still contains edges whose
#endpoints lie outside community 1 - confirm whether they influence the ranks.
g4b = GraphFrame(df11b, df7)

#PageRank on the reversed graph: reset probability 0.15, 10 iterations
ranks1 = g4b.pageRank(resetProbability=0.15, maxIter=10)
#List the locations from the highest to the lowest pagerank
display(ranks1.vertices.select("id","pagerank").orderBy(desc("pagerank")))

In [14]:
#Triangle count in community 5 for transitivity score.
#Vertices are the locations labelled '5'; the edge list is df_high as-is.
in_community_5 = df11["label"] == '5'
df11c = df11.filter(in_community_5)
g5 = GraphFrame(df11c, df_high)
tri5 = g5.triangleCount()
#List the locations from the most to the fewest triangles
display(tri5.select("id", "count").orderBy("count", ascending=False))

In [15]:
#Reverse pagerank for community 5
#Explicit imports instead of wildcards: only GraphFrame and desc are used here,
#and a wildcard import of pyspark.sql.functions shadows Python builtins.
from graphframes import GraphFrame
from pyspark.sql.functions import desc

#Vertices whose label is '5' (the label is compared as a string)
df11c = df11.where(df11.label == '5')
#Swap src and dst so that every edge points in the opposite direction
df7 = df_high.selectExpr("src as dst","dst as src")
#NOTE(review): df7 is built from all of df_high, so it still contains edges whose
#endpoints lie outside community 5 - confirm whether they influence the ranks.
g5b = GraphFrame(df11c, df7)

#Store the result as ranks5 so that it does not overwrite ranks1, which holds
#the pagerank result for community 1.
ranks5 = g5b.pageRank(resetProbability=0.15, maxIter=10)
#List the locations from the highest to the lowest pagerank
display(ranks5.vertices.select("id","pagerank").orderBy(desc("pagerank")))