In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as sqlfuncs
from itertools import chain

# shuffle partition, sexo boleano, cache, lazy
# misturar em 1 arquivo

In [2]:
# Usado somente para processamento em cluster Hadoop
# from pyspark import SparkContext
# sc = SparkContext()

spark = SparkSession.builder \
    .appName("Processador Planos de Saude") \
    .config("spark.sql.repl.eagerEval.enabled", True) \
    .getOrCreate()

# da pra infeirr o schema ou fazer os casts
dataset = spark.read.options(
        delimiter=",", 
        header="True", 
        encoding="ISO-8859-1", 
        dateFormat="dd/MM/yyyy",
        inferSchema="True"
    ).csv("csv_exemplo.csv") # 1 ou mais arquivos.. ideal ler todos, depende de particionamento e etc do hive tb

dataset

#ID_TEMPO_COMPETENCIA,CD_OPERADORA,DT_INCLUSAO,CD_BENE_MOTV_INCLUSAO,IND_PORTABILIDADE,ID_MOTIVO_MOVIMENTO,LG_BENEFICIARIO_ATIVO,DT_NASCIMENTO,TP_SEXO,CD_PLANO_RPS,CD_PLANO_SCPA,NR_PLANO_PORTABILIDADE,DT_PRIMEIRA_CONTRATACAO,DT_CONTRATACAO,ID_BENE_TIPO_DEPENDENTE,LG_COBERTURA_PARCIAL,LG_ITEM_EXCLUIDO_COBERTURA,CD_MUNICIPIO,SG_UF,LG_RESIDE_EXTERIOR,DT_REATIVACAO,DT_ULTIMA_REATIVACAO,DT_ULTIMA_MUDA_CONTRATUAL,DT_CANCELAMENTO,DT_ULTIMO_CANCELAMENTO,CD_BENE_MOTIV_CANCELAMENTO,DT_CARGA
202101,358169,16/12/2007,11.0,NAO,31.0,1,13/06/2006,F,463663115.0,,,28/06/2006,14/07/2020,1.0,0.0,0.0,120040,AC,0,,,12/08/2020,,,,05/03/2021
202101,334189,14/09/2005,,NAO,74.0,1,08/07/1957,M,,,,01/04/2004,01/04/2004,1.0,0.0,0.0,120040,AC,0,,,,,,,05/03/2021
202101,316849,17/01/2000,,NAO,74.0,1,12/05/1953,F,,,,14/10/1977,14/10/1977,,,,120040,AC,0,,,,,,,05/03/2021
202101,334189,11/03/2008,11.0,NAO,,1,03/06/1956,M,,,,01/02/2008,01/02/2008,1.0,0.0,0.0,120040,AC,0,,,,,,,05/03/2021
202101,334189,14/09/2005,,NAO,74.0,1,16/12/1956,M,,,,01/04/2004,01/04/2004,1.0,0.0,0.0,120040,AC,0,,,,,,,05/03/2021
202101,403997,08/09/2010,11.0,NAO,74.0,1,17/07/1995,M,428204993.0,,,13/08/2010,13/08/2010,10.0,0.0,0.0,120040,AC,0,,,,,,,05/03/2021
202101,403997,07/08/2010,11.0,NAO,21.0,1,06/11/1973,F,428204993.0,,,27/11/2000,27/11/2000,3.0,0.0,0.0,120040,AC,0,,,,,,,05/03/2021
202101,403997,15/11/2007,11.0,NAO,74.0,1,28/12/1973,F,428204993.0,,,04/10/2007,04/10/2007,4.0,0.0,0.0,120040,AC,0,,,,,,,05/03/2021
202101,403997,08/05/2002,,NAO,22.0,1,15/09/1976,F,428204993.0,,,25/10/2000,25/10/2000,1.0,0.0,0.0,120040,AC,0,,,,,,,05/03/2021
202101,403997,15/08/2007,11.0,NAO,74.0,1,07/08/1995,M,428204993.0,,,29/06/2005,29/06/2005,4.0,0.0,0.0,120040,AC,0,,,,,,,05/03/2021


In [3]:
dataset.schema.names

['#ID_TEMPO_COMPETENCIA',
 'CD_OPERADORA',
 'DT_INCLUSAO',
 'CD_BENE_MOTV_INCLUSAO',
 'IND_PORTABILIDADE',
 'ID_MOTIVO_MOVIMENTO',
 'LG_BENEFICIARIO_ATIVO',
 'DT_NASCIMENTO',
 'TP_SEXO',
 'CD_PLANO_RPS',
 'CD_PLANO_SCPA',
 'NR_PLANO_PORTABILIDADE',
 'DT_PRIMEIRA_CONTRATACAO',
 'DT_CONTRATACAO',
 'ID_BENE_TIPO_DEPENDENTE',
 'LG_COBERTURA_PARCIAL',
 'LG_ITEM_EXCLUIDO_COBERTURA',
 'CD_MUNICIPIO',
 'SG_UF',
 'LG_RESIDE_EXTERIOR',
 'DT_REATIVACAO',
 'DT_ULTIMA_REATIVACAO',
 'DT_ULTIMA_MUDA_CONTRATUAL',
 'DT_CANCELAMENTO',
 'DT_ULTIMO_CANCELAMENTO',
 'CD_BENE_MOTIV_CANCELAMENTO',
 'DT_CARGA']

In [4]:
unnecessary_columns = [column for column in dataset.schema.names if column not in ["TP_SEXO", "DT_NASCIMENTO", "SG_UF"]]

clean_dataset = dataset.drop(*tuple(unnecessary_columns))

In [5]:
clean_dataset = clean_dataset.withColumn("DT_NASCIMENTO", sqlfuncs.to_date(sqlfuncs.col("DT_NASCIMENTO"), "dd/MM/yyyy"))
clean_dataset = clean_dataset.withColumn("IDADE", sqlfuncs.floor(sqlfuncs.datediff(sqlfuncs.current_date(), sqlfuncs.to_date(sqlfuncs.col("DT_NASCIMENTO"), "dd/MM/yyyy"))/365.25))
clean_dataset

DT_NASCIMENTO,TP_SEXO,SG_UF,IDADE
2006-06-13,F,AC,14
1957-07-08,M,AC,63
1953-05-12,F,AC,67
1956-06-03,M,AC,64
1956-12-16,M,AC,64
1995-07-17,M,AC,25
1973-11-06,F,AC,47
1973-12-28,F,AC,47
1976-09-15,F,AC,44
1995-08-07,M,AC,25


In [6]:
clean_dataset = clean_dataset.drop("DT_NASCIMENTO")
clean_dataset = clean_dataset.withColumn("ATE_18", 
                         sqlfuncs.when(sqlfuncs.col("IDADE") <= sqlfuncs.lit(18), sqlfuncs.lit(1))
                         .otherwise(sqlfuncs.lit(0))
                        ) \
              .withColumn("ATE_45", 
                         sqlfuncs.when(sqlfuncs.col("IDADE") <= sqlfuncs.lit(45), sqlfuncs.lit(1))
                         .otherwise(sqlfuncs.lit(0))
                        ) \
              .withColumn("ATE_60", 
                         sqlfuncs.when(sqlfuncs.col("IDADE") <= sqlfuncs.lit(60), sqlfuncs.lit(1))
                         .otherwise(sqlfuncs.lit(0))
                        ) \
              .withColumn("ATE_80", 
                         sqlfuncs.when(sqlfuncs.col("IDADE") <= sqlfuncs.lit(80), sqlfuncs.lit(1))
                         .otherwise(sqlfuncs.lit(0))
                        ) \
              .withColumn("MAIOR_80", 
                         sqlfuncs.when(sqlfuncs.col("IDADE") > sqlfuncs.lit(80), sqlfuncs.lit(1))
                         .otherwise(sqlfuncs.lit(0))
                        )
clean_dataset = clean_dataset.drop("IDADE")
clean_dataset

TP_SEXO,SG_UF,ATE_18,ATE_45,ATE_60,ATE_80,MAIOR_80
F,AC,1,1,1,1,0
M,AC,0,0,0,1,0
F,AC,0,0,0,1,0
M,AC,0,0,0,1,0
M,AC,0,0,0,1,0
M,AC,0,1,1,1,0
F,AC,0,0,1,1,0
F,AC,0,0,1,1,0
F,AC,0,1,1,1,0
M,AC,0,1,1,1,0


In [7]:
map_uf_regions = {
    "AC": "Norte",
    "AL": "Nordeste",
    "AP": "Norte",
    "AM": "Norte",
    "BA": "Nordeste",
    "CE": "Nordeste",
    "DF": "Centro-Oeste",
    "ES": "Sudeste",
    "GO": "Centro-Oeste",
    "MA": "Nordeste",
    "MT": "Centro-Oeste",
    "MS": "Centro-Oeste",
    "MG": "Sudeste",
    "PA": "Norte",
    "PB": "Nordeste",
    "PR": "Sul",
    "PE": "Nordeste",
    "PI": "Nordeste",
    "RJ": "Sudeste",
    "RN": "Nordeste",
    "RS": "Sul",
    "RO": "Norte",
    "RR": "Norte",
    "SC": "Sul",
    "SP": "Sudeste",
    "SE": "Nordeste",
    "TO": "Norte"
}

mapping = sqlfuncs.create_map([sqlfuncs.lit(x) for x in chain(*map_uf_regions.items())])


clean_dataset = clean_dataset.withColumn("REGIAO", mapping[clean_dataset['SG_UF']])
clean_dataset = clean_dataset.drop("SG_UF")
clean_dataset

TP_SEXO,ATE_18,ATE_45,ATE_60,ATE_80,MAIOR_80,REGIAO
F,1,1,1,1,0,Norte
M,0,0,0,1,0,Norte
F,0,0,0,1,0,Norte
M,0,0,0,1,0,Norte
M,0,0,0,1,0,Norte
M,0,1,1,1,0,Norte
F,0,0,1,1,0,Norte
F,0,0,1,1,0,Norte
F,0,1,1,1,0,Norte
M,0,1,1,1,0,Norte


In [9]:
processed_data = clean_dataset.groupby("TP_SEXO", "REGIAO").agg(
    sqlfuncs.sum("ATE_18").alias("ATE_18"),
    sqlfuncs.sum("ATE_45").alias("ATE_45"),
    sqlfuncs.sum("ATE_60").alias("ATE_60"),
    sqlfuncs.sum("ATE_80").alias("ATE_80"),
    sqlfuncs.sum("MAIOR_80").alias("MAIOR_80")
).orderBy("REGIAO")

processed_data

TP_SEXO,REGIAO,ATE_18,ATE_45,ATE_60,ATE_80,MAIOR_80
F,Centro-Oeste,4894,19604,31076,41392,1753
M,Centro-Oeste,5131,17642,26881,35735,1115
M,Nordeste,10383,42130,60069,75873,2260
F,Nordeste,10007,47895,74471,97233,4624
F,Norte,10706,41106,59363,74358,3011
M,Norte,11193,36789,50430,61058,1563
F,Sudeste,3593,16813,27647,40130,3354
M,Sudeste,3693,15996,24571,34556,1911
F,Sul,2644,12515,20604,29875,2037
M,Sul,2731,11869,19084,26786,1299


In [14]:
#processed_data.write.csv("../processed_example.csv")
processed_data.toPandas().to_csv("processed_example.csv")