In [None]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# DynamicFrame lives in the `awsglue.dynamicframe` module; the former
# `aws.dynamicframe` path does not exist and made this whole cell fail.
from awsglue.dynamicframe import DynamicFrame
import pandas as pd
from datetime import datetime

# getOrCreate reuses an existing SparkContext, so re-running this cell in the
# same session does not try to start a second one.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

In [None]:
def return_last_updated_tables(tables):
    """Extract the table names from a Glue ``get_tables`` response.

    Despite the function name, no filtering by update time happens here:
    every entry of ``tables['TableList']`` contributes its ``'Name'``.

    Args:
        tables: mapping with a ``'TableList'`` key holding a list of dicts,
            each with a ``'Name'`` key.

    Returns:
        List of table names, in the same order as ``tables['TableList']``.
    """
    return [entry['Name'] for entry in tables['TableList']]


In [1]:
def concat_pandas_df(table_names, database_name='olist-project-cleaned'):
    """Load each Glue Data Catalog table into a pandas DataFrame.

    Nothing is concatenated: the result is one DataFrame per table, in the
    same order as ``table_names``.

    Args:
        table_names: iterable of table names in the catalog database.
        database_name: catalog database to read from. Defaults to the
            database queried by the ``get_tables`` cell.

    Returns:
        List of pandas.DataFrame, one per name.
    """
    dataframes = []
    for table_name in table_names:
        dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
            database=database_name,
            table_name=table_name,
            transformation_ctx=f"dynamic_frame_{table_name}",
        )
        # toPandas() collects the whole table onto the driver; acceptable for
        # small tables but it caps how much data this step can handle.
        dataframes.append(dynamic_frame.toDF().toPandas())
    return dataframes

In [14]:
def subst_nomes_cols(dataframes):
    """Promote a misplaced header row to column names.

    A table is fixed when any column label contains ``'col'`` (generic
    names such as ``col0``) or when its first row contains the exact value
    ``'_id'``. For such tables, the first row becomes the column names and
    is removed from the data. Exact duplicate rows are then dropped.

    The fixed tables are written back into ``dataframes`` (the list is
    modified in place and also returned).

    Args:
        dataframes: list of pandas.DataFrame.

    Returns:
        The same list, with affected tables replaced.
    """
    for index, table in enumerate(dataframes):
        if table.empty:
            # No first row to promote.
            continue
        # astype(str) so the .str accessor also works on non-string labels.
        has_generic_names = table.columns.astype(str).str.contains('col').any()
        first_row = table.iloc[0]
        if has_generic_names or '_id' in first_row.tolist():
            fixed = table.iloc[1:].copy()
            fixed.columns = list(first_row)
            # The first column stays a regular column rather than becoming the
            # index: Spark's createDataFrame ignores the pandas index, so it
            # would be lost when the table is written out.
            dataframes[index] = fixed.drop_duplicates().reset_index(drop=True)
    return dataframes

In [None]:
def remover_colunas_produtos(dataframes):
    """Drop product measurement columns from the products table.

    A table counts as the products table when it has a
    ``product_name_lenght`` column (note the 'lenght' spelling, which must
    match the incoming column names exactly). From such tables, every listed
    column that is present is removed. Other tables are left untouched.
    The list is modified in place and returned.
    """
    unwanted = (
        'product_name_lenght',
        'product_description_lenght',
        'product_photos_qty',
        'product_weight_g',
        'product_length_cm',
        'product_height_cm',
        'product_width_cm',
    )
    for position in range(len(dataframes)):
        frame = dataframes[position]
        if 'product_name_lenght' not in frame.columns:
            continue
        present = [name for name in unwanted if name in frame.columns]
        dataframes[position] = frame.drop(columns=present)
    return dataframes

In [None]:
def del_category_df(dataframes, table_names):
    """Remove the category-translation table from the pipeline.

    Every DataFrame with a ``product_category_name_english`` column is
    dropped, and the name at the same position in ``table_names`` is dropped
    with it, so the two parallel lists stay aligned. ``table_names`` is
    modified in place, because the caller keeps using that same list object.

    Args:
        dataframes: list of pandas.DataFrame.
        table_names: list of table names, parallel to ``dataframes``.

    Returns:
        New list containing the DataFrames that were kept.

    Raises:
        ValueError: if the two lists have different lengths.
    """
    if len(dataframes) != len(table_names):
        raise ValueError(
            f'dataframes ({len(dataframes)}) and table_names ({len(table_names)}) '
            'must have the same length'
        )
    kept = [
        (table, name)
        for table, name in zip(dataframes, table_names)
        if 'product_category_name_english' not in table.columns
    ]
    # Slice assignment updates the caller's list instead of rebinding a local.
    table_names[:] = [name for _, name in kept]
    return [table for table, _ in kept]

In [62]:
def conserta_nome_df(dataframes):
    """Normalize column dtypes based on column-name patterns.

    Each rule below is checked, in this order, against every column whose
    name contains the pattern. A column matching several patterns gets each
    conversion in turn:

    * ``'_id'``     -> pandas ``'string'``
    * ``'_prefix'`` -> pandas ``'string'``
    * ``'price'``   -> ``float64``
    * ``'_value'``  -> ``float64``
    * ``'_date'``   -> datetime64 via ``pd.to_datetime``

    Columns that already have the target type are left as they are. Each
    table is converted on a copy that replaces it in the list.

    Args:
        dataframes: list of pandas.DataFrame.

    Returns:
        The same list, with tables replaced by their converted copies.

    Raises:
        Whatever pandas raises (e.g. ValueError/TypeError) when a value
        cannot be converted.
    """
    def is_string(series):
        return isinstance(series.dtype, pd.StringDtype)

    def is_float64(series):
        return series.dtype == 'float64'

    # (name pattern, "already converted?" check, converter, log message)
    rules = (
        ('_id', is_string, lambda s: s.astype('string'),
         "Colunas '_id' transformadas em string."),
        ('_prefix', is_string, lambda s: s.astype('string'),
         "Colunas '_prefix' transformadas em string."),
        ('price', is_float64, lambda s: s.astype('float64'),
         "Colunas 'price' transformadas em float."),
        ('_value', is_float64, lambda s: s.astype('float64'),
         "Colunas '_value' transformadas em float."),
        # to_datetime instead of astype('datetime64'): unit-less datetime64
        # casts are rejected by recent pandas versions.
        ('_date', pd.api.types.is_datetime64_any_dtype, pd.to_datetime,
         "Colunas 'date' transformadas em datetime."),
    )

    for index, table in enumerate(dataframes):
        converted = table.copy()
        for column in converted.columns:
            # str() so non-string labels do not break the substring test.
            label = str(column)
            for pattern, already_ok, convert, message in rules:
                if pattern in label and not already_ok(converted[column]):
                    converted[column] = convert(converted[column])
                    print(message)
        dataframes[index] = converted
    return dataframes




In [63]:
def procura_valores_nulos(dataframes):
    """Replace missing values in numeric columns with 0.

    Only numeric columns are filled. pandas rejects a non-string fill value
    for ``'string'`` dtype columns, and filling datetime columns with 0 would
    turn them into mixed-type ``object`` columns. Nulls in non-numeric
    columns are kept and reported instead.

    Filled tables are written back into ``dataframes``.

    Args:
        dataframes: list of pandas.DataFrame.

    Returns:
        The same list, with tables that had numeric nulls replaced by filled
        copies.
    """
    for index, table in enumerate(dataframes):
        numeric_columns = table.select_dtypes(include='number').columns
        numeric_nulls = int(table[numeric_columns].isnull().sum().sum())
        other_nulls = int(table.isnull().sum().sum()) - numeric_nulls
        if numeric_nulls > 0:
            filled = table.copy()
            filled[numeric_columns] = filled[numeric_columns].fillna(0)
            dataframes[index] = filled
            print(f'{numeric_nulls} valores nulos encontrados e substituídos por 0')
        if other_nulls > 0:
            print(f'{other_nulls} valores nulos mantidos em colunas não numéricas')
    return dataframes

In [64]:
def procura_valores_duplicados(dataframes):
    """Drop exact duplicate rows, keeping the first occurrence.

    Tables with a ``geolocation_city`` column are skipped, because repeated
    cities there are intentional. Deduplicated tables are written back into
    ``dataframes``.

    Args:
        dataframes: list of pandas.DataFrame.

    Returns:
        The same list, with deduplicated tables replaced.
    """
    for index, table in enumerate(dataframes):
        if 'geolocation_city' in table.columns:
            continue
        if table.duplicated().any():
            dataframes[index] = table.drop_duplicates(keep='first')
            print('Valores duplicados encontrados. Somente a primeira ocorrência foi mantida.')
    return dataframes

In [None]:
def save_in_curated(dataframes, table_names):
    """Write each DataFrame to the curated S3 area as Parquet.

    Each table goes to
    ``s3://olist-project-dw/curated/YYYY/MM/DD/<table_name>.parquet``. The
    date is computed once per call, so a single run never splits across two
    days. The name used is the one at the same position in ``table_names``.

    Args:
        dataframes: list of pandas.DataFrame to write.
        table_names: list of names parallel to ``dataframes``.

    Raises:
        ValueError: if ``dataframes`` and ``table_names`` differ in length.
    """
    # Imported from its actual module here so this function does not depend
    # on the notebook-level DynamicFrame import.
    from awsglue.dynamicframe import DynamicFrame

    if len(dataframes) != len(table_names):
        raise ValueError(
            f'dataframes ({len(dataframes)}) and table_names ({len(table_names)}) '
            'must have the same length'
        )

    # One YYYY/MM/DD partition; year and month must not be repeated in the path.
    date_directory = datetime.now().strftime("%Y/%m/%d")

    for table, table_name in zip(dataframes, table_names):
        # create_dynamic_frame has no from_pandas reader: go pandas -> Spark
        # DataFrame -> DynamicFrame instead.
        # NOTE(review): pandas extension dtypes (e.g. 'string' columns holding
        # pd.NA) may not convert cleanly on the Glue 3.0 Spark version — confirm.
        spark_df = spark.createDataFrame(table)
        dynamic_frame = DynamicFrame.fromDF(spark_df, glueContext, f"dynamic_frame_{table_name}")
        parquet_path = f"s3://olist-project-dw/curated/{date_directory}/{table_name}.parquet"

        glueContext.write_dynamic_frame.from_options(
            frame=dynamic_frame,
            connection_type="s3",
            connection_options={"path": parquet_path},
            format="parquet"
        )

    print("Tabelas salvas com sucesso!")


In [None]:
try:
    # NOTE(review): `session` is not defined anywhere in this notebook; it is
    # presumably meant to be a boto3 Session (e.g. boto3.session.Session()).
    # Define it before this cell, otherwise the line below raises NameError.
    glue = session.client('glue')
    # GetTables returns results in pages; gather them all and keep the
    # {'TableList': [...]} shape that return_last_updated_tables reads.
    table_list = []
    for page in glue.get_paginator('get_tables').paginate(DatabaseName='olist-project-cleaned'):
        table_list.extend(page['TableList'])
    response = {'TableList': table_list}
except Exception as e:
    print('Ocorreu um erro ao acessar o AWS Glue')
    print(str(e))
    # Re-raise so later cells do not continue without a `response`.
    raise

In [None]:
try:
    table_names = return_last_updated_tables(response)
except Exception as e:
    print("Ocorreu um erro ao carregar as tabelas do bucket cleaned: ")
    print(str(e))
    # Re-raise so later cells do not run on an undefined or stale `table_names`.
    raise

In [None]:
try:
    dataframes = concat_pandas_df(table_names)
except Exception as e:
    print('Ocorreu um erro ao adicionar as tabelas à lista: ')
    print(str(e))
    # Re-raise so later cells do not run on an undefined `dataframes`, or on a
    # stale one left in the kernel by a previous run.
    raise

In [None]:
try:
    dataframes = subst_nomes_cols(dataframes)
    dataframes = remover_colunas_produtos(dataframes)
    dataframes = del_category_df(dataframes, table_names)
    dataframes = conserta_nome_df(dataframes)
    dataframes = procura_valores_nulos(dataframes)
    dataframes = procura_valores_duplicados(dataframes)
except Exception as e:
    print("Ocorreu um erro em uma transformação: ")
    print(str(e))
    # Re-raise: continuing would let the next cell save a partially
    # transformed `dataframes`.
    raise


In [None]:
try:
    save_in_curated(dataframes, table_names)
except Exception as e:
    print("Ocorreu um erro ao salvar as tabelas no bucket curated: ")
    print(str(e))
    # Re-raise so a failed write is reported as a failure rather than the
    # notebook finishing normally.
    raise