# Milestone 3 - Pre-processing and analysis with PySpark

## Deadline - Sunday, 10th of December @11.59 pm 

The goal of this milestone is to preprocess the dataset 'New York yellow taxis' by performing basic data preparation and basic analysis to gain a better understanding of the data using PySpark.

Use the same month and year you used for the green taxis in milestone 1. [Datasets](https://drive.google.com/drive/folders/1t8nBgbHVaA5roZY4z3RcAG1_JMYlSTqu?usp=sharing) (download the yellow taxis dataset).

Important Notes:
- You MUST use this notebook template/structure. not doing so will result in marks deduction.
- You MUST have the cells run and output shown similar to milestone 1. I will NOT RUN YOUR NOTEBOOK.

Submission guidelines: same as milestone 1.

Notebook name must be same format as the file you named in miletsone 1. Just M3 instead of M1.

IMPORTANT: You are only allowed to use PySpark unless explicitly told otherwise(i.e last task).

Useful resource/documentation (highly recommended) - [PySpark examples](https://sparkbyexamples.com/pyspark-tutorial/)


## Weight dist.
- Loading the dataset : 5%
- Basic cleaning: 30%
	- column renaming: 10%
	- detect missing: 35%
	- Handle missing: 35%
	- Check missing : 20%
- Analyses: 30%
- Encoding: 20%
- Lookup table: 10%
- Writing the cleaned and lookup table back as parquet and csv files: 5%.

# Tasks:

## Load the dataset.

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("YourAppName").getOrCreate()

file_path = 'green_tripdata_2019-01.csv'

df = spark.read.csv(file_path, header=True, inferSchema=True)


### Preview first 20 rows.

In [6]:
df.show(20)


+--------------------+--------------------+---------------------+------------------+-------------+--------------------+--------------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+-----------+--------------------+
|              Vendor|lpep pickup datetime|lpep dropoff datetime|store and fwd flag|    Rate type|         PU Location|         DO Location|passenger count|trip distance|fare amount|extra|mta tax|tip amount|tolls amount|ehail fee|improvement surcharge|total amount|payment type|  trip type|congestion surcharge|
+--------------------+--------------------+---------------------+------------------+-------------+--------------------+--------------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+-----------+--------------------+
|       VeriFone Inc.| 2018-12-21 15:17:29|  2018-12-21 15:18:57

### How many partitions is this dataframe split into?

In [7]:
num_partitions = df.rdd.getNumPartitions()
print("Number of partitions:", num_partitions)


Number of partitions: 20


## Basic cleaning

### rename all columns (replacing a space with an underscore, and making it lowercase)

In [8]:
for col in df.columns:
    df = df.withColumnRenamed(col, col.replace(' ', '_').lower())

In [9]:
df.show(1)

+-------------+--------------------+---------------------+------------------+-------------+-----------+-----------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+-----------+--------------------+
|       vendor|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|    rate_type|pu_location|do_location|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|  trip_type|congestion_surcharge|
+-------------+--------------------+---------------------+------------------+-------------+-----------+-----------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+-----------+--------------------+
|VeriFone Inc.| 2018-12-21 15:17:29|  2018-12-21 15:18:57|                 N|Standard rate| Unknown,NV| Unknown,NV|            5.0|        

### Detect and remove duplicates
- Duplicates are trips with same pickup time,pickup location, dropoff time,drop off location and trip distance

In [10]:
df = df.dropDuplicates()

### check that there is are no duplicates

In [11]:
# Count the number of duplicate rows in the DataFrame
num_duplicates = df.count() - df.dropDuplicates().count()

# Check if there are no duplicates
if num_duplicates == 0:
    print("No duplicates found.")
else:
    print(f"Found {num_duplicates} duplicate rows.")


No duplicates found.


### Detect missing
- Create a function that takes in the df and returns any data structrue of your choice(df/dict,list,tuple,etc) which has the name of the column and percentage of missing entries from the whole dataset.
- Tip : storing the missing info as dict where the key is the column name and value is the percentage would be the easiest.  

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


def detect_missing(df):
    # Calculate the percentage of missing values for each column
    total_rows = df.count()
    missing_percentage = {}

    for col in df.columns:
        missing_count = df.filter(F.col(col).isNull()).count()
        percentage = (missing_count / total_rows) * 100
        missing_percentage[col] = percentage

    return missing_percentage


### Prinout the missing info

In [18]:


# Call the function with your DataFrame
missing_info = detect_missing(df)

# Print or use the missing information as needed
print("Missing Information:")
for col, percentage in missing_info.items():
    print(f"{col}: {percentage:.2f}% missing")


Missing Information:
vendor: 0.00% missing
lpep_pickup_datetime: 0.00% missing
lpep_dropoff_datetime: 0.00% missing
store_and_fwd_flag: 6.19% missing
rate_type: 0.00% missing
pu_location: 0.00% missing
do_location: 0.00% missing
passenger_count: 6.40% missing
trip_distance: 0.00% missing
fare_amount: 0.00% missing
extra: 54.78% missing
mta_tax: 0.00% missing
tip_amount: 0.00% missing
tolls_amount: 0.00% missing
ehail_fee: 99.95% missing
improvement_surcharge: 0.00% missing
total_amount: 0.00% missing
payment_type: 2.39% missing
trip_type: 0.00% missing
congestion_surcharge: 87.42% missing


### Handle missing
- For numerical features replace with 0.
- For categorical/strings replace with 'Unknown'


In [23]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType, StringType

def handle_missing(df):
    # Replace missing values for numerical columns with 0
    for col in df.columns:
        if isinstance(df.schema[col].dataType, (IntegerType, DoubleType)):
            df = df.withColumn(col, F.when(F.col(col).isNull(), 0).otherwise(F.col(col)))

        # Replace missing values for string columns with 'Unknown'
        elif isinstance(df.schema[col].dataType, StringType):
            df = df.withColumn(col, F.when(F.col(col).isNull(), 'Unknown').otherwise(F.col(col)))

    return df


In [26]:
df_missing_handled = handle_missing(df)

In [25]:
df_missing_handled.show(10)

+--------------------+--------------------+---------------------+------------------+-------------+--------------------+--------------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+-----------+--------------------+
|              vendor|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|    rate_type|         pu_location|         do_location|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|  trip_type|congestion_surcharge|
+--------------------+--------------------+---------------------+------------------+-------------+--------------------+--------------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+-----------+--------------------+
|       VeriFone Inc.| 2019-01-01 00:34:39|  2019-01-01 01:06:30

### check that there are no missing values

In [27]:

# Call the function with your DataFrame
df_missing_handled_info = detect_missing(df_missing_handled)

# Print or use the missing information as needed
print("Missing Information:")
for col, percentage in df_missing_handled_info.items():
    print(f"{col}: {percentage:.2f}% missing")

Missing Information:
vendor: 0.00% missing
lpep_pickup_datetime: 0.00% missing
lpep_dropoff_datetime: 0.00% missing
store_and_fwd_flag: 0.00% missing
rate_type: 0.00% missing
pu_location: 0.00% missing
do_location: 0.00% missing
passenger_count: 0.00% missing
trip_distance: 0.00% missing
fare_amount: 0.00% missing
extra: 0.00% missing
mta_tax: 0.00% missing
tip_amount: 0.00% missing
tolls_amount: 0.00% missing
ehail_fee: 0.00% missing
improvement_surcharge: 0.00% missing
total_amount: 0.00% missing
payment_type: 0.00% missing
trip_type: 0.00% missing
congestion_surcharge: 0.00% missing


In [29]:
df_missing_handled.show(100)


+--------------------+--------------------+---------------------+------------------+---------------+--------------------+--------------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+-----------+--------------------+
|              vendor|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|      rate_type|         pu_location|         do_location|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|  trip_type|congestion_surcharge|
+--------------------+--------------------+---------------------+------------------+---------------+--------------------+--------------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+-----------+--------------------+
|       VeriFone Inc.| 2019-01-01 00:34:39|  2019-01-01 01

In [34]:
df_missing_handled.show(n=5000, truncate=False)


+---------------------------------+--------------------+---------------------+------------------+---------------------+------------------------------------------+-----------------------------------------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+-----------+--------------------+
|vendor                           |lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|rate_type            |pu_location                               |do_location                                    |passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type  |congestion_surcharge|
+---------------------------------+--------------------+---------------------+------------------+---------------------+------------------------------------------+-----------------------------------------------+---------------+----------

## Feature engineering - 
Write a function that adds the 3 following features. Use built in fucntions in PySpark (from the functions library) check lab 8, Avoid writing UDFs from scratch.
- trip duration (the format/unit is up to you)
- is_weekend. whether the trip occurred on Saturday or Sunday.
- week number (relevant to the month and not year, i.e 1,2,3,4 nto 31,32,33...) 

In [58]:
from pyspark.sql import functions as F

def add_trip_features(df):
   # Calculate trip duration in seconds
    df = df.withColumn('trip_duration', F.unix_timestamp('lpep_dropoff_datetime') - F.unix_timestamp('lpep_pickup_datetime'))

    # Check if the trip occurred on a weekend (Saturday or Sunday)
    df = df.withColumn('is_weekend', F.dayofweek(F.col('lpep_pickup_datetime')).isin([1, 7]))

    # Calculate the week number relevant to the month
    df = df.withColumn('day_of_month', F.dayofmonth(F.col('lpep_pickup_datetime')))
    df = df.withColumn('week_number_in_month', ((F.col('day_of_month') - 1) / 7 + 1).cast('int'))

    return df

# Call the function with your DataFrame
df_with_features = add_trip_features(df_missing_handled)

# Show the updated DataFrame
df_with_features.show(100)


+--------------------+--------------------+---------------------+------------------+---------------+--------------------+--------------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+-----------+--------------------+-------------+----------+------------+--------------------+
|              vendor|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|      rate_type|         pu_location|         do_location|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|  trip_type|congestion_surcharge|trip_duration|is_weekend|day_of_month|week_number_in_month|
+--------------------+--------------------+---------------------+------------------+---------------+--------------------+--------------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+

In [59]:
# Assuming df_with_features is your PySpark DataFrame with the added features

# Define the output CSV file path
output_path = "file3.csv"

# Write the DataFrame to CSV
df_with_features.write.csv(output_path, header=True, mode="overwrite")

# If you want to merge the result into a single CSV file, you can use the following command
import os
os.system("cat {}/*.csv > {}".format(output_path, output_path))


512

### Preview the first 20 rows (only select the following features: pickup and droptime, and the 3 features you added). 

In [70]:
# Selecting the specified features
selected_features = ['lpep_pickup_datetime', 'lpep_dropoff_datetime', 'trip_duration', 'is_weekend', 'week_number_in_month', 'day_of_month']

# Selecting and showing the first 20 rows with the chosen features
df_with_features.select(selected_features).show(20, truncate=False)

+--------------------+---------------------+-------------+----------+--------------------+------------+
|lpep_pickup_datetime|lpep_dropoff_datetime|trip_duration|is_weekend|week_number_in_month|day_of_month|
+--------------------+---------------------+-------------+----------+--------------------+------------+
|2019-01-01 00:34:39 |2019-01-01 01:06:30  |1911         |false     |1                   |1           |
|2019-01-01 00:28:03 |2019-01-01 00:36:24  |501          |false     |1                   |1           |
|2019-01-01 00:36:32 |2019-01-01 01:07:24  |1852         |false     |1                   |1           |
|2019-01-01 00:39:29 |2019-01-01 00:49:42  |613          |false     |1                   |1           |
|2019-01-01 01:21:52 |2019-01-01 01:26:42  |290          |false     |1                   |1           |
|2019-01-01 01:25:50 |2019-01-01 01:43:51  |1081         |false     |1                   |1           |
|2019-01-01 01:56:36 |2019-01-01 02:11:44  |908          |false 

## Analyses - Answer the following 5 questions (by showing the output and and a short 1-2 sentences regarding your observation/answer) 

MUST Use the PySpark SQL API.

DO NOT explicitly write SQL queries. Doing so will result in 50% deduction (for the question). Check lab 7.

You are free to add columns if it will help in answering a question and add useful info to the dataset.

### 1- What is the average fare amount per payment type 

In [62]:
# Assuming your DataFrame is named df_with_features

# Register the DataFrame as a temporary SQL table
df_with_features.createOrReplaceTempView("trips")

# Run SQL query to find the average fare amount per payment type
result = spark.sql("""
    SELECT payment_type, AVG(fare_amount) AS average_fare
    FROM trips
    GROUP BY payment_type
""")

# Show the result
result.show()


+------------+------------------+
|payment_type|      average_fare|
+------------+------------------+
|     Unknown| 34.11291563296958|
|        Cash|10.157588838303267|
|     Dispute|1.6930769230769231|
|      Uknown| 6.340909090909091|
|   No charge|0.7256146496815284|
| Credit card|16.349304153170777|
+------------+------------------+



The average fare amounts vary across different payment types. Notably, trips paid with credit cards tend to have a higher average fare compared to cash payments, while "No charge" trips have the lowest average fare. The presence of payment types like "Unknown," "Dispute," and "Uknown" (a typo for "Unknown") suggests the need for data cleaning or further investigation into payment type categorization.

### 2- Do people tend to go on a longer trips during the weekend or weekdays?

In [64]:
# Register the DataFrame as a temporary SQL table
df_with_features.createOrReplaceTempView("taxi_data")

# Calculate the average trip duration for weekends and weekdays
avg_duration_by_day_type = spark.sql("""
    SELECT
        CASE WHEN is_weekend = 1 THEN 'Weekend' ELSE 'Weekday' END AS day_type,
        AVG(trip_duration) AS average_trip_duration
    FROM taxi_data
    GROUP BY day_type
""")

# Show the result
avg_duration_by_day_type.show()


+--------+---------------------+
|day_type|average_trip_duration|
+--------+---------------------+
| Weekday|     1381.88927495901|
| Weekend|   1264.8554613466333|
+--------+---------------------+



weekdays have longer trips

### 3 - which day recorded the most trips?

In [65]:
# Calculate the number of trips recorded for each day
trips_by_day = spark.sql("""
    SELECT
        DATE(lpep_pickup_datetime) AS pickup_date,
        COUNT(*) AS trip_count
    FROM taxi_data
    GROUP BY pickup_date
    ORDER BY trip_count DESC
    LIMIT 1
""")

# Show the result
trips_by_day.show()


+-----------+----------+
|pickup_date|trip_count|
+-----------+----------+
| 2019-01-25|     26062|
+-----------+----------+



Maybe they were celebrating the Egyptian revolution in New York :-D

### 4- What is the average "total amount" of trips with more than 2 passengers?

In [67]:
# Calculate the average "total amount" for trips with more than 2 passengers
average_total_amount = spark.sql("""
    SELECT
        AVG(total_amount) AS average_total_amount
    FROM taxi_data
    WHERE passenger_count > 2
""")

# Show the result
average_total_amount.show()


+--------------------+
|average_total_amount|
+--------------------+
|  15.393976165456667|
+--------------------+



### 5- On average, when is it more likely that the tip is higher, when there are multiple passengers or just 1.?

In [68]:
# Calculate the average tip amount for trips with multiple passengers
average_tip_multiple_passengers = spark.sql("""
    SELECT
        AVG(tip_amount) AS average_tip_multiple_passengers
    FROM taxi_data
    WHERE passenger_count > 1
""")

# Calculate the average tip amount for trips with only one passenger
average_tip_single_passenger = spark.sql("""
    SELECT
        AVG(tip_amount) AS average_tip_single_passenger
    FROM taxi_data
    WHERE passenger_count = 1
""")

# Show the results
average_tip_multiple_passengers.show()
average_tip_single_passenger.show()


+-------------------------------+
|average_tip_multiple_passengers|
+-------------------------------+
|             0.9256355219343535|
+-------------------------------+

+----------------------------+
|average_tip_single_passenger|
+----------------------------+
|          0.8535721035442518|
+----------------------------+



Multiple Passengers --> More $$

### 6- What is the most frequent route on the weekend. 

In [74]:

# Register the updated DataFrame as a SQL temporary view
df_with_features.createOrReplaceTempView("taxi_data_with_features_view")

# Write SQL query to calculate the most frequent route on the weekend
query = """
    SELECT pu_location, do_location, COUNT(*) as count
    FROM taxi_data_with_features_view
    WHERE is_weekend
    GROUP BY pu_location, do_location
    ORDER BY count DESC
    LIMIT 1
"""

# Execute the query
most_frequent_route_weekend = spark.sql(query)

# Show the result
most_frequent_route_weekend.show()


+--------------+--------------+-----+
|   pu_location|   do_location|count|
+--------------+--------------+-----+
|Queens,Astoria|Queens,Astoria| 2032|
+--------------+--------------+-----+



## Encoding
- Label encode all categorical fetaures.
- Create a lookup table for these label encoded features. You can use the same format/example as the lookup table in Milestone 1 description.

(You are allowed to store and manipulate the lookup table as a pandas dataframe, it does not have to be a PySpark df).
- Remove the original unencoded categorical features from the df after encoding.

In [84]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession
import pandas as pd


# List of categorical features to encode
categorical_features = ['store_and_fwd_flag', 'rate_type', 'payment_type', 'trip_type']

# Create a dictionary to store label encoders
label_encoders = {}

# Create a DataFrame to store label encodings
lookup_table_data = []

# Iterate through each categorical feature and encode it
for feature in categorical_features:
    # Check if the output column already exists
    output_col = f"{feature}_encoded"
    if output_col not in df_with_features.columns:
        # Create a StringIndexer model
        indexer = StringIndexer(inputCol=feature, outputCol=output_col, handleInvalid="skip")

        # Fit and transform the data
        df_with_features = indexer.fit(df_with_features).transform(df_with_features)

        # Extract distinct values and their corresponding indices
        encoding_df = df_with_features.select([output_col, feature]).distinct()

        # Convert the encoding DataFrame to Pandas
        encoding_pd = encoding_df.toPandas()

        # Convert Pandas DataFrame to dictionary
        encoding_dict = dict(zip(encoding_pd[feature], encoding_pd[output_col]))

        # Store the label encoder and encoding in the dictionaries
        label_encoders[feature] = indexer
        lookup_table_data.append((feature, encoding_dict))
    else:
        print(f"Output column {output_col} already exists.")

# Convert lookup_table_data to a Pandas DataFrame
lookup_table_df = pd.DataFrame(lookup_table_data, columns=['Feature', 'LookupTable'])

# Display the lookup table
print("Lookup Table:")
print(lookup_table_df)

# Display the DataFrame with label-encoded features
df_with_features.show()


Lookup Table:
              Feature                                        LookupTable
0  store_and_fwd_flag               {'N': 0.0, 'Y': 2.0, 'Unknown': 1.0}
1           rate_type  {'Nassau or Westchester': 4.0, 'JFK': 3.0, 'Ne...
2        payment_type  {'No charge': 3.0, 'Cash': 1.0, 'Dispute': 4.0...
3           trip_type  {'Street-hail': 0.0, 'Dispatch': 1.0, 'Unknown...
+--------------------+--------------------+---------------------+------------------+-------------+--------------------+--------------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+-----------+--------------------+-------------+----------+------------+--------------------+--------------------------+-----------------+--------------------+-----------------+
|              vendor|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|    rate_type|         pu_location|         do_location|passenger_count|trip_dista

In [88]:
# List of columns to be removed
columns_to_remove = ['store_and_fwd_flag', 'rate_type', 'payment_type', 'trip_type']

# Remove the specified columns
df_with_features = df_with_features.drop(*columns_to_remove)

# Display the first 20 rows of the updated DataFrame
df_with_features.show(20)


+--------------------+--------------------+---------------------+--------------------+--------------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+--------------------+-------------+----------+------------+--------------------+--------------------------+-----------------+--------------------+-----------------+
|              vendor|lpep_pickup_datetime|lpep_dropoff_datetime|         pu_location|         do_location|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|congestion_surcharge|trip_duration|is_weekend|day_of_month|week_number_in_month|store_and_fwd_flag_encoded|rate_type_encoded|payment_type_encoded|trip_type_encoded|
+--------------------+--------------------+---------------------+--------------------+--------------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------

DataFrame exported to lookup.csv


### Preview first 20 rows of the label encoded features

In [90]:
# Display the first 20 rows of the DataFrame with label-encoded features
df_with_features.select(['store_and_fwd_flag_encoded', 'rate_type_encoded', 'payment_type_encoded', 'trip_type_encoded']).show()


+--------------------------+-----------------+--------------------+-----------------+
|store_and_fwd_flag_encoded|rate_type_encoded|payment_type_encoded|trip_type_encoded|
+--------------------------+-----------------+--------------------+-----------------+
|                       0.0|              0.0|                 0.0|              0.0|
|                       0.0|              0.0|                 0.0|              0.0|
|                       0.0|              0.0|                 0.0|              0.0|
|                       0.0|              0.0|                 1.0|              0.0|
|                       0.0|              0.0|                 0.0|              0.0|
|                       0.0|              0.0|                 0.0|              0.0|
|                       0.0|              0.0|                 0.0|              0.0|
|                       0.0|              0.0|                 0.0|              0.0|
|                       0.0|              0.0|        

### Preview first 20 rows of your lookup table

In [92]:
lookup_table_df.head(20)

Unnamed: 0,Feature,LookupTable
0,store_and_fwd_flag,"{'N': 0.0, 'Y': 2.0, 'Unknown': 1.0}"
1,rate_type,"{'Nassau or Westchester': 4.0, 'JFK': 3.0, 'Ne..."
2,payment_type,"{'No charge': 3.0, 'Cash': 1.0, 'Dispute': 4.0..."
3,trip_type,"{'Street-hail': 0.0, 'Dispatch': 1.0, 'Unknown..."


### Load the cleaned PySpark df to a parquet file and the lookup table to a csv file.

In [94]:
# Save the cleaned PySpark DataFrame to a Parquet file
df_with_features.write.parquet("green_tripdata_2019-01_cleaned.parquet", mode="overwrite")

# Save the lookup table to a CSV file
lookup_table_df.to_csv("lookup_table.csv", index=False)


## Bonus - Load the cleaned parquet file and lookup table into a Postgres database. 

Note that if you decide to do the bonus, you must include not only your notebook but the docker-compose.yaml file aswell.

### Screenshot of the table existing in the database and a simple query such as `select count(*) from table_name` or `select * from table_name limit 10`

(You can just copy paste the screenshots in the markdown cells below)