In [172]:
import os
import json
import time
import boto3
import pandas as pd

# --- Configuration -----------------------------------------------------------
# Athena source: database and table the query reads from.
ATHENA_DATABASE = 'udacity_project3'
ATHENA_TABLE = 'customer_landing'

# S3 destination for the final file, and the prefix Athena writes its own
# query output to (cleaned up after the run).
S3_BUCKET = 'orlevitas-s3-bucket'
S3_OUTPUT_FOLDER = 'project3/data/customer/trusted'
ATHENA_S3_OUTPUT_FOLDER = 'project3/data/customer/trusted/tmp'
S3_OUTPUT_FILENAME = 'customer_trusted.json'

# AWS region used for every boto3 client in this notebook.
REGION = 'us-east-1'

# Keep only rows whose sharewithresearchasofdate column is populated.
QUERY = (
    "SELECT *\n"
    f"FROM {ATHENA_DATABASE}.{ATHENA_TABLE} \n"
    "WHERE sharewithresearchasofdate IS NOT NULL\n"
)

In [175]:
def run_query():
    """Submit QUERY to Athena and return its execution id.

    The query runs against ATHENA_DATABASE and Athena is told to write its
    output under s3://S3_BUCKET/ATHENA_S3_OUTPUT_FOLDER/. The call only starts
    the query; it does not wait for it to finish.

    Returns:
        The 'QueryExecutionId' value from the start_query_execution response.
    """
    athena = boto3.client('athena', region_name=REGION)
    output_location = f's3://{S3_BUCKET}/{ATHENA_S3_OUTPUT_FOLDER}/'

    execution = athena.start_query_execution(
        QueryString=QUERY,
        QueryExecutionContext={'Database': ATHENA_DATABASE},
        ResultConfiguration={'OutputLocation': output_location},
    )
    return execution['QueryExecutionId']

def get_query_results(query_execution_id):
    """Wait for an Athena query to finish and return all of its result rows.

    Polls get_query_execution every 5 seconds until the query reaches a
    terminal state, then reads the results page by page. A single
    get_query_results call returns only one page, so the pages are merged
    here to avoid silently truncating larger result sets.

    Args:
        query_execution_id: Id returned by start_query_execution.

    Returns:
        The first result page (same dict shape as a plain get_query_results
        response) with the rows of every following page appended to
        results['ResultSet']['Rows'].

    Raises:
        Exception: if the query ends in FAILED or CANCELLED. The message
            includes Athena's StateChangeReason when one is provided.
    """
    client = boto3.client('athena', region_name=REGION)

    # Wait for the query to reach a terminal state
    while True:
        response = client.get_query_execution(QueryExecutionId=query_execution_id)
        status_info = response['QueryExecution']['Status']
        status = status_info['State']

        if status in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            break
        time.sleep(5)

    if status != 'SUCCEEDED':
        message = f'Query failed with status: {status}'
        reason = status_info.get('StateChangeReason')
        if reason:
            # Surface Athena's explanation so the failure is diagnosable
            message += f' ({reason})'
        raise Exception(message)

    # Merge every page into the first one so callers see a single response.
    paginator = client.get_paginator('get_query_results')
    results = None
    for page in paginator.paginate(QueryExecutionId=query_execution_id):
        if results is None:
            results = page
        else:
            results['ResultSet']['Rows'].extend(page['ResultSet']['Rows'])

    # The merged response is complete; a leftover page token would be misleading.
    results.pop('NextToken', None)
    return results

def parse_results(results):
    """Turn an Athena result dict into a list of {column label: value} dicts.

    Column labels come from ResultSetMetadata.ColumnInfo. The first entry of
    ResultSet.Rows is treated as the header row and dropped. A cell without
    a 'VarCharValue' key is mapped to None.

    Args:
        results: dict shaped like an Athena get_query_results response.

    Returns:
        One dict per data row, keyed by column label.
    """
    result_set = results['ResultSet']
    labels = [info['Label'] for info in result_set['ResultSetMetadata']['ColumnInfo']]

    # Rows[0] is the header row, so data starts at index 1
    return [
        dict(zip(labels, [cell.get('VarCharValue') for cell in record['Data']]))
        for record in result_set['Rows'][1:]
    ]

def save_to_s3(data):
    """Write `data` to a local file and upload that file to S3.

    The text is first written to S3_OUTPUT_FILENAME in the current working
    directory, then uploaded to S3_BUCKET under
    S3_OUTPUT_FOLDER/S3_OUTPUT_FILENAME. The local file is left in place;
    removing it is the caller's job.

    Args:
        data: string to write (passed straight to file.write).
    """
    s3 = boto3.client('s3', region_name=REGION)

    # Stage the payload locally; upload_file needs a path on disk
    with open(S3_OUTPUT_FILENAME, 'w') as local_file:
        local_file.write(data)

    destination_key = f'{S3_OUTPUT_FOLDER}/{S3_OUTPUT_FILENAME}'
    s3.upload_file(S3_OUTPUT_FILENAME, S3_BUCKET, destination_key)
    print(f'File uploaded to s3://{S3_BUCKET}/{S3_OUTPUT_FOLDER}/{S3_OUTPUT_FILENAME}')

def delete_s3_objects(bucket_name, prefix):
    """
    Delete all objects under a prefix in an S3 bucket.

    Keys are listed with the list_objects_v2 paginator and removed with
    delete_objects in batches of at most 1000 keys. The batch is flushed as
    soon as it reaches that size, so keys accumulated across several short
    listing pages can never push one request over the limit.

    Args:
        bucket_name: name of the bucket to clean.
        prefix: key prefix; every key that starts with it is deleted. The
            match is a plain string prefix, so 'a/tmp' also matches
            'a/tmp_other/...'.
    """
    max_batch_size = 1000  # delete_objects accepts at most 1000 keys per request

    s3_client = boto3.client('s3', region_name=REGION)
    paginator = s3_client.get_paginator('list_objects_v2')
    pending = []

    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        # 'Contents' is absent from a page that lists no keys
        for obj in page.get('Contents', []):
            pending.append({'Key': obj['Key']})

            if len(pending) == max_batch_size:
                s3_client.delete_objects(Bucket=bucket_name, Delete={'Objects': pending})
                pending = []

    # Delete any remaining objects
    if pending:
        s3_client.delete_objects(Bucket=bucket_name, Delete={'Objects': pending})

    # Delete the prefix itself (an object whose key equals the prefix exactly)
    s3_client.delete_object(Bucket=bucket_name, Key=prefix)

def convert_json_to_string(data):
    """Serialize an iterable of dicts as newline-delimited JSON text.

    Each dict becomes one json.dumps line followed by a newline character.
    Keys whose value is None are left out of that line.

    Args:
        data: iterable of dicts.

    Returns:
        The concatenated lines as one string; '' when `data` is empty.
    """
    def filter_non_null_fields(data_dict):
        return {key: value for key, value in data_dict.items() if value is not None}

    # Join once instead of growing a string inside the loop
    return ''.join(
        json.dumps(filter_non_null_fields(record)) + '\n'
        for record in data
    )

In [180]:
# Run the Athena query, convert the rows to newline-delimited JSON, publish the
# file to the trusted zone, then remove Athena's temporary query output.
query_execution_id = run_query()
print(f'Query execution ID: {query_execution_id}')

results = get_query_results(query_execution_id)
print('Query execution completed.')

data = parse_results(results)
print('Results parsed.')

converted_string = convert_json_to_string(data)
print('converted json to string')

try:
    save_to_s3(converted_string)
    print('Results saved to S3.')
finally:
    # save_to_s3 stages the payload in a local file; remove it even when the
    # upload fails so a broken run does not leave it behind.
    if os.path.exists(S3_OUTPUT_FILENAME):
        os.remove(S3_OUTPUT_FILENAME)

delete_s3_objects(S3_BUCKET, ATHENA_S3_OUTPUT_FOLDER)
print('Deleted temporary files from athena query to S3.')

print('Finished')

Query execution ID: cb00e5b9-635b-4d23-982e-e4a67b1d0457
Query execution completed.
Results parsed.
converted json to string
File uploaded to s3://orlevitas-s3-bucket/project3/data/customer/trusted/customer_trusted.json
Results saved to S3.
Deleted temporary files from athena query to S3.
Finished


In [70]:
# Read the uploaded trusted file back from S3 as text.
s3_client = boto3.client('s3', region_name=REGION)
response = s3_client.get_object(Bucket=S3_BUCKET, Key=f'{S3_OUTPUT_FOLDER}/{S3_OUTPUT_FILENAME}')
# The whole object body is read into memory and decoded as UTF-8.
response_body = response['Body'].read().decode('utf-8')
# NOTE(review): the disabled lines below parse the body as ONE JSON document.
# If this object is the file written via convert_json_to_string (one JSON
# object per line), json.loads on the full body would presumably fail; confirm
# before re-enabling, or parse line by line instead.
# file_content = json.loads(response_body)
# file_content.__len__()

In [58]:
ACCELEROMETER_LANDING = 'project3/data/accelerometer/landing/'

s3_client = boto3.client('s3', region_name=REGION)

# A single list_objects_v2 call returns at most 1000 keys, so page through the
# listing to pick up every object under the prefix.
paginator = s3_client.get_paginator('list_objects_v2')

json_data = []

for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=ACCELEROMETER_LANDING):
    # 'Contents' is absent when a page lists no keys (e.g. an empty prefix)
    for obj in page.get('Contents', []):
        key = obj['Key']

        response = s3_client.get_object(Bucket=S3_BUCKET, Key=key)
        file_content = response['Body'].read().decode('utf-8')

        # Each non-blank line is parsed as its own JSON document; blank lines
        # are skipped because json.loads('') raises.
        json_strings = file_content.splitlines()
        json_data.extend(
            json.loads(json_str) for json_str in json_strings if json_str.strip()
        )

print(len(json_data))

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [59]:
# Print how many parsed JSON records are currently held in json_data.
print(len(json_data))

81273
