#Importando arquivo do Drive

Acessar Drive

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Acessar Pasta das Bases

In [2]:
import os
os.chdir("/content/drive/MyDrive/MIT_Infnet/Aula_Hadoop/Projeto")

#Configuração do ambiente PySpark

Instalar PySpark

In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=4130c4a6d6d9610bc03b87cc4be9df53f5cec516caad01ed2dbec5237396b915
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


Importar SparkSession ao invés de SparkContext diretamente para trabalhar com DataFrames

In [20]:
from pyspark.sql import SparkSession

from pyspark.sql import functions as F


Criar a SparkSession

In [5]:
spark = SparkSession.builder \
    .appName("Auxilio Emergencial") \
    .master("local") \
    .getOrCreate()

Verificar o nome da aplicação (opcional)

In [6]:
print(spark.sparkContext.appName)

Auxilio Emergencial


# Carregar os dados

Ler o arquivo CSV usando DataFrame

In [8]:
municipio_df = spark.read.csv("POP2022_Municipios_20230622.csv",sep=';', header=True, inferSchema=True)
auxilio_df = spark.read.csv("auxilio_emergencial.csv",sep=',', header=True, inferSchema=True)

In [32]:
municipio_df = municipio_df.withColumnRenamed("UF", "uf_municipio")

joined_df = auxilio_df.join(municipio_df, auxilio_df["uf"] == municipio_df["uf_municipio"])

Mostrar as primeiras 5 linhas do DataFrame

In [33]:
municipio_df.show(5)
auxilio_df.show(5)
joined_df.show(5)

+------------+-------+----------+--------------------+-----------+
|uf_municipio|COD. UF|COD. MUNIC|   NOME DO MUNICÍPIO| POPULAÇÃO |
+------------+-------+----------+--------------------+-----------+
|          RO|     11|        15|Alta Floresta D'O...|    21.558 |
|          RO|     11|        23|           Ariquemes|   100.896 |
|          RO|     11|        31|              Cabixi|     5.107 |
|          RO|     11|        49|              Cacoal|    92.202 |
|          RO|     11|        56|          Cerejeiras|    15.237 |
+------------+-------+----------+--------------------+-----------+
only showing top 5 rows

+-------+----+---------------------+---------+----------------+----------------+--------------------+---------------+---------------+-----------+-------------+-------+----------+-----+
|ano_mes|  uf|codigo_ibge_municipio|municipio|nis_beneficiario|cpf_beneficiario|        beneficiario|nis_responsavel|cpf_responsavel|responsavel|enquadramento|parcela|observacao|valor|
+-

Converter DataFrame para RDD

In [10]:
municipio_rdd = municipio_df.rdd
auxilio_rdd = auxilio_df.rdd

Mostrar as primeiras linhas do RDD

In [11]:
municipio_rdd.take(5)
auxilio_rdd.take(5)

[Row(ano_mes=202004, uf=None, codigo_ibge_municipio=None, municipio=None, nis_beneficiario=None, cpf_beneficiario='***832922**', beneficiario='ALICE CILENE DE OLIVEIRA DIAS', nis_responsavel=-2, cpf_responsavel=None, responsavel=None, enquadramento='EXTRA CADUN', parcela=1, observacao=None, valor=600.0),
 Row(ano_mes=202004, uf=None, codigo_ibge_municipio=None, municipio=None, nis_beneficiario=None, cpf_beneficiario='***208119**', beneficiario='APARICIO LINHARES LOUZADA', nis_responsavel=-2, cpf_responsavel=None, responsavel=None, enquadramento='EXTRA CADUN', parcela=1, observacao=None, valor=600.0),
 Row(ano_mes=202004, uf=None, codigo_ibge_municipio=None, municipio=None, nis_beneficiario=None, cpf_beneficiario='***884918**', beneficiario='CELIA APARECIDA DE SOUZA', nis_responsavel=-2, cpf_responsavel=None, responsavel=None, enquadramento='EXTRA CADUN', parcela=1, observacao=None, valor=600.0),
 Row(ano_mes=202004, uf=None, codigo_ibge_municipio=None, municipio=None, nis_beneficiario=

# Perguntas do projeto Hadoop usando PySpark

1- Qual é o total de auxílio emergencial distribuído em cada município? (Valor total x UF)

In [12]:
auxilio_por_uf = auxilio_df.groupBy("uf").agg({"valor": "sum"}).withColumnRenamed("sum(valor)", "total_auxilio")

auxilio_por_uf.show(n=auxilio_por_uf.count(), truncate=False)

+----+-------------+
|uf  |total_auxilio|
+----+-------------+
|NULL|247800.0     |
|AL  |3.7236192E9  |
|AC  |9.316482E8   |
|AM  |4.4732892E9  |
|BA  |1.70635566E10|
|AP  |9.52971E8    |
|CE  |1.02217854E10|
|ES  |3.2871432E9  |
|DF  |1.9446858E9  |
|GO  |5.8288482E9  |
|MA  |8.2498932E9  |
|MG  |1.6283049E10 |
|MT  |2.9414358E9  |
|MS  |2.2079406E9  |
|PA  |9.8359806E9  |
|PB  |4.5076518E9  |
|PE  |1.07698908E10|
|PI  |3.9194124E9  |
|PR  |7.898199E9   |
|RJ  |1.48293744E10|
|RN  |3.678657E9   |
|RO  |1.5636414E9  |
|RR  |6.479202E8   |
|RS  |6.877362E9   |
|SC  |3.926445E9   |
|SE  |2.5983708E9  |
|SP  |3.1680576E10 |
|TO  |1.4666646E9  |
+----+-------------+



2- Qual a total de população por UF? (UF x Qtd pessoa)

In [14]:
populacao_por_uf = municipio_df.groupBy("uf").agg({" POPULAÇÃO ": "sum"}).withColumnRenamed("sum( POPULAÇÃO )", "total_pessoa")

populacao_por_uf.show(n=populacao_por_uf.count(), truncate=False)

+---+------------------+
|uf |total_pessoa      |
+---+------------------+
|SC |7762.1539999999995|
|RO |1616.3790000000004|
|PI |3270.173999999999 |
|AM |1897.531          |
|RR |634.805           |
|GO |6462.566000000001 |
|TO |1584.3060000000003|
|MT |4781.240999999998 |
|SP |32147.36100000001 |
|PB |4030.9610000000016|
|ES |3975.1            |
|RS |9683.796          |
|MS |2833.7419999999993|
|AL |3125.2539999999995|
|MG |19175.14599999998 |
|PA |7075.626000000004 |
|BA |12048.036000000006|
|SE |2211.867999999999 |
|PE |7556.527          |
|CE |6340.273999999999 |
|RN |3303.9530000000013|
|RJ |9989.677          |
|MA |5739.231000000001 |
|AC |829.7800000000001 |
|DF |NULL              |
|PR |9963.589999999995 |
|AP |774.268           |
+---+------------------+



3- Valor médio do auxilio por pessoa em cada uf? (Valor total/Total poupulacao x Uf)

In [None]:
medio_pessoa_uf = joined_df.groupBy("uf") \
    .agg((F.sum("valor") / F.first(" POPULAÇÃO ")).alias("valor_medio_por_pessoa"))

medio_pessoa_uf.show(n=medio_pessoa_uf.count(), truncate=False)

4- Quantidade de vezes recebido o auxilio por uf? (Parcela x Uf)

In [None]:
qtd_auxilio_uf = auxilio_df.groupBy("uf") \ .agg(F.count("parcela").alias("total_parcelas"))

qtd_auxilio_uf.show(n=qtd_auxilio_uf.count(), truncate=False)

5- Quais os nomes que mais se repetem que receberam auxilio? (Qtd nome)

In [None]:
mes_repetido = auxilio_df.groupBy("beneficiario") \
    .agg(F.count("*").alias("qtd_nome")) \
    .orderBy(F.col("qtd_nome").desc()) \
    .limit(10)

mes_repetido.show(n=mes_repetido.count(), truncate=False)

# Perguntas Extras

6- Beneficiários com maior quantidade de valor recebido (Pessoa x valor)

In [None]:
beneficiarios_valor_df = auxilio_emergencial.groupBy("beneficiario") \
    .agg(F.sum("valor").alias("total_valor_recebido")) \
    .orderBy(F.col("total_valor_recebido").desc()) \
    .limit(10)

beneficiarios_valor_df.show(n=beneficiarios_valor_df.count(), truncate=False)

7- Quantidade de auxilio distribuido por mês (ano_mes x qtd_parcela)

In [None]:
auxilio_por_mes_df = auxilio_emergencial.groupBy("ano_mes") \
    .agg(F.count("parcela").alias("qtd_parcelas")) \
    .orderBy("ano_mes")

auxilio_por_mes_df.show(n=auxilio_por_mes_df.count(), truncate=False)