## Trip Data Aggregation 
### Group By Columns
1. Year
2. Month
3. Pickup Location ID
4. Drop Off Location ID

### Aggregated Columns
1. Total Trip Count
2. Total Fare Amount

### Purpose of the notebook

This notebook demonstrates the integration between the Spark pool and the serverless SQL pool:

1. Create the aggregated table in the Spark pool
2. Access the data from the serverless SQL pool

In [2]:
# Set the folder paths so that they can be reused later.
bronze_folder_path = 'abfss://nyc-taxi-data@synapsecoursedl2023.dfs.core.windows.net/raw'
silver_folder_path = 'abfss://nyc-taxi-data@synapsecoursedl2023.dfs.core.windows.net/silver'
gold_folder_path = 'abfss://nyc-taxi-data@synapsecoursedl2023.dfs.core.windows.net/gold'

In [3]:
# Disable partition column type inference so that the partition columns year and month are read as strings rather than integers
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
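
To illustrate why this setting matters: Spark discovers partition columns from Hive-style folder names such as `year=2020/month=01`. With type inference enabled, a numeric-looking value like `01` would typically be read as the integer `1`; with it disabled, the value stays the string `"01"`. The plain-Python sketch below only imitates that difference. The helper `parse_partition_path` and the example path are hypothetical, and the zero-padded month format is an assumption about the silver layout rather than something the notebook confirms.

```python
def parse_partition_path(path, infer_types):
    """Parse Hive-style 'key=value' path segments into a dict.

    Hypothetical helper that only imitates the effect of Spark's
    partition column type inference; it is not Spark's implementation.
    """
    values = {}
    for segment in path.strip("/").split("/"):
        if "=" not in segment:
            continue  # skip segments that are not partition folders
        key, raw = segment.split("=", 1)
        # With inference on, purely numeric values become integers.
        values[key] = int(raw) if infer_types and raw.isdigit() else raw
    return values


example_path = "trip_data_green/year=2020/month=01"  # assumed layout
inferred = parse_partition_path(example_path, infer_types=True)
as_strings = parse_partition_path(example_path, infer_types=False)
print(inferred)
print(as_strings)
```

Keeping the values as strings means the folder names written later by `partitionBy` match the ones that were read.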

In [4]:
%%sql

-- Create the database that the aggregated data will be written to

CREATE DATABASE IF NOT EXISTS nyc_taxi_ldw_spark
LOCATION 'abfss://nyc-taxi-data@synapsecoursedl2023.dfs.core.windows.net/gold';

In [5]:
# Read the silver data to be processed. 
trip_data_green_df = spark.read.parquet(f"{silver_folder_path}/trip_data_green") 

In [6]:
# Perform the required aggregations
# 1. Total trip count
# 2. Total fare amount
# Import the functions module under an alias so that round() and sum()
# do not shadow Python's built-ins of the same name.
from pyspark.sql import functions as F

trip_data_green_agg_df = (
    trip_data_green_df
    .groupBy("year", "month", "pu_location_id", "do_location_id")
    .agg(
        F.count(F.lit(1)).alias("total_trip_count"),
        F.round(F.sum("fare_amount"), 2).alias("total_fare_amount"),
    )
)
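
As a rough illustration of what this aggregation computes, the plain-Python sketch below applies the same grouping to a handful of made-up rows. The sample values and the helper `aggregate_trips` are hypothetical; the notebook itself runs the equivalent logic distributed in Spark.

```python
from collections import defaultdict


def aggregate_trips(rows):
    """Group rows by (year, month, pu_location_id, do_location_id) and
    return the trip count and rounded fare total for each group."""
    totals = defaultdict(lambda: [0, 0.0])  # key -> [count, fare sum]
    for row in rows:
        key = (row["year"], row["month"],
               row["pu_location_id"], row["do_location_id"])
        totals[key][0] += 1
        totals[key][1] += row["fare_amount"]
    return {
        key: {"total_trip_count": count,
              "total_fare_amount": round(fare, 2)}
        for key, (count, fare) in totals.items()
    }


# Made-up sample rows, for illustration only.
sample_rows = [
    {"year": "2020", "month": "01", "pu_location_id": 7,
     "do_location_id": 41, "fare_amount": 10.5},
    {"year": "2020", "month": "01", "pu_location_id": 7,
     "do_location_id": 41, "fare_amount": 4.25},
    {"year": "2020", "month": "02", "pu_location_id": 7,
     "do_location_id": 41, "fare_amount": 8.0},
]
result = aggregate_trips(sample_rows)
for key, value in result.items():
    print(key, value)
```

Note that Python's built-in `round` and Spark's `round` can differ on exact halfway values, so this sketch is meant to show the grouping logic rather than reproduce Spark's rounding behaviour.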

In [7]:
# Write the aggregated data to the gold table for consumption

(
    trip_data_green_agg_df.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .format("parquet")
    .saveAsTable("nyc_taxi_ldw_spark.trip_data_green_agg")
)
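
When locating the written files afterwards (for example, to query them from the serverless SQL pool), it can help to know where each partition is expected to land. The sketch below assumes the default layout in which a table is stored under `<database location>/<table name>` with one `year=.../month=...` subfolder per partition. The helper `expected_partition_dir` is hypothetical, and the year and month values are examples only; the actual layout should be checked in the storage account.

```python
def expected_partition_dir(database_location, table_name, year, month):
    """Build the folder where one (year, month) partition would land,
    assuming the default '<database location>/<table name>' layout."""
    base = database_location.rstrip("/")
    return f"{base}/{table_name.lower()}/year={year}/month={month}"


# Same location as used in CREATE DATABASE above.
gold_location = "abfss://nyc-taxi-data@synapsecoursedl2023.dfs.core.windows.net/gold"
partition_dir = expected_partition_dir(
    gold_location, "trip_data_green_agg", "2020", "01"  # example values
)
print(partition_dir)
```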