# ให้น.ศ. เขียน python script hw2_xxxx.py ซึ่งใช้ AWS SDK for Python (Boto3) เพื่อ
สร้าง Amazon S3 bucket ชื่อ nyctlc-cs653-xxxx โดยแทน xxxx ด้วยเลข 4 ตัวท้ายของรหัสน.ศ. ของตัวเอง 


นำเข้าข้อมูล NYC TLC trip data (จาก https://registry.opendata.aws/nyc-tlc-trip-records-pds/)  ที่เกี่ยวข้องเข้า Bucket ที่สร้างไว้ในข้อ 1) 

In [None]:
pip install pyarrow

In [None]:
import boto3
import botocore
import pandas as pd
from IPython.display import display, Markdown

In [None]:
s3 = boto3.client('s3')
s3_resource = boto3.resource('s3')

In [None]:
def create_bucket(bucket):
    import logging

    try:
        s3.create_bucket(Bucket=bucket)
    except botocore.exceptions.ClientError as e:
        logging.error(e)
        return 'Bucket ' + bucket + ' could not be created.'
    return 'Created or already exists ' + bucket + ' bucket.'

In [None]:
create_bucket('nyctlc-cs653-5199')

In [None]:
def list_buckets(match=''):
    response = s3.list_buckets()
    if match:
        print(f'Existing buckets containing "{match}" string:')
    else:
        print('All existing buckets:')
    for bucket in response['Buckets']:
        if match:
            if match in bucket["Name"]:
                print(f'  {bucket["Name"]}')

In [None]:
list_buckets(match='open')

In [None]:
def list_bucket_contents(bucket, match='', size_mb=0):
    bucket_resource = s3_resource.Bucket(bucket)
    total_size_gb = 0
    total_files = 0
    match_size_gb = 0
    match_files = 0
    for key in bucket_resource.objects.all():
        key_size_mb = key.size/1024/1024
        total_size_gb += key_size_mb
        total_files += 1
        list_check = False
        if not match:
            list_check = True
        elif match in key.key:
            list_check = True
        if list_check and not size_mb:
            match_files += 1
            match_size_gb += key_size_mb
            print(f'{key.key} ({key_size_mb:3.0f}MB)')
        elif list_check and key_size_mb <= size_mb:
            match_files += 1
            match_size_gb += key_size_mb
            print(f'{key.key} ({key_size_mb:3.0f}MB)')

    if match:
        print(f'Matched file size is {match_size_gb/1024:3.1f}GB with {match_files} files')            
    
    print(f'Bucket {bucket} total size is {total_size_gb/1024:3.1f}GB with {total_files} files')

In [None]:
list_bucket_contents(bucket='nyc-tlc', match='2017', size_mb=250)

In [None]:
def preview(bucket, key):
    data_source = {
            'Bucket': bucket,
            'Key': key
        }
    # Generate the URL to get Key from Bucket
    url = s3.generate_presigned_url(
        ClientMethod = 'get_object',
        Params = data_source
    )

    data = pd.read_parquet(url, engine='pyarrow')
    return data

In [None]:
df = preview(bucket='nyc-tlc', key=f'trip data/yellow_tripdata_2017-01.parquet')
df.head(10)

In [None]:
df.shape

In [None]:
df.info()

In [None]:
df.describe()

In [None]:
def key_exists(bucket, key):
    try:
        s3_resource.Object(bucket, key).load()
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            # The key does not exist.
            return(False)
        else:
            # Something else has gone wrong.
            raise
    else:
        # The key does exist.
        return(True)

def copy_among_buckets(from_bucket, from_key, to_bucket, to_key):
    if not key_exists(to_bucket, to_key):
        s3_resource.meta.client.copy({'Bucket': from_bucket, 'Key': from_key}, 
                                        to_bucket, to_key)        
        print(f'File {to_key} saved to S3 bucket {to_bucket}')
    else:
        print(f'File {to_key} already exists in S3 bucket {to_bucket}') 

In [None]:
for i in range(1,6):
    month_str = f'0{i}' if i < 10 else f'{i}'
    from_key = f'trip data/yellow_tripdata_2017-{month_str}.parquet'
    to_key = from_key
    copy_among_buckets(from_bucket='nyc-tlc', from_key=from_key,
                      to_bucket='nyctlc-cs653-5199', to_key=f'yellow_tripdata_2017-{month_str}.parquet')

# query ข้อมูลด้วย Amazon S3 select **เพื่อตอบคำถามต่อไปนี้**

ในเดือน Jan 2017 มีจำนวน yellow taxi rides ทั้งหมดเท่าไร แยกจำนวน rides ตามประเภทการจ่ายเงิน (payment)

In [None]:
import boto3
s3 = boto3.client('s3')
total = 0
for i in range(1,6):
    resp = s3.select_object_content(
        Bucket = 'nyctlc-cs653-5199',
        Key = 'yellow_tripdata_2017-01.parquet',
        Expression = f'SELECT COUNT(payment_type) FROM s3object s WHERE payment_type={i}',
        ExpressionType='SQL',  
        InputSerialization={'Parquet':{}},
        OutputSerialization={'CSV':{}},
    )
    for event in resp['Payload']:
        if 'Records' in event:
            records = event['Records']['Payload'].decode('utf-8')
            total = total + int(records)
            print(f'จำนวน yellow taxi ที่มี payment_type={i} เท่ากับ {records}')
print(f'มี yellow taxi รวมทั้งสิ้น {total} คัน')

จำนวน yellow taxi ที่มี payment_type=1 เท่ากับ 6506189

จำนวน yellow taxi ที่มี payment_type=2 เท่ากับ 3144926

จำนวน yellow taxi ที่มี payment_type=3 เท่ากับ 46257

จำนวน yellow taxi ที่มี payment_type=4 เท่ากับ 13447

จำนวน yellow taxi ที่มี payment_type=5 เท่ากับ 1

มี yellow taxi รวมทั้งสิ้น 9710820 คัน

ตอบ ในเดือน Jan 2017 Yellow taxi rides ในแต่ละจุดรับผู้โดยสาร (Pickup location) เป็นจำนวน rides มากน้อยเท่าไร และมีค่าโดยสารรวมของ rides และจำนวนผู้โดยสารเฉลี่ยต่อ rides ในแต่ละจุดเท่าไร 

In [None]:
import boto3
s3 = boto3.client('s3')
total = 0
for i in range(1,6):
    resp = s3.select_object_content(
        Bucket = 'nyctlc-cs653-5199',
        Key = 'yellow_tripdata_2017-01.parquet',
        Expression = f'SELECT COUNT(PULocationID) FROM s3object s WHERE PULocationID={i}',
        ExpressionType='SQL',  
        InputSerialization={'Parquet':{}},
        OutputSerialization={'CSV':{}},
    )
    for event in resp['Payload']:
        if 'Records' in event:
            records = event['Records']['Payload'].decode('utf-8')
            total = total + int(records)
            print(f'จำนวน yellow taxi ที่มี PULocationID={i} เท่ากับ {records}')
print(f'มี yellow taxi รวมทั้งสิ้น {total} คัน')

ตอบ PULocationID ทั้ง 265 ค่า ตัวอย่าง จุดรับผู้โดย􏰁ารจุดที่ 39 มีค่า PULocationID=180 แปลว่ามีรถแท็กซี่จอด ณ จุด นั้น 180 เที่ยว
ในการคานวณค่าโดยการรวมโดยแบ่งตาม PULocationID จะแบ่งการทางานอยู่ในฟังก์ชัน และการรับการคำนวณผู้โดยสารเฉลี่ยต่อเที่ยว ใช้ฟังก์ชัน
 
cal_total_fare
cal_avg_passenger_count
 
ในการคานวณค่าโดยสารรวมเลือกพิจารณาคอลัมน์total_amountเพราะเป็นค่าใช้จ่ายที่ ครอบคลุมค่าโดยสาร ค่าผ่านทาง และค่าภาษี ในค่าผ่านทาง S3 Select มีการใช้ฟังก์ชัน SUM ช่วยหาผลรวม SELECT SUM(total_amount) FROM s3object s WHERE PULocationID={ id }

In [None]:
import numpy as np
yellow_jan_PULocationID = df['PULocationID'].unique()
np.sort(yellow_jan_PULocationID)

In [None]:
def cal_total_fare(id):
  resp = s3.select_object_content(
        Bucket = 'nyctlc-cs653-5199',
        Key = 'yellow_tripdata_2017-01.parquet',
        Expression = f'SELECT SUM(total_amount) FROM s3object s WHERE PULocation={id}',
        ExpressionType='SQL',  
        InputSerialization={'Parquet':{}},
        OutputSerialization={'CSV':{}},
    )
    for event in resp['Payload']:
        if 'Records' in event:
            records = event['Records']['Payload'].decode('utf-8')
            try:
              isinstance(float(records),float)
              return float(records)
            except:
              return None

ตอบ มีการสร้าง list 3 ตัวเพ่ือเตรียมการสร้าง dataFrame ในตอนท้าย ได้แก่ PULocationID สำาหรับเก็บ id ของจุดรับผู้โดยสารทั้ง 265 แห่ง, สำาหรับเก็บค่าโดยสารรวม แบ่งตาม PULocationID, และสำหรับเก็บจานวนผู้โดยสารเฉลี่ยต่อ เที่ยวในแต่ละจุด ดังนั้น list ท้ัง 3 ตัวนี้ต้องมีขนาด 265 เท่ากัน

ในเดือน Jan - Mar 2017 มีจำนวน yellow taxi rides ทั้งหมดเท่าไร แยกจำนวน rides ตามประเภทการจ่ายเงิน (payment)

In [None]:
def cal_rides_each_month(month):
  total = 0
  for type in range(1,6)
    resp = s3.select_object_content(
          Bucket = 'nyctlc-cs653-5199',
          Key = 'yellow_tripdata_2017-{month}.parquet',
          Expression = f'SELECT COUNT(payment_type) FROM s3object s WHERE payment_type={type}',
          ExpressionType='SQL',  
          InputSerialization={'Parquet':{}},
          OutputSerialization={'CSV':{}},
      )
    for event in resp['Payload']:
        if 'Records' in event:
            records = event['Records']['Payload'].decode('utf-8')
            records =int(records)
            total = total + records
            type1+type.append(records)
            print(f'จำนวน yellow taxi rides เดือน {mont} ที่มี payment_type={type} เท่ากับ {records}')
    sum_rides.append(total)
    print(f'rides เดือน {month} มี yellow taxi rides รวมทั้งสิ้น {total} เที่ยว')

In [None]:
for month in range(1,6):
  cal_rides_each_month(month)

In [None]:
data={
    'month':['Jan','Feb','Mar','Apr','May'],
    'payment type 1': type1,
    'payment type 2': type2,
    'payment type 3': type3,
    'payment type 4': type4,
    'payment type 5': type5,
    'sum':sum_rides
}
hw_item3 = pd.DataFrame(data)
hw_item3