# Wikipedia pageviews Bronze
Executes for `execution_date`, an ISO-8601 string such as '2025-05-15T01:00:00+00:00'; data for that day and the previous day is loaded.
Assumes s3 folder structure: "s3a://data/wikipedia_pageviews/YYYY/YYYY-MM/DD/pageviews-YYYYMMDD-hhmmss.gz"
Performs: 
* ingestion
* column naming
* column casting
* get the date from the filename
* partitioning
* storing as delta table

Note: the Hive catalog emits a number of warnings and some exceptions because it doesn't support schema updates. Everything still works fine.
On Databricks, Unity Catalog would be used, but it's proprietary, so we make do with the Hive catalog.

In [None]:
import datetime as dt

# Notebook parameters; presumably overridden per run by the scheduler — confirm.
execution_date = "2025-05-15T01:00:00+00:00"  # ISO-8601 timestamp, parsed by get_date()
full_refresh = False  # False => only the loaded date partitions are overwritten
bronze_db = "bronze"
# Run identifier truncated to the hour (local clock), e.g. ...-20250515-010000.
execution_id = "wikipedia_pageviews_bronze-" + dt.datetime.now().strftime("%Y%m%d-%H0000")

## Standard stuff that should be a package

In [None]:
import datetime as dt
import os

import pyspark
import requests  # type: ignore
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
from pyspark.sql.utils import AnalysisException

print(f"Starting notebook execution: {execution_id}")

# Use the default Spark config directory unless SPARK_CONF_DIR is already set to a non-empty value.
os.environ["SPARK_CONF_DIR"] = os.environ.get("SPARK_CONF_DIR") or "/opt/tfds/spark/conf"


def get_date():
    """Return the module-level ``execution_date`` ISO-8601 string as a ``datetime``."""
    parsed = dt.datetime.fromisoformat(execution_date)
    return parsed


def get_config(config_name, timeout=30.0):
    """Fetch a named config from the tfds-config server.

    The server base URL is read from the ``TFDS_CONFIG_URL`` environment
    variable and defaults to ``http://tfds-config:8005/api/configs``.

    Args:
        config_name: Name of the config; appended to the base URL as the last
            path segment.
        timeout: Seconds to wait for the server. Without a timeout
            ``requests`` can block indefinitely on an unresponsive server.

    Returns:
        The value stored under the ``config`` key of the JSON response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        ValueError: If the body is JSON ``null``, is not a JSON object, or
            has no ``config`` key.
    """
    config_server_url = os.environ.get("TFDS_CONFIG_URL")
    if config_server_url is None:
        config_server_url = "http://tfds-config:8005/api/configs"

    # Avoid a double slash when the base URL is configured with a trailing "/".
    config_url = config_server_url.rstrip("/") + "/" + config_name

    print(f"retrieving {config_name} config from {config_url}")
    response = requests.get(config_url, timeout=timeout)
    response.raise_for_status()

    # Parse the body once and reuse it.
    payload = response.json()
    if payload is None:
        raise ValueError(f"Config '{config_name}' not found. config server response: {response.text}")
    if not isinstance(payload, dict):
        raise ValueError(
            f"Config '{config_name}': unexpected config server response (not a JSON object): {response.text}"
        )
    cfg = payload.get("config")
    if cfg is None:
        raise ValueError(
            f"Config '{config_name}' does not have a 'config' key. Config server response: {response.text}"
        )

    return cfg


def get_spark_session(app_name: str) -> SparkSession:
    """Build (or reuse, via ``getOrCreate``) a Delta-enabled Spark session with S3 access.

    The S3 access/secret keys come from the ``s3`` config on the config server.
    """
    s3_credentials = get_config("s3")
    settings = [
        ("spark.hadoop.fs.s3a.access.key", s3_credentials["access_key"]),
        ("spark.hadoop.fs.s3a.secret.key", s3_credentials["secret_key"]),
        # No task retries: the first task failure fails the job.
        ("spark.task.maxFailures", "1"),
    ]

    conf = pyspark.conf.SparkConf().setAppName(app_name)
    for key, value in settings:
        conf.set(key, value)
    # No master is set here (a `.setMaster("local[*]")` used to be commented out);
    # presumably it comes from SPARK_CONF_DIR / spark-submit — confirm.

    session_builder = SparkSession.builder.config(conf=conf)
    return configure_spark_with_delta_pip(session_builder).getOrCreate()


# Spark config keys whose values are comma-separated lists; show_cfg prints
# them one item per line. (The previous inline tuple lost several of these to
# missing commas and typos, so they were never matched.)
_LIST_VALUED_SPARK_KEYS = frozenset(
    {
        "spark.submit.pyFiles",
        "spark.driver.extraJavaOptions",
        "spark.executor.extraJavaOptions",
        "spark.app.initial.jar.urls",
        "spark.app.initial.file.urls",
        "spark.files",
        "spark.repl.local.jars",
    }
)


def show_cfg(spark_session: SparkSession):
    """Print every entry of the session's Spark config.

    Keys in ``_LIST_VALUED_SPARK_KEYS`` are printed on their own line followed
    by one indented line per comma-separated item; every other entry is
    printed as ``key = value``.
    """
    for key, value in spark_session.sparkContext.getConf().getAll():
        if key in _LIST_VALUED_SPARK_KEYS:
            print(key)
            for item in value.split(","):
                print("    " + item)
        else:
            print(f"{key} = {value}")


def print_spark_info(sc: SparkSession):
    """Print a few key settings of the session, then each catalog database and its tables."""
    conf: pyspark.SparkConf = sc.sparkContext.getConf()
    app_name = conf.get("spark.app.name")
    master = conf.get("spark.master")
    warehouse_dir = conf.get("spark.sql.warehouse.dir")
    s3_endpoint = conf.get("spark.hadoop.fs.s3a.endpoint")
    custom_conf_status = conf.get("spark.signal.config.value")

    print(f"==== spark app: {app_name} ====")
    print(f"Spark master: {master}")
    print(f"Delta lake location: {warehouse_dir}")
    print(f"S3 endpoint: {s3_endpoint}")
    print(f"Custom config file status: {custom_conf_status}")

    databases = sc.catalog.listDatabases()
    print("Databases:")
    for database in databases:
        print(database.name)
        for table in sc.catalog.listTables(database.name):
            print(f"    {table.name}")

## Bronze pipeline

In [None]:
from pyspark.sql.functions import col, input_file_name, substring, to_date


def make_s3_path(date):
    """Return the S3 glob for all pageview files of *date*'s day.

    Layout: ``s3a://data/wikipedia_pageviews/YYYY/YYYY-MM/DD/*.gz``.
    *date* may be a ``date`` or ``datetime``; only year, month and day are used.
    """
    return date.strftime("s3a://data/wikipedia_pageviews/%Y/%Y-%m/%d/*.gz")


def _load_available(session, reader, paths, fallback_schema):
    """Load each glob in *paths* separately, skipping globs that match no files.

    Spark fails the whole ``load`` call with "Path does not exist" when any
    single glob in a list matches nothing. Loading the globs together would
    therefore discard the days that do have data. Returns an empty DataFrame
    with *fallback_schema* when no glob matches.
    """
    frames = []
    for path in paths:
        try:
            frames.append(reader.load(path))
        except AnalysisException as err:
            if "Path does not exist" not in str(err):
                raise
            print(f"No files found in: {path}")
    if not frames:
        return session.createDataFrame([], fallback_schema)
    combined = frames[0]
    for frame in frames[1:]:
        combined = combined.unionByName(frame)
    return combined


# Process the execution day and the day before it.
run_dates = [get_date(), get_date() - dt.timedelta(days=1)]

print("Getting a spark session")
spark = get_spark_session(execution_id)
try:
    print_spark_info(spark)

    print(f"Creating the {bronze_db} database")
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {bronze_db}")

    # Everything is read as strings; count_views is cast below.
    schema = StructType(
        [
            StructField(name="domain_code", dataType=StringType(), nullable=True),
            StructField("page_title", StringType(), True),
            StructField("count_views", StringType(), True),
        ]
    )
    file_spec = [make_s3_path(date) for date in run_dates]
    print(f"Loading data: {file_spec}")

    # Tolerates files that vanish after listing; globs that match nothing at
    # all are handled by _load_available instead.
    spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
    reader = (
        spark.read.format("csv")
        .option("delimiter", " ")
        .option("header", "false")
        .option("inferSchema", "false")
        .schema(schema)
    )
    data = _load_available(spark, reader, file_spec, schema)

    input_files = data.inputFiles()
    if not input_files:
        print(f"No files found in: {file_spec}")
    else:
        print("Files found:")
        for f in input_files:
            print(f)

    data_enriched = (
        data.na.drop(subset=["domain_code"])
        # Drop titles containing ':' (presumably namespace pages such as Special:/File:).
        .filter(~col("page_title").contains(":"))
        .filter(~col("page_title").isin("-", "Main_Page", "Forside", "Hauptseite", "wiki.phtml"))
        .withColumn("country_code", substring(col("domain_code"), 1, 2))
        # Danish Wikipedia's domain code is "da"; the previous "dk" matched no rows.
        .filter(col("domain_code").isin("sv", "da", "no", "de", "en"))
        .withColumn("count_views", col("count_views").cast(IntegerType()))
        .withColumn("file_name", input_file_name())
        # File names end in "YYYYMMDD-hhmmss.gz" (18 chars); take the 8-char date.
        .withColumn("date", to_date(substring(col("file_name"), -18, 8), "yyyyMMdd"))
    )

    table_name = f"{bronze_db}.wikipedia_page_reads"
    # silence a few bulky hive catalog warnings
    spark.sparkContext.setLogLevel("ERROR")
    if not full_refresh:
        # Overwrite only the date partitions present in data_enriched.
        spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    # NOTE(review): with full_refresh=True the whole table is replaced by just
    # the dates in run_dates (or emptied if no files were found) — confirm.
    (
        data_enriched.write.mode("overwrite")
        .option("mergeSchema", "true")
        .format("delta")
        .partitionBy("date")
        .saveAsTable(table_name)
    )
    print(f"Updated delta table: {table_name}")
finally:
    # Release the session even when a step above fails.
    spark.stop()
print(f"All done: {execution_id}")