### Housekeeping items

- Create a Spark pool
- Monitor the pool to see active applications
- Spark UI > Environment: check the `spark.dynamicAllocation` settings


In [15]:
# Parameters cell: when the notebook is run from a pipeline, these values can
# be overridden. (In Synapse Studio: '...' on this cell > Toggle parameter cell.)

# Storage account and container that hold the demo data (read via abfss://)
containerName = "demo"
accountName = "stdemodatalake001"

# Paths relative to the container root
tripPath = "nycTripYellow2018/*.parquet"  # glob: every Parquet file in the folder
paymentTypePath = "nycTripYellowPaymentType/nycTripYellowPaymentType.csv"

StatementMeta(testPool, 3, 15, Finished, Available, Finished)

In [16]:
# Build the ABFSS root URI of the container from the parameters and echo it,
# so the location being read is visible in the cell output.

dataSource = "abfss://{}@{}.dfs.core.windows.net/".format(containerName, accountName)
print(dataSource)

StatementMeta(testPool, 3, 16, Finished, Available, Finished)

abfss://demo@stdemodatalake001.dfs.core.windows.net/


In [17]:
# Load the NYC Yellow Taxi trips (Parquet) from the data lake.
tripFileFormat = 'parquet'

tripSourcePath = dataSource + tripPath
nycTaxiDf = spark.read.format(tripFileFormat).load(tripSourcePath)

# Preview the first 10 rows
display(nycTaxiDf.limit(10))

StatementMeta(testPool, 3, 17, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 38143054-362f-4311-aab2-0395b6b29629)

In [18]:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Reference data: payment types (CSV). The schema is declared explicitly
# rather than inferred, so Spark does not need an extra pass over the file.
paymentTypeSchema = StructType([
    StructField("PaymentTypeID", IntegerType()),
    StructField("description", StringType()),
])

paymentTypeDf = (
    spark.read
         .options(header=True)  # first line of the file holds the column names
         .schema(paymentTypeSchema)
         .csv(dataSource + paymentTypePath)
)

# Other common CSV options:
#   - inferSchema=True : let Spark derive the schema (costs one extra pass over the data)
#   - quote='"'        : override the default quote character
#   - escape='\'       : override the default escape character
#
# Long call chains can be split across lines either by wrapping them in
# parentheses (as above) or by ending each line with a "\".

display(paymentTypeDf)

StatementMeta(testPool, 3, 18, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0923f5b1-91f9-4b51-9e5f-19da03727a39)

In [19]:
# Parquet files carry their own schema, so none was declared when loading the
# trips. List every column together with its Spark data type.

columnTypes = nycTaxiDf.dtypes  # list of (column name, type string) pairs
display(columnTypes)

StatementMeta(testPool, 3, 19, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 461ca8b9-55b1-4f45-a744-33d28ed88e23)

In [20]:
# Cast the ID columns to int and derive date/time features from the pickup timestamp.

# NOTE: avg is not used in this cell but is imported here for the later aggregation cells.
from pyspark.sql.functions import avg, col, dayofweek, hour, to_date

pickupTs = "tpepPickupDateTime"

tripExtraDf = (
    nycTaxiDf
    .withColumn("puLocationId", col("puLocationId").cast("int"))
    .withColumn("doLocationId", col("doLocationId").cast("int"))
    .withColumn("paymentTypeId", col("paymentType").cast("int"))
    .withColumn("DayOfWeek", dayofweek(pickupTs))  # 1 = Sunday ... 7 = Saturday
    .withColumn("PickUpDate", to_date(pickupTs))
    .withColumn("PickUpHr", hour(pickupTs))
)

# Preview 10 Tuesday trips (dayofweek() returns 3 for Tuesday)
tuesdayTripsDf = (
    tripExtraDf
    .select("puLocationId", "DayOfWeek", "PickUpDate", "PickUpHr")
    .filter(col("DayOfWeek") == 3)
    .limit(10)
)
display(tuesdayTripsDf)

StatementMeta(testPool, 3, 20, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e35087f4-0781-4181-9e31-2dc7b24ac642)

In [21]:
# Join the payment-type lookup onto the trips (inner join on the payment type ID).
#
# The two key columns have names that differ only by case ("paymentTypeId" on
# the trips, "PaymentTypeID" on the lookup). Spark resolves column names
# case-insensitively by default, so keeping both would make any later reference
# to that name ambiguous. The lookup-side copy is therefore dropped after the join.

joinedPaymentDf = (
    tripExtraDf
    .join(paymentTypeDf,
          tripExtraDf["paymentTypeId"] == paymentTypeDf["PaymentTypeID"],
          how="inner")
    .drop(paymentTypeDf["PaymentTypeID"])
)

display(joinedPaymentDf.limit(10))

# Check the Job -> "Associated SQL Query" to go to the SQL/Data Frame tab.
#   - BroadcastExchange: no hint was given, so Spark chose a broadcast join by
#     itself, presumably because the payment-type table is below
#     spark.sql.autoBroadcastJoinThreshold (verify in the plan).

StatementMeta(testPool, 3, 21, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7b1b8bd4-e2bd-4f1b-a714-a82a1e015b10)

In [22]:
from pyspark.sql.functions import broadcast

# To make sure a broadcast join is used, request it explicitly with the
# broadcast() hint on the small payment-type table, rather than relying on
# Spark's own size estimate.
joinedPaymentDf2 = (
    tripExtraDf
    .join(broadcast(paymentTypeDf),
          tripExtraDf["paymentTypeId"] == paymentTypeDf["PaymentTypeID"],
          how="inner")
    # Drop the lookup's key column: its name differs from "paymentTypeId" only
    # by case, which makes later references ambiguous under Spark's default
    # case-insensitive column resolution.
    .drop(paymentTypeDf["PaymentTypeID"])
)

display(joinedPaymentDf2.limit(10))


# Check the Job -> "Associated SQL Query" to go to the SQL/Data Frame tab
#   - BroadcastExchange 

StatementMeta(testPool, 3, 22, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8e3df8e0-e16e-4c9b-99d7-b66f0325d71b)

In [23]:
# Force Broadcast join on Large DS - Run live
#
# Counter-example, deliberately left commented out. Here the broadcast() hint is
# placed on the LARGE trip DataFrame instead of the small payment-type lookup.
# For a broadcast join Spark must materialize the hinted side and ship it to
# every executor, so this is expected to be very slow or to fail (e.g. driver
# memory / broadcast size limits). Uncomment only to demonstrate that live.

# from pyspark.sql.functions import broadcast

# joinedPaymentDf3 = paymentTypeDf.join(broadcast(tripExtraDf), tripExtraDf["paymentTypeId"] == paymentTypeDf["PaymentTypeID"], how="inner")

# display(joinedPaymentDf3)

StatementMeta(testPool, 3, 23, Finished, Available, Finished)

# Demo Part 2

In [24]:
# Top 10 pickup zones by average fare per mile, for each day of the week.
# Techniques shown: when() for a conditional column, and a Window function.

from pyspark.sql.functions import when, row_number, desc
from pyspark.sql.window import Window

# Fare per mile is only defined for a positive trip distance. when() without
# otherwise() yields null for the remaining rows, and avg() skips nulls.
farePerMileDf = tripExtraDf.withColumn(
    "FarePerMile",
    when(col("tripDistance") > 0, col("fareAmount") / col("tripDistance"))
)

zoneFarePerMileDf = (
    farePerMileDf
    .groupBy("puLocationId", "DayOfWeek")
    .agg(avg("FarePerMile").alias("AvgFarePerMile"))
)

# Rank the zones within each day. puLocationId is a secondary sort key:
# row_number() alone breaks ties in AvgFarePerMile arbitrarily, which would make
# the top-10 result (and any table written from it) differ between runs.
dayWindow = (
    Window.partitionBy("DayOfWeek")
          .orderBy(desc("AvgFarePerMile"), col("puLocationId"))
)

# Get the top 10 zones per day
top10ZonesDowDf = (
    zoneFarePerMileDf
    .withColumn("Rank", row_number().over(dayWindow))
    .filter(col("Rank") <= 10)
)

display(top10ZonesDowDf)

# Check the Job with 26 tasks > "Associated SQL Query"
#   - Exchange & ShuffleRead 1: zoneFarePerMileDf (groupBy)
#   - Exchange & ShuffleRead 2: top10ZonesDowDf   (window partitioned by DayOfWeek)

StatementMeta(testPool, 3, 24, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9e0d9087-fc5e-4cfb-b9b3-b5e5c54e8bed)

In [25]:
# What happens if we explicitly repartition the trips by pickup location ID and
# day of week (the same keys the aggregation groups by)?

tripExtraDf2 = tripExtraDf.repartition("puLocationId", "DayOfWeek")

for label, frame in (("Original", tripExtraDf), ("New", tripExtraDf2)):
    print(f"{label} Partition: {frame.rdd.getNumPartitions()}")

StatementMeta(testPool, 3, 25, Finished, Available, Finished)

Original Partition: 26
New Partition: 113


In [26]:
# Do the same calculation on the repartitioned trips, for comparison.

# Fare per mile only for positive distances; other rows get null (skipped by avg).
farePerMileDf2 = tripExtraDf2.withColumn(
    "FarePerMile",
    when(col("tripDistance") > 0, col("fareAmount") / col("tripDistance"))
)

zoneFarePerMileDf2 = (
    farePerMileDf2
    .groupBy("puLocationId", "DayOfWeek")
    .agg(avg("FarePerMile").alias("AvgFarePerMile"))
)

# Get the top 10 zones per day. puLocationId breaks ties deterministically
# (row_number() alone would order equal averages arbitrarily).
dayWindow2 = (
    Window.partitionBy("DayOfWeek")
          .orderBy(desc("AvgFarePerMile"), col("puLocationId"))
)

top10ZonesDowDf2 = (
    zoneFarePerMileDf2
    .withColumn("Rank", row_number().over(dayWindow2))
    .filter(col("Rank") <= 10)
)

display(top10ZonesDowDf2)

# Performance got worse, so Spark's built-in optimizations were doing a better
# job on their own. A likely reason: repartition() shuffles every raw trip row,
# whereas a plain groupBy first aggregates within each partition and shuffles
# only the partial results. The window (partitioned by DayOfWeek alone) still
# needs its own exchange. Compare the two SQL plans to confirm.

StatementMeta(testPool, 3, 26, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 67689f3c-cb4e-4548-9824-be83054b5517)

In [27]:
# Let's see why our repartition might not be a good idea: count the trips in
# each (puLocationId, DayOfWeek) bucket to look for skew between partitions.

from pyspark.sql.functions import count

skewCheckDf = (
    farePerMileDf
    .groupBy("puLocationId", "DayOfWeek")
    .agg(count("*").alias("RecordCount"))
)

display(skewCheckDf)

# Chart view > Line, Key (puLocationId, DayOfWeek), Values (RecordCount)

StatementMeta(testPool, 3, 27, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e1f91dc9-c75f-4619-89b5-6fb725f309d8)

In [28]:
# Write the Top 10 Zones by Fare per Mile (by Day of Week) as a table
# so that SQL can query it.

targetDb = "demoDataSparkDb"

# saveAsTable fails if the target database does not exist yet, so create it on demand.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {targetDb}")

top10ZonesDowDf.write.mode("overwrite").saveAsTable(f"{targetDb}.top10Zones2018")

StatementMeta(testPool, 3, 28, Finished, Available, Finished)

In [29]:
# Read the table back through the catalog to confirm the write succeeded.

dfTest = spark.table("demoDataSparkDb.top10Zones2018")

tablePreview = dfTest.limit(10)
display(tablePreview)

StatementMeta(testPool, 3, 29, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, ef33e7f8-fb29-4d50-b414-bdf8e2374f57)

In [30]:
# If you are running this as a notebook, this is required to stop the Spark Application
#
# Uncomment the call below to stop the current Spark session and release the
# pool resources it holds, rather than leaving the session running idle.
# NOTE(review): whether this is strictly "required" depends on how the notebook
# is run (interactive vs. pipeline) and the pool's session timeout; confirm.

# mssparkutils.session.stop()

StatementMeta(testPool, 3, 30, Finished, Available, Finished)