In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master("local[*]").appName("ENEMTeste").getOrCreate()

In [3]:
enem = (
    spark
    .read
    .format("csv")
    .options(header=True,inferSchema=True,delimiter=";", encoding="latin1")
    .load("enem2019/DADOS/MICRODADOS_ENEM_2019.csv")
)

In [4]:
enem.count()

5095270

In [5]:
from pyspark.sql.functions import count, col, mean

In [6]:
#Consulta a dados usando API Spark DataFrames

(
    enem.groupBy(["TP_SEXO", "SG_UF_RESIDENCIA"])
    .agg(
        mean(col("NU_IDADE")).alias("med_idade"),    
        mean(col("NU_NOTA_MT")).alias("med_nota_mt")
    )
    .show()
)

+-------+----------------+------------------+------------------+
|TP_SEXO|SG_UF_RESIDENCIA|         med_idade|       med_nota_mt|
+-------+----------------+------------------+------------------+
|      F|              DF| 23.08285149937152| 517.9624757342744|
|      F|              PB|22.583268720189846|491.15081486656857|
|      F|              ES|22.412574909777174| 523.7890381945264|
|      F|              PI|22.086648062532106|483.89192631785204|
|      M|              MS|22.712471968259443| 536.4454554406697|
|      F|              SE| 22.37932546818122|490.77354970390377|
|      M|              AM|23.486011658632027|498.55375681970463|
|      M|              RR| 22.34894026974952| 520.2750000000001|
|      M|              SP|20.975866306954437| 578.2618169116553|
|      F|              CE| 21.11123120643171| 501.9193588902578|
|      M|              PE|22.165988572246118| 541.9512436978805|
|      F|              TO|21.222175468125396| 481.6204369992716|
|      M|              MA

In [7]:
##trabalhar com spark para fazer os pre processamentos
enem.createOrReplaceTempView("enem")

In [8]:
spark.sql("""
    select tp_sexo, sg_uf_residencia, avg(nu_idade) as med_idade, avg(nu_nota_mt) as med_nota_mt
    from enem
    group by tp_sexo, sg_uf_residencia
""").show()

+-------+----------------+------------------+------------------+
|tp_sexo|sg_uf_residencia|         med_idade|       med_nota_mt|
+-------+----------------+------------------+------------------+
|      F|              DF| 23.08285149937152| 517.9624757342744|
|      F|              PB|22.583268720189846|491.15081486656857|
|      F|              ES|22.412574909777174| 523.7890381945264|
|      F|              PI|22.086648062532106|483.89192631785204|
|      M|              MS|22.712471968259443| 536.4454554406697|
|      F|              SE| 22.37932546818122|490.77354970390377|
|      M|              AM|23.486011658632027|498.55375681970463|
|      M|              RR| 22.34894026974952| 520.2750000000001|
|      M|              SP|20.975866306954437| 578.2618169116553|
|      F|              CE| 21.11123120643171| 501.9193588902578|
|      M|              PE|22.165988572246118| 541.9512436978805|
|      F|              TO|21.222175468125396| 481.6204369992716|
|      M|              MA

In [10]:
# consulta mais pesada ainda

#encapsulando
def consulta(query: str):
    return spark.sql(query).show()


In [12]:
enem.printSchema()

root
 |-- NU_INSCRICAO: long (nullable = true)
 |-- NU_ANO: integer (nullable = true)
 |-- CO_MUNICIPIO_RESIDENCIA: integer (nullable = true)
 |-- NO_MUNICIPIO_RESIDENCIA: string (nullable = true)
 |-- CO_UF_RESIDENCIA: integer (nullable = true)
 |-- SG_UF_RESIDENCIA: string (nullable = true)
 |-- NU_IDADE: integer (nullable = true)
 |-- TP_SEXO: string (nullable = true)
 |-- TP_ESTADO_CIVIL: integer (nullable = true)
 |-- TP_COR_RACA: integer (nullable = true)
 |-- TP_NACIONALIDADE: integer (nullable = true)
 |-- CO_MUNICIPIO_NASCIMENTO: integer (nullable = true)
 |-- NO_MUNICIPIO_NASCIMENTO: string (nullable = true)
 |-- CO_UF_NASCIMENTO: integer (nullable = true)
 |-- SG_UF_NASCIMENTO: string (nullable = true)
 |-- TP_ST_CONCLUSAO: integer (nullable = true)
 |-- TP_ANO_CONCLUIU: integer (nullable = true)
 |-- TP_ESCOLA: integer (nullable = true)
 |-- TP_ENSINO: integer (nullable = true)
 |-- IN_TREINEIRO: integer (nullable = true)
 |-- CO_ESCOLA: integer (nullable = true)
 |-- CO_MU

In [14]:
consulta("""
    select tp_sexo, sg_uf_residencia, no_municipio_residencia, avg(nu_idade) as med_idade, avg(nu_nota_mt) as med_nota_mt
    from enem
    where sg_uf_residencia in ('MG','SP')
    group by tp_sexo, sg_uf_residencia, no_municipio_residencia
    order by med_nota_mt desc
""")

+-------+----------------+-----------------------+------------------+-----------------+
|tp_sexo|sg_uf_residencia|no_municipio_residencia|         med_idade|      med_nota_mt|
+-------+----------------+-----------------------+------------------+-----------------+
|      M|              SP|                Trabiju|              18.8|711.8333333333334|
|      M|              SP|               Balbinos|              19.0|647.8166666666666|
|      M|              SP|     São Caetano do Sul| 20.02294197031039|635.8275300171525|
|      M|              SP|                 Arapeí|19.571428571428573|635.0833333333334|
|      M|              MG|            Silveirânia|19.321428571428573|634.7130434782608|
|      M|              SP|                 Ocauçu|             18.55|634.6909090909089|
|      M|              MG|     Itamarati de Minas|19.941176470588236|           629.45|
|      M|              MG|      Conceição do Pará|19.952380952380953|629.0833333333335|
|      M|              MG|      

In [28]:
(
    enem
    .where(col("SG_UF_RESIDENCIA").isin(["MG","SP"]))
    .groupBy(["TP_SEXO","SG_UF_RESIDENCIA","NO_MUNICIPIO_RESIDENCIA"])
    .agg(
        mean(col("NU_IDADE")).alias("med_idade"),
        mean(col("NU_NOTA_MT")).alias("med_nota_mt")
  
    )
    .orderBy(col("med_nota_mt").desc())
    .show()
)

+-------+----------------+-----------------------+------------------+-----------------+
|TP_SEXO|SG_UF_RESIDENCIA|NO_MUNICIPIO_RESIDENCIA|         med_idade|      med_nota_mt|
+-------+----------------+-----------------------+------------------+-----------------+
|      M|              SP|                Trabiju|              18.8|711.8333333333334|
|      M|              SP|               Balbinos|              19.0|647.8166666666666|
|      M|              SP|     São Caetano do Sul| 20.02294197031039|635.8275300171525|
|      M|              SP|                 Arapeí|19.571428571428573|635.0833333333334|
|      M|              MG|            Silveirânia|19.321428571428573|634.7130434782608|
|      M|              SP|                 Ocauçu|             18.55|634.6909090909089|
|      M|              MG|     Itamarati de Minas|19.941176470588236|           629.45|
|      M|              MG|      Conceição do Pará|19.952380952380953|629.0833333333335|
|      M|              MG|      