In [40]:
import boto3, re, sys, math, json, os, sagemaker, urllib.request
import io
import sagemaker
from sagemaker import get_execution_role
from IPython.display import Image
from IPython.display import display
from time import gmtime, strftime
from pyathena import connect
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')

In [41]:
sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

In [42]:
ingest_create_athena_db_passed = False

In [43]:
# set a database name
database_name = "inspection"

In [44]:
# Set S3 staging directory -- this is a temporary directory used for Athena queries
s3_staging_dir = "s3://{0}/athena/staging".format(bucket)

In [45]:
conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)

In [46]:
statement = "CREATE DATABASE IF NOT EXISTS {}".format(database_name)
print(statement)
pd.read_sql(statement, conn)

CREATE DATABASE IF NOT EXISTS inspection


In [47]:
statement = "SHOW DATABASES"

df_show = pd.read_sql(statement, conn)
df_show.head(5)

Unnamed: 0,database_name
0,default
1,inspection


In [48]:
if database_name in df_show.values:
    ingest_create_athena_db_passed = True

In [49]:
%store ingest_create_athena_db_passed

Stored 'ingest_create_athena_db_passed' (bool)


In [50]:
%store

Stored variables and their in-db values:
ingest_create_athena_db_passed             -> True
s3_private_path_tsv                        -> 's3://sagemaker-us-east-1-373893336492/inspections
s3_public_path_tsv                         -> 's3://inspections-pds'
setup_dependencies_passed                  -> True
setup_iam_roles_passed                     -> True
setup_s3_bucket_passed                     -> True


In [51]:
ingest_create_athena_tsv_passed = False

In [52]:
try:
    ingest_create_athena_db_passed
except NameError:
    print("ERROR YOU HAVE TO CREATE THE DATABASE")

In [53]:
print(ingest_create_athena_db_passed)

True


In [54]:
print(ingest_create_athena_db_passed)

True


In [64]:
s3_private_path_tsv = "s3://{}/".format(bucket)

print(s3_private_path_tsv)

s3://sagemaker-us-east-1-373893336492/


In [74]:
# SQL statement to execute
inspect_path = "s3://{}/inspections-pds/inspections/".format(bucket)

table_name = 'inspect'

statement = """CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
                business_id string,
                business_name string,
                business_address string,
                inspection_date date,
                inspection_score int,
                inspection_type string,
                violation_description string,
                risk_category string,
                sup_dist string
                )
                ROW FORMAT DELIMITED
                FIELDS TERMINATED BY '\\t'
                LOCATION '{}'
                TBLPROPERTIES ('skip.header.line.count'='1')""".format(
                    database_name, table_name, inspect_path)

print(statement)

CREATE EXTERNAL TABLE IF NOT EXISTS inspection.inspect(
                business_id string,
                business_name string,
                business_address string,
                inspection_date date,
                inspection_score int,
                inspection_type string,
                violation_description string,
                risk_category string,
                sup_dist string
                )
                ROW FORMAT DELIMITED
                FIELDS TERMINATED BY '\t'
                LOCATION 's3://sagemaker-us-east-1-373893336492/inspections-pds/inspections/'
                TBLPROPERTIES ('skip.header.line.count'='1')


In [75]:
pd.read_sql(statement, conn)

In [76]:
# SQL statement to execute
dist_path = "s3://{}/inspections-pds/district/".format(bucket)
table_name = 'dist'

statement = """CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
                sup_name string,
                sup_dist string
                )
                ROW FORMAT DELIMITED
                FIELDS TERMINATED BY '\\t'
                LINES TERMINATED BY '\\n'
                LOCATION '{}'
                TBLPROPERTIES ('skip.header.line.count'='1')""".format(
                    database_name, table_name, dist_path)

print(statement)

CREATE EXTERNAL TABLE IF NOT EXISTS inspection.dist(
                sup_name string,
                sup_dist string
                )
                ROW FORMAT DELIMITED
                FIELDS TERMINATED BY '\t'
                LINES TERMINATED BY '\n'
                LOCATION 's3://sagemaker-us-east-1-373893336492/inspections-pds/district/'
                TBLPROPERTIES ('skip.header.line.count'='1')


In [77]:
pd.read_sql(statement, conn)

In [91]:
table_name = 'buildings'
buildings_path = "s3://{}/inspections-pds/build/".format(bucket)

statement = """CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
                business_id string,
                business_name string,
                business_address string,
                inspection_date date,
                inspection_score int,
                inspection_type string,
                violation_description string,
                risk_category string,
                sup_dist string
                )
                ROW FORMAT DELIMITED
                FIELDS TERMINATED BY '\\t'
                LOCATION '{}'
                TBLPROPERTIES ('skip.header.line.count'='1')""".format(
                    database_name, table_name, buildings_path)

In [91]:
pd.read_sql(statement, conn)

In [96]:
table_name = 'registered'
buildings_path = "s3://{}/inspections-pds/businesses/".format(bucket)

statement = """CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
                street_address string,
                business_start_date date,
                location_start_date date,
                dba_name string
                )
                ROW FORMAT DELIMITED
                FIELDS TERMINATED BY '\\t'
                LOCATION '{}'
                TBLPROPERTIES ('skip.header.line.count'='1')""".format(
                    database_name, table_name, buildings_path)

In [97]:
pd.read_sql(statement, conn)

In [98]:
statement = "show tables in {}".format(database_name)
pd.read_sql(statement, conn)

Unnamed: 0,tab_name
0,buildings_2
1,dist
2,inspect
3,registered


In [None]:
pd.read_sql(f'SELECT * FROM {database_name}.{table_name} t1 INNER JOIN \
                            {database_name}.{table_name2} t2 ON t1.seg_id \
                            = t2.seg_id LIMIT 5', conn)

In [None]:
df = pd.read_sql(f'SELECT * FROM (SELECT * FROM {database_name}.{table_name} \
                           t1 INNER JOIN {database_name}.{table_name2} t2 \
                           ON t1.seg_id = t2.seg_id) m1 LEFT JOIN (SELECT street_name, \
                                                                   SUM(total_count) total_count \
                                                                   FROM {database_name}.{table_name3} \
                                                                   GROUP BY street_name) t3 \
                           ON m1.address_street = t3.street_name', conn)

In [71]:
!aws s3 cp --recursive $s3_public_path_tsv/ $s3_private_path_tsv/buildings/ --exclude "*" --include "buildings.tsv"


usage: aws s3 cp <LocalPath> <S3Uri> or <S3Uri> <LocalPath> or <S3Uri> <S3Uri>
Error: Invalid argument type


In [92]:
df = pd.read_sql(statement, conn)


In [95]:
statement = "show tables in {}".format(database_name)
df = pd.read_sql(statement, conn)
df.head(5)

Unnamed: 0,tab_name
0,buildings_2
1,dist
2,inspect


In [93]:
print(buildings_path)

s3://sagemaker-us-east-1-373893336492/inspections-pds/build/


In [90]:
!aws s3 ls --recursive $buildings_path