# Importando a biblioteca

In [2]:
# Bibliotecas padrão do Python
import os, json, glob, time

# ────────────────────────────────────────────────────────────────────────────
# Apache Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, explode
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    Tokenizer,
    StopWordsRemover,
    HashingTF,
    IDF,
    StringIndexer,
)
from pyspark.ml.classification import (
    LogisticRegression,
    RandomForestClassifier,
    GBTClassifier,
    OneVsRest,        # estratégia Binary Relevance p/ multilabel
)
from pyspark.ml.linalg import Vectors, VectorUDT

# ────────────────────────────────────────────────────────────────────────────
# Imports opcionais (tentamos usar se disponíveis)
try:
    from pyspark.ml.feature import MultiLabelBinarizer         # Spark 3.5+
    HAS_MLB = True
except ImportError:
    HAS_MLB = False                                            # Spark ≤ 3.4

try:
    from pyspark.ml.evaluation import MultilabelClassificationEvaluator  # Spark 3.5+
    HAS_ML_EVAL = True
except ImportError:
    HAS_ML_EVAL = False                                        # Spark ≤ 3.4


# Lendo os arquivos

In [3]:
from pyspark.sql import SparkSession

# SparkSession otimizado para dataset pequeno (~7 000 linhas)
spark = (
    SparkSession.builder
        .appName("ClassificacaoDecretos")
        .master("local[*]")                # usa todos os núcleos disponíveis
        .config("spark.driver.memory", "4g")
        .config("spark.sql.shuffle.partitions", "16")  # ↓ overhead de shuffle (padrão é 200)
        .getOrCreate()
)


In [4]:
# Leitura dos arquivos JSON  (I/O costuma levar < 5 s para 7 000 linhas)
# Ajustes só fazem diferença se houver **centenas** de arquivos pequenos.
# ──────────────────────────────────────────────────────────────
df_raw = (
    spark.read
         .option("multiLine", True)       # mantém registros multilinha
         .option("wholeFile", True)       # ↳ lê um arquivo por tarefa → ↓ overhead de metadados
         .json("legislacao/*.json")
         .coalesce(8)                     # junta partições (16→8) p/ dataset pequeno
         .cache()                         # se for reutilizar várias vezes
)

df_raw.printSchema()
df_raw.show(2, truncate=False)

root
 |-- catalogo: struct (nullable = true)
 |    |-- frase: string (nullable = true)
 |-- classes: struct (nullable = true)
 |    |-- classe: string (nullable = true)
 |-- disps: struct (nullable = true)
 |    |-- disp: struct (nullable = true)
 |    |    |-- codItemDocumento: string (nullable = true)
 |    |    |-- nomeDispositivo: string (nullable = true)
 |    |    |-- refs: struct (nullable = true)
 |    |    |    |-- ref: struct (nullable = true)
 |    |    |    |    |-- codNormaPosterior: string (nullable = true)
 |    |    |    |    |-- comentario: string (nullable = true)
 |    |    |    |    |-- dispositivo: string (nullable = true)
 |-- edivs: struct (nullable = true)
 |    |-- ediv: struct (nullable = true)
 |    |    |-- codnormaAnterior: string (nullable = true)
 |    |    |-- comentario: string (nullable = true)
 |    |    |-- datAssinatura: string (nullable = true)
 |    |    |-- itens: struct (nullable = true)
 |    |    |    |-- item: struct (nullable = true)
 |    |

# Filtrando não nulos e preparando campos de features

In [5]:
from pyspark.sql.functions import col, udf, explode, expr
# agora expr está definido


df = (
    df_raw.filter(
        col("ementa").isNotNull() & col("classes.classe").isNotNull()
    )
    .select(
        "ementa",
        col("indexacao.frase").alias("indexacao_frase"),
        col("catalogo.frase").alias("catalogo_frase"),
        expr("transform(split(classes.classe, '/'), x -> trim(x))").alias("classe_array"),
        col("classes.classe").alias("classe_str"),
        col("identificacao.tipo").alias("tipo"),
        col("identificacao.dataassinatura").alias("dataassinatura")
    )
    .cache()           # ← materializa uma única vez, evita reprocessar depois
)
df.count()            # força a materialização já aqui (opcional)

7836

# Etapa 1: Pré-processamento

In [6]:
# ──────────────────────────────────────────────────────────────
# Texto unificado + ano (custo irrisório p/ 7 k linhas)
#   • Faz tudo num único .select() → só 1 stage em vez de 2
#   • Mantém o cache herdado; não precisa recachear
# ──────────────────────────────────────────────────────────────
from pyspark.sql.functions import concat_ws, year, to_date

df = (
    df.select(
        "*",
        concat_ws(" ", "ementa", "indexacao_frase", "catalogo_frase")
            .alias("texto_completo"),
        year(to_date("dataassinatura", "dd/MM/yyyy")).alias("ano")
    )
)


# Etapa 2 - Indexação dos campos tipo e classe

In [7]:
from pyspark.ml.feature import StringIndexer

# Converte tipo textual em número (categoria)
tipo_indexer = StringIndexer(inputCol="tipo", outputCol="tipo_indexado", handleInvalid="keep")

# ETAPA 3 — Vetorizar o texto com TF-IDF

In [8]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StopWordsRemover

tokenizer = Tokenizer(inputCol="texto_completo", outputCol="tokens")

# (opcional) remover stop-words já corta ~20 % dos termos antes do hash
stop_rm   = StopWordsRemover(inputCol="tokens", outputCol="tokens_limpos")

hashingTF = HashingTF(
    inputCol="tokens_limpos",          # ou "tokens" se não usar StopWordsRemover
    outputCol="rawFeatures",
    numFeatures=4096,                  # ↓ metade do default de 10 000
    binary=True                        # só presença/ausência; mais leve
)

idf = IDF(inputCol="rawFeatures", outputCol="text_features")


# ETAPA 4 — Combinar todas as features num vetor único

In [9]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["text_features"],
    outputCol="features"
)


# ETAPA 5 — Definir o modelo de classificação

In [11]:
import pyspark.sql.functions as F
from pyspark.ml.classification import LogisticRegression

# lista de rótulos distintos (strings) presentes na coluna classe_array
labels = (
    df.select(F.explode("classe_array").alias("lbl"))
      .distinct()
      .rdd.map(lambda r: r.lbl)
      .collect()
)

lr_template = LogisticRegression(
    featuresCol="features",
    maxIter=30, regParam=0.1
)

lr_models = {
    lbl: lr_template.copy({lr_template.labelCol: f"label_{lbl}"})
    for lbl in labels
}


# ETAPA 6 — Criar o pipeline completo

In [12]:
from pyspark.ml.feature import (
    Tokenizer, StopWordsRemover, NGram,
    HashingTF, IDF, VectorAssembler
)

# tokenização
tokenizer  = Tokenizer(inputCol="texto_completo", outputCol="tok")
# remove palavras muito comuns (português)
stop_rm    = StopWordsRemover(inputCol="tok", outputCol="tok_rm", caseSensitive=False, locale="pt")
# gera bigramas
bigrams    = NGram(n=2, inputCol="tok_rm", outputCol="tok2")

# hash-TF: presença/ausência, vetor menor
hashingTF  = HashingTF(
    inputCol="tok2",
    outputCol="raw",
    numFeatures=4096,    # ↓ de 10 000
    binary=True          # só presença (mais rápido)
)

idf = IDF(
    inputCol="raw",
    outputCol="text_features",
    minDocFreq=2         # ignora termos que aparecem 1×
)

assembler = VectorAssembler(
    inputCols=["text_features"],
    outputCol="features"
)

preprocess_pipeline = Pipeline(stages=[
    tokenizer, stop_rm, bigrams, hashingTF, idf, assembler
])

#  ETAPA 7 — Treinar e avaliar

In [13]:
# ──────────────────────────────────────────────────────────────
# SPLIT ▸ PREPROCESS ▸ CACHE ▸ TREINO MULTILABEL ▸ PREDIÇÃO
# (versão otimizada: colunas binárias criadas de uma vez +
#  Logistic Regression treinada em paralelo com 4 threads)
# ──────────────────────────────────────────────────────────────
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.ml.linalg import VectorUDT
from pyspark.ml.classification import LogisticRegression
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

# 1) divide dados
train_raw, test_raw = df.randomSplit([0.8, 0.2], seed=42)

# 2) aplica pipeline de atributos e materializa em cache
preprocess_model = preprocess_pipeline.fit(train_raw)
train_feat = preprocess_model.transform(train_raw).cache()
test_feat  = preprocess_model.transform(test_raw).cache()
train_feat.count()                      # força materialização

# 3) rótulos distintos
labels = (train_feat
          .select(F.explode("classe_array").alias("lbl"))
          .distinct()
          .rdd.map(lambda r: r.lbl)
          .collect())

# 4) cria TODAS as colunas binárias de uma vez
for lbl in labels:
    train_feat = train_feat.withColumn(
        f"label_{lbl}",
        F.array_contains("classe_array", F.lit(lbl)).cast("double")
    )

# 5) função de treino (Logistic Regression binária)
def train_lr(label):
    n_pos = train_feat.filter(F.col(f"label_{label}") == 1).count()
    max_iter = 50 if n_pos > 500 else 30

    lr = LogisticRegression(
        featuresCol="features",
        labelCol=f"label_{label}",
        maxIter=max_iter,
        regParam=0.1        # L2
    )
    return label, lr.fit(train_feat)

# 6) treino em paralelo + barra de progresso
lr_models = {}
with ThreadPoolExecutor(max_workers=4) as pool, tqdm(total=len(labels),
                                                     desc="Treinando LR",
                                                     unit="rótulo") as pbar:
    futures = [pool.submit(train_lr, lbl) for lbl in labels]
    for fut in as_completed(futures):
        lbl, model = fut.result()
        lr_models[lbl] = model
        pbar.update(1)

# 7) gera previsões no conjunto de teste
@F.udf(returnType=DoubleType())
def prob_one(vec: VectorUDT) -> float:          # extrai probabilidade da classe 1
    return float(vec[1]) if vec is not None else 0.0

threshold = 0.35
test_pred = test_feat

for lbl in tqdm(labels, desc="Predizendo", unit="rótulo"):
    model = lr_models[lbl]
    test_pred = (
        model.transform(test_pred)
             .withColumn(f"prob_{lbl}", prob_one("probability"))
             .withColumn(f"pred_{lbl}", (F.col(f"prob_{lbl}") >= threshold).cast("int"))
             .drop("probability", "rawPrediction", "prediction")
    )

# 8) ACTION para executar todo o DAG de predição
test_pred.cache()
test_pred.count()          # ← executa todas as transformações

# Agora `test_pred` contém prob_<rótulo> e pred_<rótulo> para cada classe

Treinando LR: 100%|██████████| 161/161 [06:13<00:00,  2.32s/rótulo]
Predizendo: 100%|██████████| 161/161 [05:28<00:00,  2.04s/rótulo]


1535

In [17]:
from pyspark.sql import functions as F

# 1) junte todas as predições 0/1 em um array de rótulos
pred_cols = [
    F.when(F.col(f"pred_{lbl}") == 1, F.lit(lbl)).otherwise(None)
    for lbl in labels
]

test_pred = test_pred.withColumn(
    "pred_labels",
    F.filter(F.array(*pred_cols), lambda x: x.isNotNull())   # remove None
)

# 2) para cada lei, calcule a proporção de rótulos corretos
#    (|pred ∩ real|  /  |real|)
test_pred = (
    test_pred.withColumn(
        "acertos", F.size(F.array_intersect("pred_labels", "classe_array"))
    )
    .withColumn(
        "total_reais", F.size("classe_array").cast("double")
    )
    .withColumn(
        "proporcao_certa",
        (F.col("acertos") / F.col("total_reais")).cast("double")
    )
)

# 3) acurácia média = média das proporções linha-a-linha
accuracy = test_pred.select(F.avg("proporcao_certa")).first()[0]
print(f"Acurácia média por rótulo (ex.: A B C vs A B D = 0.6667): {accuracy:.4f}")

Acurácia média por rótulo (ex.: A B C vs A B D = 0.6667): 0.8351


In [18]:
# ──────────────────────────────────────────────────────────────
# MÉTRICAS MULTILABEL  –  precision, recall e F1 (média MICRO)
# ──────────────────────────────────────────────────────────────
from pyspark.sql import functions as F

# 0) garanta que existe a coluna pred_labels (array de rótulos previstos)
#    – se você já criou antes, pule este bloco
pred_cols = [
    F.when(F.col(f"pred_{lbl}") == 1, F.lit(lbl)).otherwise(None)
    for lbl in labels
]
test_pred = test_pred.withColumn(
    "pred_labels",
    F.filter(F.array(*pred_cols), lambda x: x.isNotNull())
)

# 1) true-positives, false-positives, false-negatives
test_pred = (
    test_pred
    .withColumn("tp_list", F.array_intersect("pred_labels", "classe_array"))
    .withColumn("fp_list", F.array_except("pred_labels", "classe_array"))
    .withColumn("fn_list", F.array_except("classe_array", "pred_labels"))
)

agg = (
    test_pred.select(
        F.size("tp_list").alias("tp"),
        F.size("fp_list").alias("fp"),
        F.size("fn_list").alias("fn")
    )
    .groupBy()
    .sum("tp", "fp", "fn")
    .first()
)

tp, fp, fn = agg["sum(tp)"], agg["sum(fp)"], agg["sum(fn)"]

precision = tp / (tp + fp) if tp + fp else 0.0
recall    = tp / (tp + fn) if tp + fn else 0.0
f1_micro  = 2 * precision * recall / (precision + recall) if precision + recall else 0.0

print(f"Precision (micro): {precision:.4f}")
print(f"Recall    (micro): {recall:.4f}")
print(f"F1-score  (micro): {f1_micro:.4f}")

Precision (micro): 0.9546
Recall    (micro): 0.8464
F1-score  (micro): 0.8972
