In [None]:
from pyspark.sql import SparkSession 

sc = SparkSession.builder.appName("DataFrame").config("spark.driver.memory", "8g").getOrCreate()

In [None]:
logs_interesse = [
    "Eleitor foi habilitado",
    "Voto confirmado para [Deputado Federal]",
    "Voto confirmado para [Deputado Estadual]",
    "Voto confirmado para [Senador]",
    "Voto confirmado para [Governador]",
    "Voto confirmado para [Presidente]",
    "Tecla indevida pressionada",
]
regex_interesse = [
    "Município:",
    "Zona Eleitoral:",
    "Local de Votação:",
    "Seção Eleitoral:",
]
regex_interesse = "|".join(regex_interesse)

In [None]:
import pyspark.sql.functions as F

DATA_PATH = "/app/data/"
input_file = DATA_PATH+"decodificado/*" 

source = sc.read.csv(input_file, sep="\t", header=False)
source = source.withColumn("arquivo", F.input_file_name())

In [None]:
"""distinto = source.select('arquivo').distinct()
amostra = distinto.sample(False, (100/distinto.count()),seed=1100)
amostra = amostra.toPandas()['arquivo'].values
source =  source.filter(F.col("arquivo").isin(*amostra))
print(f'Número de urnas selecionadas {len(amostra)}')"""

In [None]:
df = source.withColumnRenamed("_c0", "data").withColumnRenamed("_c4", "descricao")
df = df.select("data", "descricao")
df = df.withColumn("arquivo", F.input_file_name())
df = df.filter(
    F.col("descricao").isin(logs_interesse) | F.col("descricao").rlike(regex_interesse)
)

df = df.withColumn("data", F.to_timestamp(F.col("data"), "dd/MM/yyyy HH:mm:ss"))
df.show(truncate=False)

In [None]:
from pyspark.sql.window import Window
from unidecode import unidecode

window = (
    Window.orderBy("data")
    .partitionBy("arquivo") #Paraleliza pelo arquivo
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df_window = df
for nome_extracao in ["Município", "Zona Eleitoral", "Seção Eleitoral"]:
    nome_col = unidecode(nome_extracao).replace(" ", "_")
    df_window = df_window.withColumn(
        nome_col,
        F.when(
            F.col("descricao").contains(nome_extracao),
            F.regexp_extract(F.col("descricao"), rf"{nome_extracao}: (\d+)", 1).cast(
                "integer"
            ),
        ).otherwise(None),
    )

    df_window = df_window.withColumn(
        nome_col, F.last(nome_col, ignorenulls=True).over(window)
    )
df_window.show(truncate=False)

In [None]:
df_teclas = df_window.filter(~F.col("descricao").rlike(regex_interesse))
df_teclas = df_teclas.withColumn(
    "erros_tecla",
    F.when(F.col("descricao") == "Tecla indevida pressionada", 1).otherwise(0),
)

df_teclas = df_teclas.withColumn(
    "id_eleitor", F.when(F.col("descricao") == "Eleitor foi habilitado", 1).otherwise(0)
)
my_window = (
    Window.orderBy("data")
    .partitionBy("arquivo")
    .rowsBetween(Window.unboundedPreceding, 0)
)
df_teclas = df_teclas.withColumn("id_eleitor", F.sum("id_eleitor").over(my_window))


df_teclas.orderBy(["arquivo","data"], ascending=[1,1]).show(truncate=False)

In [None]:
my_window = (
    Window.orderBy("data")
    .partitionBy("arquivo","id_eleitor")
    .rowsBetween(Window.unboundedPreceding, 0)
)
df_soma_ate_momento = df_teclas.withColumn("erros_ate_agora", F.sum("erros_tecla").over(my_window))
df_soma_ate_momento.show()

In [None]:
agrupado_eleitor = df_soma_ate_momento.groupBy("arquivo", "id_eleitor").agg({"erros_tecla": "sum"}).orderBy(["arquivo","id_eleitor"], ascending=[1,1])
agrupado_eleitor.show(truncate=False)

In [None]:
agrupado_eleitor.orderBy(["sum(erros_tecla)"], ascending=[0]).show(truncate=False)

In [None]:
df_soma_tecla = df_soma_ate_momento.join(agrupado_eleitor, on=['arquivo', 'id_eleitor'], how='left')
df_soma_tecla = df_soma_tecla.drop("erros_tecla")
df_soma_tecla.show(5)

In [None]:
df_sem_tecla = df_soma_tecla.filter(
    ~F.col("descricao").isin(["Tecla indevida pressionada"])
)
df_sem_tecla.show(5)

In [None]:
w = Window.partitionBy("arquivo", "id_eleitor").orderBy("data")
df_tempo_segundos = (
    df_sem_tecla.withColumn("lag_tempo", F.lag(df_teclas["data"], 1).over(w))
    .withColumn(
        "tempo(segundos)",
        (
            F.unix_timestamp(
                df_teclas["data"],
            )
            - F.unix_timestamp(F.col("lag_tempo"))
        ),
    )
)
df_tempo_segundos.show()

In [None]:
window_spec = Window.partitionBy("arquivo", "id_eleitor").orderBy("data")


df_with_diff = df_tempo_segundos.withColumn("erro_nesse_cargo", F.col("erros_ate_agora") - F.lag(F.col("erros_ate_agora")).over(window_spec))
df_with_diff.show(30)

In [None]:
df_final = df_with_diff.drop("lag_tempo")
df_final = df_final.filter(
    ~F.col("descricao").isin(["Eleitor foi habilitado"])
)
df_final = df_final.withColumn("descricao", F.regexp_replace("descricao", r"(\[|\]|\bVoto confirmado para\b)", ""))
df_final.show()

In [None]:
df_final.write.mode('overwrite').parquet(DATA_PATH+"dados_log_urna")

In [None]:
sc.stop()