# Ingest the NYC green taxi data into psql

In [15]:
# Enable IPython's autoreload so that edits to imported project modules
# (such as nyc_taxi.etl.psql_tools) are picked up without restarting the kernel.
%load_ext autoreload
# Mode 2: reload all modules before executing the code in each cell.
%autoreload 2

In [16]:
from pathlib import Path
from functools import reduce

from sqlalchemy import create_engine
import pandas as pd
import numpy as np

from nyc_taxi.etl.psql_tools import TransactionManager

## Read the headers of all files and unify them

In [17]:
def read_header(path):
    """Return the header (first) line of the file at ``path``.

    Surrounding whitespace, including the trailing newline, is removed and the
    text is lower-cased so header lines from different files can be compared.
    An empty file yields ``''``.
    """
    with open(path, 'r') as handle:
        header_line = handle.readline()
    return header_line.strip().lower()

In [18]:
# Sort so the file order (and everything derived from it) is deterministic;
# Path.glob yields entries in arbitrary filesystem order.
# NOTE(review): later cells read from '../../data/green_taxi' -- confirm which
# relative location is correct for this notebook's working directory.
filepaths = sorted(Path('../data/green_taxi').glob('*.csv'))
# glob() silently returns nothing for a missing or mistyped directory; fail
# loudly instead of letting later cells fail obscurely or pass vacuously.
assert filepaths, 'No CSV files found in ../data/green_taxi'

In [5]:
# One row per file: its path and its normalised (lower-cased) header line.
header_lines = [read_header(csv_path) for csv_path in filepaths]
headers = pd.DataFrame({'path': filepaths, 'columns': header_lines})

In [6]:
# Each distinct header line, split into its set of column names.
distinct_header_sets = [set(line.split(',')) for line in headers['columns'].unique()]

# Columns present in at least one file vs. columns present in every file.
header_union = set.union(*distinct_header_sets)
header_intersection = set.intersection(*distinct_header_sets)

non_shared_columns = header_union - header_intersection
print(f'Non-shared columns:\n{non_shared_columns}')

Non-shared columns:
{'pickup_longitude', 'improvement_surcharge', 'dropoff_longitude', 'dolocationid', 'congestion_surcharge', 'pulocationid', 'pickup_latitude', 'dropoff_latitude'}


In [7]:
# dtypes for the columns that only some of the files contain. When a file lacks
# one of them, read_and_preprocess_csv adds it filled with missing values, so
# the location-ID columns use pandas' nullable Int64 (plain int64 cannot hold NaN).
# `float` replaces the `np.float` alias (deprecated in NumPy 1.20, removed in
# 1.24), which was only ever the builtin float, i.e. float64 in pandas.
non_shared_column_dtypes = {
    'congestion_surcharge': float,
    'dolocationid': pd.Int64Dtype(),
    'dropoff_latitude': float,
    'dropoff_longitude': float,
    'improvement_surcharge': float,
    'pickup_latitude': float,
    'pickup_longitude': float,
    'pulocationid': pd.Int64Dtype()
}

## Read each file, align it to the shared set of columns, and import it into psql

In [11]:
# Build the SQLAlchemy engine from the connection string supplied by the
# project's TransactionManager.
psql_conn_string = TransactionManager().conn_string
engine = create_engine(psql_conn_string)

In [28]:
def read_and_preprocess_csv(path, extra_column_dtypes=non_shared_column_dtypes, all_columns=header_union):
    """Read one CSV and align its columns with the union of all files' columns.

    Column names are lower-cased and stripped. Every column in ``all_columns``
    that the file lacks is appended, filled with missing values and typed via
    ``extra_column_dtypes``.

    Args:
        path: CSV file to read.
        extra_column_dtypes: dtype for each column that may be missing from a
            file; must cover every column of ``all_columns`` absent from ``path``.
        all_columns: set of column names every returned frame should contain.

    Returns:
        A new DataFrame: the file's columns followed by the added ones, in
        sorted name order.

    Raises:
        KeyError: if a missing column has no entry in ``extra_column_dtypes``.
    """
    df = pd.read_csv(path)
    df = df.rename(columns={col: col.lower().strip() for col in df.columns})

    # Sort so the added columns (and thus the schema of a table created from
    # the first frame written) do not depend on set iteration order, which for
    # strings varies between interpreter runs.
    missing_columns = sorted(all_columns.difference(df.columns))
    return df.assign(**{
        # Build the filler on df.index so assign() does not have to reindex it.
        col: pd.Series(len(df) * [np.nan], index=df.index, dtype=extra_column_dtypes[col])
        for col in missing_columns})

In [29]:
def import_dataframe_to_psql(df, engine, table_name='nyc_green_taxi_records'):
    """Append the rows of ``df`` to the SQL table ``table_name``.

    Args:
        df: rows to insert.
        engine: SQLAlchemy engine/connection used to write the data
            (pandas also accepts a ``sqlite3.Connection``).
        table_name: target table; pandas creates it from ``df``'s columns if
            it does not exist yet.
    """
    # The connection must be passed explicitly: ``to_sql`` has no default one.
    # index=False: the DataFrame index carries no trip data and would otherwise
    # be written as an extra column.
    df.to_sql(table_name, con=engine, if_exists='append', index=False)

In [33]:
def read_first_lines(path):
    """Return the line that follows the header in the file at ``path``.

    Despite the name, only that single line is returned, line ending included:
    ``''`` when the file has no second line, ``'\\n'`` when that line is blank.
    """
    with open(path, 'r') as handle:
        _header, first_data_line = handle.readline(), handle.readline()
    return first_data_line

## Remove Windows newlines and blank lines

In [4]:
# The blank-line filter in the next cell keeps a line only when line.strip()
# is truthy; check that a bare newline really strips to the empty string.
assert not '\n'.strip(), 'str.strip() should remove newline characters'

In [13]:
# NOTE(review): absolute, user-specific path; consider moving the file under
# the project's data directory so the notebook runs on other machines.
windows_newline_csv = Path('/home/mariosk/Desktop/green_tripdata_2018-06.csv')


def remove_windows_newlines(path):
    """Rewrite the text file at ``path`` with Unix newlines and no blank lines.

    ``'\\r\\n'`` and lone ``'\\r'`` line endings become ``'\\n'``, lines that are
    empty or contain only whitespace are dropped, and the result is written
    back in place with ``'\\n'`` line endings on every platform. No trailing
    newline is written after the last line.
    """
    # newline='' disables universal-newline translation so carriage returns
    # are visible here. Note '\r' is a carriage return; the raw string r'\r'
    # is a backslash followed by 'r' and never matches one.
    with open(path, 'r', newline='') as f:
        text = f.read()
    text = text.replace('\r\n', '\n').replace('\r', '\n')
    non_blank_lines = [line for line in text.split('\n') if line.strip()]
    # newline='\n' stops '\n' from being translated to os.linesep on write.
    with open(path, 'w', newline='\n') as f:
        f.write('\n'.join(non_blank_lines))


remove_windows_newlines(windows_newline_csv)

### Double-check for empty lines at the beginning of the CSVs

In [19]:
# Report every file whose first row after the header is empty or whitespace-only.
for path in filepaths:
    first_data_line = read_first_lines(path)
    if not first_data_line.strip():
        print(path, ': ', first_data_line)

## Files whose rows have more fields than the header has columns

In [27]:
# Peek at one affected file: skip its header line and let pandas number the
# columns positionally, so the raw fields can be inspected.
# NOTE(review): earlier cells glob '../data/green_taxi' while this reads from
# '../../data/green_taxi' -- confirm which path is correct.
example_prob = pd.read_csv(
    '../../data/green_taxi/green_tripdata_2014-04.csv',
    skiprows=1,
    header=None,
)

In [57]:
def read_csv_ignoring_empty_extra_columns(path):
    """Read a CSV whose data rows may end in extra, empty, unnamed fields.

    Some files have data rows with more comma-separated fields than the header
    line has column names (e.g. trailing ``,,``). Those surplus fields are
    read under placeholder names and dropped, keeping only the header's columns.

    The field counts are compared using the header and the *first data row*
    only, so the number of surplus fields is assumed to be the same on every row.

    Args:
        path: path to the CSV file.

    Returns:
        DataFrame with the header's columns.

    Raises:
        ValueError: if any surplus field actually holds a value, since dropping
            it would silently lose data.
    """
    with open(path, 'r') as f:
        header = f.readline().strip().split(',')
        first_line = f.readline().strip().split(',')

    extra_columns = len(first_line) - len(header)
    if extra_columns == 0:
        return pd.read_csv(path)

    # Integer placeholder names for the surplus fields cannot clash with the
    # header's string column names. When extra_columns < 0 this list is empty
    # and the header is simply applied to the data rows.
    extra_names = list(range(extra_columns))
    df = pd.read_csv(path, names=header + extra_names, skiprows=1, header=None)

    non_empty = [name for name in extra_names if df[name].notna().any()]
    if non_empty:
        raise ValueError(
            f'{path}: extra column(s) {non_empty} beyond the {len(header)} header '
            f'columns contain values; refusing to drop them')
    return df[header]

In [59]:
# Preview the 2014-04 file with its surplus trailing fields removed.
read_csv_ignoring_empty_extra_columns(
    '../../data/green_taxi/green_tripdata_2014-04.csv'
).head(5)

Unnamed: 0,VendorID,lpep_pickup_datetime,Lpep_dropoff_datetime,Store_and_fwd_flag,RateCodeID,Pickup_longitude,Pickup_latitude,Dropoff_longitude,Dropoff_latitude,Passenger_count,Trip_distance,Fare_amount,Extra,MTA_tax,Tip_amount,Tolls_amount,Ehail_fee,Total_amount,Payment_type,Trip_type
0,2,2014-04-01 00:00:00,2014-04-01 14:24:20,N,1,0.0,0.0,0.0,0.0,1,7.45,23.0,0.0,0.5,0.0,0.0,,23.5,2,1.0
1,2,2014-04-01 00:00:00,2014-04-01 17:21:33,N,1,0.0,0.0,-73.987663,40.780872,1,8.95,31.0,1.0,0.5,0.0,0.0,,32.5,2,1.0
2,2,2014-04-01 00:00:00,2014-04-01 15:06:18,N,1,0.0,0.0,-73.946922,40.831764,1,1.32,6.5,0.0,0.5,0.0,0.0,,7.0,2,1.0
3,2,2014-04-01 00:00:00,2014-04-01 08:09:27,N,1,0.0,0.0,-73.94767,40.808651,5,0.1,3.0,0.0,0.5,0.0,0.0,,3.5,2,1.0
4,2,2014-04-01 00:00:00,2014-04-01 16:15:13,N,1,0.0,0.0,0.0,0.0,1,7.09,23.5,0.0,0.5,4.7,0.0,,28.7,1,1.0
