In [None]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
import pandas as pd 
# Let pandas render up to 1000 columns so wide previews are not truncated.
pd.set_option("display.max_columns", 1000)

# Obtain the SparkContext (creating one if none is active) and wrap it in a
# SparkSession used by every cell below.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

### Person data

In [None]:
# Read the person parquet extract and register it as the `df_person` temp view
# so it can be queried from Spark SQL.
person_parquet_path = "scrape_wikidata/processed_data/step_1_one_line_per_person/page000_0_to_2000.parquet"
df_person = spark.read.parquet(person_parquet_path)
df_person.createOrReplaceTempView("df_person")

# Preview the first two rows.
df_person.limit(2).toPandas()

### Names

In [None]:

# Build `df_names`: one row per `original_name` holding the list of its
# alternative names, each paired with a frequency count taken from the
# births-register name counts.

# --- Step 1: symmetric (original_name, alt_name) pairs --------------------
# Every pair is emitted in both directions, then de-duplicated.
df_names = spark.read.parquet("scrape_wikidata/raw_data/names/")
df_names.createOrReplaceTempView("df_names")
sql = """

with concat as (
select original_name, alt_name
from df_names
union all
select alt_name as original_name, original_name as alt_name
from df_names)

select distinct original_name, alt_name
from concat 

"""
df_names = spark.sql(sql)
df_names.createOrReplaceTempView("df_names")


# --- Step 2: combined forename + surname frequency table ------------------
df_freq_fn = spark.read.parquet("scrape_wikidata/processed_data/step_x_births_namrefreq/name_counts_from_births_register.parquet")
df_freq_fn = df_freq_fn.withColumnRenamed("forename", "name")
df_freq_sn = spark.read.parquet("scrape_wikidata/processed_data/step_x_births_namrefreq/surname_counts_from_births_register.parquet")
# FIX: rename the column on the surname frame itself. The previous code
# derived df_freq_sn from df_freq_fn, which threw away the surname counts that
# were just read and unioned the forename counts with themselves.
df_freq_sn = df_freq_sn.withColumnRenamed("surname", "name")
# NOTE(review): union() matches columns by position, not by name - this
# assumes both parquet files share the same column order. TODO confirm.
df_freq = df_freq_fn.union(df_freq_sn)
df_freq.createOrReplaceTempView("df_freq")
# Collapse to a single total per name (a name may appear in both inputs).
sql = """
select name, sum(count) as count
from df_freq
group by name
order by count desc
"""
df_freq = spark.sql(sql)
df_freq.createOrReplaceTempView("df_freq")


# --- Step 3: attach frequencies to the alternative names ------------------
# The `where df_freq.count > 5` filter drops rows with no frequency match
# (their count is null), so this left join behaves like an inner join.
# The match is case-insensitive via lower().
sql = """
select df_names.*, df_freq.count
from df_names
left join df_freq
on lower(df_names.alt_name) = lower(df_freq.name)
where df_freq.count > 5
"""
df_names = spark.sql(sql)

df_names.createOrReplaceTempView("df_names")

# --- Step 4: one row per original_name ------------------------------------
# Only alternatives made up purely of ASCII letters are kept; each is stored
# as a struct(alt_name, count) inside the `alt_names` array.
df_names = spark.sql("""



select original_name, collect_list(struct(alt_name, count)) as alt_names
from df_names
where alt_name rlike '^[A-Za-z]+$'
    
group by original_name
""")
df_names.createOrReplaceTempView("df_names")




In [None]:
# Row count of the name lookup (one row per original_name after the group-by).
df_names.count()

In [None]:
# Preview three rows of the name lookup as a pandas frame.
df_names.limit(3).toPandas()

### Person postcode lookup

In [None]:
# Read the person -> nearby-postcodes lookup and register it for Spark SQL.
postcode_lookup_path = "scrape_wikidata/processed_data/step_2_person_postcode_lookups/page000_0_to_2000.parquet"
df_point_postcode = spark.read.parquet(postcode_lookup_path)
df_point_postcode.createOrReplaceTempView("df_point_postcode")

# Collapse to one row per person, gathering every `nearby_postcodes` value
# for that person into a single list.
sql = """
select person, collect_list(nearby_postcodes) as nearby_postcodes
from df_point_postcode
group by person

"""
df_point_postcode = spark.sql(sql)
df_point_postcode.createOrReplaceTempView("df_point_postcode")

# Preview the first two aggregated rows.
df_point_postcode.limit(2).toPandas()

## Master data

In [None]:
# Spot-check: pull the df_person row(s) whose `human` identifier is Q38082.
single_person_query = "select * from df_person where human = 'Q38082'"
spark.sql(single_person_query).toPandas()

#### Get list of given names and family names

In [None]:
# Re-read the person parquet (same path as the "Person data" cell) and
# re-register the `df_person` temp view before deriving the name arrays.
person_source_path = "scrape_wikidata/processed_data/step_1_one_line_per_person/page000_0_to_2000.parquet"
df_person = spark.read.parquet(person_source_path)
df_person.createOrReplaceTempView("df_person")

# Preview the first two rows.
df_person.limit(2).toPandas()

In [None]:
from pyspark.sql import Row

# Tokens removed from the derived name arrays (titles, honorifics and
# connecting words that show up inside labels).
tokens_to_drop = [
    'Rt.', 'Hon.', 'Sir', 'Rev.', 'Lady', 'Duke', 'of',
    'Dr.', 'Dr', 'Baron', 'The', 'and', 'last', 'Baronet',
]

# A single-row frame with one array column, `to_filter_out`; it is cross-joined
# onto df_person so the array is available on every row.
data_list = [{"to_filter_out": tokens_to_drop}]

filter_rows = [Row(**record) for record in data_list]
df_filter = spark.createDataFrame(filter_rows)
df_filter.createOrReplaceTempView("df_filter")

In [None]:
# Builds Spark SQL expressions (as Python strings) that derive, per person, an
# array of candidate given names and an array of candidate family names, then
# previews the result on a random sample of rows.
#
# Escaping note: in these Python literals `\\\\` is two backslashes; Spark's
# SQL string-literal unescaping then presumably reduces that to the regex `\|`
# (a literal pipe), i.e. the delimiter is ' | ' - TODO confirm against the
# session's escapedStringLiterals setting.

# Given names
# Explicit given-name labels, split on ' | '; null becomes an empty array.

split_given_name = "ifnull(split(given_nameLabel,' \\\\| '), array())"


# Drop the first comma and everything after it from humanLabel.
hl_rr = "regexp_replace(humanLabel, ',(.+)', '')"
label_parts_expr = f"split({hl_rr}, ' \\\\| ')"
# For each label part, split on spaces and keep every word except the last.
split_label = f"ifnull(flatten(transform({label_parts_expr}, x -> slice(split(x, ' '), 1, size(split(x, ' ')) - 1))), array())"

# Same treatment for alternative labels: split on ', ', strip any remaining
# comma suffix from each entry, then keep every word except the last.
rr_x = "regexp_replace(x, ',(.+)', '')"
alt_label_parts_expr = "split(humanAltlabel, ', ')"
split_alt_label = f"ifnull(flatten(transform({alt_label_parts_expr}, x -> slice(split({rr_x}, ' '), 1, size(split({rr_x}, ' ')) - 1))), array())"


# Union of the three sources (array_union removes duplicates), minus the
# tokens listed in `to_filter_out` (a column supplied by df_filter).
union_all_names = f"array_union({split_given_name}, array_union({split_label},{split_alt_label}))"
filter_union_all_names = f"array_except({union_all_names}, to_filter_out)"

# Finally discard any candidate that contains a digit.
filter_union_all_given_names =f"filter({filter_union_all_names}, x -> x not rlike '[0-9]')"

# Family names
# Same structure as above, but slice(..., -1, 1) keeps only the LAST word of
# each label. The helper names below are deliberately reassigned.
split_family_name = "ifnull(split(family_nameLabel,' \\\\| '), array())"

hl_rr = "regexp_replace(humanLabel, ',(.+)', '')"
label_parts_expr = f"split({hl_rr}, ' \\\\| ')"
split_label = f"ifnull(flatten(transform({label_parts_expr}, x -> slice(split(x, ' '), -1,1))), array())"

rr_x = "regexp_replace(x, ',(.+)', '')"
alt_label_parts_expr = "split(humanAltlabel, ', ')"
split_alt_label = f"ifnull(flatten(transform({alt_label_parts_expr}, x -> slice(split({rr_x}, ' '), -1, 1))), array())"


# Note: unlike given names, family names get no digit filter.
union_all_names = f"array_union({split_family_name}, array_union({split_label},{split_alt_label}))"
filter_union_all_family_names = f"array_except({union_all_names}, to_filter_out)"



# The cross join with the single-row df_filter puts `to_filter_out` on every
# person row so the expressions above can reference it.
sql = f"""
select
humanLabel, humanAltLabel, given_nameLabel, family_nameLabel,



{filter_union_all_given_names} as given_names_array,
{filter_union_all_family_names} as family_names_array

from df_person
cross join df_filter


limit 1000
"""
# Show 20 randomly chosen rows; sample() is unseeded, so the rows differ
# between runs.
spark.sql(sql).toPandas().sample(20)

In [None]:
# Note that given names are NOT in order - we do not scrape the ordinality of given names so order cannot easily be imposed

# Design sketch (pseudo-code, kept for reference only). As bare code this line
# had unbalanced parentheses and raised a SyntaxError, which stopped the whole
# cell - including the definition of split_field below - from running:
#   concat(split(given_namelabel), split(human_label)[:]

def split_field(col_name, num_cols=3):
    """Return a SQL select-list fragment that spreads a ' | '-delimited column
    over numbered columns.

    For each i in 1..num_cols the fragment contains
    ``split(<col_name>, ' \\\\| ')[i-1] as <col_name>_<i>``, and the pieces are
    joined with ", ".

    Args:
        col_name: Name of the delimited string column; it is interpolated
            verbatim into the SQL text.
        num_cols: How many numbered output columns to generate (default 3).

    Returns:
        The comma-separated SQL fragment as a string (empty if num_cols is 0).
    """
    return ", ".join(
        f"split({col_name}, ' \\\\| ')[{idx}] as {col_name}_{idx + 1}"
        for idx in range(num_cols)
    )

# Select-list fragments: up to three given-name columns and up to two
# family-name columns, produced by split_field.
given_name_columns = split_field('given_namelabel')
family_name_columns = split_field('family_namelabel',2)

sql = f"""
select 
humanlabel, humanaltlabel, given_namelabel, family_namelabel, birth_name, 
{given_name_columns},
 {family_name_columns}
from df_person

"""
df_person_split_names = spark.sql(sql)
df_person_split_names.createOrReplaceTempView("df_person_split_names")

# Eyeball 20 random rows drawn from the first 1000.
df_person_split_names.limit(1000).toPandas().sample(20)

In [None]:
# Assemble the master table: person attributes, the numbered name columns with
# their alternative-name lists (one df_names join per name column), and the
# postcode lookup. The first entry of the first nearby-postcode list is exposed
# as fake_postcode / fake_lat / fake_lng.
master_output_path = "scrape_wikidata/clean_data/master_data/"

sql = """
select 
h.human, 

h.humanlabel,
split(h.humanaltlabel, ", ") as humanaltlabel,
substr(h.dob,1,10) as dob, 


split(h.country_citizenlabel, ' \\\\| ') as country_citizenship,
place_birthlabel as birth_place,
birth_countrylabel as birth_country,
sex_or_genderlabel as gender,



residencelabel as residence_place,
residence_countrylabel as residence_country,

pc.nearby_postcodes[0][0].postcode as fake_postcode,
pc.nearby_postcodes[0][0].lat as fake_lat,
pc.nearby_postcodes[0][0].lng as fake_lng,

pc.nearby_postcodes,
h.given_namelabel_1 as given_name_1,
n1.alt_names as alt_given_name_1,
h.given_namelabel_2 as given_name_2,
n2.alt_names as alt_given_name_2,
h.given_namelabel_3 as given_name_3,
n3.alt_names as alt_given_name_3,
h.family_namelabel_1 as family_name_1,
n4.alt_names as alt_family_name_1,
h.family_namelabel_2 as family_name_2,
n5.alt_names as alt_family_name_2

from df_person_split_names as h

left join df_names as n1
on h.given_namelabel_1 = n1.original_name

left join df_names as n2
on h.given_namelabel_2 = n2.original_name

left join df_names as n3
on h.given_namelabel_3 = n3.original_name

left join df_names as n4
on h.family_namelabel_1 = n4.original_name

left join df_names as n5
on h.family_namelabel_2 = n5.original_name

left join df_point_postcode as pc
on h.human = pc.person

"""
df_final = spark.sql(sql)
df_final.createOrReplaceTempView("df_final")

# Collapse to a single partition, then overwrite the master-data output.
df_final = df_final.repartition(1)
df_final.write.parquet(master_output_path, mode="overwrite")
# To preview the result: df_final.limit(10).toPandas()