In [None]:
!pip install pyspark findspark

In [None]:
"""
Create a SparkSession
"""

from pyspark.sql import SparkSession

# spark.jars points at the PostgreSQL JDBC driver jar (path relative to the
# working directory); the JDBC write further down names org.postgresql.Driver.
# getOrCreate() returns an already-running session if there is one.
spark = SparkSession.builder.config(
    key="spark.jars", value="postgresql-42.7.3.jar"
).appName("Almanac").getOrCreate()

# Bare expression so the notebook renders the session summary.
spark

In [None]:
"""
    Load the OpenLibrary dataset

    The OpenLibrary dataset does not include a header
    so the headers are added manually
"""

# The dump has no header row, so the column names are assigned here through an
# explicit schema instead of being read from the file (header="true" would
# swallow the first record as column names).
# NOTE(review): the column order follows the Open Library dump layout
# (type, key, revision, last_modified, JSON) — confirm against the dump in use.
# Every column is read as a string: only "id" and "book_data" are used later,
# and a fixed schema avoids the extra pass over the file that inferSchema
# needs.
df = spark.read.load(
    "ol_dump_works_2024-03-31.txt",
    format="csv",
    schema="type STRING, id STRING, editions STRING, date STRING, book_data STRING",
    sep="\t",
    header="false",
)

# Tolerate a copy of the dump that had a header line prepended by hand: that
# line would now arrive as a data row whose id cell is the literal text "id".
# eqNullSafe keeps rows whose id is NULL instead of silently dropping them.
df = df.where(~df["id"].eqNullSafe("id"))

# Only "id" and "book_data" are needed by the following cells.
df = df.drop("type", "editions", "date")

In [None]:
"""
    Extract the ID from the id column
    Extract the title and description from the book_data column
"""

from pyspark.sql.functions import (
    coalesce,
    from_json,
    get_json_object,
    split,
    substring_index,
)

# Keep the last path segment of the key ("/works/OL1W" -> "OL1W").  Unlike
# split(...).getItem(2), an ID that has already been extracted is left
# untouched, so re-running this cell does not null the column.
extract_id = substring_index(df["id"], "/", -1)
df = df.withColumn("id", extract_id)

# Parse book_data once and reuse the parsed map for both fields.
book_data = from_json(df["book_data"], "MAP<STRING, STRING>")

extract_title = book_data.getItem("title")
df = df.withColumn("title", extract_title)

extract_description = book_data.getItem("description")

# A description is either plain text or a JSON object holding the text under
# "value".  get_json_object() yields NULL when the input is not JSON or has no
# "value" key, so coalesce() falls back to the raw description per row.
# (A Python `if ... is not None` on a Column is always true and cannot make
# this choice row by row: it turned every plain-text description into NULL.)
clean_description = coalesce(
    get_json_object(extract_description, "$.value"),
    extract_description,
)
df = df.withColumn("description", clean_description)

# Rows without a title are not useful downstream; drop them.
df = df.where(df["title"].isNotNull())

In [None]:
# Preview the first 20 rows, truncating long cell values.
df.show(n=20, truncate=True)

In [None]:
"""
    Write the DataFrame to a PostgreSQL database
"""

import os

# Connection settings are read from the environment; each falls back to the
# default given here when the variable is unset.
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = os.environ.get("DB_PORT", "54322")
DB_USER = os.environ.get("DB_USER", "postgres")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "postgres")
DB_NAME = os.environ.get("DB_NAME", "postgres")

# Append the three selected columns to the "books" table over JDBC.
# Mode "append" adds rows on every run of this cell.
(
    df.select("id", "title", "description")
    .write.format("jdbc")
    .options(
        url=f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}",
        driver="org.postgresql.Driver",
        dbtable="books",
        user=DB_USER,
        password=DB_PASSWORD,
    )
    .mode("append")
    .save()
)