In [1]:
from utz import *

In [2]:
# Directory that is globbed below for `*.zip` archives and `*.parquet` files;
# `to_parquet` writes each Parquet file next to its source archive.
# NOTE(review): hard-coded absolute path — adjust for your environment.
cache = '/cache'

In [3]:
# Name of the S3 bucket whose objects are listed (and fetched) below.
Bucket = 'tripdata'

from boto3 import client
from botocore import UNSIGNED
from botocore.client import Config
# `signature_version=UNSIGNED` makes the client send unsigned (anonymous) requests,
# so no AWS credentials are needed.
s3 = client('s3', config=Config(signature_version=UNSIGNED))

In [4]:
# A single `list_objects_v2` call returns at most 1000 keys, so walk every page
# with the paginator instead of silently truncating the listing.
pages = s3.get_paginator('list_objects_v2').paginate(Bucket=Bucket)
contents = pd.DataFrame([
    obj
    for page in pages
    for obj in page.get('Contents', [])  # a page with no objects has no 'Contents' key
])
# Keep only the zip archives.
zips = contents[contents.Key.str.endswith('.zip')]
zips

Unnamed: 0,Key,LastModified,ETag,Size,StorageClass
0,201306-citibike-tripdata.zip,2018-04-30 13:18:55+00:00,"""b520a12de58eea58a3586f89bfcfbd9d-2""",16785103,STANDARD
1,201307-201402-citibike-tripdata.zip,2017-01-18 22:23:25+00:00,"""7b3b260b2ab2e5349320121d04bd821c-22""",178262576,STANDARD
2,201307-citibike-tripdata.zip,2017-01-18 22:23:27+00:00,"""dd3e6fd5f91715b31eae72868086c08c-4""",27074629,STANDARD
3,201308-citibike-tripdata.zip,2017-01-18 22:23:27+00:00,"""2f661063576734f614b9f1d6bba0ec59-4""",32090869,STANDARD
4,201309-citibike-tripdata.zip,2017-01-18 22:23:27+00:00,"""a42f947db7bd14e423a7dbfbb11596a1-4""",33155593,STANDARD
...,...,...,...,...,...
153,JC-202009-citibike-tripdata.csv.zip,2020-10-13 19:56:03+00:00,"""f818f87d1b8357d8755b610ea231aeef""",1419708,STANDARD
154,JC-202010-citibike-tripdata.csv.zip,2020-11-04 14:51:11+00:00,"""148431d3598f7e962338c33da2afddf3""",798066,STANDARD
155,JC-202011-citibike-tripdata.csv.zip,2020-12-04 23:26:04+00:00,"""ab9ee4bbbc03633d610e18319d23fc21""",569245,STANDARD
156,JC-202012-citibike-tripdata.csv.zip,2021-01-05 14:25:45+00:00,"""112033c48cf3fa673b396364a7cc08f6""",315012,STANDARD


In [5]:
# Pattern for recognised archive/file names: an optional `JC-` prefix, a six-digit
# YYYYMM stamp, the literal `-citibike-tripdata`, then optional `.csv` and `.zip`
# suffixes. Names carrying a second date stamp (e.g.
# `201307-201402-citibike-tripdata.zip`) do not match.
rgx = r'^(?P<JC>JC-)?(?P<year>\d{4})(?P<month>\d{2})-citibike-tripdata(?P<csv>\.csv)?(?P<zip>\.zip)?$'

In [6]:
# Canonical column names every trip table is renamed to.
fields = {
    'Trip Duration',
    'Start Time',
    'Stop Time',
    'Start Station ID',
    'Start Station Name',
    'Start Station Latitude',
    'Start Station Longitude',
    'End Station ID',
    'End Station Name',
    'End Station Latitude',
    'End Station Longitude',
    'Bike ID',
    'User Type',
    'Birth Year',
    'Gender'
}


def normalize_field(f):
    """Return `f` lower-cased with all whitespace removed (a spelling-insensitive key)."""
    lowered = f.lower()
    return sub(r'\s', '', lowered)


# Lookup table: normalized key -> canonical column name.
normalize_fields_map = dict(
    (normalize_field(canonical), canonical)
    for canonical in fields
)
normalize_fields_map

{'endstationlongitude': 'End Station Longitude',
 'bikeid': 'Bike ID',
 'birthyear': 'Birth Year',
 'gender': 'Gender',
 'endstationlatitude': 'End Station Latitude',
 'startstationid': 'Start Station ID',
 'starttime': 'Start Time',
 'startstationlatitude': 'Start Station Latitude',
 'startstationlongitude': 'Start Station Longitude',
 'endstationname': 'End Station Name',
 'usertype': 'User Type',
 'endstationid': 'End Station ID',
 'startstationname': 'Start Station Name',
 'tripduration': 'Trip Duration',
 'stoptime': 'Stop Time'}

In [7]:
def normalize_fields(df):
    """Return `df` with every column renamed to its canonical spelling.

    Each column name is reduced with `normalize_field` and looked up in
    `normalize_fields_map`; a column with no entry there raises `KeyError`.
    """
    renames = {}
    for col in df.columns:
        renames[col] = normalize_fields_map[normalize_field(col)]
    return df.rename(columns=renames)

In [8]:
from zipfile import ZipFile

In [9]:
def to_parquet(zip_path, error='warn', overwrite=False):
    """Convert the single CSV inside the archive at `zip_path` to a Parquet file beside it.

    The Parquet path is `zip_path` with its `.zip` (and, if present, `.csv`)
    extension replaced by `.parquet`. Columns are renamed with `normalize_fields`
    and `Start Time` / `Stop Time` are cast to `datetime64[ns]`.

    Parameters
    ----------
    zip_path : str
        Path to a `.zip` archive whose basename must match `rgx`.
    error : str
        `'warn'` prints a message and returns when the basename does not match
        `rgx`; any other value raises instead.
    overwrite : bool
        When False, an existing Parquet file is left alone and the call returns early.

    Raises
    ------
    Exception
        If the basename does not match `rgx` and `error != 'warn'`.
    ValueError
        If the archive does not contain exactly one eligible `.csv` member.
    """
    name = basename(zip_path)
    if not match(rgx, name):
        msg = f'Unrecognized key: {name}'
        if error == 'warn':
            print(msg)
            return
        raise Exception(msg)
    # `rgx` makes the `.zip` suffix optional, so check it explicitly.
    assert name.endswith('.zip'), name
    base = splitext(zip_path)[0]
    if base.endswith('.csv'):
        # Handle `*.csv.zip` names: strip the inner `.csv` too.
        base = splitext(base)[0]

    pqt_path = f'{base}.parquet'
    if exists(pqt_path):
        if overwrite:
            print(f'Overwriting {pqt_path}')
        else:
            print(f'Found {pqt_path}; skipping')
            return

    # Context manager guarantees the archive handle is closed, even on error.
    with ZipFile(zip_path) as z:
        names = z.namelist()
        print(f'{name}: zip names: {names}')
        # Exactly one CSV is expected; members whose path starts with `_` are ignored.
        # A distinct local name avoids clobbering `name` (the archive basename).
        [ csv_name ] = [ f for f in names if f.endswith('.csv') and not f.startswith('_') ]
        with z.open(csv_name, 'r') as i:
            df = pd.read_csv(i)

    df = normalize_fields(df)
    df = df.astype({'Start Time':'datetime64[ns]','Stop Time':'datetime64[ns]'})
    df.to_parquet(pqt_path)

In [10]:
# Every zip archive in the cache directory, in sorted order.
cached_zips = glob(f'{cache}/*.zip')
cached_zips.sort()
# Show just the file names.
list(map(basename, cached_zips))

['201306-citibike-tripdata.zip',
 '201307-201402-citibike-tripdata.zip',
 '201307-citibike-tripdata.zip',
 '201308-citibike-tripdata.zip',
 '201309-citibike-tripdata.zip',
 '201310-citibike-tripdata.zip',
 '201311-citibike-tripdata.zip',
 '201312-citibike-tripdata.zip',
 '201401-citibike-tripdata.zip',
 '201402-citibike-tripdata.zip',
 '201403-citibike-tripdata.zip',
 '201404-citibike-tripdata.zip',
 '201405-citibike-tripdata.zip',
 '201406-citibike-tripdata.zip',
 '201407-citibike-tripdata.zip',
 '201408-citibike-tripdata.zip',
 '201409-citibike-tripdata.zip',
 '201410-citibike-tripdata.zip',
 '201411-citibike-tripdata.zip',
 '201412-citibike-tripdata.zip',
 '201501-citibike-tripdata.zip',
 '201502-citibike-tripdata.zip',
 '201503-citibike-tripdata.zip',
 '201504-citibike-tripdata.zip',
 '201505-citibike-tripdata.zip',
 '201506-citibike-tripdata.zip',
 '201507-citibike-tripdata.zip',
 '201508-citibike-tripdata.zip',
 '201509-citibike-tripdata.zip',
 '201510-citibike-tripdata.zip',
 '2

In [11]:
from joblib import delayed, Parallel
# joblib executor configured with one job per CPU reported by `cpu_count()`.
parallel = Parallel(n_jobs=cpu_count())

In [12]:
# Display the full paths of the cached archives that will be converted.
cached_zips

['/cache/201306-citibike-tripdata.zip',
 '/cache/201307-201402-citibike-tripdata.zip',
 '/cache/201307-citibike-tripdata.zip',
 '/cache/201308-citibike-tripdata.zip',
 '/cache/201309-citibike-tripdata.zip',
 '/cache/201310-citibike-tripdata.zip',
 '/cache/201311-citibike-tripdata.zip',
 '/cache/201312-citibike-tripdata.zip',
 '/cache/201401-citibike-tripdata.zip',
 '/cache/201402-citibike-tripdata.zip',
 '/cache/201403-citibike-tripdata.zip',
 '/cache/201404-citibike-tripdata.zip',
 '/cache/201405-citibike-tripdata.zip',
 '/cache/201406-citibike-tripdata.zip',
 '/cache/201407-citibike-tripdata.zip',
 '/cache/201408-citibike-tripdata.zip',
 '/cache/201409-citibike-tripdata.zip',
 '/cache/201410-citibike-tripdata.zip',
 '/cache/201411-citibike-tripdata.zip',
 '/cache/201412-citibike-tripdata.zip',
 '/cache/201501-citibike-tripdata.zip',
 '/cache/201502-citibike-tripdata.zip',
 '/cache/201503-citibike-tripdata.zip',
 '/cache/201504-citibike-tripdata.zip',
 '/cache/201505-citibike-tripdata

In [13]:
# Convert every cached archive via the `parallel` executor, rewriting any existing
# Parquet file. The trailing `; None` keeps the cell from displaying the list of
# return values.
parallel(delayed(to_parquet)(f, overwrite=True) for f in cached_zips); None

In [12]:
def parse_year(df):
    """Return the `'birth year'` entry of `df` as an int, or `nan` when it is missing.

    The literal string `\\N` and any value that parses to NaN are treated as
    missing. A value with a fractional part raises `Exception`.
    """
    raw = df['birth year']
    if raw == '\\N':
        return nan

    value = float(raw)
    if isna(value):
        return nan
    if floor(value) == value:
        return int(value)
    raise Exception(f'Invalid year: {value}, {df}')

In [13]:
import dask.dataframe as dd

In [16]:
# `glob` returns paths in filesystem-dependent order; sort them so the file order
# (and everything derived from it) is reproducible, matching how `cached_zips` is built.
pqts = sorted(glob(f'{cache}/*.parquet'))
pqts

['/cache/201306-citibike-tripdata.parquet',
 '/cache/201307-citibike-tripdata.parquet',
 '/cache/201308-citibike-tripdata.parquet',
 '/cache/201309-citibike-tripdata.parquet',
 '/cache/201310-citibike-tripdata.parquet',
 '/cache/201311-citibike-tripdata.parquet',
 '/cache/201312-citibike-tripdata.parquet',
 '/cache/201401-citibike-tripdata.parquet',
 '/cache/201402-citibike-tripdata.parquet',
 '/cache/201403-citibike-tripdata.parquet',
 '/cache/201404-citibike-tripdata.parquet',
 '/cache/201405-citibike-tripdata.parquet',
 '/cache/201406-citibike-tripdata.parquet',
 '/cache/201407-citibike-tripdata.parquet',
 '/cache/201408-citibike-tripdata.parquet',
 '/cache/201409-citibike-tripdata.parquet',
 '/cache/201410-citibike-tripdata.parquet',
 '/cache/201411-citibike-tripdata.parquet',
 '/cache/201412-citibike-tripdata.parquet',
 '/cache/201501-citibike-tripdata.parquet',
 '/cache/201502-citibike-tripdata.parquet',
 '/cache/201503-citibike-tripdata.parquet',
 '/cache/201504-citibike-tripdat

In [17]:
# One dask DataFrame per Parquet file, in the same order as `pqts`.
dfs = [ dd.read_parquet(pqt) for pqt in pqts ]

In [18]:
# Print the row count of each file. The path is written before `len(df)` is
# evaluated so it is already visible while the count is computed.
# (The unused `enumerate` index has been dropped.)
for pqt, df in zip(pqts, dfs):
    stdout.write(f'{pqt}: ')
    print(len(df))

/cache/201306-citibike-tripdata.parquet: 577703
/cache/201307-citibike-tripdata.parquet: 843416
/cache/201308-citibike-tripdata.parquet: 1001958
/cache/201309-citibike-tripdata.parquet: 1034359
/cache/201310-citibike-tripdata.parquet: 1037712
/cache/201311-citibike-tripdata.parquet: 675774
/cache/201312-citibike-tripdata.parquet: 443966
/cache/201401-citibike-tripdata.parquet: 300400
/cache/201402-citibike-tripdata.parquet: 224736
/cache/201403-citibike-tripdata.parquet: 439117
/cache/201404-citibike-tripdata.parquet: 670780
/cache/201405-citibike-tripdata.parquet: 866117
/cache/201406-citibike-tripdata.parquet: 936880
/cache/201407-citibike-tripdata.parquet: 968842
/cache/201408-citibike-tripdata.parquet: 963489
/cache/201409-citibike-tripdata.parquet: 953887
/cache/201410-citibike-tripdata.parquet: 828711
/cache/201411-citibike-tripdata.parquet: 529188
/cache/201412-citibike-tripdata.parquet: 399069
/cache/201501-citibike-tripdata.parquet: 285552
/cache/201502-citibike-tripdata.parqu

In [19]:
# Column index of every per-file DataFrame, to compare the schemas across files.
colss = [ df.columns for df in dfs ]
colss

[Index(['tripduration', 'starttime', 'stoptime', 'start station id',
        'start station name', 'start station latitude',
        'start station longitude', 'end station id', 'end station name',
        'end station latitude', 'end station longitude', 'bikeid', 'usertype',
        'birth year', 'gender'],
       dtype='object'),
 Index(['tripduration', 'starttime', 'stoptime', 'start station id',
        'start station name', 'start station latitude',
        'start station longitude', 'end station id', 'end station name',
        'end station latitude', 'end station longitude', 'bikeid', 'usertype',
        'birth year', 'gender'],
       dtype='object'),
 Index(['tripduration', 'starttime', 'stoptime', 'start station id',
        'start station name', 'start station latitude',
        'start station longitude', 'end station id', 'end station name',
        'end station latitude', 'end station longitude', 'bikeid', 'usertype',
        'birth year', 'gender'],
       dtype='object')

In [25]:
# Rename each per-file DataFrame's columns to the canonical names, then
# concatenate them into one dask DataFrame.
d = dd.concat([ normalize_fields(df) for df in dfs ])
d

Unnamed: 0_level_0,Trip Duration,Start Time,Stop Time,Start Station ID,Start Station Name,Start Station Latitude,Start Station Longitude,End Station ID,End Station Name,End Station Latitude,End Station Longitude,Bike ID,User Type,Birth Year,Gender
npartitions=156,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
,int64,object,object,float64,object,float64,float64,float64,object,float64,float64,int64,object,object,int64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [26]:
# Total number of rows in the concatenated DataFrame.
len(d)

114213812

In [15]:
import dask.dataframe as dd

In [16]:
# Read every Parquet file in the cache directory as a single dask DataFrame.
# NOTE(review): this rebinds `dfs`, which an earlier cell defines as a *list* of
# per-file DataFrames — consider a distinct name to avoid hidden-state confusion.
dfs = dd.read_parquet(f'{cache}/*.parquet')
dfs

Unnamed: 0_level_0,Trip Duration,Start Time,Stop Time,Start Station ID,Start Station Name,Start Station Latitude,Start Station Longitude,End Station ID,End Station Name,End Station Latitude,End Station Longitude,Bike ID,User Type,Birth Year,Gender
npartitions=156,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
,int64,datetime64[ns],datetime64[ns],int64,object,float64,float64,float64,object,float64,float64,int64,object,float64,int64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [17]:
# Total number of rows across all Parquet files.
len(dfs)

114213812

In [None]:
def get_month(year=None, month=None, key=None, jc=False, cache='citibike', overwrite=False, dask=True):
    """Fetch one archive from the bucket (caching it locally) and load its CSV.

    Identify the archive either by `year` and `month` (optionally with `jc=True`
    for the `JC-` prefixed key) or by an explicit `key`, but not both.

    Parameters
    ----------
    year, month : int or str, optional
        Used to build the key `{year}{month:02d}-citibike-tripdata.zip`.
    key : str, optional
        Explicit object key; mutually exclusive with `year`/`month`.
    jc : bool
        Prefix the generated key with `JC-`.
        NOTE(review): some `JC-` keys in the bucket listing end in `.csv.zip`;
        pass `key=` explicitly for those.
    cache : str
        Existing local directory in which the archive is stored as `{cache}/{key}`.
    overwrite : bool
        Re-download even if the archive is already cached.
    dask : bool
        When True, return a dask DataFrame (single partition); otherwise pandas.

    Returns
    -------
    pandas.DataFrame or dask.dataframe.DataFrame

    Raises
    ------
    ValueError
        If neither `year`+`month` nor `key` is given, or the archive does not
        contain exactly one eligible `.csv` member.
    """
    if year is not None and month is not None:
        assert key is None
        key = f'{year}{"%02d" % int(month)}-citibike-tripdata.zip'
        if jc: key = f'JC-{key}'
    elif key is not None:
        assert year is None
        assert month is None
    else:
        # Previously fell through with key=None and fetched the literal key 'None'.
        raise ValueError('Pass either `year` and `month`, or `key`')

    cache_path = f'{cache}/{key}'
    if overwrite or not exists(cache_path):
        # Download fully before opening the cache file, so a failed request does
        # not leave behind an empty file that later looks like a cache hit.
        resp = s3.get_object(Bucket=Bucket, Key=key)
        body = resp['Body'].read()
        with open(cache_path, 'wb') as f:
            f.write(body)

    with ZipFile(cache_path) as z:
        # Same member selection as `to_parquet`: exactly one CSV, ignoring
        # members whose path starts with `_`.
        [ filename ] = [
            filename
            for filename in z.namelist()
            if filename.endswith('.csv') and not filename.startswith('_')
        ]
        with z.open(filename, 'r') as i:
            df = pd.read_csv(i)

    if dask:
        # dask's CSV reader cannot read from an open zip member, so parse with
        # pandas and wrap the result.
        from dask import dataframe as dd
        df = dd.from_pandas(df, npartitions=1)

    return df