### Instalando biblioteca para download dos dados


In [70]:
%pip install pandas requests

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.2.1 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip


### Baixando arquivo csv

In [71]:
import requests

# URL do arquivo de dados
url = 'https://aplicacoes.mds.gov.br/sagi/servicos/misocial?fq=anomes_s:2023*&wt=csv&omitHeader=true&fq=cadunico_tot_fam_i:{0%20TO%20*]&q=*&fl=ibge:codigo_ibge,anomes:anomes_s,cadunico_tot_fam:cadunico_tot_fam_i,cadunico_tot_pes:cadunico_tot_pes_i,cadunico_tot_fam_rpc_ate_meio_sm:cadunico_tot_fam_rpc_ate_meio_sm_i,cadunico_tot_pes_rpc_ate_meio_sm:cadunico_tot_pes_rpc_ate_meio_sm_i,cadunico_tot_fam_pob:cadunico_tot_fam_pob_i,cadunico_tot_pes_pob:cadunico_tot_pes_pob_i,cadunico_tot_fam_ext_pob:cadunico_tot_fam_ext_pob_i,cadunico_tot_pes_ext_pob:cadunico_tot_pes_ext_pob_i,cadunico_tot_fam_pob_e_ext_pob:cadunico_tot_fam_pob_e_ext_pob_i,cadunico_tot_pes_pob_e_ext_pob:cadunico_tot_pes_pob_e_ext_pob_i&rows=100000000&sort=anomes_s%20desc,%20codigo_ibge%20asc'
file_path = 'cad_unico_renda.csv'

# Fazendo a requisição para baixar os dados
response = requests.get(url)

# Verificando se a requisição foi bem-sucedida
if response.status_code == 200:
    # salvando o conteúdo do arquivo localmente
    with open(file_path, 'wb') as f:
        f.write(response.content)
else:
    print(f"Falha ao baixar os dados. Status code: {response.status_code}")


### Instalando o pyspark

In [72]:
%pip install --user pyspark

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.2.1 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip


### Criando a sessão spark

In [73]:
from pyspark.sql import SparkSession

# Criando a sessão spark
spark_session = SparkSession.builder.appName('spark') \
                                    .config("spark.executor.memory", "4g") \
                                    .getOrCreate()

### Lendo arquivo csv e montando dataframe do spark

In [74]:
df = spark_session.read.options(header="true", delimiter=",", encoding="ISO-8859-1", inferSchema=True).csv('cad_unico_renda.csv')
df.show()

+------+------+----------------+----------------+--------------------------------+--------------------------------+--------------------+--------------------+------------------------+------------------------+------------------------------+------------------------------+
|  ibge|anomes|cadunico_tot_fam|cadunico_tot_pes|cadunico_tot_fam_rpc_ate_meio_sm|cadunico_tot_pes_rpc_ate_meio_sm|cadunico_tot_fam_pob|cadunico_tot_pes_pob|cadunico_tot_fam_ext_pob|cadunico_tot_pes_ext_pob|cadunico_tot_fam_pob_e_ext_pob|cadunico_tot_pes_pob_e_ext_pob|
+------+------+----------------+----------------+--------------------------------+--------------------------------+--------------------+--------------------+------------------------+------------------------+------------------------------+------------------------------+
|110001|202311|            4978|           12757|                            2886|                            8590|                 771|                2489|                     722|        

### Padronizando e transformando os dados

In [75]:
from itertools import chain
from pyspark.sql.functions import col, substring, lpad
from pyspark.sql import functions as F

# Removendo prefixo cadunico_
for col_name in df.columns:
    if col_name.startswith('cadunico_'):
        new_col_name = col_name.replace('cadunico_', '', 1)
        df = df.withColumnRenamed(col_name, new_col_name)

# Separando a coluna 'anomes' em 'ano' e 'mes'
df = df.withColumn('ano', substring(col('anomes'), 1, 4).cast('string'))
df = df.withColumn('mes', substring(col('anomes'), 5, 2).cast('string'))

# Concatenando ano e mês em um formato de data
df = df.withColumn('data', F.concat(df['ano'], F.lit('-'), df['mes']))

# Convertendo para tipo de dados data
df = df.withColumn('data', F.to_date(df['data'], 'yyyy-MM'))

# Excluindo colunas redundantes
df = df.drop('mes', 'anomes')

# Renomenando coluna data para mes
df = df.withColumnRenamed('data', 'mes')

# Convertendo o código do IBGE para string
df = df.withColumn("ibge", col("ibge").cast("string"))

# Criando uma nova coluna pegando apenas os dois primeiros dígitos do código do município (estado)
df = df.withColumn('estado', substring('ibge', 1, 2))

# Criando um dicionário de mapeamento de códigos dos estados para as siglas dos estados
estado_dict = {
    '12': 'AC',
    '27': 'AL',
    '13': 'AM',
    '16': 'AP',
    '29': 'BA',
    '23': 'CE',
    '53': 'DF',
    '32': 'ES',
    '52': 'GO',
    '21': 'MA',
    '51': 'MT',
    '50': 'MS',
    '31': 'MG',
    '15': 'PA',
    '25': 'PB',
    '41': 'PR',
    '26': 'PE',
    '22': 'PI',
    '33': 'RJ',
    '24': 'RN',
    '43': 'RS',
    '11': 'RO',
    '14': 'RR',
    '42': 'SC',
    '35': 'SP',
    '28': 'SE',
    '17': 'TO'
}

# Convertendo o dicionário em uma expressão de mapeamento
mapping_expr = F.create_map([F.lit(x) for x in chain(*estado_dict.items())])

# Aplicando a expressão para criar uma nova coluna com as siglas dos estados
df = df.withColumn('nome_estado', mapping_expr[df['estado']])

df = df.drop('estado')

df = df.withColumnRenamed('nome_estado', 'estado')

# Exibindo o DataFrame resultante
print("DataFrame resultante:")

df.show()

DataFrame resultante:
+------+-------+-------+-----------------------+-----------------------+-----------+-----------+---------------+---------------+---------------------+---------------------+----+----------+------+
|  ibge|tot_fam|tot_pes|tot_fam_rpc_ate_meio_sm|tot_pes_rpc_ate_meio_sm|tot_fam_pob|tot_pes_pob|tot_fam_ext_pob|tot_pes_ext_pob|tot_fam_pob_e_ext_pob|tot_pes_pob_e_ext_pob| ano|       mes|estado|
+------+-------+-------+-----------------------+-----------------------+-----------+-----------+---------------+---------------+---------------------+---------------------+----+----------+------+
|110001|   4978|  12757|                   2886|                   8590|        771|       2489|            722|           1965|                 1493|                 4454|2023|2023-11-01|    RO|
|110002|  19112|  44321|                  11575|                  30371|       2904|       8033|           3368|           8190|                 6272|                16223|2023|2023-11-01|    RO

### Resgatando nomes dos municípios

In [76]:
import json

municipios_code_path ='C:\\Users\\usuario\\Desktop\\ETL\\dados_abertos\\municipios-code.json'

municipios_dict = {}

with open(municipios_code_path, 'r', encoding='utf-8') as file:
    municipios_dict = json.load(file)


# Imprimindo o número de chaves (keys) no dicionário
print("Número de chaves no dicionário:", len(municipios_dict))

# Removendo o último dígito de cada chave
municipios_dict_corrigido = {key[:-1]: value for key, value in municipios_dict.items()}

# Exibindo o número de chaves no dicionário corrigido
print("Número de chaves no dicionário corrigido:", len(municipios_dict_corrigido))

# Convertendo o dicionário em uma expressão de mapeamento
mapping_expr = F.create_map([F.lit(x) for x in chain(*municipios_dict_corrigido.items())])

# Aplicando a expressão para criar uma nova coluna com os nomes dos estados
df = df.withColumn('municipio', mapping_expr[df['ibge']])

# Exibindo o DataFrame resultante
print("DataFrame resultante:")

df.show()


Número de chaves no dicionário: 5570
Número de chaves no dicionário corrigido: 5570
DataFrame resultante:
+------+-------+-------+-----------------------+-----------------------+-----------+-----------+---------------+---------------+---------------------+---------------------+----+----------+------+--------------------+
|  ibge|tot_fam|tot_pes|tot_fam_rpc_ate_meio_sm|tot_pes_rpc_ate_meio_sm|tot_fam_pob|tot_pes_pob|tot_fam_ext_pob|tot_pes_ext_pob|tot_fam_pob_e_ext_pob|tot_pes_pob_e_ext_pob| ano|       mes|estado|           municipio|
+------+-------+-------+-----------------------+-----------------------+-----------+-----------+---------------+---------------+---------------------+---------------------+----+----------+------+--------------------+
|110001|   4978|  12757|                   2886|                   8590|        771|       2489|            722|           1965|                 1493|                 4454|2023|2023-11-01|    RO| Alta Floresta'Oeste|
|110002|  19112|  44321|  

### Exportando dataframe resultante para CSV

In [77]:
df.toPandas().to_csv('cad_unico_renda_tratado2.csv', index=False)

### Desalocando sessão do spark


In [78]:
# Stopping spark session
spark_session.stop()