### This notebook shows examples of using Glue on Spark for users familiar with pandas
To follow this example notebook, execute the cells in order.
The keyboard shortcut to run the current cell and move to the next one is Shift+Enter.

To delete cells you no longer need (including this one), use the context menu, or press Escape (to exit any cell you might be editing) and then press the d key twice. To delete many cells quickly, first select them with Shift + Up/Down.

This example assumes the configured role has permission to read and write on the default Data Catalog database and on the Glue temporary folder in S3; otherwise, update the code or the permissions accordingly.

####  Running the following cell will set up and start your interactive session.

In [5]:
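# Glue interactive session magics: uncomment them to configure the session (they must run before the session starts)
# %idle_timeout 120
# %glue_version 4.0
# %worker_type G.1X
# %number_of_workers 2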

import boto3
import sys
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
# Explicit imports avoid shadowing Python builtins such as max and sum
from pyspark.sql.functions import lpad, regexp_replace
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)


In [None]:
# Optimize the data movement from pandas to Spark DataFrame and back
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Define a distributed Spark DataFrame, which reads the data in parallel and can process data too large for a single node
# This takes a bit of time because inferring the schema requires an extra pass over the data; in practice you could
# read every column as a string and handle the schema manually
sdf = spark.read.csv("s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv", 
                     header=True, inferSchema=True)
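The comment above mentions skipping inference, reading every column as a string and applying the schema yourself. In Spark you would typically pass an explicit `schema=` to `spark.read.csv` instead; the sketch below only illustrates the idea in plain Python with the standard `csv` module. The sample rows and the converter table are hypothetical, not taken from the real dataset.

```python
import csv
import io
from decimal import Decimal

# Hypothetical sample: every value arrives as text, and one header has stray spaces
SAMPLE = (
    "Provider State,Provider Zip Code, Total Discharges ,Average Medicare Payments\n"
    "CA,90001,120,$12000.50\n"
    "NJ,07001,45,$8000.25\n"
)

# Manual "schema": one converter per (trimmed) column name
CONVERTERS = {
    "Provider State": str,
    "Provider Zip Code": str,  # keep as text so leading zeros survive
    "Total Discharges": int,
    "Average Medicare Payments": lambda s: Decimal(s.replace("$", "")),
}

def read_with_schema(text):
    reader = csv.reader(io.StringIO(text))
    # Trim the header names once, up front
    header = [name.strip() for name in next(reader)]
    rows = []
    for values in reader:
        rows.append({name: CONVERTERS[name](value) for name, value in zip(header, values)})
    return rows

rows = read_with_schema(SAMPLE)
print(rows[1])
```

Reading as text first keeps full control over each column's type, for example the zip code stays a string with its leading zero.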

In [None]:
# Schema inference treated the dollar amounts as strings because of the $ symbol
# Also, some of the CSV headers have extra spaces; we'll deal with that later
sdf.printSchema()

In [None]:
# The last 3 columns are dollar amounts; parse them into numbers for calculations
last_3cols = sdf.columns[-3:]
# These transformations are only defined here; Spark won't do the work until the data is extracted (lazy execution)
for col_name in last_3cols:
    # Note: for money, decimal is normally the better type, but pandas has no native decimal dtype,
    # so toPandas() would hand back generic Python objects; double is used instead
    # The regex is a raw string: '\$' without the r prefix is an invalid escape sequence in Python
    sdf = sdf.withColumn(col_name, regexp_replace(sdf[col_name], r'\$', '').cast('double'))

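# The zip code is not really a number: cast it back to string and left-pad it to 5 digits
# to restore any leading zeros lost when schema inference parsed it as an integer (assumes US 5-digit zip codes)
sdf = sdf.withColumn('Provider Zip Code', lpad(sdf['Provider Zip Code'].cast('string'), 5, '0'))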
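The cell above strips the `$`, casts the amounts to `double`, and treats the zip code as text. The plain-Python sketch below (not Spark code, made-up values) shows the two pitfalls behind those choices: binary floating point accumulates rounding error that `Decimal` avoids, and a zip code that was parsed as a number has lost its leading zero, which left-padding to five digits restores (assuming US five-digit zip codes).

```python
import re
from decimal import Decimal

# Raw string: "\$" without the r prefix is an invalid escape sequence in Python
DOLLAR = re.compile(r"\$")

def parse_as_float(amount):
    return float(DOLLAR.sub("", amount))

def parse_as_decimal(amount):
    return Decimal(DOLLAR.sub("", amount))

payments = ["$0.10", "$0.10", "$0.10"]
float_total = sum(parse_as_float(p) for p in payments)
decimal_total = sum(parse_as_decimal(p) for p in payments)
print(float_total)    # 0.30000000000000004 (binary floating point)
print(decimal_total)  # 0.30 (exact)

# A zip code parsed as an integer loses its leading zero...
zip_as_int = int("07001")
print(str(zip_as_int))           # 7001
# ...and left-padding to 5 digits (what lpad does in Spark) restores it
print(str(zip_as_int).zfill(5))  # 07001
```

The error in the float total is tiny, but it compounds across sums and comparisons, which is why decimal types are preferred for money when the downstream tools support them.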

In [None]:
# Check the parsing is working fine
sdf.show(n=10)
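Among the calls so far, `show()` is an *action*: the `withColumn` calls before it are *transformations* that Spark only records, as the lazy execution comment earlier notes. Python generators give a rough single-machine analogy; the sketch below is only that analogy (with hypothetical values), not how Spark is implemented.

```python
from itertools import islice

# Records which rows were actually parsed
parsed_log = []

def parse_amounts(rows):
    # A generator: its body runs only when someone iterates over it
    for row in rows:
        parsed_log.append(row)
        yield float(row.replace("$", ""))

raw = ["$10.00", "$25.50", "$7.25", "$99.99"]
pipeline = parse_amounts(raw)  # like defining transformations: no work done yet
print(len(parsed_log))         # 0

first_two = list(islice(pipeline, 2))  # like show(n=2): pulls only the rows it needs
print(first_two, len(parsed_log))      # [10.0, 25.5] 2
```

As with Spark, the cost is paid when results are requested, and asking for a few rows can avoid processing all of them.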

In [None]:
# Let's say you are only interested in California
sdf_ca = sdf.filter('`Provider State` == "CA"')

# Now that we have narrowed down the data, it's small enough to convert into a native pandas DataFrame
# Unlike sdf, which reads the data in a distributed way and only when needed, pdf_ca holds all the data in the driver's memory,
# so it is faster for smaller data, as long as the data fits
pdf_ca = sdf_ca.toPandas()

# Some column names in the CSV have extra spaces; in pandas we can trim them easily
pdf_ca.columns = [c.strip() for c in pdf_ca.columns]

In [None]:
# Check the pandas schema
pdf_ca.dtypes

In [None]:
# Explore the statistics of the numeric columns
import pandas as pd
import numpy as np
pd.set_option('display.max_columns', 5)
pd.set_option('display.max_colwidth', 30)
pdf_ca.describe()
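To see what `describe()` reports for a numeric column, here is a sketch that computes the same summary with Python's `statistics` module: count, mean, standard deviation, min, the 25/50/75% quartiles and max. It assumes pandas' defaults (sample standard deviation with `ddof=1`, linear interpolation for percentiles), which `statistics.stdev` and `statistics.quantiles(..., method="inclusive")` reproduce; the payment values are hypothetical.

```python
import statistics

def describe(values):
    data = sorted(values)
    # "inclusive" quartiles use linear interpolation between data points
    q1, q2, q3 = statistics.quantiles(data, n=4, method="inclusive")
    return {
        "count": len(data),
        "mean": statistics.mean(data),
        "std": statistics.stdev(data),  # sample standard deviation (ddof=1)
        "min": data[0],
        "25%": q1,
        "50%": q2,
        "75%": q3,
        "max": data[-1],
    }

payments = [1000.0, 2000.0, 3000.0, 4000.0, 10000.0]  # hypothetical values
summary = describe(payments)
for name, value in summary.items():
    print(f"{name:>5}: {value:,.2f}")
```

Note how the single large value pulls the mean (4,000) above the median (3,000), the kind of skew `describe()` helps spot before plotting.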

In [None]:
# Plot a histogram on the notebook directly from pandas
import matplotlib.pyplot as plt
plt.figure()
plt.title("Histogram of average Medicare payments")
plt.xlabel("Average payment in dollars")
histogram = pdf_ca['Average Medicare Payments'].plot.hist(bins=12, alpha=0.5)
%matplot plt
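With `bins=12`, the histogram splits the range between the smallest and largest payment into 12 equal-width bins and counts the values falling in each. The plain-Python sketch below approximates that counting, assuming the equal-width binning used for an integer `bins` argument (half-open bins, with the last one also including the maximum); the payments are hypothetical.

```python
def histogram(values, bins):
    lo, hi = min(values), max(values)
    if lo == hi:
        # Widen a zero-width range, as numpy does, to avoid dividing by zero
        lo, hi = lo - 0.5, hi + 0.5
    width = (hi - lo) / bins
    edges = [lo + i * width for i in range(bins + 1)]
    counts = [0] * bins
    for v in values:
        # Bins are [edge_i, edge_i+1); the maximum is clamped into the last bin
        index = min(int((v - lo) / width), bins - 1)
        counts[index] += 1
    return edges, counts

payments = [5000, 6000, 6500, 7000, 9000, 12000, 15000, 30000]  # hypothetical
edges, counts = histogram(payments, bins=12)
print(counts)
```

A single outlier (here 30000) stretches the range, so most values crowd into the first bins; that is worth keeping in mind when choosing the number of bins.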

If you have more experience with the pandas API, you might prefer it over the Spark DataFrame API.  
However, notice that while we use native pandas, only the driver does the work and the rest of the cluster sits idle.  
That's why we set the minimum cluster size: *%number_of_workers 2*  

On Glue 4.0, you can get both distributed computing and the pandas syntax by using the pandas API on Spark. It is not yet 100% compatible with pandas, but it should work for most cases.

In [None]:
# The following cells only work on Glue 4.0
# psdf is a pandas-on-Spark DataFrame: it uses the pandas API, but the data and the processing are distributed,
# which means higher latency but also the ability to scale beyond a single node and handle larger data
psdf = sdf.pandas_api()
psdf.columns = [c.strip() for c in psdf.columns]
# Statistics on the full dataset
psdf.describe()

In [None]:
# Process the data in a distributed way, but using pandas syntax
relevant_psdf = psdf[(psdf['Total Discharges'] > 100) & (psdf['Average Medicare Payments'] > 10000)]
relevant_psdf.groupby(["Provider State", "Provider Id"])["Average Medicare Payments"].max()

Note that the output of the previous cell differs from native pandas, which sorts the groups by key and displays the results grouped by the first column.  
Likewise, sorting before the groupby wouldn't have the same effect: the data is distributed, so it is no longer in order by the time the groupby runs.
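Since each partition is aggregated separately and the partial results are combined in no guaranteed order, the practical fix is to sort *after* the aggregation (in pandas on Spark, for instance, by calling `.sort_index()` on the grouped result). The plain-Python sketch below imitates that flow with hypothetical partitions and rows: each partition computes a partial max per key, the partials are merged in an arbitrary order, and sorting happens last.

```python
import random

# Hypothetical rows spread across "partitions": (state, provider_id, payment)
partitions = [
    [("CA", 50001, 12000.0), ("NY", 33001, 15000.0)],
    [("AL", 10001, 11000.0), ("CA", 50001, 13000.0)],
    [("NY", 33001, 10500.0), ("AL", 10002, 19000.0)],
]

def partial_max(rows):
    # Per-partition aggregation: max payment for each (state, provider) key
    result = {}
    for state, provider, payment in rows:
        key = (state, provider)
        result[key] = max(payment, result.get(key, payment))
    return result

# Partitions may finish in any order; shuffle to imitate that
random.shuffle(partitions)
combined = {}
for partial in map(partial_max, partitions):
    for key, value in partial.items():
        combined[key] = max(value, combined.get(key, value))

# Sorting after the aggregation gives a deterministic, pandas-like order
ordered = sorted(combined.items())
for (state, provider), payment in ordered:
    print(state, provider, payment)
```

The merged dictionary's order depends on the shuffle, but `ordered` is the same on every run.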

In [None]:
# Convert back to a Spark DataFrame if you want to leverage its data saving features (for instance, creating a catalog table)
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.html
relevant_sdf = relevant_psdf.to_spark()
relevant_sdf.show()

# Or go a step further and convert to a DynamicFrame to use its sinks and features
# https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame-writer.html
relevant_dynf = DynamicFrame.fromDF(relevant_sdf, glueContext, "relevant_dynf")