## Obtendo o Spark

In [1]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()

## Extração da Base de Dados

In [4]:
import gdown

url = "https://drive.google.com/uc?export=download&id=1Wn_O2wc9JDsB-5uH5an6_W6b3w8yzQF-"
gdown.download(url, "itens_prova.csv", quiet=False, fuzzy=True)

url = "https://drive.google.com/file/d/1vMTcHqZHJB16CB5n09JKK2mXS5kpGQpv/view?usp=sharing"
gdown.download(url, "microdados_enem.csv", quiet=False, fuzzy=True)

Downloading...
From: https://drive.google.com/uc?id=1Wn_O2wc9JDsB-5uH5an6_W6b3w8yzQF-
To: /content/itens_prova.csv
100%|██████████| 331k/331k [00:00<00:00, 61.0MB/s]
Downloading...
From (original): https://drive.google.com/uc?id=1vMTcHqZHJB16CB5n09JKK2mXS5kpGQpv
From (redirected): https://drive.google.com/uc?id=1vMTcHqZHJB16CB5n09JKK2mXS5kpGQpv&confirm=t&uuid=a0d42ea1-ae30-4693-bb9b-84656865cfe3
To: /content/microdados_enem.csv
100%|██████████| 1.78G/1.78G [00:19<00:00, 90.4MB/s]


'microdados_enem.csv'

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("microdados_enem") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

In [6]:
path = '/content/microdados_enem.csv'
enem = spark.read.csv(path, sep=';', inferSchema=True, header=True, encoding='ISO-8859-1')

In [7]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [8]:
dim_participante = enem.select(
    "NU_INSCRICAO",
    "TP_FAIXA_ETARIA",
    "TP_SEXO",
    "TP_ESTADO_CIVIL",
    "TP_COR_RACA",
    "TP_NACIONALIDADE",
    "TP_ST_CONCLUSAO",
    "TP_ANO_CONCLUIU",
    "TP_ESCOLA",
    "TP_ENSINO",
    "IN_TREINEIRO").dropDuplicates()

In [9]:
dim_local_escola = enem.select(
    "NU_INSCRICAO",
    "CO_MUNICIPIO_ESC",
    "NO_MUNICIPIO_ESC",
    "CO_UF_ESC",
    "SG_UF_ESC",
    "TP_DEPENDENCIA_ADM_ESC",
    "TP_LOCALIZACAO_ESC",
    "TP_SIT_FUNC_ESC").dropDuplicates()

In [10]:
dim_local_prova = enem.select(
    "NU_INSCRICAO",
    "CO_MUNICIPIO_PROVA",
    "NO_MUNICIPIO_PROVA",
    "CO_UF_PROVA",
    "SG_UF_PROVA").dropDuplicates()


In [11]:
socioeconomico_columns = [f"Q{str(i).zfill(3)}" for i in range(1, 26)]

dim_socioeconomico = enem.select(["NU_INSCRICAO"] + socioeconomico_columns).dropDuplicates()

In [12]:
def add_surrogate_key(df, key_name="id"):
    return df.withColumn(key_name, F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))

dim_participante = add_surrogate_key(dim_participante, "ID_PARTICIPANTE")
dim_socioeconomico = add_surrogate_key(dim_socioeconomico, "ID_SOCIOECONOMICO")
dim_local_prova = add_surrogate_key(dim_local_prova, "ID_LOCAL_PROVA")
dim_local_escola = add_surrogate_key(dim_local_escola, "ID_LOCAL_ESCOLA")

In [13]:
fato_resultado_enem = enem.select(
    "NU_INSCRICAO",
    "NU_ANO",
    "TP_PRESENCA_CN",
    "TP_PRESENCA_CH",
    "TP_PRESENCA_LC",
    "TP_PRESENCA_MT",
    "NU_NOTA_CN",
    "NU_NOTA_CH",
    "NU_NOTA_LC",
    "NU_NOTA_MT",
    "TP_STATUS_REDACAO",
    "NU_NOTA_COMP1",
    "NU_NOTA_COMP2",
    "NU_NOTA_COMP3",
    "NU_NOTA_COMP4",
    "NU_NOTA_COMP5",
    "NU_NOTA_REDACAO"
)

In [14]:
fato_resultado_enem = (
    fato_resultado_enem
    .join(dim_participante.select("NU_INSCRICAO", "ID_PARTICIPANTE"), on="NU_INSCRICAO", how="left")
    .join(dim_socioeconomico.select("NU_INSCRICAO", "ID_SOCIOECONOMICO"), on="NU_INSCRICAO", how="left")
    .join(dim_local_prova.select("NU_INSCRICAO", "ID_LOCAL_PROVA"), on="NU_INSCRICAO", how="left")
    .join(dim_local_escola.select("NU_INSCRICAO", "ID_LOCAL_ESCOLA"), on="NU_INSCRICAO", how="left")
)

# Drop NU_INSCRICAO to keep only keys
fato_resultado_enem = fato_resultado_enem.drop("NU_INSCRICAO")

In [15]:
# Caminho base para salvar os arquivos
output_path = "/content/tabelas_enem"

# Cria diretório se ainda não existir
os.makedirs(output_path, exist_ok=True)

# Salva as tabelas em formato Parquet (mais eficiente para big data)
dim_participante.write.mode("overwrite").parquet(f"{output_path}/dim_participante.parquet")
dim_socioeconomico.write.mode("overwrite").parquet(f"{output_path}/dim_socioeconomico.parquet")
dim_local_prova.write.mode("overwrite").parquet(f"{output_path}/dim_local_prova.parquet")
dim_local_escola.write.mode("overwrite").parquet(f"{output_path}/dim_local_escola.parquet")
fato_resultado_enem.write.mode("overwrite").parquet(f"{output_path}/fato_resultado_enem.parquet")