### Import libraries

In [1]:
import pandas as pd
import os
import psycopg2
import psycopg2.extras

### Helper functions

In [2]:
def create_df(directory, keyword):
    '''
    Reads every CSV file in a directory whose filename contains a keyword and concatenates them into a single dataframe.

    Args:
        directory: The directory containing the target CSV files.
        keyword: A substring that must be present in a filename for that file to be read.

    Returns:
        pandas dataframe: All matching CSV files concatenated in sorted filename order, with a fresh 0..n-1 index.

    Raises:
        ValueError: If no file in the directory contains the keyword.

    Example:
        Sales information stored in /sales_data/ with naming convention sales_{date}
        create_df('/sales_data/', 'sales') -> sales info pandas DF of all dates
    '''
    frames = []
    # os.listdir returns entries in arbitrary order; sorting keeps the row order reproducible between runs.
    for filename in sorted(os.listdir(directory)):
        path = os.path.join(directory, filename)
        # Skip non-matching names and sub-directories without opening anything.
        if keyword not in filename or not os.path.isfile(path):
            continue
        frames.append(pd.read_csv(path))

    if not frames:
        raise ValueError(f'No files containing {keyword!r} found in {directory!r}')
    return pd.concat(frames, ignore_index=True)


def remove_whitespaces_df(df):
    '''
    Strips leading and trailing white space from the text columns of a pandas DF.

    Only columns with object or string dtype are touched. In object columns, values that are not
    strings (None, NaN, numbers) are left exactly as they are.

    Args:
        df: The DF that we want to clean.

    Returns:
        It does not return anything, modifies the DF passed as argument.
    '''
    for col in df.columns:
        series = df[col]
        # The dtype has to be inspected, not the values: comparing the Series itself to 'object'
        # yields a boolean Series whose truth value is ambiguous.
        if pd.api.types.is_object_dtype(series.dtype):
            # Object columns may hold mixed values; only strip real strings and keep the dtype as object.
            stripped = [value.strip() if isinstance(value, str) else value for value in series]
            df[col] = pd.Series(stripped, index=series.index, dtype=object)
        elif pd.api.types.is_string_dtype(series.dtype):
            # Dedicated string dtypes support the vectorised accessor and propagate missing values.
            df[col] = series.str.strip()

def _pandas_to_pg_type(dtype):
    '''Returns the postgres column type used for a pandas dtype, raising KeyError when none applies.'''
    exact_types = {
        "object": "TEXT",
        "int64": "BIGINT",
        "float64": "DOUBLE PRECISION",
        "bool": "BOOLEAN",
        "datetime64[ns]": "TIMESTAMP",
    }
    dtype_name = str(dtype)
    if dtype_name in exact_types:
        return exact_types[dtype_name]

    # Fall back to the dtype family so other widths, resolutions and nullable extension dtypes are handled too.
    types = pd.api.types
    if isinstance(dtype, pd.DatetimeTZDtype):
        return "TIMESTAMPTZ"
    if types.is_datetime64_any_dtype(dtype):
        return "TIMESTAMP"
    if types.is_bool_dtype(dtype):
        return "BOOLEAN"
    if types.is_integer_dtype(dtype):
        return "BIGINT"
    if types.is_float_dtype(dtype):
        return "DOUBLE PRECISION"
    if isinstance(dtype, pd.CategoricalDtype) or types.is_string_dtype(dtype):
        return "TEXT"
    raise KeyError(f'No postgres type mapping for pandas dtype {dtype_name!r}')


def create_table_ddl(df, table_name):
    '''
    Creates SQL DDL queries for table creation given a DF schema, maps common datatypes from pandas -> postgres.

    Mapping: integers -> BIGINT, floats -> DOUBLE PRECISION, booleans -> BOOLEAN, naive datetimes -> TIMESTAMP,
    timezone-aware datetimes -> TIMESTAMPTZ, object/string/category -> TEXT.

    Column names and the table name are inserted into the statement verbatim (no quoting), so they must
    already be valid postgres identifiers.

    Args:
        df: The reference DF from which we want to create a table on DB.
        table_name: The name that will be assigned to the table, preferably with its schema.

    Returns:
        A SQL query as a string of a CREATE TABLE statement based on the DF schema.
        The statement is also printed.

    Raises:
        KeyError: If a column has a dtype with no postgres mapping (e.g. timedelta).

    Example:
        pandas sales_df with schema sale_id OBJECT sale_amount INT64 and table_name='dwh.sales' returns the following query:
        CREATE TABLE IF NOT EXISTS dwh.sales (sale_id TEXT,sale_amount BIGINT)
    '''
    columns = [f'{col} {_pandas_to_pg_type(dtype)}' for col, dtype in df.dtypes.items()]

    create_table_sql = f'CREATE TABLE IF NOT EXISTS {table_name} ({",".join(columns)})'
    print(f'{create_table_sql} \n')
    return create_table_sql


def insert_db(df, table_name):
    '''
    Builds the parameterized INSERT statement that matches a DF's columns.

    Nothing is executed and no row data is read: only the column names of the DF are used. The result
    contains one '%s' placeholder per column, ready to be passed to psycopg2 together with the row values.

    Args:
        df: pandas DF whose columns define the column list of the statement.
        table_name: The name of the target postgres table.

    Returns:
        A string of a parameterized SQL INSERT statement meant to be used with psycopg2.

    Example:
        Given a DF with columns col1 and col2 and table_name='my_table' it returns:
        INSERT INTO my_table (col1,col2) VALUES(%s,%s)
    '''
    column_names = [name for name in df]
    column_list = ",".join(column_names)
    placeholders = ",".join("%s" for _ in column_names)
    return f"INSERT INTO {table_name} ({column_list}) VALUES({placeholders})"


### Read, transform and clean citibike data

In [3]:
citi_bike_dir = './data'
citi_bike_keyword = 'citibike'
citi_bike_df = create_df(citi_bike_dir, citi_bike_keyword)

In [4]:
# Map the raw CSV headers to snake_case column names.
citi_bike_column_names = {
    'Trip Duration': 'trip_duration',
    'Start Time': 'start_time',
    'Stop Time': 'stop_time',
    'Start Station ID': 'start_station_id',
    'Start Station Name': 'start_station_name',
    'Start Station Latitude': 'start_station_latitude',
    'Start Station Longitude': 'start_station_longitude',
    'End Station ID': 'end_station_id',
    'End Station Name': 'end_station_name',
    'End Station Latitude': 'end_station_latitude',
    'End Station Longitude': 'end_station_longitude',
    'Bike ID': 'bike_id',
    'User Type': 'user_type',
    'Birth Year': 'birth_year',
    'Gender': 'gender',
}
citi_bike_df = citi_bike_df.rename(columns=citi_bike_column_names)

In [5]:
citi_bike_df.head(50)

Unnamed: 0,trip_duration,start_time,stop_time,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bike_id,user_type,birth_year,gender
0,362,2016-01-01 00:02:52,2016-01-01 00:08:54,3186,Grove St PATH,40.719586,-74.043117,3209,Brunswick St,40.724176,-74.050656,24647,Subscriber,1964.0,2
1,200,2016-01-01 00:18:22,2016-01-01 00:21:42,3186,Grove St PATH,40.719586,-74.043117,3213,Van Vorst Park,40.718489,-74.047727,24605,Subscriber,1962.0,1
2,202,2016-01-01 00:18:25,2016-01-01 00:21:47,3186,Grove St PATH,40.719586,-74.043117,3213,Van Vorst Park,40.718489,-74.047727,24689,Subscriber,1962.0,2
3,248,2016-01-01 00:23:13,2016-01-01 00:27:21,3209,Brunswick St,40.724176,-74.050656,3203,Hamilton Park,40.727596,-74.044247,24693,Subscriber,1984.0,1
4,903,2016-01-01 01:03:20,2016-01-01 01:18:24,3195,Sip Ave,40.730743,-74.063784,3210,Pershing Field,40.742677,-74.051789,24573,Customer,,0
5,883,2016-01-01 01:03:28,2016-01-01 01:18:11,3195,Sip Ave,40.730743,-74.063784,3210,Pershing Field,40.742677,-74.051789,24442,Customer,,0
6,445,2016-01-01 01:07:45,2016-01-01 01:15:11,3186,Grove St PATH,40.719586,-74.043117,3203,Hamilton Park,40.727596,-74.044247,24510,Subscriber,1988.0,2
7,192,2016-01-01 01:18:51,2016-01-01 01:22:03,3211,Newark Ave,40.721525,-74.046305,3203,Hamilton Park,40.727596,-74.044247,24625,Subscriber,1980.0,1
8,409,2016-01-01 01:23:44,2016-01-01 01:30:34,3187,Warren St,40.721124,-74.038051,3214,Essex Light Rail,40.712774,-74.036486,24429,Subscriber,1990.0,1
9,285,2016-01-01 01:25:12,2016-01-01 01:29:57,3187,Warren St,40.721124,-74.038051,3214,Essex Light Rail,40.712774,-74.036486,24407,Subscriber,1988.0,2


In [6]:
citi_bike_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 247584 entries, 0 to 247583
Data columns (total 15 columns):
 #   Column                   Non-Null Count   Dtype  
---  ------                   --------------   -----  
 0   trip_duration            247584 non-null  int64  
 1   start_time               247584 non-null  object 
 2   stop_time                247584 non-null  object 
 3   start_station_id         247584 non-null  int64  
 4   start_station_name       247584 non-null  object 
 5   start_station_latitude   247584 non-null  float64
 6   start_station_longitude  247584 non-null  float64
 7   end_station_id           247584 non-null  int64  
 8   end_station_name         247584 non-null  object 
 9   end_station_latitude     247584 non-null  float64
 10  end_station_longitude    247584 non-null  float64
 11  bike_id                  247584 non-null  int64  
 12  user_type                247204 non-null  object 
 13  birth_year               228585 non-null  float64
 14  gend

In [7]:
# Parse both trip timestamps from strings into datetime values.
for time_column in ('start_time', 'stop_time'):
    citi_bike_df[time_column] = pd.to_datetime(citi_bike_df[time_column])

In [8]:
citi_bike_df.head()

Unnamed: 0,trip_duration,start_time,stop_time,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bike_id,user_type,birth_year,gender
0,362,2016-01-01 00:02:52,2016-01-01 00:08:54,3186,Grove St PATH,40.719586,-74.043117,3209,Brunswick St,40.724176,-74.050656,24647,Subscriber,1964.0,2
1,200,2016-01-01 00:18:22,2016-01-01 00:21:42,3186,Grove St PATH,40.719586,-74.043117,3213,Van Vorst Park,40.718489,-74.047727,24605,Subscriber,1962.0,1
2,202,2016-01-01 00:18:25,2016-01-01 00:21:47,3186,Grove St PATH,40.719586,-74.043117,3213,Van Vorst Park,40.718489,-74.047727,24689,Subscriber,1962.0,2
3,248,2016-01-01 00:23:13,2016-01-01 00:27:21,3209,Brunswick St,40.724176,-74.050656,3203,Hamilton Park,40.727596,-74.044247,24693,Subscriber,1984.0,1
4,903,2016-01-01 01:03:20,2016-01-01 01:18:24,3195,Sip Ave,40.730743,-74.063784,3210,Pershing Field,40.742677,-74.051789,24573,Customer,,0


In [9]:
citi_bike_df.isna().sum()

trip_duration                  0
start_time                     0
stop_time                      0
start_station_id               0
start_station_name             0
start_station_latitude         0
start_station_longitude        0
end_station_id                 0
end_station_name               0
end_station_latitude           0
end_station_longitude          0
bike_id                        0
user_type                    380
birth_year                 18999
gender                         0
dtype: int64

In [10]:
citi_bike_df.describe()

Unnamed: 0,trip_duration,start_time,stop_time,start_station_id,start_station_latitude,start_station_longitude,end_station_id,end_station_latitude,end_station_longitude,bike_id,birth_year,gender
count,247584.0,247584,247584,247584.0,247584.0,247584.0,247584.0,247584.0,247584.0,247584.0,228585.0,247584.0
mean,885.6305,2016-07-29 05:55:07.541335040,2016-07-29 06:09:53.671073536,3207.065206,40.723121,-74.046438,3203.572553,40.722594,-74.045855,24935.260481,1979.335276,1.123534
min,61.0,2016-01-01 00:02:52,2016-01-01 00:08:54,3183.0,40.69264,-74.096937,147.0,40.692216,-74.096937,14552.0,1900.0,0.0
25%,248.0,2016-05-27 07:46:06,2016-05-27 07:54:40.249999872,3186.0,40.717732,-74.050656,3186.0,40.71654,-74.050444,24491.0,1974.0,1.0
50%,390.0,2016-08-10 09:23:50,2016-08-10 09:34:32.500000,3201.0,40.721525,-74.044247,3199.0,40.721124,-74.043117,24609.0,1981.0,1.0
75%,666.0,2016-10-05 17:25:05.500000,2016-10-05 17:33:00.750000128,3211.0,40.727596,-74.038051,3211.0,40.727224,-74.036486,24719.0,1986.0,1.0
max,16329810.0,2016-12-31 23:44:50,2017-01-18 14:26:46,3426.0,40.752559,-74.032108,3426.0,40.801343,-73.95739,27274.0,2000.0,2.0
std,35937.98,,,26.955103,0.008199,0.011211,61.579494,0.007958,0.011283,748.469712,9.596809,0.518687


In [11]:
citi_bike_df['user_type'].unique()

array(['Subscriber', 'Customer', nan], dtype=object)

In [12]:
citi_bike_df['user_type'] = citi_bike_df['user_type'].fillna('Unspecified')

In [13]:
citi_bike_df['user_type'].unique()

array(['Subscriber', 'Customer', 'Unspecified'], dtype=object)

In [14]:
# Missing birth years are replaced with 0 so the float column can be cast to int.
# 0 is therefore a sentinel meaning "unknown", not a real year; the dwh.v_trips_by_age
# view created later excludes it with `birth_year <> 0`.
citi_bike_df['birth_year'] = citi_bike_df['birth_year'].fillna(0).astype(int)

In [15]:
citi_bike_df.isna().sum()

trip_duration              0
start_time                 0
stop_time                  0
start_station_id           0
start_station_name         0
start_station_latitude     0
start_station_longitude    0
end_station_id             0
end_station_name           0
end_station_latitude       0
end_station_longitude      0
bike_id                    0
user_type                  0
birth_year                 0
gender                     0
dtype: int64

In [16]:
citi_bike_df.head()

Unnamed: 0,trip_duration,start_time,stop_time,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bike_id,user_type,birth_year,gender
0,362,2016-01-01 00:02:52,2016-01-01 00:08:54,3186,Grove St PATH,40.719586,-74.043117,3209,Brunswick St,40.724176,-74.050656,24647,Subscriber,1964,2
1,200,2016-01-01 00:18:22,2016-01-01 00:21:42,3186,Grove St PATH,40.719586,-74.043117,3213,Van Vorst Park,40.718489,-74.047727,24605,Subscriber,1962,1
2,202,2016-01-01 00:18:25,2016-01-01 00:21:47,3186,Grove St PATH,40.719586,-74.043117,3213,Van Vorst Park,40.718489,-74.047727,24689,Subscriber,1962,2
3,248,2016-01-01 00:23:13,2016-01-01 00:27:21,3209,Brunswick St,40.724176,-74.050656,3203,Hamilton Park,40.727596,-74.044247,24693,Subscriber,1984,1
4,903,2016-01-01 01:03:20,2016-01-01 01:18:24,3195,Sip Ave,40.730743,-74.063784,3210,Pershing Field,40.742677,-74.051789,24573,Customer,0,0


In [17]:
citi_bike_df['gender'].unique()

array([2, 1, 0])

In [18]:
# Decode the numeric gender codes into readable labels: 0 -> Unknown, 1 -> Male, 2 -> Female.
gender_as_text = citi_bike_df['gender'].replace(0, 'Unknown').astype(str)
citi_bike_df['gender'] = gender_as_text.replace({'1': 'Male', '2': 'Female'}).astype(str)


In [19]:
citi_bike_df['gender'].unique()

array(['Female', 'Male', 'Unknown'], dtype=object)

In [20]:
citi_bike_df.head()

Unnamed: 0,trip_duration,start_time,stop_time,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bike_id,user_type,birth_year,gender
0,362,2016-01-01 00:02:52,2016-01-01 00:08:54,3186,Grove St PATH,40.719586,-74.043117,3209,Brunswick St,40.724176,-74.050656,24647,Subscriber,1964,Female
1,200,2016-01-01 00:18:22,2016-01-01 00:21:42,3186,Grove St PATH,40.719586,-74.043117,3213,Van Vorst Park,40.718489,-74.047727,24605,Subscriber,1962,Male
2,202,2016-01-01 00:18:25,2016-01-01 00:21:47,3186,Grove St PATH,40.719586,-74.043117,3213,Van Vorst Park,40.718489,-74.047727,24689,Subscriber,1962,Female
3,248,2016-01-01 00:23:13,2016-01-01 00:27:21,3209,Brunswick St,40.724176,-74.050656,3203,Hamilton Park,40.727596,-74.044247,24693,Subscriber,1984,Male
4,903,2016-01-01 01:03:20,2016-01-01 01:18:24,3195,Sip Ave,40.730743,-74.063784,3210,Pershing Field,40.742677,-74.051789,24573,Customer,0,Unknown


In [21]:
remove_whitespaces_df(citi_bike_df)

In [22]:
citi_bike_df.head(50000)

Unnamed: 0,trip_duration,start_time,stop_time,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bike_id,user_type,birth_year,gender
0,362,2016-01-01 00:02:52,2016-01-01 00:08:54,3186,Grove St PATH,40.719586,-74.043117,3209,Brunswick St,40.724176,-74.050656,24647,Subscriber,1964,Female
1,200,2016-01-01 00:18:22,2016-01-01 00:21:42,3186,Grove St PATH,40.719586,-74.043117,3213,Van Vorst Park,40.718489,-74.047727,24605,Subscriber,1962,Male
2,202,2016-01-01 00:18:25,2016-01-01 00:21:47,3186,Grove St PATH,40.719586,-74.043117,3213,Van Vorst Park,40.718489,-74.047727,24689,Subscriber,1962,Female
3,248,2016-01-01 00:23:13,2016-01-01 00:27:21,3209,Brunswick St,40.724176,-74.050656,3203,Hamilton Park,40.727596,-74.044247,24693,Subscriber,1984,Male
4,903,2016-01-01 01:03:20,2016-01-01 01:18:24,3195,Sip Ave,40.730743,-74.063784,3210,Pershing Field,40.742677,-74.051789,24573,Customer,0,Unknown
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
49995,160,2016-05-10 05:50:09,2016-05-10 05:52:49,3213,Van Vorst Park,40.718489,-74.047727,3186,Grove St PATH,40.719586,-74.043117,24453,Subscriber,1989,Male
49996,538,2016-05-10 05:53:30,2016-05-10 06:02:29,3192,Liberty Light Rail,40.711242,-74.055701,3183,Exchange Place,40.716247,-74.033459,24568,Subscriber,1960,Male
49997,613,2016-05-10 05:53:53,2016-05-10 06:04:06,3191,Union St,40.718211,-74.083639,3195,Sip Ave,40.730743,-74.063784,24469,Customer,0,Unknown
49998,154,2016-05-10 06:00:02,2016-05-10 06:02:36,3214,Essex Light Rail,40.712774,-74.036486,3183,Exchange Place,40.716247,-74.033459,24551,Subscriber,1974,Male


### Organize citibike data

In [23]:
# Distinct (id, name, latitude, longitude) combinations seen as a trip origin,
# with the 'start_' prefix removed from the column names.
start_station_columns = [
    'start_station_id',
    'start_station_name',
    'start_station_latitude',
    'start_station_longitude',
]
start_stations_df = (
    citi_bike_df[start_station_columns]
    .drop_duplicates()
    .rename(columns={name: name.replace('start_', '', 1) for name in start_station_columns})
)
start_stations_df.head()

Unnamed: 0,station_id,station_name,station_latitude,station_longitude
0,3186,Grove St PATH,40.719586,-74.043117
3,3209,Brunswick St,40.724176,-74.050656
4,3195,Sip Ave,40.730743,-74.063784
7,3211,Newark Ave,40.721525,-74.046305
8,3187,Warren St,40.721124,-74.038051


In [24]:
# Distinct (id, name, latitude, longitude) combinations seen as a trip destination,
# with the 'end_' prefix removed from the column names.
end_station_columns = [
    'end_station_id',
    'end_station_name',
    'end_station_latitude',
    'end_station_longitude',
]
end_stations_df = (
    citi_bike_df[end_station_columns]
    .drop_duplicates()
    .rename(columns={name: name.replace('end_', '', 1) for name in end_station_columns})
)
end_stations_df.head()

Unnamed: 0,station_id,station_name,station_latitude,station_longitude
0,3209,Brunswick St,40.724176,-74.050656
1,3213,Van Vorst Park,40.718489,-74.047727
3,3203,Hamilton Park,40.727596,-74.044247
4,3210,Pershing Field,40.742677,-74.051789
8,3214,Essex Light Rail,40.712774,-74.036486


In [25]:
# Combine the start and end station lists into a single station dimension.
# De-duplicate on station_id only: the views join trips to this table on station_id, so a station
# that shows up with two spellings or slightly different coordinates would otherwise keep two rows
# and each of its trips would be counted twice. The first occurrence of each id is kept.
bike_stations_df = pd.concat([start_stations_df, end_stations_df]).drop_duplicates(subset='station_id')
bike_stations_df.head()

Unnamed: 0,station_id,station_name,station_latitude,station_longitude
0,3186,Grove St PATH,40.719586,-74.043117
3,3209,Brunswick St,40.724176,-74.050656
4,3195,Sip Ave,40.730743,-74.063784
7,3211,Newark Ave,40.721525,-74.046305
8,3187,Warren St,40.721124,-74.038051


In [26]:
# Station names and coordinates were copied into bike_stations_df; keep only the station ids on the trips.
station_detail_columns = [
    'start_station_name',
    'start_station_latitude',
    'start_station_longitude',
    'end_station_name',
    'end_station_latitude',
    'end_station_longitude',
]
citi_bike_df = citi_bike_df.drop(columns=station_detail_columns)
citi_bike_df.head()

Unnamed: 0,trip_duration,start_time,stop_time,start_station_id,end_station_id,bike_id,user_type,birth_year,gender
0,362,2016-01-01 00:02:52,2016-01-01 00:08:54,3186,3209,24647,Subscriber,1964,Female
1,200,2016-01-01 00:18:22,2016-01-01 00:21:42,3186,3213,24605,Subscriber,1962,Male
2,202,2016-01-01 00:18:25,2016-01-01 00:21:47,3186,3213,24689,Subscriber,1962,Female
3,248,2016-01-01 00:23:13,2016-01-01 00:27:21,3209,3203,24693,Subscriber,1984,Male
4,903,2016-01-01 01:03:20,2016-01-01 01:18:24,3195,3210,24573,Customer,0,Unknown


### Read, transform and clean weather data

In [27]:
weather_dir = './data'
weather_keyword = 'newark'
weather_df = create_df(weather_dir, weather_keyword)
remove_whitespaces_df(weather_df)

In [28]:
# Lower-case every column name in a single rename.
weather_df = weather_df.rename(columns=lambda name: name.lower())

In [29]:
# Replace the abbreviated weather column codes with descriptive names.
weather_column_names = {
    'awnd': 'avg_daily_wind_speed',
    'pgtm': 'peak_gust_time',
    'prcp': 'precipitation',
    'snwd': 'snow_depth',
    'tavg': 'avg_hourly_temp',
    'tmax': 'max_hourly_temp',
    'tmin': 'min_hourly_temp',
    'tsun': 'daily_sun_hours',
    'wdf2': 'fastest_2_min_wind_dir',
    'wdf5': 'fastest_5_sec_wind_dir',
    'wsf2': 'fastest_2_min_wind_speed',
    'wsf5': 'fastest_5_sec_wind_speed',
}
weather_df = weather_df.rename(columns=weather_column_names)
weather_df.head()


Unnamed: 0,station,name,date,avg_daily_wind_speed,peak_gust_time,precipitation,snow,snow_depth,avg_hourly_temp,max_hourly_temp,min_hourly_temp,daily_sun_hours,fastest_2_min_wind_dir,fastest_5_sec_wind_dir,fastest_2_min_wind_speed,fastest_5_sec_wind_speed
0,USW00014734,"NEWARK LIBERTY INTERNATIONAL AIRPORT, NJ US",2016-01-01,12.75,,0.0,0.0,0.0,41,43,34,,270,280.0,25.9,35.1
1,USW00014734,"NEWARK LIBERTY INTERNATIONAL AIRPORT, NJ US",2016-01-02,9.4,,0.0,0.0,0.0,36,42,30,,260,260.0,21.0,25.1
2,USW00014734,"NEWARK LIBERTY INTERNATIONAL AIRPORT, NJ US",2016-01-03,10.29,,0.0,0.0,0.0,37,47,28,,270,250.0,23.9,30.0
3,USW00014734,"NEWARK LIBERTY INTERNATIONAL AIRPORT, NJ US",2016-01-04,17.22,,0.0,0.0,0.0,32,35,14,,330,330.0,25.9,33.1
4,USW00014734,"NEWARK LIBERTY INTERNATIONAL AIRPORT, NJ US",2016-01-05,9.84,,0.0,0.0,0.0,19,31,10,,360,350.0,25.1,31.1


In [30]:
weather_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 366 entries, 0 to 365
Data columns (total 16 columns):
 #   Column                    Non-Null Count  Dtype  
---  ------                    --------------  -----  
 0   station                   366 non-null    object 
 1   name                      366 non-null    object 
 2   date                      366 non-null    object 
 3   avg_daily_wind_speed      366 non-null    float64
 4   peak_gust_time            0 non-null      float64
 5   precipitation             366 non-null    float64
 6   snow                      366 non-null    float64
 7   snow_depth                366 non-null    float64
 8   avg_hourly_temp           366 non-null    int64  
 9   max_hourly_temp           366 non-null    int64  
 10  min_hourly_temp           366 non-null    int64  
 11  daily_sun_hours           0 non-null      float64
 12  fastest_2_min_wind_dir    366 non-null    int64  
 13  fastest_5_sec_wind_dir    364 non-null    float64
 14  fastest_2_

In [31]:
weather_df.isna().sum()

station                       0
name                          0
date                          0
avg_daily_wind_speed          0
peak_gust_time              366
precipitation                 0
snow                          0
snow_depth                    0
avg_hourly_temp               0
max_hourly_temp               0
min_hourly_temp               0
daily_sun_hours             366
fastest_2_min_wind_dir        0
fastest_5_sec_wind_dir        2
fastest_2_min_wind_speed      0
fastest_5_sec_wind_speed      2
dtype: int64

In [32]:
# Both columns are missing in all 366 rows (see the isna() counts above), so they carry no information.
weather_df = weather_df.drop(columns=['peak_gust_time', 'daily_sun_hours'])

In [33]:
weather_df.isna().sum()

station                     0
name                        0
date                        0
avg_daily_wind_speed        0
precipitation               0
snow                        0
snow_depth                  0
avg_hourly_temp             0
max_hourly_temp             0
min_hourly_temp             0
fastest_2_min_wind_dir      0
fastest_5_sec_wind_dir      2
fastest_2_min_wind_speed    0
fastest_5_sec_wind_speed    2
dtype: int64

In [34]:
# Fill the two missing 5-second wind readings with 0.
for wind_column in ('fastest_5_sec_wind_dir', 'fastest_5_sec_wind_speed'):
    weather_df[wind_column] = weather_df[wind_column].fillna(0)

In [35]:
weather_df.isna().sum()

station                     0
name                        0
date                        0
avg_daily_wind_speed        0
precipitation               0
snow                        0
snow_depth                  0
avg_hourly_temp             0
max_hourly_temp             0
min_hourly_temp             0
fastest_2_min_wind_dir      0
fastest_5_sec_wind_dir      0
fastest_2_min_wind_speed    0
fastest_5_sec_wind_speed    0
dtype: int64

In [36]:
weather_df['date'] = pd.to_datetime(weather_df['date'])

In [37]:
weather_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 366 entries, 0 to 365
Data columns (total 14 columns):
 #   Column                    Non-Null Count  Dtype         
---  ------                    --------------  -----         
 0   station                   366 non-null    object        
 1   name                      366 non-null    object        
 2   date                      366 non-null    datetime64[ns]
 3   avg_daily_wind_speed      366 non-null    float64       
 4   precipitation             366 non-null    float64       
 5   snow                      366 non-null    float64       
 6   snow_depth                366 non-null    float64       
 7   avg_hourly_temp           366 non-null    int64         
 8   max_hourly_temp           366 non-null    int64         
 9   min_hourly_temp           366 non-null    int64         
 10  fastest_2_min_wind_dir    366 non-null    int64         
 11  fastest_5_sec_wind_dir    366 non-null    float64       
 12  fastest_2_min_wind_spe

In [38]:
weather_stations_df = weather_df.loc[:,['station', 'name']].drop_duplicates()
weather_stations_df.head()


Unnamed: 0,station,name
0,USW00014734,"NEWARK LIBERTY INTERNATIONAL AIRPORT, NJ US"


### Create DB schema

In [39]:
# Open the postgres connection. All connection settings are read from environment variables;
# a missing variable raises KeyError before any connection attempt is made.
conn = psycopg2.connect(
   database=os.environ['POSTGRES_DB'],
    user=os.environ['POSTGRES_USER'],
    password=os.environ['POSTGRES_PASSWORD'],
    host=os.environ['POSTGRES_HOST'],
    port= os.environ['POSTGRES_PORT']
)

# With autocommit on, every statement takes effect immediately, so the cells below never call conn.commit().
conn.autocommit = True

# Cursor shared by all DDL, insert and view statements in the rest of the notebook.
cursor = conn.cursor()

In [40]:
cursor.execute('CREATE SCHEMA IF NOT EXISTS dwh')

In [41]:
# Create one table per dataframe in the dwh schema. The column definitions are derived from each
# dataframe's dtypes by create_table_ddl, which also prints the generated statement.
# CREATE TABLE IF NOT EXISTS leaves an already existing table untouched.

# Trip facts.
citi_bike_table_name = 'dwh.citibike_info'
create_citibike_info = create_table_ddl(citi_bike_df, citi_bike_table_name)
cursor.execute(create_citibike_info)

# Bike station dimension.
bike_stations_table_name = 'dwh.bike_stations'
create_bike_stations = create_table_ddl(bike_stations_df, bike_stations_table_name)
cursor.execute(create_bike_stations)

# Daily weather observations.
weather_info_table_name = 'dwh.weather_info'
create_weather_info = create_table_ddl(weather_df, weather_info_table_name)
cursor.execute(create_weather_info)

# Weather station dimension.
weather_stations_table_name = 'dwh.weather_stations'
create_weather_stations = create_table_ddl(weather_stations_df, weather_stations_table_name)
cursor.execute(create_weather_stations)

CREATE TABLE IF NOT EXISTS dwh.citibike_info (trip_duration BIGINT,start_time TIMESTAMP,stop_time TIMESTAMP,start_station_id BIGINT,end_station_id BIGINT,bike_id BIGINT,user_type TEXT,birth_year BIGINT,gender TEXT) 

CREATE TABLE IF NOT EXISTS dwh.bike_stations (station_id BIGINT,station_name TEXT,station_latitude DOUBLE PRECISION,station_longitude DOUBLE PRECISION) 

CREATE TABLE IF NOT EXISTS dwh.weather_info (station TEXT,name TEXT,date TIMESTAMP,avg_daily_wind_speed DOUBLE PRECISION,precipitation DOUBLE PRECISION,snow DOUBLE PRECISION,snow_depth DOUBLE PRECISION,avg_hourly_temp BIGINT,max_hourly_temp BIGINT,min_hourly_temp BIGINT,fastest_2_min_wind_dir BIGINT,fastest_5_sec_wind_dir DOUBLE PRECISION,fastest_2_min_wind_speed DOUBLE PRECISION,fastest_5_sec_wind_speed DOUBLE PRECISION) 

CREATE TABLE IF NOT EXISTS dwh.weather_stations (station TEXT,name TEXT) 



In [42]:
citi_bike_insert = insert_db(citi_bike_df, citi_bike_table_name)
psycopg2.extras.execute_batch(cursor, citi_bike_insert, citi_bike_df.values)

In [43]:
# Load the station dimension. The table name comes from the variable set when the table was
# created, so the CREATE and INSERT statements cannot drift apart.
bike_stations_insert = insert_db(bike_stations_df, bike_stations_table_name)
psycopg2.extras.execute_batch(cursor, bike_stations_insert, bike_stations_df.values)

In [44]:
# Load the daily weather observations. The table name comes from the variable set when the table
# was created, so the CREATE and INSERT statements cannot drift apart.
weather_insert = insert_db(weather_df, weather_info_table_name)

psycopg2.extras.execute_batch(cursor, weather_insert, weather_df.values)

In [45]:
# Load the weather station dimension. The table name comes from the variable set when the table
# was created, so the CREATE and INSERT statements cannot drift apart.
weather_stations_insert = insert_db(weather_stations_df, weather_stations_table_name)
psycopg2.extras.execute_batch(cursor, weather_stations_insert, weather_stations_df.values)

### Create views

In [46]:
# GENERAL AGGREGATION
# Each statement below creates (or replaces) one reporting view in the dwh schema and runs it
# immediately on the shared cursor. CREATE OR REPLACE makes the cell safe to re-run.

# Trips per calendar day (by start_time) with their average duration.
daily_trips = '''CREATE OR REPLACE VIEW dwh.v_daily_trips AS
                 SELECT DATE(start_time) AS trip_date,
                        COUNT(*) AS total_trips,
                        AVG(trip_duration) AS avg_trip_duration
                 FROM dwh.citibike_info
                 GROUP BY DATE(start_time)
                 ORDER BY trip_date;
              '''
cursor.execute(daily_trips)


# Same aggregation as above, bucketed by the hour in which the trip started.
hourly_trips = '''CREATE OR REPLACE VIEW dwh.v_hourly_trips AS
                  SELECT DATE_TRUNC('hour', start_time) AS trip_hour,
                         COUNT(*) AS total_trips,
                         AVG(trip_duration) AS avg_trip_duration
                  FROM dwh.citibike_info
                  GROUP BY DATE_TRUNC('hour', start_time)
                  ORDER BY trip_hour;
               '''
cursor.execute(hourly_trips)


# Trip counts per (start station, end station) pair, most frequent first.
# bike_stations is joined twice, once for each end of the trip.
# NOTE(review): the joins are on station_id, so the counts are only correct if
# dwh.bike_stations has exactly one row per station_id - confirm.
start_end_station_combo = '''CREATE OR REPLACE VIEW dwh.v_start_end_station_combo AS
                             SELECT ss.station_name AS start_station,
                                    es.station_name AS end_station,
                                    COUNT(*) AS total_trips
                             FROM dwh.citibike_info ci
                             JOIN dwh.bike_stations ss ON ci.start_station_id = ss.station_id
                             JOIN dwh.bike_stations es ON ci.end_station_id = es.station_id
                             GROUP BY ss.station_name, es.station_name
                             ORDER BY total_trips DESC;
                          '''
cursor.execute(start_end_station_combo)


# Stations ranked by the number of trips that start there (rank 1 = busiest; ties share a rank).
# NOTE(review): grouping is by station_name, so station ids that share a name are merged - confirm intended.
rank_start_stations = '''CREATE OR REPLACE VIEW dwh.v_rank_start_stations AS
                         SELECT s.station_name,
                                COUNT(*) AS total_trips,
                                RANK() OVER (ORDER BY COUNT(*) DESC) AS station_rank
                         FROM dwh.citibike_info ci
                         INNER JOIN dwh.bike_stations s ON ci.start_station_id = s.station_id
                         GROUP BY s.station_name;
                      '''
cursor.execute(rank_start_stations)


# Stations ranked by the number of trips that end there; mirrors the start-station ranking.
rank_end_stations = '''CREATE OR REPLACE VIEW dwh.v_rank_end_stations AS
                       SELECT s.station_name,
                              COUNT(*) AS total_trips,
                              RANK() OVER (ORDER BY COUNT(*) DESC) AS station_rank
                       FROM dwh.citibike_info ci
                       INNER JOIN dwh.bike_stations s ON ci.end_station_id = s.station_id
                       GROUP BY s.station_name;
                    '''
cursor.execute(rank_end_stations)


# DEMOGRAPHIC AGGREGATION
# Trip count and average duration per user type.
trips_by_user_type = '''CREATE OR REPLACE VIEW dwh.v_trips_by_user_type AS
                        SELECT user_type,
                               COUNT(*) AS total_trips,
                               AVG(trip_duration) AS avg_trip_duration
                        FROM dwh.citibike_info
                        GROUP BY user_type;
                     '''
cursor.execute(trips_by_user_type)


# Trip count and average duration per gender label, largest group first.
trips_by_gender = '''CREATE OR REPLACE VIEW dwh.v_trips_by_gender AS
                     SELECT gender,
                            COUNT(*) AS total_trips,
                            AVG(trip_duration) AS avg_trip_duration
                     FROM dwh.citibike_info
                     GROUP BY gender
                     ORDER BY total_trips DESC;
                  '''
cursor.execute(trips_by_gender)


# Trip count and average duration by rider age, largest group first.
# Age is measured in the year the trip started instead of against CURRENT_DATE, so the view gives
# the rider's age at the time of the trip and returns the same result no matter when it is queried.
# Rows with birth_year = 0 are excluded because 0 is the placeholder for an unknown birth year.
trips_by_age = '''CREATE OR REPLACE VIEW dwh.v_trips_by_age AS
                  SELECT EXTRACT(YEAR FROM start_time) - birth_year AS age,
                         COUNT(*) AS total_trips,
                         AVG(trip_duration) AS avg_trip_duration
                  FROM dwh.citibike_info
                  WHERE birth_year <> 0
                  GROUP BY age
                  ORDER BY total_trips DESC;
               '''
cursor.execute(trips_by_age)

In [47]:
# Release the cursor and then the connection it belongs to; closing only the cursor
# leaves the database session open.
cursor.close()
conn.close()