# MVP - Engenharia de Dados
## Análise de índices de mortalidade no Brasil

# 1️⃣ Objetivo
_____

Este MVP tem como objetivo construir um pipeline de dados na nuvem para analisar dados de mortalidade no Brasil, utilizando tecnologias em nuvem, como Databricks e seu Delta Lake. O pipeline envolverá as etapas de busca, coleta, modelagem, carga e análise dos dados, com o propósito de fornecer insights sobre as causas e padrões de mortalidade no país.

O problema central que este MVP busca resolver é a falta de uma visão consolidada e acessível dos dados de mortalidade, que permita identificar tendências, anomalias e fatores relevantes para a saúde pública. Para isso, serão respondidas as seguintes perguntas:

- Qual o número total de óbitos registrados no Brasil ao longo dos anos?
 
- Quais são as principais causas de morte no Brasil? 
 
- Qual a distribuição da mortalidade por faixa etária? 
 
- Quais estados e municípios apresentam as maiores taxas de mortalidade? 
 
- Qual a proporção de mortes violentas em relação ao total de óbitos? 
 
- Houve mudanças nas principais causas de morte ao longo das décadas? 
 
- A escolaridade influencia as causas de morte? 
 
- Qual é a proporção de óbitos que ocorreram em hospitais versus outros locais (domicílio, via pública, etc.)? 
 
- Qual é a distribuição de óbitos por raça/cor? 
 
- Quantos óbitos foram reclassificados após investigação? 
 
- O estado civil tem relação com a mortalidade? 
 
- Existe correlação entre densidade populacional e taxa de mortalidade? 
 
- Qual a média de idade das pessoas no momento da morte? 
 
- O sexo das vítimas influencia as causas da morte? 
 
- Quais são as causas de morte mais comuns entre crianças, adultos e idosos? 
 
- Qual a relação entre ocupação profissional e causas de morte? 
 
- Qual a relação entre idade da mãe e mortalidade infantil? 
 
- A disponibilidade de hospitais impacta as taxas de mortalidade? 
 
- Existe uma diferença significativa na mortalidade entre capitais e cidades do interior? 
 
- Em quais horários ocorrem mais mortes violentas? 
 
- A taxa de homicídios e suicídios aumentou ou diminuiu ao longo dos anos? 
 
- Quais regiões apresentam maior incidência de homicídios e suicídios? 
 
- Qual é a relação entre óbitos e condições socioeconômicas (e.g., escolaridade, ocupação, região)? 
 
- Qual é a distribuição de óbitos por causas relacionadas ao trabalho? 

Ao final do projeto, espera-se entregar uma base de dados confiável com análises que contribuam para a compreensão dos fatores que impactam a mortalidade no Brasil.

# 2️⃣ Fonte dos dados e coleta
___

Descrever como e onde os dados foram coletados.

# 3️⃣ Modelagem
___

- Explicar o modelo flat e como se chegou nele.
- Fazer o Catálogo dos dados contendo minimamente uma descrição detalhada dos dados e seus domínios, contendo valores mínimos e máximos esperados para dados numéricos, e possíveis categorias para dados categóricos.
- Explicar qual técnica foi utilizada para compor o conjunto de dados.

# 4️⃣ Carga
___

- Utilizar pipelines de ETL (Extração, Transformação e Carga).
- Deve-se documentar os processos de transformação e carga, principalmente os de transformação, e.g. a junção e conciliação de dois conjuntos de dados diferentes.


# 🅰️ Importar bibliotecas

In [0]:
from pyspark.sql.functions import lit  # Para adicionar colunas ausentes preenchidas com null
from collections import defaultdict  # Para verificar os tipos de colunas existentes no dataframe

# 🅱️ Tratamento dos dados

In [0]:
# Caminho da pasta onde os arquivos CSV estão armazenados
caminho_pasta = "dbfs:/FileStore/MortalidadeGeral"

# Lista todos os arquivos CSV na pasta
arquivos_csv = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()).listStatus(spark._jvm.org.apache.hadoop.fs.Path(caminho_pasta))
arquivos_csv = [arquivo.getPath().getName() for arquivo in arquivos_csv if arquivo.getPath().getName().endswith('.csv')]

# Itera sobre cada arquivo CSV
for arquivo_csv in arquivos_csv:
    caminho_completo = f"{caminho_pasta}/{arquivo_csv}"
    
    # Lê o arquivo CSV usando Spark
    df = spark.read.csv(caminho_completo, sep =';', header=True, inferSchema=True)

    # Obtém o número de linhas
    num_linhas = df.count()
    num_linhas = "{:,.0f}".format(num_linhas).replace(",", ".")
    
    # Obtém o número de colunas
    num_colunas = len(df.columns)
    
    # Obtém o nome das colunas
    nomes_colunas = df.columns
    
    # Exibe as informações
    print(f"Arquivo: {arquivo_csv}")
    print(f"Número de linhas: {num_linhas}")
    print(f"Número de colunas: {num_colunas}")
    print(f"Nomes das colunas: {nomes_colunas}")
    print("-" * 40)


Arquivo: Mortalidade_Geral_1979.csv
Número de linhas: 711.742
Número de colunas: 46
Nomes das colunas: ['CONTADOR', 'REGISTRO', 'DATAREG', 'TIPOBITO', 'DATAOBITO', 'ESTCIVIL', 'SEXO', 'DATANASC', 'IDADE', 'LOCOCOR', 'CODIGO', 'MUNIOCOR', 'MUNIRES', 'AREARES', 'OCUPACAO', 'NATURAL', 'INSTRUCAO', 'OCUPPAI', 'INSTRPAI', 'OCUPMAE', 'IDADEMAE', 'INSTRMAE', 'FILHVIVOS', 'FILHMORT', 'SEMANGEST', 'TIPOGRAV', 'TIPOPARTO', 'PESONASC', 'ASSISTMED', 'ATESTANTE', 'EXAME', 'CIRURGIA', 'NECROPSIA', 'OBITOFE1', 'OBITOFE2', 'CAUSABAS', 'TIPOVIOL', 'TIPOACID', 'FONTINFO', 'ACIDTRAB', 'LOCACID', 'CRITICA', 'NUMEXPORT', 'CRSOCOR', 'CRSRES', 'UFINFORM']
----------------------------------------
Arquivo: Mortalidade_Geral_1980.csv
Número de linhas: 750.727
Número de colunas: 46
Nomes das colunas: ['CONTADOR', 'REGISTRO', 'DATAREG', 'TIPOBITO', 'DATAOBITO', 'ESTCIVIL', 'SEXO', 'DATANASC', 'IDADE', 'LOCOCOR', 'CODIGO', 'MUNIOCOR', 'MUNIRES', 'AREARES', 'OCUPACAO', 'NATURAL', 'INSTRUCAO', 'OCUPPAI', 'INSTRPAI',

In [0]:
# Defina a ordem desejada das colunas
ordem_colunas = [
   'CONTADOR', 'CARTORIO', 'REGISTRO', 'DATAREG', 'TIPOBITO', 'DATAOBITO', 'ESTCIVIL', 'SEXO', 'DATANASC', 'IDADE', 'LOCOCOR', 'CODIGO', 'MUNIOCOR', 'MUNIRES', 'BAIRES', 'AREARES', 'OCUPACAO', 'NATURAL', 'INSTRUCAO', 'OCUPPAI', 'INSTRPAI', 'OCUPMAE', 'IDADEMAE', 'INSTRMAE', 'FILHVIVOS', 'FILHMORT', 'SEMANGEST', 'TIPOGRAV', 'TIPOPARTO', 'PESONASC', 'ASSISTMED', 'ATESTANTE', 'EXAME', 'CIRURGIA', 'NECROPSIA', 'OBITOFE1', 'OBITOFE2', 'CAUSABAS', 'TIPOVIOL', 'TIPOACID', 'FONTINFO', 'ACIDTRAB', 'LOCACID', 'CRITICA', 'NUMEXPORT', 'CRSOCOR', 'CRSRES', 'UFINFORM', 'RACACOR', 'ETNIA', 'DTOBITO', 'DTNASC', 'ESTCIV', 'ESC', 'OCUP', 'CODBAIRES', 'CODMUNRES', 'CODMUNOCOR', 'ESCMAE', 'QTDFILVIVO', 'QTDFILMORT', 'GRAVIDEZ', 'GESTACAO', 'PARTO', 'OBITOPARTO', 'PESO', 'OBITOGRAV', 'OBITOPUERP', 'LINHAA', 'LINHAB', 'LINHAC', 'LINHAD', 'LINHAII', 'CIRCOBITO', 'FONTE', 'CODESTAB', 'HORAOBITO', 'CODBAIOCOR', 'TPASSINA', 'DTATESTADO', 'TPPOS', 'DTINVESTIG', 'CAUSABAS_O', 'DTCADASTRO', 'FONTEINV', 'DTRECEBIM', 'CB_PRE', 'NUMERODN', 'MORTEPARTO', 'TPOBITOCOR', 'ORIGEM', 'DTCADINF', 'DTCADINV', 'COMUNSVOIM', 'DTRECORIG', 'DTRECORIGA', 'CAUSAMAT', 'ESC2010', 'ESCMAE2010', 'STDOEPIDEM', 'STDONOVA', 'CODMUNCART', 'CODCART', 'NUMREGCART', 'DTREGCART', 'SERIESCFAL', 'ESCMAEAGR1', 'ESCFALAGR1', 'SERIESCMAE', 'SEMAGESTAC', 'TPMORTEOCO', 'EXPDIFDATA', 'DIFDATA', 'DTCONINV', 'DTCONCASO', 'NUDIASOBIN', 'CODMUNNATU', 'ESTABDESCR', 'CRM', 'NUMEROLOTE', 'STCODIFICA', 'CODIFICADO', 'VERSAOSIST', 'VERSAOSCB', 'ATESTADO', 'NUDIASOBCO', 'FONTES', 'TPRESGINFO', 'TPNIVELINV', 'NUDIASINF', 'FONTESINF', 'ALTCAUSA'
]

# Caminho da pasta com os arquivos CSV
caminho_pasta = 'dbfs:/FileStore/MortalidadeGeral/'

# Listar todos os arquivos na pasta
arquivos = [file.path for file in dbutils.fs.ls(caminho_pasta) if file.path.endswith(".csv")]

# Dicionário para armazenar os DataFrames
dfs = {}

# Processar cada arquivo
for caminho_csv in arquivos:
    ano = caminho_csv.split('_')[-1].split('.')[0]  # Extrair o ano do nome do arquivo
    nome_variavel = f"df{ano}"
    print(f"Processando arquivo: {caminho_csv}")

    # Ler arquivo CSV
    df = spark.read.csv(caminho_csv, sep =';', header = True)

    # Verificar as colunas existentes no DataFrame
    colunas_existentes = df.columns

    # Criar um novo DataFrame com a ordem desejada
    df_reordenado = df

    # Adicionar colunas ausentes preenchidas com null
    for coluna in ordem_colunas:
        if coluna not in colunas_existentes:
            df_reordenado = df_reordenado.withColumn(coluna, lit(None))

    # Reordenar as colunas
    df_reordenado = df_reordenado.select(ordem_colunas)

        # Armazenar o DataFrame processado na variável correspondente
    dfs[nome_variavel] = df_reordenado
    print(f"DataFrame armazenado: {nome_variavel}")

print("Processamento concluído!")


Processando arquivo: dbfs:/FileStore/MortalidadeGeral/Mortalidade_Geral_1979.csv
DataFrame armazenado: df1979
Processando arquivo: dbfs:/FileStore/MortalidadeGeral/Mortalidade_Geral_1980.csv
DataFrame armazenado: df1980
Processando arquivo: dbfs:/FileStore/MortalidadeGeral/Mortalidade_Geral_1981.csv
DataFrame armazenado: df1981
Processando arquivo: dbfs:/FileStore/MortalidadeGeral/Mortalidade_Geral_1982.csv
DataFrame armazenado: df1982
Processando arquivo: dbfs:/FileStore/MortalidadeGeral/Mortalidade_Geral_1983.csv
DataFrame armazenado: df1983
Processando arquivo: dbfs:/FileStore/MortalidadeGeral/Mortalidade_Geral_1984.csv
DataFrame armazenado: df1984
Processando arquivo: dbfs:/FileStore/MortalidadeGeral/Mortalidade_Geral_1985.csv
DataFrame armazenado: df1985
Processando arquivo: dbfs:/FileStore/MortalidadeGeral/Mortalidade_Geral_1986.csv
DataFrame armazenado: df1986
Processando arquivo: dbfs:/FileStore/MortalidadeGeral/Mortalidade_Geral_1987.csv
DataFrame armazenado: df1987
Processand

In [0]:
# Cria o DataFrame final
df_final = None

for nome_variavel, df in dfs.items():
    if df_final is None:
        df_final = df  # Inicializa o DataFrame final com o primeiro DataFrame
    else:
        df_final = df_final.union(df)  # Adiciona os demais DataFrames ao DataFrame final

# Exibir o DataFrame final
display(df_final.limit(5))

CONTADOR,CARTORIO,REGISTRO,DATAREG,TIPOBITO,DATAOBITO,ESTCIVIL,SEXO,DATANASC,IDADE,LOCOCOR,CODIGO,MUNIOCOR,MUNIRES,BAIRES,AREARES,OCUPACAO,NATURAL,INSTRUCAO,OCUPPAI,INSTRPAI,OCUPMAE,IDADEMAE,INSTRMAE,FILHVIVOS,FILHMORT,SEMANGEST,TIPOGRAV,TIPOPARTO,PESONASC,ASSISTMED,ATESTANTE,EXAME,CIRURGIA,NECROPSIA,OBITOFE1,OBITOFE2,CAUSABAS,TIPOVIOL,TIPOACID,FONTINFO,ACIDTRAB,LOCACID,CRITICA,NUMEXPORT,CRSOCOR,CRSRES,UFINFORM,RACACOR,ETNIA,DTOBITO,DTNASC,ESTCIV,ESC,OCUP,CODBAIRES,CODMUNRES,CODMUNOCOR,ESCMAE,QTDFILVIVO,QTDFILMORT,GRAVIDEZ,GESTACAO,PARTO,OBITOPARTO,PESO,OBITOGRAV,OBITOPUERP,LINHAA,LINHAB,LINHAC,LINHAD,LINHAII,CIRCOBITO,FONTE,CODESTAB,HORAOBITO,CODBAIOCOR,TPASSINA,DTATESTADO,TPPOS,DTINVESTIG,CAUSABAS_O,DTCADASTRO,FONTEINV,DTRECEBIM,CB_PRE,NUMERODN,MORTEPARTO,TPOBITOCOR,ORIGEM,DTCADINF,DTCADINV,COMUNSVOIM,DTRECORIG,DTRECORIGA,CAUSAMAT,ESC2010,ESCMAE2010,STDOEPIDEM,STDONOVA,CODMUNCART,CODCART,NUMREGCART,DTREGCART,SERIESCFAL,ESCMAEAGR1,ESCFALAGR1,SERIESCMAE,SEMAGESTAC,TPMORTEOCO,EXPDIFDATA,DIFDATA,DTCONINV,DTCONCASO,NUDIASOBIN,CODMUNNATU,ESTABDESCR,CRM,NUMEROLOTE,STCODIFICA,CODIFICADO,VERSAOSIST,VERSAOSCB,ATESTADO,NUDIASOBCO,FONTES,TPRESGINFO,TPNIVELINV,NUDIASINF,FONTESINF,ALTCAUSA
1,,,,2,790600,2,1,,463,1,,1100304,1200000,,,612,832,2,,,,,,,,,,,0,0,1,0,0,0,,,8199,3,,,0,1,,,,,11,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2,,,,2,790100,1,2,,467,3,,1200104,1200104,,,7,812,1,,,,,,,,,,,0,2,0,0,0,0,,,7989,0,,,0,0,,,,,12,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
3,,,,2,790100,1,2,,401,1,,1200104,1200104,,,0,812,1,,,,,,,,,,,0,1,1,2,2,2,,,90,0,,,0,0,,,,,12,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
4,,,,2,790100,1,1,,404,1,,1200104,1200104,,,0,812,1,,,,,,,,,,,0,1,1,2,2,2,,,90,0,,,0,0,,,,,12,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
5,,,,2,790100,3,1,,477,3,,1200104,1200104,,,7,813,1,,,,,,,,,,,0,2,0,0,0,0,,,7989,0,,,0,0,,,,,12,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


In [0]:
df_final.coalesce(1).write.format("csv") \
                   .mode("overwrite") \
                   .option("header", "true") \
                   .save("dbfs:/FileStore/tables")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2065836568278290>:1[0m
[0;32m----> 1[0m [43mdf_final[49m[38;5;241;43m.[39;49m[43mcoalesce[49m[43m([49m[38;5;241;43m1[39;49m[43m)[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mformat[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mcsv[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m [49m[43m\[49m
[1;32m      2[0m [43m                   [49m[38;5;241;43m.[39;49m[43mmode[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43moverwrite[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m [49m[43m\[49m
[1;32m      3[0m [43m                   [49m[38;5;241;43m.[39;49m[43moption[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mheader[39;49m[38;5;124;43m"[39;49m[43m,[49m[43m [49m[38;5;124;43m"[39;49m[38;5;124;43mtrue[39;49m[38;5;124;43m"

Tentamos salvar o dataframe df_final em CSV no DBFS, porém obtivemos o seguinte erro: 
AnalysisException: Column `CARTORIO` has a data type of void, which is not supported by CSV.
Portanto, vamos verificar os tipos de cada coluna para deletar os de tipo void.

In [0]:
# Criar um dicionário para agrupar colunas por tipo
colunas_por_tipo = defaultdict(list)

# Preencher o dicionário
for col, dtype in df_final.dtypes:
    colunas_por_tipo[dtype].append(col)

# Exibir o resultado
for tipo, colunas in colunas_por_tipo.items():
    print(f"\n🔹 Tipo: {tipo}")
    print(colunas)


🔹 Tipo: string
['CONTADOR', 'REGISTRO', 'DATAREG', 'TIPOBITO', 'DATAOBITO', 'ESTCIVIL', 'SEXO', 'DATANASC', 'IDADE', 'LOCOCOR', 'CODIGO', 'MUNIOCOR', 'MUNIRES', 'AREARES', 'OCUPACAO', 'NATURAL', 'INSTRUCAO', 'OCUPPAI', 'INSTRPAI', 'OCUPMAE', 'IDADEMAE', 'INSTRMAE', 'FILHVIVOS', 'FILHMORT', 'SEMANGEST', 'TIPOGRAV', 'TIPOPARTO', 'PESONASC', 'ASSISTMED', 'ATESTANTE', 'EXAME', 'CIRURGIA', 'NECROPSIA', 'OBITOFE1', 'OBITOFE2', 'CAUSABAS', 'TIPOVIOL', 'TIPOACID', 'FONTINFO', 'ACIDTRAB', 'LOCACID', 'CRITICA', 'NUMEXPORT', 'CRSOCOR', 'CRSRES', 'UFINFORM', 'RACACOR', 'ETNIA', 'DTOBITO', 'DTNASC', 'ESTCIV', 'ESC', 'OCUP', 'CODMUNRES', 'CODMUNOCOR', 'ESCMAE', 'QTDFILVIVO', 'QTDFILMORT', 'GRAVIDEZ', 'GESTACAO', 'PARTO', 'OBITOPARTO', 'PESO', 'OBITOGRAV', 'OBITOPUERP', 'LINHAA', 'LINHAB', 'LINHAC', 'LINHAD', 'LINHAII', 'CIRCOBITO', 'FONTE', 'CODESTAB', 'HORAOBITO', 'TPASSINA', 'DTATESTADO', 'TPPOS', 'DTINVESTIG', 'CAUSABAS_O', 'DTCADASTRO', 'FONTEINV', 'DTRECEBIM', 'CB_PRE', 'MORTEPARTO', 'TPOBITOC

In [0]:
# Lista de colunas que para remover
colunas_para_remover = ['CARTORIO', 'BAIRES', 'CODBAIRES', 'CODBAIOCOR', 'NUMERODN', 'CODCART', 'CRM']

# Remover as colunas desnecessárias
df_final = df_final.drop(*colunas_para_remover)

# Exibir o DataFrame resultante
display(df_final.limit(5))


CONTADOR,REGISTRO,DATAREG,TIPOBITO,DATAOBITO,ESTCIVIL,SEXO,DATANASC,IDADE,LOCOCOR,CODIGO,MUNIOCOR,MUNIRES,AREARES,OCUPACAO,NATURAL,INSTRUCAO,OCUPPAI,INSTRPAI,OCUPMAE,IDADEMAE,INSTRMAE,FILHVIVOS,FILHMORT,SEMANGEST,TIPOGRAV,TIPOPARTO,PESONASC,ASSISTMED,ATESTANTE,EXAME,CIRURGIA,NECROPSIA,OBITOFE1,OBITOFE2,CAUSABAS,TIPOVIOL,TIPOACID,FONTINFO,ACIDTRAB,LOCACID,CRITICA,NUMEXPORT,CRSOCOR,CRSRES,UFINFORM,RACACOR,ETNIA,DTOBITO,DTNASC,ESTCIV,ESC,OCUP,CODMUNRES,CODMUNOCOR,ESCMAE,QTDFILVIVO,QTDFILMORT,GRAVIDEZ,GESTACAO,PARTO,OBITOPARTO,PESO,OBITOGRAV,OBITOPUERP,LINHAA,LINHAB,LINHAC,LINHAD,LINHAII,CIRCOBITO,FONTE,CODESTAB,HORAOBITO,TPASSINA,DTATESTADO,TPPOS,DTINVESTIG,CAUSABAS_O,DTCADASTRO,FONTEINV,DTRECEBIM,CB_PRE,MORTEPARTO,TPOBITOCOR,ORIGEM,DTCADINF,DTCADINV,COMUNSVOIM,DTRECORIG,DTRECORIGA,CAUSAMAT,ESC2010,ESCMAE2010,STDOEPIDEM,STDONOVA,CODMUNCART,NUMREGCART,DTREGCART,SERIESCFAL,ESCMAEAGR1,ESCFALAGR1,SERIESCMAE,SEMAGESTAC,TPMORTEOCO,EXPDIFDATA,DIFDATA,DTCONINV,DTCONCASO,NUDIASOBIN,CODMUNNATU,ESTABDESCR,NUMEROLOTE,STCODIFICA,CODIFICADO,VERSAOSIST,VERSAOSCB,ATESTADO,NUDIASOBCO,FONTES,TPRESGINFO,TPNIVELINV,NUDIASINF,FONTESINF,ALTCAUSA
1,,,2,790600,2,1,,463,1,,1100304,1200000,,612,832,2,,,,,,,,,,,0,0,1,0,0,0,,,8199,3,,,0,1,,,,,11,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2,,,2,790100,1,2,,467,3,,1200104,1200104,,7,812,1,,,,,,,,,,,0,2,0,0,0,0,,,7989,0,,,0,0,,,,,12,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
3,,,2,790100,1,2,,401,1,,1200104,1200104,,0,812,1,,,,,,,,,,,0,1,1,2,2,2,,,90,0,,,0,0,,,,,12,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
4,,,2,790100,1,1,,404,1,,1200104,1200104,,0,812,1,,,,,,,,,,,0,1,1,2,2,2,,,90,0,,,0,0,,,,,12,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
5,,,2,790100,3,1,,477,3,,1200104,1200104,,7,813,1,,,,,,,,,,,0,2,0,0,0,0,,,7989,0,,,0,0,,,,,12,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


In [0]:
# Salvar a tabela Delta em um local persistente
df_final.write.format("delta") \
              .mode("overwrite") \
              .option("path", "dbfs:/mnt/delta/mortalidade_geral") \
              .saveAsTable("mortalidade_geral")

In [0]:
# Carregar tabela delta salva no DBFS
df_final = spark.read.format("delta").load("dbfs:/mnt/delta/mortalidade_geral")
display(df_final.limit(5))


CONTADOR,REGISTRO,DATAREG,TIPOBITO,DATAOBITO,ESTCIVIL,SEXO,DATANASC,IDADE,LOCOCOR,CODIGO,MUNIOCOR,MUNIRES,AREARES,OCUPACAO,NATURAL,INSTRUCAO,OCUPPAI,INSTRPAI,OCUPMAE,IDADEMAE,INSTRMAE,FILHVIVOS,FILHMORT,SEMANGEST,TIPOGRAV,TIPOPARTO,PESONASC,ASSISTMED,ATESTANTE,EXAME,CIRURGIA,NECROPSIA,OBITOFE1,OBITOFE2,CAUSABAS,TIPOVIOL,TIPOACID,FONTINFO,ACIDTRAB,LOCACID,CRITICA,NUMEXPORT,CRSOCOR,CRSRES,UFINFORM,RACACOR,ETNIA,DTOBITO,DTNASC,ESTCIV,ESC,OCUP,CODMUNRES,CODMUNOCOR,ESCMAE,QTDFILVIVO,QTDFILMORT,GRAVIDEZ,GESTACAO,PARTO,OBITOPARTO,PESO,OBITOGRAV,OBITOPUERP,LINHAA,LINHAB,LINHAC,LINHAD,LINHAII,CIRCOBITO,FONTE,CODESTAB,HORAOBITO,TPASSINA,DTATESTADO,TPPOS,DTINVESTIG,CAUSABAS_O,DTCADASTRO,FONTEINV,DTRECEBIM,CB_PRE,MORTEPARTO,TPOBITOCOR,ORIGEM,DTCADINF,DTCADINV,COMUNSVOIM,DTRECORIG,DTRECORIGA,CAUSAMAT,ESC2010,ESCMAE2010,STDOEPIDEM,STDONOVA,CODMUNCART,NUMREGCART,DTREGCART,SERIESCFAL,ESCMAEAGR1,ESCFALAGR1,SERIESCMAE,SEMAGESTAC,TPMORTEOCO,EXPDIFDATA,DIFDATA,DTCONINV,DTCONCASO,NUDIASOBIN,CODMUNNATU,ESTABDESCR,NUMEROLOTE,STCODIFICA,CODIFICADO,VERSAOSIST,VERSAOSCB,ATESTADO,NUDIASOBCO,FONTES,TPRESGINFO,TPNIVELINV,NUDIASINF,FONTESINF,ALTCAUSA
1,,,2,790600,2,1,,463,1,,1100304,1200000,,612,832,2,,,,,,,,,,,0,0,1,0,0,0,,,8199,3,,,0,1,,,,,11,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2,,,2,790100,1,2,,467,3,,1200104,1200104,,7,812,1,,,,,,,,,,,0,2,0,0,0,0,,,7989,0,,,0,0,,,,,12,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
3,,,2,790100,1,2,,401,1,,1200104,1200104,,0,812,1,,,,,,,,,,,0,1,1,2,2,2,,,90,0,,,0,0,,,,,12,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
4,,,2,790100,1,1,,404,1,,1200104,1200104,,0,812,1,,,,,,,,,,,0,1,1,2,2,2,,,90,0,,,0,0,,,,,12,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
5,,,2,790100,3,1,,477,3,,1200104,1200104,,7,813,1,,,,,,,,,,,0,2,0,0,0,0,,,7989,0,,,0,0,,,,,12,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


In [0]:
%sql
CREATE TABLE mortalidade_geral
USING delta
LOCATION 'dbfs:/mnt/delta/mortalidade_geral';

In [0]:
# Consulta SQL para contar DATAOBITO com menos de 8 caracteres
query = """
    SELECT *
    FROM mortalidade_geral
    WHERE LENGTH(DATAOBITO) < 4
"""

# Executar a consulta
resultado = spark.sql(query)

# Exibir o resultado
resultado.display()

CONTADOR,REGISTRO,DATAREG,TIPOBITO,DATAOBITO,ESTCIVIL,SEXO,DATANASC,IDADE,LOCOCOR,CODIGO,MUNIOCOR,MUNIRES,AREARES,OCUPACAO,NATURAL,INSTRUCAO,OCUPPAI,INSTRPAI,OCUPMAE,IDADEMAE,INSTRMAE,FILHVIVOS,FILHMORT,SEMANGEST,TIPOGRAV,TIPOPARTO,PESONASC,ASSISTMED,ATESTANTE,EXAME,CIRURGIA,NECROPSIA,OBITOFE1,OBITOFE2,CAUSABAS,TIPOVIOL,TIPOACID,FONTINFO,ACIDTRAB,LOCACID,CRITICA,NUMEXPORT,CRSOCOR,CRSRES,UFINFORM,RACACOR,ETNIA,DTOBITO,DTNASC,ESTCIV,ESC,OCUP,CODMUNRES,CODMUNOCOR,ESCMAE,QTDFILVIVO,QTDFILMORT,GRAVIDEZ,GESTACAO,PARTO,OBITOPARTO,PESO,OBITOGRAV,OBITOPUERP,LINHAA,LINHAB,LINHAC,LINHAD,LINHAII,CIRCOBITO,FONTE,CODESTAB,HORAOBITO,TPASSINA,DTATESTADO,TPPOS,DTINVESTIG,CAUSABAS_O,DTCADASTRO,FONTEINV,DTRECEBIM,CB_PRE,MORTEPARTO,TPOBITOCOR,ORIGEM,DTCADINF,DTCADINV,COMUNSVOIM,DTRECORIG,DTRECORIGA,CAUSAMAT,ESC2010,ESCMAE2010,STDOEPIDEM,STDONOVA,CODMUNCART,NUMREGCART,DTREGCART,SERIESCFAL,ESCMAEAGR1,ESCFALAGR1,SERIESCMAE,SEMAGESTAC,TPMORTEOCO,EXPDIFDATA,DIFDATA,DTCONINV,DTCONCASO,NUDIASOBIN,CODMUNNATU,ESTABDESCR,NUMEROLOTE,STCODIFICA,CODIFICADO,VERSAOSIST,VERSAOSCB,ATESTADO,NUDIASOBCO,FONTES,TPRESGINFO,TPNIVELINV,NUDIASINF,FONTESINF,ALTCAUSA
1576,,,2,95,,9.0,,305,,,2400000,2400000,,,,,,,,,,,,,,,,9.0,,,,,,,261X,,,,,,,,,,24,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1600,,,2,95,1.0,9.0,19950320.0,302,1.0,,2412203,2412203,,,,1.0,,,,,,,,,,,,1.0,5.0,1.0,,,,,0093,,,,,,,,,,24,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1828,,,2,95,0.0,1.0,19500223.0,444,1.0,,2408102,2408102,2.0,,,0.0,,,,,,,,,,,,9.0,,,,,,,2791,,,,,,,,,,24,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1830,,,2,95,0.0,1.0,19410601.0,454,1.0,,2408102,2408102,2.0,,,0.0,,,,,,,,,,,,9.0,,,,,,,2791,,,,,,,,,,24,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1881,1063.0,950310.0,2,95,4.0,2.0,,493,3.0,,2407401,2407401,,7.0,824.0,0.0,,,,,,,,,,,,9.0,,,,,,,797X,,,,,,,,,,24,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2329,698.0,950325.0,2,95,2.0,2.0,19050406.0,490,3.0,,2410207,2410207,,7.0,824.0,0.0,,,,,,,,,,,,9.0,,,,,,,4299,,,,,,,,,,24,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
3245,,,2,95,2.0,2.0,19160505.0,479,1.0,,2408102,2412005,,8.0,,0.0,,,,,,,,,,,,9.0,,,,,,,438X,,,,,,,,,,24,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
3672,,,2,95,,9.0,1995.0,104,1.0,,2408102,2407500,,,824.0,,612.0,1.0,8.0,23.0,1.0,,,7.0,1.0,0.0,,1.0,4.0,0.0,0.0,1.0,,,7700,,,,,,,,,,24,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
4105,,,2,95,0.0,2.0,,0,1.0,,2408102,2408102,,,,0.0,,0.0,,,0.0,,,0.0,0.0,0.0,,1.0,1.0,0.0,0.0,0.0,,,2102,,,,,,,,,,24,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
4839,6055.0,951003.0,2,95,2.0,2.0,,0,0.0,,2400000,2400000,,0.0,0.0,0.0,,,,,,,,,,,,9.0,0.0,0.0,0.0,0.0,,,7999,,,,,,,,,,24,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


In [0]:
%sql
DESCRIBE FORMATTED mortalidade_geral;

col_name,data_type,comment
CONTADOR,string,
REGISTRO,string,
DATAREG,string,
TIPOBITO,string,
DATAOBITO,string,
ESTCIVIL,string,
SEXO,string,
DATANASC,string,
IDADE,string,
LOCOCOR,string,


In [0]:
# Caminho da tabela Delta no DBFS
#caminho_tabela = "dbfs:/mnt/delta/mortalidade_geral"

# Deletar os arquivos da tabela Delta
#dbutils.fs.rm(caminho_tabela, recurse=True)

Out[24]: True

In [0]:
# Ler a tabela Delta do metastore
df = spark.table("mortalidade_geral")

# Verificar o número de linhas
num_linhas = df.count()

# Verificar o número de colunas
num_colunas = len(df.columns)

# Exibir os resultados
print(f"Número de linhas: {format(num_linhas, '_.0f').replace('_', '.')}")
print(f"Número de colunas: {num_colunas}")

Número de linhas: 47.371.252
Número de colunas: 125


In [0]:
# Lista de colunas que para remover
colunas_para_remover = ['DATAREG', 'TIPOBITO']

# Remover as colunas desnecessárias
mortalidade_geral = mortalidade_geral.drop(*colunas_para_remover)

# Exibir o DataFrame resultante
display(mortalidade_geral.limit(5))

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-1093050904738668>:5[0m
[1;32m      2[0m colunas_para_remover [38;5;241m=[39m [[38;5;124m'[39m[38;5;124mDATAREG[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124mTIPOBITO[39m[38;5;124m'[39m]
[1;32m      4[0m [38;5;66;03m# Remover as colunas desnecessárias[39;00m
[0;32m----> 5[0m mortalidade_geral [38;5;241m=[39m mortalidade_geral[38;5;241m.[39mdrop([38;5;241m*[39mcolunas_para_remover)
[1;32m      7[0m [38;5;66;03m# Exibir o DataFrame resultante[39;00m
[1;32m      8[0m display(mortalidade_geral[38;5;241m.[39mlimit([38;5;241m5[39m))

[0;31mNameError[0m: name 'mortalidade_geral' is not defined

In [0]:
Gravar cada CSV em um DataFrame
Preencher dados faltantes com NULL
Organizar as colunas de cada Dataframe na ordem desejada
Juntar todos os Dataframes em um único Dataframe
Salvar o Dataframe como uma tabela Delta 

Deletar colunas desnecessárias
Remover duplicatas
Formatar datas corretamente
Padronizar variáveis categóricas (ex.: SEXO, MUNIRES)
Criar novas colunas úteis (ex.: FAIXA_ETARIA)
Remover registros incompletos
Corrigir valores inconsistentes
Converter datas e numéricos
Identificar e remover ou ajustar Outliers
