## Instalação de pacotes






In [None]:
!pip install pymongo
!pip install pyspark
!pip install pandera

## Importação de Bibliotecas

In [None]:
import pandas as pd
import numpy as np
import pymongo
import pyspark.sql.functions as f
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandera as pa
from datetime import datetime

## Extração do dataset

In [None]:
df1 = pd.read_csv('https://storage.googleapis.com/projeto-leonardo-vitor/D.SDA.PDA.005.CAT.202201%20-%20D.SDA.PDA.005.CAT.202201(origi).csv',sep=';')

## Configuração do pandas para visualizar todas as linhas e colunas

In [None]:
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

## Conexão com o mongoDB

In [None]:
uri = "mongodb+srv://cluster0.znsuw4p.mongodb.net/?authSource=%24external&authMechanism=MONGODB-X509&retryWrites=true&w=majority"
client = MongoClient(uri,tls=True,tlsCertificateKeyFile='/content/X509-cert-8832947067617257787.pem')

## Configuração do Spark

In [None]:
spark = (
    SparkSession.builder
                .master('local')
                .appName('etl-soulcode')
                .config('spark.ui.port', '4050')
                .config('spark.jars', 'https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-2.1.1.jar')
                .getOrCreate()
)

## Visualizando a SparkSession

In [None]:
spark

## Criação do banco de dados e das coleções no mongoDB

In [None]:
db = client['db-soulcode']
collection1 = db['dataframe_orig1']


## Enviando os datasets pro mongoDB

In [None]:
# Subindo primeiro dataset pro mongoDB
df1.reset_index(inplace=True)
df1_dict = df1.to_dict("records")
collection1.insert_many(df1_dict)

In [None]:
# Contagem dos documentos
collection1.count_documents({})

304280

In [None]:
# Transformando os documentos em dataframes pandas
cursor1 = collection1.find({})
df1 = pd.DataFrame(list(cursor1))

# Aqui iremos começar alguns tratamentos utilizando apenas o Pandas. Para operações mais especificas, utilizaremos Spark mais abaixo no código.

In [None]:
# Visualização inicial do dataframe
df1.head(5)

In [None]:
# Verificando os tipos de dados das colunas
df1.dtypes

In [None]:
# Dropando a coluna contendo a identificação do mongoDB sobre cada objeto
df1.drop('_id', axis=1, inplace=True)

In [None]:
## Verificando se a dropagem foi executada

df1.head(5)

In [None]:
## Analisando o nome das colunas

df1.columns

In [None]:
# Backup
dfback = df1.copy()

In [None]:
df1 = dfback

In [None]:
# Criando um dicionário para renomear as colunas

dict_newcolumns = {'index':'id', 'Agente  Causador  Acidente': 'causa_acidente', 'Data Acidente':'data_acidente', 'CBO':'oficio', 'CID-10':'cid_10',
                  'CNAE2.0 Empregador':'num_cnae_empregador', 'CNAE2.0 Empregador.1':'desc_cnae_empregador', 'Emitente CAT':'emitente_cat', 
                  'Espécie do benefício':'tipo_beneficio','Filiação Segurado':'filiacao_segurado', 'Indica Óbito Acidente':'obito', 'Munic Empr':'municipio_empresa', 
                  'Natureza da Lesão':'natureza_lesao', 'Origem de Cadastramento CAT':'origem_cad_cat', 'Parte Corpo Atingida':'parte_corpo_atingida', 'Sexo':'sexo', 
                  'Tipo do Acidente':'tipo_do_acidente','UF  Munic.  Acidente':'uf_acidente', 'UF Munic. Empregador':'uf_empregador', 'Data Acidente.1': 'data_acidente_1', 
                  'Data Despacho Benefício':'data_despacho_beneficio', 'Data Acidente.2':'data_acidente_2', 'Data Nascimento':'nascimento', 'Data Emissão CAT':'emissao_cat', 
                  'CNPJ/CEI Empregador,,,,,,,,,,,,':'cnpj_empregador'}

In [None]:
# Renomeando as colunas

df1.rename(columns=dict_newcolumns,inplace=True)

In [None]:
# Verificando as alterações 
df1.head()

In [None]:
# Drop de colunas irrelevantes para analises
df1.drop(columns=['desc_cnae_empregador', 'origem_cad_cat'], axis=1, inplace=True)

In [None]:
# Renomeando valores para melhor compreensão 

df1 = df1.replace('{ñ class} ','Não informado', regex=True)

In [None]:
# Removendo virgulas do dataframe

df1 = df1.replace(',', '', regex=True)

In [None]:
## Verificando as alterações 

df1.head()

In [None]:
# Contagem dos valores únicos da coluna oficio

df1['oficio'].value_counts()

In [None]:
# Remoção de numeros e caracteres específicos da coluna Oficio

df1['oficio'] = df1['oficio'].str.replace(r'\d+','')
df1['oficio'] = df1['oficio'].str.replace('-', '')

In [None]:
# Verificação do conteúdo da coluna CNPJ EMPREGADOR

df1['cnpj_empregador'].value_counts()

In [None]:
# Substituindo CNPJ nulos para NAN

df1['cnpj_empregador'].replace('00.000.000.000.000', np.nan, inplace=True)

In [None]:
# Verificando as alterações

df1['cnpj_empregador'].value_counts()

In [None]:
# Verificação do conteúdo da coluna cid_10

df1['cid_10'].value_counts()

In [None]:
# Vamos separar a coluna cid_10 em duas colunas.
# Para isso, vamos renomear a coluna cid_10 para desc_cid-10 contendo apenas a descrição da doença/problema de saúde.
# Então, vamos criar uma nova coluna chamada cod_cid-10 contendo apenas o código da doença/problema de saúde.

df1.rename({'cid_10':'desc_cid-10'}, axis=1, inplace=True)

In [None]:
# Criação da nova coluna

df1['cod_cid-10'] = df1['desc_cid-10'].str[:5]

In [None]:
## Verificando a criação da nova coluna

df1.head(1)

In [None]:
# Removendo o código da coluna que contém apenas a descrição

df1['desc_cid-10'] = df1['desc_cid-10'].str[5:]

In [None]:
# Verificação do conteúdo da coluna cod_cid-10

df1['cod_cid-10'].value_counts()

In [None]:
# Verificação do conteúdo da coluna desc_cid-10

df1['desc_cid-10'].value_counts()

In [None]:
# Verificação do conteúdo da coluna num_cnae_empregador

df1['num_cnae_empregador'].value_counts()

In [None]:
# Transformando a coluna num_cnae_empregador para tipo String

df1.num_cnae_empregador = df1.num_cnae_empregador.astype(str)

In [None]:
# Verificando as alterações

df1.dtypes

In [None]:
# Verificação do conteúdo da coluna tipo_beneficio

df1['tipo_beneficio'].value_counts()

In [None]:
# Renomeando o valor Pa para Processo Administrativo na coluna tipo_beneficio

df1['tipo_beneficio'] = df1['tipo_beneficio'].str.replace('Pa', 'Processo Administrativo')

In [None]:
# Verificação do conteúdo da coluna filiacao_segurado

df1['filiacao_segurado'].value_counts()

In [None]:
# Verificação do conteúdo da coluna obito

df1['obito'].value_counts()

In [None]:
# Criação de uma nova coluna contendo apenas o código do municipio

df1['cod_municipio'] = df1['municipio_empresa'].str[:6]

In [None]:
# Remoção dos códigos na coluna municipio_empresa

df1['municipio_empresa'] = df1['municipio_empresa'].str[7:]

In [None]:
# Verificação do dataframe

df1.head(2)

In [None]:
# Trocando todos os valores Não informado para valor nulo
df1.replace("Não informado", np.nan, inplace=True)
df1.replace("Não Informado", np.nan, inplace=True)
df1.replace("Não i", np.nan, inplace=True)

In [None]:
# Conversão dos tipos das colunas
df1['data_acidente_1'] = pd.to_datetime(df1['data_acidente_1'])

In [None]:
# Formatando as datas da coluna data_acidente_1 
df1['data_acidente_1'] = df1['data_acidente_1'].dt.strftime('%m/%Y')

In [None]:
# Conversão do tipo da coluna
df1["id"] = df1["id"].apply(np.int64)

In [None]:
# Criação de um novo index para o dataframe
df1.set_index('id',inplace=True)

# Aqui utilizaremos Pandera para realizar a validação dos dados.

In [None]:
# Schema para validação dos dados

schema = pa.DataFrameSchema (
    columns = {
          'id':pa.Column(pa.Int, nullable=True),
          'causa_acidente':pa.Column(pa.String, nullable=True),
          'data_acidente':pa.Column(pa.String, nullable=True),
          'desc_cid-10':pa.Column(pa.String, nullable=True),
          'num_cnae_empregador':pa.Column(pa.String, nullable=True),
          'emitente_cat':pa.Column(pa.String, nullable=True),
          'tipo_beneficio':pa.Column(pa.String, nullable=True),
          'filiacao_segurado':pa.Column(pa.String, nullable=True),
          'obito':pa.Column(pa.String, nullable=True),
          'municipio_da_empresa':pa.Column(pa.String, nullable=True),
          'natureza_lesao':pa.Column(pa.String, nullable=True),
          'parte_corpo_atingida':pa.Column(pa.String, nullable=True),
          'sexo':pa.Column(pa.String, nullable=True),
          'tipo_do_acidente':pa.Column(pa.String, nullable=True),
          'uf_acidente':pa.Column(pa.String, nullable=True),
          'uf_empregador':pa.Column(pa.String, nullable=True),
          'data_acidente_1':pa.Column(pa.String, nullable=True),
          'data_despacho_beneficio':pa.Column(pa.String, nullable=True),
          'data_acidente_2':pa.Column(pa.String, nullable=True),
          'nascimento':pa.Column(pa.String, nullable=True),
          'emissao_cat':pa.Column(pa.String, nullable=True),
          'cnpj_empregador':pa.Column(pa.String, nullable=True),
          'cod_cid-10':pa.Column(pa.String, nullable=True),
          'cod_municipio':pa.Column(pa.String, nullable=True)
    }
)

In [None]:
# Executando a validação 
schema.validate(df1)

# Aqui começaremos a utilizar PySpark. Para isso, primeiro é necessário criar um schema, contendo o nome e o tipo da coluna.

In [None]:
# Criação do Schema para criar uma dataframe com pyspark

mySchema = StructType([StructField("id", IntegerType(), True)\
                       ,StructField("causa_acidente", StringType(), True)\
                       ,StructField("data_acidente", StringType(), True)\
                       ,StructField("oficio", StringType(), True)\
                       ,StructField("desc_cid-10", StringType(), True)\
                       ,StructField("num_cnae_empregador", StringType(), True)\
                       ,StructField("emitent_cat", StringType(), True)\
                       ,StructField("tipo_beneficio", StringType(), True)\
                       ,StructField("filiacao_segurado", StringType(), True)\
                       ,StructField("obito", StringType(), True)\
                       ,StructField("municipio_da_empresa", StringType(), True)\
                       ,StructField("natureza_lesao", StringType(), True)\
                       ,StructField("parte_corpo_atingida", StringType(), True)\
                       ,StructField("sexo", StringType(), True)\
                       ,StructField("tipo_do_acidente", StringType(), True)\
                       ,StructField("uf_acidente", StringType(), True)\
                       ,StructField("uf_empregador", StringType(), True)\
                       ,StructField("data_acidente_1", StringType(), True)\
                       ,StructField("data_despacho_beneficio", StringType(), True)\
                       ,StructField("data_acidente_2", StringType(), True)\
                       ,StructField("nascimento", StringType(), True)\
                       ,StructField("emissao_cat", StringType(), True)\
                       ,StructField("cnpj_empregador", StringType(), True)\
                       ,StructField("cod_cid-10", StringType(), True)\
                       ,StructField("cod_municipio", StringType(), True)])

In [None]:
# Criação do dataframe com PySpark
sparkDF = spark.createDataFrame(df1, schema=mySchema)
sparkDF.printSchema()

In [None]:
# Realizando transformação na coluna num_cnae_empregador

sparkDF = sparkDF.withColumn('num_cnae_empregador', f.regexp_replace('num_cnae_empregador',"^0(0)?$", "Não informado"))

In [None]:
# Visualização da coluna num_cnae_empregador

sparkDF.select('num_cnae_empregador').show(800)

In [None]:
# Visualização dos valores únicos da coluna natureza_lesao
 
sparkDF.select('natureza_lesao').distinct().show(50, truncate=False)

In [None]:
# Visualização dos valores únicos da coluna parte_corpo_atingida

sparkDF.select('parte_corpo_atingida').distinct().show(50, truncate=False)

In [None]:
# Visualização dos valores únicos da coluna sexo 

sparkDF.select('sexo').distinct().show(truncate=False)

In [None]:
# Visualização dos valores únicos da coluna tipo_do_acidente 

sparkDF.select('tipo_do_acidente').distinct().show(truncate=False)

In [None]:
# Tratamento da coluna tipo_do_acidente

sparkDF = sparkDF.withColumn('tipo_do_acidente', f.regexp_replace('tipo_do_acidente', 'Ignorado', 'Não informado'))

In [None]:
# Verificando se a operação foi bem sucedida

sparkDF.select('tipo_do_acidente').distinct().show(truncate=False)

In [None]:
# Visualização dos valores únicos da coluna uf_acidente

sparkDF.select('uf_acidente').distinct().show(50, truncate=False)

In [None]:
# Tratamento da coluna uf_acidente

sparkDF = sparkDF.withColumn('uf_acidente', f.regexp_replace('uf_acidente', 'Zerado', 'Não informado'))

In [None]:
# Visualização dos valores únicos da coluna uf_empregador

sparkDF.select('uf_empregador').distinct().show(20, truncate=False)

In [None]:
# Visualização dos valores únicos da coluna data_acidente_1 

sparkDF.select('data_acidente_1').distinct().show(200, truncate=False)

In [None]:
# Visualização dos valores únicos da coluna data_despacho_beneficio

sparkDF.select('data_despacho_beneficio').distinct().show(100, truncate=False)

In [None]:
# Tratamento da coluna data_despacho_beneficio

sparkDF = sparkDF.withColumn('data_despacho_beneficio', f.regexp_replace('data_despacho_beneficio', '0000/00', 'Não informado'))

In [None]:
# Visualização dos valores únicos da coluna data_acidente_2 

sparkDF.select('data_acidente_2').distinct().show(100, truncate=False)

In [None]:
# Checando o tipo e valores das colunas nascimento e data_nascimento

sparkDF.withColumnRenamed('nascimento', 'data_nascimento').printSchema()

In [None]:
# Visualização dos valores únicos da coluna emissao_cat

sparkDF.select('emissao_cat').distinct().show(100, truncate=False)

In [None]:
# Tratamento da coluna emissao_cat

sparkDF.withColumn('emissao_cat', f.regexp_replace('emissao_cat', '00/00/0000', 'Não informado'))

In [None]:
# Visualizando se o tratamento foi bem sucedido

sparkDF.select('emissao_cat').distinct().show(100, truncate=False)

In [None]:
# Tratamento da coluna municipio_empresa

sparkDF = sparkDF.withColumn('municipio_empresa', f.regexp_replace('municipio_da_empresa', '-.*$', ''))

In [None]:
  # Reorganizando as colunas
first_cols = ["id", "causa_acidente", "data_acidente", "oficio", "cod_cid-10", "desc_cid-10", "cod_municipio", "municipio_da_empresa"]
other_cols = sorted([col for col in sparkDF.columns if col not in first_cols])

In [None]:
# Reorganizando as colunas
sparkDF = sparkDF.select(first_cols + other_cols)

## Dataframe tratado

In [None]:
# Convertendo o dataframe para um arquivo excel

df1.to_excel("df1-tratado.xlsx", encoding='utf-8', index=False)

## Visualização do DataFrame tratado

In [None]:
df1.head()

In [None]:
# Convertendo o dataframes para Pandas novamente

df1 = sparkDF.toPandas()