In [1]:
import boto3
import botocore
import pandas as pd
from IPython.display import display, Markdown

s3 = boto3.client('s3')
s3_resource = boto3.resource('s3')

In [2]:
# Create Bucket
def create_bucket(bucket):
    import logging

    try:
        s3.create_bucket(Bucket=bucket)
    except botocore.exceptions.ClientError as e:
        logging.error(e)
        return 'Bucket ' + bucket + ' could not be created.'
    return 'Created or already exists ' + bucket + ' bucket.'

create_bucket('nyc-tlc-cs653-5132')

'Created or already exists nyc-tlc-cs653-5132 bucket.'

In [None]:
# List Buckets
def list_buckets(match=''):
    response = s3.list_buckets()
    if match:
        print(f'Existing buckets containing "{match}" string:')
    else:
        print('All existing buckets:')
    for bucket in response['Buckets']:
        if match:
            if match in bucket["Name"]:
                print(f'  {bucket["Name"]}')

list_buckets(match='nyc')

In [4]:
# List Bucket Contents
def list_bucket_contents(bucket, match='', size_mb=0):
    bucket_resource = s3_resource.Bucket(bucket)
    total_size_gb = 0
    total_files = 0
    match_size_gb = 0
    match_files = 0
    for key in bucket_resource.objects.all():
        key_size_mb = key.size/1024/1024
        total_size_gb += key_size_mb
        total_files += 1
        list_check = False
        if not match:
            list_check = True
        elif match in key.key:
            list_check = True
        if list_check and not size_mb:
            match_files += 1
            match_size_gb += key_size_mb
            print(f'{key.key} ({key_size_mb:3.0f}MB)')
        elif list_check and key_size_mb <= size_mb:
            match_files += 1
            match_size_gb += key_size_mb
            print(f'{key.key} ({key_size_mb:3.0f}MB)')

    if match:
        print(f'Matched file size is {match_size_gb/1024:3.1f}GB with {match_files} files')            
    
    print(f'Bucket {bucket} total size is {total_size_gb/1024:3.1f}GB with {total_files} files')

list_bucket_contents(bucket='nyc-tlc', match='2017', size_mb=250)

csv_backup/green_tripdata_2017-01.csv ( 91MB)
csv_backup/green_tripdata_2017-02.csv ( 87MB)
csv_backup/green_tripdata_2017-03.csv ( 99MB)
csv_backup/green_tripdata_2017-04.csv ( 92MB)
csv_backup/green_tripdata_2017-05.csv ( 91MB)
csv_backup/green_tripdata_2017-06.csv ( 83MB)
csv_backup/green_tripdata_2017-07.csv ( 78MB)
csv_backup/green_tripdata_2017-08.csv ( 74MB)
csv_backup/green_tripdata_2017-09.csv ( 75MB)
csv_backup/green_tripdata_2017-10.csv ( 79MB)
csv_backup/green_tripdata_2017-11.csv ( 75MB)
csv_backup/green_tripdata_2017-12.csv ( 77MB)
trip data/fhv_tripdata_2017-01.parquet ( 55MB)
trip data/fhv_tripdata_2017-02.parquet ( 54MB)
trip data/fhv_tripdata_2017-03.parquet ( 64MB)
trip data/fhv_tripdata_2017-04.parquet ( 60MB)
trip data/fhv_tripdata_2017-05.parquet ( 64MB)
trip data/fhv_tripdata_2017-06.parquet (134MB)
trip data/fhv_tripdata_2017-07.parquet (137MB)
trip data/fhv_tripdata_2017-08.parquet (139MB)
trip data/fhv_tripdata_2017-09.parquet (144MB)
trip data/fhv_tripdata_20

In [3]:
pip install pyarrow

Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com
Note: you may need to restart the kernel to use updated packages.


In [4]:
# Preview Dataset
def preview_dataset(bucket, key):
    data_source = {
            'Bucket': bucket,
            'Key': key
        }
    # Generate the URL to get Key from Bucket
    url = s3.generate_presigned_url(
        ClientMethod = 'get_object',
        Params = data_source
    )

    # data = pd.read_csv(url, nrows=rows)
    data = pd.read_parquet(url, engine='pyarrow')
    return data

df = preview_dataset(bucket='nyc-tlc', key=f'trip data/yellow_tripdata_2017-01.parquet')
df.head(5)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2017-01-01 00:32:05,2017-01-01 00:37:48,1,1.2,1,N,140,236,2,6.5,0.5,0.5,0.0,0.0,0.3,7.8,,
1,1,2017-01-01 00:43:25,2017-01-01 00:47:42,2,0.7,1,N,237,140,2,5.0,0.5,0.5,0.0,0.0,0.3,6.3,,
2,1,2017-01-01 00:49:10,2017-01-01 00:53:53,2,0.8,1,N,140,237,2,5.5,0.5,0.5,0.0,0.0,0.3,6.8,,
3,1,2017-01-01 00:36:42,2017-01-01 00:41:09,1,1.1,1,N,41,42,2,6.0,0.5,0.5,0.0,0.0,0.3,7.3,,
4,1,2017-01-01 00:07:41,2017-01-01 00:18:16,1,3.0,1,N,48,263,2,11.0,0.5,0.5,0.0,0.0,0.3,12.3,,


In [7]:
df.shape

(9710820, 19)

In [8]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9710820 entries, 0 to 9710819
Data columns (total 19 columns):
 #   Column                 Dtype         
---  ------                 -----         
 0   VendorID               int64         
 1   tpep_pickup_datetime   datetime64[ns]
 2   tpep_dropoff_datetime  datetime64[ns]
 3   passenger_count        int64         
 4   trip_distance          float64       
 5   RatecodeID             int64         
 6   store_and_fwd_flag     object        
 7   PULocationID           int64         
 8   DOLocationID           int64         
 9   payment_type           int64         
 10  fare_amount            float64       
 11  extra                  float64       
 12  mta_tax                float64       
 13  tip_amount             float64       
 14  tolls_amount           float64       
 15  improvement_surcharge  float64       
 16  total_amount           float64       
 17  congestion_surcharge   object        
 18  airport_fee           

In [9]:
df.describe()

Unnamed: 0,VendorID,passenger_count,trip_distance,RatecodeID,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
count,9710820.0,9710820.0,9710820.0,9710820.0,9710820.0,9710820.0,9710820.0,9710820.0,9710820.0,9710820.0,9710820.0,9710820.0,9710820.0,9710820.0
mean,1.547111,1.629222,2.813879,1.03958,164.1076,161.7635,1.337539,12.37414,0.3234837,0.4975229,1.751259,0.2764088,0.2996798,15.52662
std,0.4977756,1.272268,3.611637,0.5058933,66.65001,70.6721,0.4913687,265.222,0.4425521,0.04881261,2.571219,1.638808,0.013421,265.3384
min,1.0,0.0,0.0,1.0,1.0,1.0,1.0,-350.0,-55.2,-0.5,-41.0,-15.0,-0.3,-350.3
25%,1.0,1.0,0.95,1.0,114.0,107.0,1.0,6.5,0.0,0.5,0.0,0.0,0.3,8.3
50%,2.0,1.0,1.6,1.0,162.0,162.0,1.0,9.0,0.0,0.5,1.3,0.0,0.3,11.3
75%,2.0,2.0,2.9,1.0,233.0,234.0,2.0,13.5,0.5,0.5,2.26,0.0,0.3,16.73
max,2.0,9.0,264.71,99.0,265.0,265.0,5.0,625900.8,55.54,56.5,999.99,911.08,0.3,625901.6


In [10]:
# Copy Among Buckets
def key_exists(bucket, key):
    try:
        s3_resource.Object(bucket, key).load()
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            # The key does not exist.
            return(False)
        else:
            # Something else has gone wrong.
            raise
    else:
        # The key does exist.
        return(True)

def copy_among_buckets(from_bucket, from_key, to_bucket, to_key):
    if not key_exists(to_bucket, to_key):
        s3_resource.meta.client.copy({'Bucket': from_bucket, 'Key': from_key}, 
                                        to_bucket, to_key)        
        print(f'File {to_key} saved to S3 bucket {to_bucket}')
    else:
        print(f'File {to_key} already exists in S3 bucket {to_bucket}') 

In [11]:
for i in range(1, 6):
    copy_among_buckets(from_bucket='nyc-tlc', from_key=f'trip data/yellow_tripdata_2017-0{i}.parquet',
                      to_bucket='nyc-tlc-cs653-5132', to_key=f'yellow_tripdata-2017-0{i}.parquet')

File yellow_tripdata-2017-01.parquet saved to S3 bucket nyc-tlc-cs653-5132
File yellow_tripdata-2017-02.parquet saved to S3 bucket nyc-tlc-cs653-5132
File yellow_tripdata-2017-03.parquet saved to S3 bucket nyc-tlc-cs653-5132
File yellow_tripdata-2017-04.parquet saved to S3 bucket nyc-tlc-cs653-5132
File yellow_tripdata-2017-05.parquet saved to S3 bucket nyc-tlc-cs653-5132


In [13]:
import boto3

s3 = boto3.client('s3')

# Answer a
bucket = 'nyc-tlc-cs653-5132'
key = 'yellow_tripdata-2017-01.parquet'
expression_type = 'SQL'
input_serialization = {'Parquet': {}}
output_serialization = {'CSV': {}}

def creditCard():
    return "Credit card"
def cash():
    return "Cash"
def noCharge():
    return "No charge"
def dispute():
    return "Dispute"
def unknow():
    return "Unknow"
def voidedTrip():
    return "Voided trip"
def default():
    return "Incorrect payment type"

switcher = {
    1: creditCard,
    2: cash,
    3: noCharge,
    4: dispute,
    5: unknow,
    6: voidedTrip
    }

def switch(payment_type):
    return switcher.get(payment_type, default)()

sum = 0
for i in range(1, 6):
    # Execute S3 Select query
    response = s3.select_object_content(
        Bucket=bucket,
        Key=key,
        Expression=f"select count(payment_type) from s3object s where payment_type = {i}",
        ExpressionType=expression_type,
        InputSerialization=input_serialization,
        OutputSerialization=output_serialization,
    )
    
    # Iterate through the response and print each line
    for event in response['Payload']:
        if 'Records' in event:
            records = event['Records']['Payload'].decode('utf-8')
            sum = sum + int(records)
            print(f"ประเภทการจ่ายเงิน {switch(i)} มีจำนวน yellow taxi rides เท่ากับ {records}")
print(f"มีจำนวน yellow taxi rides ทั้งหมดเท่ากับ {sum}") 

ประเภทการจ่ายเงิน Credit card มีจำนวน yellow taxi rides เท่ากับ 6506189

ประเภทการจ่ายเงิน Cash มีจำนวน yellow taxi rides เท่ากับ 3144926

ประเภทการจ่ายเงิน No charge มีจำนวน yellow taxi rides เท่ากับ 46257

ประเภทการจ่ายเงิน Dispute มีจำนวน yellow taxi rides เท่ากับ 13447

ประเภทการจ่ายเงิน Unknow มีจำนวน yellow taxi rides เท่ากับ 1

มีจำนวน yellow taxi rides ทั้งหมดเท่ากับ 9710820


In [8]:
import numpy as np
PUL_ID = df['PULocationID'].unique()
np.sort(PUL_ID)


array([  1,   2,   3,   4,   5,   6,   7,   8,   9,  10,  11,  12,  13,
        14,  15,  16,  17,  18,  19,  20,  21,  22,  23,  24,  25,  26,
        28,  29,  30,  31,  32,  33,  34,  35,  36,  37,  38,  39,  40,
        41,  42,  43,  45,  46,  47,  48,  49,  50,  51,  52,  53,  54,
        55,  56,  57,  58,  59,  60,  61,  62,  63,  64,  65,  66,  67,
        68,  69,  70,  71,  72,  73,  74,  75,  76,  77,  78,  79,  80,
        81,  82,  83,  84,  85,  86,  87,  88,  89,  90,  91,  92,  93,
        94,  95,  96,  97,  98, 100, 101, 102, 105, 106, 107, 108, 109,
       110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122,
       123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135,
       136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148,
       149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161,
       162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174,
       175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 18

In [None]:
import boto3

s3 = boto3.client('s3')

# Answer a
bucket = 'nyc-tlc-cs653-5132'
key = 'yellow_tripdata-2017-01.parquet'
expression_type = 'SQL'
input_serialization = {'Parquet': {}}
output_serialization = {'CSV': {}}

# คำนวนค่าโดยสารรวมของ rides ในแต่ละจุด
def sum_total_amount(pulid):
    response = s3.select_object_content(
        Bucket=bucket,
        Key=key,
        Expression=f"select sum(total_amount)  from s3object s where PULocationID = {i}",
        ExpressionType=expression_type,
        InputSerialization=input_serialization,
        OutputSerialization=output_serialization,
    )
    
    # Iterate through the response and print each line
    for event in response['Payload']:
        if 'Records' in event:
            records = event['Records']['Payload'].decode('utf-8')
            
            try:
                isinstance(float(records), float)
                return float(records)
            except:
                return None

# จำนวนผู้โดยสารเฉลี่ยต่อ rides ในแต่ละจุด
def avg_passenger_count(pulid):
    response = s3.select_object_content(
        Bucket=bucket,
        Key=key,
        Expression=f"select avg(passenger_count) from s3object s where PULocationID = {i}",
        ExpressionType=expression_type,
        InputSerialization=input_serialization,
        OutputSerialization=output_serialization,
    )
    
    # Iterate through the response and print each line
    for event in response['Payload']:
        if 'Records' in event:
            records = event['Records']['Payload'].decode('utf-8')
            
            try:
                isinstance(float(records), float)
                return float(records)
            except:
                return None            

# Define 3 array, keep to show in DataFrame
pul_id_arr=[]
total_fare_arr=[]
avg_pass_arr=[]

for i in range(1, 266):
    # Execute S3 Select query
    response = s3.select_object_content(
        Bucket=bucket,
        Key=key,
        Expression=f"select count(PULocationID) from s3object s where PULocationID = {i}",
        ExpressionType=expression_type,
        InputSerialization=input_serialization,
        OutputSerialization=output_serialization,
    )
    
    # Iterate through the response and print each line
    for event in response['Payload']:
        if 'Records' in event:
            records = event['Records']['Payload'].decode('utf-8')
            
            pul_id_arr.append(i)
            print(f"จุดรับผู้โดยสารที่ {i} มีจำนวน Yellow taxi rides เท่ากับ {int(records)}")
            
            total_fare = sum_total_amount(i)
            if isinstance(total_fare, float):
                total_fare = float("{:.2f}".format(total_fare))
                total_fare_arr.append(total_fare)
                print(f"ค่าโดยสารรวม {total_fare} $")
            else:
                total_fare = 0.0
                total_fare_arr.append(total_fare)
                print(f"ค่าโดยสารรวม - $")
                
            avg_pass = avg_passenger_count(i)
            if isinstance(avg_pass, float):
                avg_pass = float("{:.2f}".format(avg_pass))
                avg_pass_arr.append(avg_pass)
                print(f"จำนวนผู้โดยสารเฉลี่ยต่อ rides เท่ากับ {avg_pass} คน")
            else:
                avg_pass = 0.0
                avg_pass_arr.append(avg_pass)
                print(f"จำนวนผู้โดยสารเฉลี่ยต่อ rides เท่ากับ  - คน")
            print(f"-----------------------------------------")
    
            

จุดรับผู้โดยสารที่ 1 มีจำนวน Yellow taxi rides เท่ากับ 696
ค่าโดยสารรวม 59188.54 $
จำนวนผู้โดยสารเฉลี่ยต่อ rides เท่ากับ 1.4 คน
-----------------------------------------
จุดรับผู้โดยสารที่ 2 มีจำนวน Yellow taxi rides เท่ากับ 7
ค่าโดยสารรวม 330.26 $
จำนวนผู้โดยสารเฉลี่ยต่อ rides เท่ากับ 1.57 คน
-----------------------------------------
จุดรับผู้โดยสารที่ 3 มีจำนวน Yellow taxi rides เท่ากับ 32
ค่าโดยสารรวม 655.01 $
จำนวนผู้โดยสารเฉลี่ยต่อ rides เท่ากับ 1.66 คน
-----------------------------------------
จุดรับผู้โดยสารที่ 4 มีจำนวน Yellow taxi rides เท่ากับ 27602
ค่าโดยสารรวม 392919.74 $
จำนวนผู้โดยสารเฉลี่ยต่อ rides เท่ากับ 1.65 คน
-----------------------------------------
จุดรับผู้โดยสารที่ 5 มีจำนวน Yellow taxi rides เท่ากับ 2
ค่าโดยสารรวม 69.6 $
จำนวนผู้โดยสารเฉลี่ยต่อ rides เท่ากับ 1.5 คน
-----------------------------------------
จุดรับผู้โดยสารที่ 6 มีจำนวน Yellow taxi rides เท่ากับ 38
ค่าโดยสารรวม 1372.88 $
จำนวนผู้โดยสารเฉลี่ยต่อ rides เท่ากับ 1.13 คน
------------------------------

In [None]:
label_data = {'จุดรับผู้โดยสารที่': pul_id_arr, 
              'ค่าโดยสารรวม': total_fare_arr, 
              'จำนวนผู้โดยสารเฉลี่ยต่อรอบ': avg_pass_arr}
dfm = pd.DataFrame(label_data)
dfm

In [23]:
import boto3

s3 = boto3.client('s3')

# Define parameter
bucket = 'nyc-tlc-cs653-5132'
expression_type = 'SQL'
input_serialization = {'Parquet': {}}
output_serialization = {'CSV': {}}

# Define 6 array of payment type, keep to show in DataFrame
cc_arr = []
c_arr = []
nc_arr = []
dp_arr = []
uk_arr = []
vt_arr = []
summary = []

def creditCard():
    return "Credit card"
def cash():
    return "Cash"
def noCharge():
    return "No charge"
def dispute():
    return "Dispute"
def unknow():
    return "Unknow"
def voidedTrip():
    return "Voided trip"
def default():
    print("Incorrect payment type")

switcher = {
    1: creditCard,
    2: cash,
    3: noCharge,
    4: dispute,
    5: unknow,
    6: voidedTrip
    }

def switch(payment_type):
    return switcher.get(payment_type, default)()

def sum_rides_by_month(month):
    sum = 0
    for i in range(1, 6):
        # Execute S3 Select query for answer a
        response = s3.select_object_content(
            Bucket=bucket,
            Key=f'yellow_tripdata-2017-0{month}.parquet',
            Expression=f"select count(payment_type) from s3object s where payment_type = {i}",
            ExpressionType=expression_type,
            InputSerialization=input_serialization,
            OutputSerialization=output_serialization,
        )
    
        # Iterate through the response and print each line
        for event in response['Payload']:
            if 'Records' in event:
                records = event['Records']['Payload'].decode('utf-8')

                try:
                    isinstance(int(records), int)
                    records = int(records)
                except:
                    return None 

                sum = sum + records
                if i==1:
                    cc_arr.append(records)
                elif i==2:
                    c_arr.append(records)
                elif i==3:
                    nc_arr.append(records)
                elif i==4:
                    dp_arr.append(records)
                elif i==5:
                    uk_arr.append(records)
                elif i==6:
                    vt_arr.append(records)
                    
                print(f"ในเดือน {month} ที่ประเภทการจ่ายเงิน {switch(i)} มีจำนวน yellow taxi rides เท่ากับ {records}")

    summary.append(sum)
    print(f"เดือน {month} มีจำนวน yellow taxi rides รวมทั้งสิ้น {sum} รอบ")
    print(f"-----------------------------------------")

In [24]:
for month in range(1,6):
    sum_rides_by_month(month)

ในเดือน 1 ที่ประเภทการจ่ายเงิน Credit card มีจำนวน yellow taxi rides เท่ากับ 6506189
ในเดือน 1 ที่ประเภทการจ่ายเงิน Cash มีจำนวน yellow taxi rides เท่ากับ 3144926
ในเดือน 1 ที่ประเภทการจ่ายเงิน No charge มีจำนวน yellow taxi rides เท่ากับ 46257
ในเดือน 1 ที่ประเภทการจ่ายเงิน Dispute มีจำนวน yellow taxi rides เท่ากับ 13447
ในเดือน 1 ที่ประเภทการจ่ายเงิน Unknow มีจำนวน yellow taxi rides เท่ากับ 1
เดือน 1 มีจำนวน yellow taxi rides รวมทั้งสิ้น 9710820 รอบ
-----------------------------------------
ในเดือน 2 ที่ประเภทการจ่ายเงิน Credit card มีจำนวน yellow taxi rides เท่ากับ 6261976
ในเดือน 2 ที่ประเภทการจ่ายเงิน Cash มีจำนวน yellow taxi rides เท่ากับ 2849713
ในเดือน 2 ที่ประเภทการจ่ายเงิน No charge มีจำนวน yellow taxi rides เท่ากับ 44719
ในเดือน 2 ที่ประเภทการจ่ายเงิน Dispute มีจำนวน yellow taxi rides เท่ากับ 13367
ในเดือน 2 ที่ประเภทการจ่ายเงิน Unknow มีจำนวน yellow taxi rides เท่ากับ 0
เดือน 2 มีจำนวน yellow taxi rides รวมทั้งสิ้น 9169775 รอบ
-----------------------------------------
ในเดือ

In [25]:
label_data = {'month': ['January', 'February', 'March', 'April', 'May'], 
              'Credit card': cc_arr, 
              'Cash': c_arr,
              'No charge': nc_arr,
              'Dispute': dp_arr,
              'Unknow': uk_arr,
              'Summary': summary
             }
dfm = pd.DataFrame(label_data)
dfm

Unnamed: 0,month,Credit card,Cash,No charge,Dispute,Unknow,Summary
0,January,6506189,3144926,46257,13447,1,9710820
1,February,6261976,2849713,44719,13367,0,9169775
2,March,6994699,3231928,53815,14999,0,10295441
3,April,6695495,3281576,54383,15680,1,10047135
4,May,6780947,3250362,55027,15791,0,10102127
