# **Uber Data Pipeline** 
---
### Star Schema Model - Data Engineering Project
This notebook demonstrates a complete ETL (Extract, Transform, Load) pipeline for Uber trip data using Google Cloud Storage and BigQuery. The notebook is divided into three main sections:

1. **Extract**: The data is extracted from a Google Cloud Storage bucket using the Google Cloud Storage API.
2. **Transform**: The extracted data is transformed into a **star schema** format with well-structured **dimension** and **fact tables**. Key transformations include cleaning the data, deriving new columns, and organizing it into dimensions such as datetime, passenger count, trip distance, and rate codes.
3. **Load**: The transformed dimension and fact tables are loaded into BigQuery using the Google Cloud BigQuery API, establishing a clean and structured dataset for further analysis.

*Notebook by: Logan Pearson https://github.com/loganpear - Transform step instructed by: Darshil Parmar https://github.com/darshilparmar*

# ✅ Section 1: <span style="color:green">**Extract**</span> The Data From Google Cloud
---

### 1.1 Import Statements

In [1]:
import pandas as pd
from google.cloud import storage
from google.cloud import bigquery
from io import StringIO

### 1.2 Checking Buckets From Google Cloud

In [2]:
# Create a Google Cloud Storage client
storage_client = storage.Client()

# List all buckets in your project
buckets = list(storage_client.list_buckets())
print("Buckets in your project:", buckets)

Buckets in your project: [<Bucket: uber-data-engineering-project-logan>]


### 1.3 Reading The Data 

In [3]:
bucket_name = "uber-data-engineering-project-logan"
blob_name = "uber_data.csv"

bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(blob_name)

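# Download the CSV to a local file, then load it into a DataFrame
blob.download_to_filename("local_uber_data.csv")
df = pd.read_csv("local_uber_data.csv")
df.shape  # not echoed: a notebook cell only displays its last expression
df.head()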

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,1,2016-03-01 00:00:00,2016-03-01 00:07:55,1,2.5,-73.976746,40.765152,1,N,-74.004265,40.746128,1,9.0,0.5,0.5,2.05,0.0,0.3,12.35
1,1,2016-03-01 00:00:00,2016-03-01 00:11:06,1,2.9,-73.983482,40.767925,1,N,-74.005943,40.733166,1,11.0,0.5,0.5,3.05,0.0,0.3,15.35
2,2,2016-03-01 00:00:00,2016-03-01 00:31:06,2,19.98,-73.782021,40.64481,1,N,-73.974541,40.67577,1,54.5,0.5,0.5,8.0,0.0,0.3,63.8
3,2,2016-03-01 00:00:00,2016-03-01 00:00:00,3,10.78,-73.863419,40.769814,1,N,-73.96965,40.757767,1,31.5,0.0,0.5,3.78,5.54,0.3,41.62
4,2,2016-03-01 00:00:00,2016-03-01 00:00:00,5,30.43,-73.971741,40.792183,3,N,-74.17717,40.695053,1,98.0,0.0,0.0,0.0,15.5,0.3,113.8
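
The cell above writes the CSV to disk before reading it back. Since `StringIO` is already imported in 1.1, the same parse could be done in memory. The following is a minimal sketch, not the notebook's method: `read_csv_text` and `sample_text` are hypothetical names, and `sample_text` stands in for the string that `blob.download_as_text()` would be expected to return from the bucket.

```python
from io import StringIO

import pandas as pd


def read_csv_text(csv_text: str) -> pd.DataFrame:
    """Parse CSV content held in memory instead of a file on disk."""
    return pd.read_csv(StringIO(csv_text))


# Hypothetical stand-in for the downloaded text; the values are taken
# from the first and third rows of the output above.
sample_text = (
    "VendorID,tpep_pickup_datetime,passenger_count,trip_distance\n"
    "1,2016-03-01 00:00:00,1,2.5\n"
    "2,2016-03-01 00:00:00,2,19.98\n"
)

sample_df = read_csv_text(sample_text)
print(sample_df.shape)
```

Skipping the local file avoids leaving a copy of the data on disk, at the cost of holding the raw text and the DataFrame in memory at the same time.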


In [4]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 19 columns):
 #   Column                 Non-Null Count   Dtype  
---  ------                 --------------   -----  
 0   VendorID               100000 non-null  int64  
 1   tpep_pickup_datetime   100000 non-null  object 
 2   tpep_dropoff_datetime  100000 non-null  object 
 3   passenger_count        100000 non-null  int64  
 4   trip_distance          100000 non-null  float64
 5   pickup_longitude       100000 non-null  float64
 6   pickup_latitude        100000 non-null  float64
 7   RatecodeID             100000 non-null  int64  
 8   store_and_fwd_flag     100000 non-null  object 
 9   dropoff_longitude      100000 non-null  float64
 10  dropoff_latitude       100000 non-null  float64
 11  payment_type           100000 non-null  int64  
 12  fare_amount            100000 non-null  float64
 13  extra                  100000 non-null  float64
 14  mta_tax                100000 non-nul

### 1.4 Fixing Data Types

In [5]:
# Columns 1 and 2 (the pickup and dropoff timestamps) have dtype object, so convert them to datetime
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
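
The conversion above lets pandas infer the timestamp layout. As a hedged variation, the layout seen in the sample rows (`2016-03-01 00:07:55`) can be stated explicitly, and `errors="coerce"` can be used so that a malformed value becomes `NaT` rather than raising. The frame `raw` and its second value are made up for illustration.

```python
import pandas as pd

# Illustrative data: one valid timestamp and one deliberately broken value.
raw = pd.DataFrame({
    "tpep_pickup_datetime": ["2016-03-01 00:00:00", "not a timestamp"],
})

# An explicit format documents the expected layout; errors="coerce"
# turns unparseable strings into NaT instead of raising an exception.
raw["tpep_pickup_datetime"] = pd.to_datetime(
    raw["tpep_pickup_datetime"],
    format="%Y-%m-%d %H:%M:%S",
    errors="coerce",
)

n_unparsed = int(raw["tpep_pickup_datetime"].isna().sum())
print(n_unparsed)
```

Counting the `NaT` values afterwards shows how many rows failed to parse, which is worth checking before building the datetime dimension.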

### 1.5 Dropping Duplicates

In [6]:
# Drop duplicate records and reset the index
df = df.drop_duplicates().reset_index(drop=True)

# set each trip's ID to simply be the index
df['trip_id'] = df.index

df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,trip_id
0,1,2016-03-01,2016-03-01 00:07:55,1,2.5,-73.976746,40.765152,1,N,-74.004265,40.746128,1,9.0,0.5,0.5,2.05,0.0,0.3,12.35,0
1,1,2016-03-01,2016-03-01 00:11:06,1,2.9,-73.983482,40.767925,1,N,-74.005943,40.733166,1,11.0,0.5,0.5,3.05,0.0,0.3,15.35,1
2,2,2016-03-01,2016-03-01 00:31:06,2,19.98,-73.782021,40.64481,1,N,-73.974541,40.67577,1,54.5,0.5,0.5,8.0,0.0,0.3,63.8,2
3,2,2016-03-01,2016-03-01 00:00:00,3,10.78,-73.863419,40.769814,1,N,-73.96965,40.757767,1,31.5,0.0,0.5,3.78,5.54,0.3,41.62,3
4,2,2016-03-01,2016-03-01 00:00:00,5,30.43,-73.971741,40.792183,3,N,-74.17717,40.695053,1,98.0,0.0,0.0,0.0,15.5,0.3,113.8,4
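
To see how many rows `drop_duplicates()` would remove, the duplicates can be counted first. The sketch below uses a small made-up frame (`trips`) rather than the real data. In this notebook the load step later reports 100000 rows, the same count `df.info()` shows, which suggests no rows were dropped here.

```python
import pandas as pd

# Illustrative frame: the second row repeats the first exactly.
trips = pd.DataFrame({
    "VendorID": [1, 1, 2],
    "passenger_count": [1, 1, 2],
    "payment_type": [1, 1, 1],
})

# duplicated() flags every row that is identical to an earlier row.
n_duplicates = int(trips.duplicated().sum())

deduped = trips.drop_duplicates().reset_index(drop=True)
deduped["trip_id"] = deduped.index  # contiguous 0..n-1 after the reset

print(n_duplicates, len(deduped))
```

Resetting the index before assigning `trip_id` matters: without it, the IDs would keep gaps wherever a duplicate was removed.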


# ✅ Section 2: <span style="color:green">**Transform**</span> Data by Creating Dimensions
---

### 2.1 Datetime Dimension

In [7]:
# Creating the datetime dimension, which stores date- and time-related information
datetime_dim = df[['tpep_pickup_datetime','tpep_dropoff_datetime']].reset_index(drop=True)

# Splitting each timestamp into its components, one column per component

# For pick up
datetime_dim['pick_hour'] = datetime_dim['tpep_pickup_datetime'].dt.hour
datetime_dim['pick_day'] = datetime_dim['tpep_pickup_datetime'].dt.day
datetime_dim['pick_month'] = datetime_dim['tpep_pickup_datetime'].dt.month
datetime_dim['pick_year'] = datetime_dim['tpep_pickup_datetime'].dt.year
datetime_dim['pick_weekday'] = datetime_dim['tpep_pickup_datetime'].dt.weekday

# For drop off
datetime_dim['drop_hour'] = datetime_dim['tpep_dropoff_datetime'].dt.hour
datetime_dim['drop_day'] = datetime_dim['tpep_dropoff_datetime'].dt.day
datetime_dim['drop_month'] = datetime_dim['tpep_dropoff_datetime'].dt.month
datetime_dim['drop_year'] = datetime_dim['tpep_dropoff_datetime'].dt.year
datetime_dim['drop_weekday'] = datetime_dim['tpep_dropoff_datetime'].dt.weekday

# Setting the index as the primary key for the datetime dimension table (foreign key in the fact table)
datetime_dim['datetime_id'] = datetime_dim.index

# Rearrange columns for organization
datetime_dim = datetime_dim[['datetime_id', 'tpep_pickup_datetime', 'pick_hour', 'pick_day', 'pick_month', 'pick_year', 'pick_weekday',
                             'tpep_dropoff_datetime', 'drop_hour', 'drop_day', 'drop_month', 'drop_year', 'drop_weekday']]

datetime_dim.head()

Unnamed: 0,datetime_id,tpep_pickup_datetime,pick_hour,pick_day,pick_month,pick_year,pick_weekday,tpep_dropoff_datetime,drop_hour,drop_day,drop_month,drop_year,drop_weekday
0,0,2016-03-01,0,1,3,2016,1,2016-03-01 00:07:55,0,1,3,2016,1
1,1,2016-03-01,0,1,3,2016,1,2016-03-01 00:11:06,0,1,3,2016,1
2,2,2016-03-01,0,1,3,2016,1,2016-03-01 00:31:06,0,1,3,2016,1
3,3,2016-03-01,0,1,3,2016,1,2016-03-01 00:00:00,0,1,3,2016,1
4,4,2016-03-01,0,1,3,2016,1,2016-03-01 00:00:00,0,1,3,2016,1
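
The `pick_weekday` value of 1 in the output follows the pandas convention for `.dt.weekday`, where Monday is 0 and Sunday is 6. A short sketch of that mapping, using `.dt.day_name()` for a readable label (the series `pickup` and its second date are illustrative, not from the dataset):

```python
import pandas as pd

pickup = pd.Series(pd.to_datetime(["2016-03-01 00:00:00", "2016-03-06 12:30:00"]))

weekday_number = pickup.dt.weekday   # Monday=0 ... Sunday=6
weekday_name = pickup.dt.day_name()  # English day names by default

print(weekday_number.tolist())  # [1, 6]
print(weekday_name.tolist())    # ['Tuesday', 'Sunday']
```

A name column like this could be added to the dimension if analysts are expected to filter by day without remembering the numbering.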


### 2.2 Passenger Count Dimension

In [8]:
# Creating the passenger_count dimension, which stores passenger count information
passenger_count_dim = df[['passenger_count']].reset_index(drop=True)
passenger_count_dim['passenger_count_id'] = passenger_count_dim.index  
passenger_count_dim = passenger_count_dim[['passenger_count_id', 'passenger_count']]  
passenger_count_dim.head()

Unnamed: 0,passenger_count_id,passenger_count
0,0,1
1,1,1
2,2,2
3,3,3
4,4,5
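
The dimension above assigns one ID per trip, so it has as many rows as the trip table. A common alternative in star schemas is to keep one row per distinct value and join the surrogate key back onto the trips. The sketch below shows that alternative on made-up data; it is not what this notebook does, and `trips` and `trips_with_key` are hypothetical names.

```python
import pandas as pd

# Illustrative trips with repeated passenger counts.
trips = pd.DataFrame({"passenger_count": [1, 1, 2, 3, 5, 1]})

# One row per distinct value, each with its own surrogate key.
passenger_count_dim = (
    trips[["passenger_count"]]
    .drop_duplicates()
    .sort_values("passenger_count")
    .reset_index(drop=True)
)
passenger_count_dim["passenger_count_id"] = passenger_count_dim.index

# Attach the surrogate key to each trip by joining on the natural value.
trips_with_key = trips.merge(passenger_count_dim, on="passenger_count", how="left")

print(len(passenger_count_dim))
print(trips_with_key["passenger_count_id"].tolist())
```

With this layout the dimension stays small regardless of how many trips are loaded, but the fact table can no longer reuse `trip_id` as every foreign key.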


### 2.3 Trip Distance Dimension

In [9]:
# Creating the trip_distance dimension, which stores trip distance information
trip_distance_dim = df[['trip_distance']].reset_index(drop=True)  
trip_distance_dim['trip_distance_id'] = trip_distance_dim.index  
trip_distance_dim = trip_distance_dim[['trip_distance_id', 'trip_distance']]
trip_distance_dim.head()

Unnamed: 0,trip_distance_id,trip_distance
0,0,2.5
1,1,2.9
2,2,19.98
3,3,10.78
4,4,30.43


### 2.4 Rate Code Dimension

In [10]:
# Creating a dictionary to map RatecodeID to meaningful rate type names
rate_code_type = {
    1: "Standard rate",
    2: "JFK",
    3: "Newark",
    4: "Nassau or Westchester",
    5: "Negotiated fare",
    6: "Group ride"
}

# Creating the rate_code dimension, which stores rate code information
rate_code_dim = df[['RatecodeID']].reset_index(drop=True)
rate_code_dim['rate_code_id'] = rate_code_dim.index

# Mapping RatecodeID to a more meaningful name using the rate_code_type dictionary
rate_code_dim['rate_code_name'] = rate_code_dim['RatecodeID'].map(rate_code_type)

rate_code_dim = rate_code_dim[['rate_code_id', 'RatecodeID', 'rate_code_name']]
rate_code_dim.head()

Unnamed: 0,rate_code_id,RatecodeID,rate_code_name
0,0,1,Standard rate
1,1,1,Standard rate
2,2,1,Standard rate
3,3,1,Standard rate
4,4,3,Newark
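
`Series.map` with a dictionary returns `NaN` for any key the dictionary lacks, so a `RatecodeID` outside 1 to 6 would produce a missing `rate_code_name`. A minimal sketch of detecting and labeling such values follows. The code 99 and the label "Unknown" are assumptions chosen for illustration, not values confirmed to occur in this dataset.

```python
import pandas as pd

rate_code_type = {
    1: "Standard rate",
    2: "JFK",
    3: "Newark",
    4: "Nassau or Westchester",
    5: "Negotiated fare",
    6: "Group ride",
}

# 99 is a made-up code that is absent from the dictionary.
codes = pd.Series([1, 3, 99], name="RatecodeID")

names = codes.map(rate_code_type)       # unmapped codes become NaN
n_unmapped = int(names.isna().sum())
names_filled = names.fillna("Unknown")  # hypothetical fallback label

print(n_unmapped)
print(names_filled.tolist())
```

The same check applies to the payment type mapping in 2.7, which uses `map` in the same way.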


### 2.5 Pickup Location Dimension

In [11]:
# Creating pickup_location dimension: storing pickup latitude and longitude information
pickup_location_dim = df[['pickup_longitude', 'pickup_latitude']].reset_index(drop=True)
pickup_location_dim['pickup_location_id'] = pickup_location_dim.index
pickup_location_dim = pickup_location_dim[['pickup_location_id', 'pickup_latitude', 'pickup_longitude']] 
pickup_location_dim.head()

Unnamed: 0,pickup_location_id,pickup_latitude,pickup_longitude
0,0,40.765152,-73.976746
1,1,40.767925,-73.983482
2,2,40.64481,-73.782021
3,3,40.769814,-73.863419
4,4,40.792183,-73.971741


### 2.6 Dropoff Location Dimension

In [12]:
# Creating dropoff_location dimension: storing dropoff latitude and longitude information
dropoff_location_dim = df[['dropoff_longitude', 'dropoff_latitude']].reset_index(drop=True)
dropoff_location_dim['dropoff_location_id'] = dropoff_location_dim.index
dropoff_location_dim = dropoff_location_dim[['dropoff_location_id', 'dropoff_latitude', 'dropoff_longitude']]
dropoff_location_dim.head()

Unnamed: 0,dropoff_location_id,dropoff_latitude,dropoff_longitude
0,0,40.746128,-74.004265
1,1,40.733166,-74.005943
2,2,40.67577,-73.974541
3,3,40.757767,-73.96965
4,4,40.695053,-74.17717


### 2.7 Payment Type Dimension

In [13]:
# Creating a dictionary to map payment_type IDs to meaningful payment type names
payment_type_name = {
    1: "Credit card",
    2: "Cash",
    3: "No charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided trip"
}

# Creating the payment_type dimension, which stores payment type information
payment_type_dim = df[['payment_type']].reset_index(drop=True)
payment_type_dim['payment_type_id'] = payment_type_dim.index

# Mapping payment_type ID to a more meaningful name using the payment_type_name dictionary
payment_type_dim['payment_type_name'] = payment_type_dim['payment_type'].map(payment_type_name)

payment_type_dim = payment_type_dim[['payment_type_id', 'payment_type', 'payment_type_name']]
payment_type_dim.head()

Unnamed: 0,payment_type_id,payment_type,payment_type_name
0,0,1,Credit card
1,1,1,Credit card
2,2,1,Credit card
3,3,1,Credit card
4,4,1,Credit card


### 2.8 Fact Table

In [14]:
# Merging the original DataFrame with each dimension table on its key (ID)
fact_table = (
    df.merge(passenger_count_dim, left_on='trip_id', right_on='passenger_count_id')
      .merge(trip_distance_dim, left_on='trip_id', right_on='trip_distance_id')
      .merge(rate_code_dim, left_on='trip_id', right_on='rate_code_id')
      .merge(pickup_location_dim, left_on='trip_id', right_on='pickup_location_id')
      .merge(dropoff_location_dim, left_on='trip_id', right_on='dropoff_location_id')
      .merge(datetime_dim, left_on='trip_id', right_on='datetime_id')
      .merge(payment_type_dim, left_on='trip_id', right_on='payment_type_id')
)

# Selecting specific columns to be included in the final fact table:
fact_table = fact_table[['trip_id', 'VendorID', 'datetime_id', 'passenger_count_id', 
                         'trip_distance_id', 'rate_code_id', 'store_and_fwd_flag', 
                         'pickup_location_id', 'dropoff_location_id', 'payment_type_id', 
                         'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 
                         'improvement_surcharge', 'total_amount']]

fact_table.head()

Unnamed: 0,trip_id,VendorID,datetime_id,passenger_count_id,trip_distance_id,rate_code_id,store_and_fwd_flag,pickup_location_id,dropoff_location_id,payment_type_id,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,0,1,0,0,0,0,N,0,0,0,9.0,0.5,0.5,2.05,0.0,0.3,12.35
1,1,1,1,1,1,1,N,1,1,1,11.0,0.5,0.5,3.05,0.0,0.3,15.35
2,2,2,2,2,2,2,N,2,2,2,54.5,0.5,0.5,8.0,0.0,0.3,63.8
3,3,2,3,3,3,3,N,3,3,3,31.5,0.0,0.5,3.78,5.54,0.3,41.62
4,4,2,4,4,4,4,N,4,4,4,98.0,0.0,0.0,0.0,15.5,0.3,113.8


In [15]:
# Checking Fact Table Columns
fact_table.columns

Index(['trip_id', 'VendorID', 'datetime_id', 'passenger_count_id',
       'trip_distance_id', 'rate_code_id', 'store_and_fwd_flag',
       'pickup_location_id', 'dropoff_location_id', 'payment_type_id',
       'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
       'improvement_surcharge', 'total_amount'],
      dtype='object')
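
Each merge in 2.8 relies on the dimension key matching `trip_id` exactly once. pandas can enforce that expectation through the `validate` argument of `merge`, which raises `pandas.errors.MergeError` when the keys are not unique. The sketch below demonstrates the check on small made-up frames; `good_dim` and `bad_dim` are hypothetical.

```python
import pandas as pd

trips = pd.DataFrame({"trip_id": [0, 1, 2], "VendorID": [1, 1, 2]})
good_dim = pd.DataFrame({"rate_code_id": [0, 1, 2], "RatecodeID": [1, 1, 1]})
bad_dim = pd.DataFrame({"rate_code_id": [0, 1, 1], "RatecodeID": [1, 1, 3]})  # key 1 repeats

# Passes: every key appears exactly once on both sides.
merged = trips.merge(
    good_dim, left_on="trip_id", right_on="rate_code_id", validate="one_to_one"
)

# Fails: the repeated key would silently duplicate trip 1 without validation.
try:
    trips.merge(
        bad_dim, left_on="trip_id", right_on="rate_code_id", validate="one_to_one"
    )
    duplicate_key_detected = False
except pd.errors.MergeError:
    duplicate_key_detected = True

print(len(merged), duplicate_key_detected)
```

Adding `validate="one_to_one"` to the merges would turn an accidental row multiplication in the fact table into an immediate error.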

# ✅ Section 3: <span style="color:green">**Load**</span> Data Into Google BigQuery
---

### 3.1 BigQuery Variables

In [16]:
bq_client = bigquery.Client()

dataset_id = "uber_data"  
project_id = "uber-data-project-439104"  

### 3.2 Defining The Load Function

In [17]:
# Utility function to load DataFrame to BigQuery
def load_to_bigquery(dataframe, table_name, dataset_id=dataset_id, project_id=project_id, if_exists='replace'):
    table_id = f"{project_id}.{dataset_id}.{table_name}"
    
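    # Define write disposition depending on the requirement
    if if_exists == 'replace':
        # Overwrite the table if it already exists
        write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    elif if_exists == 'append':
        # Add rows to the existing table
        write_disposition = bigquery.WriteDisposition.WRITE_APPEND
    else:
        # Any other value falls back to WRITE_EMPTY: the load only succeeds if the table has no data
        write_disposition = bigquery.WriteDisposition.WRITE_EMPTY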

    job_config = bigquery.LoadJobConfig(
        write_disposition=write_disposition,
    )

    job = bq_client.load_table_from_dataframe(
        dataframe, table_id, job_config=job_config
    )

    job.result()  # Waits for the job to complete.
    print(f"Loaded {len(dataframe)} rows into {table_id}.")
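
In the function above, any `if_exists` value other than `'replace'` or `'append'`, including a typo, silently selects `WRITE_EMPTY`. A stricter variant could reject unknown values instead. The sketch below is one way to do that under stated assumptions: `resolve_write_disposition` and the `"fail"` option are hypothetical names, and plain strings stand in for the `bigquery.WriteDisposition` constants so the example runs without the client library.

```python
# Plain strings mirroring the names of the bigquery.WriteDisposition constants.
WRITE_DISPOSITIONS = {
    "replace": "WRITE_TRUNCATE",  # overwrite the table
    "append": "WRITE_APPEND",     # add rows to the table
    "fail": "WRITE_EMPTY",        # hypothetical option name: load only into an empty table
}


def resolve_write_disposition(if_exists: str) -> str:
    """Return the write disposition for if_exists, rejecting unknown values."""
    try:
        return WRITE_DISPOSITIONS[if_exists]
    except KeyError:
        allowed = ", ".join(sorted(WRITE_DISPOSITIONS))
        raise ValueError(f"if_exists must be one of: {allowed}") from None


print(resolve_write_disposition("replace"))
```

With this lookup, a call such as `resolve_write_disposition("replcae")` raises `ValueError` rather than changing the load behavior unnoticed.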

### 3.3 Loading Data Using The Function

In [18]:
# Load each transformed DataFrame into BigQuery
load_to_bigquery(datetime_dim, "datetime_dimension")
load_to_bigquery(passenger_count_dim, "passenger_count_dimension")
load_to_bigquery(trip_distance_dim, "trip_distance_dimension")
load_to_bigquery(rate_code_dim, "rate_code_dimension")
load_to_bigquery(pickup_location_dim, "pickup_location_dimension")  
load_to_bigquery(dropoff_location_dim, "dropoff_location_dimension")  
load_to_bigquery(payment_type_dim, "payment_type_dimension")  
load_to_bigquery(fact_table, "fact_table")

Loaded 100000 rows into uber-data-project-439104.uber_data.datetime_dimension.
Loaded 100000 rows into uber-data-project-439104.uber_data.passenger_count_dimension.
Loaded 100000 rows into uber-data-project-439104.uber_data.trip_distance_dimension.
Loaded 100000 rows into uber-data-project-439104.uber_data.rate_code_dimension.
Loaded 100000 rows into uber-data-project-439104.uber_data.pickup_location_dimension.
Loaded 100000 rows into uber-data-project-439104.uber_data.dropoff_location_dimension.
Loaded 100000 rows into uber-data-project-439104.uber_data.payment_type_dimension.
Loaded 100000 rows into uber-data-project-439104.uber_data.fact_table.
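
The eight calls in 3.3 could also be driven from a single mapping of table names to DataFrames, which keeps the list of tables in one place. The sketch below assumes a loader with the same `(dataframe, table_name)` argument order as `load_to_bigquery`. `load_all` and `fake_loader` are hypothetical, and the stand-in loader only records names so the example runs without cloud credentials.

```python
import pandas as pd


def load_all(tables, loader):
    """Call loader(dataframe, table_name) for each entry and return row counts."""
    row_counts = {}
    for table_name, dataframe in tables.items():
        loader(dataframe, table_name)
        row_counts[table_name] = len(dataframe)
    return row_counts


# Stand-in loader: records the table name instead of contacting BigQuery.
loaded = []


def fake_loader(dataframe, table_name):
    loaded.append(table_name)


# Illustrative two-row tables in place of the real dimension and fact tables.
tables = {
    "rate_code_dimension": pd.DataFrame({"rate_code_id": [0, 1]}),
    "fact_table": pd.DataFrame({"trip_id": [0, 1]}),
}

row_counts = load_all(tables, fake_loader)
print(row_counts)
```

In the notebook, `load_to_bigquery` would be passed in place of `fake_loader`, and the returned counts could be compared across tables as a quick consistency check.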
