#### Create Taxi Cab Dataset
This notebook downloads the [Yellow](https://learn.microsoft.com/en-us/azure/open-datasets/dataset-taxi-yellow?tabs=azureml-opendatasets), [Green](https://learn.microsoft.com/en-us/azure/open-datasets/dataset-taxi-green?tabs=azureml-opendatasets), and [For-Hire](https://learn.microsoft.com/en-us/azure/open-datasets/dataset-taxi-for-hire-vehicle?tabs=azureml-opendatasets) Taxi Cab datasets from Azure Open Datasets. It standardizes their column names, combines them into a single table (a union by column name, not a join), and saves the result to a Lakehouse table named `nyc_taxi`.

In [None]:
# Import necessary libraries
from pyspark.sql.functions import col, lit

# Define Azure Blob Storage access information (public Azure Open Datasets account)
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_root = f'wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net'

# Define paths for the datasets
yellow_taxi_path = f'{blob_root}/yellow'
green_taxi_path = f'{blob_root}/green'
for_hire_taxi_path = f'{blob_root}/fhv'

# Read the datasets into DataFrames (raw, source column names)
yellow_taxi_raw_df = spark.read.parquet(yellow_taxi_path)
green_taxi_raw_df = spark.read.parquet(green_taxi_path)
for_hire_taxi_raw_df = spark.read.parquet(for_hire_taxi_path)

# Standardize column names so the DataFrames line up by name in the union below.
# Each map is source column -> standardized snake_case column.
YELLOW_COLUMN_MAP = {
    "tpepPickupDateTime": "pickup_datetime",
    "tpepDropoffDateTime": "dropoff_datetime",
    "tripDistance": "trip_distance",
    "puLocationId": "pu_location_id",
    "doLocationId": "do_location_id",
    "rateCodeId": "rate_code_id",
    "storeAndFwdFlag": "store_and_fwd_flag",
    "paymentType": "payment_type",
    "fareAmount": "fare_amount",
    "mtaTax": "mta_tax",
    "improvementSurcharge": "improvement_surcharge",
    "tipAmount": "tip_amount",
    "tollsAmount": "tolls_amount",
    "totalAmount": "total_amount",
    "vendorID": "vendor_id",
    "passengerCount": "passenger_count",
    "startLon": "pickup_longitude",
    "startLat": "pickup_latitude",
    "endLon": "dropoff_longitude",
    "endLat": "dropoff_latitude",
}

GREEN_COLUMN_MAP = {
    "lpepPickupDatetime": "pickup_datetime",
    "lpepDropoffDatetime": "dropoff_datetime",
    "tripDistance": "trip_distance",
    "puLocationId": "pu_location_id",
    "doLocationId": "do_location_id",
    "RatecodeID": "rate_code_id",
    "storeAndFwdFlag": "store_and_fwd_flag",
    "paymentType": "payment_type",
    "fareAmount": "fare_amount",
    "mtaTax": "mta_tax",
    "improvementSurcharge": "improvement_surcharge",
    "tipAmount": "tip_amount",
    "tollsAmount": "tolls_amount",
    "totalAmount": "total_amount",
    "VendorID": "vendor_id",
    "passengerCount": "passenger_count",
    "pickupLongitude": "pickup_longitude",
    "pickupLatitude": "pickup_latitude",
    "dropoffLongitude": "dropoff_longitude",
    "dropoffLatitude": "dropoff_latitude",
    "tripType": "trip_type",
}

FOR_HIRE_COLUMN_MAP = {
    "pickupDateTime": "pickup_datetime",
    "dropOffDateTime": "dropoff_datetime",
    "puLocationId": "pu_location_id",
    "doLocationId": "do_location_id",
    "dispatchBaseNum": "dispatch_base_num",
    "srFlag": "sr_flag",
}


def rename_columns(df, column_map):
    """Return a new DataFrame with columns renamed according to `column_map` (old -> new).

    withColumnRenamed is a no-op for a name that is not in the schema, so a
    missing source column is silently skipped rather than raising.
    """
    for old_name, new_name in column_map.items():
        df = df.withColumnRenamed(old_name, new_name)
    return df


# Tag every row with its source dataset; after the union this is otherwise lost.
yellow_taxi_df = rename_columns(yellow_taxi_raw_df, YELLOW_COLUMN_MAP).withColumn("taxi_type", lit("yellow"))
green_taxi_df = rename_columns(green_taxi_raw_df, GREEN_COLUMN_MAP).withColumn("taxi_type", lit("green"))

# For-Hire Vehicle rows only get the columns mapped above. Fare, distance, payment,
# passenger and vendor columns are deliberately NOT derived from unrelated fields
# (e.g. sr_flag, the shared-ride flag, or dispatch_base_num). unionByName with
# allowMissingColumns=True fills them with nulls instead of fabricated values.
for_hire_taxi_df = rename_columns(for_hire_taxi_raw_df, FOR_HIRE_COLUMN_MAP).withColumn("taxi_type", lit("fhv"))

# Combine the DataFrames by column name; columns missing from a source become null.
combined_taxi_df = (
    yellow_taxi_df
    .unionByName(green_taxi_df, allowMissingColumns=True)
    .unionByName(for_hire_taxi_df, allowMissingColumns=True)
)

# Show the schema and a few rows of the combined DataFrame
combined_taxi_df.printSchema()
display(combined_taxi_df.limit(5))

# Write the combined DataFrame to a Delta table in the lakehouse
combined_taxi_df.write.format("delta").mode("overwrite").saveAsTable("nyc_taxi")

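Confirm that the `nyc_taxi` table was written with V-Order optimization. The check reads Parquet metadata through the lakehouse's local filesystem mount, so the lakehouse must be mounted as the notebook's default lakehouse first. Thanks to `Sandeep Pawar` for [this approach](https://fabric.guru/checking-if-delta-table-in-fabric-is-v-order-optimized).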

In [None]:
def check_vorder(table_name_path):
    '''
    Report whether a Delta table's Parquet files carry the V-Order marker.

    Author: Sandeep Pawar | fabric.guru |  Jun 6, 2023
    (edited: None-safe metadata lookup, decoded return value)

    Provide table_name_path as '//lakehouse/default/Tables/<table_name>'.
    You must first mount the lakehouse to use the local filesystem API.

    The check reads the key/value metadata of the schema that pyarrow infers
    for the table directory and looks for the
    'com.microsoft.parquet.vorder.enabled' entry.
    NOTE(review): pyarrow scans the directory's Parquet files, not the Delta
    log, so files left behind by earlier overwrites (until VACUUM) may be
    included — confirm if that matters for your table.

    Returns:
        str: the decoded value of the vorder metadata entry (presumably
             'true') when present; otherwise "Table is not V-ordered".
        None: if table_name_path does not exist (a message is printed).
    '''
    import os

    if not os.path.exists(table_name_path):
        # normpath strips a trailing slash so basename still yields the table name.
        print(f'{os.path.basename(os.path.normpath(table_name_path))} does not exist')
        return None

    # Imported only once the path exists, so the missing-table branch above
    # works even where pyarrow is unavailable.
    import pyarrow.dataset as ds

    vorder_key = b'com.microsoft.parquet.vorder.enabled'
    # Schema.metadata is None when the files carry no key/value metadata.
    metadata = ds.dataset(table_name_path).schema.metadata or {}
    # Look up the exact key we report on. Matching "any key containing vorder"
    # and then indexing this key could raise KeyError.
    value = metadata.get(vorder_key)
    if value is None:
        return "Table is not V-ordered"
    # Metadata values are bytes; decode so callers see 'true', not "b'true'".
    return value.decode('utf-8', errors='replace')

# Path to your Delta table
table_name_path = '//lakehouse/default/Tables/nyc_taxi'

# Check if the table is V-ordered
vorder_status = check_vorder(table_name_path)
print(vorder_status)

Validate the results by previewing five rows of the `nyc_taxi` table. Without an `ORDER BY`, which five rows are returned is not guaranteed.

In [None]:
%%sql
-- Spot-check a handful of rows from the combined nyc_taxi Delta table
SELECT *
FROM nyc_taxi
LIMIT 5