# Завдання 1: Налаштування середовища

## Creating a Unity Catalog catalog with a managed location on external S3

In [0]:
%sql
-- Create the catalog if it does not exist yet and set its managed storage location.
-- The catalog name is an identifier, so it is back-quoted (as in the GRANT statement);
-- single quotes would turn it into a string literal.
CREATE CATALOG IF NOT EXISTS `ovorobiov_nyc_catalog`
  MANAGED LOCATION 's3://ovorobiov-hw/databricks/';

## Giving access to deniskulemza1@gmail.com

In [0]:
%sql
-- Give the listed principal every privilege on the catalog.
GRANT ALL PRIVILEGES
  ON CATALOG `ovorobiov_nyc_catalog`
  TO `deniskulemza1@gmail.com`;

## Creating schema - trips_schema in ovorobiov_nyc_catalog

In [0]:
%sql
-- Create the schema that holds the trip tables (no-op when it already exists).
CREATE SCHEMA IF NOT EXISTS `ovorobiov_nyc_catalog`.`trips_schema`;

#  Завдання 2: Імпорт, уніфікація та об’єднання

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, LongType
import pyspark.sql.functions as F
import datetime

# Calendar date at the moment the cell runs (not referenced by the cells shown here).
date = datetime.date.today()

# Mount point under which the source data is read.
BASE_MOUNT_PATH = "/mnt/robot-dreams-source-mount"

# Folders with one sub-folder per year for each taxi type, plus the zone lookup CSV.
GREEN_TAXI_PATH = f"{BASE_MOUNT_PATH}/home-work-1-unified/nyc_taxi/green/"
YELLOW_TAXI_PATH = f"{BASE_MOUNT_PATH}/home-work-1-unified/nyc_taxi/yellow/"
# The accidental double slash before the file name has been removed.
TAXI_ZONE_LOOKUP_PATH = f"{BASE_MOUNT_PATH}/home-work-1-unified/nyc_taxi/taxi_zone_lookup.csv"

## Define years that will be used for data frame generation taken from s3 structure

In [0]:
# Years to load per taxi type, as strings because they are appended to the folder path.
# Green covers every year from 2014 to 2023; yellow is the same range without 2019 and 2021.
green_years = [str(year) for year in range(2014, 2024)]
yellow_years = [year for year in green_years if year not in ("2019", "2021")]

## Column names to be changed

In [0]:
# Some datasets use different names for the same column; this mapping translates each
# source-specific name (key) to the unified name (value) used in the rest of the notebook.
column_mapping = {
    source_name: unified_name
    for unified_name, source_names in (
        ("pickup_datetime", ("tpep_pickup_datetime", "lpep_pickup_datetime")),
        ("dropoff_datetime", ("tpep_dropoff_datetime", "lpep_dropoff_datetime")),
        ("airport_fee", ("Airport_fee",)),
    )
    for source_name in source_names
}

## Loading taxi data using path, years and column_mapping

In [0]:
def loading_taxi_data(path, years, taxi_type, column_mapping=column_mapping):
    """
    Load several years of taxi data into one Spark DataFrame.

    For every year the parquet files below ``<path><year>/`` are read recursively with
    schema merging enabled, the column names are unified with
    ``unify_main_column_names`` and a constant ``taxi_type`` column is added. The
    per-year frames are then combined by column name.

    Args:
        path (str): folder that contains one sub-folder per year
            (e.g. ../home-work-1-unified/nyc_taxi/green/); the trailing slash is optional
        years (list): years to load; each item is converted with ``str``, so both
            ``'2014'`` and ``2014`` are accepted
        taxi_type (str): type of taxi data (yellow or green), stored in the
            ``taxi_type`` column
        column_mapping (dict): source column name -> unified column name
    Returns:
        taxi_df (DataFrame): data of all requested years in one Spark DataFrame
    Raises:
        ValueError: if ``years`` is empty
    """
    from functools import reduce

    year_folders = [str(year) for year in years]
    if not year_folders:
        # Previously this surfaced as an IndexError on data_frames[0].
        raise ValueError("years must contain at least one year to load")

    # Accept the base path with or without a trailing slash.
    base_path = path + "/" if path and not path.endswith("/") else path

    data_frames = []
    for year_folder in year_folders:
        year_df = (
            spark.read
            .option('recursiveFileLookup', 'true')
            .option('mergeSchema', 'true')
            .parquet(base_path + year_folder + "/")
        )
        year_df = unify_main_column_names(year_df, column_mapping)
        data_frames.append(year_df.withColumn("taxi_type", F.lit(taxi_type)))

    # mergeSchema already tolerates columns missing from some files within a year;
    # allowMissingColumns extends the same tolerance to columns missing from a whole
    # year (they are filled with NULL) instead of failing the union.
    return reduce(
        lambda combined, year_df: combined.unionByName(year_df, allowMissingColumns=True),
        data_frames,
    )

## Unifying column names

In [0]:
def unify_main_column_names(df, column_mapping):
    """
    Unify column names in taxi data.

    Columns whose name is a key of ``column_mapping`` are renamed to the mapped value.
    Every unified name (a value of the mapping) that is still absent afterwards is
    added as a NULL column, so all frames expose the same unified columns.

    Args:
        df (DataFrame): taxi data in spark dataframe
        column_mapping (dict): source column name -> unified column name
    Returns:
        df (DataFrame): taxi data in spark dataframe with unified column names
    """
    # Rename columns using the mapping (iterates over the column list of the input frame).
    for source_name in df.columns:
        if source_name in column_mapping:
            df = df.withColumnRenamed(source_name, column_mapping[source_name])

    # Add missing columns. dict.fromkeys de-duplicates the unified names while keeping
    # the mapping's order, so the appended columns come out in the same order on every
    # run (iterating a set of strings does not guarantee that).
    for unified_name in dict.fromkeys(column_mapping.values()):
        if unified_name not in df.columns:
            # NOTE(review): this is an untyped NULL literal; cast it if a concrete
            # column type is needed before the frame is combined with others.
            df = df.withColumn(unified_name, F.lit(None))
    return df

## Loading green taxi data

In [0]:
# Green taxi trips for every year in green_years, tagged with taxi_type "green".
green_taxi_df = loading_taxi_data(path=GREEN_TAXI_PATH, years=green_years, taxi_type="green")

## Loading yellow taxi data

In [0]:
# Yellow taxi trips for every year in yellow_years, tagged with taxi_type "yellow".
yellow_taxi_df = loading_taxi_data(path=YELLOW_TAXI_PATH, years=yellow_years, taxi_type="yellow")

## Create Zone Lookup DF

In [0]:
# Zone lookup table: read the CSV with a header row and keep only the location id
# (cast to int) and the zone name, both renamed to snake_case.
taxi_zone_lookup_df = (
    spark.read
    .option("header", True)
    .csv(TAXI_ZONE_LOOKUP_PATH)
    .select(
        F.col("LocationID").cast("int").alias("location_id"),
        F.col("Zone").alias("zone"),
    )
)

## Identifying different columns in data frames 

In [0]:
# Check whether the green and yellow DataFrames have the same schema.
import json

# Every field definition (name, type, nullability, metadata) is serialised to canonical
# JSON so that definitions can be compared and put into a set.
yellow_schema = sorted(
    json.dumps(field, sort_keys=True) for field in yellow_taxi_df.schema.jsonValue()['fields']
)
green_schem = sorted(
    json.dumps(field, sort_keys=True) for field in green_taxi_df.schema.jsonValue()['fields']
)

# Field definitions that occur in exactly one of the two schemas.
diff = set(green_schem).symmetric_difference(yellow_schema)

# json.loads replaces eval(): parsing data should never execute it as code.
# A field that exists on both sides with a different definition is reported once.
diff_column_names = sorted({json.loads(entry)["name"] for entry in diff})
print(diff_column_names)

## Union two data frames yellow_taxi_df and green_taxi_df

In [0]:
# Merge the yellow and green DataFrames into one DataFrame, matching columns by name.
# allowMissingColumns=True fills a column that exists on only one side with NULL
# instead of failing the union; when both schemas have the same columns the result is
# unchanged.
raw_trips_df = yellow_taxi_df.unionByName(green_taxi_df, allowMissingColumns=True)

## Filter out anomalies (trip_distance < 0.1, fare_amount < 2, duration < 1 min)

In [0]:
# Keep only plausible trips: trip_distance >= 0.1, fare_amount >= 2, both timestamps
# present, and at least 60 seconds between pickup and drop-off.
filtered_trips_df = raw_trips_df.where(
    (F.col("trip_distance") >= 0.1)
    & (F.col("fare_amount") >= 2)
    & F.col("pickup_datetime").isNotNull()
    & F.col("dropoff_datetime").isNotNull()
    & (
        (F.unix_timestamp(F.col("dropoff_datetime")) - F.unix_timestamp(F.col("pickup_datetime")))
        >= 60
    )
)

## Adding pickup_hour, pickup_day_of_week and duration_min columns

In [0]:
# Map the number returned by F.dayofweek (1 = Sunday ... 7 = Saturday) to the day name.
week_day_map = F.create_map([
    F.lit(1), F.lit("Sunday"),
    F.lit(2), F.lit("Monday"),
    F.lit(3), F.lit("Tuesday"),
    F.lit(4), F.lit("Wednesday"),
    F.lit(5), F.lit("Thursday"),
    F.lit(6), F.lit("Friday"),
    F.lit(7), F.lit("Saturday"),
])

# Trip length in whole seconds (difference of the two Unix timestamps).
trip_duration_seconds = (
    F.unix_timestamp(F.col("dropoff_datetime")) - F.unix_timestamp(F.col("pickup_datetime"))
)

# pickup_hour: hour of the pickup timestamp.
# pickup_day_of_week: English day name looked up in week_day_map.
# duration_min: trip length rounded to the nearest minute. The rounding is applied to
# the minutes value; before, it wrapped the whole-second difference, where it had no
# effect, and the integer cast then truncated the minutes.
filtered_trips_df = (
    filtered_trips_df
    .withColumn("pickup_hour", F.hour("pickup_datetime"))
    .withColumn("pickup_day_of_week", week_day_map[F.dayofweek("pickup_datetime")])
    .withColumn("duration_min", F.round(trip_duration_seconds / 60).cast(IntegerType()))
)



## Join taxi_zone_lookup_df with filtered_trips_df and add pickup_zone and dropoff_zone columns

In [0]:
# The lookup table is joined twice (pickup and drop-off), so each use gets its own
# uniquely named columns. This keeps every column reference unambiguous.
pickup_zone_lookup = taxi_zone_lookup_df.select(
    F.col("location_id").alias("_pickup_location_id"),
    F.col("zone").alias("_pickup_zone"),
)
dropoff_zone_lookup = taxi_zone_lookup_df.select(
    F.col("location_id").alias("_dropoff_location_id"),
    F.col("zone").alias("_dropoff_zone"),
)

# Left joins keep trips whose location id has no match in the lookup; their zone name
# (or any NULL zone name in the lookup) is reported as "Unknown".
# The helper columns are dropped by name: string names are accepted by every PySpark
# version, whereas passing several Column objects to drop() is rejected by older ones.
full_filtered_trips_df = (
    filtered_trips_df
    .join(pickup_zone_lookup, F.col("PULocationID") == F.col("_pickup_location_id"), "left")
    .withColumn("pickup_zone", F.coalesce(F.col("_pickup_zone"), F.lit("Unknown")))
    .drop("_pickup_location_id", "_pickup_zone")
    .join(dropoff_zone_lookup, F.col("DOLocationID") == F.col("_dropoff_location_id"), "left")
    .withColumn("dropoff_zone", F.coalesce(F.col("_dropoff_zone"), F.lit("Unknown")))
    .drop("_dropoff_location_id", "_dropoff_zone")
)

## Writing results into raw_trips delta lake

In [0]:
# Save the enriched trips as a Delta table, replacing any existing content.
full_filtered_trips_df.write.saveAsTable(
    "ovorobiov_nyc_catalog.trips_schema.raw_trips",
    format="delta",
    mode="overwrite",
)

# Завдання 3: Створення та агрегація фінального датафрейму zone_summary

In [0]:
# Read the stored trips back from the raw_trips table; the summaries below are built from it.
delta_df = spark.read.table("ovorobiov_nyc_catalog.trips_schema.raw_trips")

## Base summary that includes total_trips, avg_trip_distance, avg_total_amount, avg_tip_amount, max_trip_distance, min_tip_amount

In [0]:
# One row per pickup zone: trip count, averages of distance / total amount / tip,
# longest distance, smallest tip, and the number of trips per taxi type.
base_summary = (
    delta_df
    .groupBy("pickup_zone")
    .agg(
        F.count("*").alias("total_trips"),
        F.avg("trip_distance").alias("avg_trip_distance"),
        F.avg("total_amount").alias("avg_total_amount"),
        F.avg("tip_amount").alias("avg_tip_amount"),
        F.max("trip_distance").alias("max_trip_distance"),
        F.min("tip_amount").alias("min_tip_amount"),
        # Conditional counts: 1 for a row of the given taxi type, 0 otherwise, summed.
        *[
            F.sum(F.when(F.col("taxi_type") == type_name, 1).otherwise(0)).alias(count_name)
            for type_name, count_name in (("yellow", "yellow_count"), ("green", "green_count"))
        ],
    )
)

## Adding yellow_share and green_share and rounding the values

In [0]:


# Convert the per-type counts into shares of all trips in the zone, then drop the counts.
base_summary = (
    base_summary
    .withColumn("yellow_share", F.col("yellow_count") / F.col("total_trips"))
    .withColumn("green_share", F.col("green_count") / F.col("total_trips"))
    .drop("yellow_count", "green_count")
)

# Final layout: the zone and trip count as they are, every other metric rounded to
# two decimals under its original name.
final_summary = base_summary.select(
    "pickup_zone",
    "total_trips",
    *[
        F.round(metric_name, 2).alias(metric_name)
        for metric_name in (
            "avg_trip_distance",
            "avg_total_amount",
            "avg_tip_amount",
            "max_trip_distance",
            "min_tip_amount",
            "yellow_share",
            "green_share",
        )
    ],
)

final_summary.show()

## Writing to delta lake summary

In [0]:
# Save the zone summary as a Delta table stored at the given S3 path, replacing any
# existing content.
final_summary.write.saveAsTable(
    "ovorobiov_nyc_catalog.trips_schema.zone_summary",
    format="delta",
    mode="overwrite",
    path="s3://ovorobiov-hw/databricks/zone_summary/",
)

# Завдання 4: Агрегація по днях тижня та зонах

## Group by pickup_zone and pickup_day_of_week. Calculate high_fare_share

In [0]:
# Per pickup zone and day of week: number of trips and the share of trips whose
# fare_amount is above 30, rounded to two decimals.
second_summary = (
    delta_df
    .groupBy("pickup_zone", "pickup_day_of_week")
    .agg(
        F.count("*").alias("total_trips"),
        F.sum(F.when(F.col("fare_amount") > 30, 1).otherwise(0)).alias("total_high_fares"),
    )
    # Selecting the final columns leaves out the helper count total_high_fares.
    .select(
        "pickup_zone",
        "pickup_day_of_week",
        "total_trips",
        F.round(F.col("total_high_fares") / F.col("total_trips"), 2).alias("high_fare_share"),
    )
)
second_summary.show()

## Writing zone_days_summary to delta lake

In [0]:
# Save the zone / day-of-week summary as a Delta table stored at the given S3 path,
# replacing any existing content.
second_summary.write.saveAsTable(
    "ovorobiov_nyc_catalog.trips_schema.zone_days_summary",
    format="delta",
    mode="overwrite",
    path="s3://ovorobiov-hw/databricks/zone_days_summary/",
)