# Notebook 3: Funções de Transformação e Limpeza de Dados

## Introdução
Neste notebook, iremos implementar funções para transformar e limpar os dados da tabela `therapy` do conjunto de dados FAERS. Com base na análise exploratória realizada no notebook anterior (Exploratory_Data_Analysis_Therapy), identificámos várias áreas que requerem atenção, incluindo a padronização de datas, tratamento de valores nulos e correção de inconsistências.

## Objetivos
- Criar funções reutilizáveis para a limpeza e transformação de dados.
- Padronizar formatos de data.
- Lidar com valores nulos e inconsistentes.
- Preparar os dados para análises futuras.

## Estrutura do Notebook
1. **Importação de Bibliotecas**
2. **Carregamento dos Dados**
3. **Definição do Schema**
4. **Definição de Metadados**
5. **Funções de Transformação**
   - Padronização de Datas
   - Tratamento de Valores Nulos
   - Correção de Inconsistências
   - Conversão de Unidades de Duração
6. **Aplicação das Funções**
7. **Validação dos Dados Transformados**
8. **Gravação dos Dados Limpos**

Vamos começar por implementar cada uma destas secções para garantir uma limpeza e transformação eficaz dos nossos dados.

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, DateType
from pyspark.sql.functions import to_date, col, when, expr
from pyspark.sql.functions import col, when, to_date, expr

In [0]:
# Carregar dados do Parquet
df_therapy = spark.read.parquet('dbfs:/FileStore/FAERS-grupo-4/therapy_raw')

# Definir schema

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType


# Definir o esquema final manualmente
therapy_schema = StructType([
    StructField("primaryid", IntegerType(), True),
    StructField("caseid", IntegerType(), True),
    StructField("dsg_drug_seq", IntegerType(), True),
    StructField("start_dt", StringType(), True),
    StructField("end_dt", StringType(), True),
    StructField("dur", IntegerType(), True),
    StructField("dur_cod", StringType(), True),
    StructField("dur_days", IntegerType(), True),
    StructField("ongoing", BooleanType(), True),
    StructField("datas_consistentes", BooleanType(), True),
    StructField("duracao_em_dias", DoubleType(), True),
    StructField("terapia_em_andamento", BooleanType(), True)
])


In [0]:
# Definir metadados atualizados
meta_data_therapy = {
    "primaryid": "Unique number for identifying a FAERS report. This is a concatenated key of Case ID and Case Version Number. It is the identifier for the case sequence version number as reported by the manufacturer.",
    "caseid": "Number for identifying a FAERS case.",
    "dsg_drug_seq": "Drug sequence number for identifying a drug for a Case.",
    "start_dt": "Date the therapy was started (or re-started) for this drug (YYYYMMDD). If a complete date not available, a partial date is provided.",
    "end_dt": "Date therapy was stopped for this drug.",
    "dur": "Numeric value of the duration (length) of therapy.",
    "dur_cod": "Unit abbreviation for duration of therapy.",
    "dur_days": "Duration of therapy in days.",
    "ongoing": "Boolean flag indicating if the therapy is ongoing.",
    "datas_consistentes": "Boolean flag indicating if the therapy dates are consistent.",
    "duracao_em_dias": "Duration of therapy in days calculated.",
    "terapia_em_andamento": "Boolean flag indicating if the therapy is ongoing based on missing end date."
}


In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType

# Definir o esquema final manualmente
therapy_schema_final = StructType([
    StructField("primaryid", IntegerType(), True),
    StructField("caseid", IntegerType(), True),
    StructField("dsg_drug_seq", IntegerType(), True),
    StructField("start_dt", StringType(), True),
    StructField("end_dt", StringType(), True),
    StructField("dur", IntegerType(), True),
    StructField("dur_cod", StringType(), True),
    StructField("dur_days", IntegerType(), True),
    StructField("ongoing", BooleanType(), True),
    StructField("datas_consistentes", BooleanType(), True),
    StructField("duracao_em_dias", DoubleType(), True),
    StructField("terapia_em_andamento", BooleanType(), True)
])


In [0]:
# Renomear colunas para corresponder aos metadados
df_therapy = df_therapy.withColumnRenamed("primaryid", "primaryid") \
    .withColumnRenamed("caseid", "caseid") \
    .withColumnRenamed("dsg_drug_seq", "dsg_drug_seq") \
    .withColumnRenamed("start_dt", "start_dt") \
    .withColumnRenamed("end_dt", "end_dt") \
    .withColumnRenamed("dur", "dur") \
    .withColumnRenamed("dur_cod", "dur_cod")


# Configurar os metadados e renomear colunas da tabela

## Funções de Transformação, validação dados e para guardar parquet 'therapy_final' 

In [0]:
from pyspark.sql.functions import col, when, datediff, to_date, lit, avg, expr, coalesce, last_day, concat, round, isnan, date_add, date_sub, stddev
from pyspark.sql.window import Window

def tratar_dados_therapy(df):
    # Converter datas, tratando datas parciais
    df = df.withColumn("start_dt", 
        when(col("start_dt").cast("string").rlike("^[0-9]{8}$"), to_date(col("start_dt").cast("string"), "yyyyMMdd"))
        .when(col("start_dt").cast("string").rlike("^[0-9]{6}$"), to_date(concat(col("start_dt").cast("string"), lit("01")), "yyyyMMdd"))
        .when(col("start_dt").cast("string").rlike("^[0-9]{4}$"), to_date(concat(col("start_dt").cast("string"), lit("0101")), "yyyyMMdd"))
        .otherwise(None)
    )
    df = df.withColumn("end_dt", 
        when(col("end_dt").cast("string").rlike("^[0-9]{8}$"), to_date(col("end_dt").cast("string"), "yyyyMMdd"))
        .when(col("end_dt").cast("string").rlike("^[0-9]{6}$"), last_day(to_date(concat(col("end_dt").cast("string"), lit("01")), "yyyyMMdd")))
        .when(col("end_dt").cast("string").rlike("^[0-9]{4}$"), to_date(concat(col("end_dt").cast("string"), lit("1231")), "yyyyMMdd"))
        .otherwise(None)
    )

    # Normalizar duração para dias
    df = df.withColumn("dur_days", 
        when(col("dur").isNotNull() & col("dur_cod").isNotNull(),
            round(when(col("dur_cod") == "YR", col("dur") * 365)
            .when(col("dur_cod") == "MON", col("dur") * 30)
            .when(col("dur_cod") == "WK", col("dur") * 7)
            .when(col("dur_cod") == "DAY", col("dur"))
            .when(col("dur_cod") == "HR", col("dur") / 24)
            .when(col("dur_cod") == "MIN", col("dur") / (24 * 60))
            .when(col("dur_cod") == "SEC", col("dur") / (24 * 60 * 60))
            .otherwise(None)).cast("integer")
        ).otherwise(None)
    )

    # Calcular duração em dias inicial
    df = df.withColumn("duracao_em_dias", 
        when(col("start_dt").isNotNull() & col("end_dt").isNotNull() & (col("end_dt") >= col("start_dt")),
             datediff(col("end_dt"), col("start_dt")) + 1)
        .when(col("dur_days").isNotNull(), col("dur_days"))
        .otherwise(None)
    )

    # Calcular estatísticas para identificar outliers
    stats = df.select(avg("duracao_em_dias").alias("mean"), 
                      stddev("duracao_em_dias").alias("std")).first()
    mean_duration = stats["mean"]
    std_duration = stats["std"]
    lower_bound = mean_duration - 3 * std_duration
    upper_bound = mean_duration + 3 * std_duration

    # Remover outliers e tratar durações negativas
    df = df.withColumn("duracao_em_dias", 
        when((col("duracao_em_dias") < lower_bound) | (col("duracao_em_dias") > upper_bound) | (col("duracao_em_dias") < 0), None)
        .otherwise(col("duracao_em_dias"))
    )

    # Imputar média para casos sem informação suficiente
    mean_duration_valid = df.filter(col("duracao_em_dias").isNotNull()).select(avg("duracao_em_dias")).first()[0]
    df = df.withColumn("duracao_em_dias", 
        when(col("duracao_em_dias").isNull(), round(lit(mean_duration_valid)).cast("integer"))
        .otherwise(col("duracao_em_dias").cast("integer"))
    )

    # Recalcular datas com base na duração imputada
    df = df.withColumn("end_dt",
        when(col("start_dt").isNotNull() & col("end_dt").isNull(),
             date_add(col("start_dt"), col("duracao_em_dias") - 1))
        .otherwise(col("end_dt"))
    )
    df = df.withColumn("start_dt",
        when(col("start_dt").isNull() & col("end_dt").isNotNull(),
             date_sub(col("end_dt"), col("duracao_em_dias") - 1))
        .otherwise(col("start_dt"))
    )

    # Corrigir casos onde end_dt é menor que start_dt
    df = df.withColumn("end_dt",
        when((col("end_dt").isNotNull()) & (col("start_dt").isNotNull()) & (col("end_dt") < col("start_dt")),
             date_add(col("start_dt"), col("duracao_em_dias") - 1))
        .otherwise(col("end_dt"))
    )

    # Identificar terapias em andamento
    df = df.withColumn("terapia_em_andamento", 
        when(col("end_dt").isNull() & col("start_dt").isNotNull(), True)
        .otherwise(False)
    )

    # Verificar consistência das datas
    df = df.withColumn("datas_consistentes",
        when(col("end_dt").isNotNull() & col("start_dt").isNotNull() & (col("end_dt") >= col("start_dt")), True)
        .otherwise(False)
    )

    return df

# Aplicar o tratamento
df_therapy_tratado = tratar_dados_therapy(df_therapy)

# Validar os dados
def validar_dados(df):
    print("Esquema do DataFrame:")
    df.printSchema()
    
    print("\nResumo estatístico da duração em dias:")
    df.select("duracao_em_dias").summary().show()
    
    print("\nContagem de valores nulos:")
    for column in df.columns:
        null_count = df.filter(col(column).isNull()).count()
        print(f"{column}: {null_count}")
    
    print("\nExemplos de registros:")
    df.show(5, truncate=False)

    print("\nDistribuição de terapias em andamento:")
    df.groupBy("terapia_em_andamento").count().show()

    print("\nDistribuição de consistência de datas:")
    df.groupBy("datas_consistentes").count().show()

#validar_dados(df_therapy_tratado)

In [0]:
from pyspark.sql.functions import lit

# Função para restaurar metadados
def restore_metadata(schema, df):
    for field in schema.fields:
        if field.name not in df.columns:
            df = df.withColumn(field.name, lit(None).cast(field.dataType))
        df = df.withColumnRenamed(field.name, field.name)
    return df

# Restaurar metadados após o processamento
df_therapy_tratado = restore_metadata(therapy_schema_final, df_therapy_tratado)


In [0]:
def validar_dados(df):
    print("Esquema do DataFrame:")
    df.printSchema()
    
    print("\nResumo estatístico da duração em dias:")
    df.select("duracao_em_dias").summary().show()
    
    print("\nContagem de valores nulos:")
    for column in df.columns:
        null_count = df.filter(col(column).isNull()).count()
        print(f"{column}: {null_count}")
    
    print("\nExemplos de registros:")
    df.show(5, truncate=False)

    print("\nDistribuição de terapias em andamento:")
    df.groupBy("terapia_em_andamento").count().show()

    print("\nDistribuição de consistência de datas:")
    df.groupBy("datas_consistentes").count().show()

#validar_dados(df_therapy_tratado)

# Função para gravar dados
def gravar_dados(df, caminho):
    df.write.mode("overwrite").parquet(caminho)

# Especificar o caminho para salvar o DataFrame transformado
save_path = "dbfs:/FileStore/FAERS-grupo-4/therapy_final"

# Gravar o DataFrame transformado
gravar_dados(df_therapy_tratado, save_path)
