# Assignment 3.1: Impacting the Business with a Distributed Data Science Pipeline (Part 2)

# 1. Using the data you identified in the previous module, ingest the data into AWS S3 so that you can access it via SageMaker Studio. You may manually upload to an S3 bucket.
This was done maually and put into the following S3 bucket "s3://sagemaker-us-east-1-508233972396/Instacart-data"

# 2. Once you have manually uploaded your data, you will need to implement a Sagemaker Studio Notebook to copy the data to the Sagemaker S3 bucket, and then to the local SageMaker Instance.
This is done below.

In [5]:
# List all the source files in my S3 Bucket
!aws s3 ls s3://sagemaker-us-east-1-508233972396/Instacart-data/

2022-03-20 21:37:12          0 
2022-03-20 21:42:41       2603 aisles.csv
2022-03-20 21:42:41        270 departments.csv
2022-03-20 21:42:38  577550706 order_products__prior.csv
2022-03-20 21:42:38   24680147 order_products__train.csv
2022-03-20 21:42:38  108968645 orders.csv
2022-03-20 21:42:40    2166953 products.csv
2022-03-20 21:42:39    1475693 sample_submission.csv


In [6]:
# Load necessary libraries and Account IDs
import boto3
import sagemaker
import pandas as pd

sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name
account_id = boto3.client("sts").get_caller_identity().get("Account")

sm = boto3.Session().client(service_name="sagemaker", region_name=region)

# Set S3 Source Location

In [20]:
s3_public_path_csv = "s3://sagemaker-us-east-1-508233972396/Instacart-data"

In [21]:
%store s3_public_path_csv

Stored 's3_public_path_csv' (str)


In [22]:
bucket

'sagemaker-us-east-1-508233972396'

# Set S3 Destination Location

In [23]:
s3_private_path_csv = "s3://{}/Instacart-data/csv".format(bucket)
print(s3_private_path_csv)

s3://sagemaker-us-east-1-508233972396/Instacart-data/csv


In [24]:
%store s3_private_path_csv

Stored 's3_private_path_csv' (str)


# Copy Data From the Source S3 Bucket to our Destination S3 Bucket in this Account

In [26]:
!aws s3 cp --recursive $s3_public_path_csv/ $s3_private_path_csv/ --exclude "*" --include "*.csv"

copy: s3://sagemaker-us-east-1-508233972396/Instacart-data/aisles.csv to s3://sagemaker-us-east-1-508233972396/Instacart-data/csv/aisles.csv
copy: s3://sagemaker-us-east-1-508233972396/Instacart-data/departments.csv to s3://sagemaker-us-east-1-508233972396/Instacart-data/csv/departments.csv
copy: s3://sagemaker-us-east-1-508233972396/Instacart-data/products.csv to s3://sagemaker-us-east-1-508233972396/Instacart-data/csv/products.csv
copy: s3://sagemaker-us-east-1-508233972396/Instacart-data/sample_submission.csv to s3://sagemaker-us-east-1-508233972396/Instacart-data/csv/sample_submission.csv
copy: s3://sagemaker-us-east-1-508233972396/Instacart-data/order_products__train.csv to s3://sagemaker-us-east-1-508233972396/Instacart-data/csv/order_products__train.csv
copy: s3://sagemaker-us-east-1-508233972396/Instacart-data/orders.csv to s3://sagemaker-us-east-1-508233972396/Instacart-data/csv/orders.csv
copy: s3://sagemaker-us-east-1-508233972396/Instacart-data/order_products__prior.csv to 

In [27]:
# List the files in our destination bucket.
!aws s3 ls $s3_private_path_csv/

2022-03-20 22:15:20       2603 aisles.csv
2022-03-20 22:15:20        270 departments.csv
2022-03-20 22:15:20  577550706 order_products__prior.csv
2022-03-20 22:15:20   24680147 order_products__train.csv
2022-03-20 22:15:20  108968645 orders.csv
2022-03-20 22:15:20    2166953 products.csv
2022-03-20 22:15:20    1475693 sample_submission.csv


## Create an Athena Database to help with data exploration

In [34]:
# libraries and IDs

import boto3
import sagemaker
from pyathena import connect

sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

In [41]:
database_name = "insta_db"

In [42]:
# Set S3 staging directory -- this is a temporary directory used for Athena queries
s3_staging_dir = "s3://{0}/athena/staging".format(bucket)

In [43]:
conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)

In [44]:
statement = "CREATE DATABASE IF NOT EXISTS {}".format(database_name)
print(statement)

CREATE DATABASE IF NOT EXISTS insta_db


In [45]:
# Execute the sql above.
import pandas as pd

pd.read_sql(statement, conn)

In [46]:
# Execute the sql to Verify database was created.
statement = "SHOW DATABASES"

df_show = pd.read_sql(statement, conn)
df_show.head(5)

Unnamed: 0,database_name
0,default
1,dsoaws
2,insta_db


In [47]:
if database_name in df_show.values:
    ingest_create_athena_db_passed = True

### Ingest Products Table

In [52]:
# Set Athena parameters
database_name = "insta_db"
table_name_csv = "products"

In [53]:
conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)

In [54]:
# SQL statement to execute
statement = """CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
         product_id int,
         product_name string,
         aisle_id int,
         department_id int

) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n' LOCATION '{}'
TBLPROPERTIES ('compressionType'='gzip', 'skip.header.line.count'='1')""".format(
    database_name, table_name_csv, s3_private_path_csv
)

print(statement)

CREATE EXTERNAL TABLE IF NOT EXISTS insta_db.products(
         product_id int,
         product_name string,
         aisle_id int,
         department_id int

) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' LOCATION 's3://sagemaker-us-east-1-508233972396/Instacart-data/csv'
TBLPROPERTIES ('compressionType'='gzip', 'skip.header.line.count'='1')


In [55]:
# Execute SQL Above
import pandas as pd

pd.read_sql(statement, conn)

In [56]:
# Verify the table was created.

statement = "SHOW TABLES in {}".format(database_name)

df_show = pd.read_sql(statement, conn)
df_show.head(5)

Unnamed: 0,tab_name
0,products


In [58]:
product_category = "Chocolate Sandwich Cookies"

statement = """SELECT * FROM {}.{}
    WHERE product_name = '{}' LIMIT 100""".format(
    database_name, table_name_csv, product_category
)

print(statement)

SELECT * FROM insta_db.products
    WHERE product_name = 'Chocolate Sandwich Cookies' LIMIT 100


In [59]:
df = pd.read_sql(statement, conn)
df.head(5)

Unnamed: 0,product_id,product_name,aisle_id,department_id
0,1,Chocolate Sandwich Cookies,61,19


### Ingest aisles table

In [63]:
# Set Athena parameters
database_name = "insta_db"
table_name_csv = "aisles"

In [64]:
conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)

In [65]:
# SQL statement to execute
statement = """CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
         aisle_id int,
         aisle string

) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n' LOCATION '{}'
TBLPROPERTIES ('compressionType'='gzip', 'skip.header.line.count'='1')""".format(
    database_name, table_name_csv, s3_private_path_csv
)

print(statement)

CREATE EXTERNAL TABLE IF NOT EXISTS insta_db.aisles(
         aisle_id int,
         aisle string

) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' LOCATION 's3://sagemaker-us-east-1-508233972396/Instacart-data/csv'
TBLPROPERTIES ('compressionType'='gzip', 'skip.header.line.count'='1')


In [66]:
# Execute SQL Above
import pandas as pd

pd.read_sql(statement, conn)

In [67]:
# Verify the table was created.

statement = "SHOW TABLES in {}".format(database_name)

df_show = pd.read_sql(statement, conn)
df_show.head(5)

Unnamed: 0,tab_name
0,aisles
1,products


In [68]:
product_category = "coffee"

statement = """SELECT * FROM {}.{}
    WHERE aisle = '{}' LIMIT 100""".format(
    database_name, table_name_csv, product_category
)

print(statement)

SELECT * FROM insta_db.aisles
    WHERE aisle = 'coffee' LIMIT 100


In [69]:
df = pd.read_sql(statement, conn)
df.head(5)

Unnamed: 0,aisle_id,aisle
0,26,coffee


### Ingest departments table

In [72]:
# Set Athena parameters
database_name = "insta_db"
table_name_csv = "departments"

In [73]:
conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)

In [74]:
# SQL statement to execute
statement = """CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
         department_id int,
         department string

) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n' LOCATION '{}'
TBLPROPERTIES ('compressionType'='gzip', 'skip.header.line.count'='1')""".format(
    database_name, table_name_csv, s3_private_path_csv
)

print(statement)

CREATE EXTERNAL TABLE IF NOT EXISTS insta_db.departments(
         department_id int,
         department string

) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' LOCATION 's3://sagemaker-us-east-1-508233972396/Instacart-data/csv'
TBLPROPERTIES ('compressionType'='gzip', 'skip.header.line.count'='1')


In [75]:
# Execute SQL Above
import pandas as pd

pd.read_sql(statement, conn)

In [76]:
# Verify the table was created.

statement = "SHOW TABLES in {}".format(database_name)

df_show = pd.read_sql(statement, conn)
df_show.head(5)

Unnamed: 0,tab_name
0,aisles
1,departments
2,products


In [77]:
product_category = "alcohol"

statement = """SELECT * FROM {}.{}
    WHERE department = '{}' LIMIT 100""".format(
    database_name, table_name_csv, product_category
)

print(statement)

SELECT * FROM insta_db.departments
    WHERE department = 'alcohol' LIMIT 100


In [78]:
df = pd.read_sql(statement, conn)
df.head(5)

Unnamed: 0,department_id,department
0,5,alcohol


### Ingest order_products_train table

In [123]:
# Set Athena parameters
database_name = "insta_db"
table_name_csv = "order_products_train"

In [124]:
conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)

In [125]:
# SQL statement to execute
statement = """CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
         order_id bigint,
         product_id bigint,
         add_to_cart_order bigint,
         reordered int

) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n' LOCATION '{}'
TBLPROPERTIES ('compressionType'='gzip', 'skip.header.line.count'='1')""".format(
    database_name, table_name_csv, s3_private_path_csv
)

print(statement)

CREATE EXTERNAL TABLE IF NOT EXISTS insta_db.order_products_train(
         order_id bigint,
         product_id bigint,
         add_to_cart_order bigint,
         reordered int

) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' LOCATION 's3://sagemaker-us-east-1-508233972396/Instacart-data/csv'
TBLPROPERTIES ('compressionType'='gzip', 'skip.header.line.count'='1')


In [126]:
# Execute SQL Above
import pandas as pd

pd.read_sql(statement, conn)

In [127]:
# Verify the table was created.

statement = "SHOW TABLES in {}".format(database_name)

df_show = pd.read_sql(statement, conn)
df_show.head(5)

Unnamed: 0,tab_name
0,aisles
1,departments
2,order_products_train
3,orders
4,products


In [128]:
statement = """SELECT * FROM {}.{} LIMIT 5""".format(
    database_name, table_name_csv
)

print(statement)

SELECT * FROM insta_db.order_products_train LIMIT 5


In [129]:
df = pd.read_sql(statement, conn)
df.head(5)

Unnamed: 0,order_id,product_id,add_to_cart_order,reordered
0,1,,,
1,2,,,
2,3,,,
3,4,,,
4,5,,,


### Ingest sample_submission table

In [None]:
# Set Athena parameters
database_name = "insta_db"
table_name_csv = "sample_submission"

### Ingest order_products_prior table

In [None]:
# Set Athena parameters
database_name = "insta_db"
table_name_csv = "order_products_prior"

### Ingest orders table

In [115]:
# Set Athena parameters
database_name = "insta_db"
table_name_csv = "orders"

In [116]:
conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)

In [118]:
# SQL statement to execute
statement = """CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
         order_id bigint,
         user_id int,
         eval_set string,
         order_number int,
         order_dow int,
         order_hour_of_day int,
         days_since_prior_order int

) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n' LOCATION '{}'
TBLPROPERTIES ('compressionType'='gzip', 'skip.header.line.count'='1')""".format(
    database_name, table_name_csv, s3_private_path_csv
)

print(statement)

CREATE EXTERNAL TABLE IF NOT EXISTS insta_db.orders(
         order_id bigint,
         user_id int,
         eval_set string,
         order_number int,
         order_dow int,
         order_hour_of_day int,
         days_since_prior_order int

) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' LOCATION 's3://sagemaker-us-east-1-508233972396/Instacart-data/csv'
TBLPROPERTIES ('compressionType'='gzip', 'skip.header.line.count'='1')


In [119]:
# Execute SQL Above
import pandas as pd

pd.read_sql(statement, conn)

In [120]:
# Verify the table was created.

statement = "SHOW TABLES in {}".format(database_name)

df_show = pd.read_sql(statement, conn)
df_show.head(5)

Unnamed: 0,tab_name
0,aisles
1,departments
2,order_products_train
3,orders
4,products


In [121]:
statement = """SELECT * FROM {}.{} LIMIT 5""".format(
    database_name, table_name_csv
)

print(statement)

SELECT * FROM insta_db.orders LIMIT 5


In [122]:
# Execute SQL Above
import pandas as pd

pd.read_sql(statement, conn)

Unnamed: 0,order_id,user_id,eval_set,order_number,order_dow,order_hour_of_day,days_since_prior_order
0,1,,,,,,
1,2,,,,,,
2,3,,,,,,
3,4,,,,,,
4,5,,,,,,


# 2. Once the data is accessible, implement code within SageMaker Studio notebooks to explore the data and identify key fields, data types, areas of opportunity and possible bias.

In [None]:
##

# Release Resources

In [None]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>

## Check to see if resources are released.

In [None]:
%%javascript

try {
    Jupyter.notebook.save_checkpoint();
    Jupyter.notebook.session.delete();
}
catch(err) {
    // NoOp
}