# 1. Preparando ambiente

## Colocando pasta dos dados como Raiz

In [24]:
from pyspark.sql import SparkSession

# Cria uma SparkSession
spark = (SparkSession.builder
         .appName("MinhaAppSpark")
         .master("local[*]")  # "local[*]" usa todos os núcleos disponíveis localmente
         .config("spark.ui.port", "4040") 
         .getOrCreate()
         )
spark.sparkContext.setLogLevel("ERROR")


# Verifique se a sessão foi criada
print(spark.version)

3.5.3


# Variáveis

In [None]:
import sys
import os

# Adiciona o diretório atual ao Python path
sys.path.append(os.path.dirname(os.getcwd()))
   
from libs.storage import Storage

storage_log = Storage("files/sipe-u-log-hist-ate-2025")
storage_pefil = Storage("files/perfil")
storage_data = Storage("data")
storage_servidor = Storage("files/siape-servidor")

limit = 10000
random_seed = 42
storage_log

# 2. Pré-Processamento

## Escolhendo os CPF

- len(co_nivel_funcao ) = 3
- de 1.01 ate 1.07 - chere
- de 1.08 ate 1.10 - coordenador
- de 1.11 ate 1.13 - coordenador geral
- acima de 1.13 - diretor

In [None]:
from pyspark.sql.functions import col, when, length, row_number, count, desc
from pyspark.sql.window import Window

# Carregar os dados da pasta usando o Spark
df_cpfs = (
    spark.read.parquet(storage_servidor.get_directory_name())
    .where(length(col("co_nivel_funcao")) == 3)  # Check length of co_nivel_funcao
    .withColumn(
        "classificacao",
        when(col("co_nivel_funcao").between(101, 107), "Chefe")
        .when(col("co_nivel_funcao").between(108, 110), "Coordenador")
        .when(col("co_nivel_funcao").between(111, 113), "Coordenador Geral")
        .otherwise("Diretor")
    )
    .where("da_obito = 0")
    .where("co_situacao_servidor = 1")
    .where("da_ocor_inatividade_serv = 0")
    .orderBy(desc("da_ocor_ingr_orgao_serv"))
)
# Agrupando por classificação e amostrando 100 registros por grupo
windowSpec = Window.partitionBy("classificacao").orderBy("co_nivel_funcao")  # Order by co_nivel_funcao
df_amostra = df_cpfs.withColumn("row_number", row_number().over(windowSpec)) \
                   .filter(col("row_number") <= 130) \
                   .drop("row_number")  # Remove the auxiliary column
                   
# removendo quem tece mais de 2 papeis
windowSpec = Window.partitionBy("nu_cpf")
df_cpfs_count = df_amostra.withColumn("count", count("*").over(windowSpec))
df_amostra = df_cpfs_count.where(col("count") == 1).drop("count")

df_amostra.show()

In [None]:
df_cpfs = df_amostra.groupBy(["nu_cpf", "classificacao"]).count().orderBy("count", ascending=False).drop("count")

df_cpfs.groupBy(["classificacao"]).count().orderBy("count", ascending=False).show()
df_cpfs.show()

### Carregando dados

In [None]:
from pyspark.sql.functions import trim, col

cpf_list = [row.nu_cpf for row in df_cpfs.select('nu_cpf').collect()]


# Carregar os dados da pasta usando o Spark
df = (
    spark.read.parquet(storage_log.get_directory_name())
    .withColumn('ch_arvore_senha_log', trim(col('ch_arvore_senha_log')))
    .where("ch_arvore_senha_log <> 'SIAPE' or trim(ch_arvore_senha_log) <> ''")
    .where("trim(ch_arvore_senha_log) <> 'SIAPE     SIAPENET  ORGAO               LOGIN'")
    .where("nu_cpf_usuario_log <> '11111111111'")
    .where(col('nu_cpf_usuario_log').isin(cpf_list))  # Aqui aplicamos o filtro correto
    .join(df_cpfs.select('nu_cpf', 'classificacao'), 
          col('nu_cpf_usuario_log') == col('nu_cpf'), 
          'left')  # Usar left join para garantir que todos os registros do df original estejam presentes
    .limit(limit)
)

# Mostrar algumas informações sobre o DataFrame
df.printSchema()

## Transformação 1

Alterar a coluna 'ch_arvore_senha_log' em 5 novas colunas.

Regra da expressão recular:
1. SIGLA do Sistema
1. [SUBSISTEMA]
1. [MODULO]
1. [OPCAO]
1. [ATIVIDADE]
1.  TRANSACAO

In [18]:
# prompt: Transformar a coluna ch_arvore_senha_log em 6 colunas com a regra:
# Regra da expressão recular:
# - SIGLA_SISTEMA
# - [SUBSISTEMA]
# - [MODULO]
# - [OPCAO]
# - [ATIVIDADE]
# - TRANSACAO

from pyspark.sql.functions import regexp_extract

# Definir a expressão regular para extrair as informações da coluna 'ch_arvore_senha_log'
regex = r"^(\w+)(?:\s+(\w+))?(?:\s+(\w+))?(?:\s+(\w+))?(?:\s+(\w+))?\s+(\w+)$"

# Criar novas colunas usando regexp_extract
df = df.withColumn("SIGLA_SISTEMA", regexp_extract("ch_arvore_senha_log", regex, 1)) \
    .withColumn("SUBSISTEMA", regexp_extract("ch_arvore_senha_log", regex, 2)) \
    .withColumn("MODULO", regexp_extract("ch_arvore_senha_log", regex, 3)) \
    .withColumn("OPCAO", regexp_extract("ch_arvore_senha_log", regex, 4)) \
    .withColumn("ATIVIDADE", regexp_extract("ch_arvore_senha_log", regex, 5)) \
    .withColumn("TRANSACAO", regexp_extract("ch_arvore_senha_log", regex, 6))

# Remover colunas que não tem sistema [não deveria ter nenhuma]
df = df.dropna(how='any', subset=['SIGLA_SISTEMA', 'TRANSACAO'])

In [19]:
# remover colunas: tx_dados_atualizados_log|tx_parametros_transacao
# muita informação pessoal
df = df.drop(
    "tx_dados_atualizados_log", 
    "tx_dados_atualizados_log_2", 
    "tx_parametros_transacao",
    "nu_cpf"
    )

In [None]:
from pyspark.sql.functions import countDistinct

# Assuming 'df' is your PySpark DataFrame

# Get a list of all columns in the DataFrame
all_columns = df.columns

# Create a dictionary to store the unique counts for each column
unique_counts = {}

# Iterate through each column and calculate the number of unique values
for column in all_columns:
    unique_counts[column] = df.select(countDistinct(column)).first()[0]

# Print the unique counts for each column
for column, count in unique_counts.items():
    print(f"Column '{column}': {count} unique values")

## Transformaçao 2 - Anonimização

Trocar os nomes dos orgão e os cpf por uma sequencia numérica

In [None]:
# prompt: como anonimizar dados de uma coluna? tenho algumas colunas para serem anonimizadas: co_orgao_log, matricula_inst_legal, co_orgao_servidor, nu_iden_unica_siape_log.
# Gostaria que fosse guardada a transformação para poder fazer a inversa.  Os valores que se repetirem precisam ter os mesmos indices
# 123 > 1
# 124 > 2
# 123 > 1

from pyspark.sql.functions import monotonically_increasing_id

# Lista de colunas a serem anonimizadas
colunas_anonimizar = [
    'co_orgao_log',
    'matricula_inst_legal',
    'co_orgao_servidor',
    'nu_cpf_usuario_log',
    'nu_iden_unica_siape_log',
    # 'ch_arvore_senha_log'
]
# colunas_anonimizar = []

# Criar um dicionário para armazenar a transformação de cada coluna
mapeamento_colunas = {}


def create_mapped(df, colunas_anonimizar):
    for coluna in colunas_anonimizar:
        # Criar um novo DataFrame com uma coluna de ID única para cada valor da coluna a ser anonimizada
        df_temp = df.select(coluna).distinct().withColumn("novo_valor", monotonically_increasing_id() + 1)

        # Salvar esse mapeamento em arquivo
        df_temp.write.mode("overwrite").parquet(storage_pefil.get_path(f"mapeamento_{coluna}"))
        print(df_temp.show(5))

        df = do_mapped(df, colunas_anonimizar, coluna, df_temp)
    return df


def load_mappped(df, colunas_anonimizar):
    for coluna in colunas_anonimizar:
        # Criar um novo DataFrame com uma coluna de ID única para cada valor da coluna a ser anonimizada
        df_temp = spark.read.parquet(storage_pefil.get_path(f"mapeamento_{coluna}", True))

        df = do_mapped(df, colunas_anonimizar, coluna, df_temp)
    return df


def do_mapped(df, colunas_anonimizar, coluna, df_temp):
    global mapeamento_colunas
    # Criar um dicionário com o mapeamento entre o valor original e o novo valor
    mapeamento_colunas[coluna] = df_temp.rdd.map(lambda row: (row[coluna], row["novo_valor"])).collectAsMap()
    # Substituir os valores da coluna original pelos novos valores usando o mapeamento
    df = df.replace(mapeamento_colunas[coluna], subset=[coluna])
    return df

print(colunas_anonimizar)
df = create_mapped(df, colunas_anonimizar)
# load_mappped(colunas_anonimizar)
# df.head(3)

# Salvando df selecionado

In [None]:
storage_data.get_path(f"dataset")

## Salvando dataset

In [None]:
df.write.mode("overwrite").parquet(storage_data.get_path(f"dataset"))