### 1. Import Libraries

In [1]:
import os
import sys
import pandas as pd
import matplotlib.pyplot as plt
from timeit import default_timer as timer
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

### 2. Confirm Python Executable

In [2]:
# Show which Python interpreter the kernel is running, to confirm the
# notebook is using the project's virtual environment.
sys.executable

'd:\\CampusX\\DSMP\\Yellow-Taxi-Trip-Records-Data-Analysis\\.venv\\Scripts\\python.exe'

### 3. Import the Data

In [3]:
# Use the kernel's current working directory as the project root.
# NOTE(review): assumes the notebook is launched from the repository root;
# the paths built from this value break otherwise — confirm.
project_dir = os.getcwd()
project_dir

'd:\\CampusX\\DSMP\\Yellow-Taxi-Trip-Records-Data-Analysis'

In [4]:
# Folder holding the raw input files: <project_dir>/Data.
data_dir = os.path.join(project_dir, "Data")
data_dir

'd:\\CampusX\\DSMP\\Yellow-Taxi-Trip-Records-Data-Analysis\\Data'

In [5]:
# Full path of the Parquet file used by the rest of the notebook
# (file name: yellow_tripdata_2025-06.parquet).
data_file = os.path.join(data_dir, "yellow_tripdata_2025-06.parquet")
data_file

'd:\\CampusX\\DSMP\\Yellow-Taxi-Trip-Records-Data-Analysis\\Data\\yellow_tripdata_2025-06.parquet'

In [29]:
# Load the whole Parquet file into an in-memory pandas DataFrame.
df = pd.read_parquet(data_file)
# Bare expression: renders pandas' truncated preview (first and last rows).
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,cbd_congestion_fee
0,1,2025-06-01 00:02:50,2025-06-01 00:39:51,1.0,10.00,1.0,N,138,50,1,47.80,11.00,0.5,20.15,6.94,1.0,87.39,2.5,1.75,0.75
1,2,2025-06-01 00:11:27,2025-06-01 00:35:35,1.0,3.93,1.0,N,158,237,1,24.70,1.00,0.5,6.09,0.00,1.0,36.54,2.5,0.00,0.75
2,1,2025-06-01 00:43:47,2025-06-01 00:49:16,0.0,0.70,1.0,N,230,163,1,7.20,4.25,0.5,2.59,0.00,1.0,15.54,2.5,0.00,0.75
3,1,2025-06-01 00:01:15,2025-06-01 00:42:16,1.0,17.00,2.0,N,132,232,1,70.00,3.25,0.5,5.00,0.00,1.0,79.75,2.5,0.00,0.75
4,7,2025-06-01 00:16:32,2025-06-01 00:16:32,1.0,2.22,1.0,N,48,234,1,20.50,0.00,0.5,5.25,0.00,1.0,31.50,2.5,0.00,0.75
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4322955,2,2025-06-30 23:43:46,2025-06-30 23:48:53,,1.03,,,141,262,0,10.08,0.00,0.5,0.00,0.00,1.0,14.08,,,0.00
4322956,2,2025-06-30 23:19:59,2025-06-30 23:32:47,,3.34,,,129,70,0,12.23,0.00,0.5,0.00,0.00,1.0,13.73,,,0.00
4322957,2,2025-06-30 23:35:59,2025-06-30 23:51:56,,3.79,,,166,243,0,16.95,0.00,0.5,0.00,0.00,1.0,18.45,,,0.00
4322958,2,2025-06-30 23:00:28,2025-06-30 23:15:01,,3.85,,,238,42,0,18.40,0.00,0.5,0.00,0.00,1.0,22.40,,,0.00


### 4. Summarize the Data

In [7]:
# (number of rows, number of columns)
df.shape

(4322960, 20)

In [8]:
# Data type of every column, as a Series indexed by column name.
df.dtypes

VendorID                          int32
tpep_pickup_datetime     datetime64[us]
tpep_dropoff_datetime    datetime64[us]
passenger_count                 float64
trip_distance                   float64
RatecodeID                      float64
store_and_fwd_flag               object
PULocationID                      int32
DOLocationID                      int32
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
Airport_fee                     float64
cbd_congestion_fee              float64
dtype: object

In [9]:
# Structural summary of the DataFrame: index range, column names and dtypes.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4322960 entries, 0 to 4322959
Data columns (total 20 columns):
 #   Column                 Dtype         
---  ------                 -----         
 0   VendorID               int32         
 1   tpep_pickup_datetime   datetime64[us]
 2   tpep_dropoff_datetime  datetime64[us]
 3   passenger_count        float64       
 4   trip_distance          float64       
 5   RatecodeID             float64       
 6   store_and_fwd_flag     object        
 7   PULocationID           int32         
 8   DOLocationID           int32         
 9   payment_type           int64         
 10  fare_amount            float64       
 11  extra                  float64       
 12  mta_tax                float64       
 13  tip_amount             float64       
 14  tolls_amount           float64       
 15  improvement_surcharge  float64       
 16  total_amount           float64       
 17  congestion_surcharge   float64       
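 18  Airport_fee            float64       
 19  cbd_congestion_fee     float64       
dtypes: datetime64[us](2), float64(13), int32(3), int64(1), object(1)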

In [30]:
# Missing values: per-column count and percentage of NaN entries, keeping
# only the columns that have at least one missing value, most-affected first.
na_counts = df.isna().sum()

# Derive the percentage from na_counts instead of scanning the whole frame
# with isna().sum() a second time.
na_pct = na_counts.div(len(df)).mul(100).round(2)

na_df = (
	pd
	.concat([na_counts, na_pct], axis=1)
	.set_axis(["count", "pct"], axis=1)
	.query("count > 0")
	.sort_values(by="count", ascending=False)
)
na_df

Unnamed: 0,count,pct
passenger_count,1212946,28.06
RatecodeID,1212946,28.06
store_and_fwd_flag,1212946,28.06
congestion_surcharge,1212946,28.06
Airport_fee,1212946,28.06


In [11]:
# Number of rows that are exact duplicates (across all columns) of an
# earlier row.
df.duplicated().sum()

np.int64(0)

### 5. Load Data into MongoDB

In [31]:
# Establish a connection to the local MongoDB server and verify it is reachable.
#
# MongoClient connects lazily: its constructor returns immediately even when
# no server is listening, so on its own it never raises ConnectionFailure.
# A cheap "ping" command forces a real round trip, so an unreachable server is
# reported here rather than at the first insert. The server-selection timeout
# is set to 5 s so a missing server fails fast instead of blocking the cell
# for the driver's much longer default.
try:
	client = MongoClient('mongodb://localhost:27017/', serverSelectionTimeoutMS=5000)
	client.admin.command('ping')
except ConnectionFailure as e:
	print(f"Couldn't connect to MongoDB: {e}")
else:
	print("Successfully connected to MongoDB!")
	# Database and collection handles are only bound once the server has
	# answered the ping.
	db = client['taxis']
	collection = db['taxi_data']

Successfully connected to MongoDB!


In [None]:
# Convert the DataFrame into a list of per-row dicts ({column: value}), the
# shape insert_many expects. Despite the variable name this is a list of
# Python dicts, not a JSON string.
# NOTE(review): this materializes one dict per row (~4.3M rows per df.shape),
# which is memory-heavy; missing values are presumably carried over as NaN —
# confirm that is the desired representation in MongoDB.
data_json = df.to_dict(orient='records')

In [None]:
# Bulk-insert every row document into the `taxi_data` collection.
# NOTE(review): nothing here guards against re-running the cell when the
# collection already holds this data — verify the intended behavior.
collection.insert_many(data_json)

In [13]:
# Locate the local Spark installation and make pyspark importable.
# NOTE(review): this import is kept here rather than in the top import cell
# presumably because findspark.init() must run before the pyspark import in
# the next cell — confirm.
import findspark
findspark.init()

In [15]:
from pyspark.sql import SparkSession

# Build a local Spark session that uses all available cores ("local[*]"),
# or reuse the existing one if a session is already active.
spark = (
    SparkSession.builder
    .appName("VSCodeSparkDemo")
    .master("local[*]")
    .getOrCreate()
)

In [16]:
# Report the version of the running Spark session.
print(f"Spark Version: {spark.version}")

Spark Version: 3.5.1


In [18]:
# Read the same Parquet file as a Spark DataFrame (the pandas copy is `df`).
df1 = spark.read.parquet(data_file)