In [None]:
from pyspark import SparkConf
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, lit, col, count, avg, round, explode
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, FloatType, IntegerType



# Executor/driver sizing and shuffle settings for this session.
spark_conf = SparkConf().setAll([
    ("spark.executor.memory", "4g"),
    ("spark.driver.memory", "4g"),
    ("spark.network.timeout", "600s"),
    ("spark.executor.instances", "4"),
    ("spark.executor.cores", "4"),
    ("spark.default.parallelism", "6"),
    ("spark.sql.shuffle.partitions", "6"),
    ("spark.sql.parquet.enableVectorizedReader", "true"),
])

# Hive-enabled session with Kryo serialization and the Delta Lake extension/catalog
# (delta-core 2.2.0 for Scala 2.12), plus the settings above.
spark = (
    SparkSession.builder
    .enableHiveSupport()
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config(conf=spark_conf)
    .getOrCreate()
)

In [None]:
# Data-quality summary: one row per (table, layer) with record counts and the share of
# rejected rows. Each table is read from <cwd>/layers/<layer>/tables/<table>; rejected
# rows are read from the sibling "<table>_errors" Delta table, which may not exist.
table_layer = [
    ["enem_microdados", "raw"],
    ["dim_candidato", "trusted"],
    ["dim_escola", "trusted"],
    ["dim_status_redacao", "trusted"],
    ["dim_tipo_prova", "trusted"],
    ["fact_candidato", "trusted"],
    ["fact_candidato_denormalized", "analytics"]
]

# Ingestion/partition columns; excluded from the per-table column count.
sys_columns = ['ingestion_at', 'YEAR', 'MONTH', 'DAY']

# Schema of the summary row emitted for tables that have no errors table.
schema = StructType([
    StructField("avg_total_quality_errors", FloatType()),
    StructField("total_faulty_columns_perc", FloatType()),
    StructField("table_name", StringType()),
    StructField("layer", StringType()),
    StructField("total_columns_num", IntegerType()),
    StructField("total_records", IntegerType()),
    StructField("valid_records", IntegerType()),
    StructField("faulty_records", IntegerType()),
    StructField("faulty_records_perc", FloatType()),
    StructField("faulty_dqs", StringType()),
])

df_grouped = None
for table, layer in table_layer:
    print(f"{table} - {layer}")
    tables_dir = os.path.join(os.getcwd(), "layers", layer, "tables")

    df = spark.read.format("delta").load(os.path.join(tables_dir, table))

    # Only the load of the optional errors table is allowed to fail. The processing
    # below stays outside the try so that real failures surface instead of the table
    # being silently reported as error-free.
    try:
        df_error = spark.read.format("delta").load(os.path.join(tables_dir, f"{table}_errors"))
    except Exception as e:
        print(f"Exception: {e}")
        df_error = None

    columns = [column for column in df.columns if column not in sys_columns]

    valid_records = df.count()
    faulty_records = df_error.count() if df_error is not None else 0
    total_records = valid_records + faulty_records

    if df_error is not None:
        # error_list is nested array -> array -> map (the second explode must yield a
        # map, since the map is then exploded into key/value). Exploding the map gives
        # one (chave, valor) row per recorded error. The distinct values are joined
        # into faulty_dqs (presumably the failed data-quality rule names; confirm).
        df_error_values = (
            df_error
            .select(explode("error_list").alias("explodido"))
            .select(explode("explodido").alias("explodido"))
            .select(explode("explodido").alias("chave", "valor"))
            .select(col("valor").cast("string").alias("valor"))
            .where(col("valor").isNotNull())  # a null would break the string join
            .drop_duplicates()
        )
        # Sorted so the comma-separated list is stable between runs.
        faulty_dqs = ",".join(sorted(row["valor"] for row in df_error_values.collect()))

        # Avoid 0/0 when both the table and its errors table are empty.
        faulty_records_perc = (faulty_records / total_records) * 100 if total_records else 0.0

        df_table_grouped = (
            df_error.groupBy().agg(avg("total_quality_errors").alias("avg_total_quality_errors"))
            .withColumn("total_faulty_columns_perc", (col("avg_total_quality_errors") / lit(len(columns))) * 100)
            .withColumn("table_name", lit(table))
            .withColumn("layer", lit(layer))
            .withColumn("total_columns_num", lit(len(columns)))
            .withColumn("total_records", lit(total_records))
            .withColumn("valid_records", lit(valid_records))
            .withColumn("faulty_records", lit(faulty_records))
            .withColumn("faulty_records_perc", lit(faulty_records_perc))
            .withColumn("faulty_dqs", lit(faulty_dqs))
        )
    else:
        # No errors table: every record counts as valid.
        data = [(0.0, 0.0, table, layer, len(columns), total_records, valid_records, 0, 0.0, "")]
        df_table_grouped = spark.createDataFrame(data, schema=schema)

    # The accumulator starts as None, so test for None explicitly.
    if df_grouped is None:
        df_grouped = df_table_grouped
    else:
        df_grouped = df_grouped.unionByName(df_table_grouped)


In [None]:
# Round the average/percentage metrics to two decimals for display. `round` here is
# pyspark.sql.functions.round (imported in the first cell), which shadows the builtin.
for metric_column in ["avg_total_quality_errors", "total_faulty_columns_perc", "faulty_records_perc"]:
    df_grouped = df_grouped.withColumn(metric_column, round(metric_column, 2))

df_grouped.show()

In [None]:
# show() truncates long strings, so collect the full comma-separated faulty_dqs values.
df_grouped.select(col("faulty_dqs")).collect()

In [None]:
# Breakdown of recorded errors by (chave, valor) for each table's "<table>_errors"
# Delta table. Uncomment entries to include more tables.
table_layer = [
    ["enem_microdados", "raw"],
    # ["base_estados_educacao", "raw"],
    # ["dim_candidato", "trusted"],
    # ["dim_escola", "trusted"],
    # ["dim_perfil_gestor", "trusted"],
    # ["dim_projeto_educacao", "trusted"],
    # ["dim_status_redacao", "trusted"],
    # ["dim_tipo_prova", "trusted"],
    # ["fact_candidato", "trusted"],
    # ["enem_results_ibge_view", "analytics"],
    # ["fact_candidato_denormalized", "analytics"]
]

# One frame per table, unioned after the loop, so earlier tables are not overwritten
# by later ones. This cell deliberately leaves df_grouped (the summary) untouched.
error_counts_per_table = []
for table, layer in table_layer:
    df = spark.read.format("delta").load(
        os.path.join(os.getcwd(), "layers", layer, "tables", f"{table}_errors")
    )

    # error_list is nested array -> array -> map. Exploding the map yields one
    # (chave, valor) row per recorded error entry, which is then counted.
    df_table_counts = (
        df
        .select(explode("error_list").alias("explodido"))
        .select(explode("explodido").alias("explodido"))
        .select(explode("explodido").alias("chave", "valor"))
        .select(col("chave").cast("string").alias("chave"), col("valor").cast("string").alias("valor"))
        .groupBy("chave", "valor")
        .agg(count("*").alias("total"))
        .select(lit(table).alias("table_name"), lit(layer).alias("layer"), "chave", "valor", "total")
    )
    error_counts_per_table.append(df_table_counts)

if not error_counts_per_table:
    raise ValueError("table_layer must list at least one table")

df_resultado = error_counts_per_table[0]
for df_table_counts in error_counts_per_table[1:]:
    df_resultado = df_resultado.unionByName(df_table_counts)

# Most frequent errors first, so a limited show() displays the top entries.
df_resultado = df_resultado.orderBy("table_name", col("total").desc())

In [None]:
# Display up to 100 rows of the per-(chave, valor) error counts.
df_resultado.show(n=100)