# MaxDome Notebooks

MaxDome notebooks are serverless PySpark notebooks powered by AWS Glue Interactive Sessions.
To create a session and begin using PySpark, simply run a cell of code.

## Additional Sample Notebooks

Additional sample notebooks are available in the aws-glue-samples repository on GitHub and can be imported into your project using the upload button at the top of the file browser in Jupyter.

https://github.com/aws-samples/aws-glue-samples/tree/master/examples/notebooks


#### Optional: Configuration

MaxDome notebooks are configured via Jupyter magics (commands prefixed with `%` or `%%`). In MaxDome and Glue Interactive Sessions, magics configure the PySpark environment, including cluster size, worker shape, and installed libraries.

Run `%help` in an empty cell to see the full list of magic commands.

In [None]:
%help

In [None]:
# Use this cell to configure your PySpark environment. 
%additional_python_modules sagemaker

In [None]:
# Start Session and configure Spark
import sys
import boto3
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


args = getResolvedOptions(
    sys.argv, ["redshift_url", "redshift_iam_role", "redshift_tempdir"]
)

# `sc` and `spark` already exist in a session automatically.
# They are defined here only to satisfy linters.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

In [None]:
# Get the MaxDome Database info
glue_client = boto3.client("glue")

# filter Glue Databases for databases that start with "maxdome_producer_db"
databases_paginator = glue_client.get_paginator("get_databases")
response_iterator = databases_paginator.paginate()
glue_databases = response_iterator.build_full_result().get("DatabaseList")
maxdome_database = [
    db for db in glue_databases if db["Name"].startswith("maxdome_producer_db")
][0]

maxdome_database_name = maxdome_database["Name"]
maxdome_database_location = maxdome_database["LocationUri"]
print(f"Project Database Name: {maxdome_database_name}")
print(f"Project Database Location: {maxdome_database_location}")
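
The list comprehension above raises a bare `IndexError` when no database name matches the prefix. A minimal sketch of a more defensive lookup follows. `find_database` is a hypothetical helper, not part of boto3 or Glue, and it only assumes that each entry is a dict with a `Name` key, as in the `DatabaseList` returned by `get_databases`. The sample data is made up so the sketch runs without AWS access.

```python
def find_database(databases, prefix):
    """Return the first database dict whose Name starts with prefix.

    Hypothetical helper for illustration; `databases` is assumed to be a
    list of dicts shaped like the entries of a Glue `DatabaseList`.
    """
    for db in databases:
        if db.get("Name", "").startswith(prefix):
            return db
    raise LookupError(f"No Glue database found with prefix {prefix!r}")


# Stand-in data shaped like a Glue response, so the sketch runs offline.
sample_databases = [
    {"Name": "default"},
    {"Name": "maxdome_producer_db_example", "LocationUri": "s3://example-bucket/db/"},
]
match = find_database(sample_databases, "maxdome_producer_db")
print(match["Name"])
```

In the notebook, `glue_databases` would take the place of `sample_databases`.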

## Create a Spark DataFrame from the NYC Taxi Dataset


In [None]:
df = spark.read.parquet(
    "s3://nyc-tlc/trip data/yellow_tripdata_2009-01.parquet"
).limit(1000)
df.show(5)
df.printSchema()

## Clean the column names to match the Catalog's casing specifications


In [None]:
renamed_df = df.select(
    col("vendor_name").alias("vendor_id"),
    col("Trip_Pickup_DateTime").alias("pickup_datetime"),
    col("Trip_Dropoff_DateTime").alias("dropoff_datetime"),
    col("Passenger_Count").alias("passenger_count"),
    col("Trip_Distance").alias("trip_distance"),
    col("Start_Lon").alias("pickup_longitude"),
    col("Start_Lat").alias("pickup_latitude"),
    col("Rate_Code").alias("rate_code"),
    col("store_and_forward").alias("store_and_fwd_flag"),
    col("End_Lon").alias("dropoff_longitude"),
    col("End_Lat").alias("dropoff_latitude"),
    col("Payment_Type").alias("payment_type"),
    col("Fare_Amt").alias("fare_amount"),
    col("surcharge").alias("surcharge"),
    col("mta_tax").alias("mta_tax"),
    col("Tip_Amt").alias("tip_amount"),
    col("Tolls_Amt").alias("tolls_amount"),
    col("Total_Amt").alias("total_amount"),
)
renamed_df.show(5)
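
The cell above renames each column by hand, which also lets it choose more descriptive names. When only the casing needs fixing, the names can be derived mechanically instead. The following is a rough sketch under that assumption: `normalize_column_name` is a hypothetical helper, and the sample column list (including `Start Lon` with a space) is made up for illustration.

```python
import re


def normalize_column_name(name):
    """Lowercase a column name and collapse other characters to underscores."""
    cleaned = re.sub(r"[^0-9A-Za-z]+", "_", name).strip("_")
    return cleaned.lower()


# Illustrative column names; in the notebook these would come from df.columns.
source_columns = ["vendor_name", "Trip_Pickup_DateTime", "Start Lon"]
normalized = [normalize_column_name(c) for c in source_columns]
print(normalized)

# With a live session this could be applied as:
# lowercase_df = df.toDF(*[normalize_column_name(c) for c in df.columns])
```

This does not reproduce the hand-picked aliases above (for example `Fare_Amt` would become `fare_amt`, not `fare_amount`), so it suits cases where the original names are acceptable apart from their casing.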

## Writing the DataFrame to a catalog table



#### Writing to the data catalog

In [None]:
DATABASE = maxdome_database_name
TABLE = "nyc_yellow"
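# Normalize the trailing slash so the path is valid whether or not LocationUri ends with "/"
S3_PATH = f"{maxdome_database_location.rstrip('/')}/{TABLE}/"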
(
    renamed_df.write.mode("append")
    .option("path", S3_PATH)
    .saveAsTable(f"{DATABASE}.{TABLE}")
)

#### Reading the data back from Catalog

In [None]:
df2 = spark.sql(f"select * from {DATABASE}.{TABLE}")
df2.show(5)

## Redshift Operations

Redshift connectivity is currently handled by the community Spark Connector for Redshift. It is best suited for moving large amounts of data between Redshift and Spark for use with PySpark or pandas. The project describes itself as follows:

> A library to load data into Spark SQL DataFrames from Amazon Redshift, and write them back to Redshift tables. Amazon S3 is used to efficiently transfer data in and out of Redshift, and JDBC is used to automatically trigger the appropriate COPY and UNLOAD commands on Redshift.
>
> This library is more suited to ETL than interactive queries, since large amounts of data could be extracted to S3 for each query execution. If you plan to perform many queries against the same Redshift tables then we recommend saving the extracted data in a format such as Parquet.

Parameters and documentation can be found [on GitHub](https://github.com/spark-redshift-community/spark-redshift).

#### Write to Redshift with IAM

In [None]:
rs_database = "dev"
rs_table = "project.nyc_taxi_yellow"

(
    renamed_df.write.format("io.github.spark_redshift_community.spark.redshift")
    .option("url", args["redshift_url"])
    .option("dbtable", rs_table)
    .option("tempdir", args["redshift_tempdir"])
    .option("aws_iam_role", args["redshift_iam_role"])
    .mode("overwrite")
    .save()
)

#### Read from Redshift

In [None]:
rs_read_df = (
    spark.read.format("io.github.spark_redshift_community.spark.redshift")
    .option("url", args["redshift_url"])
    .option("aws_iam_role", args["redshift_iam_role"])
    .option("tempdir", args["redshift_tempdir"])
    .option("unload_s3_format", "PARQUET")
    .option("dbtable", rs_table)
    .load()
)
rs_read_df.show(5)
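
The write and read cells above pass the same four connection options. One way to keep them in sync is to build them once as a dict and hand it to `.options(**...)`, which both `DataFrameReader` and `DataFrameWriter` accept. The sketch below is illustrative only: `redshift_options` is a hypothetical helper, and the URL, bucket, and role ARN are placeholders rather than real resources.

```python
def redshift_options(url, tempdir, iam_role, dbtable):
    """Collect the connector options shared by the read and write cells."""
    return {
        "url": url,
        "tempdir": tempdir,
        "aws_iam_role": iam_role,
        "dbtable": dbtable,
    }


# Placeholder values so the sketch runs without a session or a cluster.
common = redshift_options(
    url="jdbc:redshift://example-host:5439/dev",
    tempdir="s3://example-bucket/tmp/",
    iam_role="arn:aws:iam::123456789012:role/example-role",
    dbtable="project.nyc_taxi_yellow",
)
print(sorted(common))

# With a live session, the read could then look like:
# spark.read.format("io.github.spark_redshift_community.spark.redshift") \
#     .options(**common).option("unload_s3_format", "PARQUET").load()
```

In the notebook, the values would come from `args` and `rs_table` instead of the placeholders.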

## Using pandas on Spark

For those new to PySpark, the pandas API on Spark may offer a more familiar interface. With this library, many operations on a Spark DataFrame can be expressed using familiar pandas APIs while still taking advantage of Spark's optimizations and distributed compute.

See the [Spark documentation](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_ps.html) for a deeper dive.

For a full example see the `nyc_taxi_predictions` sample notebook next to this one.

In [None]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps

psdf = ps.read_parquet("s3://nyc-tlc/trip data/yellow_tripdata_2009-01.parquet")
psdf.info()
psdf.head()

## Using SageMaker

You can use the SageMaker Python SDK from your PySpark notebook by installing it with `%additional_python_modules` at the start of your session.

In [None]:
import sagemaker

role = sagemaker.get_execution_role()
sess = sagemaker.Session()
region = boto3.Session().region_name

In [None]:
%stop_session