In [0]:
import os

import pandas as pd
import psycopg2
from graphframes import *
from pyspark.sql import SparkSession

# Access the PostgreSQL server holding the CPDB tables.
# Every connection setting can be overridden with an environment variable so
# credentials do not have to live in the notebook; the fallbacks are the values
# this notebook originally hard-coded, so behaviour is unchanged when the
# variables are unset.
# TODO(review): drop the fallback password once users set CPDB_PASSWORD.
db = psycopg2.connect(
    host=os.environ.get("CPDB_HOST", "codd04.research.northwestern.edu"),
    port=os.environ.get("CPDB_PORT", "5433"),
    database=os.environ.get("CPDB_DATABASE", "postgres"),
    user=os.environ.get("CPDB_USER", "cpdbstudent"),
    password=os.environ.get("CPDB_PASSWORD", "DataSci4AI"),
)

# One cursor, reused for the queries that follow.
cursor = db.cursor()

# construct nodes - officer IDs
# SparkSession is imported but `spark` is never created here. It may be
# predefined by the notebook environment; getOrCreate() returns the active
# session when one exists and builds one otherwise, so this works either way.
spark = SparkSession.builder.getOrCreate()

nodes_query = "SELECT id FROM data_officer;"

cursor.execute(nodes_query)
colnames = [desc[0] for desc in cursor.description]
# Supplying columns= up front also gives a correctly named frame when the query
# returns no rows; assigning .columns afterwards would raise a length mismatch.
# GraphFrames expects the vertex DataFrame to have an "id" column, which the
# query selects directly.
df_nodes = pd.DataFrame(cursor.fetchall(), columns=colnames)

nodes = spark.createDataFrame(df_nodes)

# construct edges - allegations involving two officers (the source and destination nodes)
# The self-join emits every pair of officers named on the same allegation in
# both directions (a->b and b->a), so the graph is effectively undirected.
# Pairing an officer with itself is excluded: without the <> filter every
# officer on any allegation got a self-loop, which is not an allegation between
# two officers and feeds PageRank back into the same vertex. (Comparisons with
# NULL are never true, so rows with a NULL officer_id are dropped as well.)
edges_query = (
    "SELECT DISTINCT da1.officer_id src, da2.officer_id dst "
    "FROM data_officerallegation da1 "
    "JOIN data_officerallegation da2 ON da1.allegation_id = da2.allegation_id "
    "WHERE da1.officer_id <> da2.officer_id;"
)

cursor.execute(edges_query)
colnames = [desc[0] for desc in cursor.description]
# Columns are aliased src/dst in SQL, the names GraphFrames expects for edges.
df_edges = pd.DataFrame(cursor.fetchall(), columns=colnames)

edges = spark.createDataFrame(df_edges)

# Assemble the officer graph from the vertex and edge DataFrames built above.
graph = GraphFrame(v=nodes, e=edges)

# Question 1: PageRank algorithm
# Reset probability 0.15, stopping once the tolerance of 0.01 is reached;
# print the officers ordered from highest to lowest PageRank.
pr = graph.pageRank(tol=0.01, resetProbability=0.15)
pr.vertices.sort("pagerank", ascending=False).show()

# Question 2: Triangle Count algorithm
# tc is the vertex table plus a "count" column: the number of triangles each
# officer participates in.
tc = graph.triangleCount()
tc.select("id", "count").orderBy("count", ascending=False).show()

# Mean triangle count per officer, aggregated inside Spark so the whole vertex
# table is not collected onto the driver. Avoids shadowing the builtin sum(),
# and an empty graph yields None instead of raising ZeroDivisionError.
avg = tc.agg({"count": "avg"}).first()[0]

print("Allegation graph avg: ", avg)

# Question 2 (part 2): Triangle Count algorithm on TRR graph
# construct edges - TRRs involving two officers (the source and destination nodes)
# Pairs of distinct officers attached to the same TRR event, in both
# directions. An officer paired with itself is excluded, since that is not a TRR
# involving two officers. (Comparisons with NULL are never true, so rows with a
# NULL officer_id are dropped too.)
edges_query_trr = (
    "SELECT DISTINCT t1.officer_id src, t2.officer_id dst "
    "FROM trr_trr t1 "
    "JOIN trr_trr t2 ON t1.event_id = t2.event_id "
    "WHERE t1.officer_id <> t2.officer_id;"
)

cursor.execute(edges_query_trr)
colnames_trr = [desc[0] for desc in cursor.description]
# columns= up front keeps the src/dst names even if the query returns no rows.
df_edges_trr = pd.DataFrame(cursor.fetchall(), columns=colnames_trr)

edges_trr = spark.createDataFrame(df_edges_trr)

# construct graph - the same officer vertices, with TRR co-involvement edges
graph_trr = GraphFrame(nodes, edges_trr)

# run Triangle Count; tc_trr is the vertex table plus a per-officer "count" column
tc_trr = graph_trr.triangleCount()
tc_trr.select("id", "count").orderBy("count", ascending=False).show()

# Mean triangle count per officer, aggregated in Spark rather than by
# collecting every row to the driver; None instead of ZeroDivisionError when
# the graph has no vertices.
avg_trr = tc_trr.agg({"count": "avg"}).first()[0]

print("TRR graph avg: ", avg_trr)