# AWS

Testing the use of DynamoDB to store the data for the Streamlit app.


## Instructions

Tutorial for using DynamoDB locally: https://www.honeybadger.io/blog/using-dynamodb-with-python/

Tutorial for working with DynamoDB on AWS: https://www.analyticsvidhya.com/blog/2022/05/working-with-dynamodb-in-python-using-boto3/

Resources:

- AWS DynamoDB documentation: https://docs.aws.amazon.com/code-library/latest/ug/python_3_dynamodb_code_examples.html
- boto3 documentation: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html
- A Pythonic interface for DynamoDB: https://github.com/pynamodb/PynamoDB


## AWS DynamoDB with Python: A Beginner's Guide

https://medium.com/@ramjoshi.blogs/aws-dynamodb-with-python-a-beginners-guide-b9cf101436b2


In [None]:
if 0:
    # Set up to use local modules
    %load_ext autoreload
    %autoreload 2
    import os
    import sys
    module_path = os.path.abspath(os.path.join('..'))
    sys.path.insert(0, module_path)
    import boto3
    from dotenv import load_dotenv
    load_dotenv()

    # 2. Creating a DynamoDB Table
    # Initialize our DynamoDB resource
    dynamodb = boto3.resource(
        service_name="dynamodb",
        region_name=os.getenv("REGION"),
        aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    )
    # Check if the table already exists
    table = dynamodb.Table("Users")
    if table.table_status == "ACTIVE":
        print("Table already exists")
    else:
        print("Table does not exist")
        table = dynamodb.create_table(
            TableName="Users",
            KeySchema=[
                {"AttributeName": "username", "KeyType": "HASH"},
                {"AttributeName": "last_name", "KeyType": "RANGE"},
            ],
            AttributeDefinitions=[
                {"AttributeName": "username", "AttributeType": "S"},
                {"AttributeName": "last_name", "AttributeType": "S"},
            ],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
        )
    print("Table status:", table.table_status)
    # 3. Inserting Data into a DynamoDB Table
    table = dynamodb.Table("Users")
    table.put_item(
        Item={
            "username": "johndoe",
            "last_name": "Doe",
            "first_name": "John",
            "age": 25,
            "account_type": "standard_user",
        }
    )
    display(table)
    # 4. Querying Data from a DynamoDB Table
    response = table.get_item(Key={"username": "johndoe", "last_name": "Doe"})
    item = response["Item"]
    display(item)

## AWS SDK for pandas

- pypi: https://pypi.org/project/awswrangler/
- GitHub: https://github.com/aws/aws-sdk-pandas
- Documentation tutorial: https://aws-sdk-pandas.readthedocs.io/en/stable/tutorials/028%20-%20DynamoDB.html

Tutorial article for awswrangler: https://blog.devgenius.io/write-data-to-dynamodb-using-awswrangler-and-boto3-d6aad2622389


In [None]:
if 0:
    # Disabled (`if 0`) awswrangler walkthrough: write a small DataFrame to a
    # "movies" table and read it back.
    # NOTE(review): uses `boto3` and `os`, which this cell does not import itself;
    # they must already be in the notebook namespace from another cell.
    import awswrangler as wr
    import pandas as pd

    # awswrangler relies on Boto3.Session() to manage AWS credentials and configurations.
    # Build the session once and derive every client/resource from it, and pass it
    # to the awswrangler calls, so all requests use the same region and credentials.
    my_session = boto3.Session(
        region_name=os.getenv("REGION"),
        aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    )
    display(my_session)
    dynamodb = my_session.resource("dynamodb")
    dynamodb_client = my_session.client("dynamodb")
    # Writing a DataFrame to a DynamoDB Table
    table_name = "movies"
    df = pd.DataFrame(
        {
            "title": ["Titanic", "Snatch", "The Godfather"],
            "year": [1997, 2000, 1972],
            "genre": ["drama", "caper story", "crime"],
        }
    )
    existing_tables = dynamodb_client.list_tables()["TableNames"]
    # Create the table table_name if it doesn't exist
    if table_name in existing_tables:
        print(f"Table {table_name} already exists")
    else:
        print(f"Table {table_name} does not exist")
        # "title" is unique in the sample frame, so it serves as the partition key.
        movies_table = dynamodb.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "title", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "title", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
        )
        # Writes are rejected until the new table is ACTIVE.
        movies_table.wait_until_exists()
    # Writing rows of DataFrame
    wr.dynamodb.put_df(df=df, table_name=table_name, boto3_session=my_session)
    read_df = wr.dynamodb.read_items(
        table_name=table_name, max_items_evaluated=10, boto3_session=my_session
    )
    print(read_df)

## Final, all in one

create_table documentation: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/create_table.html


In [None]:
# Set up to use local modules
%load_ext autoreload
%autoreload 2
import os
import sys
module_path = os.path.abspath(os.path.join('..')) # Add parent directory to path
sys.path.insert(0, module_path)

import time
from decimal import Decimal

import boto3
from dotenv import load_dotenv
import pandas as pd
import awswrangler as wr
import numpy as np

import botocore

from src import process

# Load environment variables via python-dotenv; later cells read REGION,
# AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY through os.getenv.
load_dotenv()

# Data that the cells below convert and upload to DynamoDB.
# NOTE(review): process.load_processed_results is project code not shown here;
# the later cells treat its result as a pandas DataFrame — confirm.
data_df = process.load_processed_results()
# display(data_df.tail())

In [None]:
# Walk every column of data_df and coerce the dtypes handled below:
# float64 -> Decimal, datetime64[ns] / category -> str. Other columns are untouched.
for col in data_df.columns:
    col_dtype = data_df[col].dtype

    if col_dtype == "float64":
        # Go through str first so each Decimal is built from the printed value
        # of the float rather than from the float object itself.
        as_text = data_df[col].astype(str)
        data_df[col] = as_text.apply(Decimal)
        continue

    # If datetime or category, convert to string
    if col_dtype == "datetime64[ns]" or col_dtype.name == "category":
        data_df[col] = data_df[col].astype(str)

# Get the types of all columns
# data_df.dtypes

In [None]:
def get_dynamodb_table_size(table_name, verbose=False):
    """
    Look up the size and item count that DescribeTable reports for a table.

    :param table_name: Name of the DynamoDB table to describe.
    :param verbose: When True, also print the size and the item count.
    :return: Tuple ``(table_size_bytes, item_count)``; ``(None, None)`` when the
        table is not found or any other error occurs (a message is printed).
    """
    # Region comes from the REGION environment variable
    client = boto3.client("dynamodb", region_name=os.getenv("REGION"))

    try:
        description = client.describe_table(TableName=table_name)["Table"]
        size_bytes = description["TableSizeBytes"]
        n_items = description["ItemCount"]

        if verbose:
            print(f"Size of {table_name}: {size_bytes} bytes")
            print(f"Item count in {table_name}: {n_items}")
        return size_bytes, n_items
    except client.exceptions.ResourceNotFoundException:
        print(f"Table {table_name} not found.")
        return None, None
    except Exception as err:
        # Best-effort helper: report the problem and fall back to the sentinel pair
        print(f"An error occurred: {str(err)}")
        return None, None

In [None]:
def wait_for_table_ready(table_name, wait_time=10, max_attempts=30):
    """
    Waits for a DynamoDB table to become ACTIVE.

    Polls DescribeTable up to ``max_attempts`` times, pausing ``wait_time``
    seconds between checks, and prints the progress of each check.

    :param table_name: Name of the DynamoDB table to check.
    :param wait_time: Time to wait between checks (in seconds).
    :param max_attempts: Maximum number of status checks.
    :return: True once the table reports ACTIVE, False if it never does within
        ``max_attempts`` checks.
    """
    # Use the same REGION setting as the other clients in this notebook so the
    # status check targets the region the table was created in.
    dynamodb = boto3.client("dynamodb", region_name=os.getenv("REGION"))
    for attempt in range(1, max_attempts + 1):
        response = dynamodb.describe_table(TableName=table_name)
        status = response["Table"]["TableStatus"]
        if status == "ACTIVE":
            print(f"Table {table_name} is ready.")
            return True

        print(
            f"Waiting for table {table_name} to become ACTIVE. Current status: {status}"
        )
        # No point sleeping after the final check: nothing is polled afterwards.
        if attempt < max_attempts:
            time.sleep(wait_time)

    print(f"Table {table_name} did not become ACTIVE after {max_attempts} attempts.")
    return False

In [None]:
# Retry helper for throttled DynamoDB writes (used by the "Create and write to table" cell).
def retry_with_backoff(
    func, max_attempts=8, initial_delay=1, backoff_factor=2, *args, **kwargs
):
    """
    Retry wrapper for functions with exponential backoff.

    Only a botocore ``ClientError`` whose error code is
    ``ProvisionedThroughputExceededException`` is retried. Any other
    ``ClientError`` is re-raised immediately. Any other exception is printed
    and the function gives up, returning ``None``.

    Because ``max_attempts``, ``initial_delay`` and ``backoff_factor`` come
    before ``*args``, positional arguments meant for ``func`` would bind to
    them first; pass ``func``'s arguments as keywords.

    :param func: The function to execute.
    :param max_attempts: Maximum number of retry attempts.
    :param initial_delay: Initial delay between retries in seconds.
    :param backoff_factor: Factor by which the delay increases.
    :param args: Arguments to pass to the function.
    :param kwargs: Keyword arguments to pass to the function.
    :return: Whatever ``func`` returns, or ``None`` after an unexpected
        (non-ClientError) exception.
    :raises Exception: When every attempt was throttled; the last
        ``ClientError`` is attached as ``__cause__``.
    """
    delay = initial_delay
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return func(*args, **kwargs)
        except botocore.exceptions.ClientError as error:
            if (
                error.response["Error"]["Code"]
                != "ProvisionedThroughputExceededException"
            ):
                # If the exception is not related to provisioned throughput, raise it
                raise
            last_error = error
            if attempt == max_attempts:
                # Out of attempts: skip the (longest) backoff sleep and fail now.
                print(
                    f"Attempt {attempt} failed with ProvisionedThroughputExceededException. No attempts left."
                )
                break
            print(
                f"Attempt {attempt} failed with ProvisionedThroughputExceededException. Retrying in {delay} seconds..."
            )
            time.sleep(delay)
            delay *= backoff_factor
        except Exception as e:
            # Best effort for anything unexpected: report it and stop retrying
            print(f"An unexpected error occurred: {e}")
            return None

    # Reached only when no attempt succeeded (or max_attempts allowed none)
    raise Exception("All retry attempts failed.") from last_error


def write_to_dynamodb(data_df, table_name):
    """
    Write the rows of a DataFrame to a DynamoDB table via awswrangler.

    :param data_df: DataFrame whose rows are written.
    :param table_name: Name of the destination DynamoDB table.
    """
    put_arguments = {"df": data_df, "table_name": table_name}
    wr.dynamodb.put_df(**put_arguments)


# Example usage
# data_df = pd.DataFrame(...)
# table_name = "your_table_name"
# retry_with_backoff(write_to_dynamodb, data_df=data_df, table_name=table_name)

In [None]:
# Table used by the remaining cells, plus the boto3 handles that talk to it
table_name = "monkeytype"

# Resource API handle (Table / create_table), with explicit credentials from the environment
dynamodb = boto3.resource(
    "dynamodb",
    region_name=os.getenv("REGION"),
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
)

# Low-level client, used here to list the table names visible in the region
dynamodb_client = boto3.client("dynamodb", region_name=os.getenv("REGION"))
list_tables_response = dynamodb_client.list_tables()
existing_tables = list_tables_response["TableNames"]

In [None]:
# Check size of table if it exists
if table_name in existing_tables:
    table_size_bytes, item_count = get_dynamodb_table_size(table_name, verbose=True)
else:
    print(f"Table {table_name} does not exist")
    # A missing table holds nothing. Define both names anyway so later cells
    # that read item_count do not fail with a NameError.
    table_size_bytes, item_count = 0, 0

In [None]:
# Create and write to table

# Check if the table already exists
if table_name in existing_tables:
    print(f"Table {table_name} already exists")
    table = dynamodb.Table(table_name)
else:
    print(f"Table {table_name} does not exist")
    table = dynamodb.create_table(
        TableName=table_name,
        KeySchema=[
            {"AttributeName": "id", "KeyType": "HASH"},
            {"AttributeName": "timestamp", "KeyType": "RANGE"},
        ],  # HASH = partition key, RANGE = sort key
        AttributeDefinitions=[
            {"AttributeName": "id", "AttributeType": "S"},
            {"AttributeName": "timestamp", "AttributeType": "N"},
        ],  # S = string, N = number, B = binary
        ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
    )

# If the table is active, and the table is smaller than the size of data_df, write to the table
if wait_for_table_ready(table_name):
    # Read the count here instead of relying on a value left behind by another
    # cell, which is undefined when the table was only just created above.
    # NOTE: DescribeTable's ItemCount is refreshed by DynamoDB only periodically,
    # so this is an approximate guard against re-uploading, not an exact one.
    table_size_bytes, item_count = get_dynamodb_table_size(table_name)
    if item_count is None:
        print(f"Could not determine the item count of {table_name}, not writing.")
    elif item_count < len(data_df):
        print(f"Table {table_name} is ready for writing.")
        retry_with_backoff(write_to_dynamodb, data_df=data_df, table_name=table_name)
    else:
        print(f"Table already has {item_count} items, not writing.")
else:
    print("Table is not ready for writing.")

In [None]:
# Pull a handful of items back out of the table as a quick sanity check
sample_limit = 5
read_df = wr.dynamodb.read_items(
    table_name=table_name,
    max_items_evaluated=sample_limit,
)
print(read_df)

In [None]:
# Show the full DescribeTable response for the table.
# Reuses dynamodb_client (created with the same REGION) instead of rebinding
# `dynamodb`: the table-creation cell calls dynamodb.Table / dynamodb.create_table,
# which exist on the boto3 resource but not on a client, so rebinding the name
# here would break re-running that cell.
table_info = dynamodb_client.describe_table(TableName=table_name)
table_info