## オープンデータの紹介
データ分析の分野で時折登場するNew York City TaxiのTripデータは、"Registry of Open Data on AWS"の一つとして登録されています。今回のblogでは、このデータをデータドリフト検出の題材として利用します。


NYC taxi Trip Record as a open data  
https://registry.opendata.aws/nyc-tlc-trip-records-pds/

Example to access the data above  
https://github.com/aws-samples/aws-open-data-analytics-notebooks/tree/master/exploring-data

In [9]:
!pip install awswrangler

Collecting awswrangler
  Downloading awswrangler-1.9.0-py3-none-any.whl (133 kB)
[K     |████████████████████████████████| 133 kB 8.8 MB/s eta 0:00:01
Collecting sqlalchemy-redshift<0.9.0,>=0.7.0
  Downloading sqlalchemy_redshift-0.8.1-py2.py3-none-any.whl (33 kB)
Collecting pymysql<0.11.0,>=0.9.0
  Downloading PyMySQL-0.10.0-py2.py3-none-any.whl (47 kB)
[K     |████████████████████████████████| 47 kB 7.9 MB/s  eta 0:00:01
[?25hCollecting pyarrow~=1.0.0
  Downloading pyarrow-1.0.1-cp36-cp36m-manylinux2014_x86_64.whl (17.3 MB)
[K     |████████████████████████████████| 17.3 MB 11.7 MB/s eta 0:00:01
[?25hCollecting psycopg2-binary~=2.8.0
  Downloading psycopg2_binary-2.8.5-cp36-cp36m-manylinux1_x86_64.whl (2.9 MB)
[K     |████████████████████████████████| 2.9 MB 67.4 MB/s eta 0:00:01
Installing collected packages: sqlalchemy-redshift, pymysql, pyarrow, psycopg2-binary, awswrangler
Successfully installed awswrangler-1.9.0 psycopg2-binary-2.8.5 pyarrow-1.0.1 pymysql-0.10.0 sqlalchemy-

In [195]:
import awswrangler as wr
import os

pd.set_option('display.max_columns', 100)
pd.set_option('display.max_rows', 2000)

In [188]:
def read_and_parse_csv(cab_color, year, month):
    # Set next month as string
    if int(month) < 12:
        limit_year = year
        limit_month = str(int(month) + 1).zfill(2)
    else:
        limit_year = str(int(year) + 1).zfill(4)
        limit_month = '01'
    
    df = wr.s3.read_csv(f's3://nyc-tlc/trip data/{cab_color}_tripdata_{year}-{month}.csv')

    # Convert pickup/drop timestamp string to timestamp data type
    if cab_color == 'yellow':
        df['pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
        df['dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
    elif cab_color == 'green':
        df['pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])
        df['dropoff_datetime'] = pd.to_datetime(df['lpep_dropoff_datetime'])
    else:
        raise Exception('Unexpected cab_color type')

    df['trip_duration_sec'] = df.apply(lambda x: (x.dropoff_datetime - x.pickup_datetime).total_seconds(), axis=1)
    df['pickup_ymdh'] = df.pickup_datetime.dt.strftime('%Y-%m-%d %H:00:00')
    df['date'] = df.pickup_datetime.dt.strftime('%Y-%m-%d')
    
    # Drop abnormal records based on trip duration (sec) and pickup timestamp
    df = df[(df.trip_duration_sec >= 10) & (df.trip_duration_sec <= 14400)]
    df = df[(df.pickup_ymdh >= f'{year}-{month}-01 00:00:00') & (df.pickup_ymdh < f'{limit_year}-{limit_month}-01 00:00:00')].copy()

    return df

In [191]:
def generate_trip_summary(df_records):
    groupby_target = ['date', 'pickup_ymdh', 'PULocationID']
    #groupby_target = ['date', 'pickup_ymdh']
    calc_target = ['trip_duration_sec', 'fare_amount', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount']

    g = df_records.groupby(by=groupby_target)
    df_count = g.VendorID.count().rename('count')
    df_mean = g[calc_target].mean().rename(columns={x:f'{x}_mean' for x in calc_target})
    df_std = g[calc_target].std().rename(columns={x:f'{x}_std' for x in calc_target})

    df_summary = pd.concat([df_count, df_mean, df_std], axis=1).reset_index()
    
    return df_summary

In [192]:
def save_s3_by_date(df_summary, cab_color, bucket, prefix):
    ymd_list = sorted(list(set(df_summary.date.tolist())))
    for ymd in ymd_list:
        file_ymd = ymd.replace('-', '')
        df_save = df_summary[df_summary.date == ymd]
        wr.s3.to_csv(df_save, f's3://{bucket}/{prefix}/nyctaxi_tripdata_{cab_color}_{file_ymd}.csv', index=False)

## Tripデータの取得と前処理
NYC Taxiのtripデータは、タクシーの乗車ごとの時刻、乗車区域、降車区域、料金などの情報で成り立っています。今回は全体としての傾向をつかむために、2019年1月から2020年6月までのデータを取得した上で、1時間ごとの乗車回数や平均の乗車時間、料金などにサマリーしてから自分自身のS3 Bucketに保存します。次のステップで、サマリーしたデータを元にデータドリフトの発生有無を見ていきましょう

NYC Taxiにはマンハッタン島で乗客を乗せられるYellowライセンスと、マンハッタン島以外で乗客を乗せられるGreenライセンスがあり、それぞれのデータは別のファイルとして提供されています。Yellowライセンスの乗車データは1ヶ月あたり600-700MBと大きくて処理に時間がかかるので、今回はデータ量が1/10程度に収まるGreenライセンスの乗車データを対象にします。マンハッタン島の乗車データを分析してみたい方は、"cob_color"を"yellow"に設定してチャレンジしてみてください。

### Read NYC cab trip open data, generate summary data and save it to S3

In [194]:
year_month = [('2019', '01'), ('2019', '02'), ('2019', '03'), ('2019', '04'), 
              ('2019', '05'), ('2019', '06'), ('2019', '07'), ('2019', '08'), 
              ('2019', '09'), ('2019', '10'), ('2019', '11'), ('2019', '12'),
              ('2020', '01'), ('2020', '02'), ('2020', '03'), ('2020', '04'), 
              ('2020', '05'), ('2020', '06')]
cab_color = 'green'
bucket = 'tuki-bkt-misc'
prefix = 'data/nyctaxi/daily_w_location'

for year, month in year_month:
    df_records = read_and_parse_csv(cab_color, year, month)
    df_summary = generate_trip_summary(df_records)
    save_s3_by_date(df_summary, cab_color, bucket, prefix)



In [None]:
df_records.head(5)

In [None]:
df_summary.head(5)

In [None]:
# Test reading CSV
wr.s3.read_csv('s3://tuki-bkt-misc/data/nyctaxi/summary_daily/nyctaxi_tripdata_green_20200715.csv')