🦄🦄🦄       🦄🦄🦄     🦄🦄🦄      🦄🦄🦄      🦄🦄🦄      🦄🦄🦄     🦄🦄🦄    🦄🦄🦄    🦄🦄🦄   🦄🦄🦄    🦄🦄🦄     🦄🦄🦄 🦄🦄🦄 🦄🦄🦄 🦄🦄🦄 

**Welcome to the ETL task for taking the Unicorn.Sales🦄 business to new heights.**

In this notebook, we will carry out the following *ETL Tasks*:

1. *Download Data: Get the data from the prepopulated Aurora database in your account.*
2. *Data Cleaning: Clean the data and add, delete, or rename columns in the data downloaded in step 1.*
3. *Data Upload: Upload the cleaned CSV files to an S3 bucket.*

As a player, you will need to fill in values or modify the code wherever it is marked `PlayerInputRequired`.



-----------------------------------------------------------------------------------------------------------------------------------------------------------

## Task 1: **Downloading Data**

In this task, we will: 
- [ ] Install Dependencies
- [ ] Connect to database 
- [ ] Query database tables for the data

#### **Install Dependencies**  
[Takes about 5 mins] 

`psycopg2` is a Python package for connecting to the RDS database that holds the data. Because psycopg2 has multiple dependencies that may conflict with other packages when installed with pip, we will install it with conda instead. Conda handles dependencies better given the numerous conda environments deployed by SageMaker Studio.

In [9]:
%conda install psycopg2

Collecting package metadata (current_repodata.json): done
Solving environment: done

# All requested packages already installed.

Retrieving notices: ...working... done

Note: you may need to restart the kernel to use updated packages.


- [x] Install Dependencies
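
After the install finishes (and after restarting the kernel if prompted), a quick check like the following can confirm that the package is importable. This is a minimal sketch; the helper name `is_importable` is made up for illustration and is not part of the notebook.

```python
import importlib.util


def is_importable(module_name):
    """Return True if a top-level module can be found in the current environment."""
    # find_spec returns None when no module of that name is installed
    return importlib.util.find_spec(module_name) is not None


# Prints True once psycopg2 is available to the current kernel
print(is_importable("psycopg2"))
```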

#### **Connect to Database**

  *To set up the connection with the database, we will need the following database credentials. From* [AWS Secrets Manager](https://console.aws.amazon.com/secretsmanager/), *look up the hostname (endpoint), region, and password. We will initialize all the credentials/params in the following cell.*
  
The following is an **example**: 
 
```
host="gameday-aurora-postgres.cluster-cj8rrrrrkb.us-east-2.rds.amazonaws.com"       <- db endpoint  
PORT="5432"                                                                         <- default
USER="postgres"                                                                     <- default
DBNAME="postgres"                                                                   <- DBName 
password = "PASSWORD"                                                               <- database password  
```

In [8]:
host="ENDPOINT_PlayerInputRequired"                         # PlayerInputRequired                           
PORT="5432"
USER="postgres"
DBNAME="unicorn_sales"
password = "dbPassword_PlayerInputRequired"                 # PlayerInputRequired 
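
As an alternative to copying the values from the console by hand, the secret can also be read programmatically with boto3. The sketch below is only an illustration: the secret name, the region, and the key names inside the secret (`host`, `password`) are assumptions, so check the actual secret in your account before relying on them. The helper name `fetch_db_credentials` is hypothetical.

```python
import json


def fetch_db_credentials(secret_id, region_name, client=None):
    """Return the JSON payload of a Secrets Manager secret as a dict."""
    if client is None:
        # imported lazily so the helper can also be exercised with a stub client
        import boto3
        client = boto3.client("secretsmanager", region_name=region_name)
    response = client.get_secret_value(SecretId=secret_id)
    # assumes the secret stores its fields as a JSON string
    return json.loads(response["SecretString"])


# Hypothetical usage; the secret name and key names are assumptions:
# creds = fetch_db_credentials("my-gameday-db-secret", "us-east-2")
# host, password = creds["host"], creds["password"]
```

Reading the secret at run time keeps the password out of the notebook itself, which matters if the notebook is shared or committed.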

*`Destination bucket` for uploading the cleaned data. Look up your [stack output](https://console.aws.amazon.com/cloudformation) to get the bucket name from the key `PersonalizeBucketName`:* 
- Select the stack
- Click the Outputs tab to view all the outputs of your deployed stack

In [None]:
dest_s3Bucket = 'S3_BUCKET_NAME_PlayerInputRequired'   # PlayerInputRequired 

*Import the Python packages and set up the connection using the* `connect` *method of* `psycopg2`. 

In [None]:
import pandas as pd
import psycopg2
# open the connection with the credentials defined above
conn = psycopg2.connect(host = host, 
                        port = PORT, 
                        database = DBNAME, 
                        user = USER, 
                        password = password)
cur = conn.cursor()

#### **Query database tables for the data**

##### Modify the following queries to select the desired columns from the database. 

In [None]:
items_df=pd.read_sql("""SELECT * FROM gameday.product""", conn)     # PlayerInputRequired 

In [None]:
items_df.head()   # print the dataframe to view all the column names, then select the desired ones in the query above

In [None]:
users_df=pd.read_sql("""SELECT * FROM gameday.customer""", conn)    # PlayerInputRequired

In [None]:
users_df.head()   # print the dataframe to view all the column names, then select the desired ones in the query above

In [None]:
interactions_df=pd.read_sql("""SELECT * FROM gameday.internetsales""", conn)  # PlayerInputRequired

In [None]:
interactions_df.head()   # print the dataframe to view all the column names, then select the desired ones in the query above
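
Besides inspecting `head()`, the column names of a table can be listed directly from PostgreSQL's `information_schema` before deciding which columns to select. The following is a sketch under the assumption that the cursor `cur` from the connection cell is available; `list_columns` is a hypothetical helper name.

```python
def list_columns(cursor, schema, table):
    """Return the column names of schema.table in their defined order."""
    cursor.execute(
        """
        SELECT column_name
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ordinal_position
        """,
        (schema, table),  # passed as parameters rather than formatted into the SQL
    )
    return [row[0] for row in cursor.fetchall()]


# Usage with the notebook's cursor:
# list_columns(cur, "gameday", "product")
```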

In [None]:
# make a copy of the original interactions, as it will be referenced a couple of times after cleaning
interactions_df_orig = interactions_df.copy()

-----------------------------------------------------------------------------------------------------------------------------------------------------------

## Task 2: **Data Cleaning**

#### Clean `interactions_df`

In [None]:
import numpy as np

# collect the unique product keys from the items data
unique_product_keys = set(items_df['productkey'].unique())  

# keep only the interactions whose product key exists in the items data
# (.copy() avoids pandas' SettingWithCopyWarning when adding columns below)
interactions_df = interactions_df[interactions_df['productkey'].isin(unique_product_keys)].copy()

# convert the order date column to the pandas datetime data type
interactions_df['OrderDate_datetime'] = pd.to_datetime(interactions_df['orderdate'])

# add column TIMESTAMP, which holds the order date as Unix epoch seconds (int64 nanoseconds divided by 10**9)
interactions_df['TIMESTAMP'] = interactions_df.OrderDate_datetime.values.astype(np.int64) // 10 ** 9
 
# rename column 'productkey' to 'ITEM_ID' and column 'customerkey' to 'USER_ID'
interactions_df = interactions_df.rename(columns={'productkey':'ITEM_ID', 'customerkey':'USER_ID'}) 

# keep only the columns ITEM_ID, USER_ID and TIMESTAMP and drop all other columns
interactions_df = interactions_df[['ITEM_ID', 'USER_ID', 'TIMESTAMP']]

Let's print what the `interactions_df` dataframe looks like:

In [None]:
interactions_df.head(3)
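
The `TIMESTAMP` conversion in the cleaning cell divides nanoseconds by `10 ** 9`, which assumes the datetime column is stored at nanosecond resolution. A resolution-independent way to get epoch seconds is to subtract the Unix epoch and floor-divide by one second. The sketch below illustrates that alternative on made-up dates; `to_epoch_seconds` is a hypothetical helper, and the dates are assumed to be timezone-naive.

```python
import pandas as pd


def to_epoch_seconds(dates):
    """Convert date strings or datetimes to whole seconds since 1970-01-01."""
    dt = pd.to_datetime(dates)
    # timedelta since the epoch, floor-divided by one second, gives integer seconds
    return (dt - pd.Timestamp("1970-01-01")) // pd.Timedelta("1s")


# Made-up sample dates, not taken from the workshop database
example = pd.Series(["1970-01-02", "2014-01-01"])
print(to_epoch_seconds(example).tolist())  # [86400, 1388534400]
```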

Save the cleaned dataframe to a CSV file in a local directory.

In [None]:
# import os and pathlib to create directory locally
import os
from pathlib import Path 

# path where we want to store the cleaned interactions dataframe
path = Path("./data/clean")

# create the directory structure if it doesn't exist
path.mkdir(parents=True, exist_ok=True)

# save the dataframe to a csv file named interactions.csv
interactions_df.to_csv(os.path.join(path, 'interactions.csv'), index=False)
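
Amazon Personalize expects an interactions dataset to contain the columns `USER_ID`, `ITEM_ID`, and `TIMESTAMP`, so it can be worth checking the saved file's header before uploading. The following is a minimal sketch using only the standard library; `missing_interaction_columns` is a hypothetical helper name.

```python
import csv

# columns required in a Personalize interactions dataset
REQUIRED_COLUMNS = {"USER_ID", "ITEM_ID", "TIMESTAMP"}


def missing_interaction_columns(csv_path):
    """Return the set of required columns absent from the CSV header."""
    with open(csv_path, newline="") as f:
        header = next(csv.reader(f), [])  # empty file -> empty header
    return REQUIRED_COLUMNS - set(header)


# Usage with the file written above; an empty set means the header is complete:
# missing_interaction_columns("./data/clean/interactions.csv")
```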

#### Clean `users_df` 

In [None]:
# collect the unique customer keys from the original interactions data
unique_user_keys = set(interactions_df_orig['customerkey'].unique())

# keep only the users whose customer key appears in the interactions data
users_df = users_df[users_df['customerkey'].isin(unique_user_keys)]

# rename column 'customerkey' to 'USER_ID', 'gender' to 'Gender' and 'yearlyincome' to 'YearlyIncome'
users_df = users_df.rename(columns={'customerkey':'USER_ID', 'gender': 'Gender', 'yearlyincome': 'YearlyIncome'})

# save users dataframe locally to users.csv file
users_df.to_csv(os.path.join(path, 'users.csv'), index=False)

In [None]:
# let's print the users_df dataframe to check the cleaned version
users_df.head(3)

#### Clean `items_df`

In [None]:
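# keep only the items in product subcategories 1 to 3 (.copy() avoids pandas' SettingWithCopyWarning)
items_df = items_df[items_df['productsubcategorykey'] < 4].copy()
# replace the numeric subcategory keys with their names
items_df['productsubcategorykey'] = items_df['productsubcategorykey'].replace({1: 'Mountain Cosmic Unicorn', 2: 'Road Cosmic Unicorn', 3: 'Touring Cosmic Unicorn'})
# rename the columns to ITEM_ID, ProductSubcategory and ListPrice
items_df = items_df.rename(columns={'productkey':'ITEM_ID', 'productsubcategorykey':'ProductSubcategory', 'listprice': 'ListPrice'})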

items_df.head(3)

In [None]:
items_df.to_csv('./data/clean/items.csv', index=False)

-----------------------------------------------------------------------------------------------------------------------------------------------------------

## Task 3: **Upload Data to S3 Bucket**

In the previous steps, we cleaned the data and saved each dataframe as a CSV file on our local disk. To use this data for training in Amazon Personalize, let's upload it to S3. 

In [None]:
# upload each cleaned CSV file to the S3 bucket under the data/ prefix
import boto3

bucket = boto3.Session().resource('s3').Bucket(dest_s3Bucket)
for file_name in ['interactions.csv', 'items.csv', 'users.csv']:
    bucket.Object(f'data/{file_name}').upload_file(f'./data/clean/{file_name}')

Now that the cleaned CSV files are on S3, we will import them into Amazon Personalize for training in the next steps.

Optional: files uploaded to the S3 bucket can be checked [here](https://console.aws.amazon.com/s3).
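
The same check can be done from the notebook by listing the keys under the `data/` prefix. This is a sketch, assuming the bucket name from `dest_s3Bucket` and default AWS credentials in the notebook environment; `list_uploaded_keys` is a hypothetical helper name.

```python
def list_uploaded_keys(bucket_name, prefix="data/", client=None):
    """Return the object keys under the given prefix in the bucket."""
    if client is None:
        # imported lazily so the helper can also be exercised with a stub client
        import boto3
        client = boto3.client("s3")
    response = client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
    # "Contents" is absent from the response when no objects match
    return [obj["Key"] for obj in response.get("Contents", [])]


# Usage with the notebook's bucket variable:
# list_uploaded_keys(dest_s3Bucket)
```

A single `list_objects_v2` call returns at most 1000 keys, which is more than enough for the three files uploaded here.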

**Congrats!** You have completed the ETL task and have the data ready for the next steps.