In [5]:
import os
import re
import uuid

import numpy as np
import pandas as pd
from sqlalchemy.engine import URL

# Loading the CSV files into DataFrames

In [16]:
# One Jersey City trip file per month of 2016 (month number zero-padded: 01 ... 12)
csvs = [
    pd.read_csv(f'./bike-rental-starter-kit/data/JC-2016{month:02d}-citibike-tripdata.csv')
    for month in range(1, 13)
]

# Weather observations for 2016 (Newark airport, per the file name)
weather = pd.read_csv('./bike-rental-starter-kit/data/newark_airport_2016.csv')

# Stack the monthly frames into one trip table with a fresh 0..N-1 index
tripdata = pd.concat(csvs, ignore_index=True)

In [17]:

# Define a function to convert a string to snake case
def snake_case(s):
    return '_'.join(
        re.sub('([A-Z][a-z]+)', r' \1',
        re.sub('([A-Z]+)', r' \1',
        s.replace('-', ' '))).split()).lower()


# Normalizing

In [18]:

## TRIPDATA ------------

# Convert numpy int64 columns to pandas' nullable Int64, so integer columns can
# hold missing values without being upcast to float (and .info() is uniform).
for col in tripdata.columns:
    if tripdata[col].dtypes == 'int64':
        tripdata[col] = tripdata[col].astype('Int64')

# Normalize text (object) columns to title case
for col in tripdata.columns:
    if tripdata[col].dtypes == 'object':
        tripdata[col] = tripdata[col].str.title()

# Birth Year is read as a float column; store it as nullable Int64 too
tripdata['Birth Year'] = tripdata['Birth Year'].astype('Int64')

# Clear implausible birth years (1916 or earlier, i.e. riders aged 100+ in 2016).
# Only the Birth Year value is blanked; the rest of the trip row is kept.
# A missing Birth Year compares as <NA>, so fillna(False) leaves those rows alone.
implausible_birth_year = (tripdata['Birth Year'] <= 1916).fillna(False)
tripdata.loc[implausible_birth_year, 'Birth Year'] = pd.NA

# Replace gender codes with labels; any code other than 1 or 2 becomes NaN
tripdata['Gender'] = tripdata['Gender'].map({2:'female', 1:'male'})

# Keep only trips of at most 7 days (60*60*24*7 = one week in seconds).
# .copy() makes tripdata an independent frame, so the column assignment below
# does not raise SettingWithCopyWarning.
tripdata = tripdata[tripdata['Trip Duration'] <= 60*60*24*7].copy()

# Random UUID4 per trip as a surrogate key (values differ on every run)
tripdata['Ride ID'] = [str(uuid.uuid4()) for _ in range(len(tripdata))]

# Citi Bike "Subscriber"s hold an annual membership and "Customer"s buy
# short-term passes, so Subscriber -> Member and Customer -> Casual.
tripdata['User Type'] = tripdata['User Type'].map({'Subscriber':'Member', 'Customer':'Casual'})

# Align column names with the target naming scheme. DataFrame.rename returns a
# new frame, so the result must be assigned back for the renames to apply.
tripdata = tripdata.rename(columns={
    'Trip Duration':'Trip Duration', #KEEP
    'Start Time':'Started at',
    'Stop Time':'Ended at',
    'Start Station ID':'Start station ID',
    'Start Station Name':'Start station name',
    'Start Station Latitude':'Start latitude',
    'Start Station Longitude':'Start longitude',
    'End Station ID':'End station ID',
    'End Station Name':'End station name',
    'End Station Latitude':'End latitude',
    'End Station Longitude':'End longitude',
    'Bike ID':'Bike ID', #KEEP
    'User Type':'User Type', #KEEP Name -- Member or casual ride
    'Birth Year':'Birth Year', #KEEP
    'Gender':'Gender', #KEEP
    'Ride ID':'Ride ID' #Created
    # Missing -- Rideable type
})

# Rename the weather station identifier column. DataFrame.rename returns a new
# frame, so the result is assigned back. The weather headers used in this
# notebook are upper-case (e.g. 'NAME'), so the key is presumably 'STATION'
# (TODO confirm against the CSV header). rename() ignores keys that are absent.
weather = weather.rename(columns={
    'STATION': 'Station ID',
})

# Convert every tripdata column label to snake_case
tripdata.columns = list(map(snake_case, tripdata.columns))

# Attribute read by the PostgreSQL export cell to look up the target table name
tripdata.Name = 'tripdata'

## WEATHER -------------

# Split the initial NAME value on ',' into NAME (first part) and REGION (second
# part). Guarded on REGION so re-running the cell does not split NAME again.
if 'REGION' not in weather.columns:
    name_parts = weather['NAME'].str.split(',')
    # strip() removes surrounding blanks, such as the space after the comma
    weather['NAME'] = name_parts.str[0].str.strip()
    weather['REGION'] = name_parts.str[1].str.strip()

# Normalize the station name to title case
weather['NAME'] = weather['NAME'].str.title()

# Drop columns in which every value is missing
weather = weather.dropna(axis=1, how='all')

# Fixed coordinates attached to every weather row (Newark airport, per the
# source file name)
WEATHER_STATION_LATITUDE = 40.689531
WEATHER_STATION_LONGITUDE = -74.174462
weather['Latitude'] = WEATHER_STATION_LATITUDE
weather['Longitude'] = WEATHER_STATION_LONGITUDE

# Snake-case the column names
weather.columns = [snake_case(column) for column in weather.columns]

# Attribute read by the PostgreSQL export cell to look up the target table name
weather.Name = 'weather'



In [5]:
# weather.head(10)

In [6]:
# tripdata.head(10)

In [7]:
# tripdata.info()

In [8]:
# weather.info()

In [9]:
# tripdata.describe()

In [10]:
# weather.describe()

# PostgreSQL

In [42]:
from sqlalchemy import create_engine, text, inspect
from sqlalchemy.exc import OperationalError
import time

# PostgreSQL connection parameters. Each value can be supplied through the
# standard libpq environment variables (PGUSER, PGPASSWORD, ...), so credentials
# need not be stored in the notebook. The fallbacks are the original
# local-development values.
db_config = {
    'user': os.environ.get('PGUSER', 'python'),
    'password': os.environ.get('PGPASSWORD', 'postgres'),
    'host': os.environ.get('PGHOST', 'localhost'),
    'port': os.environ.get('PGPORT', '5432'),
    'database': os.environ.get('PGDATABASE', 'postgres'),
}

# URL.create escapes special characters (e.g. '@', ':', '/') in the credentials,
# which a hand-built "postgresql://user:password@host" string does not.
engine = create_engine(URL.create(
    'postgresql',
    username=db_config['user'],
    password=db_config['password'],
    host=db_config['host'],
    port=int(db_config['port']),
    database=db_config['database'],
))
print("Engine created")

# DataFrame ``Name`` attribute (set during normalization) -> target table name
table_dict = {
    'tripdata' : 'fact_trip',
    'weather' : 'fact_weather'
}

tables = [tripdata, weather]


def _postgres_type(column):
    """Return the PostgreSQL column type to use for a pandas Series.

    Uses pandas' dtype predicates, so nullable extension dtypes ('Int64',
    'boolean') and unit-qualified dtypes ('datetime64[ns]') are recognised.
    Any unrecognised dtype falls back to TEXT.
    """
    dtype = column.dtype
    if pd.api.types.is_bool_dtype(dtype):
        return 'BOOLEAN'
    if pd.api.types.is_integer_dtype(dtype):
        return 'BIGINT'
    if pd.api.types.is_float_dtype(dtype):
        return 'NUMERIC'
    if pd.api.types.is_datetime64_any_dtype(dtype):
        return 'TIMESTAMP'
    if pd.api.types.is_timedelta64_dtype(dtype):
        return 'INTERVAL'
    return 'TEXT'


inspector = inspect(engine)

for table in tables:

    postgres_name = table_dict[table.Name]

    if inspector.has_table(postgres_name):
        print(f'{postgres_name} already exists in {db_config["database"]}')
        continue

    # Build CREATE TABLE from the DataFrame's columns and dtypes. Identifiers are
    # double-quoted (embedded quotes doubled) to match how DataFrame.to_sql names them.
    column_definitions = ',\n'.join(
        '"{}" {}'.format(col.replace('"', '""'), _postgres_type(table[col]))
        for col in table.columns
    )
    create_table_sql = f"CREATE TABLE {postgres_name} (\n{column_definitions}\n);"

    try:
        start_time = time.time()
        # engine.begin() commits on success. With SQLAlchemy 2.0-style
        # connections, a plain engine.connect() block rolls back on close
        # unless commit() is called, which would discard the DDL.
        with engine.begin() as connection:
            connection.execute(text(create_table_sql))
        print("Creating {} table in : {} seconds".format(postgres_name, round(time.time() - start_time, 2)))
    except OperationalError as e:
        print(f"Error connecting to PostgreSQL: {e}")

    # Append into the table created above so its column types are kept.
    # if_exists='replace' would drop it and let pandas infer the types instead.
    # If the CREATE failed, to_sql creates the table itself.
    start_time = time.time()
    print("Starting data loading...")
    table.to_sql(postgres_name, engine, index=False, if_exists='append')
    print("Loading data into table in : {} seconds".format(round(time.time() - start_time, 2)))

    print(f'DataFrame successfully loaded into the table: {postgres_name} \n')

Engine created
Creating fact_trip table in : 0.01 seconds
Starting data loading...
Loading data into table in : 28.54 seconds
DataFrame successfully loaded into the table: fact_trip 

Creating fact_weather table in : 0.01 seconds
Starting data loading...
Loading data into table in : 0.06 seconds
DataFrame successfully loaded into the table: fact_weather 



In [41]:
# PostgreSQL connection parameters: read from the standard libpq environment
# variables when set, otherwise the original local-development defaults.
db_config = {
    'user': os.environ.get('PGUSER', 'python'),
    'password': os.environ.get('PGPASSWORD', 'postgres'),
    'host': os.environ.get('PGHOST', 'localhost'),
    'port': os.environ.get('PGPORT', '5432'),
    'database': os.environ.get('PGDATABASE', 'postgres'),
}

# URL.create escapes special characters in the credentials
engine = create_engine(URL.create(
    'postgresql',
    username=db_config['user'],
    password=db_config['password'],
    host=db_config['host'],
    port=int(db_config['port']),
    database=db_config['database'],
))
print("Engine created")
print(f'Connected to {db_config["database"]} as user {db_config["user"]}\n')


# DataFrame ``Name`` attribute -> PostgreSQL table name
table_dict = {
    'tripdata' : 'fact_trip',
    'weather' : 'fact_weather'
}

tables = [tripdata, weather]

for table in tables:

    postgres_name = table_dict[table.Name]

    # A new Inspector per table (as before); Inspector objects may cache
    # reflection results, so a fresh one reflects the drops made in this loop.
    inspector = inspect(engine)
    table_exists = inspector.has_table(postgres_name)
    print(f'Table {postgres_name} exists:', table_exists)

    if not table_exists:
        print(f'Table {postgres_name} does not exist.\n')
        continue

    print('Table exists. Deleting...')
    drop_table_sql = f"DROP TABLE {postgres_name} CASCADE;"
    print(f"Statement used : {drop_table_sql}")

    try:
        # engine.begin() commits when the block succeeds and rolls back when it
        # raises. A manual rollback() after the block would run on a connection
        # that is already closed, or on an unbound name if connecting failed.
        with engine.begin() as connection:
            connection.execute(text(drop_table_sql))
        print(f'Table {postgres_name} successfully deleted.\n')
    except Exception as e:
        print(f'Error deleting table {postgres_name}: {e}')
        raise  # re-raise so the full traceback is shown

Engine created
Connected to postgres as user python

Table fact_trip exists: True
Table exists. Deleting...
Statement used : DROP TABLE fact_trip CASCADE;
Table fact_trip successfully deleted.

Table fact_weather exists: True
Table exists. Deleting...
Statement used : DROP TABLE fact_weather CASCADE;
Table fact_weather successfully deleted.

