<a href="https://colab.research.google.com/github/lestermartin/starburst-dataframes-exploration/blob/main/DellAppliance/SmokeTest.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h1><center>Initial smoke test of the Dell Data Analytics Engine (powered by Starburst)</center></h1>

<a id='the-runtime-environment'></a>
## The runtime environment

This notebook allows quick validation that
[Apache Spark](https://spark.apache.org/) code can be run on the
[Dell Data Analytics Engine](https://dell.starburst.io/latest/index.html) -- *powered by [Starburst](https://www.starburst.io/)*.

<a id='installing-spark'></a>
## Installing Spark

> These instructions were lifted & enhanced from [Colab and PySpark](https://colab.research.google.com/drive/1G894WS7ltIUTusWWmsCnF_zQhQqZCDOc) whose source file can be downloaded from [here](https://github.com/jacobceles/knowledge-repo/blob/master/pyspark/Colab%20and%20PySpark.ipynb) and then used with any Jupyter notebook.

Install Dependencies:

1.   Java 8 (Dell appliance requires 22, but so far 8 is working from the notebook)
2.   Apache Spark with Hadoop (settled on 3.5.1 for starters, as Spark Connect needs >= 3.4)
3.   Findspark (used to locate the Spark installation on the system)

> If you have issues with spark version, please upgrade to the latest version from [here](https://archive.apache.org/dist/spark/).

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
!ls

Set Environment Variables:

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"
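
Before moving on, it can help to confirm that both variables point at real directories, since a wrong path tends to surface later as a confusing `findspark` or JVM error. The check below is a minimal sketch that is not part of the original notebook; `missing_dirs` is a hypothetical helper name.

```python
import os

def missing_dirs(env, names=("JAVA_HOME", "SPARK_HOME")):
    """Return the variable names that are unset or do not point at a directory."""
    return [name for name in names
            if not os.path.isdir(env.get(name, ""))]

# An empty list means both variables point at existing directories.
print(missing_dirs(os.environ))
```

If the list is not empty, re-check the output of the `!ls` cell above against the paths that were set.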

<a id='dell-cli-tasks'></a>
## Dell CLI tasks



Obtain & set Spark Connect uri:

> Full details in the [CLI docs](https://dell.starburst.io/latest/dell-data-processing-engine/cli.html),
but here are the general steps after installation.

Run the following wherever you have the Dell CLI installed.

`./dell-data-processing-engine login`

Replace `ACCESS_KEY` and `SECRET_KEY` accordingly and create the Spark Connect instance

```
./dell-data-processing-engine submit \
	--conf spark.hadoop.fs.s3a.access.key=ACCESS_KEY \
	--conf spark.hadoop.fs.s3a.secret.key=SECRET_KEY \
	--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
	--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
	--conf spark.hadoop.fs.s3a.endpoint= \
	--conf spark.sql.repl.eagerEval.enabled=True \
	--spark-connect
```

Copy the `sparkId` value from the output to your clipboard and use it in place of `REPLACE-ME` in the next step

`./dell-data-processing-engine instance uris REPLACE-ME`

Copy the `Spark Connect` uri (starts with `sc://`) to your clipboard and use it in the next code cell








**Note: when all done be sure to run `./dell-data-processing-engine instance delete REPLACE-ME`**


In [None]:
#
# run this cell and paste the Spark Connect uri in the textbox that surfaces (and press <enter> OF COURSE; haha)
#

import getpass

sparkConnectUri = input("Spark Connect uri ")
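
Since the CLI prints more than one uri, it is easy to paste the wrong one. The sketch below only checks that a value uses the `sc://` scheme mentioned above and names a host; it does not contact the server. `looks_like_spark_connect_uri` is a hypothetical helper, and the host and port in the examples are made up.

```python
from urllib.parse import urlsplit

def looks_like_spark_connect_uri(uri):
    """True when the value uses the sc:// scheme and names a host."""
    parts = urlsplit(uri.strip())
    return parts.scheme == "sc" and bool(parts.hostname)

print(looks_like_spark_connect_uri("sc://example-host:15002"))  # True
print(looks_like_spark_connect_uri("https://example-host/ui"))  # False
```

In the notebook this could be called as `looks_like_spark_connect_uri(sparkConnectUri)` before building the session.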

<a id='run-spark'></a>
## Run Spark


Create the SparkSession:

> Output should look similar to
`<pyspark.sql.connect.session.SparkSession at 0x7fe9f73bbe90>`

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .remote(sparkConnectUri) \
    .getOrCreate()
spark.version

Create a DataFrame from hard-coded data and display it:

In [None]:
from datetime import datetime, date
from pyspark.sql import Row

bogus_df = spark.createDataFrame([
  Row(aNbr=1, nutherNbr=2, aString='string1', aDate=date(2000, 1, 1), aTimestamp=datetime(2000, 1, 1, 12, 0)),
  Row(aNbr=2, nutherNbr=3, aString='string2', aDate=date(2000, 2, 1), aTimestamp=datetime(2000, 1, 2, 12, 0)),
  Row(aNbr=4, nutherNbr=5, aString='string3', aDate=date(2000, 3, 1), aTimestamp=datetime(2000, 1, 3, 12, 0)),
  Row(aNbr=8, nutherNbr=7, aString='string4', aDate=date(2000, 4, 1), aTimestamp=datetime(2000, 1, 4, 12, 0)),
])
bogus_df.show()

You can open up the Spark UI by getting the `Spark Web UI` link from the earlier run of `./dell-data-processing-engine instance uris REPLACE-ME` and opening it up in the same browser that launched when you ran the `login` command.

If you navigate to the **SQL / Dataframe** tab you should see something similar to the following now.

![alt text](https://github.com/lestermartin/starburst-dataframes-exploration/blob/main/DellAppliance/SparkUI.png?raw=true "Spark Web UI screenshot")


## Are you done?

If so (or when you are), don't forget to run the following command.

**`./dell-data-processing-engine instance delete REPLACE-ME`**


<a id='transformation-logic'></a>
## Transformation logic

We are using the publicly available Bluebikes - Hubway dataset. Read more information [about Blue Bikes Boston](http://bluebikes.com/about), a bicycle-sharing program based in Boston since 2011.

We are focusing on the [transactional records](https://bluebikes.com/system-data) of the bike trips from start to finish.

<a id='exploring-the-raw-data'></a>
### Exploring the raw data

In [None]:
# lets just grab a single CSV to explore with
s3_file_path = "s3a://starburst101-handsonlab-nyc-uber-rides/blue_bikes/raw_trips-2022_01-2022-09/202201-bluebikes-tripdata.csv"

# read CSV file into a DataFrame
df = spark.read.csv(s3_file_path, header=True, inferSchema=True)

# Show the DataFrame
df.show()

In [None]:
# Q: how many rows
df.count()

# RAISES EXCEPTION -- DON'T RUN!!
#  looks like captured in https://issues.apache.org/jira/browse/SPARK-45769

In [None]:
# we are going to explore 'df' several times and Spark uses lazy execution so let's just cache it
#  NOTE: won't need this once this is turned into a batch program

from pyspark import StorageLevel

df.cache()

In [None]:
# prime the cache

df.show()

In [None]:
# Q: any null values for the tripduration field?
from pyspark.sql.functions import col
df.filter(col("tripduration").isNull()).show()

# A: no null values found (that's good!)

In [None]:
# Q: tripduration values seem realistic? note: time is in seconds
from pyspark.sql.functions import min, max, avg, count
df.select(count("tripduration"),
                 min("tripduration"),
                 max("tripduration"),
                 avg("tripduration")
          ).show()

# A: min trip is a minute seems ok, but max trip of 27 DAYS **seems** WRONG,
#     but maybe this rider just didn't check the bike back in for a month
#     and average of 20 minutes seems reasonable

In [None]:
# Q: are there a BUNCH of super long rides? Say greater than 14 hours, i.e. 50400 seconds (kept it with you all day)

df.filter("tripduration > 50400").sort("tripduration", ascending=False).show(200)

# A: well, it is less than 200 at least!

In [None]:
# Q: exactly how many are there greater than 14 hours (50400 seconds)?

df.filter("tripduration > 50400").select(count("tripduration")).show()

# A: Approx 100 out of 81613 seems reasonable
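
The literal `50400` in the filters above is a number of seconds, which is easy to misread. A small sketch, using hypothetical helper names, that derives the threshold from hours and converts seconds back to days for sanity-checking the max value:

```python
SECONDS_PER_HOUR = 3600

def hours_to_seconds(hours):
    """Convert whole hours to seconds."""
    return hours * SECONDS_PER_HOUR

def seconds_to_days(seconds):
    """Convert seconds to (fractional) days."""
    return seconds / (24 * SECONDS_PER_HOUR)

threshold = hours_to_seconds(14)
filter_expr = f"tripduration > {threshold}"
print(filter_expr)  # tripduration > 50400
```

The resulting string could be passed to `df.filter(filter_expr)` in place of the hard-coded expression.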

In [None]:
# Q: do any columns have concerning names and/or data types?

df.printSchema()

# A: the names are all slammed together or include spaces between the words
#     fortunately, the data types look pretty good although we'll want to
#     standardize the number of decimal places for the lat/long values

In [None]:
# standardize the column names

renames_df = df.withColumnRenamed('tripduration',            'trip_seconds') \
               .withColumnRenamed('starttime',               'start_time') \
               .withColumnRenamed('stoptime',                'stop_time') \
               .withColumnRenamed('start station id',        'start_station_id') \
               .withColumnRenamed('start station name',      'start_station_name') \
               .withColumnRenamed('start station latitude',  'start_station_latitude') \
               .withColumnRenamed('start station longitude', 'start_station_longitude') \
               .withColumnRenamed('end station id',          'end_station_id') \
               .withColumnRenamed('end station name',        'end_station_name') \
               .withColumnRenamed('end station latitude',    'end_station_latitude') \
               .withColumnRenamed('end station longitude',   'end_station_longitude') \
               .withColumnRenamed('bikeid',                  'bike_id') \
               .withColumnRenamed('usertype',                'user_type') \
               .withColumnRenamed('postal code',             'postal_code')
renames_df.show(truncate=False)
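
The same renames can also be kept in a single dict, which is easier to review and reuse than a long chain of calls. The sketch below applies the mapping to a plain list of names so it can be checked without a cluster; `RENAMES` and `renamed` are hypothetical names, and `birth year` is a made-up column used only to show that unknown names pass through untouched.

```python
RENAMES = {
    "tripduration": "trip_seconds",
    "starttime": "start_time",
    "stoptime": "stop_time",
    "start station id": "start_station_id",
    "start station name": "start_station_name",
    "start station latitude": "start_station_latitude",
    "start station longitude": "start_station_longitude",
    "end station id": "end_station_id",
    "end station name": "end_station_name",
    "end station latitude": "end_station_latitude",
    "end station longitude": "end_station_longitude",
    "bikeid": "bike_id",
    "usertype": "user_type",
    "postal code": "postal_code",
}

def renamed(columns, renames=RENAMES):
    """Apply the mapping to a list of column names, leaving unknown names as-is."""
    return [renames.get(name, name) for name in columns]

print(renamed(["tripduration", "bikeid", "birth year"]))
# ['trip_seconds', 'bike_id', 'birth year']

# Assumption: on Spark 3.4+, a dict like this can be passed in one call:
# renames_df = df.withColumnsRenamed(RENAMES)
```

Whether `withColumnsRenamed` behaves the same over Spark Connect on the appliance has not been verified here.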

In [None]:
# cast the lat & long values to Decimal (15,13) and (16,13), respectively

from pyspark.sql.types import DecimalType

after_cast_df = renames_df.withColumn("start_station_latitude", col("start_station_latitude").cast(DecimalType(15, 13))) \
                          .withColumn("end_station_latitude", col("end_station_latitude").cast(DecimalType(15, 13))) \
                          .withColumn("start_station_longitude", col("start_station_longitude").cast(DecimalType(16, 13))) \
                          .withColumn("end_station_longitude", col("end_station_longitude").cast(DecimalType(16, 13)))
after_cast_df.printSchema()

In [None]:
# notice the changes in the lat/long field values (some longer due to zeros and some rounded off to be shorter)

after_cast_df.show()
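
`DecimalType(15, 13)` means 15 digits in total with 13 after the decimal point, which leaves 2 integer digits for latitude (at most 90); `DecimalType(16, 13)` leaves 3 for longitude (at most 180). The effect of fixing the scale at 13 can be illustrated with Python's `decimal` module. This is only an analogy in plain Python: the rounding mode is an assumption, and the sample values are made up.

```python
from decimal import Decimal, ROUND_HALF_UP

SCALE_13 = Decimal("1e-13")

def to_scale_13(text):
    """Pad or round a numeric string to exactly 13 decimal places."""
    return Decimal(text).quantize(SCALE_13, rounding=ROUND_HALF_UP)

print(to_scale_13("42.3625"))            # padded with zeros: 42.3625000000000
print(to_scale_13("42.12345678901239"))  # rounded off to 13 places
```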

In [None]:
# and, OF COURSE, we could have done all of this in one pass

from pyspark.sql.types import DecimalType

one_pass_df = df.withColumnRenamed('tripduration',            'trip_seconds') \
                .withColumnRenamed('starttime',               'start_time') \
                .withColumnRenamed('stoptime',                'stop_time') \
                .withColumnRenamed('start station id',        'start_station_id') \
                .withColumnRenamed('start station name',      'start_station_name') \
                .withColumnRenamed('start station latitude',  'start_station_latitude') \
                .withColumn("start_station_latitude", col("start_station_latitude").cast(DecimalType(15, 13))) \
                .withColumnRenamed('start station longitude', 'start_station_longitude') \
                .withColumn("start_station_longitude", col("start_station_longitude").cast(DecimalType(16, 13))) \
                .withColumnRenamed('end station id',          'end_station_id') \
                .withColumnRenamed('end station name',        'end_station_name') \
                .withColumnRenamed('end station latitude',    'end_station_latitude') \
                .withColumn("end_station_latitude", col("end_station_latitude").cast(DecimalType(15, 13))) \
                .withColumnRenamed('end station longitude',   'end_station_longitude') \
                .withColumn("end_station_longitude", col("end_station_longitude").cast(DecimalType(16, 13))) \
                .withColumnRenamed('bikeid',                  'bike_id') \
                .withColumnRenamed('usertype',                'user_type') \
                .withColumnRenamed('postal code',             'postal_code')
one_pass_df.show(truncate=False)

<a id='define-the-schema'></a>
### Define the schema

We COULD just use this approach of INFERRING THE SCHEMA IMPLICITLY, but there might be problems from file to file. For example, a header name on the first row might change in a future file, which would break our transformations.

Therefore, once we feel we have the schema figured out (including standardized naming and any data type conversions) we can DEFINE THE SCHEMA EXPLICITLY to make our transformation job more robust.
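
One way to notice a changed header early is to compare the first line of a new file with the names seen during exploration. The sketch below works on a header line that has already been read as text; `EXPECTED_HEADER` assumes the raw file uses the names and order seen in the rename cell above, and `header_mismatches` is a hypothetical helper.

```python
import csv

EXPECTED_HEADER = [
    "tripduration", "starttime", "stoptime",
    "start station id", "start station name",
    "start station latitude", "start station longitude",
    "end station id", "end station name",
    "end station latitude", "end station longitude",
    "bikeid", "usertype", "postal code",
]

def header_mismatches(header_line, expected=EXPECTED_HEADER):
    """Return (position, expected, actual) for every column that differs."""
    actual = next(csv.reader([header_line]))
    width = max(len(actual), len(expected))
    # Pad the shorter list so missing or extra columns are reported too.
    padded_actual = actual + [None] * (width - len(actual))
    padded_expected = list(expected) + [None] * (width - len(expected))
    return [(i, e, a)
            for i, (e, a) in enumerate(zip(padded_expected, padded_actual))
            if e != a]

good = ",".join(EXPECTED_HEADER)
print(header_mismatches(good))                               # []
print(header_mismatches(good.replace("bikeid", "bike_id")))  # [(11, 'bikeid', 'bike_id')]
```

A check like this is worth considering because an explicit schema supplies its own column names; Spark's CSV reader has an `enforceSchema` option that governs how header names are treated, so consult its documentation for the version in use.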

In [None]:
# show the schema as we have it now
one_pass_df.printSchema()

In [None]:
# create a list of the schema in the format column_name, data_type

from pyspark.sql.types import *

labels = [
     ('trip_seconds',IntegerType()),
     ('start_time',TimestampType()),
     ('stop_time',TimestampType()),
     ('start_station_id',IntegerType()),
     ('start_station_name',StringType()),
     ('start_station_latitude',DecimalType(15,13)),
     ('start_station_longitude',DecimalType(16,13)),
     ('end_station_id',IntegerType()),
     ('end_station_name',StringType()),
     ('end_station_latitude',DecimalType(15,13)),
     ('end_station_longitude',DecimalType(16,13)),
     ('bike_id',IntegerType()),
     ('user_type',StringType()),
     ('postal_code',StringType())
]

# Creating the schema that will be passed when reading the csv

schema = StructType([StructField (x[0], x[1], True) for x in labels])
schema

In [None]:
# let's re-read the same single CSV, this time with the explicit schema
s3_file_path = "s3a://starburst101-handsonlab-nyc-uber-rides/blue_bikes/raw_trips-2022_01-2022-09/202201-bluebikes-tripdata.csv"

# read CSV file into a DataFrame
df_explicit_schema = spark.read.csv(s3_file_path, header=True, schema=schema)

# Show the schema of the DataFrame
df_explicit_schema.printSchema()

In [None]:
# verify data still looks good, too
df_explicit_schema.show()

In [None]:
# let's just use SQL for a bit...
df_explicit_schema.createOrReplaceTempView("raw_rides")

# quick check that the temp name works in SQL
spark.sql("SELECT * FROM raw_rides").show()

<a id='clean-the-data'></a>
### Clean the data

For this demo we will just pretend we did the proper due diligence and found out that all of the columns, except for the last, were being received with high quality.

We will just focus on the last column, postal_code; especially since there are hard-coded strings of 'NULL' noticeable in prior results.


In [None]:
# Q: does min/max tell us anything?

spark.sql("select min(postal_code), max(postal_code) from raw_rides").show()

# A: the min of 00000 (which is actually in the CSV) indicates an invalid USA zip code
#     and the 'W12 9PL' (which has the format of a UK postcode) indicates there are customers who live outside the USA

In [None]:
# Q: how many with 00000?
spark.sql("select count(*) from raw_rides where postal_code = '00000'").show()

# A: 126 (those would be better set to NULL)

In [None]:
# Q: how many with empty strings or hard-coded to 'NULL'?

spark.sql(
    "SELECT postal_code, count(*) "\
    "  FROM raw_rides "\
    " WHERE postal_code IN ('', 'NULL')"\
    " GROUP BY postal_code").show()

# A: seems only ones with hard-coded 'NULL' values (which should be true NULL values)

In [None]:
# Q: any with strings that start with a space?

spark.sql("select count(*) from raw_rides where postal_code LIKE ' %'").show()

# A: none

In [None]:
# Q: are there some actual NULL values present?
spark.sql("select count(*) from raw_rides where postal_code IS NULL").show()

# A: Yep, just over 100 are present

In [None]:
# convert the 3260 records with 'NULL' and the 126 with '00000' (total of 3386)
#  to actual NULL values

from pyspark.sql.functions import count

df_cleaned = df_explicit_schema \
  .replace({'00000': None}, subset=['postal_code']) \
  .replace({'NULL': None}, subset=['postal_code'])

# the total should now be the converted 3386 + the original 116 which is 3502

df_cleaned.filter("postal_code IS NULL").select(count("trip_seconds")).show()
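
The cleaning rule itself is small enough to state in plain Python, which can be handy for unit-testing it apart from Spark. This is a sketch of the same rule rather than the code the job runs; `PLACEHOLDERS` and `normalize_postal_code` are hypothetical names.

```python
PLACEHOLDERS = {"NULL", "00000"}

def normalize_postal_code(value):
    """Map placeholder values to None and leave everything else untouched."""
    if value is None or value in PLACEHOLDERS:
        return None
    return value

sample = ["02139", "NULL", "00000", None, "W12 9PL"]
print([normalize_postal_code(v) for v in sample])
# ['02139', None, None, None, 'W12 9PL']

# The expected NULL total quoted above: converted 'NULL' + converted '00000' + original NULLs
print(3260 + 126 + 116)  # 3502
```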

<a id='enrich-the-data'></a>
### Enrich the data

There is an enrichment requirement to augment the bike rides by adding province and average income values based on rider postal code.

Explore the lookup dataset which only contains USA-based zip codes and focus on the columns that can be leveraged for our enrichment needs.

In [None]:
# load and cache the lookup dataset

# TODO: parameterize bucket name
s3_file_path_for_lookup = "s3a://starburst101-handsonlab-nyc-uber-rides/common/zip_code_income/"

lookup_labels = [
     ('state',StringType()),
     ('zip_code',IntegerType()),
     ('num_returns',IntegerType()),
     ('agi',IntegerType()),
     ('agi_avg',DecimalType(16,8)),
     ('num_returns_with_tot_inc',IntegerType()),
     ('tot_inc_amt',IntegerType()),
     ('tot_inc_avg',DecimalType(16,8)),
     ('num_returns_with_tax_inc',IntegerType()),
     ('tax_inc_amt',IntegerType()),
     ('tax_inc_avg',DecimalType(16,8))
]

# Creating the schema that will be passed when reading the csv
lookup_schema = StructType([StructField (x[0], x[1], True) for x in lookup_labels])

# read CSV file into a DataFrame
lookup_df = spark.read.csv(s3_file_path_for_lookup, header=False, schema=lookup_schema)

# mark it to be cached (again, not needed for the batch version)
lookup_df.cache()

# make the df available to simple SQL
lookup_df.createOrReplaceTempView("zip_code_income")

# Show the DataFrame
lookup_df.show()

In [None]:
# look at some values from around Roswell, Georgia

spark.sql(
    "SELECT zip_code, state, tot_inc_avg * 1000 AS avg_income "\
    "  FROM zip_code_income "\
    " WHERE zip_code IN (30004, 30009, 30022, 30075, 30076, 30092)").show()

In [None]:
# notice the zip_code is a numeric field, BUT giving it zip codes starting
#  with 0 basically still works

spark.sql(
    "SELECT * FROM zip_code_income "\
    " WHERE zip_code IN (02139, 02124)").show()

In [None]:
# verify the join will work fine despite diff data types on the postal/zip code columns

# this didn't happen yet, so making it available
df_cleaned.createOrReplaceTempView("cleaned_rides")

spark.sql(
    "SELECT r.postal_code, z.tot_inc_amt "\
    "  FROM cleaned_rides AS r "\
    "  JOIN zip_code_income AS z ON (r.postal_code = z.zip_code) "\
    " WHERE postal_code IN ('02139', '02124')").show()

In [None]:
# create a cleaned up & trimmed down df of the data to include in the join

from pyspark.sql.functions import col

min_cols_of_lookup_df = lookup_df.select("zip_code", "state", "tot_inc_avg") \
  .withColumnRenamed("state", "province") \
  .withColumn("tot_inc_avg", col("tot_inc_avg") * 1000) \
  .withColumnRenamed("tot_inc_avg", "avg_income")

min_cols_of_lookup_df.show()

In [None]:
# join the datasets and get rid of the extra zip code column
df_enriched = df_cleaned.join(min_cols_of_lookup_df,
                              df_cleaned.postal_code == min_cols_of_lookup_df.zip_code,
                              "left") \
                        .drop("zip_code")

df_enriched.show()
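
The join above compares a string column (`postal_code`) with an integer column (`zip_code`). The plain-Python sketch below illustrates why `'02139'` can line up with `2139` and why a non-numeric code such as `'W12 9PL'` finds no partner. It is an illustration only, not a description of Spark's exact casting rules, and both helper names are hypothetical.

```python
def zip_as_int(postal_code):
    """Integer form of a purely numeric postal code, else None."""
    text = postal_code.strip() if postal_code else ""
    return int(text) if text.isascii() and text.isdigit() else None

def zip_as_text(zip_code):
    """Five-character, zero-padded text form of a numeric zip code."""
    return f"{zip_code:05d}"

print(zip_as_int("02139"))    # 2139
print(zip_as_int("W12 9PL"))  # None
print(zip_as_text(2139))      # 02139
```

If the lookup data were ever needed for display, `zip_as_text` shows how the leading zero lost in the integer column could be restored.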

In [None]:
# check on the non-USA postal codes (which should have province and avg_income set to NULL)
df_enriched.filter("postal_code IS NOT NULL") \
           .select("postal_code", "province", "avg_income") \
           .orderBy("postal_code", ascending=False) \
           .show(50, truncate=False)

<a id='persist-the-data'></a>
### Persist the data

Stopping here for now, but the next step is to save this as Parquet files to a location that a Hive external table points at.

Once I can use the shared catalog, I can write to a managed table instead.

EITHER WAY, this is our SILVER dataset.

THEN I can use the Starburst UI and create some views for the GOLD layer.
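
A minimal sketch of what the Parquet step might look like, assuming the standard `DataFrame.write` API is usable over Spark Connect on the appliance (not verified here). `write_silver` is a hypothetical helper and the destination path is a placeholder, not a real location.

```python
def write_silver(df, target_path, mode="overwrite"):
    """Write a DataFrame as Parquet files under target_path and return the path."""
    df.write.mode(mode).parquet(target_path)
    return target_path

# Hypothetical destination; replace with the location the external table points at.
# write_silver(df_enriched, "s3a://YOUR-BUCKET/blue_bikes/silver/enriched_trips/")
```

Using `overwrite` keeps reruns of the notebook idempotent; an incremental batch job would more likely use `append`, possibly with partitioning.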

## Test it as batch

The next cell validates the short-n-sweet version that can then be saved in a .py file and submitted via the CLI.

In [None]:
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, count

# initialize the application
findspark.init()
spark = SparkSession.builder \
    .remote(sparkConnectUri) \
    .getOrCreate()

# load the raw data with appropriate names and datatypes
labels = [
     ('trip_seconds',IntegerType()),
     ('start_time',TimestampType()),
     ('stop_time',TimestampType()),
     ('start_station_id',IntegerType()),
     ('start_station_name',StringType()),
     ('start_station_latitude',DecimalType(15,13)),
     ('start_station_longitude',DecimalType(16,13)),
     ('end_station_id',IntegerType()),
     ('end_station_name',StringType()),
     ('end_station_latitude',DecimalType(15,13)),
     ('end_station_longitude',DecimalType(16,13)),
     ('bike_id',IntegerType()),
     ('user_type',StringType()),
     ('postal_code',StringType())
]
schema = StructType([StructField (x[0], x[1], True) for x in labels])

# TODO: parameterize the bucket name AND the new file to process
s3_file_path = "s3a://starburst101-handsonlab-nyc-uber-rides/blue_bikes/raw_trips-2022_01-2022-09/202201-bluebikes-tripdata.csv"
df_explicit_schema = spark.read.csv(s3_file_path, header=True, schema=schema)

# clean the dataset (our simple example only deals with postal_code)
df_cleaned = df_explicit_schema \
  .replace({'00000': None}, subset=['postal_code']) \
  .replace({'NULL': None}, subset=['postal_code'])

# load the lookup dataset
lookup_labels = [
     ('state',StringType()),
     ('zip_code',IntegerType()),
     ('num_returns',IntegerType()),
     ('agi',IntegerType()),
     ('agi_avg',DecimalType(16,8)),
     ('num_returns_with_tot_inc',IntegerType()),
     ('tot_inc_amt',IntegerType()),
     ('tot_inc_avg',DecimalType(16,8)),
     ('num_returns_with_tax_inc',IntegerType()),
     ('tax_inc_amt',IntegerType()),
     ('tax_inc_avg',DecimalType(16,8))
]
lookup_schema = StructType([StructField (x[0], x[1], True) for x in lookup_labels])

# TODO: parameterize bucket name AND folder name
s3_file_path_for_lookup = "s3a://starburst101-handsonlab-nyc-uber-rides/common/zip_code_income/"
lookup_df = spark.read.csv(s3_file_path_for_lookup, header=False, schema=lookup_schema)

# create a cleaned up & trimmed down df of the data to include in the join
min_cols_of_lookup_df = lookup_df.select("zip_code", "state", "tot_inc_avg") \
  .withColumnRenamed("state", "province") \
  .withColumn("tot_inc_avg", col("tot_inc_avg") * 1000) \
  .withColumnRenamed("tot_inc_avg", "avg_income")

# join the datasets and get rid of the extra zip code column
df_enriched = df_cleaned.join(min_cols_of_lookup_df,
                              df_cleaned.postal_code == min_cols_of_lookup_df.zip_code,
                              "left") \
                        .drop("zip_code")

# FOR NOW... JUST SHOW IT!
df_enriched.show()
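
The two TODOs about parameterizing the bucket and file names could be addressed with standard-library argument parsing once this lives in a .py file. The flag names below are made up for illustration, and how the Dell CLI forwards arguments to a submitted script is not covered here.

```python
import argparse

def parse_job_args(argv):
    """Parse the (hypothetical) command-line flags for the batch job."""
    parser = argparse.ArgumentParser(description="Blue Bikes enrichment job")
    parser.add_argument("--bucket", required=True,
                        help="S3 bucket name, without the s3a:// prefix")
    parser.add_argument("--trip-file", required=True,
                        help="object key of the trip CSV to process")
    parser.add_argument("--lookup-prefix", default="common/zip_code_income/",
                        help="folder holding the zip code income lookup files")
    return parser.parse_args(argv)

def s3a_path(bucket, key):
    """Build an s3a:// path from a bucket name and an object key."""
    return f"s3a://{bucket}/{key.lstrip('/')}"

# Example invocation using the values hard-coded in the cell above.
args = parse_job_args([
    "--bucket", "starburst101-handsonlab-nyc-uber-rides",
    "--trip-file", "blue_bikes/raw_trips-2022_01-2022-09/202201-bluebikes-tripdata.csv",
])
s3_file_path = s3a_path(args.bucket, args.trip_file)
s3_file_path_for_lookup = s3a_path(args.bucket, args.lookup_prefix)
print(s3_file_path)
print(s3_file_path_for_lookup)
```

In a real script, `parse_job_args(sys.argv[1:])` would replace the hard-coded list.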

## Are you done?

If so (or when you are), don't forget to run the following command (the REPLACE-ME is explained at the top of this notebook).

**`./dell-data-processing-engine instance delete REPLACE-ME`**