# [CSC8101] Big Data Analytics - 2022 Spark Coursework

## Coursework overview

### Inputs

- **NYC Taxi Trips dataset** - list of recorded taxi trips, each with several characteristics, namely: distance, number of passengers, origin zone, destination zone and trip cost (total amount charged to customer).
- **NYC Zones dataset** - list of zones wherein trips can originate/terminate.

### Tasks

1. Data cleaning
  1. Remove "0 distance" and 'no passengers' records.
  2. Remove outlier records. 
2. Add new columns
  1. Join with zones dataset
  2. Compute the unit profitability of each trip
3. Zone summarisation and ranking
  1. Summarise trip data per zone
  2. Obtain the top 10 ranks according to:
    1. The total trip volume
    2. Their average profitabilitiy
    3. The total passenger volume
4. Record the total and task-specific execution times for each dataset size and format.

### How to

###### Code structure and implementation

- You must implement your solution to each task in the provided function code skeleton.
- The task-specific functions are combined together to form the full pipeline code, executed last (do not modify this code).
- Before implementing the specified function skeleton, you should develop and test your solution on separate code cells (create and destroy cells as needed).

###### Development

- Develop an initial working solution for the 'S' dataset and only then optimise it for larger dataset sizes.
- To perform vectorised operations on a DataFrame:
  - use the API docs to look for existing vectorised functions in: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions
  - if a customised function is required (e.g. to add a new column based on a linear combination of other columns), implement your own User Defined Function (UDF). See:  https://spark.apache.org/docs/latest/sql-ref-functions-udf-scalar.html
- Use only the `pyspark.sql` API - documentation link below - (note that searching through the docs returns results from the `pyspark.sql` API together with the `pyspark.pandas` API):
  - https://spark.apache.org/docs/3.2.0/api/python/reference/pyspark.sql.html
- Periodically download your notebook to your computer as backup and safety measure against accidental file deletion.
 
###### Execution time measurement

- Execution time is calculated and returned by the Spark Engine and shown in the output region of the cell.
- To measure the execution time of a task you must perform a `collect` or similar operation (e.g. `take`) on the returned DataFrame.

## Task 0 - Read data

The code below is ready to run. **Do not modify this code**. It does the following:

- Reads the 'zones' dataset into variable 'zone_names'
- Defines the `init_trips` function that allows you to read the 'trips' dataset (from the DBFS FileStore) given the dataset size ('S' to 'XXL') and format ('parquet' or 'delta') as function arguments
- Defines the `pipeline` function, called in Task 4 to measure the execution time of the entire data processing pipeline
- Shows you how to call the `init_trips` function and display dataset characteristics (number of rows, schema)

In [0]:
## global imports
import pyspark.sql as ps
import pyspark.sql.functions as pf
import pandas as pd

# Load zone names dataset - (much faster to read small file from git than dbfs)
zones_file_url = 'https://raw.githubusercontent.com/NewcastleComputingScience/csc8101-coursework/main/02-assignment-spark/taxi_zone_names.csv'
zone_names = spark.createDataFrame(pd.read_csv(zones_file_url))

# Function to load trips dataset by selected dataset size
def init_trips(size = 'S', data_format = "parquet", taxi_folder = "/FileStore/tables/taxi"):     
    
    files = {
        'S'  : ['2021_07'],
        'M'  : ['2021'],
        'L'  : ['2020_21'],
        'XL' : ['1_6_2019', '7_12_2019'],
        'XXL': ['1_6_2019', '7_12_2019', '2020_21']
    }
    
    # validate input dataset size
    if size not in files.keys():
        print("Invalid input dataset size. Must be one of {}".format(list(files.keys())))
        return None               
    
    if data_format == "parquet":
        filenames = list(map(lambda s: f'{taxi_folder}/tripdata_{s}.parquet', files[size]))
        trips_df = spark.read.parquet(filenames[0])
        
        for name in filenames[1:]:
            trips_df = trips_df.union(spark.read.parquet(name))
            
    elif data_format == "delta":
        filenames = f"{taxi_folder}/taxi-{size}-delta/"
        trips_df = spark.read.format("delta").load(filenames)
    
    else:
        print("Invalid data format. Must be one of {}".format(['parquet', 'delta']))
        return None
        
    print(
    """
    Trips dataset loaded!
    ---
      Size: {s}
      Format: {f}
      Tables loaded: {ds}
      Number of trips (dataset rows): {tc:,}
    """.format(s = size, f = data_format, ds = filenames, tc = trips_df.count()))
    
    return trips_df

# helper function to print dataset row count
def print_count(df):
    print("Row count: {t:,}".format(t = df.count()))

def pipeline(trips_df, with_task_12 = False, zones_df = zone_names):
    # Do not edit
    #---

    ## Task 1.1
    _trips_11 = t11_remove_zeros(trips_df)

    ## Task 1.2
    if with_task_12:
        _trips_12 = t12_remove_outliers(_trips_11)
    else:
        _trips_12 = _trips_11

    ## Task 2.1
    _trips_21 = t21_join_zones(_trips_12, zones_df = zone_names)

    ## Task 2.2
    _trips_22 = t22_calc_profit(_trips_21)

    ## Task 3.1
    _graph = t31_summarise_trips(_trips_22)

    ## Task 3.2
    _zones = t32_summarise_zones_pairs(_graph)

    _top10_trips     = t32_top10_trips(_zones)
    _top10_profit    = t32_top10_profit(_zones)
    _top10_passenger = t32_top10_passenger(_zones)
    
    return([_top10_trips, _top10_profit, _top10_passenger])

In [0]:
# CHANGE the value of argument 'size' to record the pipeline execution times for increasing dataset sizes
SIZE = 'XXL'
DATA_FORMAT = 'delta'

# Load trips dataset
trips = init_trips(SIZE, DATA_FORMAT)

# uncomment line only for small datasets
# trips.take(1)

In [0]:
print_count(trips)

In [0]:
# dataset schemas
trips.printSchema()

In [0]:
display(trips[['PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount']].take(5))

PULocationID,DOLocationID,trip_distance,passenger_count,total_amount
151,239,1.5,1.0,9.95
239,246,2.6,1.0,16.3
236,236,0.0,3.0,5.8
193,193,0.0,5.0,7.55
193,193,0.0,5.0,55.55


In [0]:
zone_names.printSchema()

In [0]:
display(zone_names.take(5))

LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone


## Task 1 - Filter rows

**Input:** trips dataset

### Task 1.1 - Remove "0 distance" and 'no passengers' records

Remove dataset rows that represent invalid trips:

- Trips where `trip_distance == 0` (no distance travelled)
- Trips where `passenger_count == 0` and `total_amount == 0` (we want to retain records where `total_amount` > 0 - these may be significant as the taxi may have carried some parcel, for example)

Altogether, a record is removed if it satisfies the following conditions:

`trip_distance == 0` or `(passenger_count == 0` and `total_amount == 0)`.

**Recommended:** Select only the relevant dataset columns for this and subsequent tasks: `['PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount')]`

### Task 1.2 - Remove outliers using the modified z-score

Despite having removed spurious "zero passengers" trips in task 1.1, columns `total_amount` and `trip_distance` contain additional outlier values that must be identified and removed.

To identify and remove outliers, you will use the modified [z-score](https://en.wikipedia.org/wiki/Standard_score) method.
The modified z-score uses the median and [Median Absolute Deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) (MAD), instead of the mean and standard deviation, to determine how far an observation (indexed by i) is from the mean:

$$z_i = \frac{x_i - \mathit{median}(\mathbf{x})}{\mathbf{MAD}},$$

where x represents the input vector, xi is an element of x and zi is its corresponding z-score. In turn, the MAD formula is:

$$\mathbf{MAD} = 1.438 * \mathit{median}(\big\lvert x_i - \mathit{median}(\mathbf{x})\big\rvert).$$

Observations with **high** (absolute) z-score are considered outlier observations. A score is considered **high** if its __absolute z-score__ is larger than a threshold T = 3.5:

$$\big\lvert z_i \big\rvert > 3.5.$$

where T represents the number of unit standard deviations beyond which a score is considered an outlier ([wiki](https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule)).

This process is repeated twice, once for each of the columns `total_amount` and `trip_distance` (in any order).

**Important:** Use the surrogate function [`percentile_approx`](https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.functions.percentile_approx.html?highlight=percentile#pyspark.sql.functions.percentile_approx) to estimate the median (calculating the median values for a column is expensive as it cannot be parallelised efficiently).

In [0]:
# develop your solution here (create/destroy cells as needed) and then implement it in the functions below

In [0]:
#chnage and update the condition and or 
# Your solution implementation to task 1.1 goes HERE
def t11_remove_zeros(df):
    # input: trips dataset
    df=df.select('PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount')
    df=df.filter((df.trip_distance != 0)
     & ((df.passenger_count != 0) & (df.total_amount > 0) |(df.passenger_count == 0) & (df.total_amount > 0)))
    return df

## Task 1.1-solution:

###The above function removes the outiler's where trip_distance==0, total_distance==0 and passenger_count==0, addtional to that it also retain records where total_amount > 0 and passenger_count==0(taxi is carrying some parcel). The below action commands clearly shows we are able to filter trips where trip_distance==0, total_amount==0 and retain the rows where passenger_count is zero but have travelled some distance(carrying some parcel) and has been charged some amount.

In [0]:
# execute task 1.1
trips_11 = t11_remove_zeros(trips)

print_count(trips_11)

## uncomment only for smaller datasets
display(trips_11.take(10))

PULocationID,DOLocationID,trip_distance,passenger_count,total_amount
151,239,1.5,1.0,9.95
239,246,2.6,1.0,16.3
163,229,1.3,1.0,9.05
229,7,3.7,1.0,18.5
141,234,2.1,2.0,13.0
246,162,2.8,2.0,19.55
238,151,0.7,1.0,8.5
163,25,8.7,1.0,42.95
224,25,6.3,1.0,28.5
141,234,2.7,1.0,15.3


In [0]:
trips_11.filter(trips_11.trip_distance==0).show(10)

In [0]:
trips_11.filter(trips_11.total_amount==0).show(10)

###The below action commands clearly shows we are able to filter/retain the rows where passenger_count is zero but have travelled some distance(carrying some parcel) and has been charged some amount.

In [0]:
trips_11.filter(trips_11.passenger_count==0).show(10)

In [0]:
# Your solution implementation to task 1.2 goes HERE
import numpy as np
import pandas as pd
import numpy as np
from pyspark.sql.functions import col
from  pyspark.sql.functions import abs,mean
import pyspark.sql.functions as func 
def t12_remove_outliers(df):
    
    # Calculating Median for total amount and trip distance
    mediandf = df.select(pf.percentile_approx("total_amount", .5).alias('total_amount_median'),pf.percentile_approx("trip_distance", .5).alias('trip_distance_median') )
    # Joining with the actual dataset
    df = df.crossJoin(mediandf)
    # actual value diff from median - abs
    df = df.withColumn('total_amount_diff_median', pf.round(pf.abs(pf.col('total_amount') - pf.col('total_amount_median')),2))
    df = df.withColumn('trip_distance_diff_median', pf.round(pf.abs(pf.col('trip_distance') - pf.col('trip_distance_median')),2))
    
    # Median of abs difference
    MADdf = df.select(pf.percentile_approx("total_amount_diff_median", .5).alias('total_amount_MAD'),pf.percentile_approx("trip_distance_diff_median", .5).alias('trip_distance_MAD') )
    # Cross join with the actual dataset
    df = df.crossJoin(MADdf)
    
    # Calculate Modified Zscore using above formula
    df = df.withColumn('Zscore_trip_distance', pf.abs(pf.round((pf.col('trip_distance') - pf.col('trip_distance_median')) / (1.438 * pf.col('trip_distance_MAD') ),2 )))
    df = df.withColumn('Zscore_total_amount', pf.abs(pf.round((pf.col('total_amount') - pf.col('total_amount_median')) / (1.438 * pf.col('total_amount_MAD') ),2 )))
    
    # filtering only 
    df = df.filter(pf.col('Zscore_trip_distance') <= 3.5)
    df = df.filter(pf.col('Zscore_total_amount') <= 3.5)
    df = df.select('PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount')
    return df


##Task 1.2 solution

###The above function takes the arguments(trip_distance,total_amount) calculates the median and MAD and assign new column with Zscore_trip_distance, Zscore_total_amount calculating the zscore's for both the columns, finally  we are filtering the rows one after the other based on trip_distance and total_amount and finally we are selecting the columns which we need.

In [0]:
# execute task 1.2
trips_12 = t12_remove_outliers(trips_11)

print_count(trips_12)

display(trips_12.take(10))

PULocationID,DOLocationID,trip_distance,passenger_count,total_amount
151,239,1.5,1.0,9.95
239,246,2.6,1.0,16.3
163,229,1.3,1.0,9.05
229,7,3.7,1.0,18.5
141,234,2.1,2.0,13.0
246,162,2.8,2.0,19.55
238,151,0.7,1.0,8.5
141,234,2.7,1.0,15.3
170,170,0.38,1.0,4.8
170,170,0.55,1.0,9.75


## Task 2 - Compute new columns

### Task 2.1 - Zone names

Obtain the **start** and **end** zone names of each trip by joining the `trips` and `zone_names` datasets (i.e. by using the `zone_names` dataset as lookup table).

**Note:** The columns containing the start and end zone ids of each trip are named `PULocationID` and `DOLocationID`, respectively.

### Task 2.2 - Unit profitability

Compute the column `unit_profitabilty = total_amount / trip_distance`.

In [0]:
# develop your solution here (create/destroy cells as needed) and then implement it in the functions below

##Task 2.1 and 2.2 solution

###The above function return the start and end zones names by merging the Zones_names lookup table and finally calculating the unit_profitabilty.

In [0]:
from pyspark.sql.functions import col
# Your solution implementation to task 2.1 goes HERE
def t21_join_zones(df, zones_df = zone_names):
    # input: output of task 1.2 and zone_names dataset
    column2=["PULocationID","DOLocationID"]
    for column in column2:
        col_zone = column +'_zone'
        df=(df.join(zone_names,(col(column) == zones_df.LocationID),"inner").drop("LocationID","Borough","service_zone").withColumnRenamed("zone",col_zone))
    return df

##2.1 Merged zones_names with trips table( Added actual names for pick up and drop locations)

In [0]:
# execute task 2.1
trips_21 = t21_join_zones(trips_12, zones_df = zone_names)

print_count(trips_21)
display(trips_21.take(10))

PULocationID,DOLocationID,trip_distance,passenger_count,total_amount,PULocationID_zone,DOLocationID_zone
151,239,1.5,1.0,9.95,Manhattan Valley,Upper West Side South
239,246,2.6,1.0,16.3,Upper West Side South,West Chelsea/Hudson Yards
163,229,1.3,1.0,9.05,Midtown North,Sutton Place/Turtle Bay North
229,7,3.7,1.0,18.5,Sutton Place/Turtle Bay North,Astoria
141,234,2.1,2.0,13.0,Lenox Hill West,Union Sq
246,162,2.8,2.0,19.55,West Chelsea/Hudson Yards,Midtown East
238,151,0.7,1.0,8.5,Upper West Side North,Manhattan Valley
141,234,2.7,1.0,15.3,Lenox Hill West,Union Sq
170,170,0.38,1.0,4.8,Murray Hill,Murray Hill
170,170,0.55,1.0,9.75,Murray Hill,Murray Hill


In [0]:
# Your solution implementation to task 2.2 goes HERE
def t22_calc_profit(df):
    # input: output of task 2.1
    df=df.withColumn("unit_profitabilty",(df.total_amount/df.trip_distance))
    return df

##2.2 calculated unit profitbility

In [0]:
# execute task 2.2
trips_22 = t22_calc_profit(trips_21)

print_count(trips_22)
display(trips_22.take(10))

PULocationID,DOLocationID,trip_distance,passenger_count,total_amount,PULocationID_zone,DOLocationID_zone,unit_profitabilty
151,239,1.5,1.0,9.95,Manhattan Valley,Upper West Side South,6.633333333333333
239,246,2.6,1.0,16.3,Upper West Side South,West Chelsea/Hudson Yards,6.269230769230769
163,229,1.3,1.0,9.05,Midtown North,Sutton Place/Turtle Bay North,6.961538461538462
229,7,3.7,1.0,18.5,Sutton Place/Turtle Bay North,Astoria,5.0
141,234,2.1,2.0,13.0,Lenox Hill West,Union Sq,6.19047619047619
246,162,2.8,2.0,19.55,West Chelsea/Hudson Yards,Midtown East,6.982142857142858
238,151,0.7,1.0,8.5,Upper West Side North,Manhattan Valley,12.142857142857144
141,234,2.7,1.0,15.3,Lenox Hill West,Union Sq,5.666666666666667
170,170,0.38,1.0,4.8,Murray Hill,Murray Hill,12.63157894736842
170,170,0.55,1.0,9.75,Murray Hill,Murray Hill,17.727272727272727


## Task 3: Rank zones by traffic, passenger volume and profitability

### 3.1 - Summarise interzonal travel

Build a graph data structure of zone-to-zone traffic, representing aggregated data about trips between any two zones. The graph will have one node for each zone and one edge connecting each pair of zones. In addition, edges contain aggregate information about all trips between those zones. 

For example, zones Z1 and Z2 are connected by *two* edges: edge Z1 --> Z2 carries aggregate data about all trips that originated in Z1 and ended in Z2, and edge Z2 --> Z2 carries aggregate data about all trips that originated in Z2 and ended in Z1.

The aggregate information of interzonal travel must include the following data:

- `average_unit_profit` - the average unit profitability (calculated as `mean(unit_profitabilty)`).
- `trips_count` -- the total number of recorded trips.
- `total_passengers` -- the total number of passenger across all trips (sum of `passenger_count`).

This graph can be represented as a new dataframe, with schema:

\[`PULocationID`, `DOLocationID`, `average_unit_profit`, `trips_count`, `total_passengers` \]

__hint__: the `groupby()` operator produces a `pyspark.sql.GroupedData` structure. You can then calculate multiple aggregations from this using `pyspark.sql.GroupedData.agg()`: 
- https://spark.apache.org/docs/3.2.0/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.groupby.html
- https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.GroupedData.agg.html

### Task 3.2 - Obtain top-10 zones

For each of the following measures, report the top-10 zones _using their plain names you dereferenced in the previous step, not the codes_. Note that this requires ranking the nodes in different orders. Specifically, you need to calculate the following further aggregations:

- the **total** number of trips originating from Z. This is simply the sum of `trips_count` over all outgoing edges for Z, i.e., edges of the form Z -> \*
- the **average** profitability of a zone. This is the average of all `average_unit_profit` over all *outgoing* edges from Z.
- The **total** passenger volume measured as the **sum** of `total_passengers` carried in trips that originate from Z

In [0]:
# develop your solution here (create/destroy cells as needed) and then implement it in the functions below

##Task 3.1 and 3.2
###The below function calculates the trip_count, average unit profit and total number of passengers among two zones and gives the top 10 zones.

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.functions import sum,avg,max,min,mean,count
## Your solution to task 3.1 goes HERE
def t31_summarise_trips(df):
    # input: output of task 2.2
    df1=df.groupby(["PULocationID","DOLocationID"]).count().withColumnRenamed("count", "trips_count")
    df2=df.groupby(["PULocationID","DOLocationID"]).agg(mean("unit_profitabilty").alias("average_unit_profit"),
                                                              sum("passenger_count").alias("total_passengers"))
    df=df1.join(df2,["PULocationID","DOLocationID"])
    return df

In [0]:
# execute task 3.1
graph = t31_summarise_trips(trips_22)

print_count(graph)
display(graph.take(10))

PULocationID,DOLocationID,trips_count,average_unit_profit,total_passengers
114,100,17527,7.628126252431913,28377.0
234,144,59727,9.18294141222978,93332.0
90,231,54139,7.433968218435063,81369.0
90,142,28412,7.12645984667109,42352.0
231,261,37338,13.4326381805424,58321.0
246,249,51986,9.188398143556556,81507.0
142,144,7691,5.837846035486613,12507.0
148,262,6896,4.8754829552081205,10913.0
25,255,1466,4.847200053741639,2020.0
125,255,2344,6.185896317299506,3545.0


In [0]:
# Your solution to task 3.2 goes HERE (implement each of the functions below)
def t32_summarise_zones_pairs(df, zones_df = zone_names):
    column2=["PULocationID","DOLocationID"]
    for column in column2:
        col_zone = column +'_zone'
        df=(df.join(zone_names,(col(column) == zones_df.LocationID),"inner").drop("LocationID","Borough","service_zone").withColumnRenamed("zone",col_zone))
    return df

# Top 10 ranked zones by traffic (trip volume)
def t32_top10_trips(df_zones):
    # input: output of task 3.2
    df_zones=zones.groupby(["PULocationID_zone"]).agg(sum("trips_count").alias("trips")).sort(["trips"],ascending = False)
    display(df_zones.take(10))
    return df_zones

# Top 10 ranked zones by profit
def t32_top10_profit(df_zones):
    # input: output of task 3.2
    df_zones=zones.groupby(["PULocationID_zone"]).agg(avg("average_unit_profit").alias("average")).sort(["average"],ascending = False)
    display(df_zones.take(10))
    return df_zones

# Top 10 ranked zones by passenger volume
def t32_top10_passenger(df_zones):
    # input: output of task 3.2
    df_zones=zones.groupby(["PULocationID_zone"]).agg(sum("total_passengers").alias("passengers")).sort(["passengers"],ascending = False)
    display(df_zones.take(10))
    return df_zones

##With task 1.2 filter these are the top 10 trips,profit and passenger count

In [0]:
# execute task 3.2
zones = t32_summarise_zones_pairs(graph)

top10_trips     = t32_top10_trips(zones)
top10_profit    = t32_top10_profit(zones)
top10_passenger = t32_top10_passenger(zones)

PULocationID_zone,trips
Upper East Side South,5296908
Upper East Side North,4853704
Midtown Center,4479753
Penn Station/Madison Sq West,4061374
Midtown East,3958602
Lincoln Square East,3531751
Murray Hill,3507218
Clinton East,3443233
Union Sq,3404499
Times Sq/Theatre District,3401930


PULocationID_zone,average
Rossville/Woodrow,69.82495415109167
JFK Airport,63.2300216294618
Co-Op City,54.67098945942805
Newark Airport,46.02349911388001
DUMBO/Vinegar Hill,45.68365308977968
Glen Oaks,43.83413934824198
Cambria Heights,42.66288328317643
East Flatbush/Farragut,39.38765612501434
Sunset Park East,39.14226584730324
Elmhurst,38.3175539265868


PULocationID_zone,passengers
Upper East Side South,7929020.0
Upper East Side North,7277675.0
Midtown Center,6866190.0
Penn Station/Madison Sq West,6082025.0
Midtown East,5966468.0
Lincoln Square East,5407821.0
Times Sq/Theatre District,5400409.0
Clinton East,5319063.0
Murray Hill,5274408.0
Union Sq,5204217.0


##Without task 1.2 filter these are the top 10 trips,profit and passenger count

In [0]:
display(top10_trips.take(10))

PULocationID_zone,trips
Upper East Side South,5506913
Upper East Side North,5075037
Midtown Center,4844970
Penn Station/Madison Sq West,4343188
Midtown East,4281374
Times Sq/Theatre District,3818960
Murray Hill,3772745
Clinton East,3744245
Lincoln Square East,3709259
JFK Airport,3680992


In [0]:
display(top10_profit.take(10))

PULocationID_zone,average
Newark Airport,78.45740322057458
Arden Heights,39.634336054219325
Rockaway Park,35.05216478417078
Rossville/Woodrow,28.235066961693214
Eltingville/Annadale/Prince's Bay,22.449201976606723
New Dorp/Midland Beach,17.793097182050946
Oakwood,17.70895011392235
,17.21994961942218
NV,16.3750823519909
Ozone Park,15.60234795394276


In [0]:
display(top10_passenger.take(10))

PULocationID_zone,passengers
Upper East Side South,8246788.0
Upper East Side North,7611806.0
Midtown Center,7440899.0
Penn Station/Madison Sq West,6509484.0
Midtown East,6460417.0
Times Sq/Theatre District,6083898.0
Clinton East,5799265.0
JFK Airport,5781768.0
Lincoln Square East,5684972.0
Murray Hill,5679954.0


## Task 4 - Record the pipeline's execution time

Record the execution time of:

1. the whole pipeline
2. the whole pipeline except task 1.2

on the two tables below, for all dataset sizes: `'S'`, `'M'`, `'L'`, `'XL'`, `'XXL'`, and data formats: `parquet` and `delta`.

Analyse the resulting execution times and comment on the effect of dataset size, dataset format and task complexity (with and without task 1.2) on pipeline performance.

In [0]:
# after developing your solution, it may be convenient to combine all your functions in a single cell (at the start or end of the notebook)

# CHANGE the value of the following arguments to record the pipeline execution times for increasing dataset sizes
SIZE = 'XL'
DATA_FORMAT = 'delta'
WITH_TASK_12 = True

# Load trips dataset
trips = init_trips(SIZE, DATA_FORMAT)

In [0]:
# run and record the resulting execution time shown by databricks (on the cell footer)

# IMPORTANT: this function calls all task functions in order of occurrence. For this code to run without errors, you have to load into memory all of the previous task-specific functions, even if you haven't implemented these yet.
pipeline(trips, with_task_12 = WITH_TASK_12)

PULocationID_zone,trips
Upper East Side South,5296908
Upper East Side North,4853704
Midtown Center,4479753
Penn Station/Madison Sq West,4061374
Midtown East,3958602
Lincoln Square East,3531751
Murray Hill,3507218
Clinton East,3443233
Union Sq,3404499
Times Sq/Theatre District,3401930


PULocationID_zone,average
Rossville/Woodrow,69.82495415109167
JFK Airport,63.23002162946182
Co-Op City,54.67098945942805
Newark Airport,46.023499113880014
DUMBO/Vinegar Hill,45.68365308977968
Glen Oaks,43.83413934824198
Cambria Heights,42.66288328317643
East Flatbush/Farragut,39.38765612501434
Sunset Park East,39.14226584730324
Elmhurst,38.3175539265868


PULocationID_zone,passengers
Upper East Side South,7929020.0
Upper East Side North,7277675.0
Midtown Center,6866190.0
Penn Station/Madison Sq West,6082025.0
Midtown East,5966468.0
Lincoln Square East,5407821.0
Times Sq/Theatre District,5400409.0
Clinton East,5319063.0
Murray Hill,5274408.0
Union Sq,5204217.0


_Table 1. Pipeline performance for `parquet` format._

| metric                      | S    | M    | L    | XL   | XXL  |
|-----------------------------|------|------|------|------|------|
| rows (M)                    |  2.9 |  15 |  42 |  90 |  132 |
| execution time   (w/o 1.2)  | 0.29S | 0.29S | 0.30S | 0.30S | 0.32S |
| execution time              | 3.09M | 4.5M | 14.89M | 20.89M | 30.01M |
| sec / 1M records (w/o 1.2)  | 0.1  | 0.01  | 0.007  | 0.003  | 0.002  |
| sec / 1M records            | 63.93  | 18.00  | 21.27  | 13.9  | 13.64  |

_Table 2. Pipeline performance for `delta` format._

| metric                      | S    | M    | L    | XL   | XXL  |
|-----------------------------|------|------|------|------|------|
| rows (M)                    |  2.9 |  15  |  42  |  90  |  132 |
| execution time   (w/o 1.2)  | 0.33S | 0.32S | 0.33S | 0.37S | 0.34S |
| execution time              | 2.96M | 3.7M | 4.50M | 5.55M | 6.87M |
| sec / 1M records (w/o 1.2)  | 0.11  | 0.02  | 0.07  | 0.004  | 0.002  |
| sec / 1M records            | 61.58  | 14.8  | 6.4  | 3.70  | 3.20  |