# 1. Data Acquisition & Preparation

## a. Downloading the Data:
The latest monthly Citi Bike dataset is available from the Citi Bike System Data S3 bucket (https://s3.amazonaws.com/tripdata/index.html). Every month, two new files are added:
1. For Jersey City -> JC-YYYYMM-citibike-tripdata.csv.zip
2. For NYC -> YYYYMM-citibike-tripdata.zip (files from 2013 through 2023 are published annually; from 2024 onward, files are published monthly)

For January 2025, the zip contains three CSV files (suffixed `_1`, `_2`, `_3`) that share the same schema.

## b. Ingesting the Data with PySpark:
Start a Spark session and read the extracted CSV files into a DataFrame. We can let Spark infer the schema, or provide an explicit schema for better performance (schema inference needs an extra pass over the data).

In [2]:
import os
import glob
import zipfile
import platform

def extract_all_zips(zip_folder, extract_to):
    """
    Extract every ``*.zip`` archive found directly in a folder into a target directory.

    Archives are processed in sorted filename order so repeated runs behave the
    same way (``glob`` itself returns files in filesystem-dependent order).

    Parameters:
    zip_folder (str): Path to the folder containing zip files.
    extract_to (str): Path to the directory where the CSV files will be extracted.
        Created if it does not exist.

    Returns:
    list[str]: Absolute paths of the archives that were extracted, in the order
        they were processed. Empty when no zip files were found.
    """
    # Normalize paths for cross-platform compatibility
    zip_folder = os.path.abspath(zip_folder)
    extract_to = os.path.abspath(extract_to)

    # Ensure the extraction directory exists.
    os.makedirs(extract_to, exist_ok=True)

    # Find all zip files in the specified folder; sort for a deterministic order.
    zip_files = sorted(glob.glob(os.path.join(zip_folder, "*.zip")))
    if not zip_files:
        # Make an empty/missing download folder visible instead of silently doing nothing.
        print(f"No zip files found in {zip_folder}")

    # Iterate over each zip file and extract its contents.
    for zip_file in zip_files:
        with zipfile.ZipFile(zip_file, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
        print(f"Extracted {zip_file} to {extract_to}")

    return zip_files

# --- Usage ---

# Folder holding the downloaded Citi Bike zip archives, and the folder that
# receives the extracted CSV files (both relative to the notebook's cwd).
current_dir = os.getcwd()
zip_folder_path = os.path.join(current_dir, "raw_data")  # Update as needed
extract_dir = os.path.join(current_dir, "data")          # Folder to hold extracted CSV files

# No Windows slash conversion is needed here: these paths are only used by
# extract_all_zips (plain Python file APIs), which re-normalizes them with
# os.path.abspath anyway. Spark-facing paths are converted where Spark reads them.
extract_all_zips(zip_folder_path, extract_dir)



Extracted c:\Git\NYCBS\raw_data\202501-citibike-tripdata.zip to c:\Git\NYCBS\data


In [1]:
import os
import platform
from pyspark.sql import SparkSession

from pathlib import Path

# Get the current working directory
current_dir = os.getcwd()

# SQLite file that backs the Iceberg JDBC catalog.
catalog_path = os.path.join(current_dir, "catalog", "iceberg.db")
db_dir = os.path.dirname(catalog_path)
os.makedirs(db_dir, exist_ok=True)

# Build OS-independent URLs with pathlib instead of branching on the platform:
# - sqlite-jdbc documents the form "jdbc:sqlite:<path>" with forward slashes,
#   e.g. jdbc:sqlite:C:/work/x.db (Windows) or jdbc:sqlite:/home/me/x.db (POSIX).
#   A "jdbc:sqlite:///" prefix is not one of the documented forms.
# - Path.as_uri() yields file:///C:/... on Windows and file:///Users/... on POSIX.
warehouse_dir = os.path.join(current_dir, "dwh")
jdbc_url = f"jdbc:sqlite:{Path(catalog_path).as_posix()}"
warehouse_url = Path(warehouse_dir).as_uri()

# Set the directory where CSV files have been extracted
extract_dir = os.path.join(current_dir, "data")
os.makedirs(extract_dir, exist_ok=True)

# Ensure the "dwh" warehouse directory exists
os.makedirs(warehouse_dir, exist_ok=True)

# Path for the Iceberg helper JAR
uber_jar_path = os.path.join(current_dir, "spark-iceberg-helper", "target", "spark-iceberg-helper-1.0-SNAPSHOT.jar")
uber_jar_path = uber_jar_path.replace(os.sep, "/")  # Ensure forward slashes for Spark compatibility

# Initialize Spark.
# NOTE(review): a JAVA_GATEWAY_EXITED error from getOrCreate() means the JVM
# exited before PySpark could connect to it - typically a missing/incompatible
# Java installation or JAVA_HOME; check that before changing catalog settings.
spark = (
    SparkSession.builder
    .appName("SparkIcebergExample")
    .master("local[1]")
    .config("spark.jars", uber_jar_path)
    # Iceberg catalog "local_pc" backed by the SQLite database above.
    .config("spark.sql.catalog.local_pc", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local_pc.type", "jdbc")
    .config("spark.sql.catalog.local_pc.uri", jdbc_url)
    .config("spark.sql.catalog.local_pc.jdbc.driver", "org.sqlite.JDBC")
    .config("spark.sql.catalog.local_pc.warehouse", warehouse_url)
    .config("spark.sql.catalogImplementation", "in-memory")
    # NOTE(review): the next three look like Hive-metastore settings and probably
    # have no effect with catalogImplementation=in-memory - confirm before removing.
    .config("spark.sql.catalog.local_pc.datanucleus.schema.autoCreateTables", "true")
    .config("spark.sql.legacy.createHiveTableByDefault.enabled", "false")
    .config("spark.sql.hive.metastore.sharedPrefixes", "")
    # NOTE(review): duplicates ".uri" above; likely ignored by the Iceberg catalog - confirm.
    .config("spark.sql.catalog.local_pc.jdbc.connection.url", jdbc_url)
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate()
)

print("Spark session created")
print("Spark version:", spark.version)
print("Java version:", spark._jvm.System.getProperty("java.version"))

# Print relevant Spark configurations
for key, value in spark.sparkContext.getConf().getAll():
    if "catalog.local_pc" in key:
        print(f"{key}: {value}")

# Read and display the contents of the CSV files
csv_path = os.path.join(extract_dir, "*.csv").replace(os.sep, "/")  # Ensure forward slashes for Spark compatibility
df = spark.read.csv(csv_path, header=True, inferSchema=True)
df.show()

print(f"Number of rows: {df.count()}")


PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [None]:
import os
from pyspark.sql import SparkSession

from pathlib import Path

current_dir = os.getcwd()

# Local layout, all absolute so the directories and the URLs below agree.
catalog_path = os.path.join(current_dir, "catalog", "iceberg.db")
db_dir = os.path.dirname(catalog_path)
extract_dir = os.path.join(current_dir, "data")  # Where CSV files have been extracted; adjust as needed.
warehouse_dir = os.path.join(current_dir, "dwh")
for directory in (db_dir, extract_dir, warehouse_dir):
    # exist_ok avoids the exists()/makedirs() check-then-create race.
    os.makedirs(directory, exist_ok=True)

# OS-independent URLs: sqlite-jdbc documents "jdbc:sqlite:<path>" with forward
# slashes, and Path.as_uri() builds a valid file: URI on Windows and POSIX
# (an f-string "file://" + cwd is malformed on Windows).
jdbc_url = f"jdbc:sqlite:{Path(catalog_path).as_posix()}"
warehouse_url = Path(warehouse_dir).as_uri()

# Jar named spark-iceberg-helper-1.0-SNAPSHOT.jar from the Maven project's target/ directory.
uber_jar_path = Path(
    current_dir, "spark-iceberg-helper", "target", "spark-iceberg-helper-1.0-SNAPSHOT.jar"
).as_posix()

spark = (
    SparkSession.builder
    .appName("SparkIcebergExample")
    .config("spark.jars", uber_jar_path)
    # Iceberg catalog "local_pc" backed by the SQLite database above.
    .config("spark.sql.catalog.local_pc", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local_pc.type", "jdbc")
    .config("spark.sql.catalog.local_pc.uri", jdbc_url)
    .config("spark.sql.catalog.local_pc.jdbc.driver", "org.sqlite.JDBC")
    .config("spark.sql.catalog.local_pc.warehouse", warehouse_url)
    .config("spark.sql.catalogImplementation", "in-memory")
    # NOTE(review): the next three look like Hive-metastore settings and probably
    # have no effect with catalogImplementation=in-memory - confirm before removing.
    .config("spark.sql.catalog.local_pc.datanucleus.schema.autoCreateTables", "true")
    .config("spark.sql.legacy.createHiveTableByDefault.enabled", "false")
    .config("spark.sql.hive.metastore.sharedPrefixes", "")
    # NOTE(review): duplicates ".uri" above; likely ignored by the Iceberg catalog - confirm.
    .config("spark.sql.catalog.local_pc.jdbc.connection.url", jdbc_url)
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate()
)

for key, value in spark.sparkContext.getConf().getAll():
    if "catalog.local_pc" in key:
        print(f"{key}: {value}")

# Read and display the contents of the CSV files (forward slashes for Spark on Windows).
csv_path = Path(extract_dir, "*.csv").as_posix()
df = spark.read.csv(csv_path, header=True, inferSchema=True)
df.show()
print(f"Number of rows: {df.count()}")


# Apache Iceberg
Create a new catalog and a test table with PyIceberg, then write to and read from Iceberg tables with Spark.

In [None]:
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
# NestedField and the primitive types are defined in pyiceberg.types.
from pyiceberg.types import IntegerType, NestedField, StringType

import os
from pathlib import Path

current_dir = os.getcwd()

db_path = os.path.join(current_dir, "catalog", "iceberg.db")
warehouse_path = os.path.join(current_dir, "dwh")
tables_path = os.path.join(current_dir, "tables", "example_table")

# Make sure directories exist
for directory in (os.path.dirname(db_path), warehouse_path, tables_path):
    os.makedirs(directory, exist_ok=True)

# Database URL for PyIceberg's "sql" catalog (a SQLAlchemy-style URL despite the
# variable name). "sqlite:///" + an absolute POSIX path yields the four-slash
# form sqlite:////abs/path/iceberg.db.
jdbc_url = f"sqlite:///{db_path}"
# Path.as_uri() builds valid file: URIs on Windows (file:///C:/...) and POSIX.
warehouse_url = Path(warehouse_path).as_uri()
tables_url = Path(tables_path).as_uri()

print(f"jdbc_url: {jdbc_url}")
print(f"warehouse_url: {warehouse_url}")
print(f"tables_url: {tables_url}")

# Create a sql catalog using a local file system. The name "local_pc" matches the
# Spark catalog configured earlier - presumably so both use the same catalog
# entries in iceberg.db; confirm.
catalog = load_catalog(
    name="local_pc",
    uri=jdbc_url,
    type="sql",
    warehouse=warehouse_url
)

# Create the metastore tables if they don't exist.
catalog.create_tables()

# (Optional) Create a namespace if needed.
catalog.create_namespace_if_not_exists("default")

# Define the table schema using the Schema API.
schema = Schema(
    NestedField(field_id=1, name="id", field_type=IntegerType(), required=True),
    NestedField(field_id=2, name="data", field_type=StringType(), required=True)
)

# Create the Iceberg table within the 'default' namespace, under ./tables/example_table
# of the current project rather than a path tied to one user's machine.
catalog.create_table_if_not_exists(
    identifier="default.example_table",  # qualified as namespace.table
    schema=schema,
    location=tables_url
)

print("Catalog and table created successfully!")

In [None]:
# Report the Spark release and the Java version of the JVM backing the session.
spark_version = spark.version
print("Spark version:", spark_version)
java_version = spark._jvm.System.getProperty("java.version")
print("Java version:", java_version)

In [None]:
# Show the local_pc catalog settings the session was started with.
for key, value in spark.sparkContext.getConf().getAll():
    if "catalog.local_pc" in key:
        print(f"{key}: {value}")

# Create the namespace with a fully qualified name so it does not depend on the
# session's current catalog, then make it current. ("ALTER SESSION SET ..." is not
# Spark SQL syntax and fails to parse; "USE <catalog>.<namespace>" is.)
spark.sql("CREATE NAMESPACE IF NOT EXISTS local_pc.default")
spark.sql("USE local_pc.default")

# Write DataFrame to Iceberg using DataFrameWriterV2: createOrReplace() creates
# the table on the first run and replaces it afterwards, matching the intent of
# the previous mode("overwrite") without requiring the table to exist already.
df.writeTo("local_pc.default.example_table_df").createOrReplace()

# Read DataFrame from Iceberg
df_read = spark.read \
    .format("iceberg") \
    .load("local_pc.default.example_table_df")

df_read.show()

In [None]:
# Create a database in the Iceberg catalog if it doesn't exist
#spark.sql("CREATE DATABASE IF NOT EXISTS iceberg_catalog.db")

# Create the namespace if it does not exist.
#spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg_catalog.db")
# Switch to the Iceberg catalog's default namespace.
#spark.sql("USE iceberg_catalog.db")
    
# Optionally drop the table if it exists.
#tables = spark.sql("SHOW TABLES").collect()
#if any(row.tableName == "my_iceberg_table" for row in tables):
#    spark.sql("DROP TABLE my_iceberg_table")
    
#Write the DataFrame to an Iceberg table.
#data.writeTo("iceberg_catalog.default.my_iceberg_table") \
#    .create() 

    
#print("Data successfully written to Iceberg table: iceberg_catalog.default.my_iceberg_table")

In [None]:
# Display the schema of df (column names and types) as a tree.
df.printSchema()

In [None]:
# Compute summary statistics for df's columns (DataFrame.describe) and print them.
df.describe().show()

## c. Data Cleaning:
Perform cleaning tasks such as:
-	Dropping rows with missing critical fields (e.g., start/end timestamps).
-	Converting timestamp strings to proper timestamp types.
-	Filtering out trips with negative or zero durations.

# 2. Analysis & Insights

## A. Insurance Coverage Analysis

Counting Trips Over 30 Minutes:
- Filter the DataFrame for trips where trip_duration > 30 minutes and count them.

In [None]:
from pyspark.sql.functions import col, unix_timestamp, to_date
from pyspark.sql import functions as F

# Trips strictly longer than this many minutes count as insurance trips.
insurance_trip_duration = 30

# Drop rows with missing timestamps
df = df.dropna(subset=["started_at", "ended_at"])

# Convert both columns to proper timestamp types. If inferSchema already typed
# them as timestamps this is a no-op; with ANSI mode off, unparseable strings
# become null and are removed by the trip_duration > 0 filter below.
df = df.withColumn("started_at", F.to_timestamp(col("started_at"))) \
       .withColumn("ended_at", F.to_timestamp(col("ended_at")))

# extract rental_date from started_at
df = df.withColumn("rental_date", to_date(col("started_at")))

# Unix epoch seconds (whole seconds) of start and end.
df = df.withColumn("start_ts", unix_timestamp("started_at")) \
       .withColumn("end_ts", unix_timestamp("ended_at"))

# Duration in minutes from fractional epoch seconds. unix_timestamp() truncates
# to whole seconds, which can put trips within a second of the threshold (or
# sub-second trips) on the wrong side of the comparisons below.
df = df.withColumn(
    "trip_duration",
    (col("ended_at").cast("double") - col("started_at").cast("double")) / 60,
)
# Filter out invalid records (zero/negative or uncomputable durations).
df = df.filter(col("trip_duration") > 0)

# Flag trips longer than the insurance threshold: 1 = insurance trip, 0 = not.
df = df.withColumn(
    "insurance_trip",
    F.when(col("trip_duration") > insurance_trip_duration, 1).otherwise(0),
)


In [None]:
# Count trips above the insurance threshold, reusing insurance_trip_duration
# (set in the cleaning cell) instead of repeating the literal 30.
insurance_trips = df.filter(col("trip_duration") > insurance_trip_duration)
num_insurance_trips = insurance_trips.count()
print(f"Trips longer than {insurance_trip_duration} minutes: {num_insurance_trips}")

## B. Revenue Estimation

Estimating Revenue Impact:
- Assume a charge of $0.20 per trip that exceeds 30 minutes. Multiply the count by $0.20.

In [None]:
# Flat insurance charge (USD) applied to every trip over the insurance threshold.
insurance_fee_per_trip = 0.20

revenue = num_insurance_trips * insurance_fee_per_trip
print(f"Estimated revenue from over-{insurance_trip_duration}-min rides: ${revenue:.2f}")

## C. Travel Distance Analysis

1. Calculating the Distance (Haversine Formula):
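- Since the dataset includes start and end coordinates (`start_lat`, `start_lng`, `end_lat`, `end_lng`), we can compute the great-circle distance with the haversine formula: $d = 2R \arcsin\left(\sqrt{\sin^2\frac{\Delta\varphi}{2} + \cos\varphi_1 \cos\varphi_2 \sin^2\frac{\Delta\lambda}{2}}\right)$, where $\varphi$ is latitude and $\lambda$ is longitude (both in radians) and $R \approx 6371$ km is Earth's radius.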
- Here’s an example using Spark SQL functions:

In [None]:
import pyspark.sql.functions as F

# Earth's radius in kilometers
EARTH_RADIUS = 6371.0

# Convert coordinates (degrees) to radians
df = df.withColumn("start_lat_rad", F.radians(col("start_lat"))) \
       .withColumn("start_lon_rad", F.radians(col("start_lng"))) \
       .withColumn("end_lat_rad", F.radians(col("end_lat"))) \
       .withColumn("end_lon_rad", F.radians(col("end_lng")))

df = df.withColumn("dlat", col("end_lat_rad") - col("start_lat_rad")) \
       .withColumn("dlon", col("end_lon_rad") - col("start_lon_rad"))

# Haversine 'a' factor
df = df.withColumn("a", F.pow(F.sin(col("dlat")/2), 2) +
                   F.cos(col("start_lat_rad")) * F.cos(col("end_lat_rad")) *
                   F.pow(F.sin(col("dlon")/2), 2))

# Floating-point rounding can push 'a' slightly above 1, which would make asin
# return NaN, so clamp it. F.when keeps nulls (missing coordinates) null, whereas
# F.least would skip the null and return 1.
a_clamped = F.when(col("a") > 1.0, F.lit(1.0)).otherwise(col("a"))

# Compute haversine distance in km
df = df.withColumn("distance_km", 2 * EARTH_RADIUS * F.asin(F.sqrt(a_clamped)))

2. Classifying Trips into Distance Buckets:
Create a new column that assigns each trip to one of the buckets: 0-1 km, 2-4 km, 4-9 km, or 10+ km.

In [None]:
# Assign each trip to a distance bucket. F.when branches are evaluated in order,
# so each branch only needs its upper bound. Effective ranges:
#   "0-1 km" <= 1, "2-4 km" (1, 4], "4-9 km" (4, 9], "10+ km" > 9.
# There is no otherwise(): trips whose distance is null (missing coordinates)
# get a null bucket instead of being counted as "10+ km".
df = df.withColumn(
    "distance_bucket",
    F.when(col("distance_km") <= 1, "0-1 km")
     .when(col("distance_km") <= 4, "2-4 km")
     .when(col("distance_km") <= 9, "4-9 km")
     .when(col("distance_km") > 9, "10+ km"),
)

3. Visualization:
Convert the aggregated data (e.g., count per distance bucket) to Pandas and use matplotlib or seaborn to plot the distribution.

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.ticker import FuncFormatter

# Aggregate counts by bucket and convert to Pandas DataFrame
bucket_distribution = df.groupBy("distance_bucket").count().toPandas()

# Display order of the buckets on the x-axis.
bucket_order = ["0-1 km", "2-4 km", "4-9 km", "10+ km"]


def _format_count(value, _pos):
    """Render a y-axis tick value compactly, e.g. 0, 500K, 1M, 1.5M."""
    if abs(value) >= 1_000_000:
        return f"{value / 1_000_000:g}M"
    if abs(value) >= 1_000:
        return f"{value / 1_000:g}K"
    return f"{value:g}"


plt.figure(figsize=(8, 6))
# hue mirrors x so the palette applies per bar (seaborn 0.13 deprecates passing
# palette without hue); legend=False because the colors add no information.
ax = sns.barplot(
    x="distance_bucket",
    y="count",
    hue="distance_bucket",
    legend=False,
    data=bucket_distribution,
    palette="viridis",
    order=bucket_order,
)
ax.set_title("Trip Distance Distribution")
ax.set_xlabel("Distance Bucket")
ax.set_ylabel("Number of Trips")

# Let matplotlib choose tick positions from the data and only format the labels,
# so counts above any hard-coded maximum are still labelled.
ax.yaxis.set_major_formatter(FuncFormatter(_format_count))

plt.tight_layout()
plt.show()