In [None]:
import os
import numpy as np
import pandas as pd
import subprocess
import json
from datetime import datetime
from dateutil import rrule

# PySpark imports
import pyspark
import pyspark.sql.functions as psf
import pyspark.sql.types as pst
import pyspark.sql.functions as F
import pyspark.sql.types as pst
from pyspark.sql.functions import countDistinct
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os
import sys

# Run both the Spark driver and its Python workers on the same interpreter as
# this kernel. Spark reads these variables when it launches, so this has to run
# before the SparkSession is created in the next cell.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

print('Ready imports')

In [None]:
# Reuse the kernel's active SparkSession if there is one, otherwise start one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

started_at = datetime.now().replace(microsecond=0)
print('Started: ', started_at)

In [None]:
# spark.stop()
# print('Stopped:', datetime.now().replace(microsecond=0))

In [None]:
# Country metadata. The JSON's top-level keys become the `index` column after
# the transpose + reset_index. Presumably these are ISO 3166 alpha-2 codes —
# they are joined against AS country codes and ccTLD-derived codes.
country_dataset = pd.read_json("extra_data/countries.json").T
language_dataset = pd.read_json("extra_data/languages.json").T

# Single source of truth for the country table's columns. Both the pandas
# selection and the extra rows below use it, so the two cannot drift apart.
columns = ["index", "name", "continent", "languages"]

country_dataset_df = spark.createDataFrame(
    country_dataset.reset_index()[columns]
).repartition(30)

# Pseudo-country rows so the generic TLDs .com/.net/.org can be matched by the
# TLD -> country join; they are attributed to "United States" / "NA".
# NOTE(review): `languages` is presumably [""] rather than [] because Spark
# cannot infer an element type from an empty list — confirm.
rows = [['COM', "United States", "NA", [""]],
        ['NET', "United States", "NA", [""]],
        ['ORG', "United States", "NA", [""]],
       ]

second_df = spark.createDataFrame(rows, columns)

# Match columns by name, not position, so a reordering of either side cannot
# silently misalign the union.
country_dataset_df = country_dataset_df.unionByName(second_df)

In [None]:
# AS number -> organization mapping (headered CSV), narrowed to the columns used
# by the joins below.
as2name2023 = (
    spark.read
    .csv("extra_data/asn2organization.csv", header=True)
    .select("asn", "asn_name", "org_name", "source", "country")
)

as2name2023.show()
print("Load ok")

In [None]:
# Read PARQUET file into dataframe
df = spark.read.parquet("all_ipv4_queries.parquet")

df.show()


In [None]:
# Number of rows in `df` for every (asn, tld) pair, stored in column `count`.
group = df.groupBy(col("asn"), col("tld")).count()

# Preview the ten most frequent pairs.
group.orderBy(col("count").desc()).show(n=10, truncate=False)

In [None]:
# Join the per-(asn, tld) counts with AS metadata and country metadata, then
# label each row by how the AS's registered country relates to the TLD's
# country (first matching rule wins):
#   Local      - the TLD's country code equals the AS's country
#   US Hosting - otherwise, the AS's country is "US"
#   Language   - otherwise, the two countries' language lists overlap
#   Rest       - none of the above
full_domain_per_iptype_concentration = (
    group
    # Join on the column name so the result has a single `asn` column. An
    # expression join keeps both copies, which makes `asn` ambiguous and is
    # rejected by Spark's file writers (duplicate column names).
    .join(as2name2023, on="asn", how="inner")
    # Continent and languages of the AS's registered country.
    .join(
        country_dataset_df.select("index", "continent", "languages"),
        as2name2023["country"] == country_dataset_df["index"],
    )
    .drop("index", "source")
    # Map the TLD label (formatted like ".de.") to a key of country_dataset_df.
    .withColumn(
        "normalized_tld",
        F.when(F.col("tld") == ".xn--p1ai.", F.lit("RU"))  # punycode for .рф
        .when(F.col("tld") == ".uk.", F.lit("GB"))  # ISO code for the UK is GB
        .when(F.col("tld") == ".com.", F.lit("COM"))
        .when(F.col("tld") == ".net.", F.lit("NET"))
        .when(F.col("tld") == ".org.", F.lit("ORG"))
        # Only two-letter labels are country-code TLDs. Taking the first two
        # letters of longer labels would map e.g. ".info." to IN (India).
        # Anything unmatched stays null and is dropped by the inner join below.
        .when(
            F.col("tld").rlike(r"^\.[A-Za-z]{2}\.$"),
            F.upper(F.substring(F.col("tld"), 2, 2)),
        ),
    )
    # Continent and languages of the TLD's country.
    .join(
        country_dataset_df.select(
            F.col("index").name("tld_index"),
            F.col("continent").name("tld_continent"),
            F.col("languages").name("tld_languages"),
        ),
        F.col("normalized_tld") == F.col("tld_index"),
    )
    .drop("tld_index")
    .withColumn(
        "characteristic",
        F.when(F.col("normalized_tld") == F.col("country"), F.lit("Local"))
        .when(F.col("country") == "US", F.lit("US Hosting"))
        .when(
            F.arrays_overlap(F.col("languages"), F.col("tld_languages")),
            F.lit("Language"),
        )
        .otherwise(F.lit("Rest")),
    )
    .drop("languages", "tld_languages")
)

full_domain_per_iptype_concentration.sort(col("count").desc()).show(truncate=False)

In [None]:
# Number of rows per TLD in the classified table.
group_tld = (
    full_domain_per_iptype_concentration
    .groupBy(col("tld"))
    .count()
)

In [None]:
# Order the classified table by `count`, largest first.
full_domain_per_iptype_concentration = full_domain_per_iptype_concentration.sort(col("count").desc())

# Write one CSV directory per TLD (tld=<value>/ subfolders).
# NOTE(review): the input is all_ipv4_queries.parquet but the output path says
# ipv6 — confirm this is the intended destination. With the default save mode
# the write fails if the directory already exists.
(
    full_domain_per_iptype_concentration
    # The partitioned writer groups each task's rows by `tld`, which can undo
    # the global count ordering. Sorting by (tld, count desc) within each
    # partition keeps the rows of every output file ordered by count.
    .sortWithinPartitions("tld", col("count").desc())
    .write
    # CSV files carry no column names by default; write them, matching how the
    # AS CSV is read with a header elsewhere in this notebook.
    .option("header", "true")
    .format("csv")
    .save("tlds_new_ipv6/", partitionBy=["tld"])
)