In [1]:
from datetime import datetime
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import current_timestamp, date_format, col, lit, when, explode, array, struct, explode, expr, coalesce
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType

from typing import List
from functools import reduce
from google.colab import drive

drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Configuração do logger
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# logger = logging.getLogger(__name__)

In [3]:
# Configurações
APP_NAMES = ["tb_plan_crosswalk", "tb_plan_id_crosswalk"]
BASE_PATH = "/content/drive/My Drive/projetos/Projeto Integrador/dataset/"
OUTPUT_PATH = f"{BASE_PATH}silver/tb_plan_crosswalk/"
TABLE_NAME = "tb_silver_plan_crosswalk"

COLUMNS: List[str] = [
    "*"
] # Alterar

In [4]:
def log_info(message: str):
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"{current_time} - INFO - {message}")

def log_error(message: str):
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"{current_time} - ERROR - {message}")

def create_spark_session():
    log_info("Criando sessão Spark")
    spark = SparkSession.builder.appName("UnifyServiceArea").getOrCreate()
    spark.conf.set("spark.sql.parquet.mergeSchema", "false")
    spark.conf.set("spark.sql.files.maxPartitionBytes", "128m")
    spark.conf.set("spark.sql.files.openCostInBytes", "134217728")
    spark.conf.set("spark.sql.broadcastTimeout", "3600")
    return spark

def union_dataframes(df1: DataFrame, df2: DataFrame) -> DataFrame:
    return df1.unionByName(df2, allowMissingColumns=True)

def read_and_process_data(spark: SparkSession, app_names: List[str], base_path: str) -> DataFrame:
    dfs = []
    for app_name in app_names:
        file_path = f"{base_path}bronze/{app_name}/"
        log_info(f"Tentando ler dados de {file_path}")
        try:
            df = spark.read.parquet(file_path)
            log_info(f"Dados lidos com sucesso de {file_path}")
            df_processed = df.select("*").distinct() \
                            .withColumn("ingestDate", current_timestamp()) \
                            .withColumn("partitionDate", date_format(current_timestamp(), "yyyyMMdd")) \
                            .withColumn("source", lit(app_name))
            dfs.append(df_processed)
        except Exception as e:
            log_error(f"Erro ao ler dados de {file_path}: {str(e)}")
            continue

    if not dfs:
        raise ValueError("Nenhum dado foi lido com sucesso")

    return reduce(union_dataframes, dfs)

def prepare_silver_data(df: DataFrame) -> DataFrame:
    log_info("Preparando dados para o formato silver")
    columns = df.columns
    return df.select([
        col(c).cast("string") if c not in ["ingestDate", "partitionDate", "source"] else col(c)
        for c in columns
    ])

def save_as_parquet(df: DataFrame, output_path: str):
    log_info(f"Salvando dados como Parquet em {output_path}")
    df.write \
      .mode("overwrite") \
      .partitionBy("partitionDate") \
      .parquet(output_path)

In [8]:
def refine_dataframe(df):
    def remove_year(column_name):
        return column_name.split('_')[0]

    def extract_year(column_name):
        parts = column_name.split('_')
        return parts[-1] if len(parts) > 1 and parts[-1].isdigit() else None

    # Identificar colunas únicas e seus anos
    column_years = [(c, extract_year(c)) for c in df.columns if '_20' in c]
    unique_columns = set(remove_year(c) for c, _ in column_years)

    # Criar expressões para cada coluna única
    select_expr = []
    year_columns = []

    for unique_col in unique_columns:
        matching_columns = [(c, y) for c, y in column_years if remove_year(c) == unique_col]

        # Criar uma expressão coalesce para cada coluna única
        coalesce_expr = [when(col(c).isNotNull(), struct(lit(y).alias("year"), col(c).alias("value")))
                         for c, y in matching_columns]
        select_expr.append(coalesce(*coalesce_expr).alias(f"{unique_col}_temp"))

        # Adicionar expressão para o ano
        year_columns.append(f"{unique_col}_temp.year")

    # Adicionar colunas que não têm anos
    other_columns = [c for c in df.columns if '_20' not in c]
    select_expr.extend([col(c) for c in other_columns])

    # Aplicar a seleção e adicionar a coluna YEAR
    df_refined = df.select(*select_expr, *other_columns)
    df_refined = df_refined.withColumn("YEAR", coalesce(*year_columns))

    # Extrair os valores das estruturas e renomear as colunas
    for unique_col in unique_columns:
        df_refined = df_refined.withColumn(unique_col, col(f"{unique_col}_temp.value"))
        df_refined = df_refined.drop(f"{unique_col}_temp")

    return df_refined

In [9]:
def main():
    try:
        spark = create_spark_session()

        df_combined = read_and_process_data(spark, APP_NAMES, BASE_PATH)
        df_silver = prepare_silver_data(df_combined)

        # Refinar o DataFrame
        df_refined = refine_dataframe(df_silver)

        # Remover colunas duplicadas (se houver)
        df_final = df_refined.select(list(dict.fromkeys(df_refined.columns)))

        # Continuar com o salvamento e outras operações...
        save_as_parquet(df_final, OUTPUT_PATH)

        log_info(f"Dados salvos como Parquet em: {OUTPUT_PATH}")

        # Verificar se o arquivo foi salvo corretamente
        saved_df = spark.read.parquet(OUTPUT_PATH)
        log_info("Schema dos dados salvos:")
        saved_df.printSchema()
        log_info("Primeiras 5 linhas dos dados salvos:")
        saved_df.show(5)
        log_info(f"Contagem de registros salvos: {saved_df.count()}, na tabela: {TABLE_NAME}")
        log_info("Processo concluído com sucesso")

    except Exception as e:
        log_error(f"Erro durante a execução: {str(e)}")

In [10]:
if __name__ == "__main__":
    main()

2025-03-11 19:31:36 - INFO - Criando sessão Spark
2025-03-11 19:31:36 - INFO - Tentando ler dados de /content/drive/My Drive/projetos/Projeto Integrador/dataset/bronze/tb_plan_crosswalk/
2025-03-11 19:31:36 - INFO - Dados lidos com sucesso de /content/drive/My Drive/projetos/Projeto Integrador/dataset/bronze/tb_plan_crosswalk/
2025-03-11 19:31:36 - INFO - Tentando ler dados de /content/drive/My Drive/projetos/Projeto Integrador/dataset/bronze/tb_plan_id_crosswalk/
2025-03-11 19:31:36 - INFO - Dados lidos com sucesso de /content/drive/My Drive/projetos/Projeto Integrador/dataset/bronze/tb_plan_id_crosswalk/
2025-03-11 19:31:37 - INFO - Preparando dados para o formato silver
2025-03-11 19:31:38 - INFO - Salvando dados como Parquet em /content/drive/My Drive/projetos/Projeto Integrador/dataset/silver/tb_plan_crosswalk/
2025-03-11 19:31:56 - INFO - Dados salvos como Parquet em: /content/drive/My Drive/projetos/Projeto Integrador/dataset/silver/tb_plan_crosswalk/
2025-03-11 19:31:56 - INFO 