In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import array_compact, arrays_zip, col, explode
from pyspark.sql.types import ArrayType, StringType, StructField, StructType, TimestampType

# Build the notebook's Spark session (getOrCreate reuses an active one if present)
spark = (
    SparkSession.builder
    .appName("Read JSON Files")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

In [None]:

# Explicit schema so Spark does not have to infer it from the JSON files.
# Every field is declared nullable.
package_type = StructType().add("name", StringType(), True)

affected_entry_type = (
    StructType()
    .add("package", package_type, True)
    .add("versions", ArrayType(StringType()), True)
)

severity_entry_type = (
    StructType()
    .add("type", StringType(), True)
    .add("score", StringType(), True)
)

database_specific_type = StructType().add("severity", StringType(), True)

schema = (
    StructType()
    .add("id", StringType(), True)
    .add("summary", StringType(), True)
    .add("details", StringType(), True)
    .add("aliases", ArrayType(StringType()), True)
    .add("modified", TimestampType(), True)
    .add("published", TimestampType(), True)
    .add("database_specific", database_specific_type, True)
    .add("affected", ArrayType(affected_entry_type), True)
    .add("severity", ArrayType(severity_entry_type), True)
)

# Glob pattern matching the JSON files in the cache directory
json_directory = "../.viper_cache/*.json"

# Load the files with the predefined schema; multiLine allows a single JSON
# document to span several lines of a file.
df = (
    spark.read
    .schema(schema)
    .option("multiLine", "true")
    .json(json_directory)
)

In [None]:

# Project the columns of interest and lift nested fields to top-level columns.
# Fields passed through unchanged:
top_level_fields = ["id", "summary", "details", "aliases", "modified", "published"]

# Output column name -> dotted path of the nested field it is read from
nested_fields = {
    "severity": "database_specific.severity",
    "package_name": "affected.package.name",
    "versions": "affected.versions",
    "severity_score": "severity.score",
}

selected_df = df.select(
    *[col(name) for name in top_level_fields],
    *[col(path).alias(alias) for alias, path in nested_fields.items()],
)

selected_df.printSchema()

In [None]:
# Preview the projected rows (show()'s defaults, spelled out)
selected_df.show(n=20, truncate=True)

In [None]:
# One output row per (affected entry, severity score).
#
# `package_name` and `versions` are parallel arrays: element i of each comes
# from the same entry of the original `affected` array. Exploding them
# separately builds a cross product and attaches every package to every other
# package's version list. Zipping them first and exploding once keeps each
# package paired with its own versions.
exploded_df = (
    selected_df
        .withColumn(
            "affected_entry",
            explode(arrays_zip(col("package_name"), col("versions")))
        )
        # Overwrite the array columns in place so the column order is unchanged.
        .withColumn("package_name", col("affected_entry.package_name"))
        .withColumn("versions", col("affected_entry.versions"))
        .drop("affected_entry")
        # NOTE: explode() drops rows whose array is null or empty, so rows
        # without any severity score are not carried forward.
        .withColumn("severity_score", explode(col("severity_score")))
)

exploded_df.show()

In [None]:
# Keep rows that still carry a version list, then emit one row per version.
filtered_df = (
    exploded_df
    .where(col("versions").isNotNull())
    .withColumn("versions", explode(col("versions")))
)

In [None]:
# Preview the per-version rows (show()'s defaults, spelled out)
filtered_df.show(n=20, truncate=True)

In [None]:
# Redistribute into 10 partitions keyed on package and version, then persist
# the result as Parquet, replacing any previous output at that path.
partition_keys = ("package_name", "versions")
filtered_df = filtered_df.repartition(10, *partition_keys)
(
    filtered_df.write
    .mode("overwrite")
    .parquet("../.viper_cache/vulnerabilities.parquet")
)