###  Desafio de Integração de dados IBGE
Julio Cesar Correa 

23/02/2024
 

#### PySpark - Ingest - Localidades - Municípios do estado de SP

In [8]:
import os
import pandas as pd, numpy as np
import json
import requests
#pip install import_ipynb
#import import_ipynb

In [9]:
import delta
import requests
#import mimesis

In [10]:
import pyspark
from pyspark.sql import SparkSession

In [11]:
# Import all functions and types
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [111]:
def get_spark() -> SparkSession:
    """Return a Delta-enabled Spark session named "IBGE: Localidades".

    The session is obtained with ``getOrCreate()``, so an already running
    session is reused. Eager evaluation is switched on for notebook display,
    the Delta SQL extension and catalog are registered, and the log level is
    lowered to ``ERROR``.
    """
    session_options = {
        "spark.sql.repl.eagerEval.enabled": True,
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    }

    session_builder = pyspark.sql.SparkSession.builder.appName("IBGE: Localidades")
    for option_key, option_value in session_options.items():
        session_builder = session_builder.config(option_key, option_value)

    # configure_spark_with_delta_pip wires the pip-installed Delta package into the builder.
    session = delta.configure_spark_with_delta_pip(session_builder).getOrCreate()
    session.sparkContext.setLogLevel("ERROR")
    return session

In [113]:
# Obtain the Spark session (via get_spark) that the cells below use for reads and writes.
spark = get_spark()

In [252]:
from helpers import download_endpoint_helper

#### Data Processing -  Localidades

##### Read data and prepare master dataset

In [201]:

# Load the Delta table stored under the transient directory into `df`.
# NOTE(review): the load path is an absolute, machine-specific location; consider
# deriving it from a configurable root directory instead — TODO confirm the intended layout.
print("Reading delta file ...!")
#option("multiline","True")
df = spark.read.format("delta").load("/Users/dal/Documents/desafio_ibge/data/delta_dir/transient/url_localidade_mun_sp")
# Ad-hoc inspection helpers, intentionally left disabled:
#df.collect()
#df.show(n=2, truncate=False, vertical=True)
#df.dtypes

Reading delta file ...!


_Create Schema to map all nested columns_

In [182]:
# The schema is assembled from the innermost level outwards so that each
# nesting level (regiao -> UF -> mesorregiao -> microrregiao) is readable on its own.
# Every field is a nullable string.
_schema_regiao = StructType([
    StructField("id", StringType(), True),
    StructField("sigla", StringType(), True),
    StructField("nome", StringType(), True),
])

_schema_uf = StructType([
    StructField("id", StringType(), True),
    StructField("sigla", StringType(), True),
    StructField("nome", StringType(), True),
    StructField("regiao", _schema_regiao),
])

_schema_mesorregiao = StructType([
    StructField("id", StringType(), True),
    StructField("nome", StringType(), True),
    StructField("UF", _schema_uf),
])

# Top-level schema used to parse the JSON text held in the `microrregiao` column.
schema_microrregiao = StructType([
    StructField("id", StringType(), True),
    StructField("nome", StringType(), True),
    StructField("mesorregiao", _schema_mesorregiao),
])

_Check mapped columns_

In [238]:
# Preview only: parse the JSON `microrregiao` column with the schema above,
# expand its top-level fields next to the municipality id/name, and show two rows.
(
    df.withColumn("mi", from_json("microrregiao", schema_microrregiao))
    .select(
        col("id").alias("id_municipio"),
        col("nome").alias("nome_municipio"),
        col("mi.*"),
    )
    .show(n=2, truncate=False, vertical=True)
)

-RECORD 0-----------------------------------------------------------------------
 id_municipio   | 3509403                                                       
 nome_municipio | Cajuru                                                        
 id             | 35015                                                         
 nome           | Batatais                                                      
 mesorregiao    | {3502, Ribeirão Preto, {35, SP, São Paulo, {3, SE, Sudeste}}} 
-RECORD 1-----------------------------------------------------------------------
 id_municipio   | 3509452                                                       
 nome_municipio | Campina do Monte Alegre                                       
 id             | 35042                                                         
 nome           | Itapetininga                                                  
 mesorregiao    | {3511, Itapetininga, {35, SP, São Paulo, {3, SE, Sudeste}}}   
only showing top 2 rows



_Aplica o schema criado para o novo dataset_

In [239]:
# Master dataset: one row per input record, with the municipality id/name renamed
# and the parsed microrregiao fields (id, nome, mesorregiao struct) expanded alongside.
df_master = (
    df.withColumn("mi", from_json("microrregiao", schema_microrregiao))
    .select(
        col("id").alias("id_municipio"),
        col("nome").alias("nome_municipio"),
        col("mi.*"),
    )
)

In [258]:
# Print the schema of the master dataset to verify the column names and nesting.
df_master.printSchema()

root
 |-- id_municipio: string (nullable = true)
 |-- nome_municipio: string (nullable = true)
 |-- id: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- mesorregiao: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- nome: string (nullable = true)
 |    |-- UF: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- sigla: string (nullable = true)
 |    |    |-- nome: string (nullable = true)
 |    |    |-- regiao: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- sigla: string (nullable = true)
 |    |    |    |-- nome: string (nullable = true)



In [241]:
#df_master.show(n=2, truncate=False, vertical=True)

In [243]:
# Spot-check nested field access: the top-level `id`, the `mesorregiao.id`
# and the `mesorregiao.UF.sigla` fields, for the first two rows.
(df_master.select(df_master.id,df_master.mesorregiao.id,df_master.mesorregiao.UF.sigla)
    .show(n=2, truncate=False, vertical=True))

-RECORD 0---------------------
 id                   | 35015 
 mesorregiao.id       | 3502  
 mesorregiao.UF.sigla | SP    
-RECORD 1---------------------
 id                   | 35042 
 mesorregiao.id       | 3511  
 mesorregiao.UF.sigla | SP    
only showing top 2 rows



##### Processing : Local helpers

In [262]:
def save_ibge_data_as_delta(df, path: str, table_name: str) -> None:
    """Write ``df`` as a Delta table at ``<path>/<table_name>``, stamped with the load time.

    Args:
        df: Spark DataFrame to persist.
        path: Directory under which the table directory is created.
        table_name: Name of the table directory appended to ``path``.

    The table is written in ``overwrite`` mode with ``mergeSchema`` enabled.
    """
    # DataFrames are immutable: withColumn returns a NEW DataFrame, so the result
    # must be captured, otherwise the load_timestamp column is silently dropped.
    df_stamped = df.withColumn("load_timestamp", current_timestamp())
    (
        df_stamped.write.mode("overwrite")
        .option("mergeSchema", "true")
        .format("delta")
        .save(path + "/" + table_name)
    )

##### Processing : Municipios

In [263]:
# Progress message (Portuguese): "Processing the Municipios table".
print("Processando a tabela de Municipios...!")

Processando a tabela de Municipios...!


In [264]:
#Data Transformations

In [276]:
# Municipality table: its own id and name plus the id of the microrregiao it belongs to
# (the top-level `id` column of df_master).
df_municipio = df_master.select(
    col("id_municipio").alias("id"),
    col("nome_municipio").alias("nome"),
    col("id").alias("microrregiao_id"),
)

In [283]:
# Target directory for the tables written below, built from the helper module's settings.
# Presumably both attributes are plain strings (they are joined with "+") — verify in download_endpoint_helper.
path_raw = download_endpoint_helper.delta_root_dir + "/" + download_endpoint_helper.raw_dir 

In [284]:
# Write the municipality table as Delta table 'tb_municipio' under path_raw.
save_ibge_data_as_delta(df_municipio, path_raw,'tb_municipio')

##### Processing : Microrregião

In [279]:
# Progress message (Portuguese): "Processing the Microrregiões table".
print("Processando a tabela de Microrregiões ...!")

Processando a tabela de Microrregiões ...!


In [280]:
#Data Transformations

In [285]:
# Microrregião table: id, name and the id of its parent mesorregião.
# df_master holds one row per municipality, so the same microrregião appears once
# for every municipality it contains; distinct() keeps a single row per microrregião.
df_microrregiao = df_master.selectExpr(
    "id",
    "nome",
    "mesorregiao.id as mesorregiao_id",
).distinct()

In [286]:
# Write the microrregião table as Delta table 'tb_microrregiao' under path_raw.
save_ibge_data_as_delta(df_microrregiao, path_raw,'tb_microrregiao')

##### Processing : Mesorregião

In [227]:
# Progress message (Portuguese): "Processing the Mesorregiões table".
print("Processando a tabela de Mesorregiões...!")

Processando a tabela de Mesorregiões...!


In [288]:
# Mesorregião table: id, name and the id of its parent UF, taken from the nested struct.
# df_master holds one row per municipality, so each mesorregião is repeated for every
# municipality inside it; distinct() keeps a single row per mesorregião.
df_mesorregiao = df_master.selectExpr(
    "mesorregiao.id as id",
    "mesorregiao.nome as nome",
    "mesorregiao.UF.id as uf_id",
).distinct()

In [289]:
# Write the mesorregião table as Delta table 'tb_mesorregiao' under path_raw.
save_ibge_data_as_delta(df_mesorregiao, path_raw,'tb_mesorregiao')

##### Processing : UFs

In [228]:
# Progress message (Portuguese): "Processing the UFs table".
print("Processando a tabela de UFs...!")

Processando a tabela de UFs...!


In [295]:
# UF (state) table: id, abbreviation, name and the id of its parent região.
# df_master holds one row per municipality, so without de-duplication the same UF row
# would be written once per municipality; distinct() keeps a single row per UF.
df_uf = df_master.selectExpr(
    "mesorregiao.UF.id as id",
    "mesorregiao.UF.sigla as sigla",
    "mesorregiao.UF.nome as nome",
    "mesorregiao.UF.regiao.id as regiao_id",
).distinct()

In [296]:
# Write the UF table as Delta table 'tb_uf' under path_raw.
save_ibge_data_as_delta(df_uf, path_raw,'tb_uf')

In [298]:
##### Processing : Região

In [300]:
# Região table: id, abbreviation and name from the innermost nested struct.
# df_master holds one row per municipality, so without de-duplication the same região row
# would be written once per municipality; distinct() keeps a single row per região.
df_regiao = df_master.selectExpr(
    "mesorregiao.UF.regiao.id as id",
    "mesorregiao.UF.regiao.sigla as sigla",
    "mesorregiao.UF.regiao.nome as nome",
).distinct()

In [301]:
# Write the região table as Delta table 'tb_regiao' under path_raw.
save_ibge_data_as_delta(df_regiao, path_raw,'tb_regiao')

# Final progress message (Portuguese): "Finishing processing".
print("Finalizando processamento...!")