In [None]:
from datetime import date
import pprint

import awswrangler as wr
import pandas as pd
from tqdm import tqdm
from super_secret_package import SUPER_SECRET_FUNCTIONS

from IPython.display import display

# Shared configuration: every S3 object written by this demo lives under this prefix,
# and `pp` pretty-prints the nested Athena query metadata dicts.
pp = pprint.PrettyPrinter(indent=4)
S3_BASE_PATH = "s3://aws-wrangler-test-bucket/demo"

## Generating Input DataFrame

In [None]:
nb_of_ids = 100_000
secret_factor = 10_000

# One record per (day, id): days 1..10 of May 2021, times nb_of_ids ids.
# Each id is routed to one of the secret functions via id_ % 5. The function argument
# increases with the record's global position ((day - 1) * nb_of_ids + id_),
# scaled down by secret_factor.
data = [
    {
        "value": SUPER_SECRET_FUNCTIONS[id_ % 5]((day - 1) * nb_of_ids / secret_factor + id_ / secret_factor),
        "id": id_,
        "date": date(2021, 5, day),
    }
    for day in tqdm(range(1, 11))
    for id_ in range(nb_of_ids)
]
input_df = pd.DataFrame(data).astype({"value": "float"})

### Inspecting the DataFrame

In [None]:
def show_df_info(info_df: pd.DataFrame) -> None:
    """Render a DataFrame, its summary statistics and its dtype/memory report.

    Parameters
    ----------
    info_df : pandas.DataFrame
        Frame to inspect. It is only read, never modified, so callers do not
        need to pass a defensive copy.
    """
    display(info_df)
    # Rich (HTML) rendering instead of the plain-text repr that print() gives.
    display(info_df.describe())
    # DataFrame.info() prints its own report and returns None. Wrapping it in
    # display() would render a stray "None" after the report.
    info_df.info()


show_df_info(input_df)

## Starting with awswrangler
### Storing on S3

In [None]:
%%time

wr.s3.to_parquet(
    df=input_df.copy(),
    path=f"{S3_BASE_PATH}/base.pq"
)

### Reading from S3

In [None]:
%%time
df_loaded = wr.s3.read_parquet([f"{S3_BASE_PATH}/base.pq"])

In [None]:
show_df_info(info_df=df_loaded)  # compare with the input_df inspection above

### Fancier storing strategies (Partitioning and Bucketing)

In [None]:
%%time
wr.s3.to_parquet(
    df=input_df.copy(),
    path=f"{S3_BASE_PATH}/partitioned",
    partition_cols=["date"],
    dataset=True
)

In [None]:
%%time
wr.s3.to_parquet(
    df=input_df.copy(),
    path=f"{S3_BASE_PATH}/partitioned",
    partition_cols=["date"],
    dataset=True,
    mode="overwrite",
    concurrent_partitioning=True
)

In [None]:
%%time
df_loaded = wr.s3.read_parquet(f"{S3_BASE_PATH}/partitioned", dataset=True)
df_loaded.info()

In [None]:
%%time
df_loaded = wr.s3.read_parquet(
    f"{S3_BASE_PATH}/partitioned",
    partition_filter=lambda x: True if x["date"] == "2021-05-04" else False,
    dataset=True
)
df_loaded.info()


### Bucketing

In [None]:
%%time
wr.s3.to_parquet(
    df=input_df.copy(),
    path=f"{S3_BASE_PATH}/bucketed",
    bucketing_info=(["id"], 5),
    dataset=True
)

In [None]:
input_df.info()  # info() only reads the frame; copying 1M rows first was wasted work

### Combined Partitioning and Bucketing

In [None]:
%%time
wr.s3.to_parquet(
    df=input_df.copy(),
    path=f"{S3_BASE_PATH}/par_bucketed",
    partition_cols=["date"],
    bucketing_info=(["id"], 5),
    dataset=True
)


## Using the Glue Catalog and Athena integration

In [None]:
# Glue Catalog database plus the two tables the Athena section registers and queries.
database, table, base_table = "mydatabase", "par_and_bucket_test", "base_demo_table"

In [None]:
%%time
wr.s3.to_parquet(
    df=input_df.copy(),
    path=f"{S3_BASE_PATH}/par_bucketed",
    partition_cols=["date"],
    bucketing_info=(["id"], 5),
    dataset=True,
    mode="overwrite",
    concurrent_partitioning=True,
    database=database,
    table=table
)

In [None]:
%%time
wr.s3.to_parquet(
    df=input_df.copy(),
    path=f"{S3_BASE_PATH}/base.pq",
    dataset=True,
    mode="overwrite",
    database=database,
    table=base_table
)

In [None]:
wr.catalog.table(table=table, database=database)  # schema Glue now holds for the table


In [None]:
# Read the whole Glue table through Athena, then inspect the query execution metadata.
df_athena = wr.athena.read_sql_table(
    database=database,
    table=table,
)
pp.pprint(df_athena.query_metadata)

In [None]:
# The same predicates against the partitioned + bucketed table and the plain base
# table, presumably to compare what Athena reports (e.g. data scanned) for each layout.
# No statement ends in ";": the original list was inconsistent, and a trailing
# semicolon breaks when awswrangler wraps the SQL in another statement.
queries = [
    f"SELECT * FROM {table} LIMIT 100",
    f"SELECT * FROM {table} WHERE id = 1000",
    f"SELECT * FROM {base_table} WHERE id = 1000",
    f"SELECT * FROM {table} WHERE date = DATE('2021-05-04')",
    f"SELECT * FROM {base_table} WHERE date = DATE('2021-05-04')",
]

In [None]:
# ctas_approach=False: run the SELECT directly rather than through a temporary CTAS table.
df_athena = wr.athena.read_sql_query(
    sql=queries[0],
    database=database,
    ctas_approach=False,
)
pp.pprint(df_athena.query_metadata)


## Plotting one of the secret functions

In [None]:
# Rows with id % 5 == secret_number were generated by SUPER_SECRET_FUNCTIONS[secret_number].
secret_number = 4
df_secret = (
    wr.athena.read_sql_query(
        sql=f"SELECT * FROM {table} WHERE id % 5 = {secret_number}",
        database=database,
    )
    # Athena does not guarantee row order; restore the generation order before plotting.
    .sort_values(["date", "id"])
    .reset_index(drop=True)
)
ax = df_secret["value"].plot(figsize=(16, 9))
ax.set(
    title=f"SUPER_SECRET_FUNCTIONS[{secret_number}] over the generated inputs",
    xlabel="row (sorted by date, id)",
    ylabel="value",
);

## Cleanup

In [None]:
# Drop the Glue tables registered above, so no catalog entry is left pointing at deleted data.
for glue_table in (table, base_table):
    wr.catalog.delete_table_if_exists(database=database, table=glue_table)
# The trailing slash matters: delete_objects treats the path as a key prefix. Without
# it, any sibling prefix that merely starts with "demo" (e.g. "demo-old/") would also
# be deleted.
wr.s3.delete_objects(f"{S3_BASE_PATH}/")


## Add-on: reading CloudWatch logs

In [None]:
# CloudWatch Logs Insights query: the 200 most recent messages of one Lambda's log group.
logs_df = wr.cloudwatch.read_logs(
    log_group_names=["/aws/lambda/my-hello-world"],
    query="fields @timestamp, @message | sort @timestamp desc | limit 200",
)
logs_df