In [None]:
# This dataset contains historical records from 2009 to 2018; however, only 5 records
# are aggregated here (taken with `limit(5)`, which returns an arbitrary 5 rows, not a random sample).
# Spark DataFrame:
import pandas as pd
from datetime import datetime
from dateutil.relativedelta import relativedelta
from azureml.opendatasets import NycTlcGreen
from functools import reduce 
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import mean, median

print("start")

# Start (or reuse) a local Spark session that pulls in the Hadoop Azure connector
# and the Azure storage client, both needed for wasbs:// reads.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-azure:3.3.6,com.microsoft.azure:azure-storage:8.6.6",
    )
    .getOrCreate()
)

# Report whether an active session is registered, then echo the configured packages.
print('yes' if spark.getActiveSession() else 'no')
print(spark.sparkContext.getConf().get("spark.jars.packages"))


# Azure Blob Storage coordinates for the NYC TLC "yellow" dataset.
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "yellow"
blob_sas_token = "r"

# Build the wasbs:// URL and register the SAS token for this container so
# Spark can read the blob data remotely.
wasbs_path = f'wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}'
spark.conf.set(
    f'fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net',
    blob_sas_token,
)
print(f'Remote blob path: {wasbs_path}')

# Repeat the session / package sanity check after configuring storage access.
print('yes' if spark.getActiveSession() else 'no')
print(spark.sparkContext.getConf().get("spark.jars.packages"))

# Load the parquet data from the remote wasbs:// path.
taxi_df = spark.read.format("parquet").load(wasbs_path)
print('taxi_df is created')

# Keep just 5 rows for a quick trial run. limit() returns an arbitrary
# 5 rows, not a random sample.
limited_df = taxi_df.limit(5)
print('limited_df is created')

# Derive grouping keys from the pickup timestamp, then replace null
# fareAmount values (only that column) with 0.
limited_df = (
    limited_df
    .withColumn("year", F.year(F.col("tpepPickupDateTime")))
    .withColumn("month", F.month(F.col("tpepPickupDateTime")))
    .fillna(0, subset=["fareAmount"])
)

# Ordered (condition, label) rules that normalise raw paymentType codes.
# The first matching rule wins, exactly like a chained F.when(...).when(...).
_payment_type_rules = [
    (F.col("paymentType").isin(['Credit', 'CREDIT', 'CRD', 'CRE', 'Cre', '1']), "Credit Card"),
    (F.col("paymentType").isin(['CAS', 'CASH', 'CSH', 'Cash', 'Cas', '2']), "Cash"),
    (F.col("paymentType").isin(['No Charge', 'NOC', 'No', '3']), "No Charge"),
    (F.col("paymentType").isin(['Dispute', 'DIS', 'Dis', '4']), "Dispute"),
    (F.col("paymentType").isin(['Unknown', 'UNK', 'NA', '5']), "Unknown"),
    (F.col("paymentType").isin(['Voided trip', '6']), "Voided trip"),
    # Fallback patterns for codes not listed explicitly above.
    (F.col('paymentType').contains('No'), 'No Charge'),
    (F.col('paymentType').rlike('40.|0|NA'), 'Unknown'),
]

# Seed the CASE WHEN with the first rule and append the rest in order.
(_first_cond, _first_label), *_remaining_rules = _payment_type_rules
_payment_type_expr = reduce(
    lambda expr, rule: expr.when(*rule),
    _remaining_rules,
    F.when(_first_cond, _first_label),
)

# NOTE(review): there is no .otherwise(...), so a code matching none of the
# rules becomes null. The regex is also unanchored: '0' matches any value
# that merely contains a zero. Confirm both are intended.
limited_df = limited_df.withColumn("paymentType", _payment_type_expr)

print('cleaninng data is completed.')

# Aggregate fare, total and passenger statistics per payment type and pickup month.
# NOTE: F.median is only available in PySpark >= 3.4.
result_df = (
    limited_df.groupBy("paymentType", "year", "month")
    .agg(
        F.mean("fareAmount").alias("mean_costAmount"),
        F.median("fareAmount").alias("median_costAmount"),
        F.mean("totalAmount").alias("mean_priceAmount"),
        F.median("totalAmount").alias("median_priceAmount"),
        F.mean("passengerCount").alias("mean_passengerCount"),
        F.median("passengerCount").alias("median_passengerCount"),
    )
)

# Write the aggregates as parquet into the 'sample' directory.
# The default save mode ("errorifexists") raises on a re-run once 'sample'
# exists, so overwrite explicitly to keep the cell re-runnable.
result_df.write.mode("overwrite").parquet('sample')
print('the job is complete.')

# Read the written output back for inspection.
# Read the 'sample' output directory as a whole. Spark places its
# part-*.snappy.parquet files inside that directory, with a UUID that changes
# on every write. A hard-coded part-file name in the working directory
# therefore never resolves.
spark.read.parquet('sample').show()

# Stop the Spark session.
spark.stop()