In [1]:
# Imports
import sys
import os
# Add the project's scripts directory to the module search path before the
# import below (presumably where marc21_countries lives — confirm).
# NOTE(review): this is an absolute, user-specific path, so the notebook only
# runs on a machine where that directory exists.
sys.path.append(os.path.abspath('/home/msds2025/jvalera/bdcc2025/bdcc-lab-openlib/scripts'))
from marc21_countries import country_mapping

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, regexp_extract, regexp_replace, floor, lit, lower, explode, concat, array_contains,
    broadcast, transform, split, trim, lower, flatten, array_distinct, size, expr
)

In [2]:
# Spark session for this notebook, run in local mode ("local[*]").
# All tuning options are gathered in one mapping and applied in order.
spark_settings = {
    "spark.driver.memory": "16g",
    "spark.driver.maxResultSize": "2g",
    "spark.executor.memory": "8g",
    "spark.executor.cores": "4",
    "spark.sql.shuffle.partitions": "32",
    "spark.default.parallelism": "32",
    "spark.sql.adaptive.enabled": "true",
    "spark.memory.offHeap.enabled": "true",
    "spark.memory.offHeap.size": "4g",
    # -1 disables automatic broadcast joins; explicit broadcast() hints still apply.
    "spark.sql.autoBroadcastJoinThreshold": "-1",
}

session_builder = (
    SparkSession.builder
    .appName("Working Dataframe Creation")
    .master("local[*]")
)
for option_name, option_value in spark_settings.items():
    session_builder = session_builder.config(option_name, option_value)

spark = session_builder.getOrCreate()

In [3]:
# Load the editions dataset from its Parquet directory.
eds_output_path = "/home/msds2025/jvalera/bdcc2025/bdcc-lab-openlib/partitioned_data/eds_partitioned"
df_eds_final = spark.read.format("parquet").load(eds_output_path)

In [4]:
from pyspark.sql.functions import (
    col, lit, lower, regexp_replace, trim, when, regexp_extract,
    floor, broadcast, split, array_distinct, flatten, transform, filter as spark_filter
)

# Build the working DataFrame: a decade label, a country name and a cleaned,
# de-duplicated subject array for rows whose year falls in 1900-2020.

# First run of four digits found in publish_date; "" when there is none.
year_text = regexp_extract(col("publish_date"), r"(\d{4})", 1)

# The (key, value) pairs of country_mapping as a two-column lookup frame.
# The session config sets spark.sql.autoBroadcastJoinThreshold to -1, so this
# explicit broadcast() hint is the only thing requesting a broadcast join.
country_lookup = broadcast(
    spark.createDataFrame(list(country_mapping.items()), ["publish_country", "country"])
)

df_eds = (df_eds_final
    # Integer year, or NULL when publish_date has no four-digit run.
    # The explicit cast avoids relying on implicit string -> number coercion
    # in the range test and the decade arithmetic below.
    .withColumn("year", when(year_text != "", year_text.cast("int")))
    # A NULL year fails the range test, so undated rows are dropped here too.
    .filter(col("year").between(1900, 2020))
    # e.g. 1994 -> "1990s". Every remaining row has a year, so no fallback
    # label is needed.
    .withColumn("decade",
        concat((floor(col("year") / 10) * 10).cast("string"), lit("s")))
    # Normalise the code (drop one leading "-", lowercase) before the lookup.
    .withColumn("publish_country",
        lower(regexp_replace(col("publish_country"), r"^-", "")))
    .join(country_lookup, on="publish_country", how="left")
    # Codes with no match in the lookup get a visible placeholder, not NULL.
    .withColumn("country",
        when(col("country").isNotNull(), col("country"))
        .otherwise(lit("Unknown")))
    # Per subject string: remove periods, turn "/" and "-" (plus surrounding
    # whitespace) into commas, then lowercase and trim.
    .withColumn("subjects_cleaned",
        transform(col("subjects"),
            lambda s: trim(lower(
                regexp_replace(
                    regexp_replace(
                        regexp_replace(s, r"\.+", ""),
                        r"/", ","),
                    r"\s*-\s*", ",")))))
    # Split every cleaned string on commas and merge the pieces into one
    # array per row.
    .withColumn("subject_clean_flat",
        array_distinct(
            flatten(
                transform(col("subjects_cleaned"),
                          lambda s: split(s, ",")))))
    # Collapse whitespace and trim BEFORE testing for blanks and "etc".
    # Pieces produced by the comma split keep their leading space
    # ("manuals, etc" -> " etc"), so comparing the raw piece with "etc"
    # never matched and "etc" leaked into the output.
    .withColumn("subject_clean",
        array_distinct(
            spark_filter(
                transform(col("subject_clean_flat"),
                    lambda s: trim(regexp_replace(s, r"\s+", " "))),
                lambda s: s.isNotNull() & (s != "") & (s != "etc"))))
    # Keep only rows that still have at least one usable subject.
    .filter(size(col("subject_clean")) > 0)
    .select("decade", "country", "subject_clean")
)


In [5]:
# Write the working DataFrame as Parquet, replacing any previous output.
# (No partitionBy is applied to this write.)
output_dir = "/home/msds2025/jvalera/bdcc2025/bdcc-lab-openlib/partitioned_data"
df_eds_path = f"{output_dir}/df_eds_partitioned"
df_eds.write.mode("overwrite").parquet(df_eds_path)

In [6]:
# Unnest subject_clean so each row carries a single subject alongside its
# decade and country.
df_explode = (
    df_eds
    .select(
        "decade",
        "country",
        explode(col("subject_clean")).alias("subject"),
    )
    .where(col("subject").isNotNull())
)

In [7]:
# Write the exploded (one subject per row) frame as Parquet, replacing any
# previous output.
df_exp_path = f"{output_dir}/df_exp_partitioned"
df_explode.write.mode("overwrite").parquet(df_exp_path)

In [8]:
# Stop the Spark session; this is the last cell, after both Parquet writes.
spark.stop()