# ETL.ipynb

Notebook that fetches the latest measurement records from the Kafka message bus and
loads them into the PostgreSQL warehouse.

In [None]:
# Imports go in this cell
import functools
import io
import operator
import os
import sys
import time
from typing import List, Tuple

import confluent_kafka as kafka
import numpy as np 
import pandas as pd
import psycopg2

# The convention with PySpark is to import individual classes, Java-style
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyspark

# Configure the notebook kernel: move to the top-level project directory and
# put the project's "scripts" directory on the module search path so that
# project helper modules (presumably including the etl_lib imported below)
# can be imported. Idempotent.
def setup_kernel():
    """Prepare the kernel for importing project helper modules.

    If the current working directory is named exactly "notebooks", change to
    its parent, which is assumed to be the project root. Then append
    ``<cwd>/scripts`` to ``sys.path`` unless it is already there. Calling
    this function again has no further effect.
    """
    # Move to project root if we're not already there. Compare the final path
    # component exactly so that e.g. "my_notebooks" does not trigger a chdir.
    if os.path.basename(os.getcwd()) == "notebooks":
        os.chdir("..")
    # TODO: Verify that we're actually at the project root.
    # Add the scripts dir to the Python path if it's not already there.
    scripts_dir = os.path.join(os.getcwd(), "scripts")
    if scripts_dir not in sys.path:
        sys.path.append(scripts_dir)


setup_kernel()

In [None]:
# Retrieve configuration parameters from the environment, if present.

# Start a map from parameter name => default value. A default of None means
# no default is provided here.
PARAMS_MAP = {
    "spark_master": "local[*]",
    "kafka_bootstrap_servers": "localhost:9092",
    "kafka_topic": "reefer",
    "batch_temp_loc": "batch.csv",
    "postgres_host": None,
    "postgres_port": None,
    "postgres_db": "demo",
    "postgres_user": None,
    "postgres_password": None,
    "table_name": "reefer_telemetries"
}

# Override with environment variable values where applicable.
# Uppercase names, i.e. PARAMS_MAP["my_var_name"] <=> os.environ["MY_VAR_NAME"]
# Values taken from the environment are always strings.
for k in PARAMS_MAP:
    env_value = os.environ.get(k.upper())
    if env_value is not None:
        PARAMS_MAP[k] = env_value


def redact_params(params):
    """Return a copy of *params* that is safe to display or log.

    Every entry whose name contains "password" and whose value is not None
    has its value replaced with "*****". All other entries are copied as-is.
    """
    return {
        k: ("*****" if "password" in k and v is not None else v)
        for k, v in params.items()
    }


# Show the effective configuration with credentials masked, so secrets do
# not leak into notebook output or logs.
redact_params(PARAMS_MAP)

In [None]:
# Start PySpark. The guard below means re-running this cell reuses the
# existing session instead of building another one.
if "spark" not in locals():
    spark = (
        SparkSession.builder
        .master(PARAMS_MAP["spark_master"])
        .appName("ReeferETL")
        # Use Arrow for pandas <-> Spark data conversion.
        .config("spark.sql.execution.arrow.enabled", "true")
        .getOrCreate()
    )

# Broadcast the configuration so UDFs running on executors can read it
# through PARAMS.value.
PARAMS = spark.sparkContext.broadcast(PARAMS_MAP)

spark

In [None]:
# Load up subroutines and make sure we have a fresh copy.
# etl_lib is presumably found via the scripts dir that setup_kernel added to
# sys.path.
import importlib
import etl_lib
# A plain `import` returns the cached module; reload re-executes its source so
# edits to etl_lib are picked up without restarting the kernel.
importlib.reload(etl_lib)

In [None]:
# Show which file etl_lib was loaded from, to confirm the expected copy is in use.
etl_lib.__file__

In [None]:
# PySpark needs to know about every library we call: ship the etl_lib source
# file to the executors so the UDFs below, which call into etl_lib, can
# import it there too.
spark.sparkContext.addPyFile(etl_lib.__file__)

In [None]:
# Define our UDFs. This must happen AFTER starting Spark.
# Both are GROUPED_MAP pandas UDFs: each call receives one group as a pandas
# DataFrame and must return a pandas DataFrame matching the schema string.
fetch_udf = pandas_udf(etl_lib.fetch_udf,
                       "partition_id long, offset long, value string",
                       PandasUDFType.GROUPED_MAP)

# This one has to be done differently so that it can get parameters from
# the broadcast variable.
@pandas_udf("partition_id long, offset long", PandasUDFType.GROUPED_MAP)
def load_udf(records):
    """Hand one group of fetched records to etl_lib.load_udf.

    Supplies the configuration dict from the PARAMS broadcast variable
    (read via .value where the UDF runs). The result must match the
    declared schema "partition_id long, offset long".
    """
    return etl_lib.load_udf(records, PARAMS.value)


In [None]:
# Generate a dataframe for job control: one row per partition of the Kafka
# topic, carrying the connection details that fetch_udf is applied to.
bootstrap_servers = PARAMS.value["kafka_bootstrap_servers"]
topic_name = PARAMS.value["kafka_topic"]
partition_ids = etl_lib.get_partition_ids(bootstrap_servers, topic_name)
if len(partition_ids) == 0:
    # Spark cannot infer a schema from an empty row list, and the fetch step
    # repartitions into len(partition_ids) partitions, so fail early with a
    # message that names the actual problem.
    raise ValueError(
        "No partitions found for Kafka topic {!r} at {!r}".format(
            topic_name, bootstrap_servers))
params_df = spark.createDataFrame(
    [(p, bootstrap_servers, topic_name) for p in partition_ids],
    ["partition_id", "bootstrap_servers", "topic_name"])
params_df.show()

In [None]:
# Fetch all available messages from all partitions in parallel and write
# the resulting messages to a temp directory on the distributed filesystem.
# Use CSV format for ease of debugging.
raw_batch_df = (
    params_df
    # Work around Spark's tendency to use spark.sql.shuffle.partitions blindly:
    # request as many Spark partitions as there are Kafka partitions,
    # hash-partitioned on partition_id.
    .repartition(len(partition_ids), "partition_id")
    .groupby("partition_id")
    .apply(fetch_udf)
)
# mode="overwrite" replaces any output left at this location by a prior run.
raw_batch_df.write.csv(path=PARAMS.value["batch_temp_loc"],
                       mode="overwrite", header=True)

In [None]:
# Wrap the temp file in a dataframe for all subsequent processing.
# Reusing the writer's schema avoids type inference on read. multiLine=True
# lets a quoted field contain line breaks (e.g. a message value with embedded
# newlines). Without it, such a record would be split into several malformed
# rows when read back.
batch_df = spark.read.csv(PARAMS.value["batch_temp_loc"], header=True,
                          schema=raw_batch_df.schema, multiLine=True)
"{}: count = {}".format(batch_df, batch_df.count())

In [None]:
# Bulk-load the batch of data into the warehouse in parallel
num_batches = 4  # number of groups the batch is split into for loading
load_results = (
    batch_df
    # Work around Spark's tendency to use spark.sql.shuffle.partitions blindly.
    # Records are bucketed by offset % num_batches, and each bucket is passed
    # to load_udf as one group.
    # NOTE(review): this overwrites the Kafka partition_id column with the
    # bucket number, so load_udf sees bucket ids, not real Kafka partition
    # ids. Confirm etl_lib.load_udf does not store or depend on them.
    .withColumn("partition_id", batch_df.offset % num_batches)
    .repartition(num_batches, "partition_id")
    .groupby("partition_id")
    .apply(load_udf)
    .toPandas()
)
load_results

In [None]:
# Work out, per Kafka partition, which offset to commit.
# Kafka expects the offset of the *next* message to be consumed, so commit
# one past the highest offset seen in this batch.
max_offsets_df = batch_df.groupby("partition_id").agg({"offset": "max"})
offsets_df = max_offsets_df.select(
    "partition_id",
    (max_offsets_df["max(offset)"] + 1).alias("to_commit"),
)
offsets_pd = offsets_df.toPandas()
offsets_pd

In [None]:
# Commit the computed offsets back to Kafka. This runs after the warehouse
# load above, so offsets are not advanced past records that were not loaded.
# Each element passed is a numpy.record (partition_id, to_commit) with numpy
# integer fields.
# NOTE(review): confirm etl_lib.commit_offsets accepts numpy ints; if not,
# convert them to plain Python ints first.
etl_lib.commit_offsets(list(offsets_pd.to_records(index=False)),
                       PARAMS.value["kafka_bootstrap_servers"],
                       PARAMS.value["kafka_topic"])