In [None]:
import pyspark.sql.functions as f
from pyspark.sql.window import Window as w

from pyspark.sql import Row
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Build (or reuse) the local Spark session that every later cell relies on.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("synth")
    # Installation-specific settings (these particular values are for the MOJ AP).
    .config("spark.driver.memory", "8g")
    .config("spark.sql.shuffle.partitions", "8")
    # Settings required for splink_graph to run properly.
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
    .getOrCreate()
)
import os

In [None]:
# Read the input parquet file into the dataframe used throughout the notebook.
df_e = spark.read.parquet("data/df_e.snappy.parquet")

In [None]:
# Inspect the columns and types of the freshly loaded data.
df_e.printSchema()

In [None]:
# Keep only the information needed for graph processing and, in the same pass,
# give the columns more palatable names (column order is unchanged).
df_e = df_e.select(
    f.col("tf_adjusted_match_prob").alias("weight"),
    f.col("unique_id_l").alias("src"),
    f.col("unique_id_r").alias("dst"),
)

df_e.printSchema()

In [None]:
from splink_graph.utils import _graphharmoniser

# Print the helper's documentation before it is used in the next cell.
help(_graphharmoniser)

In [None]:
# Ensure src/dst are always in the same order, to avoid join problems later on.
df_e = _graphharmoniser(df_e, colsrc="src", coldst="dst")

In [None]:
# Only edges whose weight is strictly above the 0.95 threshold form the graph.
df_e = df_e.where(f.col("weight") > 0.95)

In [None]:
from splink_graph.cc import nx_connected_components

# Print the function's documentation before it is used in the next cell.
help(nx_connected_components)

In [None]:
# Run the connected components algorithm to create a dataframe
# containing cluster_id and node_id.
ccs = nx_connected_components(
    spark,
    df_e,
    src="src",
    dst="dst",
    weight_colname="weight",
    cluster_id_colname="cluster_id",
    cc_threshold=0.95,
)

In [None]:
# Temporary workaround: the algorithms below currently need cluster_id as an
# integer, so the column is cast to an integer type here.
ccs = ccs.withColumn("cluster_id", f.col("cluster_id").cast("int"))

In [None]:
# Show the first few rows of the connected-components dataframe.
ccs.show(n=6)

In [None]:
# Count nodes per cluster, then count how many clusters share each size.
# For this dataset there are clusters of 2, 3, 4, 5 and 6 nodes.
(
    ccs
    .groupBy("cluster_id").count()
    .groupBy("count").count()
    .show()
)

In [None]:
# Attach the cluster_id to every edge by matching each edge's src to its node_id,
# then add a "distance" column equal to 1.01 - weight, rounded to 2 decimal places.
edge_df = (
    ccs
    .join(df_e, on=ccs["node_id"] == df_e["src"])
    .drop("node_id")
    .withColumn("distance", f.round(1.01 - f.col("weight"), 2))
)
edge_df.show(8)

In [None]:
# Start with some basic cluster statistics, listed from lowest density upwards.
from splink_graph.cluster_metrics import cluster_basic_stats

bcs = cluster_basic_stats(edge_df)
bcs.orderBy(f.asc("density")).show(truncate=False)

In [None]:
from splink_graph.cluster_metrics import cluster_main_stats

# Compute the main cluster statistics from the edge dataframe and preview them.
cms = cluster_main_stats(edge_df)
cms.show(n=10)

In [None]:
from splink_graph.cluster_metrics import cluster_connectivity_stats

# Keep the result under its own name. `ccs` already holds the connected-components
# dataframe (cluster_id / node_id) that the edge_df join is built from; rebinding it
# here would make any earlier cell that reads `ccs` use the wrong dataframe when
# re-run after this one.
conn_stats = cluster_connectivity_stats(edge_df)
conn_stats.show(6)

In [None]:
from splink_graph.cluster_metrics import number_of_bridges

# Compute the bridge counts from the edge dataframe and preview the result.
nb = number_of_bridges(edge_df)
nb.show(n=5)

In [None]:
# Count how many rows share each number_of_bridges value.
(
    nb
    .groupBy("number_of_bridges")
    .count()
    .show()
)

In [None]:
from splink_graph.edge_metrics import bridge_edges

# Run the bridge_edges metric on the edge dataframe and display the output.
br_e = bridge_edges(edge_df)

br_e.show()

In [None]:
from splink_graph.node_metrics import eigencentrality

# Run the eigencentrality node metric on the edge dataframe and display the output.
eigenc = eigencentrality(edge_df)

eigenc.show()