# Importar bibliotecas

In [1]:
import time
import json
import requests
import pandas as pd
from pandas.io.json import json_normalize
import numpy as np
import os

In [2]:
from datetime import datetime

# Parametros

In [3]:
path_notebook = os.getcwd()

In [4]:
local_dw = './dw/'
local_stage = './stage/'

In [5]:
#Inicio da execução do notebook
start_general = datetime.now()

## Definir funções

In [6]:
def renomear_colunas(nome_df, nomes_colunas):
    for i in range(0, len(nomes_colunas), 1):
        #print (nomes_colunas[i])
        nome_df.rename(columns = {
            nome_df.columns[i] : nomes_colunas[i]
        }, inplace = True) 

In [7]:
def gerar_surrogate_key(dataframe):
    return list(range(1, len(dataframe) +1, 1))

## Leitura do arquivo

In [8]:
nome_arquivo_local = 'pib-municipios-2010-2018.parquet'

In [9]:
stage_pib = pd.read_parquet(local_stage + nome_arquivo_local)

## Dimensão Região

In [10]:
dimensao_regiao = stage_pib.groupby([
    'id_Regiao'
    ,'nome_Regiao'
]).agg({
    'id_Regiao' : [('sk_Regiao', 'count' )]
}).reset_index()

#### Ajustes label das colunas

In [11]:
dimensao_regiao.columns = [(x[0] if x[1]=='' else x[1]) for x in dimensao_regiao.columns]

#### Gerar surrogate key

In [12]:
dimensao_regiao['sk_regiao'] = gerar_surrogate_key(dimensao_regiao)

## Salvar dataframe em arquivo local formato parquet

In [13]:
dimensao_regiao.to_parquet(local_dw + 'dRegiao.parquet', index=False)

## Dimensão Estado

In [14]:
dimensao_estado = stage_pib.groupby([
    'id_Estado'
    ,'nome_Estado'
    ,'sigla_Estado'
]).agg({
    'id_Estado' : [('sk_Estado', 'count' )]
}).reset_index()

#### Ajustes label das colunas

In [15]:
dimensao_estado.columns = [(x[0] if x[1]=='' else x[1]) for x in dimensao_estado.columns]

#### Gerar surrogate key

In [16]:
dimensao_estado['sk_Estado'] = gerar_surrogate_key(dimensao_estado)

## Salvar dataframe em arquivo local formato parquet

In [17]:
dimensao_estado.to_parquet(local_dw + 'dEstado.parquet', index=False)

## Dimensão Município

In [18]:
dimensao_municipio = stage_pib.groupby([
    'id_Municipio'
    ,'nome_Municipio'
    ,'nome_Regiao_Metropolitana'
    ,'valida_Regiao_Metropolitana'
    ,'id_Mesoregiao'
    ,'nome_Mesoregiao'
    ,'id_Microregiao'
    ,'nome_Microregiao'
    ,'id_Regiao_Imediata'
    ,'nome_Regiao_Imediata'
    ,'municipio_Regiao_Imediata'
    ,'id_Regiao_Intermediaria'
    ,'nome_Regiao_Intermediaria'
    ,'municipio_Regiao_Intermediaria'
    ,'codigo_Concentracao_Urbana'
    ,'nome_Concentracao_Urbana'
    ,'tipo_Concentracao_Urbana'
    ,'valida_Concentracao_Urbana'
    ,'codigo_Arranjo_Populacional'
    ,'nome_Arranjo_Populacional'
    ,'valida_Arranjo_Populacional'
    ,'hierarquia_Urbana'
    ,'hierarquia_Urbana_Principais'
    ,'codigo_Regiao_Rural'
    ,'nome_Regiao_Rural'
    ,'nome_Regiao_Rural_Nucleo'
]).agg({
    'id_Municipio' : [('sk_Municipio', 'count' )]
}).reset_index()

#### Ajustes label das colunas

In [19]:
dimensao_municipio.columns = [(x[0] if x[1]=='' else x[1]) for x in dimensao_municipio.columns]

#### Gerar surrogate key

In [20]:
dimensao_municipio['sk_Municipio'] = gerar_surrogate_key(dimensao_municipio)

## Salvar dataframe em arquivo local formato parquet

In [21]:
dimensao_municipio.to_parquet(local_dw + 'dMunicipio.parquet', index=False)

## Dimensão Ano

In [22]:
ano = list(range(1900, 2101, 1))

In [23]:
sk_ano = list(range(1, len(ano) +1, 1))

#### Ajustes label das colunas

In [24]:
dimensao_ano = pd.DataFrame(data = {'Ano': ano, 'sk_Ano': sk_ano})

## Salvar dataframe em arquivo local formato parquet

In [25]:
dimensao_ano.to_parquet(local_dw + 'dAno.parquet', index=False)

### Consultar IDs dimensões

#### ID Região

In [26]:
stage_pib = pd.merge(
    stage_pib
    ,dimensao_regiao[['id_Regiao', 'sk_Regiao']]
    ,on = ('id_Regiao')
    ,how = 'inner'
)

#### ID Estado

In [27]:
stage_pib = pd.merge(
    stage_pib
    ,dimensao_estado[['id_Estado', 'sk_Estado']]
    ,on = ('id_Estado')
    ,how = 'inner'
)

#### ID Municipio

In [28]:
stage_pib = pd.merge(
    stage_pib
    ,dimensao_municipio[['id_Municipio', 'sk_Municipio']]
    ,on = ('id_Municipio')
    ,how = 'inner'
)

#### ID Ano do PIB

In [29]:
stage_pib = pd.merge(
    stage_pib
    ,dimensao_ano[['Ano', 'sk_Ano']]
    ,left_on = ('ano_PIB')
    ,right_on = ('Ano')
    ,how = 'inner'
)

In [30]:
stage_pib = stage_pib.rename(columns = {'sk_Ano' : 'sk_Ano_PIB'})

#### ID Ano de Referência

In [31]:
stage_pib = pd.merge(
    stage_pib
    ,dimensao_ano[['Ano', 'sk_Ano']]
    ,left_on = ('ano_Referencia')
    ,right_on = ('Ano')
    ,how = 'inner'
)

In [32]:
stage_pib = stage_pib.rename(columns = {'sk_Ano' : 'sk_Ano_Referencia'})

## Tabela Fato Região

In [33]:
fato_regiao = stage_pib.groupby([
    'sk_Regiao'
    ,'sk_Ano_PIB'
    ,'sk_Ano_Referencia'
]).agg({
    'id_Estado' : [('quantidade_Estados', pd.Series.nunique)]
    ,'id_Municipio' : [('quantidade_Municipios', pd.Series.nunique)]
    ,'populacao_Projetada' : [('menor_Populacao', 'min'), ('maior_Populacao', 'max'), ('total_Populacao', 'sum')]
    ,'valor_PIB' : [('menor_valor_PIB', 'min'), ('maior_valor_PIB', 'max'), ('valor_PIB', 'sum')]
    ,'valor_Bruto_Agropecuaria' : [('valor_Agropecuaria', 'sum')]
    ,'valor_Bruto_Industria' : [('valor_Industria', 'sum')]
    ,'valor_Bruto_Servicos' : [('valor_Servicos', 'sum')]
    ,'valor_Bruto_Administracao_Publica' : [('valor_Administracao_Publica', 'sum')]
    ,'valor_Impostos_Liquido' : [('valor_Impostos_Liquido', 'sum')]
    ,'valor_PIB_percapita' : [('menor_PIB_percapita', 'min'), ('maior_PIB_percapita', 'max'), ('valor_PIB_percapita', 'sum')]
    
}).reset_index().sort_values(by = [
    ('sk_Ano_PIB', '')
    ,('valor_PIB', 'valor_PIB')
], ascending=[False, False])

#### Ajustes label das colunas

In [34]:
fato_regiao.columns = [(x[0] if x[1]=='' else x[1]) for x in fato_regiao.columns]

## Salvar dataframe em arquivo local formato parquet

In [35]:
fato_regiao.to_parquet(local_dw + 'fPIB_Regiao.parquet', index=False)

## Tabela Fato Estado

In [36]:
fato_estado = stage_pib.groupby([
    'sk_Estado'
    ,'sk_Ano_PIB'
    ,'sk_Ano_Referencia'
]).agg({
    'id_Municipio' : [('quantidade_Municipios', pd.Series.nunique)]
    ,'populacao_Projetada' : [('menor_Populacao', 'min'), ('maior_Populacao', 'max'), ('total_Populacao', 'sum')]
    ,'valor_PIB' : [('menor_valor_PIB', 'min'), ('maior_valor_PIB', 'max'), ('valor_PIB', 'sum')]
    ,'valor_Bruto_Agropecuaria' : [('valor_Agropecuaria', 'sum')]
    ,'valor_Bruto_Industria' : [('valor_Industria', 'sum')]
    ,'valor_Bruto_Servicos' : [('valor_Servicos', 'sum')]
    ,'valor_Bruto_Administracao_Publica' : [('valor_Administracao_Publica', 'sum')]
    ,'valor_Impostos_Liquido' : [('valor_Impostos_Liquido', 'sum')]
    ,'valor_PIB_percapita' : [('menor_PIB_percapita', 'min'), ('maior_PIB_percapita', 'max'), ('valor_PIB_percapita', 'sum')]
    
}).reset_index().sort_values(by = [
    ('sk_Ano_PIB', '')
    ,('valor_PIB', 'valor_PIB')
], ascending=[False, False])

#### Ajustes label das colunas

In [37]:
fato_estado.columns = [(x[0] if x[1]=='' else x[1]) for x in fato_estado.columns]

## Salvar dataframe em arquivo local formato parquet

In [38]:
fato_estado.to_parquet(local_dw + 'fPIB_Estado.parquet', index=False)

## Tabela Fato Município

In [39]:
fato_municipio = stage_pib.groupby([
    'sk_Municipio'
    ,'sk_Ano_PIB'
    ,'sk_Ano_Referencia'
]).agg({
    'populacao_Projetada' : [('menor_Populacao', 'min'), ('maior_Populacao', 'max'), ('total_Populacao', 'sum')]
    ,'valor_PIB' : [('menor_valor_PIB', 'min'), ('maior_valor_PIB', 'max'), ('valor_PIB', 'sum')]
    ,'valor_Bruto_Agropecuaria' : [('valor_Agropecuaria', 'sum')]
    ,'valor_Bruto_Industria' : [('valor_Industria', 'sum')]
    ,'valor_Bruto_Servicos' : [('valor_Servicos', 'sum')]
    ,'valor_Bruto_Administracao_Publica' : [('valor_Administracao_Publica', 'sum')]
    ,'valor_Impostos_Liquido' : [('valor_Impostos_Liquido', 'sum')]
    ,'valor_PIB_percapita' : [('menor_PIB_percapita', 'min'), ('maior_PIB_percapita', 'max'), ('valor_PIB_percapita', 'sum')]
    
}).reset_index().sort_values(by = [
    ('sk_Ano_PIB', '')
    ,('valor_PIB', 'valor_PIB')
], ascending=[False, False])

#### Ajustes label das colunas

In [40]:
fato_municipio.columns = [(x[0] if x[1]=='' else x[1]) for x in fato_municipio.columns]

## Salvar dataframe em arquivo local formato parquet

In [41]:
fato_municipio.to_parquet(local_dw + 'fPIB_Municipio.parquet', index=False)

In [42]:
#Fim da execução do notebook
end_general = datetime.now()

In [43]:
# Write the DataFrame to a BigQuery table

print("Tempo total notebook " + str(end_general - start_general))

Tempo total notebook 0:00:02.607044
