In [1]:
spark

In [2]:
sc

In [3]:
spark.sparkContext.setLogLevel("INFO")

# Pre-processando a Base de Dados

In [None]:
from pyspark.sql.functions import *
import pandas as pd

In [None]:
!hdfs dfs -cat '/user/jonatas/data/MICRODADOS_ENEM_2021.csv' | head -3

Dada a quantidade elevada de campos do conjunto de dados, foi optado por ser feita a inferência do esquema automaticamente para agilizar análise dos dados em detrimento da otimização do espaço usado na memória. ponto de melhoria  

In [None]:
df_dados_enem = spark.read.csv("hdfs:///user/jonatas/data/MICRODADOS_ENEM_2021.csv"
                               ,sep=";"
                               ,inferSchema="True"
                               ,header="True"
                               ,encoding="latin1"
                              )

Salvamos os dados em formato parquet para otimizar espaço e agilizar a leitura dos dados brutos caso seja nessessário futuramente

In [None]:
!hdfs dfs -rm -r /user/jonatas/data/MICRODADOS_ENEM_2021

In [None]:
df_dados_enem.write.parquet("hdfs:///user/jonatas/data/MICRODADOS_ENEM_2021")

In [None]:
!hdfs dfs -ls -h /user/jonatas/data/

In [None]:
!hdfs dfs -du -s -h hdfs:///user/jonatas/data/MICRODADOS_ENEM_2021

In [None]:
df_dados_enem = spark.read.parquet("hdfs:///user/jonatas/data/MICRODADOS_ENEM_2021")

In [None]:
df_dados_enem.show(1)

In [None]:
df_dados_enem.count()

In [None]:
df_dados_enem.printSchema()

Com auxílio do arquivo de dicionário e a análise do schema do dataframe foi possível identificar os campos e a sua relevância para a análise que pretendemos fazer

Realizando assim uma primeira filtragem dos dados,  foram removidos os campos que não são interessantes para a análise. Como a cor do caderno das provas realizadas, e o vetor das respostas e gabarito. Como dada a anatureza dos dados, o foco na análise se dará no perfil social, racial, econômico, regional e nota dos participantes, informações como qual cor de caderno eles receberam, ou quais resposta marcaram é irelevante para o estudo.

In [None]:
df_tratado = df_dados_enem.drop(
    'CO_PROVA_CN',
    'CO_PROVA_CH', 
    'CO_PROVA_LC', 
    'CO_PROVA_MT',
    'TX_RESPOSTAS_CN',
    'TX_RESPOSTAS_CH',
    'TX_RESPOSTAS_LC',
    'TX_RESPOSTAS_MT',
    'TX_GABARITO_CN',
    'TX_GABARITO_CH',
    'TX_GABARITO_LC',
    'TX_GABARITO_MT'
)

In [None]:
df_tratado.show(2)

In [None]:
!hdfs dfs -rm -r /user/jonatas/data/df_tratado_1

In [None]:
df_tratado.write.parquet("hdfs:///user/jonatas/data/df_tratado_1")

In [None]:
!hdfs dfs -du -s -h /user/jonatas/data/df_tratado

# REVER Inicialmento foi optado por fazer uma análise de principais componentes para identificar quais variávies mais explicam a nota final do enem que aqui foi definida como a média das 5 provas

Using the largest variance criteria would be akin to feature extraction,

O principal problema se mantem sendo o tamanho da base de dados e a quantidade de colunas, o que frustrou tentativas anteriores de uma manipulação como dataframe do pandas, sendo assim necessário uma análise mais minunciosa da base e tratamento como dataframe do spark

# Análise das colunas restantes

In [None]:
from pyspark.sql.functions import *
import pandas as pd

In [None]:
df_tratado = spark.read.parquet("hdfs:///user/jonatas/data/df_tratado_1")

In [None]:
df_tratado

Como sabemos que estamos analisando os dados do enem 2021, não precisamos da coluna de ano. Como Além do Distrito Federal e do distrito insular de Fernando de Noronha, o Brasil tem 5.568 municípios, foi optado fazer a análise somente a nível estadual pois atarpalharia o modelo de dummies, assim as colunas de código do município e nome do município. Mantendo somente codigo da UF da escola onde foi cursado o EM e a da realização da prova. As colunas de presença nas provas objetivas e status da redação não são necessárias pois só serão considerados os participantes presentes em todas as provas e que tenham feito pontos, a remoção dos que não se encaixam nessa categoria será feita com a remoção dos NaN da Coluna a ser criada NF_Enem. As notas das competências da redação também não serão avaliadas individualmente e nem o tipo de língua estrangeira selecionada.

In [None]:
df_tratado = df_tratado.drop(
    #'CO_PROVA_CN','CO_PROVA_CH', 'CO_PROVA_LC', 'CO_PROVA_MT',
    #'TX_RESPOSTAS_CN','TX_RESPOSTAS_CH','TX_RESPOSTAS_LC','TX_RESPOSTAS_MT',
    #'TX_GABARITO_CN','TX_GABARITO_CH','TX_GABARITO_LC','TX_GABARITO_MT',
    'NU_INSCRICAO',
    'NU_ANO',
    'CO_MUNICIPIO_ESC','NO_MUNICIPIO_ESC','SG_UF_ESC','TP_SIT_FUNC_ESC',
    'CO_MUNICIPIO_PROVA','NO_MUNICIPIO_PROVA', 'SG_UF_PROVA',
    'TP_PRESENCA_CN', 'TP_PRESENCA_CH', 'TP_PRESENCA_LC','TP_PRESENCA_MT',
    #'NU_NOTA_CN','NU_NOTA_CH','NU_NOTA_LC','NU_NOTA_MT', 'NU_NOTA_REDACAO',
    'TP_STATUS_REDACAO', 'NU_NOTA_COMP1', 'NU_NOTA_COMP2', 'NU_NOTA_COMP3', 'NU_NOTA_COMP4', 'NU_NOTA_COMP5', 
    'TP_LINGUA'
)

In [None]:
!hdfs dfs -rm -r /user/jonatas/data/df_tratado_2

In [None]:
df_tratado.write.parquet("hdfs:///user/jonatas/data/df_tratado_2")

In [None]:
!hdfs dfs -du -s -h /user/jonatas/data/df_tratado_2

In [None]:
df_tratado = spark.read.parquet("hdfs:///user/jonatas/data/df_tratado_2")

In [None]:
df_tratado = df_tratado.withColumn(
    'NF_ENEM',
    (
    df_tratado.NU_NOTA_CN +
    df_tratado.NU_NOTA_CH +
    df_tratado.NU_NOTA_LC +
    df_tratado.NU_NOTA_MT +
    df_tratado.NU_NOTA_REDACAO
    ) / 5
)

In [None]:
df_tratado = df_tratado.drop(
    'NU_NOTA_CN','NU_NOTA_CH','NU_NOTA_LC','NU_NOTA_MT', 'NU_NOTA_REDACAO'
)

df_tratado = df_tratado.na.drop(subset=['NF_ENEM'])

In [None]:
!hdfs dfs -rm -r /user/jonatas/data/df_tratado

In [None]:
df_tratado.write.parquet("hdfs:///user/jonatas/data/df_tratado")

In [None]:
!hdfs dfs -du -s -h /user/jonatas/data/df_tratado

Conseguimos reduzir bastante o tamanho da base de dados

# Análise com Pandas

In [None]:
from pyspark.sql.functions import *
import pandas as pd

In [None]:
df_spark_enem = spark.read.parquet("hdfs:///user/jonatas/data/df_tratado")

In [None]:
df_spark_enem.count()


In [None]:
df_spark_enem = df_spark_enem.limit(500000)

In [None]:
df_pandas_enem = df_spark_enem.toPandas()

In [None]:
df_pandas_enem.info

In [None]:
print(df_pandas_enem.columns.tolist())

In [None]:
df_pandas_enem

Para realizarmos a análise precisamos da criação de dummies e  não necessitamos da coluna de numero de inscrição

In [None]:
dumies_list = pd.Series([
    'TP_FAIXA_ETARIA', 'TP_SEXO', 'TP_ESTADO_CIVIL', 'TP_COR_RACA', 'TP_NACIONALIDADE', 'TP_ST_CONCLUSAO', 'TP_ANO_CONCLUIU',
    'TP_ESCOLA', 'TP_ENSINO', 'IN_TREINEIRO', 'CO_UF_ESC', 'TP_DEPENDENCIA_ADM_ESC', 'TP_LOCALIZACAO_ESC', 'CO_UF_PROVA', 
    'Q001', 'Q002', 'Q003', 'Q004', 'Q005', 'Q006', 'Q007', 'Q008', 'Q009', 'Q010', 'Q011', 'Q012', 
    'Q013', 'Q014', 'Q015', 'Q016', 'Q017', 'Q018', 'Q019', 'Q020', 'Q021', 'Q022', 'Q023', 'Q024', 'Q025'
])

In [None]:
dumies_list

In [None]:
!hdfs dfs -rm -r hdfs:///user/jonatas/data/df_train_limit_500000

In [None]:
df_pandas_enem.to_parquet("hdfs:///user/jonatas/data/df_train_limit_500000")

In [None]:
df_analise = pd.get_dummies(df_pandas_enem, columns=dumies_list)

In [None]:
!hdfs dfs -rm -r hdfs:///user/jonatas/data/df_train_dummys_limit_500000

In [None]:
df_analise.to_parquet("hdfs:///user/jonatas/data/df_train_dummys_limit_500000")

In [None]:
df_analise

In [None]:
print(df_analise.columns.tolist())

In [None]:
lista = df_analise.corr()["NF_ENEM"]
#lista_abs = df_teste.corr()["NF_ENEM"].abs().sort_values(ascending = False)

In [None]:
#lista_pros = lista.sort_values(ascending = False)

In [None]:
#print(lista_pros.to_string())

In [None]:
#lista_cons = lista.sort_values(ascending = True)

In [None]:
#print(lista_cons.to_string())

In [None]:
lista_abs = lista.abs().sort_values(ascending = False)

In [None]:
print(lista_abs[1:136].to_string())

In [None]:
regressores = lista_abs.index[1:136].tolist()
regressores

In [None]:
df_train = df_analise[['NF_ENEM'] + regressores]

In [None]:
df_train

In [None]:
df_train.to_parquet("hdfs:///user/jonatas/data/df_train_dummys_selected_limit_500000/")

In [None]:
!hdfs dfs -ls /user/jonatas/data/

# Definição da base de Teste

In [8]:
from pyspark.sql.functions import *
import pandas as pd

In [5]:
df_test = spark.read.parquet("hdfs:///user/jonatas/data/df_tratado").withColumn('index',monotonically_increasing_id()).orderBy(col('index').desc()).drop('index')

In [None]:
df_test.printSchema()

In [6]:
df_test = df_test.limit(100000)

In [7]:
df_test.count()

100000

In [None]:
df_test = df_test.toPandas()

In [None]:
!hdfs dfs -rm -r /user/jonatas/data/df_test_limit_100000

In [10]:
df = spark.read.parquet('hdfs:///user/jonatas/data/df_test_limit_100000')
df.count()

AnalysisException: 'Unable to infer schema for Parquet. It must be specified manually.;'

In [None]:
df_test.write.parquet('hdfs:///user/jonatas/data/df_test_limit_100000')

In [None]:
df_test = spark.read.parquet("hdfs:///user/jonatas/data/df_test_limit_500000")

In [None]:
df_test = df_test.toPandas()

In [None]:
df_test = pd.get_dummies(df_test, columns=dumies_list)

In [None]:
df_test = df_test[['NF_ENEM'] + regressores]

# Implementação do modelo de previsão - Foward Selection

In [None]:
df = pd.read_parquet("hdfs:///user/jonatas/data/df_train_dummys_selected_limit_500000/")

In [None]:
# Dividir a Base de teste entre váriáveis dependendes e independente

X, y = df.iloc[:,1:].values, df.iloc[:,0].values

In [None]:
print (X[0:10])

In [None]:
from sklearn import linear_model

#Definir modelo
model = linear_model.Lasso(alpha=1)

model.fit(X, y)

In [None]:
df_teste = spark.read.parquet("hdfs:///user/jonatas/data/df_tratado_2")

In [None]:
df_teste = df_teste.withColumn("index", monotonically_increasing_id())
df_teste = df_teste.orderBy(desc("index")).drop("index").limit(500000)

In [None]:
df_teste

In [None]:
df_teste = df_teste.toPandas()

In [None]:
df_teste = df_teste.drop(
    [
        'NU_INSCRICAO',
        'TP_DEPENDENCIA_ADM_ESC'
    ],axis=1
)

dumies_list = pd.Series([
    'TP_FAIXA_ETARIA', 'TP_SEXO', 'TP_ESTADO_CIVIL', 'TP_COR_RACA', 'TP_NACIONALIDADE', 'TP_ST_CONCLUSAO', 'TP_ANO_CONCLUIU',
    'TP_ESCOLA', 'TP_ENSINO', 'IN_TREINEIRO', 'CO_UF_ESC', 
    #'TP_DEPENDENCIA_ADM_ESC',
    'TP_LOCALIZACAO_ESC', 'CO_UF_PROVA', 
    'Q001', 'Q002', 'Q003', 'Q004', 'Q005', 'Q006', 'Q007', 'Q008', 'Q009', 'Q010', 'Q011', 'Q012', 
    'Q013', 'Q014', 'Q015', 'Q016', 'Q017', 'Q018', 'Q019', 'Q020', 'Q021', 'Q022', 'Q023', 'Q024', 'Q025'
])

In [None]:
df_teste.cor

decidir se tira quem faltou

Inicialmento foi optado por fazer uma análise de principais componentes para identificar quais variávies mais explicam a nota final do enem a qual é a média das 5 provas