In [156]:
import io
import pandas as pd
import requests
import os
from geopy.geocoders import OpenCage
from geopy.exc import GeocoderServiceError, GeocoderTimedOut
from pandas import json_normalize
import sqlite3

In [157]:
# importing uber dataset
df = pd.read_csv('uber_data.csv')

In [158]:
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
df = df.drop_duplicates().reset_index(drop=True)
df['trip_id'] = df.index

In [159]:
datetime_dim = df[['tpep_pickup_datetime','tpep_dropoff_datetime']].reset_index(drop=True)
datetime_dim['tpep_pickup_datetime'] = datetime_dim['tpep_pickup_datetime']
datetime_dim['pick_hour'] = datetime_dim['tpep_pickup_datetime'].dt.hour
datetime_dim['pick_day'] = datetime_dim['tpep_pickup_datetime'].dt.day
datetime_dim['pick_month'] = datetime_dim['tpep_pickup_datetime'].dt.month
datetime_dim['pick_year'] = datetime_dim['tpep_pickup_datetime'].dt.year
datetime_dim['pick_weekday'] = datetime_dim['tpep_pickup_datetime'].dt.weekday

datetime_dim['tpep_dropoff_datetime'] = datetime_dim['tpep_dropoff_datetime']
datetime_dim['drop_hour'] = datetime_dim['tpep_dropoff_datetime'].dt.hour
datetime_dim['drop_day'] = datetime_dim['tpep_dropoff_datetime'].dt.day
datetime_dim['drop_month'] = datetime_dim['tpep_dropoff_datetime'].dt.month
datetime_dim['drop_year'] = datetime_dim['tpep_dropoff_datetime'].dt.year
datetime_dim['drop_weekday'] = datetime_dim['tpep_dropoff_datetime'].dt.weekday
datetime_dim['datetime_id'] = datetime_dim.index

datetime_dim = datetime_dim[['datetime_id', 'tpep_pickup_datetime', 'pick_hour', 'pick_day', 'pick_month', 'pick_year', 'pick_weekday',
                             'tpep_dropoff_datetime', 'drop_hour', 'drop_day', 'drop_month', 'drop_year', 'drop_weekday']]

In [160]:
passenger_count_dim = df[['passenger_count']].reset_index(drop=True)
passenger_count_dim['passenger_count_id'] = passenger_count_dim.index
passenger_count_dim = passenger_count_dim[['passenger_count_id','passenger_count']]

trip_distance_dim = df[['trip_distance']].reset_index(drop=True)
trip_distance_dim['trip_distance_id'] = trip_distance_dim.index
trip_distance_dim = trip_distance_dim[['trip_distance_id','trip_distance']]

In [161]:
rate_code_type = {
    1:"Standard rate",
    2:"JFK",
    3:"Newark",
    4:"Nassau or Westchester",
    5:"Negotiated fare",
    6:"Group ride"
}

rate_code_dim = df[['RatecodeID']].reset_index(drop=True)
rate_code_dim['rate_code_id'] = rate_code_dim.index
rate_code_dim['rate_code_name'] = rate_code_dim['RatecodeID'].map(rate_code_type)
rate_code_dim = rate_code_dim[['rate_code_id','RatecodeID','rate_code_name']]

In [162]:
pickup_location_dim = df[['pickup_longitude', 'pickup_latitude']].reset_index(drop=True)
pickup_location_dim['pickup_location_id'] = pickup_location_dim.index
pickup_location_dim = pickup_location_dim[['pickup_location_id','pickup_latitude','pickup_longitude']] 
dropoff_location_dim = df[['dropoff_longitude', 'dropoff_latitude']].reset_index(drop=True)
dropoff_location_dim['dropoff_location_id'] = dropoff_location_dim.index
dropoff_location_dim = dropoff_location_dim[['dropoff_location_id','dropoff_latitude','dropoff_longitude']]

pickup_location_dim = pickup_location_dim.head(5)
dropoff_location_dim = dropoff_location_dim.head(5)

In [163]:
def reverse_geocode(latitude, longitude, api_key):
    try:
        geolocator = OpenCage(api_key)
        location = geolocator.reverse((latitude, longitude), exactly_one=True)
        if location:
            address = location.address
            location_details = location.raw
            return address, location_details
        else:
            return None, None
    except GeocoderTimedOut:
        print("Error: Geocoding service timed out")
        return None, None
    except GeocoderServiceError as e:
        print(f"Error: Geocoding service error - {e}")
        return None, None
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None, None

api_key = ''

# Define a wrapper function to pass the API key to reverse_geocode and handle the return values
def apply_reverse_pickup_geocode(row):
    address, location_details = reverse_geocode(row['pickup_latitude'], row['pickup_longitude'], api_key)
    return pd.Series([address, location_details])

# Apply the function to each row in the DataFrame and expand the result into two new columns
pickup_location_dim[['address', 'location_details']] = pickup_location_dim.apply(apply_reverse_pickup_geocode, axis=1)
# Extract location_details JSON into a DataFrame
location_details_df = json_normalize(pickup_location_dim['location_details'])
pickup_location_dim = pickup_location_dim.join(location_details_df)
# Print the updated DataFrame

# Define a wrapper function to pass the API key to reverse_geocode and handle the return values
def apply_reverse_dropoff_geocode(row):
    address, location_details = reverse_geocode(row['dropoff_latitude'], row['dropoff_longitude'], api_key)
    return pd.Series([address, location_details])

# Apply the function to each row in the DataFrame and expand the result into two new columns
dropoff_location_dim[['address', 'location_details']] = dropoff_location_dim.apply(apply_reverse_dropoff_geocode, axis=1)
# Extract location_details JSON into a DataFrame
location_details_df = json_normalize(dropoff_location_dim['location_details'])
dropoff_location_dim = dropoff_location_dim.join(location_details_df)
# Print the updated DataFrame

dropoff_location_dim = dropoff_location_dim.drop(columns=['location_details'])
pickup_location_dim = pickup_location_dim.drop(columns=['location_details'])

In [164]:
payment_type_name = {
    1:"Credit card",
    2:"Cash",
    3:"No charge",
    4:"Dispute",
    5:"Unknown",
    6:"Voided trip"
}
payment_type_dim = df[['payment_type']].reset_index(drop=True)
payment_type_dim['payment_type_id'] = payment_type_dim.index
payment_type_dim['payment_type_name'] = payment_type_dim['payment_type'].map(payment_type_name)
payment_type_dim = payment_type_dim[['payment_type_id','payment_type','payment_type_name']]

In [165]:
fact_table = df.merge(passenger_count_dim, left_on='trip_id', right_on='passenger_count_id') \
             .merge(trip_distance_dim, left_on='trip_id', right_on='trip_distance_id') \
             .merge(rate_code_dim, left_on='trip_id', right_on='rate_code_id') \
             .merge(pickup_location_dim, left_on='trip_id', right_on='pickup_location_id') \
             .merge(dropoff_location_dim, left_on='trip_id', right_on='dropoff_location_id')\
             .merge(datetime_dim, left_on='trip_id', right_on='datetime_id') \
             .merge(payment_type_dim, left_on='trip_id', right_on='payment_type_id') \
             [['trip_id','VendorID', 'datetime_id', 'passenger_count_id',
               'trip_distance_id', 'rate_code_id', 'store_and_fwd_flag', 'pickup_location_id', 'dropoff_location_id',
               'payment_type_id', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
               'improvement_surcharge', 'total_amount']]

In [166]:
conn = sqlite3.connect('UberDataWarehouse.db')

dropoff_location_dim = dropoff_location_dim.applymap(str)
pickup_location_dim = pickup_location_dim.applymap(str)

# Write the DataFrames to SQLite Tables
fact_table.to_sql('fact_table', conn, if_exists='replace', index=False)
datetime_dim.to_sql('datetime_dim', conn, if_exists='replace', index=False)
dropoff_location_dim.to_sql('dropoff_location_dim', conn, if_exists='replace', index=False)
passenger_count_dim.to_sql('passenger_count_dim', conn, if_exists='replace', index=False)
payment_type_dim.to_sql('payment_type_dim', conn, if_exists='replace', index=False)
pickup_location_dim.to_sql('pickup_location_dim', conn, if_exists='replace', index=False)
rate_code_dim.to_sql('rate_code_dim', conn, if_exists='replace', index=False)
trip_distance_dim.to_sql('trip_distance_dim', conn, if_exists='replace', index=False)

100000