In [None]:
import awswrangler as wr
import pandas as pd
import base64
import json
import os
import boto3


def extract_loyalty_data(encoded):
    """Decode a base64-encoded JSON payload into a flattened DataFrame.

    Args:
        encoded: Base64 text (``str`` or ``bytes``) whose decoded content
            is a JSON document.

    Returns:
        A ``pandas.DataFrame`` produced by ``pandas.json_normalize`` on the
        parsed JSON, so nested objects become dot-separated columns.
    """
    raw_json = base64.b64decode(encoded)
    payload = json.loads(raw_json)
    flattened = pd.json_normalize(payload)
    return pd.DataFrame(flattened)


def extract_fraud_score(transaction_id):
    """Look up the fraud score logged for a transaction.

    Runs a CloudWatch Logs query against the ``/lab/tx-fraud-score`` log
    group for messages matching ``transaction_id``.

    Args:
        transaction_id: Identifier interpolated into the query's ``like``
            pattern.

    Returns:
        The last space-separated token of the first matching message,
        converted to ``int``; ``0`` when the query returns no rows.
    """
    insights_query = f"fields @message | filter @message like /{transaction_id}/"
    matches = wr.cloudwatch.read_logs(
        query=insights_query,
        log_group_names=["/lab/tx-fraud-score"],
    )
    if matches.empty:
        return 0

    # The score is taken from the final token of the first result's message.
    *_, score_token = matches.at[0, "message"].split(" ")
    return int(score_token)


def fetch_store_data(secret_id):
    """Read the whole ``stores.stores`` table from MySQL.

    Args:
        secret_id: Secrets Manager secret identifier passed to
            ``wr.mysql.connect`` to obtain the connection settings.

    Returns:
        The DataFrame returned by ``wr.mysql.read_sql_query`` for
        ``SELECT * FROM stores.stores``.
    """
    con_mysql = wr.mysql.connect(secret_id=secret_id)
    try:
        # No chunksize is requested, so the result is fully materialised
        # before the connection is released below.
        return wr.mysql.read_sql_query("SELECT * FROM stores.stores", con=con_mysql)
    finally:
        # Always release the connection, even if the query raises.
        con_mysql.close()


def fetch_customer_activity(customer_id, table_name):
    """Return the ``last_seen`` value stored for a customer in DynamoDB.

    Args:
        customer_id: Value bound to ``:customer_id`` in the key condition.
        table_name: Name of the DynamoDB table to read from.

    Returns:
        The ``last_seen`` field of the first item returned, or ``None``
        when no item matches.
    """
    items = wr.dynamodb.read_items(
        table_name=table_name,
        key_condition_expression="customer_id = :customer_id",
        expression_attribute_values={":customer_id": customer_id},
    )
    if items.empty:
        return None
    return items.at[0, "last_seen"]


def normalize_transaction(bucket, key):
    """Download a JSON transaction from S3 and flatten it into a DataFrame.

    The object is saved to ``/tmp/<basename of key>`` and parsed from there;
    the local copy is left in place after the call.

    Args:
        bucket: Name of the S3 bucket holding the object.
        key: Object key within the bucket.

    Returns:
        A ``pandas.DataFrame`` produced by ``pandas.json_normalize`` on the
        parsed JSON document.
    """
    file_name = f"/tmp/{os.path.basename(key)}"
    wr.s3.download(path=f"s3://{bucket}/{key}", local_file=file_name)
    # Use a context manager so the file handle is closed deterministically,
    # and decode as UTF-8 rather than the locale default.
    with open(file_name, encoding="utf-8") as f:
        transaction = json.load(f)
    return pd.json_normalize(transaction)


def extract_data(bucket, key):
    """Build a single enriched transaction record from several sources.

    The DynamoDB table and the Secrets Manager secret used are the first
    entries returned by ``list_tables`` and ``list_secrets`` respectively.

    Args:
        bucket: S3 bucket containing the transaction JSON.
        key: S3 object key of the transaction JSON.

    Returns:
        A DataFrame holding the normalized transaction with, appended
        column-wise: the decoded ``embedded_data`` payload, a
        ``fraud_score`` column, the matching store's columns (without
        ``store_id``), and a ``customer_last_seen`` column.
    """
    # Discover the backing resources first.
    table_name = boto3.client("dynamodb").list_tables()["TableNames"][0]
    secrets_listing = boto3.client("secretsmanager").list_secrets()
    secret_name = secrets_listing["SecretList"][0]["Name"]

    all_stores = fetch_store_data(secret_name)

    record = normalize_transaction(bucket, key)
    tx_id = record.at[0, "transaction_id"]

    # Decode the embedded payload and append its columns.
    loyalty = extract_loyalty_data(record.at[0, "embedded_data"])
    record = pd.concat([record, loyalty], axis=1)

    record["fraud_score"] = extract_fraud_score(tx_id)

    # Select the store row(s) for this transaction; store_id is dropped
    # because the transaction already carries it.
    is_tx_store = all_stores["store_id"] == record.at[0, "store_id"]
    store_details = all_stores[is_tx_store].drop(columns=["store_id"])
    store_details = pd.DataFrame(store_details).reset_index(drop=True)
    record = pd.concat([record, store_details], axis=1)

    last_seen = fetch_customer_activity(record.at[0, "customer_id"], table_name)
    record["customer_last_seen"] = last_seen

    return record