## S3 連接操作

**S3 連接參數**

為了教學的目的, 學院在地端啟動了Minio的服務來模擬公有雲的s3服務。

請使用以下的帳號來登入Minio:

* `endpoint_url` (s3服務的網路位址): **http://10.34.124.114:9500**
* `aws_access_key_id` (使用者帳號): **demo**
* `aws_secret_access_key` (使用者密碼): **demo8888**
* `region_name` (區域編碼): **us-east-1**

**範例數據**: `nyc_taxi_dataset.zip`檔案是`紐約市計程車和禮車協會`所發佈的一個公開資料集。

* 檔案目錄: `data/nyc_taxi_dataset.zip`
* 資料筆數: 729,322 （73萬筆）
* 資料格式: CSV

資料欄位:
* id - 車程編號
* vendor_id - 計程車行編號
* pickup_datetime - 上車日期時間
* dropoff_datetime - 下車日期時間
* passenger_count - 乘客人數
* pickup_longitude - 上車經度
* pickup_latitude - 上車緯度
* dropoff_longitude - 下車經度
* dropoff_latitude - 下車緯度
* store_and_fwd_flag - 是否直接連線上傳b Y/N
* trip_duration - 車程持續時間(以秒為單位)

## 作業#01

1.在MiniO Web建立你的bucket(`de+{工號}`)。  
2.將CSV文件加載到DataFrame中並導出成Parquet文件(`to_parquet`, 並且以gzip來進行壓縮), 然後上傳到學員個人的bucket。

* **bucket_name**: de+{工號}
* **object_key**: nyc_taxi_trip_duration.parquet.gz

In [19]:
## task01
import pandas as pd
import boto3

MY_BUCKET_NAME = 'dez18033044'
MY_OBJECT_KEY = 'nyc_taxi_trip_duration.parquet.gz'

df_nyc_taxi_dataset = pd.read_csv("data/nyc_taxi_dataset.zip", compression='zip')
print(df_nyc_taxi_dataset.head(10))
df_nyc_taxi_dataset.to_parquet("nyc_taxi_trip_duration.parquet.gz", compression='gzip')
# Upload csv to MinIO
s3_resource = boto3.resource('s3',
                    endpoint_url='http://10.34.124.114:9500',
                    aws_access_key_id='demo',
                    aws_secret_access_key='demo8888',
                    region_name='us-east-1')

s3_resource.Object(MY_BUCKET_NAME, MY_OBJECT_KEY).upload_file(Filename="nyc_taxi_trip_duration.parquet.gz")
print('Upload file to MinIO successfully')

          id  vendor_id      pickup_datetime     dropoff_datetime  \
0  id1080784          2  2016-02-29 16:40:21  2016-02-29 16:47:01   
1  id0889885          1  2016-03-11 23:35:37  2016-03-11 23:53:57   
2  id0857912          2  2016-02-21 17:59:33  2016-02-21 18:26:48   
3  id3744273          2  2016-01-05 09:44:31  2016-01-05 10:03:32   
4  id0232939          1  2016-02-17 06:42:23  2016-02-17 06:56:31   
5  id1918069          2  2016-02-14 18:31:42  2016-02-14 18:55:57   
6  id2429028          1  2016-04-20 20:30:14  2016-04-20 20:36:51   
7  id1663798          2  2016-06-19 16:48:14  2016-06-19 17:06:35   
8  id2436943          2  2016-03-28 19:17:03  2016-03-28 19:48:29   
9  id2933909          1  2016-04-10 22:01:41  2016-04-10 22:25:30   

   passenger_count  pickup_longitude  pickup_latitude  dropoff_longitude  \
0                1        -73.953918        40.778873         -73.963875   
1                2        -73.988312        40.731743         -73.994751   
2           

## 作業#02

透過教學的方法,來從s3 (minio)上讀取這個nyc_taxi_trip_duration.parquet.gz檔案, 並計算出總共有多少名乘客數(total passengers)。並打印出來。

* **bucket_name**: de+{工號}
* **object_key**: nyc_taxi_trip_duration.parquet.gz

In [2]:
## task02
import io
import os
import pandas as pd
import boto3
import tempfile

total_passenger = 0
MY_BUCKET_NAME = 'dez18033044'
MY_OBJECT_KEY = 'nyc_taxi_trip_duration.parquet.gz'

s3_resource = boto3.resource('s3',
                    endpoint_url='http://10.34.124.114:9500',
                    aws_access_key_id='demo',
                    aws_secret_access_key='demo8888',
                    region_name='us-east-1')

with tempfile.NamedTemporaryFile('w+') as fp:
    temp_filename = fp.name

    # download s3 object to local folder
    try:
        s3_resource.Object(MY_BUCKET_NAME, MY_OBJECT_KEY).download_file(temp_filename)
        print('Download s3 object to local success!')
    except ClientError as e:
        printt('Download s3 object to local fail:{e}')

    # read CSV into DataFrame
    df_nyc_taxi_dataset_01 = pd.read_parquet(temp_filename)
    
    total_passenger = df_nyc_taxi_dataset_01['passenger_count'].sum()

print(f'Total passengers: {total_passenger}')

Download s3 object to local success!
Total passengers: 1212173


# 資料庫連接操作

這個作業主要是要讓學員學習如何連接到關聯型資料庫並進行一些讀取與寫入的操作。同時為讓學員去思考, 將數據寫入到資料庫與寫入到BlobStorage服務的差別, 以及將數據從資料庫讀取與從BlobStorage服務來讀取在日常的數據ETL的設計上該如何去構思。

## 作業#03

請使用DBAPI來連結到這個範例資料庫, 並請計算出15個資料表每一個資料表的Record Count的筆數。

In [16]:
#task01
import psycopg2

# all tables
tables = ['category', 'film_category', 'film', 'language', 'film_actor', 'inventory', 
          'rental', 'payment', 'staff', 'actor', 'customer', 'address', 'city', 'country', 'store']

results = {
    'category':0, 'film_category':0, 'film':0, 'language':0, 'film_actor':0, 'inventory':0, 
          'rental':0, 'payment':0, 'staff':0, 'actor':0, 'customer':0, 'address':0, 'city':0, 'country':0, 'store':0
}

db_conn = psycopg2.connect(host='10.34.124.114', dbname='dvdrental', user='dxlab', password='wistron888')
print('Connect [postgres] database successfully!')

# write your code below

# Open a cursor to perform database operations
cur = db_conn.cursor()

for table in tables:
    # 執行你的SQL
    cur.execute(f"SELECT COUNT(1) FROM {table}")
    results[table] = cur.fetchone()
    
db_conn.close()
# print out the final result
print(results)

Connect [postgres] database successfully!
{'category': (17,), 'film_category': (1000,), 'film': (1000,), 'language': (6,), 'film_actor': (5462,), 'inventory': (4581,), 'rental': (16044,), 'payment': (14606,), 'staff': (2,), 'actor': (200,), 'customer': (599,), 'address': (603,), 'city': (600,), 'country': (109,), 'store': (2,)}


## 作業#04

請使用DBAPI來連結到這個範例資料庫, 並且執行一個Left-join結果的Query然後把結果存成一個CSV檔(`join_query.csv`)。
CSV的檔案必需要包含欄位名稱(column in 1st row), 欄位之間以","來分隔。並且把這個檔案上傳到學員個人的bucket(`de+{工號}`)。

**S3 連接參數**

請使用以下的帳號來登入Minio, 每個學員都是不同的登入帳號與密碼:

* `endpoint_url` (s3服務的網路位址): **http://10.34.124.114:9000**
* `aws_access_key_id` (使用者帳號): **demo**
* `aws_secret_access_key` (使用者密碼): **demo8888**
* `region_name` (區域編碼): **us-east-1**

* `object_key`: **join_query.csv**


**CSV導出參考**: https://gist.github.com/madan712/f27ac3b703a541abbcd63871a4a56636

```sql
SELECT
    film.film_id,
    title,
    inventory_id
FROM
    film
LEFT JOIN inventory 
    ON inventory.film_id = film.film_id
ORDER BY title;
```


![](images/film-and-inventory-tables.png)

**SQL語法參考**: https://www.postgresqltutorial.com/postgresql-left-join/

In [15]:
#task02
import psycopg2
import pandas as pd
import boto3

stmt = """
SELECT
    film.film_id,
    title,
    inventory_id
FROM
    film
LEFT JOIN inventory 
    ON inventory.film_id = film.film_id
ORDER BY title;
"""
MY_BUCKET_NAME = 'dez18033044'
MY_OBJECT_KEY = 'join_query.csv'

db_conn = psycopg2.connect(host='10.34.124.114', dbname='dvdrental', user='dxlab', password='wistron888')
print('Connect [postgres] database successfully!')

# write your code below
cursor = db_conn.cursor()

cursor.execute(stmt)
# Get the query title name and content
query_title = [desc[0] for desc in cursor.description]

query_list = []
for each in cursor.fetchall():
    query_list.append(list(each))

# Convert query result as a DataFrame
df_inventory = pd.DataFrame(query_list, columns=query_title)

df_inventory.to_csv('join_query.csv', sep=',', index=False)
# Upload csv to MinIO
s3_resource = boto3.resource('s3',
                    endpoint_url='http://10.34.124.114:9500',
                    aws_access_key_id='demo',
                    aws_secret_access_key='demo8888',
                    region_name='us-east-1')
s3_resource.Bucket(MY_BUCKET_NAME).upload_file(Key=MY_OBJECT_KEY, Filename="join_query.csv")
print('Upload file to MinIO successfully')
db_conn.close()

Connect [postgres] database successfully!
Upload file to MinIO successfully
