# Building the base tables

This is the notebook that was used to prepare the base tables for the pipeline in the paper "A Benchmarking Data Lake 
for Join Discovery and Learning with Relational Data".

All datasets are inspired from those used in the paper "[Relational data embeddings for feature enrichment with background information](https://hal.science/hal-03848124/file/main.pdf)", although some of the matching operations are slightly different.

To have more manageable and meaningful tables, some preprocessing is applied to each dataset to reduce overly noisy and 
irrelevant attributes. 

All tables are then saved as parquet files for later use in the benchmark pipeline. 

Datasets:
- [The Movies Dataset](https://www.kaggle.com/datasets/rounakbanik/the-movies-dataset)
- [7+ Million Company Dataset](https://www.kaggle.com/datasets/peopledatalabssf/free-7-million-company-dataset)
- [US Accidents (2016 - 2021)](https://www.kaggle.com/datasets/sobhanmoosavi/us-accidents)
- [US Presidential elections](https://dataverse.harvard.edu/dataset.xhtml?persistentId=doi:10.7910/DVN/42MVDX)

In [1]:
cd ~/work/prepare-data-lakes

/home/soda/rcappuzz/work/prepare-data-lakes


In [2]:
import polars as pl
import pandas as pd
from pathlib import Path
import src.yago.utils as utils
import numpy as np
import datetime

Loading YAGO fact triplets to drop entities not found in the KB.

In [3]:
data_dir = Path("data/base_tables")

yago_path = Path("/storage/store3/work/jstojano/yago3/")
facts_path = Path(yago_path, "facts_parquet/yago_updated_2022_part2")
fname = "yagoFacts"
yagofacts_path = Path(facts_path, f"{fname}.tsv.parquet")
yagofacts_categorical = utils.import_from_yago(yagofacts_path, engine="polars")
fname = "yagoLiteralFacts"
yagoliteralfacts_path = Path(facts_path, f"{fname}.tsv.parquet")
yagofacts_numerical = utils.import_from_yago(yagoliteralfacts_path, engine="polars")
fname = "yagoDateFacts"
yagodatefacts_path = Path(facts_path, f"{fname}.tsv.parquet")
yagofacts_dates = utils.import_from_yago(yagodatefacts_path, engine="polars")

yagofacts = pl.concat(
    [
        yagofacts_categorical,
        yagofacts_numerical,
        yagofacts_dates
    ]
)

# US Accidents dataset

Archive `us-accidents.zip` contains the file `US_Accidents_Dec21_updated.csv`, 
which I renamed manually to `us-accidents.csv` for simplicity. 

I also had to copy the file `datasets/us_accidents/state_codes.csv` from the 
KEN repository for some of the steps. 

In [4]:
dataset_dir = Path(data_dir, "us-accidents")
df = pl.read_csv(Path(dataset_dir, "us-accidents.csv"))
df = df.rename({"State": "Code"})
state_codes_path =  Path(dataset_dir,"state_codes.csv")
state_codes = pl.read_csv(state_codes_path)
df = df.join(
    state_codes, on="Code"
)

Adding a new column, `col_to_embed`, that formats the city and state name to have
the same format that is found in YAGO. 

In [5]:
df = df.with_columns(
    ("<" + pl.col("City") + ",_"+ pl.col("State") + ">").alias("col_to_embed")
)

Filtering out the rows not found in `yagofacts["subject"]`.

In [6]:
df_filtered=df.lazy().filter(
    pl.col("col_to_embed").is_in(
        yagofacts["subject"]
    )
).collect()

For the preparation, I am selecting only the accidents whose `Start_Time` is between 2019-01-01 and 2019-12-31. 

In [7]:
start_date = datetime.date(2019, 1,1)
end_date = datetime.date(2019, 12, 31)
df_by_year = df_filtered.filter(
    (pl.col("Start_Time").str.to_datetime()>start_date) & (pl.col("Start_Time").str.to_datetime()<end_date)
)

Counting the number of accidents (log10) per county in the given year.

In [8]:
df_counts_by_year=df_by_year.groupby(
    [
        "col_to_embed", "City", "Code"
    ]
    ).count().select(
        pl.col("col_to_embed"),
        pl.col("count").alias("target").log10()
    )
df_counts_by_year

col_to_embed,target
str,f64
"""<Yarmouth,_Mai…",1.278754
"""<Mundelein,_Il…",1.919078
"""<Parker,_Color…",1.819544
"""<Springfield,_…",2.447158
"""<Corvallis,_Or…",2.170262
"""<Patterson,_Ca…",2.004321
"""<Berkeley,_Cal…",2.488551
"""<McArthur,_Cal…",1.113943
"""<Lynwood,_Cali…",2.416641
"""<Boron,_Califo…",1.491362


### Preprocessing

In the KEN paper, everything except the county was dropped. Here, we need to rely on the information still found in the 
table, so we need to to reduce the table to have only one sample for each row. It is necessary to aggregate any information
that has granularity smaller than "county". 

The overall number of columns is also reduced from the original. 

To aggregate all the values, the `mode` is used to select the most frequent categorical value, while the `mean` is used
on the numerical attributes. 

The resulting table is saved in `us-accidents-yadl.parquet`.

In [9]:
df_final=df_counts_by_year.join(
    df_by_year, 
    on="col_to_embed",
    how="inner"
).groupby("col_to_embed").agg(
    pl.col("target").mean(),
    pl.col("County").mode().first(),
    pl.col("Code").mode().first(),
    pl.col("Severity").mean(),
    pl.col("Zipcode").mode().first(),
    pl.col("Country").mode().first(),
    pl.col("Airport_Code").mode().first(),
    pl.col("Visibility(mi)").mean(),
    pl.col("Weather_Condition").mean(),
    pl.col("Sunrise_Sunset").mode().first(),
    pl.col("Civil_Twilight").mode().first(),
    pl.col("State").mode().first(),    
)
df_final.write_parquet(Path(dataset_dir, "us-accidents-yadl.parquet"))
df_final.write_csv(Path(dataset_dir, "us-accidents-yadl.csv"))

### Comparing with the KEN version

Here we prepare the table using the same operations as those made for KEN. 

In [10]:
df_counts = df_filtered.groupby(
    [
        "col_to_embed", "City", "Code"
    ]
    ).count()

In [11]:
df_final = df_counts.with_columns(
    (df_counts["City"] + ", " + df_counts["Code"]).alias("raw_entities") 
).select(
    [
        pl.col("raw_entities"),
        pl.col("col_to_embed"),
        pl.col("count").alias("target").log10()
    ]
).sort("raw_entities")
df_final.write_parquet(Path(dataset_dir, "us-accidents-target.parquet"))

Reading original to see if the two versions look similar. 

In [12]:
path_original=Path("/storage/store3/work/acvetkov/gitlab/KEN/experiments/datasets/us_accidents/counts.parquet")
df_counts_og = pl.read_parquet(path_original)
df_counts_og

raw_entities,col_to_embed,target
str,str,f64
"""Aaronsburg, PA…","""<Aaronsburg,_P…",0.30103
"""Abbeville, LA""","""<Abbeville,_Lo…",0.0
"""Abbotsford, WI…","""<Abbotsford,_W…",0.954243
"""Abbottstown, P…","""<Abbottstown,_…",1.041393
"""Aberdeen, MD""","""<Aberdeen,_Mar…",2.396199
"""Aberdeen, MS""","""<Aberdeen,_Mis…",0.477121
"""Aberdeen, OH""","""<Aberdeen,_Ohi…",0.0
"""Aberdeen, WA""","""<Aberdeen,_Was…",1.342423
"""Abernathy, TX""","""<Abernathy,_Te…",0.0
"""Abilene, TX""","""<Abilene,_Texa…",1.0


As expected, the number of entities in the original file is smaller. 

# Company Employees Dataset

The "Company Employees" dataset contains information about companies. The prediction target in this case is the number 
of employees of a company. We filter the dataset to select only companies with at least 1000 employees.

In [13]:
dataset_dir = Path(data_dir, "company-employees")
df = pl.read_csv(Path(dataset_dir, "companies_sorted.csv"))
df_selected = df.filter(
    pl.col("current employee estimate") >= 1000
)

The heuristic we use is slightly different from that used for KEN. 

Adding a new column to `yagofacts` with lowercased subjects. 

In [14]:
yagofacts = yagofacts.with_columns(
    pl.col("subject").str.to_lowercase().alias("subject_formatted")
)
df_filtered=df_selected.lazy().with_columns(
    ("<" + pl.col("name").str.to_lowercase().str.replace(" ", "_") + ">").alias("formatted_name")
).filter(
    pl.col("formatted_name").is_in(yagofacts["subject_formatted"])
).collect()

Here we prepare a mapping between the name in the original dataset and the match found in YAGO.
Note that there is a relatively low recall, though it is higher than what is used in the original. 

In [15]:
mapping_name_subject = df_filtered.lazy().join(
    yagofacts.lazy(),
    left_on="formatted_name",
    right_on="subject_formatted"
).select(
    [
        pl.col("name"),
        pl.col("formatted_name"),
        pl.col("subject")
    ]
).unique().collect()


Joining on with the mapping on `formatted_name` to guarantee that col `col_to_embed` uses the same format (and 
capitalization) used in YAGO. 

In [16]:
df_final = df_filtered.join(
    mapping_name_subject, on="formatted_name"
).select(
    [
        pl.col("name").alias("raw_entities"),
        pl.col("subject").alias("col_to_embed"),
        pl.col("current employee estimate").alias("target").log10()
    ]
)
df_final


raw_entities,col_to_embed,target
str,str,f64
"""ibm""","""<IBM>""",5.437825
"""accenture""","""<Accenture>""",5.280326
"""hewlett-packar…","""<Hewlett-Packa…",5.107047
"""walmart""","""<Walmart>""",5.081898
"""microsoft""","""<Microsoft>""",5.065191
"""at&t""","""<AT&T>""",5.061407
"""wells fargo""","""<Wells_Fargo>""",5.039541
"""infosys""","""<Infosys>""",5.020162
"""deloitte""","""<Deloitte>""",5.017501
"""nokia""","""<Nokia>""",4.925967


In [17]:
df_final.write_parquet(Path(dataset_dir, "company-employees-target.parquet"))

Joining the measure target on the base table. Some unnecessary columns are dropped from the table, then it is saved on disk.


In [18]:
df_prepared = df_filtered.lazy().join(
    df_final.lazy(),
    left_on="name",
    right_on="raw_entities",
    how="inner"
).drop(
    "",
    "formatted_name",
    "current employee estimate",
    "total employee estimate",
).collect()
df_prepared.write_parquet(Path(dataset_dir, "company-employees-yadl.parquet"))
df_prepared.write_csv(Path(dataset_dir, "company-employees-yadl.csv"))

Comparing with the original. 

In [19]:
path_original=Path("/storage/store3/work/acvetkov/gitlab/KEN/experiments/datasets/company_employees/target.parquet")
df_target_og = pl.read_parquet(path_original)
df_target_og

raw_entities,col_to_embed,target
str,str,f64
"""accenture""","""<Accenture>""",5.280326
"""walmart""","""<Walmart>""",5.081898
"""microsoft""","""<Microsoft>""",5.065191
"""infosys""","""<Infosys>""",5.020162
"""deloitte""","""<Deloitte>""",5.017501
"""nokia""","""<Nokia>""",4.925967
"""capgemini""","""<Capgemini>""",4.925204
"""google""","""<Google>""",4.875692
"""ericsson""","""<Ericsson>""",4.830537
"""boeing""","""<Boeing>""",4.827763


Here we are checking which values in `col_to_embed` are not found in `yagofacts["subject"]`. These companies are missing 
because the name of the company is not the same in the original dataset and in YAGO. 

In [20]:
df_target_og.filter(
    ~pl.col("col_to_embed").is_in(yagofacts["subject"])
)

raw_entities,col_to_embed,target
str,str,f64
"""raytheon""","""<Raytheon>""",4.375353
"""thales""","""<Thales>""",4.321868
"""herbalife""","""<Herbalife>""",4.289433
"""flextronics""","""<Flextronics>""",4.279644
"""adecco""","""<Adecco>""",4.160799
"""altran""","""<Altran>""",4.090187
"""statoil""","""<Statoil>""",4.081563
"""symantec""","""<Symantec>""",4.034628
"""syntel""","""<Syntel>""",3.994229
"""arup""","""<Arup>""",3.98304


# The Movies Dataset

The Movies Dataset contains metadata about movies, including the revenue, which is the prediction target for this problem. 

In [21]:
dataset_dir = Path(data_dir, "the-movies-dataset")
df = pl.read_csv(Path(dataset_dir, "movies_metadata.csv"), infer_schema_length=0)


Since the target variable is the revenue, movies with revenue 0 are dropped. We are also reformatting the release date for creating the title mappings.

In [22]:
# Filtering
df_filtered = df.filter(
    pl.col("revenue").cast(int) > 0
).with_columns(
    (pl.col("release_date").str.slice(0, 4)).alias("release_date")
)

`title` and `release_date` together should approximate a unique key quite well: I am looking to see whether this is the case or not. 

In [23]:
df_filtered.groupby(["title", "release_date"]).count().sort("count",descending=True)

title,release_date,count
str,str,u32
"""The Congress""","""2013""",2
"""Confessions of…","""2002""",2
"""Le Samouraï""","""1967""",2
"""Pokémon: Spell…","""2000""",2
"""Black Gold""","""2011""",2
"""Clockstoppers""","""2002""",2
"""Force Majeure""","""2014""",2
"""Pokémon 4Ever:…","""2001""",2
"""A Farewell to …","""1932""",2
"""Camille Claude…","""2013""",2


In [24]:
# Prepare 3 different mappings, to try and cover as many cases as possible
mapping_to_yago = df_filtered.select([pl.col("title"), pl.col("release_date"), pl.col("revenue")])

mapping_to_yago=mapping_to_yago.with_columns(
    [
        ("<" + pl.col("title").str.replace(" ", "_") + ">").alias("title_format_1"),
        ("<" + pl.col("title").str.replace(" ", "_") + "_(film)>").alias("title_format_2"),
        ("<" + pl.col("title").str.replace(" ", "_") +  "_(" + pl.col("release_date") + "_film)>").alias("title_format_3"),
        pl.Series(list(range(len(mapping_to_yago)))).alias("index")
    ]
)

Here we are looking for movies that are present in YAGO according to one of the three formats defined above. 

we also reformat the output to reflect the "target dataset" schema. 

In [25]:
tgt_indices = []
selected = []
for jj in [3,2,1]:
    g1 = mapping_to_yago.filter(
        (pl.col(f"title_format_{jj}").is_in(yagofacts["subject"])) & 
        (~pl.col(f"index").is_in(tgt_indices))
    ).select(pl.col("index"))

    tgt_indices=g1["index"].to_list()
    
    newdf = mapping_to_yago.filter(
        pl.col("index").is_in(tgt_indices)
    ).select(
        [
            pl.col("title"),
            pl.col("release_date"),
            pl.col(f"title_format_{jj}").alias("col_to_embed"),
            pl.col("revenue").log10(),
        ]
    )
    selected.append(newdf)

df_final=pl.concat(selected)
df_final.write_parquet(Path(dataset_dir, "movie-revenues-target.parquet"))

Some preprocessing must be executed in order to flatten the nested fields in the dataset. 

### Cleaning up movies

In [26]:
import ast
def clean_genres(ll):
    g = ast.literal_eval(ll)
    try:
        l1 = g[0]["name"]
        return l1
    except IndexError:
        return ""


def clean_production_companies(ll):
    try:
        g = ast.literal_eval(ll)
    except ValueError:
        return ""
    except SyntaxError:
        print(ll)
    try:
        l1 = g[0]["name"]
        return l1
    except IndexError:
        return ""
    except TypeError:
        return ""


def clean_production_country(ll):
    try:
        g = ast.literal_eval(ll)
    except ValueError:
        return ""
    try:
        l1 = g[0]["iso_3166_1"]
        return l1
    except IndexError:
        return ""
    except TypeError:
        return ""


def clean_spoken_language(ll):
    try:
        g = ast.literal_eval(ll)
    except ValueError:
        return ""
    try:
        l1 = g[0]["name"]
        return l1
    except IndexError:
        return ""
    except TypeError:
        return ""



In [27]:
df = df_filtered.lazy().join(df_final.lazy(), left_on=["title", "release_date"], right_on=["title", "release_date"], how="inner").collect().to_pandas()
df = df.drop(
    [
        "belongs_to_collection",
        "homepage",
        "imdb_id",
        "overview",
        "tagline",
        "poster_path",
    ],
    axis=1,
)

df.genres = df.genres.apply(clean_genres)
df.production_companies = df.production_companies.apply(clean_production_companies)
df.production_countries = df.production_countries.apply(clean_production_country)
df.spoken_languages = df.spoken_languages.apply(clean_spoken_language)


df = df.dropna(subset=["title", "release_date"])
df["release_date"] = df["release_date"].apply(lambda x: str(x[:4])).drop_duplicates()

df["target"] = df["revenue_right"]
df_final=df.drop(["revenue","revenue_right"],axis=1)
df_final.to_parquet(Path(dataset_dir, "movies-yadl.parquet"))
df_final.to_csv(Path(dataset_dir, "movies-yadl.csv"))

### Comparing to the KEN version

In [28]:
df_final

Unnamed: 0,adult,budget,genres,id,original_language,original_title,popularity,production_companies,production_countries,release_date,runtime,spoken_languages,status,title,video,vote_average,vote_count,col_to_embed,target
0,False,30000000,Animation,862,en,Toy Story,21.946943,Pixar Animation Studios,US,1995,81.0,English,Released,Toy Story,False,7.7,5415,<Toy_Story>,8.572353
1,False,65000000,Adventure,8844,en,Jumanji,17.015539,TriStar Pictures,US,,104.0,English,Released,Jumanji,False,6.9,2413,<Jumanji>,8.419621
2,False,60000000,Action,949,en,Heat,17.924927,Regency Enterprises,US,,170.0,English,Released,Heat,False,7.7,1886,<Heat_(1995_film)>,8.272855
3,False,35000000,Action,9091,en,Sudden Death,5.23158,Universal Pictures,US,,106.0,English,Released,Sudden Death,False,5.5,174,<Sudden_Death_(1995_film)>,7.808550
4,False,0,Family,21032,en,Balto,12.140733,Universal Pictures,US,,78.0,English,Released,Balto,False,7.1,423,<Balto_(film)>,7.054932
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3832,False,0,Adventure,423453,fr,Sahara,8.665106,StudioCanal,CA,,86.0,English,Released,Sahara,False,5.4,43,<Sahara_(2017_film)>,6.892095
3833,False,0,Adventure,423453,fr,Sahara,8.665106,StudioCanal,CA,,86.0,English,Released,Sahara,False,5.4,43,<Sahara>,6.892095
3834,False,0,Comedy,265189,sv,Turist,12.165685,Motlys,NO,,118.0,Français,Released,Force Majeure,False,6.8,255,<Force_Majeure_(film)>,6.133378
3835,False,0,Comedy,265189,sv,Turist,12.165685,Motlys,NO,,118.0,Français,Released,Force Majeure,False,6.8,255,<Force_Majeure_(film)>,6.133378


In [29]:
path_original=Path("/storage/store3/work/acvetkov/gitlab/KEN/experiments/datasets/movie_revenues/target.parquet")
df_target_og = pl.read_parquet(path_original)
df_target_og

raw_entities,col_to_embed,target
str,str,f64
"""Heat""","""<Heat_(1995_fi…",8.272855
"""Sudden Death""","""<Sudden_Death_…",7.80855
"""Cry, the Belov…","""<Cry,_the_Belo…",5.830284
"""Pocahontas""","""<Pocahontas_(1…",8.539176
"""Friday""","""<Friday_(1995_…",7.450494
"""Fair Game""","""<Fair_Game_(19…",7.061998
"""Bed of Roses""","""<Bed_of_Roses_…",7.279455
"""Screamers""","""<Screamers_(19…",6.762069
"""Black Sheep""","""<Black_Sheep_(…",1.50515
"""Broken Arrow""","""<Broken_Arrow_…",8.176873


# US Presidential elections

In [30]:
dataset_dir = Path(data_dir, "presidential-results")
df = pl.read_csv(Path(dataset_dir, "presidential-results.csv"), infer_schema_length=0)
df = df.to_pandas()

In [31]:
df = df[df["year"] == "2020"]
df["county_name"] = df["county_name"].str.title()
df["state"] = df["state"].str.title()
df["col_to_embed"] = "<" + df["county_name"] + "_County,_" + df["state"] + ">"
df["target"] = np.log10(df["candidatevotes"].astype(int) + 1)
df["raw_entities"] = df["county_name"] + " " + df["state"]
mask = df["col_to_embed"].str.contains("Louisiana")
df.loc[mask, "col_to_embed"] = df.loc[mask, "col_to_embed"].str.replace(
    "County", "Parish"
)
# df = df[["raw_entities", "col_to_embed", "party", "target"]]
df.dropna(inplace=True)


In [32]:
df_final= df.drop(
    ["raw_entities", "candidatevotes", "county_fips", "office", "year", "totalvotes", "version", "mode"], axis=1
)
df_final.to_parquet(Path(dataset_dir, "us-presidential-results-yadl.parquet"), index=False)
df_final.to_csv(Path(dataset_dir, "us-presidential-results-yadl.csv"), index=False)

In [33]:
df.drop(
    ["raw_entities", "candidatevotes"], axis=1
    ).to_parquet(Path(dataset_dir, "presidential-results-prepared.parquet"), index=False)

In [34]:
pl.read_parquet(Path(dataset_dir, "presidential-results-prepared.parquet"))

year,state,state_po,county_name,county_fips,office,candidate,party,totalvotes,version,mode,col_to_embed,target
str,str,str,str,str,str,str,str,str,str,str,str,f64
"""2020""","""Alabama""","""AL""","""Autauga""","""01001""","""US PRESIDENT""","""JOSEPH R BIDEN…","""DEMOCRAT""","""27770""","""20220315""","""TOTAL""","""<Autauga_Count…",3.875293
"""2020""","""Alabama""","""AL""","""Autauga""","""01001""","""US PRESIDENT""","""OTHER""","""OTHER""","""27770""","""20220315""","""TOTAL""","""<Autauga_Count…",2.633468
"""2020""","""Alabama""","""AL""","""Autauga""","""01001""","""US PRESIDENT""","""DONALD J TRUMP…","""REPUBLICAN""","""27770""","""20220315""","""TOTAL""","""<Autauga_Count…",4.29752
"""2020""","""Alabama""","""AL""","""Baldwin""","""01003""","""US PRESIDENT""","""JOSEPH R BIDEN…","""DEMOCRAT""","""109679""","""20220315""","""TOTAL""","""<Baldwin_Count…",4.390564
"""2020""","""Alabama""","""AL""","""Baldwin""","""01003""","""US PRESIDENT""","""OTHER""","""OTHER""","""109679""","""20220315""","""TOTAL""","""<Baldwin_Count…",3.192567
"""2020""","""Alabama""","""AL""","""Baldwin""","""01003""","""US PRESIDENT""","""DONALD J TRUMP…","""REPUBLICAN""","""109679""","""20220315""","""TOTAL""","""<Baldwin_Count…",4.92192
"""2020""","""Alabama""","""AL""","""Barbour""","""01005""","""US PRESIDENT""","""JOSEPH R BIDEN…","""DEMOCRAT""","""10518""","""20220315""","""TOTAL""","""<Barbour_Count…",3.682777
"""2020""","""Alabama""","""AL""","""Barbour""","""01005""","""US PRESIDENT""","""OTHER""","""OTHER""","""10518""","""20220315""","""TOTAL""","""<Barbour_Count…",1.908485
"""2020""","""Alabama""","""AL""","""Barbour""","""01005""","""US PRESIDENT""","""DONALD J TRUMP…","""REPUBLICAN""","""10518""","""20220315""","""TOTAL""","""<Barbour_Count…",3.749968
"""2020""","""Alabama""","""AL""","""Bibb""","""01007""","""US PRESIDENT""","""JOSEPH R BIDEN…","""DEMOCRAT""","""9595""","""20220315""","""TOTAL""","""<Bibb_County,_…",3.298198


In [35]:

df=df.groupby(["raw_entities", "col_to_embed", "party"], as_index=False).sum()

df["col_to_embed"] = df["col_to_embed"].str.replace(" ", "_")
mask = df["col_to_embed"].str.contains("Louisiana")
df.loc[mask, "col_to_embed"] = df.loc[mask, "col_to_embed"].str.replace(
    "County", "Parish"
)
df["col_to_embed"] = df["col_to_embed"].str.replace("_City_County", "")


  df=df.groupby(["raw_entities", "col_to_embed", "party"], as_index=False).sum()
