In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    array_sort,
    col,
    collect_set,
    concat,
    desc,
    explode,
    explode_outer,
    expr,
    lit,
    regexp_replace,
    size,
    when,
)

# Start (or reuse) this notebook's Spark session.
spark = (
    SparkSession.builder
    .appName("JSON Analysis")
    .getOrCreate()
)

# Input: every *.json file in the "data" folder under the current working directory.
workspace_dir = os.getcwd()
json_path = os.path.join(workspace_dir, "data", "*.json")

# "multiline" = "true" lets Spark parse JSON documents that span several lines
# (rather than one JSON object per line). The frame is cached because it is
# read again by the certificate-extraction step.
df = (
    spark.read
    .option("multiline", "true")
    .json(json_path)
    .cache()
)

# count() is the first action on `df`, so it is also what populates the cache.
df_count = df.count()
print(f"Total Records: {df_count}")

# Extract certificate details: one row per (certificate, SAN entry), with columns
# domain, alternate_domain, address, organization.
#
# explode_outer (rather than explode) is used for the SAN list: a certificate whose
# `san` array is null or empty still produces one row (alternate_domain = null),
# so its domain, address and organization are not silently dropped.
out = (
    df.select(explode(col("certificates")).alias("certificate"))
    .select(
        col("certificate.commonName").alias("domain"),
        explode_outer(col("certificate.san")).alias("alternate_domain"),
        col("certificate.address").alias("address"),
        col("certificate.organization").alias("organization"),
    )
)

25/02/07 07:54:08 WARN Utils: Your hostname, codespaces-e703dd resolves to a loopback address: 127.0.0.1; using 10.0.10.221 instead (on interface eth0)
25/02/07 07:54:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/07 07:54:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Total Records: 2


In [18]:
# Aggregate per common name (with a leading "*." wildcard stripped) and classify
# each SAN entry of that name:
#   - is_self:      the SAN equals the common name itself (excluded from both lists)
#   - is_subdomain: the SAN ends with "." + common name, e.g. "a.example.com"
#                   under "example.com". This is a literal suffix match, so "_" or
#                   "%" inside a name is not treated as a LIKE wildcard.
#   - anything else is collected as an unrelated alternate domain.
# A suffix match implies the SAN is longer than the common name, so a subdomain
# can never also be `is_self`; no extra exclusion is needed for that list.
# collect_set returns elements in no guaranteed order, so every set is sorted and
# ties in the final ordering are broken by domain, keeping shown output reproducible.
classified = (
    out.withColumn("domain", regexp_replace(col("domain"), r"^\*\.", ""))
    .withColumn("is_self", col("domain") == col("alternate_domain"))
    .withColumn(
        "is_subdomain",
        col("alternate_domain").endswith(concat(lit("."), col("domain"))),
    )
    .groupBy("domain")
    .agg(
        array_sort(
            collect_set(when(col("is_subdomain"), col("alternate_domain")))
        ).alias("subdomains"),
        array_sort(
            collect_set(
                when(~col("is_subdomain") & ~col("is_self"), col("alternate_domain"))
            )
        ).alias("alternate_domains"),
        array_sort(collect_set("organization")).alias("organizations"),
        array_sort(collect_set("address")).alias("addresses"),
    )
    .withColumn("organizations_count", size(col("organizations")))
    .withColumn("alternate_domains_count", size(col("alternate_domains")))
    .orderBy(desc("alternate_domains_count"), "domain")
)

classified.printSchema()

root
 |-- domain: string (nullable = true)
 |-- subdomains: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- alternate_domains: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- organizations: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- addresses: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- organizations_count: integer (nullable = false)
 |-- alternate_domains_count: integer (nullable = false)



In [19]:
# Print the first 20 rows (Spark's default) without clipping the wide array columns.
classified.show(n=20, truncate=False)

+---------------------------------+-------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------------+
|domain                           |subdomains 