## Citi Bike NYC Data Engineering Project
--------------------------------------------------

### Project Overview

This project focuses on building a clean, reproducible data pipeline for Citi Bike rental data. The objective is to take raw trip-level and weather datasets, apply systematic data cleaning and normalization steps, and load the resulting tables into a relational SQLite database with analytical views.

- The notebook is designed with a data-engineering mindset rather than exploratory analysis.
- All transformations are intentional and aligned with a predefined SQL schema.

### Key goals of this project:

- Clean and standardize raw trip and weather data
- Normalize entities into dimension and fact tables
- Enforce schema consistency between pandas and SQLite
- Load validated data into a relational database for downstream analysis

##### Imports and Configurations

In [None]:
import pandas as pd
import sqlite3
from glob import glob
from pathlib import Path

pd.set_option("display.max_columns", None)

DB_PATH = Path("./db/citibike.db")
# glob() returns files in arbitrary order; sort so row order (and trip_id) is reproducible
RIDE_Files = sorted(glob("./data/JC-2016*.csv"))
WEATHER_FILE_PATH = Path("./data/newark_airport_2016.csv")

##### Load Raw Data

In [None]:
ride = pd.concat((pd.read_csv(file) for file in RIDE_Files), ignore_index=True)
weather = pd.read_csv(WEATHER_FILE_PATH)

##### Initial Inspection

In [None]:
ride.info()

In [None]:
ride.head()

In [None]:
ride.isna().sum()

In [None]:
weather.info()

In [None]:
weather.head()

In [None]:
weather.isna().sum()

##### Data Cleaning

##### Rename Columns (For Standardization Purposes)

In [None]:
ride.columns = (
    ride.columns
    .str.strip()
    .str.replace(" ", "_")
    .str.lower()
)

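old_columns = weather.columns
# One name per source column, in source order.
# "weather_station_name" is included because later cells select it; this
# assumes the station name is the second column of the weather CSV.
new_columns = [
    "weather_station_id", "weather_station_name", "date",
    "avg_daily_wind_speed", "peak_gust_time", "precipitation", "snowfall",
    "snow_depth", "avg_temp", "max_temp", "min_temp", "daily_total_sushine",
    "fastest_2min_wind_dir", "fastest_5min_wind_dir",
    "fastest_2min_wind_speed", "fastest_5min_wind_speed",
]

# rename() returns a new DataFrame, so the result must be assigned back
weather = weather.rename(columns=dict(zip(old_columns, new_columns)))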
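
Renaming by position with `zip` silently truncates to the shorter sequence, so a missing name shifts every later label without raising an error. A minimal sketch of a guard against that, using a hypothetical helper `rename_positional` and a made-up three-column frame (the source column names shown are assumptions, not read from the real CSV):

```python
import pandas as pd

def rename_positional(df, new_columns):
    """Rename columns by position, refusing to proceed if the counts differ."""
    if len(df.columns) != len(new_columns):
        raise ValueError(
            f"Expected {len(df.columns)} column names, got {len(new_columns)}"
        )
    return df.rename(columns=dict(zip(df.columns, new_columns)))

# Hypothetical sample frame standing in for the weather data
sample = pd.DataFrame({"STATION": ["X1"], "NAME": ["Example"], "DATE": ["2016-01-01"]})

renamed = rename_positional(
    sample, ["weather_station_id", "weather_station_name", "date"]
)
print(list(renamed.columns))

try:
    rename_positional(sample, ["weather_station_id", "date"])  # one name short
    length_error = None
except ValueError as exc:
    length_error = str(exc)
    print(length_error)
```
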

##### Data Type Conversion

In [None]:
ride["start_time"] = pd.to_datetime(ride["start_time"], errors="coerce")
ride["stop_time"] = pd.to_datetime(ride["stop_time"], errors="coerce")
ride["trip_duration"] = pd.to_numeric(ride["trip_duration"], errors="coerce")
ride["birth_year"] = pd.to_numeric(ride["birth_year"], errors="coerce")\
    .astype("Int64")

weather["date"] = pd.to_datetime(weather["date"])
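
With `errors="coerce"`, values that cannot be parsed become `NaT` or `NaN` instead of raising, so it can be useful to count how many values were lost in conversion. A small sketch on made-up values (the `sample` frame is hypothetical and only mirrors two of the ride columns):

```python
import pandas as pd

# Hypothetical sample; the real values come from the ride DataFrame
sample = pd.DataFrame({
    "start_time": ["2016-01-01 00:02:52", "not a date", "2016-01-01 00:18:22"],
    "birth_year": ["1984", "unknown", "1990"],
})

parsed_time = pd.to_datetime(sample["start_time"], errors="coerce")
parsed_year = pd.to_numeric(sample["birth_year"], errors="coerce").astype("Int64")

# Values that were present before conversion but are missing afterwards
coerced_time = int((parsed_time.isna() & sample["start_time"].notna()).sum())
coerced_year = int((parsed_year.isna() & sample["birth_year"].notna()).sum())

print(f"start_time values coerced to NaT: {coerced_time}")
print(f"birth_year values coerced to <NA>: {coerced_year}")
```

A non-zero count is a prompt to inspect the raw values before the null-dropping step removes those rows.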

##### Drop Null

In [None]:
# Drop rows missing essential trip identifiers
ride = ride.dropna(subset=[
    "start_time",
    "start_station_id",
    "bike_id"
])

In [None]:
# Drop the null weather columns (columns= already implies axis=1)
weather = weather.drop(columns=["peak_gust_time", "daily_total_sushine"])

##### Standardize Categorical Text

In [None]:
# Lowercase and strip surrounding whitespace
ride["user_type"] = ride["user_type"].str.lower().str.strip()
ride["gender"] = ride["gender"].astype(str).str.lower().str.strip()

##### Extract Stations' Data

In [None]:
start_stations = ride[[
    "start_station_id",
    "start_station_name",
    "start_station_longitude",
    "start_station_latitude"
    ]].rename(columns={
        "start_station_id": "station_id",
        "start_station_name": "station_name",
        "start_station_longitude": "station_longitude",
        "start_station_latitude": "station_latitude"
})

end_stations = ride[[
    "end_station_id",
    "end_station_name",
    "end_station_longitude",
    "end_station_latitude"
    ]].rename(columns={
        "end_station_id": "station_id",
        "end_station_name": "station_name",
        "end_station_longitude": "station_longitude",
        "end_station_latitude": "station_latitude"
})

stations = (
    pd.concat([start_stations, end_stations])
    .dropna(subset=["station_id"])
    .drop_duplicates(subset=["station_id"])
    .reset_index(drop=True)
)
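
`drop_duplicates(subset=["station_id"])` keeps the first row seen for each ID, so an ID that appears with different names or coordinates loses its later variants without any warning. A sketch of a check that surfaces such IDs beforehand, on a made-up frame (station names and coordinates here are placeholders):

```python
import pandas as pd

# Hypothetical sample in which station 2 is recorded under two names
sample_stations = pd.DataFrame({
    "station_id": [1, 1, 2, 2],
    "station_name": ["Station A", "Station A", "Station B", "Station B (renamed)"],
    "station_longitude": [-74.0, -74.0, -74.1, -74.1],
    "station_latitude": [40.7, 40.7, 40.8, 40.8],
})

# Count distinct attribute combinations per station_id
variants = (
    sample_stations
    .drop_duplicates()
    .groupby("station_id")
    .size()
)

# IDs with more than one distinct combination would be collapsed arbitrarily
conflicting_ids = variants[variants > 1].index.tolist()
print(conflicting_ids)
```
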

##### Extract Trips' Data

In [None]:
trips = ride[[
    "trip_duration",
    "start_time",
    "stop_time",
    "start_station_id",
    "end_station_id",
    "bike_id",
    "user_type",
    "birth_year",
    "gender"
]].copy()

trips.insert(0, "trip_id", range(1, len(trips) + 1))

##### Extract Weather Stations' Data

In [None]:
weather_station = weather[[
    "weather_station_id",
    "weather_station_name"
]].drop_duplicates()

##### Extract Weather's Data

In [None]:
weather_data = weather.drop(columns=["weather_station_name"])

##### Schema Validation

In [None]:
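def validate_schema(df, expected_cols, table_name):
    """Raise ValueError if df's columns differ from the expected column set."""
    missing = set(expected_cols) - set(df.columns)
    extra = set(df.columns) - set(expected_cols)
    if missing or extra:
        raise ValueError(
            f"Schema mismatch for table '{table_name}': "
            f"missing={sorted(missing)}, extra={sorted(extra)}"
        )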


validate_schema(stations, ["station_id", "station_name", "station_longitude", \
                           "station_latitude"], "stations")


validate_schema(trips, ["trip_id", "trip_duration", "start_time", "stop_time", \
                        "start_station_id", "end_station_id", "bike_id", \
                        "user_type", "birth_year", "gender"], "trips")

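The checks above compare each DataFrame with a hand-written column list. One way to compare with the database itself is to read the column names from SQLite through `PRAGMA table_info`. The sketch below uses a hypothetical helper `validate_against_db` and an in-memory table whose definition is an assumption, not the project's actual schema:

```python
import sqlite3
import pandas as pd

def validate_against_db(df, conn, table_name):
    """Raise if df's columns differ from those of an existing SQLite table."""
    # PRAGMA table_info returns one row per column; index 1 is the column name.
    # table_name is interpolated directly, so pass only trusted names.
    rows = conn.execute(f"PRAGMA table_info({table_name})").fetchall()
    db_cols = {row[1] for row in rows}
    if not db_cols:
        raise ValueError(f"Table '{table_name}' does not exist")
    missing = db_cols - set(df.columns)
    extra = set(df.columns) - db_cols
    if missing or extra:
        raise ValueError(
            f"Schema mismatch for '{table_name}': "
            f"missing={sorted(missing)}, extra={sorted(extra)}"
        )

# Demo against an in-memory database with an assumed table definition
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute(
    "CREATE TABLE weather_station ("
    "weather_station_id TEXT PRIMARY KEY, weather_station_name TEXT)"
)
demo_df = pd.DataFrame({
    "weather_station_id": ["X1"],
    "weather_station_name": ["Example station"],
})
validate_against_db(demo_df, demo_conn, "weather_station")  # passes silently

try:
    validate_against_db(
        demo_df.drop(columns=["weather_station_name"]), demo_conn, "weather_station"
    )
    mismatch_detected = False
except ValueError as exc:
    mismatch_detected = True
    print(exc)
demo_conn.close()
```
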
##### Load Data Into SQLite

The database and its tables can be created either by running the *setupdb.py* file once, or by importing the *init_db()* function from it and calling it once.

In [None]:
# from setupdb import init_db
# init_db()

In [None]:
conn = sqlite3.connect(DB_PATH)

try:
    stations.to_sql("stations", conn, if_exists="append", index=False)
    trips.to_sql("trips", conn, if_exists="append", index=False)
    weather_station.to_sql("weather_station", conn, if_exists="append", index=False)
    weather_data.to_sql("weather_data", conn, if_exists="append", index=False)
finally:
    # Close the connection even if one of the loads fails
    conn.close()
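
After loading, a quick row-count comparison can confirm that each table received what the DataFrame held. Because the loads use `if_exists="append"`, rerunning the cell adds the same rows again unless a key constraint rejects them. A sketch with a hypothetical helper `row_counts` and an in-memory database in place of the project file:

```python
import sqlite3
import pandas as pd

def row_counts(conn, table_names):
    """Return {table_name: row count} for the given tables (trusted names only)."""
    return {
        name: conn.execute(f"SELECT COUNT(*) FROM {name}").fetchone()[0]
        for name in table_names
    }

demo_conn = sqlite3.connect(":memory:")
# Placeholder data; the table is created by to_sql without any key constraint
demo_df = pd.DataFrame({"station_id": [1, 2, 3], "station_name": ["A", "B", "C"]})
demo_df.to_sql("stations", demo_conn, if_exists="append", index=False)

counts = row_counts(demo_conn, ["stations"])
assert counts["stations"] == len(demo_df), "row count differs from DataFrame length"

# A second load with if_exists="append" doubles the rows in an unconstrained table
demo_df.to_sql("stations", demo_conn, if_exists="append", index=False)
counts_after_rerun = row_counts(demo_conn, ["stations"])
print(counts, counts_after_rerun)
demo_conn.close()
```
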