# Data Preparation for Citibike Tableau Viz

Note: All data from 2013 and now are loaded to Redshift cluster.

In [46]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
from io import BytesIO
from zipfile import ZipFile
from urllib.request import urlopen

# Utility to write data to redshift quickly
import pandas_redshift as pr

import pandas as pd
import settings # Axial DB Settings
from axial.factory.db_engine_factory import get_dwp_engine
from __future__ import print_function
import datetime
from collections import OrderedDict
from os import environ

DWP_ENGINE = get_dwp_engine()

db_setting = settings.RS_DATABASE
pr.connect_to_redshift(dbname = db_setting['database'],
                       host = db_setting['host'],
                       port = db_setting['port'],
                       user = db_setting['username'],
                       password = db_setting['password'])

In [47]:
pr.connect_to_s3(aws_access_key_id = environ['AWS_ACCESS_KEY_ID'] ,
                aws_secret_access_key = environ['AWS_SECRET_ACCESS_KEY'],
                bucket = environ['AWS_BUCKET'],
                subdirectory = 'citibike')

In [48]:
BASE_URL = "https://s3.amazonaws.com/tripdata/" # Load the data directly form nyc.gov site
URL = BASE_URL + "index.html"


# Hard coded columns to be loaded
COLUMNS = ['trip_duration', 'start_time', 'stop_time', 'start_station_id',
       'start_station_name', 'start_station_latitude',
       'start_station_longitude', 'end_station_id', 'end_station_name',
       'end_station_latitude', 'end_station_longitude', 'bike_id', 'user_type',
       'birth_year', 'gender']

In [49]:
# Read zip file and return a file handler to be loaded to DataFrame
def get_zip(file_url):
    resp = requests.get(file_url)
    zipfile = ZipFile(BytesIO(resp.content))
    zip_names = zipfile.namelist()
    #if len(zip_names) == 1:
    file_name = zip_names[0]
    extracted_file = zipfile.open(file_name)
    return extracted_file

In [50]:

# Column mapping function
column_map = dict(
    starttime='start_time',
    stoptime='stop_time',
    bikeid='bike_id',
    tripduration='trip_duration',
    usertype='user_type'
)

def map_column(column):
    if column in column_map:
        return column_map[column]
    else:
        return '_'.join(
            [x.lower() for x in column.split(' ')]
        )

In [60]:
# Hard coded the table schema to make it easier to manage. Transfromation of column name happends in loading process
sql = '''

drop table if exists citi_trips_new;
create table citi_trips_new
(
	trip_duration int,
	start_time timestamp,
	stop_time timestamp,
	start_station_id int,
	start_station_name varchar(256),
	start_station_latitude double precision,
	start_station_longitude double precision,
	end_station_id float,
	end_station_name varchar(256),
	end_station_latitude double precision,
	end_station_longitude double precision,
	bike_id int,
	user_type varchar(256),
	birth_year double precision,
	gender int
);

'''
DWP_ENGINE.execute(sql)

<sqlalchemy.engine.result.ResultProxy at 0x7f122b0caf28>

In [52]:
# Get list of zip files
text = ''
with open('citibike/tripdata.html') as f:
    text = f.read()

soup = BeautifulSoup(text, 'html.parser')

zip_urls = []
for x in soup.find_all('a'):
    href = x.get('href')
    if href and 'zip' in href:
        zip_urls.append(href)

In [58]:
def convert_birth_year(x):
    #print('x: ' + x)
    if isinstance(x, str) and x == '\\N':
        return None
    else:
        return x # pd.to_numeric(x)

In [59]:
def load_by_url(url):
    df = pd.read_csv(get_zip(url)) #, nrows=1)
    df.columns = [ map_column(x) for x in df.columns]
    cnt = df.bike_id.count()
    print(f'Loading {cnt} records from {url}')
    df['start_time'] = pd.to_datetime(df['start_time'])
    df['stop_time'] = pd.to_datetime(df['stop_time'])
    #df['birth_year'] = pd.to_numeric(df['birth_year'])
    df.assign(birth_year=lambda x: x['birth_year'].apply(convert_birth_year))                                 
    pr.pandas_to_redshift(data_frame = df[COLUMNS], append = True,
                        redshift_table_name = 'citi_trips_new')

if True:
    resp = requests.get(url)
    zipfile = ZipFile(BytesIO(resp.content))
    zip_names = zipfile.namelist()
    extracted_file = None
    print(zip_names)
    #if len(zip_names)  1:
    file_name = zip_names[0]
    print(file_name)
    extracted_file = zipfile.open(file_name)
    
    df = pd.read_csv(extracted_file, nrows=1)
    print(df)
    df.columns = [ map_column(x) for x in df.columns]
    cnt = df.bike_id.count()
    print(f'Loading {cnt} records from {url}')
    df['start_time'] = pd.to_datetime(df['start_time'])
    df['stop_time'] = pd.to_datetime(df['stop_time'])
    #df['birth_year'] = pd.to_numeric(df['birth_year'])
    df.assign(birth_year=lambda x: x['birth_year'].apply(convert_birth_year))                                 
    pr.pandas_to_redshift(data_frame = df[COLUMNS], append = True,
                        redshift_table_name = 'citi_trips_new')

In [61]:
# Load all files!!! 
for url in sorted(zip_urls):
    #print(url)
    load_by_url(url)
    

Loading 577703 records from https://s3.amazonaws.com/tripdata/201306-citibike-tripdata.zip
saved file citi_trips_new.csv in bucket citibike/citi_trips_new.csv

        copy citi_trips_new
        from 's3://axial-etl-prod/citibike/citi_trips_new.csv'
        delimiter ','
        ignoreheader 1
        csv quote as '"'
        dateformat 'auto'
        timeformat 'auto'
        access_key_id 'AKIAITLHNSTGGBDAJVYQ'
        secret_access_key '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'
        ;
FILLING THE TABLE IN REDSHIFT
Loading 224736 records from https://s3.amazonaws.com/tripdata/201307-201402-citibike-tripdata.zip
saved file citi_trips_new.csv in bucket citibike/citi_trips_new.csv

        copy citi_trips_new
        from 's3://axial-etl-prod/citibike/citi_trips_new.csv'
        delimiter ','
        ignoreheader 1
        csv quote as '"'
        dateformat 'auto'
        timeformat 'auto'
        access_key_id 'AKIAITLHNSTGGBDAJVYQ'
        secret_access_key 'yE9ptQ0yh71NqauFkWhTc

Loading 953887 records from https://s3.amazonaws.com/tripdata/201409-citibike-tripdata.zip
saved file citi_trips_new.csv in bucket citibike/citi_trips_new.csv

        copy citi_trips_new
        from 's3://axial-etl-prod/citibike/citi_trips_new.csv'
        delimiter ','
        ignoreheader 1
        csv quote as '"'
        dateformat 'auto'
        timeformat 'auto'
        access_key_id 'AKIAITLHNSTGGBDAJVYQ'
        secret_access_key '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'
        ;
FILLING THE TABLE IN REDSHIFT
Loading 828711 records from https://s3.amazonaws.com/tripdata/201410-citibike-tripdata.zip
saved file citi_trips_new.csv in bucket citibike/citi_trips_new.csv

        copy citi_trips_new
        from 's3://axial-etl-prod/citibike/citi_trips_new.csv'
        delimiter ','
        ignoreheader 1
        csv quote as '"'
        dateformat 'auto'
        timeformat 'auto'
        access_key_id 'AKIAITLHNSTGGBDAJVYQ'
        secret_access_key 'yE9ptQ0yh71NqauFkWhTcrC5DETy

Loading 560874 records from https://s3.amazonaws.com/tripdata/201602-citibike-tripdata.zip
saved file citi_trips_new.csv in bucket citibike/citi_trips_new.csv

        copy citi_trips_new
        from 's3://axial-etl-prod/citibike/citi_trips_new.csv'
        delimiter ','
        ignoreheader 1
        csv quote as '"'
        dateformat 'auto'
        timeformat 'auto'
        access_key_id 'AKIAITLHNSTGGBDAJVYQ'
        secret_access_key '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'
        ;
FILLING THE TABLE IN REDSHIFT
Loading 919921 records from https://s3.amazonaws.com/tripdata/201603-citibike-tripdata.zip
saved file citi_trips_new.csv in bucket citibike/citi_trips_new.csv

        copy citi_trips_new
        from 's3://axial-etl-prod/citibike/citi_trips_new.csv'
        delimiter ','
        ignoreheader 1
        csv quote as '"'
        dateformat 'auto'
        timeformat 'auto'
        access_key_id 'AKIAITLHNSTGGBDAJVYQ'
        secret_access_key 'yE9ptQ0yh71NqauFkWhTcrC5DETy

Loading 1731594 records from https://s3.amazonaws.com/tripdata/201706-citibike-tripdata.csv.zip
saved file citi_trips_new.csv in bucket citibike/citi_trips_new.csv

        copy citi_trips_new
        from 's3://axial-etl-prod/citibike/citi_trips_new.csv'
        delimiter ','
        ignoreheader 1
        csv quote as '"'
        dateformat 'auto'
        timeformat 'auto'
        access_key_id 'AKIAITLHNSTGGBDAJVYQ'
        secret_access_key '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'
        ;
FILLING THE TABLE IN REDSHIFT
Loading 1735599 records from https://s3.amazonaws.com/tripdata/201707-citibike-tripdata.csv.zip
saved file citi_trips_new.csv in bucket citibike/citi_trips_new.csv

        copy citi_trips_new
        from 's3://axial-etl-prod/citibike/citi_trips_new.csv'
        delimiter ','
        ignoreheader 1
        csv quote as '"'
        dateformat 'auto'
        timeformat 'auto'
        access_key_id 'AKIAITLHNSTGGBDAJVYQ'
        secret_access_key 'yE9ptQ0yh71NqauFkW

Loading 7479 records from https://s3.amazonaws.com/tripdata/JC-201601-citibike-tripdata.csv.zip
saved file citi_trips_new.csv in bucket citibike/citi_trips_new.csv

        copy citi_trips_new
        from 's3://axial-etl-prod/citibike/citi_trips_new.csv'
        delimiter ','
        ignoreheader 1
        csv quote as '"'
        dateformat 'auto'
        timeformat 'auto'
        access_key_id 'AKIAITLHNSTGGBDAJVYQ'
        secret_access_key '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'
        ;
FILLING THE TABLE IN REDSHIFT
Loading 8250 records from https://s3.amazonaws.com/tripdata/JC-201602-citibike-tripdata.csv.zip
saved file citi_trips_new.csv in bucket citibike/citi_trips_new.csv

        copy citi_trips_new
        from 's3://axial-etl-prod/citibike/citi_trips_new.csv'
        delimiter ','
        ignoreheader 1
        csv quote as '"'
        dateformat 'auto'
        timeformat 'auto'
        access_key_id 'AKIAITLHNSTGGBDAJVYQ'
        secret_access_key 'yE9ptQ0yh71NqauFkW

Loading 25966 records from https://s3.amazonaws.com/tripdata/JC-201705-citibike-tripdata.csv.zip
saved file citi_trips_new.csv in bucket citibike/citi_trips_new.csv

        copy citi_trips_new
        from 's3://axial-etl-prod/citibike/citi_trips_new.csv'
        delimiter ','
        ignoreheader 1
        csv quote as '"'
        dateformat 'auto'
        timeformat 'auto'
        access_key_id 'AKIAITLHNSTGGBDAJVYQ'
        secret_access_key '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'
        ;
FILLING THE TABLE IN REDSHIFT
Loading 32060 records from https://s3.amazonaws.com/tripdata/JC-201706-citibike-tripdata.csv.zip
saved file citi_trips_new.csv in bucket citibike/citi_trips_new.csv

        copy citi_trips_new
        from 's3://axial-etl-prod/citibike/citi_trips_new.csv'
        delimiter ','
        ignoreheader 1
        csv quote as '"'
        dateformat 'auto'
        timeformat 'auto'
        access_key_id 'AKIAITLHNSTGGBDAJVYQ'
        secret_access_key 'yE9ptQ0yh71NqauF