## Importação de Bibliotecas

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, MapType
from pyspark.sql.functions import explode, col, from_json, expr, regexp_extract, udf, regexp_extract
import pandas as pd
import ast
import requests
import json

## Função para chamar a API e carregar os dados do IBE retorna dataframe

In [0]:
def load_data_ibge(url):
    response = requests.get(url)

    if response.status_code == 200:
        data = response.json()
        return data
    else:
        print(f'Erro: {response.status_code} - {response.text}')

## Função para obter os valores do dicionário

In [0]:
def parse_json_string(json_string):
    if json_string:
        return dict(map(lambda x: x.split('='), json_string.strip('{}').split(', ')))
    else:
        return {}

## Carga dos dados de localidades do estado de SP

In [0]:
url_localidade = 'https://servicodados.ibge.gov.br/api/v1/localidades/estados/35/distritos'

df_localidade = spark.read.json(sc.parallelize(load_data_ibge(url_localidade)))
df_localidade = df_localidade.select(col("municipio.nome").alias("municipio"),
                          col("municipio.microrregiao.nome").alias("microrregiao"),
                          col("municipio.microrregiao.mesorregiao.nome").alias("mesorregiao"),
                          col("municipio.microrregiao.mesorregiao.UF.id").alias("id_estado"),
                          col("municipio.microrregiao.mesorregiao.UF.nome").alias("estado"),
                          col("municipio.microrregiao.mesorregiao.UF.regiao.nome").alias("regiao"),
                          col("municipio.microrregiao.mesorregiao.UF.regiao.sigla").alias("sigla_regiao"),
                          col("municipio.microrregiao.mesorregiao.UF.sigla").alias("sigla_estado"),
                          col("nome").alias("bairro"))
# display(df_localidade)
df_localidade.createOrReplaceTempView("dim_localidade")

## Carga dos dados de IDH do estado de SP

In [0]:
url_idh = 'https://servicodados.ibge.gov.br/api/v1/pesquisas/-/indicadores/30255/resultados/35'
df_idh = spark.createDataFrame(load_data_ibge(url_idh))
df_idh = df_idh.select(explode("res").alias("element"))

# Registre a função UDF
parse_json_udf = udf(parse_json_string, MapType(StringType(), StringType()))

# Aplicar a função UDF para criar colunas 'res_dict' e 'notas_dict'
df_idh = df_idh.withColumn("res_dict", parse_json_udf(col("element.res")))
df_idh = df_idh.withColumn("notas_dict", parse_json_udf(col("element.notas")))

# Criar colunas para cada atributo
for year in range(1991, 2022):
    df_idh = df_idh.withColumn(f"res_{year}", col("res_dict").getItem(str(year)).cast(DoubleType()))
    df_idh = df_idh.withColumn(f"notas_{year}", col("notas_dict").getItem(str(year)).cast(DoubleType()))

df_idh = df_idh.withColumn("localidade", col("element").getItem("localidade").cast(IntegerType()))
df_idh = df_idh.select("localidade" ,"res_2016", "res_2017", "res_2018", "res_2019", "res_2020", "res_2021") 
# display(df_idh)
df_idh.createOrReplaceTempView("dim_idh")

## Carga dos dados de valor adicionado da indústria em proporção do PIB e per capita do estado de SP

In [0]:
url_pib = 'https://apisidra.ibge.gov.br/values/t/6587/n1/all/n3/all/v/9312/p/last%203/d/v9312%201'
df_pib = spark.read.json(sc.parallelize(load_data_ibge(url_pib)))
df_pib = (
    df_pib
    .withColumnRenamed("D1C", "brasil_unidade_federacao_codigo")
    .withColumnRenamed("D1N", "brasil_unidade_federacao")
    .withColumnRenamed("D2C", "variavel_codigo")
    .withColumnRenamed("D2N", "variavel")
    .withColumnRenamed("D3C", "ano_codigo")
    .withColumnRenamed("D3N", "ano")
    .withColumnRenamed("MC", "unidade_medida_codigo")
    .withColumnRenamed("MN", "unidade_medida")
    .withColumnRenamed("NC", "nivel_territorial_codigo")
    .withColumnRenamed("NN", "nivel_territorial")
    .withColumnRenamed("V", "valor")
)
df_pib = df_pib.filter(col("brasil_unidade_federacao_codigo") != 1)
df_pib.createOrReplaceTempView("dim_pib")

## Carga dos dados População Projetada do estado de SP

In [0]:
url_populacao = 'https://servicodados.ibge.gov.br/api/v1/pesquisas/-/indicadores/49645/resultados/35'
df_populacao = spark.createDataFrame(load_data_ibge(url_populacao))
df_populacao = df_populacao.select(explode("res").alias("element"))

# Registre a função UDF
parse_json_udf = udf(parse_json_string, MapType(StringType(), StringType()))

# Aplicar a função UDF para criar colunas 'res_dict' e 'notas_dict'
df_populacao = df_populacao.withColumn("res_dict", parse_json_udf(col("element.res")))
df_populacao = df_populacao.withColumn("notas_dict", parse_json_udf(col("element.notas")))

# Criar colunas para cada atributo
for year in range(2013, 2060):
    df_populacao = df_populacao.withColumn(f"res_{year}", col("res_dict").getItem(str(year)).cast(DoubleType()))
    df_populacao = df_populacao.withColumn(f"notas_{year}", col("notas_dict").getItem(str(year)).cast(DoubleType()))

df_populacao = df_populacao.withColumn("localidade", col("element").getItem("localidade").cast(IntegerType()))
df_populacao = df_populacao.select("localidade" ,"res_2016", "res_2017", "res_2018", "res_2019", "res_2020", "res_2021") 
df_populacao.createOrReplaceTempView("dim_populacao")

## Criação da tabela fato, indicadores do estado de SP

In [0]:
df_indicadores_sp = spark.sql("""
SELECT id_estado 
       ,estado
       ,idh.res_2016 idh_2016
       ,idh.res_2017 idh_2017
       ,idh.res_2018 idh_2018
       ,idh.res_2019 idh_2019
       ,idh.res_2020 idh_2020
       ,idh.res_2021 idh_2021
       ,pib.valor pib
       ,pib.ano ano_pib
       ,popu.res_2016 popu_2016
       ,popu.res_2017 popu_2017
       ,popu.res_2018 popu_2018
       ,popu.res_2019 popu_2019
       ,popu.res_2020 popu_2020
       ,popu.res_2021 popu_2021

FROM   dim_localidade loca
inner  join dim_idh   idh on idh.localidade = loca.id_estado
inner  join dim_pib   pib on pib.brasil_unidade_federacao_codigo = loca.id_estado
inner  join dim_populacao popu on popu.localidade = loca.id_estado 
WHERE  loca.municipio = 'São Paulo'
       AND pib.ano IN(2016, 2017, 2018, 2019, 2020, 2021)
GROUP  BY id_estado 
          ,estado
          ,idh.res_2016
          ,idh.res_2017
          ,idh.res_2018
          ,idh.res_2019
          ,idh.res_2020
          ,idh.res_2021 
          ,pib.valor
          ,pib.ano
          ,popu.res_2016
          ,popu.res_2017
          ,popu.res_2018
          ,popu.res_2019
          ,popu.res_2020
          ,popu.res_2021""")
df_indicadores_sp.createOrReplaceTempView("tblfato_indicador_sp")

In [0]:
%sql
SELECT * 
FROM tblfato_indicador_sp

id_estado,estado,idh_2016,idh_2017,idh_2018,idh_2019,idh_2020,idh_2021,pib,ano_pib,popu_2016,popu_2017,popu_2018,popu_2019,popu_2020,popu_2021
35,São Paulo,0.835,0.831,0.837,0.845,0.823,0.806,13.4,2020,44760305.0,45149603.0,45538936.0,45919049.0,46289333.0,46649132.0
35,São Paulo,0.835,0.831,0.837,0.845,0.823,0.806,13.6,2019,44760305.0,45149603.0,45538936.0,45919049.0,46289333.0,46649132.0
35,São Paulo,0.835,0.831,0.837,0.845,0.823,0.806,13.2,2018,44760305.0,45149603.0,45538936.0,45919049.0,46289333.0,46649132.0


## Valor adicionado da indústria de transformação em proporção dos PIB dos estados

In [0]:
%sql
SELECT * 
FROM   dim_pib

brasil_unidade_federacao_codigo,brasil_unidade_federacao,variavel_codigo,variavel,ano_codigo,ano,unidade_medida_codigo,unidade_medida,nivel_territorial_codigo,nivel_territorial,valor
11,Rondônia,9312,Valor adicionado da indústria de transformação em proporção do PIB,2018,2018,2,%,3,Unidade da Federação,8.0
11,Rondônia,9312,Valor adicionado da indústria de transformação em proporção do PIB,2019,2019,2,%,3,Unidade da Federação,7.9
11,Rondônia,9312,Valor adicionado da indústria de transformação em proporção do PIB,2020,2020,2,%,3,Unidade da Federação,8.0
12,Acre,9312,Valor adicionado da indústria de transformação em proporção do PIB,2018,2018,2,%,3,Unidade da Federação,4.1
12,Acre,9312,Valor adicionado da indústria de transformação em proporção do PIB,2019,2019,2,%,3,Unidade da Federação,3.9
12,Acre,9312,Valor adicionado da indústria de transformação em proporção do PIB,2020,2020,2,%,3,Unidade da Federação,3.8
13,Amazonas,9312,Valor adicionado da indústria de transformação em proporção do PIB,2018,2018,2,%,3,Unidade da Federação,22.8
13,Amazonas,9312,Valor adicionado da indústria de transformação em proporção do PIB,2019,2019,2,%,3,Unidade da Federação,22.6
13,Amazonas,9312,Valor adicionado da indústria de transformação em proporção do PIB,2020,2020,2,%,3,Unidade da Federação,23.4
14,Roraima,9312,Valor adicionado da indústria de transformação em proporção do PIB,2018,2018,2,%,3,Unidade da Federação,1.2


In [0]:
%sql
SELECT * 
FROM   dim_populacao

localidade,res_2016,res_2017,res_2018,res_2019,res_2020,res_2021
35,44760305.0,45149603.0,45538936.0,45919049.0,46289333.0,46649132.0
