# Setup

In [None]:
%idle_timeout 10
%timeout 10
%glue_version 4.0
%worker_type G.1X
%number_of_workers 2

In [None]:
import boto3
import os, sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Load data

In [None]:
# Load dataframes
df_taxas = spark.read.format('parquet').load("s3://tesouro-bronze/taxa_tesouro_direto.parquet")
df_operacoes = spark.read.format('parquet').load("s3://tesouro-bronze/operacoes_tesouro_direto.parquet")
df_investidores = spark.read.format('parquet').load("s3://tesouro-bronze/investidores_tesouro_direto.parquet")

In [None]:
# Calculate the min and max dates from all date columns
spark.sql("""
    SELECT `Data de Adesao` AS date FROM {df_investidores}
    UNION ALL
    SELECT `Data da Operacao` AS date FROM {df_operacoes}
    UNION ALL
    SELECT `Vencimento do Titulo` AS date FROM {df_operacoes}
    UNION ALL
    SELECT `Data Vencimento` AS date FROM {df_taxas}
    UNION ALL
    SELECT `Data Base` AS date FROM {df_taxas}
""", df_taxas=df_taxas, df_operacoes=df_operacoes, df_investidores=df_investidores)\
.createOrReplaceTempView("all_dates")

# Calculate the min and max dates using SQL
max_date = spark.sql("SELECT MAX(date) as max_date FROM all_dates").first()["max_date"]
min_date = spark.sql("SELECT MIN(date) as min_date FROM all_dates WHERE date <> '1900-01-01'").first()["min_date"]

# Calendar dimension

In [None]:
# Generate a date range based on the calculated min and max dates
df_dates = spark.range(0, (max_date - min_date).days + 1).selectExpr(f"DATE_ADD('{min_date}', CAST(id AS INT)) as date")

# Add calendar attributes
df_calendario = df_dates\
    .withColumn("ano", year(col("date"))) \
    .withColumn("trimestre", date_format(col("date"), "Q").cast("int")) \
    .withColumn("mes", month(col("date"))) \
    .withColumn("dia", dayofmonth(col("date"))) \
    .withColumn("dia da semana", dayofweek(col("date"))) \
    .withColumn("nome dia", date_format(col("date"), "EEEE")) \
    .withColumn("nome mes", date_format(col("date"), "MMMM")) \
    .withColumn("fim de semana", when(col("dia da semana").isin(1, 7), lit(1)).otherwise(lit(0)))

# Write the DataFrame to a Parquet file in S3
df_calendario.write.mode("overwrite").parquet("s3://tesouro-silver/dim_calendario.parquet")

# Region dimension

In [None]:
# Select and deduplicate the geographic fields
df_region = df_investidores.select("UF do Investidor", "Cidade do Investidor", "Pais do Investidor").distinct()
df_region = df_region.withColumn("region_id", row_number().over(Window.orderBy(monotonically_increasing_id()))-1)

# Write the DataFrame to a Parquet file in S3
df_region.write.mode("overwrite").parquet("s3://tesouro-silver/dim_regiao.parquet")

# Investidores dimension

In [None]:
# Add ID Cidade to the Investidores dimension
df_dim_investidores = df_investidores.join(
    df_region,
    on=df_investidores["Cidade do Investidor"] == df_investidores["Cidade do Investidor"]
    how="inner"
).drop("UF do Investidor", "Cidade do Investidor", "Pais do Investidor")

df_dim_investidores.write.mode("overwrite").parquet("s3://tesouro-silver/dim_investidores.parquet")