### Início

#### Importação das Libs e criação da sessão spark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

import pyspark.sql.functions as f

import pandas as pd
import requests
import zipfile
import os

import findspark
findspark.init()

spark = (
    SparkSession
    .builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)


21/09/18 16:00:44 WARN Utils: Your hostname, iMac-de-Sergio-2.local resolves to a loopback address: 127.0.0.1; using 192.168.0.222 instead (on interface en0)
21/09/18 16:00:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/09/18 16:00:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
import pyspark.sql.functions as f

# Local Datalake Paths
local_zones = {
    'landing': './data/landing_zone',
    'raw': './data/raw_zone',
    'staging': './data/staging_zone',
    'consumer': './data/consumer_zone',
}

#### Etapa 1 - Download do Dataset de Pensionistas

In [3]:
# Link para download do arquivo de pensionistas
zipUrl = 'http://repositorio.dados.gov.br/segrt/pensionistas/PENSIONISTAS_082021.zip'

### Landing Zone
# Path do diretorio para gravação do arquivo
outPath = f'{local_zones["landing"]}/pensionistas'
os.makedirs(outPath, exist_ok=True)
    
# baixar arquivo direto da fonte
req = requests.get(zipUrl)
zipOut = f'{outPath}/PENSIONISTAS.zip'
print('Saving:', zipOut)
with open(zipOut, 'wb') as f:
    f.write(req.content)
    f.close()


### Raw Zone
outPath = f'{local_zones["raw"]}/pensionistas'
os.makedirs(outPath, exist_ok=True)

# Unzip - Extract all
with zipfile.ZipFile(zipOut, 'r') as zipObj:
   # Extract all
   listOfFileNames = zipObj.namelist()
   # varre a lista de arquivos - file names
   for fileName in listOfFileNames:
       # Apenas .csv
       if fileName.endswith('.csv'):
           print('Unzip:', fileName, ' -> ', outPath)
           zipObj.extract(fileName, path=outPath)

Saving: ./data/landing_zone/pensionistas/PENSIONISTAS.zip
Unzip: PENSIONISTAS_082021.csv  ->  ./data/raw_zone/pensionistas


#### Etapa 02 - Leitura do dataframe da Raw e formatação para Staging

In [4]:
import pyspark.sql.functions as f

csvFile = f'{local_zones["raw"]}/pensionistas/{fileName}'

# Carregando o Dataframe
df_origem = (
    spark.read
    .format('csv')
    .option('sep',';')
    .option('header',True)
    .load(csvFile)
)

# Selecionando as Colunas representativas
cols = ['NOME DO BENEFICIARIO', 
        'NOME DO ORGAO',
        'DATA DE NASCIMENTO',
        'UF DA UPAG DE VINCULACAO',
        'NATUREZA PENSAO',
        'DATA INICIO DO BENEFICIO',
        'DATA FIM DO BENEFICIO',
        'RENDIMENTO LIQUIDO',
        'PAGAMENTO SUSPENSO']

# Renomeando das colunas
df_pensionistas = (
    df_origem
    .select(cols)
    .withColumnRenamed('NOME DO BENEFICIARIO','nome')
    .withColumnRenamed('NOME DO ORGAO','orgao')
    .withColumnRenamed('DATA DE NASCIMENTO','dtnasc')
    .withColumnRenamed('UF DA UPAG DE VINCULACAO','uf')
    .withColumnRenamed('NATUREZA PENSAO','natpensao')
    .withColumnRenamed('DATA INICIO DO BENEFICIO','dtiniben')
    .withColumnRenamed('DATA FIM DO BENEFICIO','dtfimben')
    .withColumnRenamed('RENDIMENTO LIQUIDO','rendLiquido')
    .withColumnRenamed('PAGAMENTO SUSPENSO','pagsuspenso')
)


# Trantando e convertendo as colunas
df_pensionistas = (
    df_pensionistas
    .select('nome', 'orgao', 'dtnasc', 'uf', 'natpensao', 'dtiniben', 'dtfimben', 'rendLiquido', 'pagsuspenso')
    .withColumn('dtnasc', f.to_date( f.col('dtnasc'),'ddMMyyyy') )
    .withColumn('dtiniben', f.to_date( f.col('dtiniben'),'ddMMyyyy') )
    .withColumn('dtfimben', f.to_date( f.col('dtfimben'),'ddMMyyyy') )
    .withColumn('rendLiquido', f.regexp_replace(f.regexp_replace(f.col('rendLiquido'), "\\.", ""), "\\,", "."))
    .withColumn('rendLiquido', f.col('rendLiquido').cast('float') )
    .withColumn('pagsuspenso', f.when(f.col('pagsuspenso') == 'NAO', False)
                            .when(f.col('pagsuspenso') == 'SIM', True)
                            .otherwise(None) )
)

del df_origem



In [5]:
df_pensionistas.limit(5).toPandas()

Unnamed: 0,nome,orgao,dtnasc,uf,natpensao,dtiniben,dtfimben,rendLiquido,pagsuspenso
0,MARIA DA CRUZ DOS SANTOS,"MINIST.DA AGRICULTURA,PECUARIA E ABAST.",1955-01-28,DF,VITALICIA,2021-01-29,,3039.580078,False
1,CARMEN LUCINDA FARKAS DE ARAUJO,"MINIST.DA AGRICULTURA,PECUARIA E ABAST.",1951-04-18,DF,VITALICIA,1998-02-10,,1216.25,False
2,MARIA DE LOURDES DA SILVA,"MINIST.DA AGRICULTURA,PECUARIA E ABAST.",1948-01-14,DF,TEMPORARIA,2016-01-25,,2697.459961,False
3,DIOMARINA ALVES DOS SANTOS,"MINIST.DA AGRICULTURA,PECUARIA E ABAST.",1944-06-18,DF,VITALICIA,2010-12-19,,6953.77002,False
4,MARIZETE DIAS SOUZA,"MINIST.DA AGRICULTURA,PECUARIA E ABAST.",1954-03-18,DF,VITALICIA,2012-10-13,,14994.889648,False


In [6]:
(
    df_pensionistas
    .groupBy('pagsuspenso')
    .agg(f.count('nome'))
    .toPandas()
)



Unnamed: 0,pagsuspenso,count(nome)
0,True,2791
1,False,289255


In [7]:
# dados Nulos

cols = ['nome','orgao','dtnasc','uf','natpensao','dtiniben','dtfimben','rendLiquido','pagsuspenso']
nulos = []
for c in cols:
    nulos.append( [c, df_pensionistas.select(c).filter(f'{c} is NULL').count()] )

nulos




[['nome', 0],
 ['orgao', 0],
 ['dtnasc', 54],
 ['uf', 11],
 ['natpensao', 0],
 ['dtiniben', 842],
 ['dtfimben', 281168],
 ['rendLiquido', 0],
 ['pagsuspenso', 0]]

In [8]:

(
    df_pensionistas
    .select(cols)
    .filter('dtnasc is NULL')
    .limit(5)
    .toPandas()
)



Unnamed: 0,nome,orgao,dtnasc,uf,natpensao,dtiniben,dtfimben,rendLiquido,pagsuspenso
0,MARIETTA DA SILVA BASTOS,DEPARTAMENTO NAC.DE INFRAEST. DE TRANSP.,,RJ,VITALICIA,1994-10-27,,1100.0,False
1,IBERTINA RODRIGUES DA SILVA,DEP.DE CENTRAL.SERV.DE INATIVOS E PENS.,,DF,TEMPORARIA,,,1100.0,False
2,MARIA GENEROSA RODRIGUES,DEP.DE CENTRAL.SERV.DE INATIVOS E PENS.,,DF,TEMPORARIA,,,1100.0,False
3,NAIZA SANTOS MARCOS,DEP.DE CENTRAL.SERV.DE INATIVOS E PENS.,,DF,TEMPORARIA,,,1100.0,False
4,MARIA CONCEICAO DE MELO,DEP.DE CENTRAL.SERV.DE INATIVOS E PENS.,,DF,TEMPORARIA,,,1100.0,False


In [9]:
### Gravação na Staging
outPath = f'{local_zones["staging"]}/pensionistas'
df_pensionistas.write.format('parquet').save(outPath, mode='overwrite')



#### Etapa 03 - Enriquecimento dos dados no dataframe da Staging para gravação no Consumer

In [None]:
# Contagem por UF
(
    df_pensionistas
    .groupBy('uf')
    .agg(
        f.format_number(f.count(f.col('nome')),0).alias('Quant')
        )
    .orderBy('uf')
    .toPandas()
)

In [15]:
# Trantando e convertendo as colunas
df_pensionistas = (
    df_pensionistas
    .select('nome', 'orgao', 'dtnasc', 'uf', 'natpensao', 'dtiniben', 'dtfimben', 'rendLiquido', 'pagsuspenso')
    .withColumn('limitMax35', f.expr('round(rendLiquido*0.35,2)').cast('float') )
    .withColumn('limitMax40', f.expr('round(rendLiquido*0.40,2)').cast('float') )
    .withColumn('codFaixaRenda', f.when(f.col('rendLiquido').between(0,3000),1)
                             .when(f.col('rendLiquido').between(3001,7000),2)
                             .when(f.col('rendLiquido').between(7001,15000),3)
                             .when(f.col('rendLiquido').between(15001,30000),4)
                             .otherwise(5) )
    .withColumn('descFaixaRenda', f.when(f.col('codFaixaRenda')==1,'Entre 0 e 3000')
                             .when(f.col('codFaixaRenda')==2,'Entre 3001 e 7000')
                             .when(f.col('codFaixaRenda')==3,'Entre 7001 e 15000')
                             .when(f.col('codFaixaRenda')==4,'Entre 15001 e 30000')
                             .otherwise('Acima de 30.000') )
    .withColumn('idade' , f.floor( f.datediff( f.current_date(), f.col('dtnasc') ) / 365))
    .withColumn('codFaixaIdade', f.when(f.col('idade').between(0,15),1)
                             .when(f.col('idade').between(16,30),2)
                             .when(f.col('idade').between(31,60),3)
                             .when(f.isnull(f.col('idade')),0)
                             .otherwise(4) )    
    .withColumn('descFaixaIdade', f.when(f.col('codFaixaIdade')==1,'Entre 0 e 15 anos')
                             .when(f.col('codFaixaIdade')==2,'Entre 16 e 30 anos')
                             .when(f.col('codFaixaIdade')==3,'Entre 31 e 60 anos')
                             .when(f.col('codFaixaIdade')==0,'Idade nao informada')
                             .otherwise('Acima de 60 anos') )
)

In [16]:
# Contagem por Faixa de Idade
(
    df_pensionistas
    .groupBy('codFaixaIdade','descFaixaIdade')
    .agg(
        f.format_number(f.count(f.col('nome')),0).alias('Quant')
        )
    .orderBy('codFaixaIdade','descFaixaIdade')
    .toPandas()
)



Unnamed: 0,codFaixaIdade,descFaixaIdade,Quant
0,0,Idade nao informada,54
1,1,Entre 0 e 15 anos,3380
2,2,Entre 16 e 30 anos,6113
3,3,Entre 31 e 60 anos,67230
4,4,Acima de 60 anos,215269


In [17]:
# Contagem por Faixa de Renda
(
    df_pensionistas
    .groupBy('codFaixaRenda','descFaixaRenda')
    .agg(
        f.format_number(f.count(f.col('nome')),0).alias('Quant')
        )
    .orderBy('codFaixaRenda','descFaixaRenda')
    .toPandas()
)



Unnamed: 0,codFaixaRenda,descFaixaRenda,Quant
0,1,Entre 0 e 3000,129938
1,2,Entre 3001 e 7000,124437
2,3,Entre 7001 e 15000,28801
3,4,Entre 15001 e 30000,8607
4,5,Acima de 30.000,263


In [18]:
### Gravação na Consumer
outPath = f'{local_zones["consumer"]}/pensionistas'
df_pensionistas.write.format('parquet').save(outPath, mode='overwrite')

