# AWS Data Lake - Udacity STEDI Project

## 1. Import Package

In [115]:
import boto3
import configparser
from time import sleep
import pandas as pd
from botocore.exceptions import ClientError


## 2. Create resource

### Load configuration from aws.cfg

In [116]:
config = configparser.ConfigParser()
config.read('aws.cfg')

AWS_REGION = config.get('AWS', 'AWS_REGION')
AWS_ACCESS_KEY = config.get('AWS', 'AWS_ACCESS_KEY')
AWS_SECRET_ACCESS_KEY = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')
S3_BUCKET_NAME = config.get('S3', 'S3_BUCKET_NAME')
GLUE_DB          = config.get("GLUE","GLUE_DB")

### Create S3 bucket

In [117]:
print("Creating clients for S3")
s3 =  boto3.resource('s3',
                    region_name=AWS_REGION,
                    aws_access_key_id=AWS_ACCESS_KEY,
                    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
                        )

print("Creating AWS S3")
try:
    s3.create_bucket(Bucket=S3_BUCKET_NAME, CreateBucketConfiguration={
        'LocationConstraint': AWS_REGION
    })
except Exception as e:
    if 'BucketAlreadyOwnedByYou' in str(e):
        print(f"Bucket '{S3_BUCKET_NAME}' already exists.")


directories = ['customer',
                'step_trainer', 'accelerometer']
                
for directory in directories:
    bucket = s3.Bucket(S3_BUCKET_NAME)
    s3_key = directory + '/'
    try:
        bucket.Object(s3_key).load()
        print(
            f'S3 directory s3://{S3_BUCKET_NAME}/{s3_key} already exists.')
    except:
        print(f'Creating S3 directory s3://{S3_BUCKET_NAME}/{s3_key}')
        bucket.put_object(Key=s3_key)
        # Wait for the directory to be created
        while True:
            try:
                bucket.Object(s3_key).load()
                print(
                    f'S3 directory s3://{S3_BUCKET_NAME}/{s3_key} created successfully.')
                break
            except:
                print(
                    f'Waiting for S3 directory s3://{S3_BUCKET_NAME}/{s3_key} to be created...')
                sleep(5)

Creating clients for S3
Creating AWS S3


Creating S3 directory s3://anhdtv-stedi/customer/
S3 directory s3://anhdtv-stedi/customer/ created successfully.
Creating S3 directory s3://anhdtv-stedi/step_trainer/
S3 directory s3://anhdtv-stedi/step_trainer/ created successfully.
Creating S3 directory s3://anhdtv-stedi/accelerometer/
S3 directory s3://anhdtv-stedi/accelerometer/ created successfully.


## 3. Sync data from local directory to AWS s3

### Set OS environment variables for AWS CLI

In [118]:
import os
os.environ["AWS_ACCESS_KEY_ID"] = AWS_ACCESS_KEY
os.environ["AWS_SECRET_ACCESS_KEY"] = AWS_SECRET_ACCESS_KEY
os.environ["AWS_DEFAULT_REGION"] = AWS_REGION

### Sync data from local to S3 bucket

In [119]:
! aws s3 sync ./data/customers/ s3://anhdtv-stedi/customer/landing
! aws s3 sync ./data/accelerometer/ s3://anhdtv-stedi/accelerometer/landing
! aws s3 sync ./data/step_trainer/ s3://anhdtv-stedi/step_trainer/landing

upload: data/customers/customers-1655293787679.json to s3://anhdtv-stedi/customer/landing/customers-1655293787679.json
upload: data/customers/customers-1655295864820.json to s3://anhdtv-stedi/customer/landing/customers-1655295864820.json
upload: data/customers/customers-1655296192446.json to s3://anhdtv-stedi/customer/landing/customers-1655296192446.json
upload: data/customers/customers-1655296191557.json to s3://anhdtv-stedi/customer/landing/customers-1655296191557.json
upload: data/customers/customers-1655296191186.json to s3://anhdtv-stedi/customer/landing/customers-1655296191186.json
upload: data/customers/customers-1655296191788.json to s3://anhdtv-stedi/customer/landing/customers-1655296191788.json
upload: data/customers/customers-1655296191365.json to s3://anhdtv-stedi/customer/landing/customers-1655296191365.json
upload: data/customers/customers-1655293823654.json to s3://anhdtv-stedi/customer/landing/customers-1655293823654.json
upload: data/customers/customers-1655296192003.j

### Check data on S3 bucket

In [120]:
s3 = boto3.resource('s3',
                    aws_access_key_id=AWS_ACCESS_KEY,
                    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                    region_name=AWS_REGION  
)

stedi_bucket = s3.Bucket(S3_BUCKET_NAME)

#### Check Customer data

In [121]:
customer_data = [ obj for obj in stedi_bucket.objects.limit(10).filter(Prefix="customer")]
customer_data

[s3.ObjectSummary(bucket_name='anhdtv-stedi', key='customer/'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='customer/landing/customers-1655293787679.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='customer/landing/customers-1655293823654.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='customer/landing/customers-1655295864820.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='customer/landing/customers-1655296152387.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='customer/landing/customers-1655296191186.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='customer/landing/customers-1655296191365.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='customer/landing/customers-1655296191557.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='customer/landing/customers-1655296191788.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='customer/landing/customers-1655296192003.json')]

#### Check Step_trainer data

In [122]:
step_trainer_data = [ obj for obj in stedi_bucket.objects.limit(10).filter(Prefix="step_trainer")]
step_trainer_data

[s3.ObjectSummary(bucket_name='anhdtv-stedi', key='step_trainer/'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='step_trainer/landing/step_trainer-1655296678763.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='step_trainer/landing/step_trainer-1655471583651.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='step_trainer/landing/step_trainer-1655562044886.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='step_trainer/landing/step_trainer-1655562460721.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='step_trainer/landing/step_trainer-1655562669699.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='step_trainer/landing/step_trainer-1655563258079.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='step_trainer/landing/step_trainer-1655563990886.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='step_trainer/landing/step_trainer-1655564157236.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='step_trainer/landing/step

#### Check Accelerometer data

In [123]:
accelerometer_data = [ obj for obj in stedi_bucket.objects.limit(10).filter(Prefix="accelerometer")]
accelerometer_data

[s3.ObjectSummary(bucket_name='anhdtv-stedi', key='accelerometer/'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='accelerometer/landing/accelerometer-1655296678763.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='accelerometer/landing/accelerometer-1655471583651.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='accelerometer/landing/accelerometer-1655562044886.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='accelerometer/landing/accelerometer-1655562460721.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='accelerometer/landing/accelerometer-1655562669699.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='accelerometer/landing/accelerometer-1655563258079.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='accelerometer/landing/accelerometer-1655563990886.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='accelerometer/landing/accelerometer-1655564157236.json'),
 s3.ObjectSummary(bucket_name='anhdtv-stedi', key='accelero

## 4. Configuring the S3 VPC Gateway Endpoint

### 4.1 Use the AWS CLI to identify the VPC that needs access to S3:

In [124]:
! aws ec2 describe-vpcs

{
    "Vpcs": [
        {
            "CidrBlock": "172.31.0.0/16",
            "DhcpOptionsId": "dopt-0874ef9faa0280288",
            "State": "available",
            "VpcId": "vpc-0f260bb33946d0513",
            "OwnerId": "004668495805",
            "InstanceTenancy": "default",
            "CidrBlockAssociationSet": [
                {
                    "AssociationId": "vpc-cidr-assoc-0d6e74c47db6c36c9",
                    "CidrBlock": "172.31.0.0/16",
                    "CidrBlockState": {
                        "State": "associated"
                    }
                }
            ],
            "IsDefault": true
        }
    ]
}


### 4.2 Identify the routing table you want to configure with your VPC Gateway

In [125]:
! aws ec2 describe-route-tables

{
    "RouteTables": [
        {
            "Associations": [
                {
                    "Main": true,
                    "RouteTableAssociationId": "rtbassoc-0ca5602dd6ff0b069",
                    "RouteTableId": "rtb-02be98e72fc4f5ef7",
                    "AssociationState": {
                        "State": "associated"
                    }
                }
            ],
            "PropagatingVgws": [],
            "RouteTableId": "rtb-02be98e72fc4f5ef7",
            "Routes": [
                {
                    "DestinationCidrBlock": "172.31.0.0/16",
                    "GatewayId": "local",
                    "Origin": "CreateRouteTable",
                    "State": "active"
                },
                {
                    "DestinationCidrBlock": "0.0.0.0/0",
                    "GatewayId": "igw-02385e932565c21bf",
                    "Origin": "CreateRoute",
                    "State": "active"
                },
                {
           

### 4.3 Create an S3 Gateway Endpoint

In [126]:
! aws ec2 create-vpc-endpoint --vpc-id 'vpc-0f260bb33946d0513' --service-name com.amazonaws.us-west-2.s3 --route-table-ids 'rtb-02be98e72fc4f5ef7'


An error occurred (RouteAlreadyExists) when calling the CreateVpcEndpoint operation: route table rtb-02be98e72fc4f5ef7 already has a route with destination-prefix-list-id pl-68a54001


### 4.4 Creating the Glue Service IAM Role

In [127]:
! aws iam create-role --role-name stedi-role --assume-role-policy-document '{ "Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Principal": {"Service": "glue.amazonaws.com"}, "Action": "sts:AssumeRole"}]}'


An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name stedi-role already exists.


### 4.5 Grant Glue Privileges on the S3 Bucket

In [128]:
! aws iam put-role-policy --role-name stedi-role --policy-name S3Access --policy-document '{ \
    "Version": "2012-10-17", \
    "Statement": [ \
        { \
            "Effect": "Allow", \
            "Action": [ \
                "s3:GetBucketLocation", \
                "s3:ListBucket", \
                "s3:GetObject", \
                "s3:GetObjectTagging" \
            ], \
            "Resource": [ \
                "arn:aws:s3:::cd0030bucket", \
                "arn:aws:s3:::cd0030bucket/*" \
            ] \
        }, \
        { \
            "Effect": "Allow", \
            "Action": [ \
                "s3:GetBucketLocation", \
                "s3:ListBucket", \
                "s3:GetObject", \
                "s3:PutObject", \
                "s3:deleteObject", \
                "s3:GetObjectTagging", \
                "s3:PutObjectTagging" \
            ], \
            "Resource": [ \
                "arn:aws:s3:::congdinh2023-stedi-lakehouse", \
                "arn:aws:s3:::congdinh2023-stedi-lakehouse/*" \
            ] \
        } \
    ] \
}'

### 4.5 Glu Policy - Give Glue access to data in special S3 buckets used for Glue configuration, and several other resources

In [129]:
! aws iam put-role-policy --role-name stedi-role --policy-name GlueAccess --policy-document '{ \
    "Version": "2012-10-17", \
    "Statement": [ \
        { \
            "Effect": "Allow", \
            "Action": [ \
                "glue:*", \
                "s3:GetBucketLocation", \
                "s3:ListBucket", \
                "s3:ListAllMyBuckets", \
                "s3:GetBucketAcl", \
                "ec2:DescribeVpcEndpoints", \
                "ec2:DescribeRouteTables", \
                "ec2:CreateNetworkInterface", \
                "ec2:DeleteNetworkInterface", \
                "ec2:DescribeNetworkInterfaces", \
                "ec2:DescribeSecurityGroups", \
                "ec2:DescribeSubnets", \
                "ec2:DescribeVpcAttribute", \
                "iam:ListRolePolicies", \
                "iam:GetRole", \
                "iam:GetRolePolicy", \
                "cloudwatch:PutMetricData" \
            ], \
            "Resource": [ \
                "*" \
            ] \
        }, \
        { \
            "Effect": "Allow", \
            "Action": [ \
                "s3:CreateBucket", \
                "s3:PutBucketPublicAccessBlock" \
            ], \
            "Resource": [ \
                "arn:aws:s3:::aws-glue-*" \
            ] \
        }, \
        { \
            "Effect": "Allow", \
            "Action": [ \
                "s3:GetObject", \
                "s3:PutObject", \
                "s3:DeleteObject" \
            ], \
            "Resource": [ \
                "arn:aws:s3:::aws-glue-*/*", \
                "arn:aws:s3:::*/*aws-glue-*/*" \
            ] \
        }, \
        { \
            "Effect": "Allow", \
            "Action": [ \
                "s3:GetObject" \
            ], \
            "Resource": [ \
                "arn:aws:s3:::crawler-public*", \
                "arn:aws:s3:::aws-glue-*" \
            ] \
        }, \
        { \
            "Effect": "Allow", \
            "Action": [ \
                "logs:CreateLogGroup", \
                "logs:CreateLogStream", \
                "logs:PutLogEvents", \
                "logs:AssociateKmsKey" \
            ], \
            "Resource": [ \
                "arn:aws:logs:*:*:/aws-glue/*" \
            ] \
        }, \
        { \
            "Effect": "Allow", \
            "Action": [ \
                "ec2:CreateTags", \
                "ec2:DeleteTags" \
            ], \
            "Condition": { \
                "ForAllValues:StringEquals": { \
                    "aws:TagKeys": [ \
                        "aws-glue-service-resource" \
                    ] \
                } \
            }, \
            "Resource": [ \
                "arn:aws:ec2:*:*:network-interface/*", \
                "arn:aws:ec2:*:*:security-group/*", \
                "arn:aws:ec2:*:*:instance/*" \
            ] \
        } \
    ] \
}'

## 5. Glue

In [130]:
glue = boto3.client('glue')

### 5.1 Create glue database

In [131]:
glue.create_database(DatabaseInput={'Name': GLUE_DB})

{'ResponseMetadata': {'RequestId': 'f9102c2e-c03d-49b0-8a3c-c2e50ef0b77f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sat, 04 Nov 2023 17:39:00 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'f9102c2e-c03d-49b0-8a3c-c2e50ef0b77f'},
  'RetryAttempts': 0}}

### 5.3 Create Glue tables

In [132]:
# create Glue tables
table_input_customer = {
    'Name': 'customer_landing',
    'Description': 'Customer landing table from S3',
    'StorageDescriptor': {
        'Columns': [
            {'Name': 'customerName', 'Type': 'string'},
            {'Name': 'email', 'Type': 'string'},
            {'Name': 'phone', 'Type': 'string'},
            {'Name': 'birthDay', 'Type': 'string'},
            {'Name': 'serialNumber', 'Type': 'string'},
            {'Name': 'registrationDate', 'Type': 'bigint'},
            {'Name': 'lastUpdateDate', 'Type': 'bigint'},
            {'Name': 'shareWithResearchAsOfDate', 'Type': 'bigint'},
            {'Name': 'shareWithPublicAsOfDate', 'Type': 'bigint'},
        ],
        'Location': 's3://anhdtv-stedi/customer/landing/',
        'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
        'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
        'SerdeInfo': {
            'SerializationLibrary': 'org.openx.data.jsonserde.JsonSerDe',
            'Parameters': { 'separatorChar': ',' }
        },
        'StoredAsSubDirectories': False
    },
    'TableType': 'EXTERNAL_TABLE'
}

table_input_accelerometer = {
    'Name': 'accelerometer_landing',
    'Description': 'Accelerometer landing table from S3',
    'StorageDescriptor': {
        'Columns': [
            {'Name': 'user', 'Type': 'string'},
            {'Name': 'timeStamp', 'Type': 'bigint'},
            {'Name': 'x', 'Type': 'double'},
            {'Name': 'y', 'Type': 'double'},
            {'Name': 'z', 'Type': 'double'},
        ],
        'Location': 's3://anhdtv-stedi/accelerometer/landing/',
        'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
        'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
        'SerdeInfo': {
            'SerializationLibrary': 'org.openx.data.jsonserde.JsonSerDe',
            'Parameters': {'separatorChar': ','}
        },
        'StoredAsSubDirectories': False
    },
    'TableType': 'EXTERNAL_TABLE'
}

glue.create_table(DatabaseName=GLUE_DB, TableInput=table_input_customer)
glue.create_table(DatabaseName=GLUE_DB, TableInput=table_input_accelerometer)

{'ResponseMetadata': {'RequestId': '16892344-fa8c-4939-88d9-9df79cf2da19',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sat, 04 Nov 2023 17:39:01 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': '16892344-fa8c-4939-88d9-9df79cf2da19'},
  'RetryAttempts': 0}}

### 5.4 Create customer trusted table

In [133]:
table_input_customer_trusted = {
    'Name': 'customer_trusted',
    'Description': 'Customer trusted table from S3',
    'StorageDescriptor': {
        'Columns': [
            {'Name': 'customerName', 'Type': 'string'},
            {'Name': 'email', 'Type': 'string'},
            {'Name': 'phone', 'Type': 'string'},
            {'Name': 'birthDay', 'Type': 'string'},
            {'Name': 'serialNumber', 'Type': 'string'},
            {'Name': 'registrationDate', 'Type': 'bigint'},
            {'Name': 'lastUpdateDate', 'Type': 'bigint'},
            {'Name': 'shareWithResearchAsOfDate', 'Type': 'bigint'},
            {'Name': 'shareWithPublicAsOfDate', 'Type': 'bigint'},
        ],
        'Location': 's3://anhdtv-stedi/customer/trusted/',
        'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
        'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
        'SerdeInfo': {
            'SerializationLibrary': 'org.openx.data.jsonserde.JsonSerDe',
            'Parameters': { 'separatorChar': ',' }
        },
        'StoredAsSubDirectories': False
    },
    'TableType': 'EXTERNAL_TABLE'
}

glue.create_table(DatabaseName=GLUE_DB, TableInput=table_input_customer_trusted)

{'ResponseMetadata': {'RequestId': '6d86fc3e-a80b-430b-9ae8-15b9693cc7e9',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sat, 04 Nov 2023 17:39:01 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': '6d86fc3e-a80b-430b-9ae8-15b9693cc7e9'},
  'RetryAttempts': 0}}

### 5.4 Create Accelerometer trusted table

In [134]:
table_input_accelerometer_trusted = {
    'Name': 'accelerometer_trusted',
    'Description': 'Accelerometer trusted table from S3',
    'StorageDescriptor': {
        'Columns': [
            {'Name': 'user', 'Type': 'string'},
            {'Name': 'timeStamp', 'Type': 'bigint'},
            {'Name': 'x', 'Type': 'double'},
            {'Name': 'y', 'Type': 'double'},
            {'Name': 'z', 'Type': 'double'},
        ],
        'Location': 's3://anhdtv-stedi/accelerometer/trusted/',
        'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
        'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
        'SerdeInfo': {
            'SerializationLibrary': 'org.openx.data.jsonserde.JsonSerDe',
            'Parameters': {'separatorChar': ','}
        },
        'StoredAsSubDirectories': False
    },
    'TableType': 'EXTERNAL_TABLE'
}

glue.create_table(DatabaseName=GLUE_DB, TableInput=table_input_accelerometer_trusted)

{'ResponseMetadata': {'RequestId': 'a0224f25-c0e5-4929-8294-a0c522c22f01',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sat, 04 Nov 2023 17:39:02 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'a0224f25-c0e5-4929-8294-a0c522c22f01'},
  'RetryAttempts': 0}}

### 5.5 Create Customer Curated table

In [135]:
table_input_customer_curated = {
    'Name': 'customer_curated',
    'Description': 'Customer Curated table from S3',
    'StorageDescriptor': {
        'Columns': [
            {'Name': 'customerName', 'Type': 'string'},
            {'Name': 'email', 'Type': 'string'},
            {'Name': 'phone', 'Type': 'string'},
            {'Name': 'birthDay', 'Type': 'string'},
            {'Name': 'serialNumber', 'Type': 'string'},
            {'Name': 'registrationDate', 'Type': 'bigint'},
            {'Name': 'lastUpdateDate', 'Type': 'bigint'},
            {'Name': 'shareWithResearchAsOfDate', 'Type': 'bigint'},
            {'Name': 'shareWithPublicAsOfDate', 'Type': 'bigint'},
        ],
        'Location': 's3://anhdtv-stedi/customer/curated/',
        'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
        'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
        'SerdeInfo': {
            'SerializationLibrary': 'org.openx.data.jsonserde.JsonSerDe',
            'Parameters': { 'separatorChar': ',' }
        },
        'StoredAsSubDirectories': False
    },
    'TableType': 'EXTERNAL_TABLE'
}

glue.create_table(DatabaseName=GLUE_DB, TableInput=table_input_customer_curated)

{'ResponseMetadata': {'RequestId': 'd4b07b7a-aeec-44e9-a3b9-277f1ae8769c',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sat, 04 Nov 2023 17:39:02 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'd4b07b7a-aeec-44e9-a3b9-277f1ae8769c'},
  'RetryAttempts': 0}}

### 5.5 Create Step Trainer trusted table

In [136]:
table_input_step_trainer_trusted = {
    'Name': 'step_trainer_trusted',
    'Description': 'Step Trainer trusted table from S3',
    'StorageDescriptor': {
        'Columns': [
            {'Name': 'sensorReadingTime', 'Type': 'bigint'},
            {'Name': 'serialNumber', 'Type': 'string'},
            {'Name': 'distanceFromObject', 'Type': 'int'},
        ],
        'Location': 's3://anhdtv-stedi/step_trainer/trusted/',
        'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
        'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
        'SerdeInfo': {
            'SerializationLibrary': 'org.openx.data.jsonserde.JsonSerDe',
            'Parameters': {'separatorChar': ','}
        },
        'StoredAsSubDirectories': False
    },
    'TableType': 'EXTERNAL_TABLE'
}

glue.create_table(DatabaseName=GLUE_DB, TableInput=table_input_step_trainer_trusted)

{'ResponseMetadata': {'RequestId': '18915b20-01ed-4e0d-a4e4-24ad1751c5f3',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sat, 04 Nov 2023 17:39:02 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': '18915b20-01ed-4e0d-a4e4-24ad1751c5f3'},
  'RetryAttempts': 0}}

## 6 - AWS Athena

### 6.1 Create Athena client object

In [137]:
athena = boto3.client('athena')

In [138]:

response = athena.start_query_execution(
    QueryString='CREATE DATABASE stedi',
    ResultConfiguration={
        'OutputLocation': 's3://anhdtv-stedi/'
    }
)

execution_id = response['QueryExecutionId']
print(f"Execution ID: {execution_id}")

Execution ID: 4accf83e-8d6e-46ef-b0ab-d3eb4a5f99ab


In [139]:

response = athena.start_query_execution(
    QueryString='DROP DATABASE IF EXISTS stedi',
    ResultConfiguration={
        'OutputLocation': 's3://anhdtv-stedi/'
    }
)

execution_id = response['QueryExecutionId']
print(f"Execution ID: {execution_id}")

Execution ID: 545ec0ec-b9fe-4fb0-b581-927a5851d933


In [140]:
# Read the SQL script file
with open('./sql_script/accelerometer_landing.sql', 'r') as f:
    query_string = f.read()

response = athena.start_query_execution(
    QueryString=query_string,
    QueryExecutionContext={
        'Database': 'stedi'
    },
    ResultConfiguration={
        'OutputLocation': 's3://anhdtv-stedi/accelerometer/landing'
    }
)

execution_id = response['QueryExecutionId']
print(f"Execution ID: {execution_id}")


Execution ID: 3e2a3231-4d2d-4d98-aae3-fd9a4e0cd042


In [141]:
# Read the SQL script file
with open('./sql_script/customer_landing.sql', 'r') as f:
    query_string = f.read()

response = athena.start_query_execution(
    QueryString=query_string,
    QueryExecutionContext={
        'Database': 'stedi'
    },
    ResultConfiguration={
        'OutputLocation': 's3://anhdtv-stedi/customer/landing'
    }
)

execution_id = response['QueryExecutionId']
print(f"Execution ID: {execution_id}")


Execution ID: 24c741c5-ebbe-435c-8763-293c8f6c7681


In [142]:
response = athena.start_query_execution(
    QueryString='DROP TABLE IF EXISTS stedi.customer_landing',
    QueryExecutionContext={
        'Database': 'stedi'
    },
    ResultConfiguration={
        'OutputLocation': 's3://anhdtv-stedi/customer/landing'
    }
)

execution_id = response['QueryExecutionId']
print(f"Execution ID: {execution_id}")


Execution ID: d138948f-fef9-4a60-a9df-2ee481f61a34


In [143]:
# Read the SQL script file
with open('./sql_script/step_trainer_landing.sql', 'r') as f:
    query_string = f.read()

response = athena.start_query_execution(
    QueryString=query_string,
    QueryExecutionContext={
        'Database': 'stedi'
    },
    ResultConfiguration={
        'OutputLocation': 's3://anhdtv-stedi/step_trainer/landing'
    }
)

execution_id = response['QueryExecutionId']
print(f"Execution ID: {execution_id}")


Execution ID: 499ec9fd-ff09-41e7-b1ba-1e06c94b150b


## 7. Delete Resource

### Delete S3 bucket 

In [144]:
try:
        bucket = s3.Bucket(S3_BUCKET_NAME)
        bucket.objects.all().delete()

        # Delete the bucket
        bucket.delete()
        print(f'Successfully deleted S3 bucket: {S3_BUCKET_NAME}')
except ClientError as e:
    if e.response['Error']['Code'] == 'BucketNotEmpty':
        print(f'S3 bucket {S3_BUCKET_NAME} is not empty. Deleting all objects inside the bucket...')
        
        s3.objects.all().delete()
        s3.delete_bucket(Bucket=S3_BUCKET_NAME)
        print(f'Successfully deleted S3 bucket: {S3_BUCKET_NAME}')
    elif e.response['Error']['Code'] == 'NoSuchBucket':
        print(f'S3 bucket {S3_BUCKET_NAME} does not exist')
    else:
        print(f'Error deleting S3 bucket {S3_BUCKET_NAME}: {e}')

# Wait for the bucket to be deleted
while True:
    try:
        s3.meta.client.head_bucket(Bucket=S3_BUCKET_NAME)
        time.sleep(10)
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            print(f'S3 bucket {S3_BUCKET_NAME} successfully deleted')
            break
        else:
            print(f'Error waiting for S3 bucket {S3_BUCKET_NAME} to be deleted: {e}')


Successfully deleted S3 bucket: anhdtv-stedi
S3 bucket anhdtv-stedi successfully deleted


### Delete Glue DB

In [145]:
glue.delete_database(Name=GLUE_DB)

{'ResponseMetadata': {'RequestId': 'ae43c8d1-1bf3-475f-b0c7-69db44b106a0',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sat, 04 Nov 2023 17:39:08 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'ae43c8d1-1bf3-475f-b0c7-69db44b106a0'},
  'RetryAttempts': 0}}