In [1]:
import pandas, os
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StringType, IntegerType, FloatType, DecimalType, BooleanType
from pyspark.sql.functions import col, lit, row_number, monotonically_increasing_id

In [2]:
os.environ['url'] = 'jdbc:postgresql://localhost:5432/enem'
os.environ['user'] = 'postgres'
os.environ['password'] = 'senha'
os.environ['driver'] = 'org.postgresql.Driver'

In [5]:
spark = SparkSession \
    .builder \
    .appName('ENEM') \
    .config('spark.jars','postgresql-42.5.0.jar') \
    .getOrCreate()

In [6]:
dados = spark.read.csv('DADOS\MICRODADOS_ENEM_2021.csv', header=True, sep=';', encoding='ISO-8859-1')

In [7]:
#Juntando cidades das escolas e da aplicação das provas
municipios_header = ['pk_codigo_municipio', 'nome', 'codigo_uf', 'sigla_uf']
municipios_esc = dados.select('CO_MUNICIPIO_ESC', 'NO_MUNICIPIO_ESC', 'CO_UF_ESC', 'SG_UF_ESC')
municipios_esc = (municipios_esc.withColumnRenamed('CO_MUNICIPIO_ESC', 'pk_codigo_municipio')
        .withColumnRenamed('NO_MUNICIPIO_ESC', 'nome')
        .withColumnRenamed('CO_UF_ESC', 'codigo_uf')
        .withColumnRenamed('SG_UF_ESC', 'sigla_uf')
        )
municipios_prova = dados.select('CO_MUNICIPIO_PROVA', 'NO_MUNICIPIO_PROVA', 'CO_UF_PROVA', 'SG_UF_PROVA')
municipios_prova = (municipios_prova.withColumnRenamed('CO_MUNICIPIO_PROVA', 'pk_codigo_municipio')
        .withColumnRenamed('NO_MUNICIPIO_PROVA', 'nome')
        .withColumnRenamed('CO_UF_PROVA', 'codigo_uf')
        .withColumnRenamed('SG_UF_PROVA', 'sigla_uf')
        )
municipios = municipios_esc.union(municipios_prova)
municipios = municipios.dropDuplicates(['pk_codigo_municipio'])
municipios = municipios.dropna()

In [8]:
municipios.select('*').write.format('jdbc') \
    .option('url', os.getenv('url')) \
    .option('dbtable', 'municipios') \
    .option('driver', os.getenv('driver')) \
    .option('user', os.getenv('user')).option('password', os.getenv('password')).mode('append').save()

del municipios
del municipios_esc
del municipios_prova

In [7]:
dados = (dados
        .withColumnRenamed('NU_INSCRICAO', 'pk_numero_inscricao')
        .withColumnRenamed('NU_ANO', 'ano')
        .withColumnRenamed('TP_FAIXA_ETARIA', 'fk_faixa_etaria')
        .withColumnRenamed('TP_SEXO', 'sexo')
        .withColumnRenamed('TP_ESTADO_CIVIL', 'fk_estado_civil')
        .withColumnRenamed('TP_COR_RACA', 'fk_raca')
        .withColumnRenamed('TP_NACIONALIDADE', 'fk_nacionalidade')
        .withColumnRenamed('TP_ST_CONCLUSAO', 'fk_conclusao_em')
        .withColumnRenamed('TP_ANO_CONCLUIU', 'fk_ano_conclusao_em')
        .withColumnRenamed('TP_ESCOLA', 'fk_tipo_escola')
        .withColumnRenamed('TP_ENSINO', 'fk_tipo_ensino')
        .withColumnRenamed('IN_TREINEIRO', 'treineiro')
        .withColumnRenamed('CO_MUNICIPIO_ESC', 'fk_codigo_municipio')
        .withColumnRenamed('TP_DEPENDENCIA_ADM_ESC', 'fk_dependencia_adm')
        .withColumnRenamed('TP_LOCALIZACAO_ESC', 'fk_localizacao')
        .withColumnRenamed('TP_SIT_FUNC_ESC', 'fk_situacao_funcionamento')
        .withColumnRenamed('CO_MUNICIPIO_PROVA', 'fk_local_aplicacao_prova')
        .withColumnRenamed('TP_PRESENCA_CN', 'fk_presenca_prova_cn')
        .withColumnRenamed('TP_PRESENCA_CH', 'fk_presenca_prova_ch')
        .withColumnRenamed('TP_PRESENCA_LC', 'fk_presenca_prova_lc')
        .withColumnRenamed('TP_PRESENCA_MT', 'fk_presenca_prova_mt')
        .withColumnRenamed('CO_PROVA_CN','fk_codigo_prova_cn')
        .withColumnRenamed('CO_PROVA_CH','fk_codigo_prova_ch')
        .withColumnRenamed('CO_PROVA_LC','fk_codigo_prova_lc')
        .withColumnRenamed('CO_PROVA_MT','fk_codigo_prova_mt')
        .withColumnRenamed('NU_NOTA_MT','nota_mt')
        .withColumnRenamed('NU_NOTA_LC','nota_lc')
        .withColumnRenamed('NU_NOTA_CN','nota_cn')
        .withColumnRenamed('NU_NOTA_CH','nota_ch')
        .withColumnRenamed('TP_LINGUA', 'fk_prova_lingua_estrangeira')
        .withColumnRenamed('TP_STATUS_REDACAO', 'fk_situacao_redacao')
        .withColumnRenamed('NU_NOTA_COMP1', 'nota_competencia1')
        .withColumnRenamed('NU_NOTA_COMP2', 'nota_competencia2')
        .withColumnRenamed('NU_NOTA_COMP3', 'nota_competencia3')
        .withColumnRenamed('NU_NOTA_COMP4', 'nota_competencia4')
        .withColumnRenamed('NU_NOTA_COMP5', 'nota_competencia5')
        .withColumnRenamed('NU_NOTA_REDACAO', 'nota_redacao')
        )

In [10]:
escolas = dados.select('fk_codigo_municipio', 'fk_dependencia_adm', 'fk_localizacao', 'fk_situacao_funcionamento')
escolas = escolas.drop_duplicates()
escolas = escolas.dropna()

escolas = escolas.withColumn('fk_dependencia_adm', escolas['fk_dependencia_adm'].cast(IntegerType()))
escolas = escolas.withColumn('fk_localizacao', escolas['fk_localizacao'].cast(IntegerType()))
escolas = escolas.withColumn('fk_situacao_funcionamento', escolas['fk_situacao_funcionamento'].cast(IntegerType()))
escolas = escolas.withColumn(
    "id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)

DataFrame[summary: string, fk_codigo_municipio: string, fk_dependencia_adm: string, fk_localizacao: string, fk_situacao_funcionamento: string, id: string]

In [11]:
escolas.select('*').write.format('jdbc') \
    .option('url', os.getenv('url')) \
    .option('dbtable', 'escolas') \
    .option('driver', os.getenv('driver')) \
    .option('user', os.getenv('user')).option('password', os.getenv('password')).mode('append').save()


In [12]:
colunas = ['pk_numero_inscricao', 'fk_local_aplicacao_prova', 'sexo', 'treineiro', 'fk_codigo_municipio']
colunas_participantes_integer = ['fk_faixa_etaria', 'fk_estado_civil', 'fk_raca', 'fk_nacionalidade',
'fk_conclusao_em', 'fk_ano_conclusao_em', 'fk_tipo_escola', 'fk_tipo_ensino', 'fk_presenca_prova_cn',
'fk_presenca_prova_ch', 'fk_presenca_prova_lc', 'fk_presenca_prova_mt', 'ano', 'fk_dependencia_adm', 'fk_localizacao', 'fk_situacao_funcionamento']

participantes = dados.select(colunas + colunas_participantes_integer)
for column in colunas_participantes_integer:
    participantes = participantes.withColumn(column, col(column).cast(IntegerType()))
participantes = participantes.withColumn('treineiro', col('treineiro').cast(BooleanType()))
participantes = participantes.na.fill(value = 0, subset=['fk_tipo_ensino'])
participantes = participantes.withColumn('ano', lit(2021))

In [13]:
escolas = escolas.selectExpr([column + ' temp_' + column for column in escolas.columns])
escolas.columns

['temp_fk_codigo_municipio',
 'temp_fk_dependencia_adm',
 'temp_fk_localizacao',
 'temp_fk_situacao_funcionamento',
 'temp_id']

In [15]:
condition = [participantes.fk_dependencia_adm == escolas.temp_fk_dependencia_adm, \
    participantes.fk_localizacao == escolas.temp_fk_localizacao, \
    participantes.fk_situacao_funcionamento == escolas.temp_fk_situacao_funcionamento, \
    participantes.fk_codigo_municipio == escolas.temp_fk_codigo_municipio]
participantes = participantes.join(escolas, condition, how='left').withColumnRenamed('temp_id', 'fk_escola')
participantes = participantes.drop('temp_fk_codigo_municipio', 'temp_fk_dependencia_adm', 'temp_fk_localizacao',
'temp_fk_situacao_funcionamento', 'fk_codigo_municipio', 'fk_dependencia_adm', 'fk_localizacao',
'fk_situacao_funcionamento')

In [17]:
participantes.select('*').write.format('jdbc') \
    .option('url', os.getenv('url')) \
    .option('dbtable', 'participantes') \
    .option('driver', os.getenv('driver')) \
    .option('user', os.getenv('user')).option('password', os.getenv('password')).mode('append').save()

del participantes
del escolas
del colunas
del colunas_participantes_integer

In [18]:
#Deve ter um jeito melhor de fazer isso
colunas_tabela_prova = ['pk_numero_inscricao', 'fk_codigo_prova_cn', 'fk_codigo_prova_ch',
'fk_codigo_prova_lc', 'fk_codigo_prova_mt', 'nota_cn', 'nota_ch', 'nota_lc', 'nota_mt', 'fk_prova_lingua_estrangeira']
colunas_tabela_prova_int = ['fk_codigo_prova_cn', 'fk_codigo_prova_ch',
'fk_codigo_prova_lc', 'fk_codigo_prova_mt', 'fk_prova_lingua_estrangeira']
colunas_tabela_prova_float = ['nota_cn', 'nota_ch', 'nota_lc', 'nota_mt']
resultados_provas_objetivas = dados.select(colunas_tabela_prova)
for column in colunas_tabela_prova_int:
    resultados_provas_objetivas = resultados_provas_objetivas.withColumn(column, col(column).cast(IntegerType()))
for column in colunas_tabela_prova_float:    
    resultados_provas_objetivas = resultados_provas_objetivas.withColumn(column, col(column).cast(FloatType()))
resultados_provas_objetivas = resultados_provas_objetivas.withColumnRenamed('pk_numero_inscricao', 'fk_numero_inscricao')
resultados_provas_objetivas = resultados_provas_objetivas.withColumn(
    "id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)

resultados_provas_objetivas = resultados_provas_objetivas.na.fill(value = 0, subset=['nota_cn', 'nota_ch', 'nota_lc', 'nota_mt'])

In [19]:
resultados_provas_objetivas.select('*').write.format('jdbc') \
    .option('url', os.getenv('url')) \
    .option('dbtable', 'resultados_provas_objetivas') \
    .option('driver', os.getenv('driver')) \
    .option('user', os.getenv('user')).option('password', os.getenv('password')).mode('append').save()

del resultados_provas_objetivas

In [8]:
colunas = ['pk_numero_inscricao']
colunas_integer = ['fk_situacao_redacao', 'nota_competencia1', 'nota_competencia2', 'nota_competencia3',
'nota_competencia4', 'nota_competencia5', 'nota_redacao']

redacoes = dados.select(colunas+colunas_integer)
for column in colunas_integer:
    redacoes = redacoes.withColumn(column, col(column).cast(IntegerType()))
redacoes = redacoes.withColumnRenamed('pk_numero_inscricao', 'fk_numero_inscricao')
redacoes = redacoes.withColumn(
    "id",
    row_number().over(Window.orderBy(monotonically_increasing_id()))
)

redacoes = redacoes.na.fill(value = 10, subset=['fk_situacao_redacao'])
redacoes = redacoes.na.fill(value = 0, subset=['nota_competencia1', 'nota_competencia2', 'nota_competencia3',
'nota_competencia4', 'nota_competencia5', 'nota_redacao'])

In [9]:
redacoes.select('*').write.format('jdbc') \
    .option('url', os.getenv('url')) \
    .option('dbtable', 'redacoes') \
    .option('driver', os.getenv('driver')) \
    .option('user', os.getenv('user')).option('password', os.getenv('password')).mode('append').save()

del redacoes