# Test DynamoDB API

In [None]:
from typing import Tuple, Dict
import logging

import numpy as np
import boto3

import time
from pprint import pprint

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

## Constant

In [None]:
REGION = 'us-east-1'
TABLE_NAME = "ActivityCnt"

In [None]:
import boto3
client = boto3.client('dynamodb', region_name=REGION)

## query

In [None]:
# Query TABLE_NAME for every item whose `gender` key equals 'male'.
# Optional parameters left disabled while exploring:
#   Select='ALL_PROJECTED_ATTRIBUTES'
#   ProjectionExpression="activity, cnt"
response = client.query(
    TableName=TABLE_NAME,
    KeyConditionExpression="gender = :genderVal",
    ExpressionAttributeValues={":genderVal": {"S": "male"}},
)

In [None]:
pprint(response)

In [None]:
response.get("Items")[0]

In [None]:
def get_act_cnt_from_dynamodb(gender: str) -> Dict[str, int]:
    """Get per-gender activity counts from DynamoDB.

    Queries ``TABLE_NAME`` for every item whose ``gender`` key equals
    ``gender`` and maps each item's ``activity`` string to its ``cnt`` number.
    A single ``query`` call may return only part of the matching items, in
    which case the response carries ``LastEvaluatedKey``; all pages are
    followed so the returned counts are complete.

    Args:
        gender (str): Value of the ``gender`` key attribute to query for.

    Returns:
        Dict[str, int]: Mapping of activity name to count. Empty when no item
            matches or when any query call fails (the failure is logged).
    """
    query_kwargs = {
        "TableName": TABLE_NAME,
        "KeyConditionExpression": 'gender = :genderVal',
        "ExpressionAttributeValues": {
            ':genderVal': {
                'S': gender,
            }
        },
    }

    # Query DynamoDB, following pagination until no continuation key is left
    items = []
    try:
        while True:
            response = client.query(**query_kwargs)
            items.extend(response.get('Items', []))
            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            # Resume the next page right after the last evaluated item
            query_kwargs['ExclusiveStartKey'] = last_key
    except Exception as e:
        logger.error(
            f"Failed to query DynamoDB: {TABLE_NAME}. Error message: {e}. "
            "Return empty dictionary."
        )
        return dict()

    # Create final dict: items missing "activity" fall under the "" key and
    # items missing "cnt" count as 0
    res = dict()
    for item in items:
        activity = item.get("activity", {}).get("S", "")
        cnt = int(item.get("cnt", {}).get("N", '0'))
        res[activity] = cnt

    logger.info(f"For {gender}, the activity count is: {res}")
    return res

In [None]:
get_act_cnt_from_dynamodb("male")

In [None]:
sum([])

## get_item

In [None]:
# Time a single get_item round trip for the (male, swimming) key.
# perf_counter() is used rather than time.time() because it is a monotonic,
# high-resolution clock, so the measured interval cannot be skewed by
# system clock adjustments.
ts = time.perf_counter()
response = client.get_item(
    TableName=TABLE_NAME,
    Key={
        "gender": {"S": "male"},
        "activity": {"S": "swimming"},
    },
)
tused = time.perf_counter() - ts
print(tused)

In [None]:
pprint(response)

In [None]:
# Look up an item that does not exist.
# The response is still a dict, but it has no 'Item' key.
response = client.get_item(
    TableName=TABLE_NAME,
    Key={
        "gender": {"S": "male2"},
        "activity": {"S": "swimming"},
    },
)
pprint(response)

## put_item

In [None]:
# Write the (male, baseball) item with cnt = 1.
# ReturnValues="ALL_OLD" requests the previous attributes of an overwritten
# item in the response.
response = client.put_item(
    TableName=TABLE_NAME,
    Item={
        "gender": {"S": "male"},
        "activity": {"S": "baseball"},
        "cnt": {"N": "1"},
    },
    ReturnValues="ALL_OLD",
)

In [None]:
pprint(response)

## update_item

In [None]:
# For the (male, biking) item: ADD 1 to `cnt` and SET the `isNew` flag.
response = client.update_item(
    TableName=TABLE_NAME,
    Key={
        "gender": {"S": "male"},
        "activity": {"S": "biking"},
    },
    UpdateExpression="ADD cnt :x SET isNew = :val",
    ExpressionAttributeValues={
        ":x": {"N": "1"},
        ":val": {"BOOL": True},
    },
)

In [None]:
pprint(response)

In [None]:
# Drop the `isNew` attribute from the (male, biking) item.
response = client.update_item(
    TableName=TABLE_NAME,
    Key={
        "gender": {"S": "male"},
        "activity": {"S": "biking"},
    },
    UpdateExpression="REMOVE isNew",
)

In [None]:
pprint(response)

## describe_table

In [None]:
response = client.describe_table(TableName=TABLE_NAME)
pprint(response)

## AppJobs WorkFlow

In [None]:
client = boto3.client('dynamodb', region_name=REGION)

In [None]:
TABLE_NAME = "AppJobs"
client = boto3.client('dynamodb', region_name=REGION)

In [None]:
import time
from pprint import pprint

### Create new job item

In [None]:
# Create a job item flagged as to-do (jobToDo = 'Y'). The request timestamp
# and the input window bounds are each taken from the current time.
response = client.put_item(
    TableName=TABLE_NAME,
    Item={
        "jobId": {"S": "ccc"},
        "requestedTs": {"N": str(time.time())},
        "jobToDo": {"S": "Y"},
        "input": {
            "M": {
                "user": {"S": "user1"},
                "tstart": {"N": str(time.time())},
                "tend": {"N": str(time.time())},
            }
        },
    },
    ReturnValues="ALL_OLD",
)

In [None]:
pprint(response)

### Get all new jobs

In [None]:
# Fetch all jobs still flagged jobToDo = 'Y' through the
# "jobToDo-requestedTs-index" index.
response = client.query(
    TableName=TABLE_NAME,
    IndexName="jobToDo-requestedTs-index",
    KeyConditionExpression="jobToDo = :x",
    ExpressionAttributeValues={":x": {"S": "Y"}},
)

In [None]:
pprint(response)

In [None]:
response.get("Items")

In [None]:
item = response.get("Items")[-1]

In [None]:
item

In [None]:
item.get("requestedTs").get('N', "")

### Update the new jobs to working in progress

In [None]:
# Hard-coded example: clear the to-do flag of one job and mark it as
# "Working in progress".
response = client.update_item(
    TableName=TABLE_NAME,
    Key={
        "jobId": {"S": "firstJobId"},
        "requestedTs": {"N": "1608849485.8781471"},
    },
    UpdateExpression="REMOVE jobToDo SET jobStatus = :val",
    ExpressionAttributeValues={":val": {"S": "Working in progress"}},
)

In [None]:
pprint(response)

In [None]:
def update_new_job(item: dict) -> Tuple[dict, bool]:
    """Mark the job identified by (jobId, requestedTs) as "Working in progress".

    Removes the ``jobToDo`` attribute and sets ``jobStatus`` on the matching
    item in ``TABLE_NAME``. When the remote update succeeds, the same change
    is applied to ``item`` in place.

    Args:
        item (dict): Job item in DynamoDB attribute-value format. It must
            carry ``jobId`` (``{'S': ...}``) and ``requestedTs``
            (``{'N': ...}``).

    Returns:
        dict: ``item`` itself; updated in place if the update is successful,
            untouched otherwise.
        bool: whether the update is a success or not

    """
    jobId = item.get("jobId", {}).get('S', "")
    # Default to {} so a missing requestedTs yields "" instead of raising
    # AttributeError on None.
    requestedTs = item.get("requestedTs", {}).get('N', "")
    if not jobId or not requestedTs:
        # Without both key attributes there is nothing valid to update
        logger.error(
            f"Fail to update {TABLE_NAME}. Item is missing jobId or "
            f"requestedTs: jobId = {jobId!r}, requestedTs={requestedTs!r}"
        )
        return item, False

    try:
        logger.info(f"Update jobId={jobId}, requestedTs={requestedTs}")
        response = client.update_item(
            TableName=TABLE_NAME,
            Key={
                "jobId": {
                    "S": jobId,
                },
                "requestedTs": {
                    "N": requestedTs,
                },
            },
            UpdateExpression="REMOVE jobToDo SET jobStatus = :val",
            ExpressionAttributeValues={
                ':val': {
                    'S': "Working in progress"
                }
            },
        )
    except Exception as e:
        logger.error(
            f"Fail to update {TABLE_NAME}. jobId = {jobId}, "
            f"requestedTs={requestedTs}. Exception message: {e}"
        )
        return item, False

    # Check response code
    if response.get('ResponseMetadata', {}).get('HTTPStatusCode') != 200:
        logger.error(
            f"Receive non-200 http status code. Full response = {response}"
        )
        return item, False

    # Mirror the remote change locally. pop() rather than del: the remote
    # update already succeeded, so a local item that has no jobToDo attribute
    # must not turn that success into a KeyError.
    item['jobStatus'] = {'S': "Working in progress"}
    item.pop('jobToDo', None)
    return item, True


In [None]:
item, ok = update_new_job(item)

In [None]:
item

In [None]:
item.get("input", {}).get('M', {}).get('tstart', {}).get('S', "")

In [None]:
import datetime

# Parse the job's input start date into a datetime.date.
# NOTE(review): this reads tstart as a '%Y-%m-%d' string ('S'), while the
# put_item example in this notebook stores tstart as a number ('N'); with
# such an item the lookup yields "" and strptime raises. Confirm the schema.
tstart = datetime.datetime.strptime(
    item.get("input", {}).get("M", {}).get("tstart", {}).get("S", ""),
    "%Y-%m-%d",
).date()
tstart

In [None]:
item

### Update item to be completed

In [None]:
def update_complete_job(item: dict, key: str) -> bool:
    """Update a job item with completed-job information.

    Sets ``jobStatus`` to "Done" and stores the output location
    (``S3_BUCKET`` and ``key``) under ``outData`` on the item identified by
    (jobId, requestedTs) in ``TABLE_NAME``. ``item`` itself is not modified.

    Args:
        item (dict): Job item in DynamoDB attribute-value format. It must
            carry ``jobId`` (``{'S': ...}``) and ``requestedTs``
            (``{'N': ...}``).
        key (str): Key of the output object, stored as ``outData.Key``
            next to the module-level ``S3_BUCKET``.

    Returns:
        bool: whether the update is a success or not

    """
    jobId = item.get("jobId", {}).get('S', "")
    # Default to {} so a missing requestedTs yields "" instead of raising
    # AttributeError on None.
    requestedTs = item.get("requestedTs", {}).get('N', "")
    if not jobId or not requestedTs:
        # Without both key attributes there is nothing valid to update
        logger.error(
            f"Job complete update fail: {TABLE_NAME}, item is missing jobId "
            f"or requestedTs: jobId = {jobId!r}, requestedTs={requestedTs!r}"
        )
        return False

    try:
        logger.info(
            f"Job complete update: jobId={jobId}, requestedTs={requestedTs}")
        _ = client.update_item(
            TableName=TABLE_NAME,
            Key={
                "jobId": {
                    "S": jobId,
                },
                "requestedTs": {
                    "N": requestedTs,
                },
            },
            UpdateExpression="SET jobStatus = :val, outData = :out",
            ExpressionAttributeValues={
                ':val': {
                    'S': "Done"
                },
                ':out': {
                    'M': {
                        "Bucket": {
                            'S': S3_BUCKET,
                        },
                        "Key": {
                            'S': key,
                        }
                    }
                }
            },
        )
    except Exception as e:
        logger.error(
            f"Job complete update fail: {TABLE_NAME}, jobId = {jobId}, "
            f"requestedTs={requestedTs}. Exception message: {e}"
        )
        return False
    return True

In [None]:
key="test.csv"
S3_BUCKET = "ml-app-2020"

In [None]:
update_complete_job(item, key)

In [None]:
item

In [None]:
# Fetch every item stored under one specific jobId.
jobId = "f611a542-99f3-4dd8-bda5-c2fd52eba316"

response = client.query(
    TableName=TABLE_NAME,
    KeyConditionExpression="jobId = :x",
    ExpressionAttributeValues={":x": {"S": jobId}},
)

In [None]:
item = response.get("Items")[0]
item

In [None]:
item.get('jobStatus', {}).get('S', 'UNKNOWN')

In [None]:
item.get('outData', {}).get('M', {}).get("Bucket", {}).get('S', "")

In [None]:
response

# ECS

In [None]:
import boto3
from pprint import pprint
REGION = 'us-east-1'

client = boto3.client('ecs', region_name=REGION)

In [None]:
# list_task_definition_families: called with no arguments, then printed.
response = client.list_task_definition_families()

pprint(response)

In [None]:
# list_task_definitions: ACTIVE definitions with the "ml_app_frontend"
# family prefix.
response = client.list_task_definitions(familyPrefix="ml_app_frontend", status="ACTIVE")

pprint(response)

In [None]:
# list_tasks: RUNNING tasks of the "ml_app_backend" family in the
# "ml-app-frontend" cluster.
response = client.list_tasks(cluster="ml-app-frontend", family="ml_app_backend", desiredStatus="RUNNING")
pprint(response)

In [None]:
# describe_tasks: fetch one task by id and print the full response,
# then the `lastStatus` of the first returned task.
response = client.describe_tasks(
    cluster="ml-app-frontend",
    tasks=["b2e2e7ae38014977bcd4188f4e3aaa59"],
)
pprint(response)

# Indexing [0] raises IndexError when the response lists no tasks.
pprint(response.get("tasks", [])[0].get("lastStatus"))

## Stop task
# Stop one specific task in the "ml-app-frontend" cluster.
response = client.stop_task(cluster="ml-app-frontend", task="74062aea67234c62a458bd6730ccda1a")

In [None]:
from pprint import pprint
pprint(response)

In [None]:
# Run Task (frontend): start one Fargate task from ml_app_frontend:6 with a
# public IP.
# The result name `respose` (sic) is kept as-is: a later cell prints a
# variable of that exact name.
respose = client.run_task(
    cluster="arn:aws:ecs:us-east-1:381982364978:cluster/ml-app-frontend",
    count=1,
    launchType="FARGATE",
    taskDefinition="ml_app_frontend:6",
    networkConfiguration={
        "awsvpcConfiguration": {
            "subnets": [
                "subnet-0957b628",
                "subnet-c66e8999",
                "subnet-33001a54",
            ],
            "securityGroups": ["sg-0e46917d122ae54de"],
            "assignPublicIp": "ENABLED",
        }
    },
)

In [None]:
# Run Task (backend): start one Fargate task from ml_app_backend:1 with a
# public IP.
# The result name `respose` (sic) is kept as-is: a later cell prints a
# variable of that exact name.
respose = client.run_task(
    cluster="arn:aws:ecs:us-east-1:381982364978:cluster/ml-app-frontend",
    count=1,
    launchType="FARGATE",
    taskDefinition="ml_app_backend:1",
    networkConfiguration={
        "awsvpcConfiguration": {
            "subnets": ["subnet-c66e8999"],
            "securityGroups": ["sg-09af5a1923947eac8"],
            "assignPublicIp": "ENABLED",
        }
    },
)

In [None]:
from pprint import pprint
pprint(respose)

# S3

In [None]:
client = boto3.client('s3', region_name=REGION)

In [None]:
# List bucket
response = client.list_buckets()
pprint(response)

In [None]:
# put_object: upload a short literal as "myfile.txt".
response = client.put_object(Body="fileuploaded", Bucket="ml-app-2020", Key="myfile.txt")

In [None]:
pprint(response)

In [None]:
# Upload the local sample.csv to S3.
# The file is read in binary mode so the uploaded object is byte-for-byte
# the local file, independent of the platform's default text encoding and
# newline translation.
with open("sample.csv", "rb") as f:
    data = f.read()

response = client.put_object(
    Body=data,
    Bucket="ml-app-2020",
    Key="sample_csv/sample.csv",
)

In [None]:
# Delete the "setup.sh" object from the bucket and print the response.
response = client.delete_object(Bucket="ml-app-2020", Key="setup.sh")
pprint(response)

In [None]:
# List objects in the bucket; the empty prefix applies no key filter.
response = client.list_objects_v2(Bucket="ml-app-2020", Prefix="")

In [None]:
pprint(response)

In [None]:
# Download object: request the sample CSV uploaded earlier in this notebook.
response = client.get_object(Bucket="ml-app-2020", Key="sample_csv/sample.csv")

In [None]:
pprint(response)

In [None]:
# Load the CSV file to pandas
# `response` is the get_object result from the previous cell; its "Body" is
# handed to pandas directly as a file-like object.
# NOTE(review): reading presumably consumes the body stream, so run this
# before any other cell that reads response["Body"] — confirm.
import pandas as pd
df = pd.read_csv(response["Body"])

In [None]:
df

In [None]:
# Read the raw bytes of the downloaded object.
# NOTE(review): the body is a stream; if the read_csv cell already consumed
# this same response, this likely returns b"" — re-run get_object first.
res = response["Body"].read()

In [None]:
res

In [None]:
## Write df to S3 directly
# Serialize the DataFrame to CSV text without the index column, encode it
# to bytes, and upload it in a single put_object call.
response = client.put_object(
    Bucket="ml-app-2020",
    Key="sample_csv/sample_2.csv",
    Body=df.to_csv(index=False).encode(),
)