# [CSC8101] Engineering for AI - 2025 Spark Coursework

## Coursework overview

### Inputs

- **NYC Taxi Trips dataset** - list of recorded taxi trips, each with several characteristics, namely: distance, number of passengers, origin zone, destination zone and trip cost (total amount charged to customer).
- **NYC Zones dataset** - list of zones wherein trips can originate/terminate.

### Tasks

1. Data cleaning
  1. Remove "0 distance" and 'no passengers' records.
  2. Remove outlier records.
2. Add new columns
  1. Join with zones dataset
  2. Compute the unit profitability of each trip
3. Zone summarisation and ranking
  1. Summarise trip data per zone
  2. Obtain the top 10 ranks according to:
    1. The total trip volume
    2. Their average profitability
    3. The total passenger volume
4. Record the total and task-specific execution times for each dataset size and format.

### How to

###### Code structure and implementation

- You must implement your solution to each task in the provided function code skeleton.
- The task-specific functions are combined together to form the full pipeline code, executed last (do not modify this code).
- Before implementing the specified function skeleton, you should develop and test your solution on separate code cells (create and destroy cells as needed).

###### Development

- Develop an initial working solution for the 'S' dataset and only then optimise it for larger dataset sizes.
- To perform vectorised operations on a DataFrame:
  - use the API docs to look for existing vectorised functions in: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html
  - if a customised function is required (e.g. to add a new column based on a linear combination of other columns), implement your own User Defined Function (UDF). See:  https://spark.apache.org/docs/latest/sql-ref-functions-udf-scalar.html
- Use only the `pyspark.sql` API, documented at the link below (note that searching the docs returns results from both the `pyspark.sql` and the `pyspark.pandas` APIs):
  - https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.html
- Periodically download your notebook to your computer as a backup and a safeguard against accidental file deletion.

###### Execution time measurement

- Execution time is calculated and returned by the Spark Engine and shown in the output region of the cell.
- To measure the execution time of a task you must perform a `collect` or similar operation (e.g. `take`) on the returned DataFrame.
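
Spark transformations are lazy, so a cell that only builds a DataFrame does almost no work; the computation runs when an action such as `collect` is called. The sketch below illustrates the same effect in plain Python, with a generator standing in for a lazy DataFrame. The helpers `slow_square` and `timed` are hypothetical names used only for this illustration; they are not part of Spark or of the coursework skeleton.

```python
import time

def slow_square(values, log):
    """Lazily square each value, recording in `log` when work is actually done."""
    for v in values:
        log.append(v)  # side effect reveals when the computation really happens
        yield v * v

def timed(action):
    """Run a zero-argument callable and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = action()
    return result, time.perf_counter() - start

log = []

# Building the lazy pipeline does no work yet (analogous to a transformation).
lazy, build_time = timed(lambda: slow_square(range(5), log))
work_done_after_build = len(log)

# Consuming it forces the computation (analogous to collect or take).
result, run_time = timed(lambda: list(lazy))

print(work_done_after_build, len(log), result)
```

Only the second timing covers the real work, which is why an action has to sit inside the timed region.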

The following code block mounts your Google Drive in the Google Colab instance; the data provided for the course is read from there.


In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Task 0 - Read data

The code below is ready to run. **Do not modify this code**. It does the following:

- Reads the 'zones' dataset into variable 'zone_names'
- Defines the `init_trips` function that allows you to read the 'trips' dataset (in this notebook, from the mounted Google Drive folder) given the dataset size ('S' to 'XXL') as function argument
- Defines the `pipeline` function, called in Task 4 to measure the execution time of the entire data processing pipeline
- Shows you how to call the `init_trips` function and display dataset characteristics (number of rows, schema)

In [2]:
pip install findspark



In [3]:
## global imports
import pyspark.sql as ps
import pyspark.sql.functions as pf
import pandas as pd
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
# Load zone names dataset - (much faster to read small file from git than dbfs)
zones_file_url = 'https://raw.githubusercontent.com/NewcastleComputingScience/csc8101-coursework/main/02-assignment-spark/taxi_zone_names.csv'
zone_names = spark.createDataFrame(pd.read_csv(zones_file_url))

# Function to load trips dataset by selected dataset size
def init_trips(size = 'S'):

    files = {
        'S'  : ['2021_07'],
        'M'  : ['2021'],
        'L'  : ['2020_21'],
        'XL' : ['1_6_2019', '7_12_2019'],
        'XXL': ['1_6_2019', '7_12_2019', '2020_21']
    }

    # validate input dataset size
    if size not in files.keys():
        print("Invalid input dataset size. Must be one of {}".format(list(files.keys())))
        return None

    filenames = list(map(lambda s: f'/content/drive/MyDrive/CSC8101_Data/tripdata_{s}.parquet', files[size]))
    trips_df = spark.read.parquet(filenames[0])

    for name in filenames[1:]:
        trips_df = trips_df.union(spark.read.parquet(name))


    print(
    """
    Trips dataset loaded!
    ---
      Size: {s}
      Tables loaded: {ds}
      Number of trips (dataset rows): {tc:,}
    """.format(s = size, ds = filenames, tc = trips_df.count()))

    return trips_df

# helper function to print dataset row count
def print_count(df):
    print("Row count: {t:,}".format(t = df.count()))

def pipeline(trips_df, with_task_12 = False, zones_df = zone_names):
    # Do not edit
    #---

    ## Task 1.1
    _trips_11 = t11_remove_zeros(trips_df)

    ## Task 1.2
    if with_task_12:
        _trips_12 = t12_remove_outliers(_trips_11)
    else:
        _trips_12 = _trips_11

    ## Task 2.1
    _trips_21 = t21_join_zones(_trips_12, zones_df = zone_names)

    ## Task 2.2
    _trips_22 = t22_calc_profit(_trips_21)

    ## Task 3.1
    _graph = t31_summarise_trips(_trips_22)

    ## Task 3.2
    _zones = t32_summarise_zones_pairs(_graph)

    _top10_trips     = t32_top10_trips(_zones)
    _top10_profit    = t32_top10_profit(_zones)
    _top10_passenger = t32_top10_passenger(_zones)

    return([_top10_trips, _top10_profit, _top10_passenger])

In [5]:
# CHANGE the value of argument 'size' to record the pipeline execution times for increasing dataset sizes
SIZE = 'M'


# Load trips dataset
trips = init_trips(SIZE)

# uncomment line only for small datasets
#trips.take(1)


    Trips dataset loaded!
    ---
      Size: M
      Tables loaded: ['/content/drive/MyDrive/CSC8101_Data/tripdata_2021.parquet']
      Number of trips (dataset rows): 15,571,166
    


In [6]:
print_count(trips)

Row count: 15,571,166


In [7]:
# dataset schemas
trips.printSchema()

root
 |-- index: long (nullable = true)
 |-- VendorID: double (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- cab_type: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |--

In [8]:
display(trips[['PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount']].take(5))

[Row(PULocationID=142, DOLocationID=43, trip_distance=2.1, passenger_count=1.0, total_amount=11.8),
 Row(PULocationID=238, DOLocationID=151, trip_distance=0.2, passenger_count=1.0, total_amount=4.3),
 Row(PULocationID=132, DOLocationID=165, trip_distance=14.7, passenger_count=1.0, total_amount=51.95),
 Row(PULocationID=138, DOLocationID=132, trip_distance=10.6, passenger_count=0.0, total_amount=36.35),
 Row(PULocationID=68, DOLocationID=33, trip_distance=4.94, passenger_count=1.0, total_amount=24.36)]

In [9]:
zone_names.printSchema()

root
 |-- LocationID: long (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [10]:
display(zone_names.take(5))

[Row(LocationID=1, Borough='EWR', Zone='Newark Airport', service_zone='EWR'),
 Row(LocationID=2, Borough='Queens', Zone='Jamaica Bay', service_zone='Boro Zone'),
 Row(LocationID=3, Borough='Bronx', Zone='Allerton/Pelham Gardens', service_zone='Boro Zone'),
 Row(LocationID=4, Borough='Manhattan', Zone='Alphabet City', service_zone='Yellow Zone'),
 Row(LocationID=5, Borough='Staten Island', Zone='Arden Heights', service_zone='Boro Zone')]

## Task 1 - Filter rows

**Input:** trips dataset

### Task 1.1 - Remove "0 distance" and 'no passengers' records

Remove dataset rows that represent invalid trips:

- Trips where `trip_distance == 0` (no distance travelled)
- Trips where `passenger_count == 0` and `total_amount == 0` (we want to retain records where `total_amount` > 0 - these may be significant as the taxi may have carried some parcel, for example)

Altogether, a record is removed if it satisfies the following conditions:

`trip_distance == 0 or (passenger_count == 0 and total_amount == 0)`.

**Recommended:** Select only the relevant dataset columns for this and subsequent tasks: `['PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount']`
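
As a minimal plain-Python sketch of the removal rule (not the Spark implementation), the predicate below keeps a record exactly when the removal condition is false. The function name `is_valid_trip` is hypothetical; the first two sample rows are taken from the `take(5)` output in Task 0, and the last two are made-up records for illustration.

```python
def is_valid_trip(trip_distance, passenger_count, total_amount):
    """Return True if the record should be kept under the Task 1.1 rule."""
    remove = trip_distance == 0 or (passenger_count == 0 and total_amount == 0)
    return not remove

# Each tuple is (trip_distance, passenger_count, total_amount).
samples = [
    (2.1, 1.0, 11.8),    # ordinary trip: kept
    (10.6, 0.0, 36.35),  # no passengers but a charge was made: kept
    (0.0, 1.0, 5.0),     # hypothetical zero-distance record: removed
    (3.0, 0.0, 0.0),     # hypothetical no-passenger, no-charge record: removed
]

kept = [s for s in samples if is_valid_trip(*s)]
print(kept)
```

By De Morgan's law, the keep condition is equivalent to `trip_distance != 0 and not (passenger_count == 0 and total_amount == 0)`, which is the form a DataFrame filter would typically use.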

### Task 1.2 - Remove outliers using the modified z-score

Despite having removed spurious "zero passengers" trips in task 1.1, columns `total_amount` and `trip_distance` contain additional outlier values that must be identified and removed.

To identify and remove outliers, you will use the modified [z-score](https://en.wikipedia.org/wiki/Standard_score) method.
The modified z-score uses the median and [Median Absolute Deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) (MAD), instead of the mean and standard deviation, to determine how far an observation (indexed by i) is from the median:

$$z_i = \frac{x_i - \mathit{median}(\mathbf{x})}{\mathbf{MAD}},$$

where $\mathbf{x}$ represents the input vector, $x_i$ is an element of $\mathbf{x}$ and $z_i$ is its corresponding z-score. In turn, the MAD formula is:

$$\mathbf{MAD} = 1.483 * \mathit{median}(\big\lvert x_i - \mathit{median}(\mathbf{x})\big\rvert).$$

Observations with a **high** (absolute) z-score are considered outliers. A score is considered **high** if its __absolute z-score__ is larger than a threshold T = 3.5:

$$\big\lvert z_i \big\rvert > 3.5.$$

where T represents the number of unit standard deviations beyond which a score is considered an outlier ([wiki](https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule)).

This process is repeated twice, once for each of the columns `total_amount` and `trip_distance` (in any order).

**Important:** Use the surrogate function [`percentile_approx`](https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.functions.percentile_approx.html?highlight=percentile#pyspark.sql.functions.percentile_approx) to estimate the median (calculating the median values for a column is expensive as it cannot be parallelised efficiently).
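
The formulas above can be checked on a tiny example. The sketch below is plain Python using the exact median from the standard library rather than Spark's `percentile_approx`, so it only illustrates the arithmetic; the function name `modified_z_scores` and the sample values are hypothetical. It also assumes the MAD is non-zero.

```python
from statistics import median

def modified_z_scores(xs, scale=1.483):
    """Return the modified z-score of every element of xs."""
    med = median(xs)
    # MAD = scale * median(|x_i - median(x)|); assumed non-zero here
    mad = scale * median(abs(x - med) for x in xs)
    return [(x - med) / mad for x in xs]

# Hypothetical trip distances with one obvious outlier.
distances = [1.0, 2.0, 3.0, 4.0, 100.0]
scores = modified_z_scores(distances)

# Keep observations whose absolute score does not exceed the threshold T = 3.5.
T = 3.5
kept = [x for x, z in zip(distances, scores) if abs(z) <= T]
print(kept)
```

Here the median is 3, the absolute deviations are 2, 1, 0, 1 and 97 (median 1), so MAD = 1.483 and only the value 100 exceeds the threshold.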

**The Solution For Task 1.1**

In [11]:

from pyspark.sql import functions as F

def t11_remove_zeros(df):
    df_filtered = df.filter(
        (F.col("trip_distance") > 0) &
        ~((F.col("passenger_count") == 0) & (F.col("total_amount") == 0))
    )
    return df_filtered


In [12]:
# execute task 1.1
trips_11 = t11_remove_zeros(trips)

print_count(trips_11)

## uncomment only for smaller datasets
#display(trips_11.take(10))

Row count: 15,350,075


**The Solution For Task 1.2**

In [13]:
from pyspark.sql import functions as F

def t12_remove_outliers(df):
    # Compute the median of trip_distance and total_amount using percentile_approx
    median_values = df.select(
        F.percentile_approx("trip_distance", 0.5).alias("median_td"),
        F.percentile_approx("total_amount", 0.5).alias("median_ta")
    ).collect()[0]

    # Extract the computed median values
    median_td, median_ta = median_values["median_td"], median_values["median_ta"]

    # Compute the absolute deviation from the median for both columns
    df_mad = df.withColumn("dev_td", F.abs(F.col("trip_distance") - median_td)) \
               .withColumn("dev_ta", F.abs(F.col("total_amount") - median_ta))

    # Calculate MAD for each column using percentile_approx
    mad_values = df_mad.select(
        (1.483 * F.percentile_approx("dev_td", 0.5)).alias("mad_td"),
        (1.483 * F.percentile_approx("dev_ta", 0.5)).alias("mad_ta")
    ).collect()[0]

    # Extract the computed MAD values
    mad_td, mad_ta = mad_values["mad_td"], mad_values["mad_ta"]

    # Keep only rows whose modified z-score has absolute value <= 3.5 in both columns
    df_filtered = df.withColumn("z_score_td", (F.col("trip_distance") - median_td) / mad_td) \
                    .withColumn("z_score_ta", (F.col("total_amount") - median_ta) / mad_ta) \
                    .filter((F.abs(F.col("z_score_td")) <= 3.5) & (F.abs(F.col("z_score_ta")) <= 3.5))

    # Drop the Z-score columns after filtering
    df_filtered = df_filtered.drop("z_score_td", "z_score_ta")

    # Return the filtered DataFrame with outliers removed
    return df_filtered


In [14]:
# execute task 1.2
trips_12 = t12_remove_outliers(trips_11)

print_count(trips_12)
#display(trips_12.take(10))

Row count: 13,526,180


## Task 2 - Compute new columns

### Task 2.1 - Zone names

Obtain the **start** and **end** zone names of each trip by joining the `trips` and `zone_names` datasets (i.e. by using the `zone_names` dataset as lookup table).

**Note:** The columns containing the start and end zone ids of each trip are named `PULocationID` and `DOLocationID`, respectively.

### Task 2.2 - Unit profitability

Compute the column `unit_profitability = total_amount / trip_distance`.
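
The two steps of Task 2 can be sketched in plain Python, with a dictionary standing in for the lookup table. This is an illustration of the logic only, not the Spark implementation. The two zone names come from the zones output in Task 0; the trips, the id `999` and the column names `PUZone` and `DOZone` are assumptions made for this example.

```python
# Lookup table built from two rows of the zones dataset shown in Task 0.
zone_lookup = {
    2: "Jamaica Bay",
    4: "Alphabet City",
}

# Hypothetical trips; LocationID 999 is deliberately absent from the lookup.
trips = [
    {"PULocationID": 4, "DOLocationID": 2, "trip_distance": 2.0, "total_amount": 11.0},
    {"PULocationID": 2, "DOLocationID": 999, "trip_distance": 4.0, "total_amount": 10.0},
]

enriched = []
for trip in trips:
    row = dict(trip)
    # dict.get returns None for unknown ids, much like a left join leaves nulls
    row["PUZone"] = zone_lookup.get(trip["PULocationID"])
    row["DOZone"] = zone_lookup.get(trip["DOLocationID"])
    # Unit profitability: amount charged per unit of distance travelled
    row["unit_profitability"] = trip["total_amount"] / trip["trip_distance"]
    enriched.append(row)

for row in enriched:
    print(row["PUZone"], row["DOZone"], row["unit_profitability"])
```

The division is only safe because zero-distance trips were removed in Task 1.1.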

**The Solution for Task 2.1**

In [15]:


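def t21_join_zones(df, zones_df=zone_names):

    # Keep only the id and name columns so that the two joins do not produce
    # duplicate 'Borough' and 'service_zone' columns
    zones = zones_df.select("LocationID", "Zone")

    # Rename the lookup columns separately for pickup and drop-off to prevent ambiguity
    zones_pu = zones.withColumnRenamed("Zone", "PUZone").withColumnRenamed("LocationID", "PULocationID")
    zones_do = zones.withColumnRenamed("Zone", "DOZone").withColumnRenamed("LocationID", "DOLocationID")

    # Left joins keep every trip, even if a zone id is missing from the lookup table
    df = df.join(zones_pu, on="PULocationID", how="left")
    df = df.join(zones_do, on="DOLocationID", how="left")
    return df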


In [16]:
# execute task 2.1


#Joining The Trips and Zone Names
trips_21 = t21_join_zones(trips_12, zones_df = zone_names)

print_count(trips_21)
#display(trips_21.take(10))

Row count: 13,526,180


**The Solution for Task 2.2**

In [17]:

from pyspark.sql.functions import col

def t22_calc_profit(df):
    # Calculating unit_profitability = total_amount / trip_distance
    df = df.withColumn("unit_profitability", col("total_amount") / col("trip_distance"))
    return df


In [18]:
# execute task 2.2
trips_22 = t22_calc_profit(trips_21)

print_count(trips_22)
#display(trips_22.take(10))

Row count: 13,526,180


## Task 3: Rank zones by traffic, passenger volume and profitability

### Task 3.1 - Summarise interzonal travel

Build a graph data structure of zone-to-zone traffic, representing aggregated data about trips between any two zones. The graph will have one node for each zone and one edge connecting each pair of zones. In addition, edges contain aggregate information about all trips between those zones.

For example, zones Z1 and Z2 are connected by *two* edges: edge Z1 --> Z2 carries aggregate data about all trips that originated in Z1 and ended in Z2, and edge Z2 --> Z1 carries aggregate data about all trips that originated in Z2 and ended in Z1.

The aggregate information of interzonal travel must include the following data:

- `average_unit_profit` - the average unit profitability (calculated as `mean(unit_profitability)`).
- `trips_count` - the total number of recorded trips.
- `total_passengers` - the total number of passengers across all trips (sum of `passenger_count`).

This graph can be represented as a new dataframe, with schema:

\[`PULocationID`, `DOLocationID`, `average_unit_profit`, `trips_count`, `total_passengers` \]

__hint__: the `groupBy()` operator produces a `pyspark.sql.GroupedData` structure. You can then calculate multiple aggregations from this using `pyspark.sql.GroupedData.agg()`:
- https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html
- https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.GroupedData.agg.html
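
To make the edge aggregation concrete, here is a small plain-Python sketch that groups trips by their (pickup, drop-off) pair and computes the three aggregates per edge. It illustrates the grouping logic only and is not the Spark implementation; the trip values are hypothetical.

```python
from collections import defaultdict

# Hypothetical trips: (PULocationID, DOLocationID, unit_profitability, passenger_count)
trips = [
    (1, 2, 4.0, 1),
    (1, 2, 6.0, 2),
    (2, 1, 3.0, 1),
]

# Group rows by directed edge (pickup zone, drop-off zone).
groups = defaultdict(list)
for pu, do, profit, passengers in trips:
    groups[(pu, do)].append((profit, passengers))

# One record per edge, with the three aggregates required by Task 3.1.
graph = {
    edge: {
        "average_unit_profit": sum(p for p, _ in rows) / len(rows),
        "trips_count": len(rows),
        "total_passengers": sum(n for _, n in rows),
    }
    for edge, rows in groups.items()
}

for edge, stats in graph.items():
    print(edge, stats)
```

Note that edges `(1, 2)` and `(2, 1)` are distinct keys, matching the two directed edges described above.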

### Task 3.2 - Obtain top-10 zones

For each of the following measures, report the top-10 zones _using their plain names you dereferenced in the previous step, not the codes_. Note that this requires ranking the nodes in different orders. Specifically, you need to calculate the following further aggregations:

- the **total** number of trips originating from Z. This is simply the sum of `trips_count` over all outgoing edges for Z, i.e., edges of the form Z -> \*.
- the **average** profitability of a zone. This is the average of all `average_unit_profit` over all *outgoing* edges from Z.
- the **total** passenger volume, measured as the **sum** of `total_passengers` carried in trips that originate from Z.
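
The per-zone aggregation and ranking can be sketched in plain Python as follows. This is an illustration under assumed data, not the Spark implementation: the edges, the zone labels `A`, `B`, `C` and the helper `top_k` are hypothetical.

```python
from collections import defaultdict

# Hypothetical edges:
# (origin, destination, average_unit_profit, trips_count, total_passengers)
edges = [
    ("A", "B", 5.0, 100, 150),
    ("A", "C", 7.0, 300, 400),
    ("B", "A", 50.0, 2, 2),
    ("C", "A", 4.0, 50, 60),
]

# Collect the outgoing edges of every origin zone.
by_origin = defaultdict(list)
for origin, _dest, profit, trips, passengers in edges:
    by_origin[origin].append((profit, trips, passengers))

zones = {
    origin: {
        "total_trips": sum(t for _, t, _ in rows),
        # Unweighted mean of the edge averages, as the task defines it
        "average_profitability": sum(p for p, _, _ in rows) / len(rows),
        "total_passengers": sum(n for _, _, n in rows),
    }
    for origin, rows in by_origin.items()
}

def top_k(metric, k=10):
    """Return the zone names ranked by `metric`, highest first."""
    return sorted(zones, key=lambda z: zones[z][metric], reverse=True)[:k]

print(top_k("total_trips"))
print(top_k("average_profitability"))
print(top_k("total_passengers"))
```

Because the profitability is an unweighted mean over edges, zone `B` ranks first on that measure with only 2 trips; a trip-weighted mean would be a different statistic from the one the task asks for.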

**The Solution for Task 3.1**

In [19]:
from pyspark.sql.functions import avg, sum, count, coalesce, lit

def t31_summarise_trips(df):

    # Group trips by (pickup, drop-off) zone pair: one row per directed edge
    df_summary = df.groupBy("PULocationID", "DOLocationID").agg(

        # Average unit profitability over the trips on this edge (nulls counted as 0)
        avg(coalesce("unit_profitability", lit(0))).alias("average_unit_profit"),

        # Number of trips on this edge
        count("*").alias("trips_count"),

        # Total number of passengers carried on this edge
        sum("passenger_count").alias("total_passengers")

    ).select(
        # The Dataframe Structure as mentioned above
        "PULocationID",
        "DOLocationID",
        "average_unit_profit",
        "trips_count",
        "total_passengers"
    )

    return df_summary


In [20]:
# execute task 3.1

#Interzonal Travel Data
graph = t31_summarise_trips(trips_22)

print_count(graph)
#display(graph.take(10))

Row count: 16,992


**The Solution for Task 3.2**

In [21]:
from pyspark.sql.functions import avg, sum, desc, broadcast, coalesce, lit


def t32_summarise_zones_pairs(df, zones_df=zone_names):
    # Aggregate data per origin zone
    zones_summary = df.groupBy("PULocationID").agg(
        sum("trips_count").alias("total_trips"),  # Total trips outgoing
        avg(coalesce("average_unit_profit", lit(0))).alias("average_profitability"),  # Average profit
        sum("total_passengers").alias("total_passengers")  # Total number of passengers outgoing
    )

    # Broadcast hint: the zones table is small, so it can be sent to every executor for the join
    zones_df = broadcast(zones_df)

    # Rename 'Zone' to a plain 'zone_name' column
    zones_df = zones_df.withColumnRenamed("Zone", "zone_name")

    # Join the per-zone summary with the zone names

    return zones_summary.join(
        zones_df, zones_summary.PULocationID == zones_df.LocationID, "left"
    ).select(
        "zone_name",  # Zone_name selected
        "total_trips",
        "average_profitability",
        "total_passengers"
    )


# The functions below return the top 10 zones by trips, profitability and passengers.

def t32_top10_trips(df_zones):
    return df_zones.orderBy(desc("total_trips")).limit(10)

def t32_top10_profit(df_zones):
    return df_zones.orderBy(desc("average_profitability")).limit(10)

def t32_top10_passenger(df_zones):
    return df_zones.orderBy(desc("total_passengers")).limit(10)


In [22]:
# execute task 3.2
zones = t32_summarise_zones_pairs(graph)

top10_trips     = t32_top10_trips(zones)
top10_profit    = t32_top10_profit(zones)
top10_passenger = t32_top10_passenger(zones)

**Confirming the DataFrames for each function**

In [23]:

top10_trips

DataFrame[zone_name: string, total_trips: bigint, average_profitability: double, total_passengers: double]

In [24]:

top10_profit

DataFrame[zone_name: string, total_trips: bigint, average_profitability: double, total_passengers: double]

In [25]:

top10_passenger

DataFrame[zone_name: string, total_trips: bigint, average_profitability: double, total_passengers: double]

In [26]:
# Displaying results
top10_trips.show()
top10_profit.show()
top10_passenger.show()

+--------------------+-----------+---------------------+----------------+
|           zone_name|total_trips|average_profitability|total_passengers|
+--------------------+-----------+---------------------+----------------+
|Upper East Side S...|     749215|     7.63455343885513|       1037367.0|
|Upper East Side N...|     704371|    8.839806190981518|        975535.0|
|      Midtown Center|     474160|    7.375739609814177|        656996.0|
|Penn Station/Madi...|     464961|    8.155738187542173|        642642.0|
| Lincoln Square East|     453312|    8.789427794102219|        635902.0|
|Upper West Side S...|     451048|    7.447587585125748|        636591.0|
|         Murray Hill|     446683|   7.7410714721550455|        612601.0|
|     Lenox Hill West|     444636|   6.8301007888454315|        614478.0|
|        Midtown East|     424232|    8.541049404209726|        581892.0|
|            Union Sq|     381578|    12.86245091660277|        532751.0|
+--------------------+-----------+----

**Results in Pandas Dataframe**

In [27]:
top10_trips.toPandas()


|   | zone_name | total_trips | average_profitability | total_passengers |
|---|-----------|-------------|-----------------------|------------------|
| 0 | Upper East Side South | 749215 | 7.634553 | 1037367.0 |
| 1 | Upper East Side North | 704371 | 8.839806 | 975535.0 |
| 2 | Midtown Center | 474160 | 7.37574 | 656996.0 |
| 3 | Penn Station/Madison Sq West | 464961 | 8.155738 | 642642.0 |
| 4 | Lincoln Square East | 453312 | 8.789428 | 635902.0 |
| 5 | Upper West Side South | 451048 | 7.447588 | 636591.0 |
| 6 | Murray Hill | 446683 | 7.741071 | 612601.0 |
| 7 | Lenox Hill West | 444636 | 6.830101 | 614478.0 |
| 8 | Midtown East | 424232 | 8.541049 | 581892.0 |
| 9 | Union Sq | 381578 | 12.862451 | 532751.0 |


In [28]:
top10_profit.toPandas()


|   | zone_name | total_trips | average_profitability | total_passengers |
|---|-----------|-------------|-----------------------|------------------|
| 0 | Mariners Harbor | 11 | 170.001967 | 3.0 |
| 1 | Bay Terrace/Fort Totten | 284 | 169.962872 | 36.0 |
| 2 | East Flushing | 182 | 116.012159 | 21.0 |
| 3 | Rossville/Woodrow | 3 | 111.5625 | 1.0 |
| 4 | Newark Airport | 29 | 94.133266 | 43.0 |
| 5 | West Brighton | 4 | 87.371203 | 6.0 |
| 6 | Jamaica Estates | 1451 | 79.792172 | 186.0 |
| 7 | Stapleton | 15 | 75.225748 | 15.0 |
| 8 | Bay Ridge | 2132 | 73.858054 | 289.0 |
| 9 | Rosedale | 256 | 71.485307 | 62.0 |


In [29]:
top10_passenger.toPandas()

|   | zone_name | total_trips | average_profitability | total_passengers |
|---|-----------|-------------|-----------------------|------------------|
| 0 | Upper East Side South | 749215 | 7.634553 | 1037367.0 |
| 1 | Upper East Side North | 704371 | 8.839806 | 975535.0 |
| 2 | Midtown Center | 474160 | 7.37574 | 656996.0 |
| 3 | Penn Station/Madison Sq West | 464961 | 8.155738 | 642642.0 |
| 4 | Upper West Side South | 451048 | 7.447588 | 636591.0 |
| 5 | Lincoln Square East | 453312 | 8.789428 | 635902.0 |
| 6 | Lenox Hill West | 444636 | 6.830101 | 614478.0 |
| 7 | Murray Hill | 446683 | 7.741071 | 612601.0 |
| 8 | Midtown East | 424232 | 8.541049 | 581892.0 |
| 9 | Union Sq | 381578 | 12.862451 | 532751.0 |


## Task 4 - Record the pipeline's execution time

Record the execution time of:

1. the whole pipeline
2. the whole pipeline except task 1.2

on the two tables below, for all dataset sizes: `'S'`, `'M'`, `'L'`, `'XL'`, `'XXL'`.

Analyse the resulting execution times and comment on the effect of dataset size, dataset format and task complexity (with and without task 1.2) on pipeline performance.
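
As a worked example of the normalisation used in the table, the sketch below converts the rounded 'S' timings reported further down (26.93 s with Task 1.2, 16.03 s without, 2,898,033 rows) into seconds per million records and into a relative cost of Task 1.2. The helper name `seconds_per_million` is hypothetical, and because the inputs are rounded the results can differ slightly in the last digits from the values printed by the notebook.

```python
def seconds_per_million(elapsed_seconds, num_records):
    """Normalise an execution time to seconds per one million records."""
    return elapsed_seconds / num_records * 1_000_000

rows_s = 2_898_033   # rows in the 'S' dataset
with_12 = 26.93      # seconds, full pipeline (rounded)
without_12 = 16.03   # seconds, pipeline without Task 1.2 (rounded)

per_million_with = seconds_per_million(with_12, rows_s)
per_million_without = seconds_per_million(without_12, rows_s)

# How much longer the pipeline runs when Task 1.2 is included.
overhead_ratio = with_12 / without_12

print(f"{per_million_with:.2f} {per_million_without:.2f} {overhead_ratio:.2f}")
```

Comparing the per-million figures across dataset sizes shows whether the pipeline scales roughly linearly, while the ratio isolates the cost of the outlier-removal step.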

**The Solution For Task 4**

**Evaluating the Execution Time Performance of Each Dataset**

Dataset ~ S Only

In [32]:
import time

# Measuring Execution time of Dataset 'S'

SIZE = 'S'
WITH_TASK_12 = True
trips = init_trips(SIZE)

start_time = time.time()  # Start time for measuring execution

# Pipeline with Task 1.2 Enabled
pipeline_results = pipeline(trips, with_task_12=WITH_TASK_12)

# Collect Results
for result in pipeline_results:
    result.collect()
end_time = time.time()  # End time for measuring execution

execution_time_with_task = end_time - start_time #Time Taken

num_records = trips.count()

# Calculating the execution time per million records
execution_time_per_million_with_task = (execution_time_with_task / num_records) * 1_000_000

print(f"The Execution Time With Task 1.2 : {execution_time_with_task:.2f} seconds")
print(f"sec / 1M records: {execution_time_per_million_with_task:.6f} seconds")

# Recording execution time WITHOUT Task 1.2
WITH_TASK_12 = False
trips = init_trips(SIZE)  # Reinitialize trips with the same size

# Measuring execution time
start_time = time.time()
pipeline_results = pipeline(trips, with_task_12=WITH_TASK_12)

# Collecting results
for result in pipeline_results:
    result.collect()
end_time = time.time()
execution_time_without_task = end_time - start_time

# Calculating the execution time per million records
execution_time_per_million_without_task = (execution_time_without_task / num_records) * 1_000_000

print(f"The Execution Time Without Task 1.2: {execution_time_without_task:.2f} seconds")
print(f"sec / 1M records (w/o 1.2): {execution_time_per_million_without_task:.6f} seconds")



    Trips dataset loaded!
    ---
      Size: S
      Tables loaded: ['/content/drive/MyDrive/CSC8101_Data/tripdata_2021_07.parquet']
      Number of trips (dataset rows): 2,898,033
    
The Execution Time With Task 1.2 : 26.93 seconds
sec / 1M records: 9.291648 seconds

    Trips dataset loaded!
    ---
      Size: S
      Tables loaded: ['/content/drive/MyDrive/CSC8101_Data/tripdata_2021_07.parquet']
      Number of trips (dataset rows): 2,898,033
    
The Execution Time Without Task 1.2: 16.03 seconds
sec / 1M records (w/o 1.2): 5.531096 seconds


Dataset ~ M Only

In [None]:
import time

# Measuring Execution time of Dataset 'M'

SIZE = 'M'
WITH_TASK_12 = True
trips = init_trips(SIZE)

start_time = time.time()  # Start time for measuring execution

# Pipeline with Task 1.2 Enabled
pipeline_results = pipeline(trips, with_task_12=WITH_TASK_12)

# Collect Results
for result in pipeline_results:
    result.collect()
end_time = time.time()  # End time for measuring execution

execution_time_with_task = end_time - start_time #Time Taken

num_records = trips.count()

# Calculating the execution time per million records
execution_time_per_million_with_task = (execution_time_with_task / num_records) * 1_000_000

print(f"The Execution Time With Task 1.2 : {execution_time_with_task:.2f} seconds")
print(f"sec / 1M records: {execution_time_per_million_with_task:.6f} seconds")

# Recording execution time WITHOUT Task 1.2
WITH_TASK_12 = False
trips = init_trips(SIZE)  # Reinitialize trips with the same size

# Measuring execution time
start_time = time.time()
pipeline_results = pipeline(trips, with_task_12=WITH_TASK_12)

# Collecting results
for result in pipeline_results:
    result.collect()
end_time = time.time()
execution_time_without_task = end_time - start_time

# Calculating the execution time per million records
execution_time_per_million_without_task = (execution_time_without_task / num_records) * 1_000_000

print(f"The Execution Time Without Task 1.2: {execution_time_without_task:.2f} seconds")
print(f"sec / 1M records (w/o 1.2): {execution_time_per_million_without_task:.6f} seconds")


_Table 1. Pipeline performance for `parquet` format._

| metric                         | S         | M          | L    | XL   | XXL  |
|--------------------------------|-----------|------------|------|------|------|
| rows                           | 2,898,033 | 15,571,166 |  000 |  000 |  000 |
| execution time (s) (w/o 1.2)   | 16.03     | 0.00       | 0.00 | 0.00 | 0.00 |
| execution time (s)             | 26.93     | 0.00       | 0.00 | 0.00 | 0.00 |
| sec / 1M records (w/o 1.2)     | 5.531096  | 0.0        | 0.0  | 0.0  | 0.0  |
| sec / 1M records               | 9.291648  | 0.0        | 0.0  | 0.0  | 0.0  |