In [None]:
%autoreload 2

In [None]:
import logging

logging.basicConfig()

# Set to True to get DEBUG-level output from the loggers listed below
# (presumably the loggers used by the sparklink modules — confirm in the package).
ENABLE_DEBUG_LOGGING = False
DEBUG_LOGGER_NAMES = [
    "sql",
    "gammas",
    "expectation_step",
    "maximisation_step",
    "comparison_evaluation",
    "sparklink",
]

if ENABLE_DEBUG_LOGGING:
    for logger_name in DEBUG_LOGGER_NAMES:
        logging.getLogger(logger_name).setLevel(logging.DEBUG)

In [None]:
from pyspark.context import SparkContext, SparkConf
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StructType
import pyspark.sql.functions as f

# WARNING:
# These config options are appropriate only if you're running Spark locally!!!
# spark.driver.memory can only take effect when this cell is what starts the JVM
# (i.e. on a fresh kernel) — restart the kernel after changing it.
conf = SparkConf()
conf.set("spark.driver.memory", "8g")
conf.set("spark.sql.shuffle.partitions", "8")

# The builder is PySpark's supported entry point; getOrCreate() reuses the
# active context/session when the cell is re-run instead of wrapping the
# context in a fresh SparkSession.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

In [None]:
from sparklink.sql import *
from sparklink.blocking import *
from sparklink.gammas import *
from sparklink.params import *
from sparklink.expectation_step import *
from sparklink.maximisation_step import *
from sparklink.iterate import *
from sparklink.comparison_evaluation import *

In [None]:
# Load the raw records; the first CSV line holds the column names.
# No schema is given, so every column is read as a string.
df = spark.read.option("header", "true").csv("data/data_null.csv")
df.show()

In [None]:
# Alternative without blocking rules:
# df_comparison = cartestian_block(df, spark=spark)

# Candidate record pairs are generated from these SQL join conditions
# (presumably a pair is kept if it satisfies any rule — see sparklink.blocking).
blocking_rules = [
    'l.surname = r.surname',
    'l.mob = r.mob',
]

df_comparison = block_using_rules(df, blocking_rules, spark=spark)

# label = 1 when both records carry the same `group` value, 0 otherwise
# (assumes `group` is the ground-truth entity id). Null groups give a null label.
same_group = df_comparison["group_l"] == df_comparison["group_r"]
df_comparison = df_comparison.withColumn("label", same_group.cast("int"))

df_comparison.show()

In [None]:
# Number of comparison levels to compute for each column
# (presumably 2 = agree/disagree; 3 adds a partial-agreement level — confirm in sparklink.gammas).
gamma_settings = {
    "mob": {"levels": 2},
    "surname": {"levels": 3},
}

df_gammas = add_gammas(df_comparison, gamma_settings, spark, include_orig_cols=True)
df_gammas.show()

In [None]:
import copy 

# Initial guess for λ, the prior probability that a candidate pair is a match
# (presumably what params.params["λ"] starts from).
STARTING_LAMBDA = 0.2
params = Params(gamma_settings, starting_lambda=STARTING_LAMBDA)



In [None]:
import json

# Set to True to dump the current model parameters as pretty-printed JSON.
SHOW_PARAMS = False
if SHOW_PARAMS:
    print(json.dumps(params.params, indent=4, ensure_ascii=False))

In [None]:
sql = sql_gen_gamma_prob_columns(params)

# The generated SQL presumably reads from a view named "df_with_gamma", so expose
# df_gammas under that name first. createOrReplaceTempView replaces the
# registerTempTable API, which has been deprecated since Spark 2.0.
df_gammas.createOrReplaceTempView("df_with_gamma")
df_with_gamma_probs = spark.sql(sql)
df_with_gamma_probs.show()

In [None]:
# Expose the gamma probabilities as a view (presumably the one the generated SQL
# reads); createOrReplaceTempView supersedes the deprecated registerTempTable.
df_with_gamma_probs.createOrReplaceTempView("df_with_gamma_probs")
sql = sql_gen_expected_match_prob(params)

df_e = spark.sql(sql)
df_e.show()

In [None]:
# For each surname, over pairs whose surnames agree exactly, sum the expected
# matches (mp = sum of match_probability) and expected non-matches
# (nmp = sum of 1 - match_probability). mp / (mp + nmp) is then the mean match
# probability among pairs sharing that surname.
# `=` never holds for NULL in SQL, so pairs with a null surname are excluded.
df_e.createOrReplaceTempView("df_e")

sql = """
select surname_l, surname_r, sum(match_probability) as mp, sum(1 - match_probability) as nmp
from df_e
where surname_l = surname_r
group by surname_l, surname_r
"""
surname_lookup = spark.sql(sql)
surname_lookup.show()
surname_lookup.createOrReplaceTempView("surname_lookup")


In [None]:
# Per-surname prior: for pairs whose surnames agree exactly, replace the global λ
# with that surname's share of expected matches, mp / (mp + nmp). Any pair without
# a lookup row (surnames differ, or a surname is null) falls back to the global λ.
# Select e.* plus only the lookup's aggregates, so surname_l / surname_r are not
# duplicated in the output (a plain `select *` would carry both copies).
sql = f"""
select e.*, s.mp, s.nmp, coalesce(s.mp / (s.mp + s.nmp), {params.params["λ"]}) as pseudo_lambda
from df_e as e
left join surname_lookup as s
    on s.surname_l = e.surname_l
    and s.surname_l = e.surname_r
"""
df_e_adj = spark.sql(sql)
df_e_adj.createOrReplaceTempView("df_e_adj")

# Recompute the match probability with the per-surname prior:
#   adjusted_exp = λ' Π m_i / (λ' Π m_i + (1 - λ') Π u_i)
# where λ' = pseudo_lambda, m_i = prob_gamma_i_match, u_i = prob_gamma_i_non_match.
# Discover the per-gamma columns from the view's schema instead of hard-coding
# gamma_0 / gamma_1, so this stays correct if gamma_settings changes.
adj_columns = spark.table("df_e_adj").columns
match_cols = sorted(
    c for c in adj_columns
    if c.startswith("prob_gamma_") and c.endswith("_match") and not c.endswith("_non_match")
)
non_match_cols = sorted(
    c for c in adj_columns
    if c.startswith("prob_gamma_") and c.endswith("_non_match")
)
assert match_cols and len(match_cols) == len(non_match_cols), (
    f"unexpected gamma probability columns: {match_cols} / {non_match_cols}"
)

match_term = " * ".join(["pseudo_lambda"] + match_cols)
non_match_term = " * ".join(["(1 - pseudo_lambda)"] + non_match_cols)

sql = f"""
select *, ({match_term}) / (({match_term}) + ({non_match_term})) as adjusted_exp
from df_e_adj
"""

df_e_adjusted = spark.sql(sql)
df_e_adjusted.show()