In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    array,
    array_join,
    col,
    collect_set,
    explode_outer,
    from_json,
    to_json,
    when,
)
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

## Initialize Spark Session

In [None]:
# Obtain the Spark session used by every later cell; getOrCreate() reuses a
# session already active in this kernel instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("BigDataAnalyticsPipeline")
    .getOrCreate()
)

## Large Data Ingestion

In [None]:
# Root directory that holds the raw JSON documents.
base_dir = "./data"

# Gather the paths of every data file under `base_dir`. The JSON documents are stored
# WITHOUT a `.json` extension, so files that do end in `.json` are skipped. Hidden
# files and directories (e.g. `.DS_Store`, `.ipynb_checkpoints`, `.git`) are skipped
# too: they are not data and would otherwise be read as corrupt JSON records.
# Directory and file names are sorted so the input order is reproducible across runs.
file_paths = []
for root, dirs, files in os.walk(base_dir):
    # Prune hidden directories in place so os.walk does not descend into them.
    dirs[:] = sorted(d for d in dirs if not d.startswith("."))
    for file in sorted(files):
        if file.startswith(".") or file.endswith(".json"):
            continue
        file_paths.append(os.path.join(root, file))

# Fail fast with a clear message; with no paths the error would otherwise surface
# inside Spark's reader, far from its cause.
if not file_paths:
    raise FileNotFoundError(f"No input files found under {base_dir!r}")

# Load all files into a single Spark DataFrame. `multiLine=True` tells Spark to parse
# one record, which may span multiple lines, per file (the default mode instead
# expects one JSON object per line).
df = spark.read.option("multiLine", True).json(path=file_paths)

# Display schema and sample records to verify the ingestion.
df.printSchema()
df.show(5, truncate=True)

## Data Transformation

### Extracting and Flattening Nested Affiliation Fields

In [None]:
# Target shape of one affiliation entry. from_json keeps only these three fields;
# any other keys in the source JSON are dropped.
affiliation_schema = StructType([
    StructField("affiliation-city", StringType(), True),
    StructField("affilname", StringType(), True),
    StructField("affiliation-country", StringType(), True),
])

affiliation_array_schema = ArrayType(affiliation_schema)

# Depending on the input files, Spark's schema inference may type `affiliation` as a
# struct, an array of structs, or (when files disagree) a string holding raw JSON text.
# Normalise to JSON text first. Casting a struct/array to string does NOT yield JSON
# (Spark 3 renders it like `{Paris, Acme, France}`, without keys or quotes), so
# from_json would parse nothing; to_json produces real JSON. String columns already
# hold JSON text and are passed through as before.
affiliation_col = col("abstracts-retrieval-response.affiliation")
affiliation_type = df.select(affiliation_col).schema[0].dataType
if isinstance(affiliation_type, (StructType, ArrayType)):
    affiliation_json = to_json(affiliation_col)
else:
    affiliation_json = affiliation_col.cast("string")

df = df.withColumn("affiliation_string", affiliation_json)

# Parse the JSON text both as a list of affiliations and as a single affiliation.
df = df.withColumn(
    "affiliation_parsed_array",
    from_json(col("affiliation_string"), affiliation_array_schema)
).withColumn(
    "affiliation_parsed_single",
    from_json(col("affiliation_string"), affiliation_schema)
)

# Always end up with array<affiliation_schema>: prefer the list parse, otherwise wrap
# the single-object parse, otherwise use an empty array. Helper columns are dropped.
df = df.withColumn(
    "affiliation_array",
    when(col("affiliation_parsed_array").isNotNull(), col("affiliation_parsed_array"))
    .when(col("affiliation_parsed_single").isNotNull(), array(col("affiliation_parsed_single")))
    .otherwise(array())
).drop("affiliation_string", "affiliation_parsed_array", "affiliation_parsed_single")

### Extracting and Flattening Nested Mainterm (Index Term) Fields

In [None]:
# Target shape of one index-term entry: the term text sits under the "$" key (it is
# read back as `mainterm.$` when the output is built). Other keys are dropped.
mainterm_schema = StructType([
    StructField("$", StringType(), True),
])

mainterm_array_schema = ArrayType(mainterm_schema)

# Depending on the input files, Spark's schema inference may type `idxterms.mainterm`
# as a struct, an array of structs, or (when files disagree) a string of raw JSON text.
# Casting a struct/array to string does NOT yield JSON, so from_json would parse
# nothing; serialise those with to_json instead. String columns already hold JSON text.
mainterm_col = col("abstracts-retrieval-response.idxterms.mainterm")
mainterm_type = df.select(mainterm_col).schema[0].dataType
if isinstance(mainterm_type, (StructType, ArrayType)):
    mainterm_json = to_json(mainterm_col)
else:
    mainterm_json = mainterm_col.cast("string")

df = df.withColumn("mainterm_string", mainterm_json)

# Parse the JSON text both as a list of terms and as a single term.
df = df.withColumn(
    "mainterm_parsed_array",
    from_json(col("mainterm_string"), mainterm_array_schema)
).withColumn(
    "mainterm_parsed_single",
    from_json(col("mainterm_string"), mainterm_schema)
)

# Always end up with array<mainterm_schema>: prefer the list parse, otherwise wrap the
# single-object parse, otherwise use an empty array. Helper columns are dropped.
df = df.withColumn(
    "mainterm_array",
    when(col("mainterm_parsed_array").isNotNull(), col("mainterm_parsed_array"))
    .when(col("mainterm_parsed_single").isNotNull(), array(col("mainterm_parsed_single")))
    .otherwise(array())
).drop("mainterm_string", "mainterm_parsed_array", "mainterm_parsed_single")

### Combining Keywords, Affiliations, and Document Metadata

In [36]:
# Build one output row per (keyword, affiliation) pair of each document, next to that
# document's metadata. Both arrays are exploded from the SAME source row, so keywords
# and affiliations are never re-matched across rows via document_id. Re-matching by a
# join loses affiliations and metadata when the id is null, and multiplies rows when an
# id appears in several files. This also avoids two shuffle joins.
# (For one row per document instead, group by document_id and combine values with
# collect_set + array_join.)
documents_df = df.select(
    col("abstracts-retrieval-response.coredata.dc:identifier").alias("document_id"),
    col("abstracts-retrieval-response.coredata.dc:title").alias("title"),
    col("abstracts-retrieval-response.coredata.dc:description").alias("description"),
    col("abstracts-retrieval-response.coredata.prism:coverDate").alias("publication_date"),
    col("abstracts-retrieval-response.coredata.dc:publisher").alias("publisher"),
    "mainterm_array",
    "affiliation_array",
)

# Spark allows only one generator per projection, so the explodes are chained; the
# result is the per-document cross product keywords x affiliations. explode_outer keeps
# documents whose array is empty or null, yielding a null keyword/affiliation instead
# of dropping the document.
exploded_df = (
    documents_df
    .withColumn("mainterm", explode_outer("mainterm_array"))
    .withColumn("affiliation", explode_outer("affiliation_array"))
)

final_df = exploded_df.select(
    "document_id",
    "title",
    "description",
    "publication_date",
    "publisher",
    col("mainterm.$").alias("keyword"),
    col("affiliation.affiliation-country").alias("affiliation_country"),
    col("affiliation.affiliation-city").alias("affiliation_city"),
    col("affiliation.affilname").alias("affiliation_name"),
)

# Show a sample of the result
final_df.show(10, truncate=False)



+---------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

## Data Export

In [37]:
output_path = "documents_output_explode"

# Write the final DataFrame as CSV with a header row.
# - coalesce(1) funnels all rows through one task so the output directory holds a
#   single part file. That is convenient for hand-off but removes write parallelism,
#   so drop it for large outputs.
# - Spark's CSV writer escapes embedded quotes with a backslash by default. RFC 4180
#   readers (e.g. pandas and Excel defaults) misread that output. escape='"' doubles
#   quotes instead, so titles and abstracts that contain quotes round-trip correctly.
final_df.coalesce(1).write.csv(output_path, header=True, mode="overwrite", escape='"')


                                                                                