In [None]:
import pandas as pd 
# Raise pandas' display limits so wide or long frames are not truncated in cell output
pd.set_option("display.max_columns", 500)
pd.set_option("display.max_rows", 100)

In [None]:
import logging 
# Configure the root logger with the default handler so that log records emitted by
# library code are shown in the notebook output.
logging.basicConfig()  # Means logs will print in Jupyter Lab

In [None]:
from utility_functions.demo_utils import get_spark
# Create the `spark` handle used by the cells below (presumably a SparkSession — it is
# later used via `spark.read.parquet` and `spark.version`).
spark = get_spark() # See utility_functions/demo_utils.py for how to set up Spark

In [None]:
## splink_graph functionality
import splink_graph
from splink_graph.splink_graph import subgraph_stats
from splink_graph.splink_graph import _graphharmoniser
from pyspark.sql import functions as f
import pyspark
import os


In [None]:
# Set the ARROW_PRE_0_15_IPC_FORMAT flag according to the installed PySpark major version.
# The major version is parsed numerically so that releases newer than 3.x are not
# mistaken for 2.x, and both branches now actually write the value they report.
spark_major_version = int(pyspark.__version__.split(".")[0])
if spark_major_version >= 3:
    # Legacy Arrow IPC format disabled.
    os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "0"
    print(f"Spark {spark_major_version}.x detected. ARROW_PRE_0_15_IPC_FORMAT is set to ",os.environ["ARROW_PRE_0_15_IPC_FORMAT"])
else:
    # Legacy Arrow IPC format enabled; previously this branch only printed the message
    # without setting the variable.
    os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
    print("Spark 2.x detected. ARROW_PRE_0_15_IPC_FORMAT is set to", os.environ["ARROW_PRE_0_15_IPC_FORMAT"])

In [None]:
# Display the version reported by the running Spark handle as the cell output
spark.version

In [None]:
# Load the three input tables for the graph demo, in the same order as before:
# df_e, cc, edges.
df_e, cc, edges = (
    spark.read.parquet(path)
    for path in (
        "data/graph/df_e.parquet",
        "data/graph/cc.parquet",
        "data/graph/edges.parquet",
    )
)

In [None]:

# Print the schema of the edge table and then of the cc table
for frame in (edges, cc):
    frame.printSchema()


In [None]:
# Build a per-edge table that carries the left/right record attributes of each pairwise
# comparison, packed into a single JSON string column named "info".
edgesinfo = (
    df_e
    .withColumn(
        "info",
        f.to_json(
            f.struct(
                "surname_l", "dob_l", "city_l", "email_l", "group_l",
                "surname_r", "dob_r", "city_r", "email_r", "group_r",
            )
        ),
    )
    .select("tf_adjusted_match_prob", "match_probability", "unique_id_l", "unique_id_r", "info")
    .withColumnRenamed("unique_id_l", "src")
    .withColumnRenamed("unique_id_r", "dst")
)

# Pass both edge tables through the same src/dst harmonisation step
# (presumably so their endpoint columns line up for the later joins — TODO confirm).
edgesinfo = _graphharmoniser(edgesinfo, "src", "dst")
edges = _graphharmoniser(edges, "src", "dst")


In [None]:
#edge_df = edges.join(edgesinfo,  (f.col("source") == f.col("unique_id_l") ) &
#                                   (f.col("target") == f.col("unique_id_r"))

In [None]:
# Attach each edge's component by joining the edge source node onto the cc table,
# then add a "distance" column computed as 1.01 minus the TF-adjusted match probability.
edge_df = (
    edges.alias("a")
    .join(cc.alias("b"), f.col("a.src") == f.col("b.id"))
    .drop("id")
    .withColumn("distance", 1.01 - f.col("tf_adjusted_match_prob"))
)

In [None]:
# Subgraph statistics over edge_df. The second and third positional arguments
# presumably name the component and edge-weight columns — TODO confirm against
# splink_graph.subgraph_stats.
sgs = subgraph_stats(
    edge_df,
    "component",
    "tf_adjusted_match_prob",
    src="src",
    dst="dst",
)

In [None]:
# Preview the first two rows of the subgraph statistics
sgs.show(2)

In [None]:
# Troubleshooting: if this cell fails with
#   RuntimeError: Arrow legacy IPC format is not supported in PySpark, please unset ARROW_PRE_0_15_IPC_FORMAT
# uncomment the line below and execute the cell again.
# NOTE(review): the error text asks for the variable to be *unset*, whereas the line
# below sets it to "1" — confirm which value actually resolves the error.
#os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

from splink_graph.vectorised import diameter_radius_transitivity
# Compute graph metrics from the edge list; "src" and "dst" name the endpoint columns.
# Presumably yields diameter, radius and transitivity per component — the result is
# later joined on "component" and sorted by `transitivity`.
drt = diameter_radius_transitivity(edge_df,"src", "dst")


In [None]:
# Preview the metrics table (show() with no argument uses Spark's default row limit)
drt.show()

In [None]:
# Combine the subgraph statistics with the diameter/radius/transitivity metrics,
# one row per component.
graphstats = sgs.join(drt, on="component")

# Show the five largest components (ties broken by lowest transitivity), hiding the
# bulkier columns.
(
    graphstats
    .sort(graphstats.nodecount.desc(), graphstats.transitivity.asc())
    .drop("graphhash", "nodes", "sq_clustcoeff")
    .show(5)
)



In [None]:
from splink_graph.vectorised import edgebetweeness

# Edge betweenness for edge_df, passed through the same src/dst harmonisation as the
# other edge tables, then previewed.
ebdf = _graphharmoniser(
    edgebetweeness(edge_df, src="src", dst="dst"),
    "src",
    "dst",
)
ebdf.show(10)




In [None]:
# Add the edge-betweenness columns to the edge table, matching on endpoints and component
edge_eb_df = edge_df.join(ebdf, on=["src", "dst", "component"])

In [None]:
# Preview the joined edge table, then display its row count as the cell output
edge_eb_df.show(5)
edge_eb_df.count()

In [None]:
# network visualisation prep

In [None]:
# prepare edges

In [None]:
# Attach the JSON comparison payload to every edge and rename the columns to the
# field names used for the visualisation (source / target / value / comparison).
edge_df_for_viz = (
    edge_eb_df
    .join(edgesinfo, on=["src", "dst", "tf_adjusted_match_prob"])
    .withColumnRenamed("src", "source")
    .withColumnRenamed("dst", "target")
    .withColumnRenamed("eb", "value")
    .withColumnRenamed("info", "comparison")
)
edge_df_for_viz.show(1, truncate=False, vertical=True)

In [None]:
# Prepare nodes for visualisation: compute eigencentrality from the edge list
from splink_graph.vectorised import eigencentrality

node_eigen_centrality = eigencentrality(
    edge_df,
    component="component",
    distance="distance",
    src="src",
    dst="dst",
)

node_eigen_centrality.show(5)

In [None]:
# Node table for the visualisation: id -> index, component -> group, plus a "name"
# column that copies the index.
df_nodes = (
    cc
    .withColumnRenamed("id", "index")
    .withColumnRenamed("component", "group")
    .withColumn("name", f.col("index"))
)

# Add each node's eigencentrality; the duplicate key column "node" is dropped after the join
node_df_for_viz = (
    df_nodes
    .join(node_eigen_centrality, df_nodes.index == node_eigen_centrality.node)
    .drop("node")
)
node_df_for_viz.show(2)

In [None]:
# choose components to visualise


# Pick the three components with the most nodes (ties: lowest tri_clustcoeff first)
comp_list = (
    graphstats
    .sort(graphstats.nodecount.desc(), graphstats.tri_clustcoeff.asc())
    .limit(3)
    .select("component")
    .rdd.flatMap(list)
    .collect()
)

# Render the ids as a parenthesised, comma-separated list and display it
comp_list_str = "(" + ",".join(map(str, comp_list)) + ")"
comp_list_str

In [None]:
# Keep only the nodes and edges of the selected components and bring them to pandas.
# Column.isin receives the Python list directly instead of splicing it into SQL text:
# an empty selection gives empty frames rather than an `IN ()` parse error, and
# non-numeric component ids need no manual quoting.
df_nodes_pd = node_df_for_viz.filter(f.col("group").isin(comp_list)).toPandas()
df_edges_pd = edge_df_for_viz.filter(f.col("component").isin(comp_list)).toPandas()

# Columns exported to the visualisation for links and nodes respectively
edge_fields = ["source", "target", "value","comparison"]
node_fields = ["name", "group", "index","eigen_centrality"]

In [None]:

import json

# Load the Vega force-layout template. The handle is deliberately not called `f`:
# in this notebook `f` is the alias for pyspark.sql.functions, and rebinding it to a
# (closed) file object would break any earlier cell that is re-run afterwards.
with open("data/graph/force_template.vg.json") as template_file:
    vl = json.load(template_file)



In [None]:
# Put the selected nodes and links into the first two data slots of the Vega template
vl["data"][0] = dict(
    name="node-data",
    values=df_nodes_pd[node_fields].to_dict(orient="records"),
)
vl["data"][1] = dict(
    name="link-data",
    values=df_edges_pd[edge_fields].to_dict(orient="records"),
)

# Overall chart size
vl.update(width=1200, height=600)

In [None]:
%%javascript
    var script = document.createElement('script');
    script.type = 'text/javascript';
    script.src = '//cdn.jsdelivr.net/npm/vega@5';
    document.head.appendChild(script);
    
    var script = document.createElement('script');
    script.type = 'text/javascript';
    script.src = '//cdn.jsdelivr.net/npm/vega-embed@6';
    document.head.appendChild(script);



In [None]:


from IPython.display import Javascript

# Build the JavaScript that renders the spec into this cell's output element.
# The json.dumps output is inlined as a JavaScript object literal. Wrapping it in a
# backtick template literal and re-parsing it would un-escape sequences such as \"
# inside string values (the JSON-encoded "comparison" field contains them), which
# makes JSON.parse fail. The leftover `debugger;` statement has also been removed.
script = f"""
var spec = {json.dumps(vl)};
vegaEmbed(element, spec).then(function(result) {{
  }}).catch(console.error);
"""



In [None]:


# Display the generated script as the cell output so the browser executes it
Javascript(script)



In [None]:
# Sanity check: show the Python type of a single exported link record
type(df_edges_pd[edge_fields].to_dict(orient="records")[0])