# seed nodes preparation API

In [None]:
ERR: depending on the type of DuckDB connection, this notebook can only be executed on the server.
TODO: support dev mode to connect to a local DDB

In [None]:
# Trivial expression; presumably a smoke test that the kernel executes cells.
1 + 1

In [None]:
# Enable IPython's autoreload extension in mode 2: imported modules are reloaded
# before each cell runs, so edits to library code are picked up without a kernel restart.
%load_ext autoreload
%autoreload 2

In [None]:
import pandas as pd
import os
import boto3
from surt import surt
import re
import tldextract
import duckdb
import polars as pl
import s3fs

from itertools import chain
from functools import reduce
from dotenv import load_dotenv
from ascii_library.cleanup.commoncrawl.urls import get_surt_host

In [None]:
# Load variables from a .env file into the process environment
# (presumably this is where the ASCII_AWS_* credentials read below come from — confirm).
load_dotenv()

In [None]:
# AWS credentials are read from the environment, never hard-coded in the notebook.
# `os.environ.get` yields None for a missing variable; no validation happens here.
key = os.environ.get("ASCII_AWS_ACCESS_KEY_ID")
secret = os.environ.get("ASCII_AWS_SECRET_ACCESS_KEY")

# Low-level S3 client. TLS is left at boto3's default (enabled) so requests are not
# sent over plain HTTP; this also matches the s3fs filesystem below, which uses the
# same credentials over HTTPS.
s3client = boto3.client("s3", aws_access_key_id=key, aws_secret_access_key=secret)

# The same credentials in key/secret naming (as accepted by s3fs below).
storage_options = {
    "key": key,
    "secret": secret,
}


# fsspec-style S3 filesystem; used further down to write the seed parquet files.
fs = s3fs.S3FileSystem(key=key, secret=secret)

In [None]:
# Environment switch: flip to True to work against a developer-local DuckDB file.
# dev_mode = True
dev_mode = False

# Each environment is described by (DuckDB file path, schema-name prefix).
if not dev_mode:
    # prod
    database, prefix = (
        "/data/raid5/data/ascii/mastered-data/ascii_pipeline.duckdb",
        "ascii",
    )
else:
    # DEV (user specific)
    database, prefix = (
        "/home/heiler/development/projects/ascii/research-space/src/pipelines/ascii/ascii_dbt/ascii_pipeline.duckdb",
        "ascii_dev",
    )

In [None]:
# Open the DuckDB file selected by the dev/prod switch (on the server by default).
# The connection is read-only, so nothing in this notebook can modify the database.
# For development data, point `database` at your own DDB file instead.
con = duckdb.connect(database=database, read_only=True)

In [None]:
# dummy demo: industry-classification rows of a single example company,
# ordered by type and class
(
    pl.DataFrame(
        con.query(
            f"""
  SELECT class, type, value, description
  FROM {prefix}_ref_clean.orbis_company_industry_classification 
  where ascii_id_company = 'pm1SzeD0UD+nCj+GMJdc2Q=='
"""
        ).arrow()
    )
    .to_pandas()
    .sort_values(["type", "class"])
    .reset_index(drop=True)
)

### automated

In [None]:
# Load the identifiers to validate; the `id` column feeds the SQL filter built in the next cell.
# NOTE(review): the file name spells "indsutry" — confirm it matches the file on disk before renaming anything.
c = pd.read_csv("check_indsutry_class.csv")
c.head()

In [None]:
# Render the CSV ids as a SQL tuple literal, e.g. ('a', 'b'), for use in an `IN` clause.
# Embedded single quotes are doubled (standard SQL escaping) so that a value coming
# from the CSV cannot terminate the string literal and break or alter the query.
ids_validation = (
    "(" + ", ".join("'" + str(x).replace("'", "''") + "'" for x in c["id"]) + ")"
)
# ids_validation

In [None]:
# Fetch the `company_source_rel` rows whose `id_number` is among the ids to validate.
cc = (
    pl.DataFrame(
        con.query(
            f"""
  SELECT *
  FROM {prefix}.company_source_rel
  where id_number in {ids_validation}
"""
        ).arrow()
    )
    .to_pandas()
)

In [None]:
# Preview the matched rows.
cc.head()  # .ascii_id_company.nunique()

In [None]:
# Rebuild the SQL tuple literal, this time from the ASCII company ids found above.
# Single quotes are doubled (standard SQL escaping) so that no value can terminate
# the string literal inside the query.
ids_validation = (
    "("
    + ", ".join(
        "'" + str(x).replace("'", "''") + "'" for x in cc["ascii_id_company"]
    )
    + ")"
)

In [None]:
# Industry classifications for every company selected above ...
ccc = pl.DataFrame(
    con.query(
        f"""
  SELECT ascii_id_company, class, type, value, description
  FROM {prefix}_ref_clean.orbis_company_industry_classification 
  where ascii_id_company in {ids_validation}
"""
    ).arrow()
).to_pandas()
# ... ordered by company, then type, then class, with a fresh 0..n-1 index.
ccc = ccc.sort_values(["ascii_id_company", "type", "class"]).reset_index(drop=True)
ccc.head()

In [None]:
# Persist the classification lookup as a gzip-compressed parquet file
# (relative path, i.e. the notebook's working directory); the pandas index is not written.
ccc.to_parquet("ascii_ref.parquet", index=False, compression="gzip")

global

In [None]:
# List the reference-data drop directory via its absolute path under /data/raid5.
%ls /data/raid5/data/ascii/mastered-data/reference-data/data_raw_direct_source_drop/joshua

local dev data

In [None]:
# List the same drop directory via a path relative to the notebook's working directory.
%ls ../../../../../reference-data/data_raw_direct_source_drop/joshua/

In [None]:
# CSV that maps ASCII company ids to GT ids; read by `read_csv_auto` in the combined query.
# mapping = '/home/zelle/development/projects/ascii/reference-data/data_raw_direct_source_drop/joshua/map_ascii_orbis_gt.csv' # map ascii id to gt id
mapping = "../../../../../reference-data/data_raw_direct_source_drop/joshua/map_ascii_orbis_gt.csv"

# Name of this seed set; becomes the `seeds=<filename>` path segment when the seeds are written to S3.
filename = "Georgetown_v1.0.0"

In [None]:
# Join the ASCII<->GT mapping CSV with the Orbis contact info to obtain the website
# address of every mapped company.
# The schema name is derived from `prefix`, like the other queries in this notebook,
# so that dev mode reads `ascii_dev_ref_clean` instead of always hitting the prod schema.
combined_query = f"""
SELECT 
    -- gt.provider_id,
    gt.ascii_id_company,
    -- contact.name_internat, 
    contact.website_address
FROM 
    (
        SELECT *
        FROM read_csv_auto('{mapping}')
    ) AS gt
JOIN 
    {prefix}_ref_clean.orbis_company_contact_info AS contact
ON 
    gt.ascii_id_company = contact.ascii_id_company;
"""

In [None]:
%%time

# Run the join and materialise the Arrow result as a polars DataFrame.
combined_seeds = pl.DataFrame(con.query(combined_query).arrow())
print(combined_seeds.shape)

# Preview the first rows as a pandas frame.
combined_seeds.head().to_pandas()

## Clean data

In [None]:
# Drop rows without a website address (only `website_address` is checked for nulls).
combined_seeds = combined_seeds.drop_nulls(["website_address"])

In [None]:
# Count rows whose raw website address repeats an earlier row
# (pandas `duplicated` flags every occurrence after the first by default).
combined_seeds.to_pandas().duplicated(subset="website_address").sum()

In [None]:
# Derive the seed host from the raw website address. Order matters:
#   1. lowercase first, so "HTTP://WWW.…" is matched by the prefix removals below
#      (previously the case-sensitive removals ran before lowercasing)
#   2. remove a leading scheme, then the slashes that followed it
#   3. remove a *leading* "www." / "www.." only — anchoring avoids mangling hosts that
#      merely contain "www." (e.g. "integratedwww.com" used to become "integratedcom")
#   4. drop a stray ".." and keep only the part before the first "/"
df = combined_seeds.with_columns(
    pl.col("website_address")
    .str.to_lowercase()
    .str.replace(r"^https?:", "")
    .str.strip_chars("/")
    .str.replace(r"^www\.+", "")
    .str.replace("..", "", literal=True)
    .str.split("/")
    .list.get(0)
    .alias("seed_node_url")
)  # .drop("website_address")

# Reduce every host to its registered domain (e.g. "shop.example.co.uk" -> "example.co.uk").
# `registered_domain` is a plain string (empty when no known suffix matches), so the
# result dtype is declared explicitly instead of being inferred from the first values.
df = df.with_columns(
    pl.col("seed_node_url")
    .map_elements(
        lambda x: tldextract.extract(x).registered_domain if x is not None else None,
        return_dtype=pl.Utf8,
    )
    .alias("extracted")
)

print(df.shape)
# Keep only hosts of at least 4 bytes, then compute the SURT form of the registered domain.
# NOTE(review): the return type of `get_surt_host` is not visible here, so no
# `return_dtype` is declared for this second `map_elements` — confirm and add it.
df = df.filter(
    (pl.col("seed_node_url").str.len_bytes() >= 4)
    & (pl.col("seed_node_url").is_not_null())
).with_columns(
    pl.col("extracted")
    .map_elements(lambda x: get_surt_host(f"https://{x}") if x is not None else None)
    .alias("seed_node_url_surt")
)
print(df.shape)
df.head()

In [None]:
# Quick manual check of the SURT helper on a well-known host.
get_surt_host("https://google.com")

We must filter out non-unique URLs.

We also keep only the registered domain, i.e. the main domain name directly below the public suffix (TLD).

- this is in general the preferred approach
- this causes issues for content aggregators, e.g. `ntjinda.1688.com` -> `1688.com`; these are ignored for now

In [None]:
# df.unique(subset=['seed_node_url'])
# After dropping fully identical rows, count the rows whose `seed_node_url` repeats an earlier row.
df.unique().to_pandas().duplicated(subset="seed_node_url").sum()

Some weird URLs are deleted!

ERROR: validate the cleaned result manually —
we do not catch all special cases yet:

```
www.polydrag.com- -> polydrag.com-
integratedwww.com -> integratedcom
www.magamex.com.m -> magamex.com.m
```

In [None]:
# Inspect suspicious rows: the registered domain differs from the cleaned host,
# is not the known aggregator domain, and is shorter than 4 bytes.
df.filter(
    (pl.col("extracted") != pl.col("seed_node_url"))
    & ~pl.col("extracted").is_in(["1688.com"])
    & (pl.col("extracted").str.len_bytes() < 4)
).to_pandas()

> TODO: over time we should combine companies (perform identity resolution) to map who belongs together
> TODO: potentially also re-use the ORBIS hierarchy here

In [None]:
# dummy demo: show the identifier rows of two example companies.
# The schema name is built from `prefix`, matching the other queries in this notebook,
# so that dev mode reads the dev schema instead of the hard-coded prod one.
pl.DataFrame(
    con.query(
        f"""
  SELECT *
  FROM {prefix}_ref_clean.orbis_company_identifiers where ascii_id_company in ('1qxQ+KZOVuCyhDgdiyU0mg==', 'TULKI2k0XoO2B3exsjd/9Q==')
"""
    ).arrow()
).to_pandas()

In [None]:
# df_seeds_duplicated["ascii_id_company"].n_unique()

In [None]:
print(df.shape)
# All (company, registered domain, SURT) rows: the aggregator domain, domains shorter
# than 4 bytes and null domains are excluded. Several companies may share one domain here.
df_seeds_duplicated = df.filter(
    ~pl.col("extracted").is_in(["1688.com"]),
    pl.col("extracted").str.len_bytes() >= 4,
    pl.col("extracted").is_not_null(),
).select(
    pl.col("ascii_id_company"),
    pl.col("extracted").alias("seed_node_url"),
    pl.col("seed_node_url_surt"),
)
print(df_seeds_duplicated.shape)
# display(df_seeds_duplicated.head())

# One row per seed URL. A plain `unique(subset=...)` keeps an arbitrary row, so the
# company attached to a shared URL could change between runs. Sorting first and
# keeping the first row makes the result reproducible.
df_seeds_production = df_seeds_duplicated.sort(
    ["seed_node_url", "ascii_id_company"]
).unique(subset=["seed_node_url"], keep="first", maintain_order=True)
print(df_seeds_production.shape)
df_seeds_production.head()

## store data accessible for CC scripts

We want to store both `df_seeds_production` and `df_seeds_duplicated` in S3 so that they are accessible to the CC scripts.
`df_seeds_duplicated` can later be used to look up the results for all companies that share a duplicated seed URL.

In [None]:
WARNING: use a unique prefix for the file name!

In [None]:
# Show the seed-set name that the uploads below use as their `seeds=<filename>` path segment.
filename

In [None]:
# S3 bucket that receives the seed files written below.
BUCKET_NAME = "ascii-supply-chain-research-input"

### refine storage for easy DDB processing

we need one partitioned (delta?) table

In [None]:
# Upload the de-duplicated seeds (one row per seed URL) under the `seeds=<filename>` path segment.
seeds_deduplicated_path = (
    f"{BUCKET_NAME}/ascii_seeds/seeds_deduplicated/seeds={filename}/seeds.parquet"
)
with fs.open(seeds_deduplicated_path, mode="wb") as f:
    df_seeds_production.write_parquet(f, compression="gzip")

In [None]:
# Upload the full company-to-seed table (duplicate URLs kept) under the same `seeds=<filename>` path segment.
seeds_not_deduplicated_path = f"{BUCKET_NAME}/ascii_seeds/seeds_not_deduplicated/seeds={filename}/seeds_duplicated.parquet"
with fs.open(seeds_not_deduplicated_path, mode="wb") as f:
    df_seeds_duplicated.write_parquet(f, compression="gzip")

In [None]:
# Read back the de-duplicated seed files of every `seeds=*` directory to verify the upload.
# NOTE(review): no credentials are passed here (the `storage_options` dict defined
# earlier is not used) — presumably polars resolves them from the environment; confirm.
dsp = pl.read_parquet(
    f"s3://{BUCKET_NAME}/ascii_seeds/seeds_deduplicated/seeds=*/*.parquet"
)
dsp.head()

In [None]:
# Number of distinct companies per seed set.
# NOTE(review): assumes the `seeds` column is recovered from the `seeds=<name>` path
# segment (hive-style partitioning) — confirm for the polars version in use.
dsp.group_by("seeds").agg(pl.col("ascii_id_company").n_unique()).to_pandas()