# In [None]:
import pandas as pd
import os
import gc
from numba import jit,cuda
import matplotlib.pyplot as plt
import numpy as np

# NOTE: this function used to be wrapped in numba's @jit(target_backend='cuda').
# The body is pure pandas, which Numba cannot compile, so the decorator was
# removed; the function runs as ordinary Python.
def read(file):
    """Load one ridership parquet file and keep only rows inside the study window.

    The dataset type is inferred from the file's base name:

    * contains ``taxi`` or ``hvfhv`` -- rows need ``pickup_datetime`` and
      ``dropoff_datetime``; both ends of the trip must fall inside the window.
    * contains ``citibike`` -- ``started_datetime`` / ``ended_datetime`` are
      renamed to ``pickup_datetime`` / ``dropoff_datetime`` and then handled
      like a taxi file.
    * contains ``subway`` -- rows need ``DATE_TIME``; a ``DATE`` column is added.

    The study window is 2021-07-01 through 2021-10-31 (inclusive).

    Args:
        file: Path of the parquet file to load.

    Returns:
        A tuple ``(df, weekend_dates)`` where ``df`` is the filtered frame and
        ``weekend_dates`` is a list of the unique dates whose ``'Day of week'``
        value is greater than 4.

    Raises:
        ValueError: If the file name matches none of the known datasets.
        KeyError: If an expected column (including ``'Day of week'``) is missing.
    """
    # Dispatch on the base name only, so that a directory called e.g.
    # "nyc_taxi/" cannot push a citibike file into the taxi branch.
    name = os.path.basename(file)
    is_trip_file = ('taxi' in name) or ('hvfhv' in name)
    is_citibike = (not is_trip_file) and ('citibike' in name)
    is_subway = (not is_trip_file) and (not is_citibike) and ('subway' in name)
    if not (is_trip_file or is_citibike or is_subway):
        # Fail before loading a potentially large file; previously this case
        # ended in an UnboundLocalError at the return statement.
        raise ValueError(
            "cannot infer dataset type from file name %r; expected it to "
            "contain 'taxi', 'hvfhv', 'citibike' or 'subway'" % (name,)
        )

    dt_range = pd.to_datetime(pd.date_range(start='07/01/2021', end='10/31/2021')).date
    df = pd.read_parquet(file)

    if is_subway:
        date_col = 'DATE'
        df['DATE_TIME'] = pd.to_datetime(df['DATE_TIME'], errors='coerce')
        df['DATE'] = df['DATE_TIME'].dt.date
        df = df[df['DATE'].isin(dt_range)]
    else:
        date_col = 'pickup_date'
        if is_citibike:
            # Bring citibike onto the taxi column names so one code path serves both.
            df = df.rename(columns={'started_datetime': 'pickup_datetime',
                                    'ended_datetime': 'dropoff_datetime'})
        # errors='coerce' turns unparseable timestamps into NaT; their dates
        # are not in dt_range, so the filter below drops those rows.
        df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'], errors='coerce')
        df['pickup_date'] = df['pickup_datetime'].dt.date
        df['dropoff_datetime'] = pd.to_datetime(df['dropoff_datetime'], errors='coerce')
        df['dropoff_date'] = df['dropoff_datetime'].dt.date
        df = df[df['pickup_date'].isin(dt_range) & df['dropoff_date'].isin(dt_range)]

    # 'Day of week' must already exist in the parquet file.
    # NOTE(review): presumably Monday=0 so that >4 means Saturday/Sunday -- confirm.
    dt_wkends = df[df['Day of week'] > 4][date_col].unique()

    if 'hvfhv' in name:
        print(df.head())
        # Reassign instead of an in-place drop: df is a filtered slice here.
        df = df.drop(columns=['congestion_surcharge', 'dispatching_base_num',
                              'Affiliated_base_number', 'sales_tax', 'SR_Flag',
                              'bcf', 'base_passenger_fare', 'driver_pay'])

    return df, dt_wkends.tolist()



def trips_by_hour(df, key):
    """Sum ``key`` into 4-hour bins, one pickup date at a time.

    Rows are grouped by ``pickup_date``; inside each group the ``key`` column
    is indexed by ``pickup_datetime`` and resampled into 4-hour bins that are
    summed. Resampling per day (rather than over the whole frame) means bins
    are only produced between the first and last record of each day.

    Args:
        df: Frame with ``pickup_date``, ``pickup_datetime`` and ``key`` columns.
            Missing values are replaced by 0 before aggregating.
        key: Name of the column to sum.

    Returns:
        A single-column DataFrame (column ``key``) indexed by the start of
        each 4-hour bin, or an empty DataFrame when ``df`` has no rows.
    """
    # A Timedelta is used instead of the '4H' string alias, which newer
    # pandas releases deprecate.
    bin_width = pd.Timedelta(hours=4)

    # Collect the per-day results and concatenate once; growing a frame with
    # pd.concat inside the loop copies all previous rows on every iteration.
    pieces = [
        day_df.set_index('pickup_datetime')[key].resample(bin_width).sum().to_frame()
        for _, day_df in df.fillna(0).groupby('pickup_date')
    ]
    if not pieces:
        return pd.DataFrame()
    return pd.concat(pieces)

    
     
def run(dir, file):
    """Return the 4-hourly ridership table for one dataset, using a CSV cache.

    The dataset is chosen from ``file``: names containing ``taxi``, ``hvfhv``
    or ``citibike`` select that mode; anything else is treated as subway.
    If ``<dir>sta_H_<mode>.csv`` exists it is loaded; otherwise the raw file
    is read with ``read``, aggregated into 4-hour bins and written to that
    CSV for next time.

    Both paths return the same shape. Previously the freshly computed path
    returned the un-renamed frame, so callers expecting ``dt``/``ridership``
    only worked once the cache existed.

    Args:
        dir: Directory prefix, concatenated directly with file names (so it
            must end with a path separator or be empty). The name shadows the
            builtin but is kept for backward compatibility.
        file: Name of the raw parquet file inside ``dir``.

    Returns:
        A tuple ``(sta, col)``: ``sta`` has a ``dt`` column (bin start) and a
        ``ridership`` column; ``col`` is one of ``'taxi'``, ``'hvfhv'``,
        ``'citibike'`` or ``'subway'``.
    """
    if 'taxi' in file:
        col = 'taxi'
    elif 'hvfhv' in file:
        col = 'hvfhv'
    elif 'citibike' in file:
        col = 'citibike'
    else:
        col = 'subway'

    cache_path = dir + 'sta_H_' + col + '.csv'
    if os.path.exists(cache_path):
        sta = pd.read_csv(cache_path)
        sta['dt'] = pd.to_datetime(sta['dt'], errors='coerce')
        return sta, col

    # The weekend list returned by read() is not used here.
    df, _ = read(dir + file)
    gc.collect()

    if col == 'subway':
        time_col, value_col = 'DATE_TIME', 'ENTRIES_D'
        bin_width = pd.Timedelta(hours=4)
        pieces = [
            day_df.set_index('DATE_TIME')['ENTRIES_D'].resample(bin_width).sum().to_frame()
            for _, day_df in df.fillna(0).groupby('DATE')
        ]
        binned = pd.concat(pieces) if pieces else pd.DataFrame()
    else:
        time_col, value_col = 'pickup_datetime', 'passenger_count'
        if col == 'hvfhv':
            print(df.head())
        if col != 'taxi':
            # These datasets have no passenger count; every row counts as one rider.
            df['passenger_count'] = 1
        binned = trips_by_hour(df, key='passenger_count')

    # Release the raw frame before building the output.
    del df
    gc.collect()

    if binned.empty:
        sta = pd.DataFrame(columns=['dt', 'ridership'])
    else:
        # The subway rename used to target 'EXITS_D' although 'ENTRIES_D' is
        # the column that was summed, so the cached CSV never had 'ridership'.
        sta = binned.reset_index().rename(columns={time_col: 'dt', value_col: 'ridership'})
    sta.to_csv(cache_path, index=False)

    return sta, col


def vis(idir,sta_df,dt_non_work):
    """Plot the ridership series as six stacked time panels and save a PNG.

    Each panel covers one consecutive slice of 2021-07-01 .. 2021-10-31.
    All columns of ``sta_df`` are drawn as lines; points whose date appears in
    ``dt_non_work[['non-workday']]`` are overdrawn with black stars for the
    ``taxi``, ``fhv``, ``citibike`` and ``subway`` columns. Four storm windows
    (Elsa, Henri, Ida, Nor'easter) are shaded on the panel that contains them.

    Args:
        idir: Output directory prefix; the figure is written to
            ``idir + title + '.png'``.
        sta_df: Frame with a datetime index and the four columns named above.
        dt_non_work: Frame with a ``'non-workday'`` column that is compared
            against the dates of ``sta_df``'s index.
    """
    title = 'Hourly ridership pattern of multi-modal transportation'
    fig = plt.figure(figsize=(20, 16), dpi=300)

    # Upper y-limit shared by every panel and by the shaded storm bands.
    maxy = 5

    # Storm periods, keyed by the panel they are drawn on.
    storms = {
        'ax1': (pd.to_datetime(['2021-07-08','2021-07-10']), 'Elsa'),
        'ax3': (pd.to_datetime(['2021-08-21','2021-08-24']), 'Henri'),
        'ax4': (pd.to_datetime(['2021-09-01','2021-09-03']), 'Ida'),
        'ax6': (pd.to_datetime(['2021-10-25','2021-10-27']), "Nor'easter"),
    }

    # [start, end) bounds for each panel; the last panel also includes its end.
    dt_range_dict = {
        'ax1': ['2021-07-01 00:00:00','2021-07-21 00:00:00'],
        'ax2': ['2021-07-21 00:00:00','2021-08-10 00:00:00'],
        'ax3': ['2021-08-10 00:00:00','2021-08-30 00:00:00'],
        'ax4': ['2021-08-30 00:00:00','2021-09-19 00:00:00'],
        'ax5': ['2021-09-19 00:00:00','2021-10-09 00:00:00'],
        'ax6': ['2021-10-09 00:00:00','2021-10-31 23:59:59'],
    }

    for position, (panel_key, bounds) in enumerate(dt_range_dict.items(), start=1):
        ax = fig.add_subplot(6, 1, position)

        lower = pd.to_datetime(bounds[0])
        upper = pd.to_datetime(bounds[1])
        after_start = lower <= sta_df.index
        if panel_key == 'ax6':
            before_end = sta_df.index <= upper
        else:
            before_end = sta_df.index < upper
        panel_df = sta_df[after_start & before_end]

        non_work_mask = np.isin(panel_df.index.date, dt_non_work[['non-workday']])
        non_work_df = panel_df[non_work_mask]

        ax.plot(panel_df.index, panel_df, marker='.', label=panel_df.columns, alpha=0.5)
        # plt.scatter draws on the current axes, i.e. the subplot just added.
        for mode in ('taxi', 'fhv', 'citibike'):
            plt.scatter(non_work_df.index, non_work_df[mode], marker='*', c='black')
        # Only the last scatter carries a label so the legend lists it once.
        plt.scatter(non_work_df.index, non_work_df['subway'], marker='*', c='black',
                    label='non-workday')

        ax.set_xticks(panel_df.index)
        tick_text = panel_df.index.strftime('%m-%d,%H')
        ax.set_ylim(0, maxy)
        ax.set_xticklabels(tick_text, rotation=90, size='small')
        ax.locator_params(axis='x', nbins=len(tick_text))
        ax.margins(x=0.01, y=0.01)

        storm = storms.get(panel_key)
        if storm is not None:
            window, storm_name = storm
            ax.fill_between(window, 0, maxy, facecolor='lightskyblue', alpha=0.4)
            ax.text(window[0], 0.8*maxy, storm_name, fontsize=16)

    axes = fig.get_axes()
    second_handles, second_labels = axes[1].get_legend_handles_labels()
    third_handles, third_labels = axes[2].get_legend_handles_labels()

    # Legend entries come from the third panel, plus any entry of the second
    # panel whose label mentions 'period'.
    legend_pairs = list(zip(third_handles, third_labels))
    legend_pairs.extend(
        (handle, text)
        for handle, text in zip(second_handles, second_labels)
        if 'period' in text
    )
    handles = [handle for handle, _ in legend_pairs]
    labels = [text for _, text in legend_pairs]

    fig.legend(handles, labels, ncol=10, loc='lower center', fontsize=16)
    plt.tight_layout()
    plt.margins(x=0.01, y=0.1)
    axes[0].set_title(title, fontsize=18)

    plt.savefig(idir+title+'.png', format='png', bbox_inches='tight')
    plt.close(fig)
    

if __name__=='__main__':
    # Script entry point: build (or load) the combined 4-hourly table for all
    # modes, plot it, and cache it as sta_hourly.csv on first computation.

    input_dir = ""

    norm_cols = ['taxi_norm','hvfhv_norm','citibike_norm','subway_norm']
    display_names = {'taxi_norm':'taxi','hvfhv_norm':'fhv',
                     'citibike_norm':'citibike','subway_norm':'subway'}
    # Each normalised series (0..1) is shifted into its own band so the four
    # modes stack in one panel instead of overlapping.
    lane_offset = {'hvfhv': 1, 'citibike': 2, 'subway': 3}

    loaded_from_cache = os.path.exists(input_dir+'sta_hourly.csv')
    if loaded_from_cache:
        re = pd.read_csv(input_dir+'sta_hourly.csv')
        re['date_time'] = pd.to_datetime(re['date_time'], errors='coerce')
        re = re.reset_index().set_index('date_time')
    else:
        # os.listdir('') raises FileNotFoundError, so an empty input_dir is
        # listed as the current directory.
        items = [i for i in os.listdir(input_dir or '.') if os.path.splitext(i)[1] == '.gzip']
        re = pd.DataFrame()
        for file in items:
            sta,col = run(input_dir, file)
            sta = sta.reset_index().set_index('dt')
            # The first dataset fixes the index; later ones are aligned to it.
            re[col] = sta['ridership']
            # Series.max() skips NaN; the builtin max() gives order-dependent
            # results when alignment has introduced NaN.
            re[col+'_norm'] = re[col]/re[col].max() + lane_offset.get(col, 0)

    # rename() returns a new frame, avoiding an in-place edit of a column slice.
    vre = re[norm_cols].rename(columns=display_names)

    # dt_wkends.csv is expected to exist already; nothing in this script writes it.
    dt_non_work = pd.read_csv(input_dir+'dt_wkends.csv')
    dt_non_work['non-workday'] = pd.to_datetime(dt_non_work['non-workday'], errors='coerce').dt.date

    vis(input_dir,vre,dt_non_work)

    if not loaded_from_cache:
        # Written only after plotting succeeded, so a failed run leaves no cache.
        re['date_time'] = re.index
        re.to_csv(input_dir+'sta_hourly.csv',index=False)

    