In [1]:
import boto3
import s3fs
import dask.dataframe as dd

In [4]:
print(boto3.__version__)
print(s3fs.__version__)

1.9.66
0.5.1


## Reading Data Stored at S3

In [2]:
s3 = boto3.client('s3')
path = 's3://chicagobucket/data/TNP_Trips.csv'
mykey = '########################'
mysecret = '############################'
# s3fs has to be version 0.3.0 or higher to use dask dataframe
aws_chicago_df = dd.read_csv(path, storage_options = {'key': mykey, 'secret': mysecret}, dtype={'Trip Seconds': 'float64', 'Tip':'float64'})

In [5]:
print("\n# of Rows: ", aws_chicago_df.shape[0])
print("\n# of Columns: ", aws_chicago_df.shape[1])
print('\nName of Columns: ', aws_chicago_df.columns.tolist())
print("\nTotal Missing values : ", aws_chicago_df.isnull().sum().values.sum())
print('\n')
print(aws_chicago_df.dtypes)


# of Rows:  Delayed('int-3fad9342-9866-4259-9782-ab86f1de17ba')

# of Columns:  21

Name of Columns:  ['Trip ID', 'Trip Start Timestamp', 'Trip End Timestamp', 'Trip Seconds', 'Trip Miles', 'Pickup Census Tract', 'Dropoff Census Tract', 'Pickup Community Area', 'Dropoff Community Area', 'Fare', 'Tip', 'Additional Charges', 'Trip Total', 'Shared Trip Authorized', 'Trips Pooled', 'Pickup Centroid Latitude', 'Pickup Centroid Longitude', 'Pickup Centroid Location', 'Dropoff Centroid Latitude', 'Dropoff Centroid Longitude', 'Dropoff Centroid Location']

Total Missing values :  dask.array<sum-aggregate, shape=(), dtype=int64, chunksize=(), chunktype=numpy.ndarray>


Trip ID                        object
Trip Start Timestamp           object
Trip End Timestamp             object
Trip Seconds                  float64
Trip Miles                    float64
Pickup Census Tract           float64
Dropoff Census Tract          float64
Pickup Community Area         float64
Dropoff Community Area    

## Reading Snappy Compressed Parquet File from S3

In [None]:
#df = wr.s3.read_parquet(path='s3://chicagobucket/date_data/trip_start_date=2018-11-01/', dataset=True)

## Data Preprocessing

Splited and Converted 1.4 million rows of data starting November 2018 to September 2020 reported by Transportation Network Providers into 700 daily snappy compressed parquet files using AWS Athena. Directly loading 43GB csv file caused Jupyter Notebook to freeze.

SQL Queries:
``` mysql
CREATE TABLE chicago AS 
SELECT "trip id" as trip_id,
date_parse("trip start timestamp", '%m/%d/%Y %h:%i:%s %p') as trip_start_timestamp,
date_parse("trip end timestamp", '%m/%d/%Y %h:%i:%s %p') as trip_end_timestamp,
DATE(date_parse("trip start timestamp", '%m/%d/%Y %h:%i:%s %p')) as trip_start_date,
HOUR(date_parse("trip start timestamp", '%m/%d/%Y %h:%i:%s %p')) as trip_start_hour,
"trip seconds" as trip_seconds,
"trip miles" as trip_miles,
"pickup census tract" as pickup_census_tract,
"dropoff census tract" as dropoff_census_tract,
"fare" as fare,
"tip" as tip,
"additional charges" as additional_charges,
"trip total" as trip_total,
"shared trip authorized" as shared_trip_authorized,
"trips pooled" as trips_pooled,
"pickup centroid latitude" as pickup_latitude,
"pickup centroid longitude"as pickup_longitude,
"dropoff centroid latitude" as dropoff_latitude,
"dropoff centroid longitude" as dropoff_longitude
FROM data;
```

Use Partitioned By = 'trip_start_date' to split the data into daily data 
``` mysql
CREATE TABLE parquet_from_to
WITH (format='PARQUET', parquet_compression='SNAPPY', 
      partitioned_by = array['trip_start_date'], external_location = 's3://chicagobucket/date_data/') AS
SELECT trip_start_timestamp,
         trip_end_timestamp,
        trip_start_hour,
         trip_seconds,
         trip_miles,
         pickup_census_tract,
         dropoff_census_tract,
         pickup_community_area,
         dropoff_community_area,
         fare,
        tip,
        additional_charges,
        trip_total,
        shared_trip_authorized,
         trips_pooled,
         pickup_latitude,
         pickup_longitude,
         dropoff_latitude,
         dropoff_longitude,
         trip_start_date
FROM chicago_parquet
WHERE trip_start_date
    BETWEEN DATE('from') 'ex) from = "2020-09-30"'
        AND DATE('to') 'ex) to = "2020-09-30"'
```

To sync AWS S3 Folder, type aws s3 sync s3://mybucket .ON your prompt