# Script de Modelagem dos Dados

Matheus Costa e Silva - 24 de outubro de 2023

## Preparação de ambiente

In [1]:
import os
import sys
import re
import zipfile
import tempfile
import io
import requests
from functools import reduce
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType,StructField, StringType
from pyspark.sql.functions import sum,avg,max, lit
import pyspark.sql.functions as func,rank, col
from bs4 import BeautifulSoup
from pyspark.sql.window import Window


#Create SparkSession
spark = SparkSession.builder.appName('Daiichi').getOrCreate()

23/10/24 23:50:23 WARN Utils: Your hostname, teste.local resolves to a loopback address: 127.0.0.1; using 192.168.0.68 instead (on interface en0)
23/10/24 23:50:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/24 23:50:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Download da base de dados TISS Ambulatorial

### Base de dados TISS Detalhe

Para fins de otimização de computação e armazenamento, optou-se por baixar somente a competência de dezembro do ano de 2022 para os seguintes estados: Goiás, Bahia e Minas Gerais

In [2]:
from pyspark.sql.functions import input_file_name
import os

# Inicie uma sessão do Spark
spark = SparkSession.builder.appName("Leitura de CSV").getOrCreate()

# Diretório raiz onde estão as pastas das UF's
diretorio_raiz = "/Users/matheuscosta/Untitled Folder/"

# Anos que você deseja processar
anos = ["2022"]

# Lista para armazenar os DataFrames de todas as UF's
dfs_ufs = []

# Loop através das UF's
aux = os.listdir(diretorio_raiz)
ufs = ["GO","MG","BA"]
for uf in ufs:
    for ano in anos:
        # Construa o caminho para o arquivo CSV "CONS" da UF e ano
        caminho_arquivo = os.path.join(diretorio_raiz, uf, ano, "DET")

        # Verifique se o arquivo existe antes de ler
        print(caminho_arquivo)
        df = spark.read.options(delimiter=',').csv(caminho_arquivo, header=True, inferSchema=True)
        df=df.withColumn("UF_Det", lit(uf))
        df=df.withColumn("ANO", lit(ano))
        dfs_ufs.append(df)


df_DET = dfs_ufs[0]
for df in dfs_ufs[1:]:
    df_DET = df_DET.union(df).distinct()

# Mostre o conteúdo lido
#df_total.show()
print((df_DET.count(), len(df_DET.columns)))

23/10/24 23:50:25 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


/Users/matheuscosta/Untitled Folder/GO/2022/DET


                                                                                

/Users/matheuscosta/Untitled Folder/MG/2022/DET


                                                                                

/Users/matheuscosta/Untitled Folder/BA/2022/DET




(14530030, 12)




### Base de dados TISS Consolidada

In [3]:
from pyspark.sql.functions import input_file_name
import os

# Inicie uma sessão do Spark
spark = SparkSession.builder.appName("Leitura de CSV").getOrCreate()

# Diretório raiz onde estão as pastas das UF's
diretorio_raiz = "/Users/matheuscosta/Untitled Folder/"

# Anos que você deseja processar
anos = ["2022"]

# Lista para armazenar os DataFrames de todas as UF's
dfs_ufs = []

# Loop através das UF's
aux = os.listdir(diretorio_raiz)
ufs = ["GO","MG","BA"]
for uf in ufs:
    for ano in anos:
        # Construa o caminho para o arquivo CSV "CONS" da UF e ano
        caminho_arquivo = os.path.join(diretorio_raiz, uf, ano, "CONS")

        # Verifique se o arquivo existe antes de ler
        print(caminho_arquivo)
        df = spark.read.options(delimiter=',').csv(caminho_arquivo, header=True, inferSchema=True)
        df=df.withColumn("UF_Cons", lit(uf))
        df=df.withColumn("ANO", lit(ano))
        dfs_ufs.append(df)
        num_linhas = df.count()

# Número de colunas
        num_colunas = len(df.columns)

# Exibindo as dimensões
        #print(f"Número de linhas: {num_linhas}")
        #print(f"Número de colunas: {num_colunas}")
        #df.show()
        

df_CONS = dfs_ufs[0]
for df in dfs_ufs[1:]:
    df_CONS = df_CONS.union(df).distinct()

# Mostre o conteúdo lido
#df_total.show()
print((df_CONS.count(), len(df_CONS.columns)))

/Users/matheuscosta/Untitled Folder/GO/2022/CONS




/Users/matheuscosta/Untitled Folder/MG/2022/CONS


                                                                                

/Users/matheuscosta/Untitled Folder/BA/2022/CONS


[Stage 29:>                                                         (0 + 8) / 9]

(7233013, 20)




### Base de dados TISS Planos de Saúde

In [4]:
df_planos = spark.read.options(delimiter=',').csv("BRFULLTOTAL", header=True, inferSchema=True)

## Base de dados Beneficiários Consolidada

In [5]:
from pyspark.sql.functions import input_file_name
import os

# Inicie uma sessão do Spark
spark = SparkSession.builder.appName("Leitura de CSV").getOrCreate()

# Diretório raiz onde estão as pastas das UF's
diretorio_raiz = "/Users/matheuscosta/Untitled Folder/"

# Anos que você deseja processar
anos = ["202212"]

# Lista para armazenar os DataFrames de todas as UF's
dfs_ufs = []

# Loop através das UF's
aux = os.listdir(diretorio_raiz)
ufs = ["GO","MG","BA"]
for uf in ufs:
    for ano in anos:
        print(uf)
        print(ano)
        # Construa o caminho para o arquivo CSV "CONS" da UF e ano
        caminho_arquivo = diretorio_raiz+uf+ano+"TOTAL"
        # Verifique se o arquivo existe antes de ler
        print(caminho_arquivo)
        df = spark.read.options(delimiter=',').csv(caminho_arquivo, header=True, inferSchema=True)
        df=df.withColumn("UF_Ben", lit(uf))
        df=df.withColumn("ANO", lit(ano))
        dfs_ufs.append(df)


df_BEN = dfs_ufs[0]
for df in dfs_ufs[1:]:
    df_BEN = df_BEN.union(df).distinct()

# Mostre o conteúdo lido
#df_total.show()
print((df_BEN.count(), len(df_BEN.columns)))

GO
202212
/Users/matheuscosta/Untitled Folder/GO202212TOTAL
MG
202212
/Users/matheuscosta/Untitled Folder/MG202212TOTAL


                                                                                

BA
202212
/Users/matheuscosta/Untitled Folder/BA202212TOTAL


[Stage 43:>                                                         (0 + 8) / 9]

(3050774, 24)


                                                                                

## Base de dados TUSS

### Utilizada para normalização e acesso de referencia de procedimentos realizados

In [6]:
df_19 = spark.read.options(delimiter=';').csv("/Users/matheuscosta/Untitled Folder/TUSS/Tabela_19*.csv", header=True, inferSchema=True).select("Código do Termo","Termo")
df_20 = spark.read.options(delimiter=';').csv("/Users/matheuscosta/Untitled Folder/TUSS/Tabela_20*.csv", header=True, inferSchema=True).select("Código do Termo","Termo")
df_22 = spark.read.options(delimiter=';').csv("/Users/matheuscosta/Untitled Folder/TUSS/Tabela_22*.csv", header=True, inferSchema=True).select("Código do Termo","Termo")
df_63 = spark.read.options(delimiter=';').csv("/Users/matheuscosta/Untitled Folder/TUSS/Tabela_63*.csv", header=True, inferSchema=True).select("Código","Grupo")

df_63 = df_63.withColumnRenamed("Código","Código do Termo")
df_63 = df_63.withColumnRenamed("Grupo","Termo")

dfs = [df_19,df_20,df_22,df_63]
df_tuss_proc = reduce(DataFrame.unionAll, dfs).dropDuplicates()
df_tuss_proc = df_tuss_proc.withColumnRenamed("Código do Termo","codTUSS")
#print((df_tuss_proc.count(), len(df_tuss_proc.columns)))

df_carater_atendimento = spark.read.options(delimiter=';').csv("/Users/matheuscosta/Untitled Folder/TUSS/Tabela_23*.csv", header=True, inferSchema=True).select("Código do Termo","Termo")
df_carater_atendimento = df_carater_atendimento.withColumnRenamed("Código do Termo","codTUSS")

df_tipo_atendimento = spark.read.options(delimiter=';').csv("/Users/matheuscosta/Untitled Folder/TUSS/Tabela_50*.csv", header=True, inferSchema=True).select("Código do Termo","Termo")
df_tipo_atendimento = df_tipo_atendimento.withColumnRenamed("Código do Termo","codTUSS")

df_cbo = spark.read.options(delimiter=';').csv("/Users/matheuscosta/Untitled Folder/TUSS/Tabela_24*.csv", header=True, inferSchema=True).select("Código do Termo","Termo")
df_cbo = df_cbo.withColumnRenamed("Código do Termo","codTUSS")

df_tipo_consulta = spark.read.options(delimiter=';').csv("/Users/matheuscosta/Untitled Folder/TUSS/Tabela_52*.csv", header=True, inferSchema=True).select("Código do Termo","Termo")
df_tipo_consulta = df_tipo_consulta.withColumnRenamed("Código do Termo","codTUSS")

df_tipo_acidente = spark.read.options(delimiter=';').csv("/Users/matheuscosta/Untitled Folder/TUSS/Tabela_36*.csv", header=True, inferSchema=True).select("Código do Termo","Termo")
df_tipo_acidente = df_tipo_acidente.withColumnRenamed("Código do Termo","codTUSS")

## Base de dados Rede Credenciada

In [7]:
df_GO = spark.read.options(delimiter=';').csv("/Users/matheuscosta/Untitled Folder/prestador_amb/*GO.csv", header=True, inferSchema=True)
df_MG = spark.read.options(delimiter=';').csv("/Users/matheuscosta/Untitled Folder/prestador_amb/*MG.csv", header=True, inferSchema=True)
df_BA = spark.read.options(delimiter=';').csv("/Users/matheuscosta/Untitled Folder/prestador_amb/*BA.csv", header=True, inferSchema=True)

dfs = [df_GO,df_MG,df_BA]

df_prestador_amb = reduce(DataFrame.unionAll, dfs)

print((df_prestador_amb.count(), len(df_prestador_amb.columns)))



(5902330, 20)




# Base de dados Disponível para análise

In [None]:
df_DET #evento detalhado Ambulatorial
df_CONS #evento consolidado Ambulatorial
df_planos #descrição plano de saúde

df_BEN #descrição consolidada de beneficiários

TUSS
df_tuss_proc #padronização de dados para procedimentos 

df_carater_atendimento #padronização de carater do atendimento
df_tipo_atendimento #padronização de tipo do atendimento
df_cbo #padronização de ocupação trabalhista
df_tipo_consulta #padronização tipo de consulta
df_tipo_acidente #padronização tipo de acidente

df_prestador_amb #descrição de rede de prestadores não hospitalares


In [9]:
df_prestador_amb.printSchema()

root
 |-- CD_OPERADORA: integer (nullable = true)
 |-- NM_OPERADORA: string (nullable = true)
 |-- GR_MODALIDADE: string (nullable = true)
 |-- ID_ESTABELECIMENTO_SAUDE: integer (nullable = true)
 |-- CD_CNPJ_ESTB_SAUDE: long (nullable = true)
 |-- CD_CNES: integer (nullable = true)
 |-- NM_ESTABELECIMENTO_SAUDE: string (nullable = true)
 |-- DE_CLAS_ESTB_SAUDE: string (nullable = true)
 |-- DE_TIPO_PRESTADOR: string (nullable = true)
 |-- LG_URGENCIA_EMERGENCIA: integer (nullable = true)
 |-- DE_TIPO_CONTRATO: string (nullable = true)
 |-- DE_DISPONIBILIDADE: string (nullable = true)
 |-- CD_MUNICIPIO: integer (nullable = true)
 |-- NM_MUNICIPIO: string (nullable = true)
 |-- SG_UF: string (nullable = true)
 |-- NM_REGIAO: string (nullable = true)
 |-- DT_VINCULO_OPERADORA_INICIO: date (nullable = true)
 |-- DT_VINCULO_OPERADORA_FIM: date (nullable = true)
 |-- COMPETENCIA: integer (nullable = true)
 |-- DT_ATUALIZACAO: date (nullable = true)



# Modelagem de indicadores

## Tabelas Auxiliares

In [10]:
df_regiao=df_prestador_amb.select("CD_MUNICIPIO","NM_MUNICIPIO","SG_UF").distinct()

df_cred=df_prestador_amb.select("CD_OPERADORA","NM_OPERADORA","CD_MUNICIPIO",
                                "GR_MODALIDADE","DE_CLAS_ESTB_SAUDE","NM_ESTABELECIMENTO_SAUDE").distinct()
df_regiao.printSchema()
df_cred.printSchema()

root
 |-- CD_MUNICIPIO: integer (nullable = true)
 |-- NM_MUNICIPIO: string (nullable = true)
 |-- SG_UF: string (nullable = true)

root
 |-- CD_OPERADORA: integer (nullable = true)
 |-- NM_OPERADORA: string (nullable = true)
 |-- CD_MUNICIPIO: integer (nullable = true)
 |-- GR_MODALIDADE: string (nullable = true)
 |-- DE_CLAS_ESTB_SAUDE: string (nullable = true)
 |-- NM_ESTABELECIMENTO_SAUDE: string (nullable = true)



In [11]:
df_BEN.printSchema()

root
 |-- #ID_CMPT_MOVEL: integer (nullable = true)
 |-- CD_OPERADORA: integer (nullable = true)
 |-- NM_RAZAO_SOCIAL: string (nullable = true)
 |-- NR_CNPJ: long (nullable = true)
 |-- MODALIDADE_OPERADORA: string (nullable = true)
 |-- SG_UF: string (nullable = true)
 |-- CD_MUNICIPIO: integer (nullable = true)
 |-- NM_MUNICIPIO: string (nullable = true)
 |-- TP_SEXO: string (nullable = true)
 |-- DE_FAIXA_ETARIA: string (nullable = true)
 |-- DE_FAIXA_ETARIA_REAJ: string (nullable = true)
 |-- CD_PLANO: string (nullable = true)
 |-- TP_VIGENCIA_PLANO: string (nullable = true)
 |-- DE_CONTRATACAO_PLANO: string (nullable = true)
 |-- DE_SEGMENTACAO_PLANO: string (nullable = true)
 |-- DE_ABRG_GEOGRAFICA_PLANO: string (nullable = true)
 |-- COBERTURA_ASSIST_PLAN: string (nullable = true)
 |-- TIPO_VINCULO: string (nullable = true)
 |-- QT_BENEFICIARIO_ATIVO: integer (nullable = true)
 |-- QT_BENEFICIARIO_ADERIDO: integer (nullable = true)
 |-- QT_BENEFICIARIO_CANCELADO: integer (nullab

In [12]:
df_benef=df_BEN.select("CD_OPERADORA","CD_MUNICIPIO","TP_SEXO","DE_FAIXA_ETARIA_REAJ",
                       "TIPO_VINCULO","QT_BENEFICIARIO_ATIVO") \
    .filter(df_BEN.COBERTURA_ASSIST_PLAN == "M�dico-hospitalar")

df_benef.printSchema()

root
 |-- CD_OPERADORA: integer (nullable = true)
 |-- CD_MUNICIPIO: integer (nullable = true)
 |-- TP_SEXO: string (nullable = true)
 |-- DE_FAIXA_ETARIA_REAJ: string (nullable = true)
 |-- TIPO_VINCULO: string (nullable = true)
 |-- QT_BENEFICIARIO_ATIVO: integer (nullable = true)



### Fechando a análise para eventos exclusivamente oncoclógicos

In [13]:
df_DET=df_DET.withColumnRenamed("ID_EVENTO_ATENCAO_SAUDE","ID_EVENTO_ATENCAO_SAUDE_DET")


ref=["Quimioterapia","Radioterapia"]
filtro_atend=df_tipo_atendimento.filter(df_tipo_atendimento.Termo.isin(ref))

df_eventos = df_DET.join(df_CONS,df_DET.ID_EVENTO_ATENCAO_SAUDE_DET ==  df_CONS.ID_EVENTO_ATENCAO_SAUDE,"left")

df_eventos.printSchema()

#print((df_eventos.count(), len(df_eventos.columns)))

df_eventos_onco = df_eventos.join(filtro_atend,df_eventos.TIPO_ATENDIMENTO ==  filtro_atend.codTUSS,"inner")

df_eventos_onco.printSchema()
#print((df_eventos_onco.count(), len(df_eventos_onco.columns)))

root
 |-- ID_EVENTO_ATENCAO_SAUDE_DET: long (nullable = true)
 |-- UF_PRESTADOR: string (nullable = true)
 |-- DT_REALIZACAO: timestamp (nullable = true)
 |-- CD_PROCEDIMENTO: string (nullable = true)
 |-- CD_TABELA_REFERENCIA: integer (nullable = true)
 |-- QT_ITEM_EVENTO_INFORMADO: string (nullable = true)
 |-- VL_ITEM_EVENTO_INFORMADO: string (nullable = true)
 |-- VL_ITEM_PAGO_FORNECEDOR: string (nullable = true)
 |-- IND_PACOTE: integer (nullable = true)
 |-- IND_TABELA_PROPRIA: integer (nullable = true)
 |-- UF_Det: string (nullable = false)
 |-- ANO: string (nullable = false)
 |-- ID_EVENTO_ATENCAO_SAUDE: long (nullable = true)
 |-- ID_PLANO: integer (nullable = true)
 |-- FAIXA_ETARIA: string (nullable = true)
 |-- SEXO: string (nullable = true)
 |-- CD_MUNICIPIO_BENEFICIARIO: integer (nullable = true)
 |-- PORTE: string (nullable = true)
 |-- CD_MODALIDADE: integer (nullable = true)
 |-- NM_MODALIDADE: string (nullable = true)
 |-- CD_MUNICIPIO_PRESTADOR: integer (nullable = t

In [14]:
df_eventos_onco.printSchema()

root
 |-- ID_EVENTO_ATENCAO_SAUDE_DET: long (nullable = true)
 |-- UF_PRESTADOR: string (nullable = true)
 |-- DT_REALIZACAO: timestamp (nullable = true)
 |-- CD_PROCEDIMENTO: string (nullable = true)
 |-- CD_TABELA_REFERENCIA: integer (nullable = true)
 |-- QT_ITEM_EVENTO_INFORMADO: string (nullable = true)
 |-- VL_ITEM_EVENTO_INFORMADO: string (nullable = true)
 |-- VL_ITEM_PAGO_FORNECEDOR: string (nullable = true)
 |-- IND_PACOTE: integer (nullable = true)
 |-- IND_TABELA_PROPRIA: integer (nullable = true)
 |-- UF_Det: string (nullable = false)
 |-- ANO: string (nullable = false)
 |-- ID_EVENTO_ATENCAO_SAUDE: long (nullable = true)
 |-- ID_PLANO: integer (nullable = true)
 |-- FAIXA_ETARIA: string (nullable = true)
 |-- SEXO: string (nullable = true)
 |-- CD_MUNICIPIO_BENEFICIARIO: integer (nullable = true)
 |-- PORTE: string (nullable = true)
 |-- CD_MODALIDADE: integer (nullable = true)
 |-- NM_MODALIDADE: string (nullable = true)
 |-- CD_MUNICIPIO_PRESTADOR: integer (nullable = t

### Criação de indicadores e armazenamento em diretório

In [15]:
indicador_demo=df_eventos_onco.groupBy("PORTE","SEXO","FAIXA_ETARIA","UF_Cons") \
    .agg(sum("QT_ITEM_EVENTO_INFORMADO").alias("soma_item"), \
         sum("VL_ITEM_EVENTO_INFORMADO").alias("soma_valor"), \
         func.count(func.lit(1)).alias("contagem_evento"))

indicador_demo.write.format("csv").option("header",True)\
    .mode('overwrite').save("/Users/matheuscosta/Untitled Folder/indicadores/indicador_demo")

23/10/24 23:52:09 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:09 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:09 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:09 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:09 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:09 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:09 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:09 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:10 WARN RowBasedKeyValueBatch: Calling spill() on

In [16]:
indicador_proc=df_eventos_onco.groupBy("CD_PROCEDIMENTO","SEXO","FAIXA_ETARIA","UF_Cons") \
    .agg(sum("QT_ITEM_EVENTO_INFORMADO").alias("soma_item"), \
         sum("VL_ITEM_EVENTO_INFORMADO").alias("soma_valor"), \
         func.count(func.lit(1)).alias("contagem_evento"))


windowSpec  = Window.partitionBy("UF_Cons").orderBy(col("soma_valor").desc())


indicador_proc = indicador_proc.select('*', rank().over(windowSpec).alias('rank')) \
    .filter(col('rank') <= 10)


indicador_proc.write.format("csv").option("header",True)\
    .mode('overwrite').save("/Users/matheuscosta/Untitled Folder/indicadores/indicador_proc")

23/10/24 23:52:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:50 WARN RowBasedKeyValueBatch: Calling spill() on



23/10/24 23:52:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:52:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                

In [17]:
indicador_geo=df_eventos_onco.groupBy("CD_MUNICIPIO_BENEFICIARIO","UF_Cons") \
    .agg(sum("QT_ITEM_EVENTO_INFORMADO").alias("soma_item"), \
         sum("VL_ITEM_EVENTO_INFORMADO").alias("soma_valor"), \
         func.count(func.lit(1)).alias("contagem_evento"))\
    .join(df_regiao,
          df_eventos_onco.CD_MUNICIPIO_BENEFICIARIO ==  df_regiao.CD_MUNICIPIO,"right")

indicador_geo.write.format("csv").option("header",True)\
    .mode('overwrite').save("/Users/matheuscosta/Untitled Folder/indicadores/indicador_geo")

23/10/24 23:53:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:53:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:53:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:53:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:53:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:53:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:53:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:53:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:53:36 WARN RowBasedKeyValueBatch: Calling spill() on

In [18]:
df_cred1 = df_cred.withColumnRenamed("CD_MUNICIPIO","CD_MUNICIPIO1")
indicador_cred=df_cred1.groupBy("CD_MUNICIPIO1","DE_CLAS_ESTB_SAUDE") \
    .agg(func.count(func.lit(1)).alias("contagem_classe"))\
    .join(df_regiao,
          df_cred1.CD_MUNICIPIO1 ==  df_regiao.CD_MUNICIPIO,"right")

indicador_cred.write.format("csv").option("header",True)\
    .mode('overwrite').save("/Users/matheuscosta/Untitled Folder/indicadores/indicador_cred")

23/10/24 23:53:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:53:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:53:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:53:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:53:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:53:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:53:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:53:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/24 23:53:51 WARN RowBasedKeyValueBatch: Calling spill() on

In [19]:
df_cred1 = df_cred.withColumnRenamed("CD_MUNICIPIO","CD_MUNICIPIO1")
operadora=df_cred1.select("NM_OPERADORA","CD_MUNICIPIO1").distinct()
operadora_uf = operadora.join(df_regiao, operadora.CD_MUNICIPIO1 == df_regiao.CD_MUNICIPIO,"right")


operadora_uf.write.format("csv").option("header",True)\
    .mode('overwrite').save("/Users/matheuscosta/Untitled Folder/indicadores/operadora_uf")


                                                                                

In [22]:
df_benef.head()

Row(CD_OPERADORA=339679, CD_MUNICIPIO=521880, TP_SEXO='F', DE_FAIXA_ETARIA_REAJ='29 a 33 anos', TIPO_VINCULO='TITULAR', QT_BENEFICIARIO_ATIVO=3)

In [26]:
df_benef1 = df_benef.withColumnRenamed("CD_MUNICIPIO","CD_MUNICIPIO1")
indicador_benef=df_benef1.groupBy("TP_SEXO","CD_MUNICIPIO1","DE_FAIXA_ETARIA_REAJ") \
    .agg(sum("QT_BENEFICIARIO_ATIVO").alias("soma_benef"))\
    .join(df_regiao,
          df_benef1.CD_MUNICIPIO1 ==  df_regiao.CD_MUNICIPIO,"right")


indicador_benef.write.format("csv").option("header",True)\
    .mode('overwrite').save("/Users/matheuscosta/Untitled Folder/indicadores/indicador_benef")


23/10/25 02:05:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/25 02:05:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/25 02:05:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/25 02:05:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/25 02:05:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/25 02:05:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/25 02:05:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/25 02:05:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/25 02:05:18 WARN RowBasedKeyValueBatch: Calling spill() on

In [30]:
df_tuss_proc.toPandas().to_csv("/Users/matheuscosta/Untitled Folder/indicadores/tuss.csv", sep=',')