# Apache Spark Project

# Name: Mehul Vikas Sankhe


## Project overview

### Inputs

- **NYC Taxi Trips dataset** - list of recorded taxi trips, each with several characteristics, namely: distance, number of passengers, origin zone, destination zone and trip cost (total amount charged to customer).
- **NYC Zones dataset** - list of zones wherein trips can originate/terminate.

### Tasks

1. Data cleaning
  1. Remove "0 distance" and 'no passengers' records.
  2. Remove outlier records.
2. Add new columns
  1. Join with zones dataset
  2. Compute the unit profitability of each trip
3. Zone summarisation and ranking
  1. Summarise trip data per zone
  2. Obtain the top 10 ranks according to:
    1. The total trip volume
    2. Their average profitability
    3. The total passenger volume
4. Record the total and task-specific execution times for each dataset size and format.

### How to

###### Code structure and implementation

- You must implement your solution to each task in the provided function code skeleton.
- The task-specific functions are combined together to form the full pipeline code, executed last.


###### Development

- Develop an initial working solution for the 'S' dataset and only then optimise it for larger dataset sizes.
- To perform vectorised operations on a DataFrame:
  - use the API docs to look for existing vectorised functions in: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html
  - to trigger computation despite Spark's lazy execution, call an action (e.g. `collect`, `take`, `count`): https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions
  - if no built-in function fits and a customised function is required, implement your own User Defined Function (UDF); simple column arithmetic, such as a linear combination of other columns, does not need one. See: https://spark.apache.org/docs/latest/sql-ref-functions-udf-scalar.html
- Use only the `pyspark.sql` API - documentation link below - (note that searching through the docs returns results from the `pyspark.sql` API together with the `pyspark.pandas` API):
  - https://spark.apache.org/docs/3.2.0/api/python/reference/pyspark.sql.html
- Periodically download your notebook to your computer as backup and safety measure against accidental file deletion.

###### Execution time measurement

- Execution time is calculated and returned by the Spark Engine and shown in the output region of the cell.
- To measure the execution time of a task you must perform a `collect` or similar operation (e.g. `take`) on the returned DataFrame.
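As a minimal sketch, assuming you want to record timings programmatically rather than reading them from the cell footer, a hypothetical helper `time_action` (not part of the provided skeleton) can wrap any callable that ends in a Spark action. It uses the standard-library clock `time.perf_counter`.

```python
import time
from typing import Any, Callable, Tuple


def time_action(action: Callable[[], Any]) -> Tuple[Any, float]:
    """Run `action` once and return (result, elapsed wall-clock seconds).

    Hypothetical helper. In the notebook, `action` should end in a Spark
    action (collect, take, count) so that the lazily built plan is actually
    executed inside the timed region.
    """
    start = time.perf_counter()  # monotonic, high-resolution clock
    result = action()
    elapsed = time.perf_counter() - start
    return result, elapsed


# Spark-free usage example: time a plain Python computation.
total, seconds = time_action(lambda: sum(range(1_000_000)))
print(f"result={total}, elapsed={seconds:.3f} s")
```

In the notebook, a call such as `top10s, secs = time_action(lambda: pipeline(trips, with_task_12=True))` should work because the top-10 functions already call `take(10)`; the measured time includes driver-side overhead and may differ slightly from the cell footer.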

## Task 0 - Read data

The code below is ready to run. Do not modify this code. It does the following:

- Reads the 'zones' dataset into variable 'zone_names'
- Defines the `init_trips` function that allows you to read the 'trips' dataset (from the DBFS FileStore) given the dataset size ('S' to 'XXL') and format ('parquet' or 'delta') as function arguments
- Defines the `pipeline` function, called in Task 4 to measure the execution time of the entire data processing pipeline
- Shows you how to call the `init_trips` function and display dataset characteristics (number of rows, schema)

In [None]:
## global imports
import pyspark.sql as ps
import pyspark.sql.functions as pf
import pandas as pd

# Load zone names dataset - (much faster to read small file from git than dbfs)
zones_file_url = 'https://raw.githubusercontent.com/mutazb999/CSC8101-lab-and-coursework/main/02-assignment-spark/taxi_zone_names.csv'
zone_names = spark.createDataFrame(pd.read_csv(zones_file_url))

# Function to load trips dataset by selected dataset size
def init_trips(size = 'S', data_format = "parquet", taxi_folder = "/FileStore/tables/taxi"):

    files = {
        'S'  : ['2021_07'],
        'M'  : ['2021'],
        'L'  : ['2020_21'],
        'XL' : ['1_6_2019', '7_12_2019'],
        'XXL': ['1_6_2019', '7_12_2019', '2020_21']
    }

    # validate input dataset size
    if size not in files.keys():
        print("Invalid input dataset size. Must be one of {}".format(list(files.keys())))
        return None

    if data_format == "parquet":
        filenames = list(map(lambda s: f'{taxi_folder}/parquet/tripdata_{s}.parquet', files[size]))
        trips_df = spark.read.parquet(filenames[0])

        for name in filenames[1:]:
            trips_df = trips_df.union(spark.read.parquet(name))

    elif data_format == "delta":
        filenames = f"{taxi_folder}/delta/taxi-{size}-delta/"
        trips_df = spark.read.format("delta").load(filenames)

    else:
        print("Invalid data format. Must be one of {}".format(['parquet', 'delta']))
        return None

    print(
    """
    Trips dataset loaded!
    ---
      Size: {s}
      Format: {f}
      Tables loaded: {ds}
      Number of trips (dataset rows): {tc:,}
    """.format(s = size, f = data_format, ds = filenames, tc = trips_df.count()))

    return trips_df

# helper function to print dataset row count
def print_count(df):
    print("Row count: {t:,}".format(t = df.count()))

def pipeline(trips_df, with_task_12 = False, zones_df = zone_names):
    # Do not edit
    #---

    ## Task 1.1
    _trips_11 = t11_remove_zeros(trips_df)

    ## Task 1.2
    if with_task_12:
        _trips_12 = t12_remove_outliers(_trips_11)
    else:
        _trips_12 = _trips_11

    ## Task 2.1
    _trips_21 = t21_join_zones(_trips_12, zones_df = zones_df)

    ## Task 2.2
    _trips_22 = t22_calc_profit(_trips_21)

    ## Task 3.1
    _graph = t31_summarise_trips(_trips_22)

    ## Task 3.2
    _zones = t32_summarise_zones_pairs(_graph, zones_df = zones_df)

    _top10_trips     = t32_top10_trips(_zones)
    _top10_profit    = t32_top10_profit(_zones)
    _top10_passenger = t32_top10_passenger(_zones)

    return([_top10_trips, _top10_profit, _top10_passenger])

In [None]:
# CHANGE the value of argument 'size' to record the pipeline execution times for increasing dataset sizes
SIZE = 'S'
DATA_FORMAT = 'parquet'

# Load trips dataset
trips = init_trips(SIZE, DATA_FORMAT)

# uncomment line only for small datasets
# trips.take(1)


    Trips dataset loaded!
    ---
      Size: S
      Format: parquet
      Tables loaded: ['/FileStore/tables/taxi/parquet/tripdata_2021_07.parquet']
      Number of trips (dataset rows): 2,898,033
    


In [None]:
print_count(trips)

Row count: 2,898,033


In [None]:
# dataset schemas
trips.printSchema()

root
 |-- index: long (nullable = true)
 |-- VendorID: double (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- cab_type: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |--

In [None]:
display(trips[['PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount']].take(5))

PULocationID,DOLocationID,trip_distance,passenger_count,total_amount
90,68,0.8,1.0,8.8
113,90,0.9,1.0,8.8
88,232,2.8,1.0,13.8
79,249,1.4,1.0,12.3
142,238,2.0,0.0,12.3


In [None]:
zone_names.printSchema()

root
 |-- LocationID: long (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [None]:
display(zone_names.take(5))

LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone


## Task 1 - Filter rows

**Input:** trips dataset

### Task 1.1 - Remove "0 distance" and 'no passengers' records

Remove dataset rows that represent invalid trips:

- Trips where `trip_distance == 0` (no distance travelled)
- Trips where `passenger_count == 0` and `total_amount == 0` (records with zero passengers but `total_amount > 0` are retained, since they may be significant: the taxi may, for example, have carried a parcel)

Altogether, a record is removed if it satisfies the following conditions:

`trip_distance == 0` or `(passenger_count == 0` and `total_amount == 0)`.

**Recommended:** Select only the relevant dataset columns for this and subsequent tasks: `['PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount']`
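The removal rule can be checked on a few hand-made records with a plain-Python predicate. This is a minimal sketch for sanity-checking only; `should_remove` and the sample rows are hypothetical. In PySpark, the rows to keep are those for which the predicate is false, e.g. `df.filter(~((pf.col("trip_distance") == 0) | ((pf.col("passenger_count") == 0) & (pf.col("total_amount") == 0))))`. Spark comparisons involving `null` evaluate to `null` and `filter` drops such rows, so records with missing values in these columns may also be removed; the Python sketch does not model nulls.

```python
def should_remove(trip_distance: float, passenger_count: float, total_amount: float) -> bool:
    """Task 1.1 rule: drop zero-distance trips, and trips with no passengers
    that were also not charged anything."""
    zero_distance = trip_distance == 0
    empty_and_free = passenger_count == 0 and total_amount == 0
    return zero_distance or empty_and_free


# (trip_distance, passenger_count, total_amount)
samples = [
    (0.0, 1.0, 8.8),   # zero distance -> removed
    (2.0, 0.0, 0.0),   # no passengers and nothing charged -> removed
    (2.0, 0.0, 12.3),  # no passengers but charged (e.g. a parcel) -> kept
    (1.4, 1.0, 12.3),  # ordinary trip -> kept
]
for row in samples:
    print(row, "removed" if should_remove(*row) else "kept")
```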

### Task 1.2 - Remove outliers using the modified z-score

Despite having removed spurious "zero passengers" trips in task 1.1, columns `total_amount` and `trip_distance` contain additional outlier values that must be identified and removed.

To identify and remove outliers, you will use the modified [z-score](https://en.wikipedia.org/wiki/Standard_score) method.
The modified z-score uses the median and [Median Absolute Deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) (MAD), instead of the mean and standard deviation, to determine how far an observation (indexed by $i$) is from the median:

$$z_i = \frac{x_i - \mathit{median}(\mathbf{x})}{\mathbf{MAD}},$$

where $\mathbf{x}$ represents the input vector, $x_i$ is an element of $\mathbf{x}$ and $z_i$ is its corresponding modified z-score. In turn, the MAD formula is:

$$\mathbf{MAD} = 1.483 * \mathit{median}(\big\lvert x_i - \mathit{median}(\mathbf{x})\big\rvert).$$

Observations with a **high** absolute z-score are considered outliers. A score is considered **high** if its __absolute z-score__ is larger than a threshold $T = 3.5$:

$$\big\lvert z_i \big\rvert > T,$$

where $T$ represents the number of (robust) standard deviations beyond which an observation is considered an outlier ([wiki](https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule)).

This process is repeated twice, once for each of the columns `total_amount` and `trip_distance` (in any order).

**Important:** Use the surrogate function [`percentile_approx`](https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.functions.percentile_approx.html?highlight=percentile#pyspark.sql.functions.percentile_approx) to estimate the median (calculating the median values for a column is expensive as it cannot be parallelised efficiently).
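To make the formulas concrete, a minimal plain-Python sketch of the modified z-score filter for a single column follows; the function names and sample values are illustrative. It uses `statistics.median`, which returns the exact median (averaging the two middle values for even-length input), whereas `percentile_approx` returns an approximate value taken from the column itself, so results on real data can differ slightly from Spark's.

```python
from statistics import median


def modified_z_scores(xs, scale=1.483):
    """Modified z-scores as defined above: (x_i - median(x)) / MAD,
    where MAD = scale * median(|x_i - median(x)|)."""
    m = median(xs)
    mad = scale * median([abs(x - m) for x in xs])
    if mad == 0:
        # At least half of the values equal the median; the score is undefined.
        raise ValueError("MAD is zero; modified z-score is undefined")
    return [(x - m) / mad for x in xs]


def drop_outliers(xs, threshold=3.5):
    """Keep values whose absolute modified z-score does not exceed the threshold."""
    return [x for x, z in zip(xs, modified_z_scores(xs)) if abs(z) <= threshold]


amounts = [8.8, 8.8, 13.8, 12.3, 12.3, 950.0]
print(drop_outliers(amounts))  # the 950.0 fare is flagged as an outlier
```

The `mad == 0` guard matters in practice: in Spark with ANSI mode disabled (the Spark 3.x default), dividing by a zero MAD yields `null`, and the subsequent filter would then typically drop every row.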

In [None]:
# develop your solution here (create/destroy cells as needed) and then implement it in the functions below

**In the Task 1.1 implementation below, I remove the "0 distance" and "no passengers" records using the condition given above. As recommended in the task description, I first select only the columns `PULocationID`, `DOLocationID`, `trip_distance`, `passenger_count` and `total_amount`, and then apply the filter.**

In [None]:
# Your solution implementation to task 1.1 goes HERE
def t11_remove_zeros(df):
    # input: trips dataset
    # As given in the recommendation, selecting only ('PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount') dataset columns.
    new_df = df.select('PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount')

    # Remove a record if trip_distance == 0, or if passenger_count == 0 and total_amount == 0
    removedZeros = new_df.filter(~((new_df['trip_distance'] == 0) | ((new_df['passenger_count'] == 0) & (new_df['total_amount'] == 0))))

    return removedZeros

In [None]:
# execute task 1.1
trips_11 = t11_remove_zeros(trips)

print_count(trips_11)

## uncomment only for smaller datasets
# display(trips_11.take(10))

Row count: 2,888,435


**Task 1.2 removes outliers using the modified z-score method described above. The modified z-score is computed separately for the `total_amount` and `trip_distance` columns, and rows whose absolute z-score exceeds 3.5 are filtered out. Since the order is free, I process `total_amount` first and then `trip_distance`; the median and MAD of `trip_distance` are therefore computed on the rows that survived the `total_amount` filter.**
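Because the second column's statistics depend on which rows survived the first filter, the result can in general depend on the processing order. The following plain-Python sketch (hypothetical helper names, made-up sample rows) mirrors that sequential logic on a handful of dict records, which can be handy for checking the PySpark version on a small sample.

```python
from statistics import median


def filter_column(rows, column, threshold=3.5, scale=1.483):
    """Keep rows whose modified z-score on `column` is within the threshold."""
    values = [r[column] for r in rows]
    m = median(values)
    mad = scale * median([abs(v - m) for v in values])
    if mad == 0:
        raise ValueError(f"MAD of {column!r} is zero")
    return [r for r in rows if abs((r[column] - m) / mad) <= threshold]


def remove_outliers_sequential(rows):
    """Filter on total_amount first, then on trip_distance.

    The trip_distance median and MAD are computed only on the rows that
    survived the total_amount filter, the same order t12_remove_outliers uses.
    """
    rows = filter_column(rows, "total_amount")
    return filter_column(rows, "trip_distance")


sample_rows = [
    {"trip_distance": 0.8, "total_amount": 8.8},
    {"trip_distance": 0.9, "total_amount": 8.8},
    {"trip_distance": 2.8, "total_amount": 13.8},
    {"trip_distance": 1.4, "total_amount": 12.3},
    {"trip_distance": 2.0, "total_amount": 12.3},
    {"trip_distance": 1.5, "total_amount": 950.0},  # fare outlier
    {"trip_distance": 60.0, "total_amount": 11.0},  # distance outlier
]
cleaned = remove_outliers_sequential(sample_rows)
print([r["trip_distance"] for r in cleaned])
```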

In [None]:
# Your solution implementation to task 1.2 goes HERE
# Importing required libraries
from pyspark.sql.functions import expr

def t12_remove_outliers(df):

    # Function to calculate the (approximate) median
    def calc_median(y):
        median = df.selectExpr(y).collect()[0][0]
        return median

    # Function to calculate MAD
    def calc_mad(z, z1):
        mad = df.selectExpr(z.format(z1)).collect()[0][0]
        return mad

    # Computing modified z score for total_amount column
    totalAmount = "percentile_approx(total_amount, 0.5)"
    totalAmount1 = "1.483 * percentile_approx(abs(total_amount - {}), 0.5)"
    totalAmountMedian = calc_median(totalAmount)
    totalAmountMad = calc_mad(totalAmount1, totalAmountMedian)

    modified_z_score_expr_totalAmount = expr("(total_amount - {}) / {} AS total_amount_z_score".format(totalAmountMedian, totalAmountMad))

    # Add modified z-score column to DataFrame
    df_with_z_scores_totalAmount = df.withColumn("total_amount_z_score", modified_z_score_expr_totalAmount)

    # Filter out rows with high absolute z-scores
    filter_Zscores_totalAmount = df_with_z_scores_totalAmount.filter("abs(total_amount_z_score) <= 3.5")


    # Computing modified z score for trip_distance column
    tripDistanceMedian = filter_Zscores_totalAmount.selectExpr("percentile_approx(trip_distance, 0.5)").collect()[0][0]
    tripDistanceMad = filter_Zscores_totalAmount.selectExpr("1.483 * percentile_approx(abs(trip_distance - {}), 0.5)".format(tripDistanceMedian)).collect()[0][0]

    modified_z_score_expr_tripDistance = expr("(trip_distance - {}) / {} AS trip_distance_z_score".format(tripDistanceMedian, tripDistanceMad))

    # Add modified z-score column to DataFrame
    df_with_z_scores_tripDistance = filter_Zscores_totalAmount.withColumn("trip_distance_z_score", modified_z_score_expr_tripDistance)

    # Filter out rows with high absolute z-scores
    filter_Zscores_tripDistance = df_with_z_scores_tripDistance.filter("abs(trip_distance_z_score) <= 3.5")

    # Drop newly added columns for "total_amount_z_score" and "trip_distance_z_score" from dataframe as they were used just for filtering
    removedOutliers = filter_Zscores_tripDistance.drop("total_amount_z_score","trip_distance_z_score")

    return removedOutliers

In [None]:
# execute task 1.2
trips_12 = t12_remove_outliers(trips_11)

print_count(trips_12)
# display(trips_12.take(10))

Row count: 2,461,535


## Task 2 - Compute new columns

### Task 2.1 - Zone names

Obtain the **start** and **end** zone names of each trip by joining the `trips` and `zone_names` datasets (i.e. by using the `zone_names` dataset as lookup table).

**Note:** The columns containing the start and end zone ids of each trip are named `PULocationID` and `DOLocationID`, respectively.

### Task 2.2 - Unit profitability

Compute the column `unit_profitability = total_amount / trip_distance`.
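The join-as-lookup and the new column can be illustrated without Spark: a dict plays the role of `zone_names`, and trips whose pickup or drop-off ID has no match are dropped, which is what an inner join (the default `how` of `DataFrame.join`) does. This is a sketch with hypothetical names; the four zone names are taken from this notebook's Task 3.2 output, and ID 999 is a made-up unmatched ID.

```python
# Lookup table standing in for zone_names (LocationID -> Zone).
ZONE_LOOKUP = {
    161: "Midtown Center",
    186: "Penn Station/Madison Sq West",
    236: "Upper East Side North",
    237: "Upper East Side South",
}


def enrich_trips(trips, zones):
    """Attach start/end zone names and unit profitability to each trip.

    Trips whose pickup or drop-off ID is missing from `zones` are skipped,
    mimicking an inner join. Assumes zero-distance trips were already
    removed in Task 1.1, so the division is safe.
    """
    out = []
    for pu, do, distance, passengers, amount in trips:
        if pu not in zones or do not in zones:
            continue
        out.append({
            "PULocationID": pu,
            "DOLocationID": do,
            "start_zone": zones[pu],
            "end_zone": zones[do],
            "passenger_count": passengers,
            "unit_profitability": amount / distance,
        })
    return out


# (PULocationID, DOLocationID, trip_distance, passenger_count, total_amount)
sample_trips = [
    (237, 236, 1.5, 1.0, 10.5),
    (161, 186, 2.0, 2.0, 15.0),
    (237, 999, 3.0, 1.0, 20.0),  # 999: hypothetical ID absent from the lookup
]
for row in enrich_trips(sample_trips, ZONE_LOOKUP):
    print(row["start_zone"], "->", row["end_zone"], row["unit_profitability"])
```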

In [None]:
# develop your solution here (create/destroy cells as needed) and then implement it in the functions below

**Obtaining the start and end zone names of each trip by joining the trips with the "zone_names" dataset.**

In [None]:
# Your solution implementation to task 2.1 goes HERE
# Importing required libraries
from pyspark.sql.functions import col

def t21_join_zones(df, zones_df = zone_names):
    # input: output of task 1.2 and zone_names dataset

    # Obtaining the start zone name
    df = df.join(zones_df, df.PULocationID == zones_df.LocationID)
    df = df.select('PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount', 'Zone')
    df = df.withColumnRenamed("Zone","start_zone")

    # Obtaining the end zone name
    zones_df = zones_df.select("LocationID",col("Zone").alias("Zone1"))
    df = df.join(zones_df, df.DOLocationID == zones_df.LocationID)
    df = df.select('PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount', 'start_zone', 'Zone1')
    df = df.withColumnRenamed("Zone1","end_zone")

    return df

In [None]:
# execute task 2.1
trips_21 = t21_join_zones(trips_12, zones_df = zone_names)

print_count(trips_21)
# display(trips_21.take(10))

Row count: 2,461,535


**In the Task 2.2 implementation, we compute the new column "unit_profitability".**

In [None]:
# Your solution implementation to task 2.2 goes HERE
def t22_calc_profit(df):
    # input: output of task 2.1

    df = df.withColumn("unit_profitability", df['total_amount']/df['trip_distance'])

    return df

In [None]:
# execute task 2.2
trips_22 = t22_calc_profit(trips_21)

print_count(trips_22)
# display(trips_22.take(10))

Row count: 2,461,535


## Task 3: Rank zones by traffic, passenger volume and profitability

### 3.1 - Summarise interzonal travel

Build a graph data structure of zone-to-zone traffic, representing aggregated data about trips between any two zones. The graph has one node for each zone and one directed edge for each ordered pair of zones with at least one recorded trip. Each edge carries aggregate information about all trips along it.

For example, zones Z1 and Z2 are connected by *two* edges: edge Z1 --> Z2 carries aggregate data about all trips that originated in Z1 and ended in Z2, and edge Z2 --> Z1 carries aggregate data about all trips that originated in Z2 and ended in Z1.

The aggregate information of interzonal travel must include the following data:

- `average_unit_profit` - the average unit profitability (calculated as `mean(unit_profitability)`).
- `trips_count` - the total number of recorded trips.
- `total_passengers` - the total number of passengers across all trips (sum of `passenger_count`).

This graph can be represented as a new dataframe, with schema:

\[`PULocationID`, `DOLocationID`, `average_unit_profit`, `trips_count`, `total_passengers` \]

__hint__: the `groupby()` operator produces a `pyspark.sql.GroupedData` structure. You can then calculate multiple aggregations from this using `pyspark.sql.GroupedData.agg()`:
- https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html
- https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.GroupedData.agg.html
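A plain-Python sketch of the directed-edge aggregation follows (hypothetical names, made-up rows). It groups by the ordered pair `(PULocationID, DOLocationID)`, so Z1 --> Z2 and Z2 --> Z1 end up as separate edges. In PySpark, the required column names can be set directly with `.alias()`, e.g. `df.groupBy("PULocationID", "DOLocationID").agg(pf.avg("unit_profitability").alias("average_unit_profit"), pf.count("PULocationID").alias("trips_count"), pf.sum("passenger_count").alias("total_passengers"))`. The sketch assumes no missing values, whereas Spark's `avg` and `sum` skip nulls.

```python
from collections import defaultdict


def summarise_trips(rows):
    """Aggregate trips into directed edges keyed by (PULocationID, DOLocationID)."""
    acc = defaultdict(lambda: {"profit_sum": 0.0, "count": 0, "passengers": 0.0})
    for r in rows:
        edge = acc[(r["PULocationID"], r["DOLocationID"])]
        edge["profit_sum"] += r["unit_profitability"]
        edge["count"] += 1
        edge["passengers"] += r["passenger_count"]
    return {
        key: {
            "average_unit_profit": e["profit_sum"] / e["count"],
            "trips_count": e["count"],
            "total_passengers": e["passengers"],
        }
        for key, e in acc.items()
    }


sample_trip_rows = [
    {"PULocationID": 237, "DOLocationID": 236, "unit_profitability": 7.0, "passenger_count": 1.0},
    {"PULocationID": 237, "DOLocationID": 236, "unit_profitability": 9.0, "passenger_count": 2.0},
    {"PULocationID": 236, "DOLocationID": 237, "unit_profitability": 6.0, "passenger_count": 1.0},
]
for (pu, do), stats in summarise_trips(sample_trip_rows).items():
    print(f"{pu} -> {do}: {stats}")
```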

### Task 3.2 - Obtain top-10 zones

For each of the following measures, report the top-10 zones _using their plain names you dereferenced in the previous step, not the codes_. Note that this requires ranking the nodes in different orders. Specifically, you need to calculate the following further aggregations:

- the **total** number of trips originating from Z. This is simply the sum of `trips_count` over all outgoing edges for Z, i.e., edges of the form Z -> \*
- the **average** profitability of a zone. This is the average of all `average_unit_profit` over all *outgoing* edges from Z.
- the **total** passenger volume, measured as the **sum** of `total_passengers` carried in trips that originate from Z
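The zone-level aggregation and the three rankings can likewise be sketched in plain Python (hypothetical names and edge values). Following the definition above, a zone's average profitability is the unweighted mean of `average_unit_profit` over its outgoing edges, so an edge with a single trip counts as much as one with thousands, which can let low-volume zones rank highly on profitability. `heapq.nlargest` returns the k keys with the largest metric; in PySpark the counterpart is an `orderBy` on the metric in descending order followed by `take(10)` or `limit(10)`.

```python
import heapq
from collections import defaultdict


def summarise_zones(edges):
    """Aggregate directed edges over the outgoing edges of each pickup zone."""
    acc = defaultdict(lambda: {"trips": 0, "profit_sum": 0.0, "n_edges": 0, "passengers": 0.0})
    for (pu, _do), e in edges.items():
        z = acc[pu]
        z["trips"] += e["trips_count"]
        z["profit_sum"] += e["average_unit_profit"]
        z["n_edges"] += 1
        z["passengers"] += e["total_passengers"]
    return {
        pu: {
            "total_trips": z["trips"],
            # Unweighted mean over outgoing edges, as the task defines it.
            "average_profit": z["profit_sum"] / z["n_edges"],
            "total_passengers": z["passengers"],
        }
        for pu, z in acc.items()
    }


def top_k(zone_stats, metric, k=10):
    """Return the IDs of the k zones with the largest value of `metric`."""
    return heapq.nlargest(k, zone_stats, key=lambda pu: zone_stats[pu][metric])


sample_edges = {
    (237, 236): {"average_unit_profit": 8.0, "trips_count": 2, "total_passengers": 3.0},
    (237, 161): {"average_unit_profit": 4.0, "trips_count": 6, "total_passengers": 6.0},
    (236, 237): {"average_unit_profit": 9.0, "trips_count": 1, "total_passengers": 1.0},
}
zone_stats = summarise_zones(sample_edges)
print(top_k(zone_stats, "total_trips"))     # ranking by trip volume
print(top_k(zone_stats, "average_profit"))  # ranking by profitability
```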

In [None]:
# develop your solution here (create/destroy cells as needed) and then implement it in the functions below

In [None]:
## Your solution to task 3.1 goes HERE
def t31_summarise_trips(df):
    # input: output of task 2.2
    df = df.groupby('PULocationID', 'DOLocationID').agg(
        pf.mean('unit_profitability').alias('average_unit_profit'),
        pf.count('PULocationID').alias('trips_count'),
        pf.sum('passenger_count').alias('total_passengers'))
    return df

In [None]:
# execute task 3.1
graph = t31_summarise_trips(trips_22)

print_count(graph)
# display(graph.take(10))

Row count: 14,361


In [None]:
# Your solution to task 3.2 goes HERE (implement each of the functions below)
def t32_summarise_zones_pairs(df, zones_df = zone_names):
    df = df.groupby(df['PULocationID']).agg(pf.sum(df['trips_count']), pf.mean(df['average_unit_profit']), pf.sum(df['total_passengers']))
    df = df.withColumnRenamed('sum(trips_count)', 'total_no_trips').withColumnRenamed('avg(average_unit_profit)', 'average profitability').withColumnRenamed('sum(total_passengers)', 'total_no_passenger').join(zones_df, df.PULocationID == zones_df.LocationID, "left")
    df = df.drop('Borough').drop('service_zone')
    return df

# Top 10 ranked zones by traffic (trip volume)
def t32_top10_trips(df_zones):
    # input: output of task 3.2
    top10_tripss = df_zones.orderBy(col('total_no_trips').desc()).take(10)
    return top10_tripss

# Top 10 ranked zones by profit
def t32_top10_profit(df_zones):
    # input: output of task 3.2
    top10_profitt = df_zones.orderBy(col('average profitability').desc()).take(10)
    return top10_profitt

# Top 10 ranked zones by passenger volume
def t32_top10_passenger(df_zones):
    # input: output of task 3.2
    top10_passengerr = df_zones.orderBy(col('total_no_passenger').desc()).take(10)
    return top10_passengerr

In [None]:
# execute task 3.2
zones = t32_summarise_zones_pairs(graph)

top10_trips     = t32_top10_trips(zones)
top10_profit    = t32_top10_profit(zones)
top10_passenger = t32_top10_passenger(zones)

In [None]:
# use 'display()' or return a pandas DataFrame for 'pretty' output
display(top10_trips)

PULocationID,total_no_trips,average profitability,total_no_passenger,LocationID,Zone
237,121782,6.904942617564514,169758.0,237,Upper East Side South
236,103428,7.086510643557858,143555.0,236,Upper East Side North
186,96332,7.9173290357329105,136582.0,186,Penn Station/Madison Sq West
161,93257,7.685414381975197,134233.0,161,Midtown Center
170,87892,7.757348026142983,123083.0,170,Murray Hill
162,84506,7.440035224459652,118843.0,162,Midtown East
142,80608,7.570946106028091,114236.0,142,Lincoln Square East
239,74157,6.920120397371032,105026.0,239,Upper West Side South
48,74097,8.939710370747635,106450.0,48,Clinton East
141,73931,6.5210824289445934,103138.0,141,Lenox Hill West


In [None]:
# use 'display()' or return a pandas DataFrame for 'pretty' output
display(top10_profit)

PULocationID,total_no_trips,average profitability,total_no_passenger,LocationID,Zone
15,55,373.64352610622007,16.0,15,Bay Terrace/Fort Totten
10,355,106.86575379124724,153.0,10,Baisley Park
207,294,103.12501378067476,354.0,207,Saint Michaels Cemetery/Woodside
190,76,90.59317954859831,67.0,190,Prospect Park
241,228,59.391859123132,89.0,241,Van Cortlandt Village
101,27,48.41059594606823,12.0,101,Glen Oaks
115,4,38.65,4.0,115,Grymes Hill/Clifton
59,8,36.84437137879828,6.0,59,Crotona Park
81,142,35.01882481592064,19.0,81,Eastchester
89,714,35.000379842565565,176.0,89,Flatbush/Ditmas Park


In [None]:
# use 'display()' or return a pandas DataFrame for 'pretty' output
display(top10_passenger)

PULocationID,total_no_trips,average profitability,total_no_passenger,LocationID,Zone
237,121782,6.904942617564514,169758.0,237,Upper East Side South
236,103428,7.086510643557858,143555.0,236,Upper East Side North
186,96332,7.9173290357329105,136582.0,186,Penn Station/Madison Sq West
161,93257,7.685414381975197,134233.0,161,Midtown Center
170,87892,7.757348026142983,123083.0,170,Murray Hill
162,84506,7.440035224459652,118843.0,162,Midtown East
142,80608,7.570946106028091,114236.0,142,Lincoln Square East
48,74097,8.939710370747635,106450.0,48,Clinton East
239,74157,6.920120397371032,105026.0,239,Upper West Side South
234,73352,7.573112251525214,104817.0,234,Union Sq


## Task 4 - Record the pipeline's execution time

Record the execution time of:

1. the whole pipeline
2. the whole pipeline except task 1.2

on the two tables below, for all dataset sizes: `'S'`, `'M'`, `'L'`, `'XL'`, `'XXL'`, and data formats: `parquet` and `delta`.

Analyse the resulting execution times and comment on the effect of dataset size, dataset format and task complexity (with and without task 1.2) on pipeline performance.

In [None]:
# CHANGE the value of the following arguments to record the pipeline execution times for increasing dataset sizes
SIZE = 'XXL'
DATA_FORMAT = 'delta'
WITH_TASK_12 = True

# Load trips dataset
trips = init_trips(SIZE, DATA_FORMAT)


    Trips dataset loaded!
    ---
      Size: XXL
      Format: delta
      Tables loaded: /FileStore/tables/taxi/delta/taxi-XXL-delta/
      Number of trips (dataset rows): 132,396,785
    


In [None]:
# run and record the resulting execution time shown by databricks (on the cell footer)

# IMPORTANT: this function calls all task functions in order of occurrence. For this code to run without errors, you have to load into memory all of the previous task-specific functions, even if you haven't implemented these yet.
pipeline(trips, with_task_12 = WITH_TASK_12)

[[Row(PULocationID=237, total_no_trips=5310020, average profitability=21.147393908739954, total_no_passenger=7915455.0, LocationID=237, Zone='Upper East Side South'),
  Row(PULocationID=236, total_no_trips=4863323, average profitability=12.100854640836335, total_no_passenger=7253828.0, LocationID=236, Zone='Upper East Side North'),
  Row(PULocationID=161, total_no_trips=4458485, average profitability=22.239805098570198, total_no_passenger=6810486.0, LocationID=161, Zone='Midtown Center'),
  Row(PULocationID=186, total_no_trips=4041730, average profitability=13.321993203498558, total_no_passenger=6038775.0, LocationID=186, Zone='Penn Station/Madison Sq West'),
  Row(PULocationID=162, total_no_trips=3935304, average profitability=16.471113676345233, total_no_passenger=5911055.0, LocationID=162, Zone='Midtown East'),
  Row(PULocationID=142, total_no_trips=3523890, average profitability=20.08786025170914, total_no_passenger=5370025.0, LocationID=142, Zone='Lincoln Square East'),
  Row(PULo

All execution times below were recorded by running this notebook on the CSC8641 cluster.

_Table 1. Pipeline performance for `parquet` format._

| metric                               | S         | M          | L          | XL         | XXL         |
|--------------------------------------|-----------|------------|------------|------------|-------------|
| rows                                 | 2,898,033 | 15,571,166 | 41,953,716 | 90,443,069 | 132,396,785 |
| execution time, w/o task 1.2 (s)     | 1.55      | 5.36       | 7.46       | 15.18      | 17.88       |
| execution time, full pipeline (s)    | 3.39      | 8.53       | 20.75      | 41.88      | 49.77       |
| s / 1M records, w/o task 1.2         | 0.534     | 0.344      | 0.177      | 0.167      | 0.135       |
| s / 1M records, full pipeline        | 1.169     | 0.547      | 0.494      | 0.463      | 0.375       |

_Table 2. Pipeline performance for `delta` format._

| metric                               | S         | M          | L          | XL         | XXL         |
|--------------------------------------|-----------|------------|------------|------------|-------------|
| rows                                 | 2,898,033 | 15,571,166 | 41,953,716 | 90,443,069 | 132,396,785 |
| execution time, w/o task 1.2 (s)     | 1.48      | 3.04       | 3.24       | 3.87       | 4.31        |
| execution time, full pipeline (s)    | 3.13      | 4.29       | 6.30       | 9.09       | 12.66       |
| s / 1M records, w/o task 1.2         | 0.510     | 0.195      | 0.077      | 0.042      | 0.032       |
| s / 1M records, full pipeline        | 1.080     | 0.275      | 0.150      | 0.100      | 0.095       |
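The per-million-record rows in both tables can be recomputed from the row counts and execution times. A small sketch with hypothetical helper names follows; the tabulated values appear to be truncated, rather than rounded, to three decimals (e.g. 1.55 s over 2,898,033 rows gives about 0.5348 s per 1M records, listed as 0.534).

```python
import math


def sec_per_million(seconds: float, rows: int) -> float:
    """Execution time normalised to one million input rows."""
    return seconds / (rows / 1_000_000)


def truncate3(x: float) -> float:
    """Truncate (not round) to three decimals, matching the tables."""
    return math.floor(x * 1000) / 1000


# Parquet, S, without task 1.2: 1.55 s for 2,898,033 rows.
print(truncate3(sec_per_million(1.55, 2_898_033)))
# Delta, XXL, full pipeline: 12.66 s for 132,396,785 rows.
print(truncate3(sec_per_million(12.66, 132_396_785)))
```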


### Observations

The tables above report pipeline execution times for two data formats, **parquet** and **delta**, and five dataset sizes in increasing order: small (**S**), medium (**M**), large (**L**), extra large (**XL**) and double extra large (**XXL**).

For the **parquet** format, execution time grows with dataset size. Including task 1.2 increases the execution time for every size, because removing outliers requires computing the median, the MAD and the z-score for two columns before filtering. In `t12_remove_outliers` each median and MAD is obtained with a separate `collect()`, so task 1.2 adds at least four Spark jobs, each of which re-evaluates the query plan from the source data since nothing is cached. The time needed per 1M records decreases as the dataset grows, suggesting that fixed per-job overheads are amortised over more rows.

For the **delta** format, the pipeline is faster than with parquet at every size (e.g. 12.66 s vs 49.77 s for the full pipeline on XXL). The remaining trends match those observed for parquet:

1. Execution time increases with dataset size.
2. Including task 1.2 increases execution time.
3. Time per 1M records decreases as dataset size increases.