In [1]:
# Iniciando a sessão Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, IntegerType, DateType, TimestampType, StructType, StructField
from pyspark.sql.window import Window
import unicodedata
import sys
from pyspark.context import SparkContext

# Inicialização da SparkSession localmente
spark = SparkSession.builder \
    .appName("LocalSparkSession") \
    .config("spark.sql.shuffle.partitions", "2") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .getOrCreate()

In [2]:
# Função para remover acentos
def remove_accents(inputStr):
    if inputStr is not None:
        return unicodedata.normalize('NFD', inputStr).encode('ascii', 'ignore').decode('utf-8')
    else:
        return None

# Registrar a função como uma UDF
remove_accents_udf = udf(remove_accents, StringType())

In [3]:
# Mapeamento das siglas dos estados
siglas_estados = {
    "Acre": "AC", "Alagoas": "AL", "Amapá": "AP", "Amazonas": "AM", "Bahia": "BA", "Ceará": "CE",
    "Distrito Federal": "DF", "Espírito Santo": "ES", "Goiás": "GO", "Maranhão": "MA", "Mato Grosso": "MT",
    "Mato Grosso do Sul": "MS", "Minas Gerais": "MG", "Pará": "PA", "Paraíba": "PB", "Paraná": "PR",
    "Pernambuco": "PE", "Piauí": "PI", "Rio de Janeiro": "RJ", "Rio Grande do Norte": "RN", "Rio Grande do Sul": "RS",
    "Rondônia": "RO", "Roraima": "RR", "Santa Catarina": "SC", "São Paulo": "SP", "Sergipe": "SE", "Tocantins": "TO"
}

# Função UDF para substituir o nome do estado pela sigla
def substituir_estado_por_sigla(estado_nome):
    return siglas_estados.get(estado_nome, estado_nome)

substituir_estado_por_sigla_udf = udf(substituir_estado_por_sigla, StringType())

In [11]:
# Esquema esperado
expected_schema = StructType([
    StructField("id", StringType(), False),   
    StructField("cpf", StringType(), False),
    StructField("nome_completo", StringType(), False),
    StructField("convenio", StringType(), False),
    StructField("data_nascimento", DateType(), False),
    StructField("sexo", StringType(), False),
    StructField("logradouro", StringType(), False),
    StructField("numero", IntegerType(), False),
    StructField("bairro", StringType(), False),
    StructField("cidade", StringType(), False),
    StructField("estado", StringType(), False),
    StructField("data_cadastro", TimestampType(), False)
])

# Validador de esquema e qualidade dos dados
def validar_esquema(df, expected_schema):
    actual_schema = df.schema
    for expected_field, actual_field in zip(expected_schema, actual_schema):
        if expected_field.name != actual_field.name or expected_field.dataType != actual_field.dataType:
            raise ValueError(f"Esquema não corresponde ao esperado.\nEsperado: {expected_schema}\nAtual: {actual_schema}")
    return df

In [12]:
# Função para tratar valores nulos
def tratar_valores_nulos(df):
    for field in df:
        if isinstance(field.dataType, StringType):
            df = df.withColumn(field.name, when(col(field.name).isNull(), lit("N/A")).otherwise(col(field.name)))
        elif isinstance(field.dataType, IntegerType):
            df = df.withColumn(field.name, when(col(field.name).isNull(), lit(0)).otherwise(col(field.name)))
        elif isinstance(field.dataType, DateType):
            df = df.withColumn(field.name, when(col(field.name).isNull(), lit("1970-01-01").cast(DateType())).otherwise(col(field.name)))
        elif isinstance(field.dataType, TimestampType):
            df = df.withColumn(field.name, when(col(field.name).isNull(), lit("1970-01-01 00:00:00").cast(TimestampType())).otherwise(col(field.name)))
    return df

# Remover CPFs duplicados e manter o registro com a data de cadastro mais recente
def remover_cpfs_duplicados(df):
    window_spec = Window.partitionBy("cpf").orderBy(col("data_cadastro").desc())
    df = df.withColumn("row_number", row_number().over(window_spec)) \
           .filter(col("row_number") == 1) \
           .drop("row_number")
    return df

In [6]:
# Função para mascarar CPF
def esconder_cpf(cpf):
    if cpf is not None and len(cpf) == 11:
        return "********" + cpf[-3:]
    return cpf

# Registrar a função como uma UDF
esconder_cpf_udf = udf(esconder_cpf, StringType())

# validação de dados com Great Expectations
def validar_qualidade_dados_ge(df):
    # Convertendo o DataFrame para um DataFrame do Great Expectations
    df_ge = SparkDFDataset(df)

    # Verificar se há valores nulos nas colunas obrigatórias
    colunas_obrigatorias = ["nome_completo", "convenio"]
    for coluna in colunas_obrigatorias:
        expectation_result = df_ge.expect_column_values_to_not_be_null(coluna)
        if not expectation_result["success"]:
            raise ValueError(f"A coluna {coluna} contém valores nulos ou em branco.")

    # Verificar se o DataFrame possui registros
    row_count = df_ge.count()
    if row_count == 0:
        raise ValueError("O DataFrame está vazio. Nenhum registro encontrado.")

    # Verificar se a coluna 'cpf' não é nula, tem exatamente 11 caracteres, é uma string com dígitos e é única
    expectation_result = df_ge.expect_column_values_to_match_regex("cpf", r"^\d{11}$")
    if not expectation_result["success"]:
        raise ValueError(f"Existem CPFs com formato inválido ou que não são strings com 11 dígitos.")

    # Verificar se os valores de CPF são únicos
    expectation_result = df_ge.expect_column_values_to_be_unique("cpf")
    if not expectation_result["success"]:
        raise ValueError(f"Existem CPFs duplicados.")

    print("Todos os registros estão válidos.")
    
    # Mascarar os CPFs
    df = df.withColumn("cpf", esconder_cpf_udf(col("cpf")))
    return df

In [7]:
# Função para ler, transformar e padronizar dados do Parquet
def transform_parquet(spark, file_path):
    df_parquet = spark.read.parquet(file_path)
    df_parquet_tratamento = df_parquet.alias("df_parquet_tratamento")
    df_transformado_parquet = df_parquet_tratamento.withColumnRenamed("documento_cpf", "cpf") \
                                    .withColumn("cpf", regexp_replace(col("cpf"), "[.-]", "")) \
                                    .withColumn("nome_completo", trim(regexp_replace(regexp_replace(col("nome_completo"), "Sr\\.|Sra\\.|Dr\\.|Srta\\.|Dra\\.", ""), "\\s+", " "))) \
                                    .withColumn("data_nascimento", to_date(col("data_nascimento"), "yyyy-MM-dd")) \
                                    .withColumn("data_cadastro", to_timestamp(col("data_cadastro"), "yyyy-MM-dd'T'HH:mm:ss")) \
                                    .withColumn("cidade", initcap(col("cidade"))) \
                                    .withColumn("sexo", when(col("sexo") == "Fem", "F").otherwise(when(col("sexo") == "Masc", "M").otherwise(col("sexo")))) \
                                    .withColumn("numero", col("numero").cast(IntegerType())) \
                                    .withColumnRenamed("uf", "estado") \
                                    .drop("__index_level_0__") \
                                    .drop("pais")
    for column_name in df_transformado_parquet.schema.names:
        if isinstance(df_transformado_parquet.schema[column_name].dataType, StringType):
            df_transformado_parquet = df_transformado_parquet.withColumn(column_name, remove_accents_udf(col(column_name)))
    colunas_ordenadas = ["cpf", "nome_completo", "convenio" ,"data_nascimento", "sexo" , "logradouro" , "numero" , "bairro", "cidade" , "estado" , "data_cadastro"]
    df_transformado_parquet = df_transformado_parquet.select(colunas_ordenadas)
    return df_transformado_parquet

# Função para ler, transformar e padronizar dados do CSV
def transform_csv(spark, file_path):
    df_csv = spark.read.format("csv").option("header", "true").option("delimiter", "|").load(file_path)
    df_csv_tratamento = df_csv.alias("df_csv_tratamento")
    df_transformado_csv = df_csv_tratamento.withColumn("nome", trim(regexp_replace(regexp_replace(col("nome"), "Sr\\.|Sra\\.|Dr\\.|Srta\\.|Dra\\.", ""), "\\s+", " "))) \
                                    .withColumnRenamed("nome", "nome_completo") \
                                    .withColumn("data_nascimento", to_date(col("data_nascimento"), "dd/MM/yyyy")) \
                                    .withColumn("data_nascimento", col("data_nascimento").cast("date")) \
                                    .withColumn("data_cadastro", to_timestamp(col("data_cadastro"), "yyyy-MM-dd HH:mm:ss"))\
                                    .withColumn("cidade", initcap(col("cidade"))) \
                                    .withColumn("sexo", when(col("sexo") == "Fem", "F").otherwise(when(col("sexo") == "Masc", "M").otherwise(col("sexo")))) \
                                    .withColumn("numero", regexp_replace(col("numero"), "[\\.0]", "")) \
                                    .withColumn("numero", col("numero").cast(IntegerType())) \
                                    .drop("pais_cadastro")
    for column_name in df_transformado_csv.schema.names:
        if isinstance(df_transformado_csv.schema[column_name].dataType, StringType):
            df_transformado_csv = df_transformado_csv.withColumn(column_name, remove_accents_udf(col(column_name)))
    colunas_ordenadas = ["cpf", "nome_completo", "convenio" ,"data_nascimento", "sexo" , "logradouro" , "numero" , "bairro", "cidade" , "estado" , "data_cadastro"]
    df_transformado_csv = df_transformado_csv.select(colunas_ordenadas)
    return df_transformado_csv

# Função para ler, transformar e padronizar dados do JSON
def transform_json(spark, file_path):
    df_json = spark.read.json(file_path)
    df_json_tratamento = df_json.alias("df_json_tratamento")
    df_transformado_json = df_json_tratamento\
        .withColumn("nome", trim(regexp_replace(regexp_replace(col("nome"), "Sr\\.|Sra\\.|Dr\\.|Srta\\.|Dra\\.", ""), "\\s+", " ")))\
        .withColumnRenamed("nome", "nome_completo")\
        .withColumn("cpf", regexp_replace(col("cpf"), "[.-]", "")) \
        .withColumn("data_nascimento", to_date(col("data_nascimento"), "MMMM dd, yyyy"))\
        .withColumn("data_nascimento", col("data_nascimento").cast("date"))\
        .withColumn("data_cadastro", from_unixtime(col("data_cadastro") / 1000).cast("timestamp"))\
        .withColumn("cidade", initcap(col("cidade")))\
        .withColumn("numero", col("numero").cast(IntegerType()))\
        .withColumn("estado", substituir_estado_por_sigla_udf(col("estado")))
    for column_name in df_transformado_json.schema.names:
        if isinstance(df_transformado_json.schema[column_name].dataType, StringType):
            df_transformado_json = df_transformado_json.withColumn(column_name, remove_accents_udf(col(column_name)))
    colunas_ordenadas = ["cpf", "nome_completo", "convenio" ,"data_nascimento", "sexo" , "logradouro" , "numero" , "bairro", "cidade" , "estado" , "data_cadastro"]
    df_transformado_json = df_transformado_json.select(colunas_ordenadas)
    return df_transformado_json

In [8]:
# Caminhos dos arquivos locais ou do S3
parquet_path = "/home/jovyan/work/data/dados_cadastro_1.parquet"
csv_path = "/home/jovyan/work/data/dados_cadastro_2.csv"
json_path = "/home/jovyan/work/data/dados_cadastro_3.json"

# Transformar e padronizar dados
df_transformado_parquet = transform_parquet(spark, parquet_path)
df_transformado_csv = transform_csv(spark, csv_path)
df_transformado_json = transform_json(spark, json_path)

In [9]:
# Combinando os DataFrames
tabela_unica = df_transformado_parquet.union(df_transformado_csv).union(df_transformado_json)

In [13]:
# Tratar valores nulos
tabela_unica = tratar_valores_nulos(tabela_unica)

In [14]:
# Remover CPFs duplicados
tabela_unica = remover_cpfs_duplicados(tabela_unica)

In [15]:
# Cache do DataFrame antes da validação de dados
tabela_unica.cache()

DataFrame[cpf: string, nome_completo: string, convenio: string, data_nascimento: date, sexo: string, logradouro: string, numero: int, bairro: string, cidade: string, estado: string, data_cadastro: timestamp]

In [16]:
# Mascarar os CPFs
tabela_unica = tabela_unica.withColumn("cpf", esconder_cpf_udf(col("cpf")))

In [17]:
# Adicionar coluna de chave primária usando UUID
tabela_unica = tabela_unica.withColumn("id", expr("uuid()"))

# Reordenar as colunas para que 'id' seja a primeira coluna
primeira_coluna = ['id']
outras_colunas = [col for col in tabela_unica.columns if col != 'id']
tabela_unica = tabela_unica.select(primeira_coluna + outras_colunas)

In [18]:
# Validar esquema final
tabela_unica = validar_esquema(tabela_unica, expected_schema)

In [19]:
# Conferir o DataFrame resultante
tabela_unica.show(5, truncate=False)

+------------------------------------+-----------+---------------------+------------+---------------+----+--------------------+------+--------------------+------------------+------+-------------------+
|id                                  |cpf        |nome_completo        |convenio    |data_nascimento|sexo|logradouro          |numero|bairro              |cidade            |estado|data_cadastro      |
+------------------------------------+-----------+---------------------+------------+---------------+----+--------------------+------+--------------------+------------------+------+-------------------+
|0f162b94-8cd1-4aa7-93d0-48ed3489982c|********752|Luiz Miguel Rodrigues|Porto Seguro|2017-06-04     |M   |Loteamento de Mendes|373   |Conjunto Serra Verde|Santos            |SP    |2014-04-02 04:22:42|
|1afcab0d-4d06-43be-bcd0-156398cd5f4d|********634|Heitor Vieira        |Gold        |1978-05-28     |M   |Patio Vieira        |554   |Vila Havai          |Pires             |CE    |2001-09-18 

In [20]:
# Salvar o DataFrame resultante no S3 particionado por convenio
output_path = "/home/jovyan/work/output/dados_tratados"
tabela_unica.write.partitionBy("convenio").parquet(output_path, mode="overwrite")