# Data Universe 2024 - Workshop 4

## Data pipelines, views & data products [Python-based]

This workshop was created for Data Universe 2024 - https://www.datauniverseevent.com/

The GitHub project at https://github.com/lestermartin/du2024 includes information on all 4 of the workshops presented.  

Headings such as **Lab 3 > Part 3 > Step 4** identify where each step is referenced in Workshop 4's lab guide.

### Lab 3 > Part 3 > Step 4: Begin working with PyStarburst

Follow the instructions and run the code in the cells until you read a **RETURN TO LAB GUIDE** message.

Run the following cell to install the PyStarburst library.

In [None]:
#
# Install the library
#  https://docs.starburst.io/clients/python/pystarburst.html
#

%pip install pystarburst

Wait until you see a message that says `you may need to restart the kernel ...`. **You do not need to restart anything.** 

Run the next cell, which prompts for your host, user name, and password. The prior step in the lab guide explained how to find these values in Starburst Galaxy. 

In [None]:
#
# Define connection properties
#  get the host and other information from the cluster list
#

import getpass

host = input("Host name")
username = input("User name")
password = getpass.getpass("Password")

If you pasted or typed anything incorrectly in those three properties, you won't find out until the next step(s). 

If you do get an error, the easiest solution is to delete the notebook, re-import it, and try again.  :)
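If you would rather catch the most common copy/paste mistakes before any connection is attempted, a small pre-check can help. The sketch below is a hypothetical helper (`clean_connection_inputs` is not part of PyStarburst or this lab); it assumes the host should be a bare hostname, because the session properties used in the next cells already supply `http_scheme` (`https`) and `port` (443) separately.

```python
from typing import Tuple


def clean_connection_inputs(host: str, username: str, password: str) -> Tuple[str, str, str]:
    """Hypothetical pre-check for the three values entered above."""
    # The session properties set http_scheme="https" and port=443 separately,
    # so strip any scheme and trailing slash pasted along with the host.
    host = host.strip()
    for prefix in ("https://", "http://"):
        if host.lower().startswith(prefix):
            host = host[len(prefix):]
    host = host.rstrip("/")
    # Stray spaces around the user name are a common copy/paste mistake.
    username = username.strip()
    # The password is left as entered; spaces could be part of it.
    if not host or not username or not password:
        raise ValueError("host, username, and password must all be non-empty")
    return host, username, password
```

You could run `host, username, password = clean_connection_inputs(host, username, password)` right after entering the values and before creating the session.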

**Unless otherwise indicated** read the comments at the top of the subsequent cells and then run them.

In [None]:
#
# Import dependencies and create a Session object
#  https://pystarburst.eng.starburstdata.net/session.html#pystarburst.session.Session
#

from pystarburst import Session
from pystarburst import functions as F
from pystarburst.functions import *
from pystarburst.window import Window as W

import trino

session_properties = {
    "host":host,
    "port": 443,
    "http_scheme": "https",
    "auth": trino.auth.BasicAuthentication(username, password)
}

session = Session.builder.configs(session_properties).create()
# no output expected (well... unless there is an error!!)

In [None]:
#
# Validate connectivity to the cluster
#

session.sql("select 1 as b").collect()

In Starburst Galaxy, go to **Query** > **Query History** and confirm that this throwaway `select` statement actually ran.

In [None]:
#
# Verify you can see the tables in the discovered_schema schema
#  https://pystarburst.eng.starburstdata.net/session.html#pystarburst.session.Session.sql
#

session.sql("show tables from du2024.discovered_schema").collect()

In [None]:
#
# What columns make up the pokemon table
#  https://pystarburst.eng.starburstdata.net/session.html#pystarburst.session.Session.table
#  https://pystarburst.eng.starburstdata.net/dataframe.html#pystarburst.dataframe.DataFrame.schema
#

# Create a Dataframe for the pokemon table
pokemonDF = session.table("du2024.discovered_schema.pokemon")

# Show the columns
print(pokemonDF.schema.printSchema())

In [None]:
#
# Show the data
#  pokemonDF is the DataFrame (DF) that we defined earlier
#  the show() command lists up to 10 rows by default;
#    pass it a row count to see more
#  https://pystarburst.eng.starburstdata.net/dataframe.html#pystarburst.dataframe.DataFrame.show
#

pokemonDF.show()

In [None]:
#
# That was pretty busy; let's try again...
#  use the select method on an existing DF identifying just the columns to keep
#   https://pystarburst.eng.starburstdata.net/dataframe.html#pystarburst.dataframe.DataFrame.select
#

pokemonDF.select("num", "name", "lat", "lng").show()

### Create a new schema to house the structure and consume tables/views

In [None]:
session.sql("CREATE SCHEMA IF NOT EXISTS du2024.workshop4").show()

session.sql("USE du2024.workshop4").show()

session.get_fully_qualified_current_schema()

#### Create the `pokemon_spawns_structure` table

Recall that the default table format you set for this catalog was Iceberg. However, you are still able to create a table with the Hive table format due to Great Lakes connectivity in Starburst Galaxy.  Great Lakes connectivity abstracts the details of using different table formats and file types when using certain write access statements for object storage systems. 

Part of your cleansing in the structure layer is to update the table format from Hive to Iceberg and convert the text file to an ORC file.  The table you just created with schema discovery is not yet fully optimized and transformed. 

First, build a new structure layer table with more accurate data types. Create the table using the ORC file format. This improves performance when using the Iceberg table format.  

Run the following command to create the structure layer table.

In [None]:
session.sql("DROP TABLE IF EXISTS pokemon_spawns_structure").show()

session.sql(" \
    CREATE TABLE pokemon_spawns_structure (   \
        number INTEGER,                       \
        name VARCHAR,                         \
        latitude DOUBLE,                      \
        longitude DOUBLE,                     \
        encounter_seconds BIGINT              \
    ) WITH (format = 'ORC', type = 'ICEBERG')").show()

#### Cast the land zone table's records appropriately

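The following code applies the structuring needed before the data can be loaded into the structure table: it keeps only San Francisco Bay Area coordinates, casts and rounds the columns, and converts encounter times from milliseconds to seconds.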
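To make the DataFrame chain in the next cell easier to follow, here is a plain-Python sketch of what it does to a single row. It assumes each row is a `dict` keyed by the source column names (`num`, `name`, `lat`, `lng`, `encounter_ms`) and uses made-up sample values; `clean_spawn` is a hypothetical helper that illustrates the logic only, while PyStarburst itself pushes the real work down to the cluster as SQL. Python's `round()` can also differ from SQL `ROUND` on exact halfway values.

```python
from typing import Optional

# San Francisco bounding box, copied from the .filter() calls in the next cell
LAT_MIN, LAT_MAX = 37.62, 37.86
LNG_MIN, LNG_MAX = -122.51, -122.12


def clean_spawn(row: dict) -> Optional[dict]:
    """Return one structured row, or None when the spawn is outside the box."""
    lat, lng = float(row["lat"]), float(row["lng"])
    if not (LAT_MIN <= lat <= LAT_MAX and LNG_MIN <= lng <= LNG_MAX):
        return None  # the two .filter() calls drop these rows
    ms = float(row["encounter_ms"])
    return {
        "number": int(row["num"]),   # cast("num", "INTEGER")
        "name": row["name"],
        "latitude": round(lat, 2),   # round("lat", 2)
        "longitude": round(lng, 2),  # round("lng", 2)
        # iff(encounter_ms = -1, -1, encounter_ms / 1000): keep the -1 sentinel
        "encounter_seconds": -1 if ms == -1 else ms / 1000,
    }
```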

In [None]:
# https://pystarburst.eng.starburstdata.net/dataframe.html#pystarburst.dataframe.DataFrame.filter
# https://pystarburst.eng.starburstdata.net/dataframe.html#pystarburst.dataframe.DataFrame.withColumn
# https://pystarburst.eng.starburstdata.net/dataframe.html#pystarburst.dataframe.DataFrame.drop
# https://pystarburst.eng.starburstdata.net/dataframe_functions.html#pystarburst.functions.cast
# https://pystarburst.eng.starburstdata.net/dataframe_functions.html#pystarburst.functions.round
# https://pystarburst.eng.starburstdata.net/dataframe_functions.html#pystarburst.functions.iff
# https://pystarburst.eng.starburstdata.net/dataframe_functions.html#pystarburst.functions.div0

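# Each withColumn()/drop() pair swaps a raw column for a cleaned one; the name2
#  round trip only moves name so the final column order matches
#  pokemon_spawns_structure (number, name, latitude, longitude, encounter_seconds)
pokemonCleanedDF = session.table("du2024.discovered_schema.pokemon") \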
    .select("num", "name", "lat", "lng", "encounter_ms") \
    .filter("lat >= 37.62 AND lat <= 37.86") \
    .filter("lng >= -122.51 AND lng <= -122.12") \
    .withColumn("number", cast("num", "INTEGER")) \
    .drop("num") \
    .withColumn("name2", col("name")).drop("name").with_column_renamed("name2", "name") \
    .withColumn("latitude", round("lat", 2)) \
    .drop("lat") \
    .withColumn("longitude", round("lng", 2)) \
    .drop("lng") \
    .withColumn("encounter_seconds", iff("encounter_ms = -1", lit(-1), div0("encounter_ms", 1000))) \
    .drop("encounter_ms")

pokemonCleanedDF.show()

#### Insert data into the `pokemon_spawns_structure` table

With the table already created, we can use the dataframe from above to write into the table.

**Note:** After doing this manually the first time, you can automate the process in the future using an orchestration tool like Airflow, Prefect, or Dagster.

In [None]:
# 
# Save to an EXISTING table (using 'append' mode)
#  https://pystarburst.eng.starburstdata.net/dataframe_write_functions.html
# 

pokemonCleanedDF.write.mode("append").save_as_table("pokemon_spawns_structure")


# Verify 95197 rows were added

session.table("pokemon_spawns_structure").count()

In [None]:
# Validate it 'looks' good at a glance

session.table("pokemon_spawns_structure").show()

#### Preview the lookup table

The data in the lookup table also needs to be cleaned and optimized. It’s best practice to clean up the data in the structure layer before creating the consume layer. First, run a quick join to show the federation capabilities available within Starburst Galaxy: join the newly created S3 `pokemon_spawns_structure` table with the unoptimized Snowflake lookup table `pokedex` in the `pokemon_lkp` catalog.

In [None]:
# https://pystarburst.eng.starburstdata.net/dataframe.html#pystarburst.dataframe.DataFrame.join

spawnsStructureDF = session.sql("SELECT number, name, latitude, longitude  \
                                   FROM pokemon_spawns_structure")

pokedexLookupDF = session.table("pokemon_lkp.pokemon_lookup.pokedex") \
    .withColumn("lkup_number", cast("number", "INTEGER")) \
    .select("lkup_number", "type_1", "type_2", "catch_rate", "generation")

spawnsStructureDF.join(pokedexLookupDF, spawnsStructureDF.number == pokedexLookupDF.lkup_number) \
    .drop("lkup_number").show()

The output above combines data from two different data sources (catalogs) in a single federated query. This is valuable for interactive analytics, especially for data consumers who could not otherwise get this information without help from a data engineer. 
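Federation aside, the join itself is an ordinary inner join on `number`. The sketch below shows those semantics in plain Python. `join_spawns_with_pokedex` is a hypothetical helper; it assumes rows are `dict`s and that the lookup's `number` value may arrive as text, so it casts that key to `int`, mirroring `cast("number", "INTEGER")` in the cell above. The sample values in the usage are made up.

```python
def join_spawns_with_pokedex(spawns, pokedex):
    """Inner-join spawn rows to lookup rows on number (hypothetical helper)."""
    # Index the lookup rows by their key, cast to int like the lkup_number column
    by_number = {}
    for p in pokedex:
        by_number.setdefault(int(p["number"]), []).append(p)
    joined = []
    for s in spawns:
        # Inner join: a spawn with no matching lookup row produces no output
        for p in by_number.get(s["number"], []):
            joined.append({**s,
                           "type_1": p["type_1"], "type_2": p["type_2"],
                           "catch_rate": p["catch_rate"], "generation": p["generation"]})
    return joined


spawns = [
    {"number": 1, "name": "Examplemon", "latitude": 37.77, "longitude": -122.42},
    {"number": 999, "name": "Nomatch", "latitude": 37.80, "longitude": -122.40},
]
pokedex = [{"number": "1", "type_1": "Grass", "type_2": None, "catch_rate": 45, "generation": 1}]
rows = join_spawns_with_pokedex(spawns, pokedex)
```

Because the key column is not copied into the output, there is nothing to drop afterward; the real cell drops `lkup_number` for the same reason.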

#### Create a `pokemon_pokedex_structure` table

Now, create a table that will store an optimized version of the lookup table. Create it as a Delta Lake table. Why Delta Lake? Because Starburst Galaxy’s Great Lakes connectivity gives you the ability to query multiple table formats at once. You will test this out later in the lab.  

In [None]:
session.sql("DROP TABLE IF EXISTS pokemon_pokedex_structure").show()

session.sql(" \
    CREATE TABLE pokemon_pokedex_structure (  \
        name VARCHAR, number INTEGER,         \
        type_1 VARCHAR, type_2 VARCHAR,       \
        catch_rate INTEGER,                   \
        final_evolution DOUBLE,               \
        generation DOUBLE, abilities DOUBLE   \
    ) WITH (type = 'DELTA')").show()

#### Insert data into the `pokemon_pokedex_structure` table

Now that the table is created, create a dataframe of the cleaned data and insert it into the table.
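The cast, drop, rename, and select chain in the next cell does two things: it converts each column to the type declared for `pokemon_pokedex_structure`, and it emits the columns in the table's declared order. A plain-Python sketch of the same idea, assuming rows are `dict`s of text values (`TARGET_SCHEMA` and `cast_to_target` are illustrative names, not part of PyStarburst, and the sample row is made up):

```python
# Column order and Python types mirroring the pokemon_pokedex_structure DDL
TARGET_SCHEMA = [
    ("name", str), ("number", int),
    ("type_1", str), ("type_2", str),
    ("catch_rate", int),
    ("final_evolution", float),
    ("generation", float), ("abilities", float),
]


def cast_to_target(row: dict) -> dict:
    """Cast each column to its target type and emit columns in table order."""
    out = {}
    for col_name, py_type in TARGET_SCHEMA:
        value = row.get(col_name)
        # Keep NULLs as None; otherwise cast like cast(col, TYPE) would
        out[col_name] = None if value is None else py_type(value)
    return out


row = {"name": "Examplemon", "number": "1", "type_1": "Grass", "type_2": None,
       "catch_rate": "45", "final_evolution": "0", "generation": "1",
       "abilities": "2", "extra": "ignored"}
cleaned = cast_to_target(row)
```

Columns not listed in the target schema (like `extra`) are dropped, just as the final `select(...)` keeps only the table's columns.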

In [None]:
toBeAdded = session.table("pokemon_lkp.pokemon_lookup.pokedex") \
    .withColumn("cast_number", cast("number", "INTEGER")) \
    .withColumn("cast_catch_rate", cast("catch_rate", "INTEGER")) \
    .withColumn("cast_final_evolution", cast("final_evolution", "DOUBLE")) \
    .withColumn("cast_generation", cast("generation", "DOUBLE")) \
    .withColumn("cast_abilities", cast("abilities", "DOUBLE")) \
    .drop("number", "catch_rate", "final_evolution", "generation", "abilities") \
    .with_column_renamed("cast_number", "number") \
    .with_column_renamed("cast_catch_rate", "catch_rate") \
    .with_column_renamed("cast_final_evolution", "final_evolution") \
    .with_column_renamed("cast_generation", "generation") \
    .with_column_renamed("cast_abilities", "abilities") \
    .select("name", "number", "type_1", "type_2", "catch_rate", "final_evolution", "generation", "abilities")

toBeAdded.write.mode("append").save_as_table("pokemon_pokedex_structure")


# verify 1032 rows were added

session.table("pokemon_pokedex_structure").count()

In [None]:
# Validate it 'looks' good at a glance

session.table("pokemon_pokedex_structure").show()

### Build the consume layer

Now it’s time to construct the consume layer, the last of the three layers of your data lake reporting structure. This layer readies the data for final consumption by data consumers. 

#### Revisit the business requirements

Take this opportunity to refamiliarize yourself with the business case. This will inform the kinds of questions that you will ask about your dataset. 

In this scenario, the business case required you to: 

1. Create a final table output combining data from both structure tables.
2. Create a data product answering two specific business questions from the marketing department. 
   - What are the easiest and most popular Pokemon to catch in San Francisco by Type_1? 
   - Find the total number of Pokemon caught for each Type_1 and Type_2 pairing. Also, find the average catch rate.

**Note:** *Easiest is defined by having a high catch rate. A high catch rate is greater than or equal to 100. Also consider that in the structure layer, you filtered out data that did not exist in the San Francisco Bay Area.*  
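The business definitions above can be pinned down in a few lines. This plain-Python sketch follows the note's definition of easiest (a catch rate greater than or equal to 100) and counts appearances per (Type_1, name) pair; `is_easy`, `easy_appearances`, and the `dict` row shape are assumptions for illustration only.

```python
from collections import Counter

# Threshold from the note above: a "high" catch rate is >= 100
HIGH_CATCH_RATE = 100


def is_easy(catch_rate):
    """True when a Pokemon counts as 'easy' to catch under the business definition."""
    return catch_rate >= HIGH_CATCH_RATE


def easy_appearances(spawns):
    """Count appearances per (type_1, name), keeping only easy-to-catch Pokemon."""
    return Counter((s["type_1"], s["name"]) for s in spawns if is_easy(s["catch_rate"]))
```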


#### Create the consume table

Run the following to create the consume table.  This constructs a new table from two separate tables in S3. Note that the `pokemon_spawns_structure` table is an Iceberg table and the `pokemon_pokedex_structure` table is a Delta Lake table. 

**Note:** `mode("overwrite")` is implemented as a DROP of the named table followed by a CTAS (CREATE TABLE AS SELECT). 
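The join in the next cell passes `rsuffix="_p"` so that the right-hand `number` column gets a predictable name that `.drop("number_p")` can remove. The plain-Python sketch below shows the naming outcome that code assumes; it is an illustration, not PyStarburst's implementation, and `resolve_duplicate_names` is a hypothetical helper.

```python
def resolve_duplicate_names(left_cols, right_cols, rsuffix="_p"):
    """Return join output column names, suffixing right-side names that clash."""
    left_set = set(left_cols)
    return list(left_cols) + [c + rsuffix if c in left_set else c for c in right_cols]


cols = resolve_duplicate_names(
    ["number", "name", "latitude", "longitude"],      # sSpawnsDF columns
    ["type_1", "type_2", "catch_rate", "number"],     # sPokedexDF columns
)
```

Only the clashing `number` column is renamed, so dropping `number_p` leaves exactly one `number` column in `pokemon_final_spawns`.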

In [None]:
sSpawnsDF = session.table("pokemon_spawns_structure") \
    .select("number", "name", "latitude", "longitude")

sPokedexDF = session.table("pokemon_pokedex_structure") \
    .filter("catch_rate > 100") \
    .select("type_1", "type_2", "catch_rate", "number")

# when joining tables, any overlapping column names get auto-generated names
#  in the resulting DataFrame; one way to avoid this is to use lsuffix
#  and/or rsuffix to give the duplicate names a predictable suffix
sSpawnsDF.join(sPokedexDF, sSpawnsDF.number == sPokedexDF.number, rsuffix="_p") \
    .drop("number_p") \
    .write.mode("overwrite").save_as_table("pokemon_final_spawns")


# verify the table was created and has 94626 rows in it

session.table("pokemon_final_spawns").count()

In [None]:
# Validate it 'looks' good at a glance

session.table("pokemon_final_spawns").show()

#### Find the easiest and most popular Pokemon in San Francisco

Now, you need to derive two different views for the marketing department, which will be used to answer the business questions. Remember, easiest is defined by a catch rate greater than or equal to 100. Most popular is defined as the greatest number of appearances for a given Pokemon within each Type_1.

**Note:** The window functions used in the rest of this exercise will be using SQL, but there are first-class Dataframe API calls identified in https://pystarburst.eng.starburstdata.net/window.html and some examples of using them captured in https://lestermartin.wordpress.com/2023/10/19/viewing-astronauts-thru-windows-more-pystarburst-examples/.  As a "bonus exercise", feel free to recreate some/all of the embedded SQL using these functions.
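Before running the SQL, it may help to see what `RANK() OVER (PARTITION BY type_1 ORDER BY ... DESC)` computes. The plain-Python sketch below (a hypothetical `rank_by_type` helper over `dict` rows) counts appearances per (type_1, name) and ranks them within each type_1; like SQL `RANK()`, tied counts share a rank and the following rank is skipped.

```python
from collections import Counter, defaultdict


def rank_by_type(spawns):
    """Hypothetical helper mimicking RANK() OVER (PARTITION BY type_1 ORDER BY count DESC)."""
    # GROUP BY type_1, name -> COUNT(*)
    counts = Counter((s["type_1"], s["name"]) for s in spawns)
    # PARTITION BY type_1
    partitions = defaultdict(list)
    for (type_1, name), total in counts.items():
        partitions[type_1].append((name, total))
    ranked = []
    for type_1, rows in partitions.items():
        # ORDER BY count DESC within the partition
        rows.sort(key=lambda r: r[1], reverse=True)
        rank = 0
        for i, (name, total) in enumerate(rows):
            # RANK(): ties share a rank; the next distinct count skips ahead
            if i == 0 or total != rows[i - 1][1]:
                rank = i + 1
            ranked.append((type_1, name, total, rank))
    return ranked
```

Because ties share rank 1, filtering on `rank_column = 1` (as a later cell does) can keep more than one Pokemon for the same Type_1.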

In [None]:
# Use SQL window function to rank the most popular Pokemon for each Type_1

popTypesDF = session.sql(" \
    SELECT type_1, name,                              \
           COUNT(*) AS total_appearances,             \
           RANK() OVER (PARTITION BY type_1           \
                           ORDER BY count(name) DESC  \
                       ) AS rank_column               \
      FROM pokemon_final_spawns                       \
     GROUP BY type_1, name                            \
     ORDER BY type_1, COUNT(*) DESC")
                         
popTypesDF.show()                        

In [None]:
# From the DF above, only keep the most popular Pokemon types

mostPopTypesDF = popTypesDF.filter("rank_column = 1") \
    .drop("rank_column") \
    .order_by("total_appearances", ascending=False)

mostPopTypesDF.show()

#### Create the first marketing view

We already have the DataFrame we need, so a single method call creates the view.

In [None]:
# https://pystarburst.eng.starburstdata.net/dataframe.html#pystarburst.dataframe.DataFrame.createOrReplaceView

mostPopTypesDF.createOrReplaceView("popular_types_sf_vw")

session.table("popular_types_sf_vw").show()

#### Create the second marketing view

Use SQL's GROUPING SETS to find the total number of Pokemon caught, and the average catch rate, for each Type_1 and Type_2 pairing. 
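With a single grouping set, `GROUPING SETS ((type_1, type_2))` produces the same groups as `GROUP BY type_1, type_2`; listing extra sets, such as `(type_1)`, would add subtotal rows. The plain-Python sketch below mirrors the query in the next cell, assuming `dict` rows with non-null `type_1` and `name` values (`counts_by_types` is a hypothetical helper, and Python's rounding may differ from SQL `ROUND` on exact halfway values).

```python
from collections import defaultdict


def counts_by_types(spawns):
    """Sketch of GROUP BY GROUPING SETS ((type_1, type_2)): one row per pair."""
    groups = defaultdict(list)
    for s in spawns:
        groups[(s["type_1"], s["type_2"])].append(s["catch_rate"])
    rows = [
        {"type_1": t1, "type_2": t2,
         "avg_catch_rate": round(sum(rates) / len(rates), 2),  # ROUND(AVG(catch_rate), 2)
         "total_count": len(rates)}                            # COUNT(name)
        for (t1, t2), rates in groups.items()
    ]
    # ORDER BY type_1, total_count DESC
    rows.sort(key=lambda r: (r["type_1"], -r["total_count"]))
    return rows
```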

In [None]:
grpSetsDF = session.sql(" \
    SELECT type_1, type_2,                               \
           ROUND(AVG(catch_rate), 2) AS avg_catch_rate,  \
           COUNT(name) AS total_count                    \
      FROM pokemon_final_spawns                          \
     GROUP BY GROUPING SETS ((type_1, type_2))           \
     ORDER BY type_1, total_count DESC")


grpSetsDF.createOrReplaceView("counts_by_types_sf_vw")

session.table("counts_by_types_sf_vw").show()

In [None]:
# Verify the 2 structure tables, the consume table, and the 2 consume views are present

session.sql("show tables from du2024.workshop4").collect()

### RETURN TO LAB GUIDE

#### Resume at Lab 3 > Part 3 > Step 5: Return from working with PyStarburst