# Ingest Data Warehouse

### Import


In [None]:
import glob
from typing import List, Optional, Any
import polars as pl
import pyarrow as pa
import pyspark
from pyspark.sql.types import StructType, StructField, StringType, MapType
from pyspark.sql import SparkSession, DataFrame as SparkDataFrame
from pyspark.sql import functions as F
from duckdb import DuckDBPyConnection, DuckDBPyRelation
from tqdm import tqdm
import urllib.request
import duckdb
import json

In [None]:
# Report the installed PySpark and DuckDB versions, one per line
print(pyspark.__version__, duckdb.__version__, sep="\n")

### Config


In [None]:
# Path of the DuckDB database file that the loading cells open via DuckDBContext
duckdb_database = "../orchestration/db/bigdata.duckdb"

In [None]:
class DuckDBContext:
    """
    Context manager around a read-write DuckDB connection.

    The connection is opened on ``__enter__``, exposed as ``self.conn`` and
    closed again on ``__exit__``.

    :param str db_path: path of the DuckDB database file
    """

    def __init__(self, db_path: str):
        self.db_path = db_path

    def __enter__(self) -> "DuckDBContext":
        self.conn = duckdb.connect(database=self.db_path, read_only=False)
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
        exc_tb: Optional[object],
    ) -> None:
        # Returns None, so an exception raised inside the with-block propagates
        self.conn.close()

    def save_to_duckdb(self, df, table_name: str) -> None:
        """
        Create a DuckDB table from a DataFrame and print its row count.

        :param df: Polars DataFrame, Spark DataFrame, or any object that
            ``conn.from_arrow`` accepts directly
        :param str table_name: name of the table to create; it is interpolated
            into SQL unquoted, so it must be a valid, trusted identifier
        """
        if isinstance(df, pl.DataFrame):
            # Convert straight to Arrow; a pandas round trip is unnecessary
            # and can change dtypes along the way
            df = df.to_arrow()
        elif isinstance(df, pyspark.sql.DataFrame):
            # NOTE(review): _collect_as_arrow is a private PySpark method and
            # may change between releases
            df = pa.Table.from_batches(df._collect_as_arrow())
        # Wrap the Arrow data in a DuckDB relation and materialise it
        relation = self.conn.from_arrow(df)

        relation.create(table_name)
        row_count = self.conn.query(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
        print(f"CREATED TABLE: {table_name} WITH {row_count} ROWS!")

    def show_n(self, table_name: str, n: int = 10) -> None:
        """
        Print the first ``n`` rows of a table as a Polars DataFrame.

        Errors are reported on stdout instead of being raised, so a failed
        preview does not abort the surrounding cell.

        :param str table_name: table to preview (interpolated into SQL unquoted)
        :param int n: maximum number of rows to print
        """
        try:
            result = self.conn.execute(f"SELECT * FROM {table_name} LIMIT {n}")

            print(result.pl())
        except Exception as e:
            print(f"An error occurred: {e}")
            return None


# Usage
# with DuckDBContext("../orchestration/db/bigdata.duckdb") as con:
#     # Use 'con' for operations
#     pass

#### Helpers


In [None]:
def get_table_name(file_name: str) -> str:
    """
    Strip up to two trailing dot-separated extensions from a file name

    :param str file_name: file name such as "title.basics.tsv.gz"
    :return str: everything before the last two extensions, e.g. "title.basics"
    """
    stem, *_ = file_name.rsplit(".", maxsplit=2)
    return stem


def create_url(endpoint: str) -> str:
    """
    Build the download URL for an IMDb dataset file

    :param str endpoint: file name to append to the dataset host
    :return str: "https://datasets.imdbws.com/" followed by the endpoint
    """
    url_template = "https://datasets.imdbws.com/{}"
    return url_template.format(endpoint)


def download_file(url, filename, chunk_size=64 * 1024):
    """
    Stream a URL to a local file while showing a progress bar

    :param str url: address to download from
    :param str filename: local path to write; also used as the bar label
    :param int chunk_size: number of bytes requested per read
    """
    print(f"Downloading file: {filename}")

    # The with-block closes the HTTP response even if a read or write fails
    with urllib.request.urlopen(url) as response:
        # Get the total file size; 0 means the server sent no content-length
        file_size = int(response.headers.get("content-length", 0))

        # With an unknown size, pass None so tqdm just counts bytes
        progress = tqdm(
            total=file_size or None, unit="iB", unit_scale=True, desc=filename
        )
        try:
            with open(filename, "wb") as f:
                # read() returns b"" at end of stream, which stops the iterator
                for chunk in iter(lambda: response.read(chunk_size), b""):
                    f.write(chunk)
                    progress.update(len(chunk))
        finally:
            # Always release the bar so a failed download leaves a clean display
            progress.close()

#### Setting up Cluster Connection


In [None]:
# Connect to Spark Cluster
# spark = (
#     SparkSession.builder.master("spark://spark:7077")
#     .appName("Spark-ETL")
#     .config("spark.sql.debug.maxToStringFields", 1000)
#     .getOrCreate()
# )

# Get (or reuse) a Spark session running with master "local"
spark = SparkSession.builder.master("local").appName("Spark-ETL").getOrCreate()
# Optional builder setting, currently disabled:
#     .config("spark.sql.debug.maxToStringFields", 1000)

# Add to Data Warehouse

## Initial Data

#### Train


In [None]:
# Show which files the glob pattern used below resolves to
csv_files = glob.glob("../../data/train-*.csv")
print(csv_files)

# Read every matching CSV into one Spark DataFrame:
#   - the first line of each file is treated as the header
#   - column types are inferred from the data
#   - the literal '\N' is read as null
# then drop the "_c0" column
train_spark_df = spark.read.csv(
    "../../data/train-*.csv",
    header=True,
    inferSchema=True,
    nullValue="\\N",
).drop("_c0")

# Summary statistics followed by a 5-row preview
train_spark_df.describe().show()
train_spark_df.show(5)

#### Directing


In [None]:
# Read the directing data with Polars instead of Spark
with open("../../data/directing.json") as f:
    data = json.load(f)

# Each top-level key is loaded with from_dict, transposed, and its resulting
# "column_0" column renamed after the key
movies_polars_df, directors_polars_df = (
    pl.from_dict(data[key]).transpose().rename({"column_0": key})
    for key in ("movie", "director")
)

# Put the two single-column frames side by side
directing_polars_df = pl.concat(
    [movies_polars_df, directors_polars_df], how="horizontal"
)
directing_polars_df.head(5)

#### Writing


In [None]:
# Load the writing data and turn it into a Spark DataFrame
with open("../../data/writing.json") as f:
    data = json.load(f)

# spark.read.json expects an RDD of JSON *strings*. Handing it parsed Python
# objects means they get stringified as Python reprs rather than JSON, which
# is not reliably parseable, so serialise each record back to JSON text first.
# Records that are already strings are passed through untouched.
writing_json = spark.sparkContext.parallelize(
    [record if isinstance(record, str) else json.dumps(record) for record in data]
)
writing_spark_df = spark.read.json(writing_json)
writing_spark_df.show(5)

### Load into DuckDB Database


In [None]:
# Persist the three initial DataFrames as DuckDB tables, previewing each one
# right after it has been written
with DuckDBContext(duckdb_database) as ctx:
    for source_df, table_name in (
        (train_spark_df, "imdb_train"),
        (directing_polars_df, "imdb_directors"),
        (writing_spark_df, "imdb_writing"),
    ):
        ctx.save_to_duckdb(source_df, table_name)
        ctx.show_n(table_name, 5)

## Extra Data


### Downloading

Run this cell once, otherwise you'll keep downloading the same files over and over...


In [None]:
# File names of the extra IMDb dumps; each one is "<stem>.tsv.gz"
extra_imdb = [
    f"{stem}.tsv.gz"
    for stem in (
        "name.basics",
        "title.akas",
        "title.basics",
        "title.crew",
        "title.episode",
        "title.principals",
        "title.ratings",
    )
]

# RUN THIS ONCE!
# # Download the files
# for ds in extra_imdb:
#     # Build the full download URL for this dataset file
#     download_url = create_url(ds)

#     filepath = f"../../data/extra/{ds}"  # Local fp
#     # Use the function to download the file
#     download_file(download_url, filepath)

## Reading with Spark


In [None]:
# Restrict every extra IMDb dump to the titles present in imdb_train and store
# the result as its own DuckDB table.
with DuckDBContext(duckdb_database) as ctx:
    train_ids = ctx.conn.execute("SELECT tconst FROM imdb_train").fetchdf()
    # Convert the pandas DataFrame to a PySpark DataFrame
    train_ids_spark = spark.createDataFrame(train_ids)
    train_ids_spark.show(5)

    for ds in extra_imdb:
        print(f"Reading file: {ds}")
        # All dumps share one layout: gzipped TSV with a header and '\N' as null
        spark_df = spark.read.csv(
            f"../../data/extra/{ds}", header=True, sep="\t", nullValue="\\N"
        )
        spark_df.show(5)

        if ds == "name.basics.tsv.gz":
            # This file was only ever previewed, never joined or saved.
            # NOTE(review): presumably because it is not keyed by title id;
            # confirm whether it should be stored as well.
            continue

        # Dots would be parsed by SQL as catalog/schema separators, so the
        # table name is flattened, e.g. "title.akas" -> "extra_title_akas"
        table_name = "extra_" + get_table_name(ds).replace(".", "_")
        print("Creating table: ", table_name)

        if ds == "title.akas.tsv.gz":
            # This dump calls its title key "titleId" instead of "tconst"
            print("Joining with train_ids_spark on tconst <==> titleId")
            joined_spark_df = train_ids_spark.join(
                spark_df, train_ids_spark.tconst == spark_df.titleId, how="left"
            ).drop("titleId")
        else:
            print("Joining with train_ids_spark on tconst")
            joined_spark_df = train_ids_spark.join(spark_df, on="tconst", how="left")

        # show() only prints and returns None, so it must not be part of the
        # expression assigned to joined_spark_df
        joined_spark_df.show(5)
        print(type(joined_spark_df))

        ctx.save_to_duckdb(joined_spark_df, table_name)
        ctx.show_n(table_name, 5)