In [127]:
import logging
import os
import time

import boto3

In [133]:
# Configure the "create_table_statement" logger so that INFO-and-above records
# are written both to a log file and to the console (the cell output).
#
# This cell is safe to re-run: handlers attached by a previous execution are
# removed first, otherwise every record would be emitted once per execution.
log_file_path = 'logs/create_table_statement.log'

# FileHandler opens the file immediately and fails if the folder is missing.
os.makedirs(os.path.dirname(log_file_path), exist_ok=True)

# logging.getLogger returns the same object for the same name on every call,
# so handlers from earlier runs of this cell are still attached to it.
logger = logging.getLogger("create_table_statement")
logger.setLevel(logging.INFO)
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()  # release the open log file descriptor

# One shared layout for both destinations: timestamp - logger - level - text
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# File destination
file_handler = logging.FileHandler(log_file_path)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)

# Console destination
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)

logger.addHandler(file_handler)
logger.addHandler(console_handler)

In [134]:
# S3 destination for everything this notebook uploads
bucket_name = "abc"  # change bucket name
key_path = "cleaning_test_data_and_tables_temp_remove_after_2024.01"

# S3 prefix passed to Athena as the output location for its query result files
output_location = "s3://" + "/".join(
    [bucket_name, key_path, "athena_query_output_create_table_statement"]
) + "/"

In [None]:
# Read the table names to process, one "<database>.<table>" entry per line.
# Surrounding whitespace is stripped and blank lines are dropped so that a
# stray empty line in the file does not turn into a bogus table name later.
with open("tables_to_delete.txt") as f:
    tables = [name.strip() for name in f.read().splitlines() if name.strip()]
len(tables)

In [None]:
# Create Athena and S3 clients
athena = boto3.client('athena')
s3 = boto3.client('s3')

# Seconds to wait between two Athena status polls; polling without a pause
# floods the API with requests and risks throttling.
POLL_INTERVAL_SECONDS = 1

# Local folders this cell writes into; open() does not create them.
os.makedirs("./logs", exist_ok=True)
os.makedirs("./create_table_statement", exist_ok=True)


def _wait_for_query(athena_client, query_execution_id, poll_interval=POLL_INTERVAL_SECONDS):
    """Block until the Athena query is no longer QUEUED or RUNNING.

    Args:
        athena_client: boto3 Athena client used for polling.
        query_execution_id: ID returned by ``start_query_execution``.
        poll_interval: Seconds to sleep between status checks.

    Returns:
        The final ``Status`` dict of the query execution (contains ``State``).
    """
    while True:
        status = athena_client.get_query_execution(
            QueryExecutionId=query_execution_id
        )['QueryExecution']['Status']
        if status['State'] not in ('QUEUED', 'RUNNING'):
            return status
        time.sleep(poll_interval)


# Loop through the list of tables and save the "create table statement" for each table
for table in tables:
    # Each entry must be exactly "<database>.<table>"; anything else would
    # either crash on indexing or silently query the wrong table.
    split_name = [part.strip() for part in table.split(".")]
    if len(split_name) != 2 or not all(split_name):
        if table.strip():
            logger.warning(f"SKIPPED '{table}': expected '<database>.<table>'")
        continue
    database_name, table_name = split_name

    # Build the SQL query
    sql_query = f"SHOW CREATE TABLE {database_name}.{table_name}"

    # Execute the SQL query
    response = athena.start_query_execution(
        QueryString=sql_query,
        QueryExecutionContext={
            'Database': database_name
        },
        ResultConfiguration={
            'OutputLocation': output_location
        }
    )

    # Wait for the query to complete
    query_execution_id = response['QueryExecutionId']
    status = _wait_for_query(athena, query_execution_id)
    query_status = status['State']

    if query_status != 'SUCCEEDED':
        # Report the failure instead of silently moving on to the next table.
        reason = status.get('StateChangeReason', 'no reason reported')
        logger.error(
            f"FAILED to get 'create table' statement for '{database_name}.{table_name}': "
            f"query status {query_status} ({reason})"
        )
        continue

    # Query results
    # NOTE(review): a single get_query_results call returns one page only;
    # assumes the statement fits in it - TODO confirm for very wide tables.
    result = athena.get_query_results(QueryExecutionId=query_execution_id)

    # Save the query execution ID and table name to a log file
    with open("./logs/log_create_table_statement.txt", "a") as f:
        f.write(f"Table: {table_name} ::: Query execution ID: {query_execution_id} ::: Query status: {query_status} ::: Query result: {result}\n")

    # Save CREATE TABLE statement to separate file.
    # Opened with "w" so re-running the notebook replaces the file rather
    # than appending a second copy of the same statement to it.
    file_name = f"{database_name}.{table_name}_create_table_statement.txt"
    file_local_path = f"./create_table_statement/{file_name}"
    with open(file_local_path, "w") as f:
        for item in result['ResultSet']['Rows']:
            # A cell without a value has no 'VarCharValue' key; write it as
            # an empty line instead of raising KeyError.
            f.write(f"{item['Data'][0].get('VarCharValue', '')}\n")

    logger.info(f"SAVED 'create table' statement for '{database_name}.{table_name}'")

    # Copy CREATE TABLE statement to s3
    key = f"{key_path}/create_table_statement/{file_name}"
    s3.upload_file(file_local_path, bucket_name, key)


In [138]:
# Upload the logger's output file to S3, under the "logs" folder of key_path
file_name = "create_table_statement.log"
file_local_path = "./logs/" + file_name
key = "/".join([key_path, "logs", file_name])
s3.upload_file(file_local_path, bucket_name, key)