# Criação de Views Temporárias a partir de Arquivos CSV por Ano e Semestre

In [0]:
# Importar bibliotecas necessárias
from pyspark.sql import SparkSession

# Função simples para carregar um arquivo CSV e criar uma view temporária
def carregar_csv(nome_tabela, caminho_csv):
    # Ler o arquivo CSV
    df = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("sep", ';') \
        .load(caminho_csv)
    
    # Criar uma view temporária
    df.createOrReplaceTempView(nome_tabela)
    
    # Exibir as primeiras 10 linhas da tabela temporária para conferir
    print(f"\nPrimeiras 10 linhas da tabela {nome_tabela}:")
    spark.sql(f"SELECT * FROM {nome_tabela} LIMIT 10").show()

# Lista de anos e semestres - Arquivos são nomeados como: auto_cau_5A
anos_semestres = [
    ("2015", "A"), ("2015", "B"),
    ("2016", "A"), ("2016", "B"),
    ("2017", "A"), ("2017", "B"),
    ("2018", "A"), ("2018", "B"),
    ("2019", "A"), ("2019", "B")
]

# Lista dos arquivos que preciso carregar
nomes_base = [
    "auto_cau", "auto_cat", "auto_cep", "auto_cidade", "auto_cob",
    "auto_idade", "auto_reg", "auto_sexo", "auto2_grupo", "auto2_vei",
    "PremReg", "arq_casco_comp", "arq_casco3_comp", "arq_casco4_comp", "SinReg"
]

# Loop simples para carregar os arquivos e criar views temporárias
for ano, semestre in anos_semestres:
    for nome_base in nomes_base:
        # Criar o nome do arquivo no formato auto_cau_5A, auto_cat_5A, etc.
        nome_arquivo = f"{nome_base}_{ano[-1]}{semestre}"
        caminho_csv = f"dbfs:/FileStore/tables/{nome_arquivo}.csv"
        
        # Chamar a função para carregar o CSV e criar a view
        carregar_csv(nome_arquivo, caminho_csv)


# Concatenar os semestres por ano

In [0]:
# Importar função necessária para adicionar colunas
from pyspark.sql.functions import lit

# Função para concatenar arquivos de um tipo específico por ano e semestre
def concatenar_arquivos_por_tipo(nome_base):
    """
    Concatena arquivos CSV de vários anos e semestres em uma única tabela temporária.
    
    :param nome_base: O nome base dos arquivos CSV a serem lidos.
    """
    # Lista para armazenar DataFrames dos arquivos
    dfs = []
    
    # Iterar sobre os anos e semestres
    for ano in ["2015", "2016", "2017", "2018", "2019"]:
        for semestre in ["A", "B"]:
            # Gerar o nome do arquivo com base no ano e semestre
            nome_arquivo = f"{nome_base}_{ano[-1]}{semestre}"
            caminho_csv = f"dbfs:/FileStore/tables/{nome_arquivo}.csv"
            
            # Carregar o arquivo CSV no Spark DataFrame
            df = spark.read.format("csv") \
                .option("header", "true") \
                .option("inferSchema", "true") \
                .option("sep", ';') \
                .load(caminho_csv)
            
            # Adicionar uma coluna para o ano que o dado foi registrado
            df = df.withColumn("ano_registro", lit(ano))
            
            # Adicionar o DataFrame à lista
            dfs.append(df)
    
    # Verificar se os esquemas dos DataFrames são compatíveis
    all_columns = set(df.columns for df in dfs)
    
    # Ajustar os DataFrames para garantir o mesmo esquema
    for i in range(len(dfs)):
        df = dfs[i]
        missing_columns = all_columns - set(df.columns)
        for column in missing_columns:
            df = df.withColumn(column, lit(None))
        dfs[i] = df.select(sorted(all_columns))  # Ordenar colunas para padronizar
    
    # Concatenar todos os DataFrames em um único
    df_concatenado = dfs[0]
    for df in dfs[1:]:
        df_concatenado = df_concatenado.union(df)
    
    # Criar uma view temporária com o DataFrame concatenado
    nome_tabela_final = f"dados_{nome_base}"
    df_concatenado.createOrReplaceTempView(nome_tabela_final)
    
    # Mostrar as primeiras 10 linhas da tabela final
    print(f"\nTabela: {nome_tabela_final}")
    spark.sql(f"SELECT * FROM {nome_tabela_final} LIMIT 10").display()

# Lista de tipos de arquivos para processamento
tipos_arquivos = [
    "SinReg", "auto_cau", "auto_cat", "auto_cep", "auto_cidade", "auto_cob",
    "auto_idade", "auto_reg", "auto_sexo", "auto2_grupo", "auto2_vei",
    "PremReg", "arq_casco_comp", "arq_casco3_comp", "arq_casco4_comp"
]

# Processar e concatenar todos os arquivos por tipo
for tipo in tipos_arquivos:
    concatenar_arquivos_por_tipo(tipo)


In [0]:
# Lista de tipos de arquivos para visualizar
tipos_arquivos = [
    "SinReg", "auto_cau", "auto_cat", "auto_cep", "auto_cidade", "auto_cob",
    "auto_idade", "auto_reg", "auto_sexo", "auto2_grupo", "auto2_vei",
    "PremReg", "arq_casco_comp", "arq_casco3_comp", "arq_casco4_comp"
]

# Exibe as primeiras 50 linhas de cada tabela temporária criada
for tipo in tipos_arquivos:
    nome_tabela_final = f"dados_{tipo}"  # Nome da tabela temporária
    print(f"\nExibindo a tabela: {nome_tabela_final}")  # Mostra qual tabela está sendo exibida
    consulta = f"SELECT * FROM {nome_tabela_final} LIMIT 50"  
    resultado = spark.sql(consulta)  
    resultado.show()  


# Concatenar e Salvar os DataFrames
- Salvar os DataFrames me ajuda a economizar tempo na execução do código, especialmente quando o cluster expira.

In [0]:
# Importando a função necessária
from pyspark.sql.functions import lit

# Função para juntar e salvar arquivos no formato Parquet
def concatenar_e_salvar_arquivos_por_tipo(nome_base):
    # Lista para guardar os DataFrames
    dfs = []
    
    # Fazer a leitura dos arquivos de cada ano e semestre
    for ano in ["2015", "2016", "2017", "2018", "2019"]:
        for semestre in ["A", "B"]:
            # Cria o nome do arquivo com base no ano e semestre
            nome_arquivo = f"{nome_base}_{ano[-1]}{semestre}"
            
            # Lê o arquivo CSV e cria um DataFrame
            caminho_csv = f"dbfs:/FileStore/tables/{nome_arquivo}.csv"
            df = spark.read.format("csv") \
                .option("header", "true") \
                .option("inferSchema", "true") \
                .option("sep", ';') \
                .load(caminho_csv)

            # Adicionar uma coluna para o ano que o dado foi registrado
            df = df.withColumn("ano_registro", lit(ano))

            # Adiciona o DataFrame à lista
            dfs.append(df)
    
    # Junta todos os DataFrames em um só
    df_concatenado = dfs[0]
    for df in dfs[1:]:
        df_concatenado = df_concatenado.union(df)
    
    # Salva o DataFrame concatenado no formato Parquet
    caminho_dbfs = f"dbfs:/FileStore/tables/{nome_base}_concatenado.parquet"
    df_concatenado.write.mode("overwrite").parquet(caminho_dbfs)
    print(f"DataFrame {nome_base} salvo com sucesso no caminho {caminho_dbfs}")

# Lista dos tipos de arquivos que serão processados e salvos
tipos_arquivos = [
    "SinReg", "auto_cau", "auto_cat", "auto_cep", "auto_cidade", "auto_cob",
    "auto_idade", "auto_reg", "auto_sexo", "auto2_grupo", "auto2_vei",
    "PremReg", "arq_casco_comp", "arq_casco3_comp", "arq_casco4_comp"
]

# Executar a função para cada tipo de arquivo
for tipo in tipos_arquivos:
    concatenar_e_salvar_arquivos_por_tipo(tipo)

# Carregar os DataFrames salvos

In [0]:
# Função para carregar um DataFrame de um arquivo Parquet e criar uma view temporária
def carregar_view(nome_base):
    # Definir o caminho do arquivo Parquet
    caminho_parquet = f"dbfs:/FileStore/tables/{nome_base}_concatenado.parquet"
    
    # Carregar o DataFrame do arquivo
    df = spark.read.format("parquet").load(caminho_parquet)
    
    # Nome da view temporária
    nome_view = f"dados_{nome_base}"
    df.createOrReplaceTempView(nome_view)
    
    # Mostrar as primeiras 10 linhas da view
    print(f"\nTabela: {nome_view}")
    spark.sql(f"SELECT * FROM {nome_view} LIMIT 10").display()

# Lista de tipos de arquivos para carregar
tipos_de_arquivos = [
    "SinReg", "auto_cau", "auto_cat", "auto_cep", "auto_cidade", "auto_cob",
    "auto_idade", "auto_reg", "auto_sexo", "auto2_grupo", "auto2_vei",
    "PremReg", "arq_casco_comp", "arq_casco3_comp", "arq_casco4_comp"
]

# Carregar a view para cada tipo de arquivo
for tipo in tipos_de_arquivos:
    carregar_view(tipo)

# Preparação e Tratamento dos Dados

In [0]:
from pyspark.sql.functions import col

# Função para limpar os dados do DataFrame
def limpar_dados(df):
    """
    Essa função vai ajudar a limpar o DataFrame, removendo duplicatas
    e registros que estão incompletos.
    """
    # Remover duplicatas
    df = df.dropDuplicates()

    # Tirar os registros que têm todos os valores nulos
    df = df.dropna(how='all')

    # Remover registros que tenham algum valor nulo
    df = df.dropna()

    return df

# Limpar cada DataFrame da lista
for tipo in tipos_arquivos:
    # Definindo o nome da tabela a partir do tipo
    nome_tabela_final = f"dados_{tipo}"
    
    # Carregar o DataFrame a partir da view temporária
    df = spark.sql(f"SELECT * FROM {nome_tabela_final}")
    
    # Aplicar a limpeza nos dados
    df_limpo = limpar_dados(df)
    
    # Criar uma nova view temporária para os dados limpos
    df_limpo.createOrReplaceTempView(nome_tabela_final + "_limpo")

# Mostrar as primeiras 50 linhas de cada DataFrame limpo
for tipo in tipos_arquivos:
    nome_tabela_final = f"dados_{tipo}_limpo"
    print(f"\nTabela: {nome_tabela_final}")
    
    # Realizando a consulta para mostrar os dados
    consulta = f"SELECT * FROM {nome_tabela_final} LIMIT 50"
    resultado = spark.sql(consulta)
    
    # Exibindo os resultados
    resultado.display()

# Tratamento de Valores Ausentes

In [0]:
from pyspark.sql.functions import mean

# Dividindo os tipos de arquivos em partes menores para facilitar o processamento
tipos_arquivos_parte1 = ["SinReg", "auto_cau", "auto_cat"]
tipos_arquivos_parte2 = ["auto_cep", "auto_cidade", "auto_cob"]
tipos_arquivos_parte3 = ["auto_idade", "auto_reg", "auto_sexo"]
tipos_arquivos_parte4 = ["auto2_grupo", "auto2_vei", "PremReg"]
tipos_arquivos_parte5 = ["arq_casco_comp", "arq_casco3_comp", "arq_casco4_comp"]

# Função para tratar valores ausentes nos DataFrames
def tratar_valores_ausentes(df):
    # Para colunas numéricas, preencher com a média
    colunas_numericas = [c for c, t in df.dtypes if t in ('int', 'double')]
    for coluna in colunas_numericas:
        media = df.agg(mean(coluna)).collect()[0][0]
        df = df.fillna({coluna: media})
    
    # Para colunas categóricas, usar o valor mais frequente
    colunas_categoricas = [c for c, t in df.dtypes if t == 'string']
    for coluna in colunas_categoricas:
        valor_frequente = df.groupBy(coluna).count().orderBy('count', ascending=False).first()[0]
        df = df.fillna({coluna: valor_frequente})
    
    return df

# Função para processar e salvar DataFrames de uma parte
def processar_e_salvar(tipos_arquivos):
    for tipo in tipos_arquivos:
        nome_tabela_final = f"dados_{tipo}_limpo"
        df = spark.sql(f"SELECT * FROM {nome_tabela_final}")
        
        # Tratando valores ausentes
        df_tratado = tratar_valores_ausentes(df)
        df_tratado.createOrReplaceTempView(nome_tabela_final + "_tratado")
        
        # Salvando o DataFrame tratado
        caminho_arquivo = f"dbfs:/FileStore/tables/{tipo}_tratado.parquet"
        df_tratado.write.mode("overwrite").parquet(caminho_arquivo)
        print(f"Arquivo {tipo}_tratado.parquet salvo com sucesso.")

# Processar cada parte separadamente
processar_e_salvar(tipos_arquivos_parte1)
processar_e_salvar(tipos_arquivos_parte2)
processar_e_salvar(tipos_arquivos_parte3)
processar_e_salvar(tipos_arquivos_parte4)
processar_e_salvar(tipos_arquivos_parte5)

# Observação sobre arquivos que estão atrasando
print("Os arquivos arq_casco3 e arq_casco4 estão atrasando o processamento, então decidi seguir sem esses DataFrames tratados, excluindo-os das análises.")

# Criar a Função de Normalização de Texto e Aplicar a Função aos DataFrames

In [0]:
from pyspark.sql.functions import lower, trim, col
from pyspark.sql import functions as F

# Lista de tipos de arquivos divididos em partes
tipos_arquivos_parte1 = ["SinReg", "auto_cau", "auto_cat"]
tipos_arquivos_parte2 = ["auto_cep", "auto_cidade", "auto_cob"]
tipos_arquivos_parte3 = ["auto_idade", "auto_reg", "auto_sexo"]
tipos_arquivos_parte4 = ["auto2_grupo", "auto2_vei", "PremReg"]
tipos_arquivos_parte5 = ["arq_casco_comp", "arq_casco3_comp", "arq_casco4_comp"]

# Função UDF para normalizar texto
def normalizar_texto(coluna):
    """
    Esta função transforma o texto em minúsculas e remove espaços extras.
    """
    return trim(lower(coluna))

# Função para normalizar dados textuais em um DataFrame
def normalizar_dados_textuais(df):
    """
    Aplica a normalização a todas as colunas do tipo texto no DataFrame.
    """
    colunas_textuais = [c for c, t in df.dtypes if t == 'string']
    for coluna in colunas_textuais:
        df = df.withColumn(coluna, normalizar_texto(col(coluna)))
    return df

# Função para carregar, normalizar e salvar DataFrames de texto
def carregar_normalizar_e_salvar_textuais(tipos_arquivos):
    for tipo in tipos_arquivos:
        caminho_parquet = f"dbfs:/FileStore/tables/{tipo}_concatenado.parquet"
        
        # Carregar o DataFrame do arquivo Parquet
        df_concatenado = spark.read.format("parquet").load(caminho_parquet)
        
        # Normalizar o DataFrame carregado
        df_normalizado = normalizar_dados_textuais(df_concatenado)
        
        # Criar uma view temporária com os dados normalizados
        nome_tabela_final = f"dados_{tipo}_textual_normalizado"
        df_normalizado.createOrReplaceTempView(nome_tabela_final)
        
        # Salvar o DataFrame normalizado em um novo arquivo Parquet
        caminho_arquivo = f"dbfs:/FileStore/tables/{tipo}_textual_normalizado.parquet"
        df_normalizado.write.mode("overwrite").parquet(caminho_arquivo)
        
        print(f"Arquivo {tipo}_textual_normalizado.parquet salvo com sucesso.")

# Processar e salvar cada parte dos DataFrames
carregar_normalizar_e_salvar_textuais(tipos_arquivos_parte1)
carregar_normalizar_e_salvar_textuais(tipos_arquivos_parte2)
carregar_normalizar_e_salvar_textuais(tipos_arquivos_parte3)
carregar_normalizar_e_salvar_textuais(tipos_arquivos_parte4)
carregar_normalizar_e_salvar_textuais(tipos_arquivos_parte5)

# Identificação e Tratamento de Outliers

In [0]:
from pyspark.sql.functions import col

# Função para tratar outliers em DataFrames
def tratar_outliers(df):
    """
    Identifica e trata outliers em variáveis numéricas utilizando o método do intervalo interquartil (IQR).
    """
    # Selecionar colunas numéricas do DataFrame
    colunas_numericas = [c for c, t in df.dtypes if t in ('int', 'double')]
    for coluna in colunas_numericas:
        # Calcular os quantis para determinar o IQR
        quantile1 = df.approxQuantile(coluna, [0.25], 0.05)[0]
        quantile3 = df.approxQuantile(coluna, [0.75], 0.05)[0]
        iqr = quantile3 - quantile1
        limite_inferior = quantile1 - 1.5 * iqr
        limite_superior = quantile3 + 1.5 * iqr

        # Filtrar o DataFrame para remover os outliers
        df = df.filter((col(coluna) >= limite_inferior) & (col(coluna) <= limite_superior))

    return df

# Função para processar e salvar DataFrames sem outliers
def processar_e_salvar_outliers(tipos_arquivos):
    for tipo in tipos_arquivos:
        nome_tabela_tratado = f"dados_{tipo}_tratado"
        
        # Verificar se a tabela tratada já existe
        if nome_tabela_tratado in [t.name for t in spark.catalog.listTables()]:
            df = spark.sql(f"SELECT * FROM {nome_tabela_tratado}")
            df_sem_outliers = tratar_outliers(df)
            df_sem_outliers.createOrReplaceTempView(nome_tabela_tratado + "_sem_outliers")

            # Salvar o DataFrame sem outliers em formato Parquet
            caminho_arquivo = f"dbfs:/FileStore/tables/{tipo}_sem_outliers.parquet"
            df_sem_outliers.write.mode("overwrite").parquet(caminho_arquivo)
            print(f"Arquivo {tipo}_sem_outliers.parquet salvo com sucesso.")
        else:
            print(f"Tabela {nome_tabela_tratado} não encontrada.")

# Listas de tipos de arquivos divididos em partes
tipos_arquivos_parte1 = ["SinReg", "auto_cau", "auto_cat"]
tipos_arquivos_parte2 = ["auto_cep", "auto_cidade", "auto_cob"]
tipos_arquivos_parte3 = ["auto_idade", "auto_reg", "auto_sexo"]
tipos_arquivos_parte4 = ["auto2_grupo", "auto2_vei", "PremReg"]
tipos_arquivos_parte5 = ["arq_casco_comp", "arq_casco3_comp", "arq_casco4_comp"]

# Processar e salvar DataFrames sem outliers para cada parte
processar_e_salvar_outliers(tipos_arquivos_parte1)
processar_e_salvar_outliers(tipos_arquivos_parte2)
processar_e_salvar_outliers(tipos_arquivos_parte3)
processar_e_salvar_outliers(tipos_arquivos_parte4)
processar_e_salvar_outliers(tipos_arquivos_parte5)

# Conversão de Variáveis Categóricas em Variáveis Numéricas (One-Hot Encoding)

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.types import StringType

# Função para fazer One-Hot Encoding em colunas categóricas
def one_hot_encoding(df):
    # Identificar colunas categóricas
    colunas_categoricas = [c for c in df.columns if df.schema[c].dataType == StringType()]
    if len(colunas_categoricas) > 0:  # Verifica se há colunas categóricas
        # Criar os indexadores e codificadores
        indexers = [StringIndexer(inputCol=c, outputCol=c + "_index") for c in colunas_categoricas]
        encoders = [OneHotEncoder(inputCols=[c + "_index"], outputCols=[c + "_vec"]) for c in colunas_categoricas]
        
        # Montar o pipeline
        pipeline = Pipeline(stages=indexers + encoders)
        model = pipeline.fit(df)  # Ajustar o modelo ao DataFrame
        df_codificado = model.transform(df)  # Transformar o DataFrame

        # Remover colunas categóricas originais e suas versões indexadas
        colunas_remover = colunas_categoricas + [c + "_index" for c in colunas_categoricas]
        df_codificado = df_codificado.drop(*colunas_remover)

        return df_codificado  # Retornar o DataFrame codificado
    else:
        return df  # Se não houver colunas categóricas, retornar o DataFrame original

# Função para processar e salvar DataFrames com One-Hot Encoding
def processar_e_salvar_codificacao(tipos_arquivos):
    for tipo in tipos_arquivos:
        caminho_parquet = f"dbfs:/FileStore/tables/{tipo}_sem_outliers.parquet"
        
        try:
            df = spark.read.format("parquet").load(caminho_parquet)  # Carregar DataFrame
            print(f"Tabela {tipo}_sem_outliers carregada com sucesso.")
            
            # Processar apenas um subconjunto se a tabela for muito grande, devido ao tempo de duração do cluster
            if tipo == "arq_casco_comp":
                df = df.limit(100000)  # Limitar o número de registros
            
            # Aplicar One-Hot Encoding
            df_codificado = one_hot_encoding(df)
            df_codificado.createOrReplaceTempView(f"dados_{tipo}_sem_outliers_codificado")
            
            # Salvar o DataFrame codificado
            caminho_arquivo_codificado = f"dbfs:/FileStore/tables/{tipo}_codificado.parquet"
            df_codificado.write.mode("overwrite").parquet(caminho_arquivo_codificado)
            print(f"Arquivo {tipo}_codificado.parquet salvo com sucesso.")
        
        except Exception as e:
            print(f"Erro ao carregar ou processar a tabela {tipo}: {e}")

# Processar e salvar DataFrames codificados para cada parte
processar_e_salvar_codificacao(tipos_arquivos_parte1)
processar_e_salvar_codificacao(tipos_arquivos_parte2)
processar_e_salvar_codificacao(tipos_arquivos_parte3)
processar_e_salvar_codificacao(tipos_arquivos_parte4)
processar_e_salvar_codificacao(tipos_arquivos_parte5)

# Normalização de Variáveis Numéricas

In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.utils
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml import Pipeline

# Função para normalizar variáveis numéricas
def normalizar_variaveis_numericas(df):
    # Selecionar colunas numéricas
    numeric_columns = [field.name for field in df.schema.fields if field.dataType.simpleString() in ['double', 'float', 'int']]
    
    # Normalização usando StandardScaler
    for column in numeric_columns:
        # Criar vetor para a coluna
        assembler = VectorAssembler(inputCols=[column], outputCol=f"{column}_vector")
        scaler = StandardScaler(inputCol=f"{column}_vector", outputCol=f"{column}_scaled", withMean=True, withStd=True)
        pipeline = Pipeline(stages=[assembler, scaler])
        
        # Aplicar o pipeline para normalização
        df = pipeline.fit(df).transform(df).drop(f"{column}_vector")  # Remove a coluna vetor após normalizar
    
    return df

# Função principal para processar e salvar a normalização
def processar_e_salvar_normalizacao(tipos_arquivos):
    caminho_diretorio_normalizados = "dbfs:/FileStore/tables/numerico_normalizados"
    
    # Criar diretório se não existir
    if not dbutils.fs.ls(caminho_diretorio_normalizados):
        dbutils.fs.mkdirs(caminho_diretorio_normalizados)
        print(f"Diretório {caminho_diretorio_normalizados} criado.")
    else:
        print(f"Diretório {caminho_diretorio_normalizados} já existe.")
    
    for tipo in tipos_arquivos:
        caminho_parquet = f"dbfs:/FileStore/tables/{tipo}_sem_outliers.parquet"
        
        try:
            # Carregar o DataFrame
            df = spark.read.format("parquet").load(caminho_parquet)
            print(f"Tabela {tipo}_sem_outliers carregada com sucesso.")
            
            # Normalizar as variáveis numéricas
            df_normalizado = normalizar_variaveis_numericas(df)
            df_normalizado.createOrReplaceTempView(f"dados_{tipo}_sem_outliers_numerico_normalizado")

            # Mostrar esquema e primeiras 5 linhas
            df_normalizado.printSchema()
            df_normalizado.show(5)

            # Salvar DataFrame normalizado
            caminho_arquivo_normalizado = f"{caminho_diretorio_normalizados}/{tipo}_numerico_normalizado.parquet"
            print(f"Tentando salvar {tipo}_numerico_normalizado.parquet...")
            df_normalizado.write.mode("overwrite").parquet(caminho_arquivo_normalizado)
            print(f"Arquivo {tipo}_numerico_normalizado.parquet salvo com sucesso.")
        
        # Verificação de erros
        except FileNotFoundError as e:
            print(f"Erro: Arquivo {caminho_parquet} não encontrado para {tipo}. {e}")
        except pyspark.sql.utils.AnalysisException as e:
            print(f"Erro de análise ao processar a tabela {tipo}: {e}")
        except Exception as e:
            print(f"Erro ao processar a tabela {tipo}: {e}")

# Processar e salvar DataFrames normalizados para cada parte
processar_e_salvar_normalizacao(tipos_arquivos_parte1)
processar_e_salvar_normalizacao(tipos_arquivos_parte2)
processar_e_salvar_normalizacao(tipos_arquivos_parte3)
processar_e_salvar_normalizacao(tipos_arquivos_parte4)
processar_e_salvar_normalizacao(tipos_arquivos_parte5)


# Executar DataFrames tratados

In [0]:
# Embora todos os DataFrames sejam carregados, somente o DataFrame com a versão "_tratado" contém todas as alterações realizadas anteriormente.

# Lista de tipos de arquivos divididos em partes
tipos_arquivos_parte1 = ["SinReg", "auto_cau", "auto_cat"]
tipos_arquivos_parte2 = ["auto_cep", "auto_cidade", "auto_cob"]
tipos_arquivos_parte3 = ["auto_idade", "auto_reg", "auto_sexo"]
tipos_arquivos_parte4 = ["auto2_grupo", "auto2_vei", "PremReg"]
tipos_arquivos_parte5 = ["arq_casco_comp", "arq_casco3_comp", "arq_casco4_comp"]

# Lista das versões dos arquivos a serem carregados
versoes = ["tratado", "codificado", "sem_outliers", "textual_normalizado", "normalizado"]

# Função para carregar e exibir DataFrames
def carregar_e_exibir(tipos_arquivos, versao):
    for tipo in tipos_arquivos:
        # Montar o caminho do arquivo Parquet para a versão específica
        caminho_arquivo = f"dbfs:/FileStore/tables/{tipo}_{versao}.parquet"
        
        # Tentar carregar o DataFrame do arquivo Parquet
        try:
            df = spark.read.parquet(caminho_arquivo)
            nome_tabela_final = f"{tipo}_{versao}"
            df.createOrReplaceTempView(nome_tabela_final)
            
            # Mostrar as primeiras 50 linhas do DataFrame carregado
            print(f"\nTabela: {nome_tabela_final}")
            consulta = f"SELECT * FROM {nome_tabela_final} LIMIT 50"
            resultado = spark.sql(consulta)
            resultado.display()
            
        except Exception as e:
            print(f"Não foi possível carregar {caminho_arquivo}: {e}")

# Função para carregar e exibir todas as versões
def carregar_por_versao(tipos_arquivos):
    for versao in versoes:
        print(f"\nCarregando arquivos da versão: {versao}")
        carregar_e_exibir(tipos_arquivos, versao)

# Carregar e exibir DataFrames para cada parte e versão
carregar_por_versao(tipos_arquivos_parte1)
carregar_por_versao(tipos_arquivos_parte2)
carregar_por_versao(tipos_arquivos_parte3)
carregar_por_versao(tipos_arquivos_parte4)
carregar_por_versao(tipos_arquivos_parte5)


# Feature Engineering: Derivação de Variáveis de Risco

In [0]:
%sql
-- Verificar categorias
SELECT DISTINCT CODIGO, CATEGORIA
FROM auto_cat_tratado;

-- Modelo do Veículo
SELECT *,
       CASE
           WHEN codigo = 1 THEN 'Automóveis Comuns (Passeio Nacional)'
           WHEN codigo = 2 THEN 'Automóveis Importados'
           WHEN codigo = 3 THEN 'Pick-ups'
           WHEN codigo IN (3, 4) THEN 'Veículos de Carga (nacional e importado)'
           WHEN codigo = 5 THEN 'Motocicletas (nacional e importado)'
           WHEN codigo = 6 THEN 'Ônibus (nacional e importado)'
           WHEN codigo = 7 THEN 'Utilitários (nacional e importado)'
           WHEN codigo = 9 THEN 'Outros'
           ELSE 'Categoria Desconhecida'
       END AS grupo_de_risco
FROM auto_cat_tratado;


# Consulta para mostrar as colunas da tabela arq_casco_comp_tratado

In [0]:
%sql
SELECT * FROM arq_casco_comp_tratado LIMIT 10

# Cálculo da Idade do Veículo

In [0]:
%sql
SET ano_atual = 2024;
SELECT *,
       2024 - ANO_MODELO AS Idade_Veiculo
FROM arq_casco_comp_tratado

# Valor Segurado

In [0]:
%sql
SELECT IS_MEDIA, EXPOSICAO1, PREMIO1
FROM arq_casco_comp_tratado
LIMIT 50


# Categorização da Idade do Segurado

In [0]:
%sql
SELECT 
    TRIM(descricao) AS descricao, -- remover espaços em branco do início e do final de uma string
    CASE 
        WHEN TRIM(descricao) = 'Entre 18 e 25 anos' THEN '18-25'
        WHEN TRIM(descricao) = 'Entre 26 e 35 anos' THEN '26-35'
        WHEN TRIM(descricao) = 'Entre 36 e 45 anos' THEN '36-45'
        WHEN TRIM(descricao) = 'Entre 46 e 55 anos' THEN '46-55'
        WHEN TRIM(descricao) = 'Maior que 55 anos' THEN '56 ou +'
        WHEN TRIM(descricao) = 'N�o informada' THEN 'Não informada'
        ELSE 'Desconhecido'
    END AS Faixa_Etaria,
    COUNT(*) AS Total_Segurados
FROM 
    auto_idade_tratado
GROUP BY 
    TRIM(descricao)
LIMIT 50;

# Índice de Risco por Cidade ou Estado
- Roubo e Furto
- Colisão Parcial
- Perda Total
- Incêndio

In [0]:
%sql
SELECT 
    REGIAO,
    COUNT(*) AS FREQ_SIN1, -- Total de sinistros da cobertura roubo/furto
    SUM(INDENIZ1) AS Total_Indenizacoes_roubo_furto -- Total de indenizações de sinistros da cobertura roubo/furto
FROM 
    arq_casco_comp_tratado
WHERE 
    INDENIZ1 > 0 -- Filtrar apenas os sinistros que resultaram em indenizações
GROUP BY 
    REGIAO
ORDER BY 
    Total_Indenizacoes_roubo_furto DESC
LIMIT 50;

In [0]:
%sql
SELECT 
    REGIAO,
    COUNT(*) AS FREQ_SIN2, -- Quantidade de sinistros da cobertura colisão parcial
    SUM(INDENIZ2) AS Total_Indenizacoes_Colisao_Parcial -- Total de indenizações de sinistros da cobertura colisão parcial
FROM 
    arq_casco_comp_tratado
WHERE 
    INDENIZ2 > 0 -- Filtrar apenas os sinistros que resultaram em indenizações
GROUP BY 
    REGIAO
ORDER BY 
    Total_Indenizacoes_Colisao_Parcial DESC
LIMIT 50;

In [0]:
%sql
SELECT 
    REGIAO,
    COUNT(*) AS FREQ_SIN3, -- - Quantidade de sinistros da cobertura colisão perda total
    SUM(INDENIZ3) AS Total_Indenizacoes_perda_total -- Total de indenizações de sinistros da cobertura colisão perda total
FROM 
    arq_casco_comp_tratado
WHERE 
    INDENIZ3 > 0 -- Filtrar apenas os sinistros que resultaram em indenizações
GROUP BY 
    REGIAO
ORDER BY 
    Total_Indenizacoes_perda_total DESC
LIMIT 50;

In [0]:
%sql
SELECT 
    REGIAO,
    COUNT(*) AS FREQ_SIN4, -- Quantidade de sinistros da cobertura incêndio
    SUM(INDENIZ4) AS Total_indenizacoes_incendio -- Total de indenizações de sinistros da cobertura incêndio
FROM 
    arq_casco_comp_tratado
WHERE 
    INDENIZ4 > 0 -- Filtrar apenas os sinistros que resultaram em indenizações
GROUP BY 
    REGIAO
ORDER BY 
    Total_indenizacoes_incendio DESC
LIMIT 50;

# Razão Prêmio/Valor Segurado

In [0]:
%sql
SELECT 
    REGIAO,
    SUM(PREMIO1) AS Total_Premio, -- Total de prêmios
    SUM(IS_MEDIA * EXPOSICAO1) AS Total_Valor_Segurado, -- Total dos valores segurados ponderados
    (SUM(PREMIO1) / NULLIF(SUM(IS_MEDIA * EXPOSICAO1), 0)) AS Premio_Valor_Segurado -- Razão Prêmio/Valor Segurado
FROM 
    arq_casco_comp_tratado
GROUP BY 
    REGIAO
ORDER BY 
    Premio_Valor_Segurado DESC
LIMIT 50;

# Frequência de Sinistros

In [0]:
%sql
SELECT 
    REGIAO,
    SUM(FREQ_SIN1 + FREQ_SIN2 + FREQ_SIN3 + FREQ_SIN4 + FREQ_SIN9) AS Numero_Sinistros,
    SUM(EXPOSICAO1) AS Tempo_Exposicao,
    SUM(FREQ_SIN1 + FREQ_SIN2 + FREQ_SIN3 + FREQ_SIN4 + FREQ_SIN9) / NULLIF(SUM(EXPOSICAO1), 0) AS Frequencia_Sinistros
FROM 
    arq_casco_comp_tratado
GROUP BY 
    REGIAO
ORDER BY 
    Frequencia_Sinistros DESC
LIMIT 50;

# Severidade Média dos Sinistros

In [0]:
%sql
SELECT 
    REGIAO,
    SUM(INDENIZ1 + INDENIZ2 + INDENIZ3 + INDENIZ4 + INDENIZ9) AS Valor_Indenizacao, -- Total das indenizações
    SUM(FREQ_SIN1 + FREQ_SIN2 + FREQ_SIN3 + FREQ_SIN4 + FREQ_SIN9) AS Numero_Sinistros, -- Total de sinistros
    CASE 
        WHEN SUM(FREQ_SIN1 + FREQ_SIN2 + FREQ_SIN3 + FREQ_SIN4 + FREQ_SIN9) = 0 THEN 0
        ELSE SUM(INDENIZ1 + INDENIZ2 + INDENIZ3 + INDENIZ4 + INDENIZ9) / NULLIF(SUM(FREQ_SIN1 + FREQ_SIN2 + FREQ_SIN3 + FREQ_SIN4 + FREQ_SIN9), 0) 
    END AS Severidade_Sinistros -- Severidade média dos sinistros
FROM 
    arq_casco_comp_tratado
GROUP BY 
    REGIAO
ORDER BY 
    Severidade_Sinistros DESC
LIMIT 50;

# Análise Exploratória de Dados (EDA)
- Análise Estatística Descritiva

In [0]:
# Carregar bibliotecas
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Carregar os dados do Spark em um DataFrame do pandas
arq_casco_comp_tratado = spark.sql("""
SELECT *
FROM arq_casco_comp_tratado
WHERE COD_TARIF = 1  -- Apenas veículos de passeio
  AND ANO_MODELO > 1978 AND ANO_MODELO <= 2024
""").toPandas()

# 1. Verificar as primeiras linhas do DataFrame
print(arq_casco_comp_tratado.head())

# 2. Informações gerais do DataFrame
print(arq_casco_comp_tratado.info())

# 3. Estatísticas descritivas
print(arq_casco_comp_tratado.describe())

# 4. Verificar valores ausentes
print(arq_casco_comp_tratado.isnull().sum())

# Identificação de Fatores de Risco

In [0]:
%sql
-- Valor Total das Indenizações por Categoria de Veículo
CREATE OR REPLACE TEMP VIEW total_indenizacoes_categoria AS
SELECT 
    a.CATEGORIA,  -- Categoria do veículo
    SUM(c.INDENIZ1) AS Total_Indenizacoes_Roubo_Furto, 
    SUM(c.INDENIZ2) AS Total_Indenizacoes_Colisao_Parcial,  
    SUM(c.INDENIZ3) AS Total_Indenizacoes_Colisao_Perdida_Total,  
    SUM(c.INDENIZ4) AS Total_Indenizacoes_Incendio,  
    SUM(c.INDENIZ9) AS Total_Indenizacoes_Outras,
    SUM(c.INDENIZ1 + c.INDENIZ2 + c.INDENIZ3 + c.INDENIZ4 + c.INDENIZ9) AS Total_Indenizacoes_Geral  -- Total geral de indenizações
FROM 
    arq_casco_comp_tratado c  -- Tabela de sinistros
JOIN 
    auto_cat_tratado a ON c.COD_TARIF = a.CODIGO  -- Junta com a tabela de categorias
GROUP BY 
    a.CATEGORIA;  -- Agrupa pelos tipos de veículos

# Análise do Risco de Incêndio em Veículos: Relação entre Veiculo_Idade, Região e Sinistros

In [0]:
%sql
-- Criando uma visualização temporária para analisar o risco de incêndio por idade do veículo e região
CREATE OR REPLACE TEMP VIEW analise_risco_incendio AS
WITH Veiculo_Idade AS (
    SELECT v.*,
           CASE 
               WHEN v.ANO_MODELO = 2024 THEN 0 -- Se o ano do modelo é 2024, idade é 0
               ELSE 2024 - v.ANO_MODELO -- Caso contrário, calcula a idade normalmente
           END AS Idade_Veiculo
    FROM arq_casco_comp_tratado v
    JOIN auto_cat_tratado a ON v.COD_TARIF = a.CODIGO
    WHERE v.ANO_MODELO > 1978 AND v.ANO_MODELO <= 2024 -- Filtrando anos válidos
      AND a.CODIGO = 1  -- Apenas veículos de passeio
)

SELECT 
    vi.Idade_Veiculo,
    si.REGIAO,
    COUNT(*) AS Freq_Sinistros_Incendio,
    SUM(si.INDENIZ4) AS Total_Indenizacoes_Incendio
FROM 
    Veiculo_Idade vi
JOIN 
    arq_casco_comp_tratado si ON vi.REGIAO = si.REGIAO
WHERE 
    si.INDENIZ4 > 0 -- Filtra apenas os sinistros de incêndio
GROUP BY 
    vi.Idade_Veiculo, si.REGIAO
ORDER BY 
    vi.Idade_Veiculo, Total_Indenizacoes_Incendio DESC;

In [0]:
# Gráfico de Sinistros de Incêndio por Região
# Carregar bibliotecas
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Carregar os resultados da consulta SQL para um DataFrame
result_df_regiao = spark.sql("""
SELECT REGIAO, SUM(INDENIZ4) AS Freq_Sinistros_Incendio
FROM arq_casco_comp_tratado
WHERE COD_TARIF = 1  -- Apenas veículos de passeio
  AND ANO_MODELO > 1978 AND ANO_MODELO <= 2024
GROUP BY REGIAO
HAVING SUM(INDENIZ4) > 0  -- Filtrar apenas regiões com sinistros
""").toPandas()

# Criar o gráfico
plt.figure(figsize=(12, 6))
sns.barplot(data=result_df_regiao, x='REGIAO', y='Freq_Sinistros_Incendio')
plt.title('Frequência de Sinistros por Incêndio por Região')
plt.xlabel('Região')
plt.ylabel('Frequência de Sinistros por Incêndio')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

In [0]:
# Gráfico de Sinistros de Incêndio por Idade do Veículo
# Carregar resultados para um DataFrame
result_df_idade = spark.sql("""
WITH Veiculo_Idade AS (
    SELECT v.ANO_MODELO,
           (2024 - v.ANO_MODELO) AS Idade_Veiculo,
           SUM(v.INDENIZ4) AS Freq_Sinistros_Incendio
    FROM arq_casco_comp_tratado v
    JOIN auto_cat_tratado a ON v.COD_TARIF = a.CODIGO
    WHERE v.ANO_MODELO > 1978 AND v.ANO_MODELO <= 2024
      AND a.CODIGO = 1  -- Apenas veículos de passeio
    GROUP BY v.ANO_MODELO
)

SELECT Idade_Veiculo, SUM(Freq_Sinistros_Incendio) AS Freq_Sinistros_Incendio
FROM Veiculo_Idade
GROUP BY Idade_Veiculo
""").toPandas()

# Criar o gráfico
plt.figure(figsize=(12, 6))
sns.barplot(data=result_df_idade, x='Idade_Veiculo', y='Freq_Sinistros_Incendio')
plt.title('Frequência de Sinistros por Incêndio em Relação à Idade do Veículo')
plt.xlabel('Idade do Veículo (anos)')
plt.ylabel('Frequência de Sinistros por Incêndio')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


# Análise de Risco por Idade do Veículo

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW analise_risco AS
WITH Veiculo_Idade AS (
    SELECT v.*,
           CASE 
               WHEN v.ANO_MODELO = 2024 THEN 0 -- 2024 - 0 = 2024, THEN TURN TO 0
               ELSE 2024 - v.ANO_MODELO 
           END AS Idade_Veiculo
    FROM arq_casco_comp_tratado v
    JOIN auto_cat_tratado a ON v.COD_TARIF = a.CODIGO
    WHERE v.ANO_MODELO > 1978 AND v.ANO_MODELO <= 2024 -- Filtrar possíveis erros
      AND a.CODIGO = 1  -- Filtra apenas veículos de passeio (Tipo 1)
)

SELECT 
    ANO_MODELO,
    (FLOOR(Idade_Veiculo / 3) * 3) AS Faixa_Idade_Veiculo, -- Definir faixa para melhor visualização gráfica
    COUNT(*) AS Frequencia_Sinistros,
    SUM(INDENIZ1) AS Total_Indenizacoes_Roubo_Furto,
    SUM(INDENIZ2) AS Total_Indenizacoes_Colisao_Parcial,
    SUM(INDENIZ3) AS Total_Indenizacoes_Colisao_Perdida_Total,
    SUM(INDENIZ4) AS Total_Indenizacoes_Incendio,
    SUM(INDENIZ9) AS Total_Indenizacoes_Outras
FROM 
    Veiculo_Idade
WHERE 
    (INDENIZ1 IS NOT NULL OR 
     INDENIZ2 IS NOT NULL OR 
     INDENIZ3 IS NOT NULL OR 
     INDENIZ4 IS NOT NULL OR 
     INDENIZ9 IS NOT NULL)
GROUP BY 
    ANO_MODELO, Faixa_Idade_Veiculo
ORDER BY 
    Faixa_Idade_Veiculo

In [0]:
# Frequência de Sinistros por Faixa de Idade do Veículo
# Carregar bibliotecas
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Carregar os resultados da consulta SQL para um DataFrame
result_df = spark.sql("SELECT * FROM analise_risco").toPandas()

# Criar o gráfico
plt.figure(figsize=(12, 6))
sns.barplot(data=result_df, x='Faixa_Idade_Veiculo', y='Frequencia_Sinistros')
plt.title('Frequência de Sinistros por Faixa de Idade do Veículo')
plt.xlabel('Faixa de Idade do Veículo (anos)')
plt.ylabel('Frequência de Sinistros')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

In [0]:
# Indenizações por Faixa de Idade_Veiculo
# Carregar bibliotecas
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Carregar os resultados da consulta SQL para um DataFrame
result_df = spark.sql("SELECT * FROM analise_risco").toPandas()

# Definir tamanho do gráfico
plt.figure(figsize=(16, 12))

# Gráfico para Roubo/Furto
plt.subplot(3, 2, 1)
sns.barplot(data=result_df, x='Faixa_Idade_Veiculo', y='Total_Indenizacoes_Roubo_Furto', color='blue')
plt.title('Indenizações por Roubo/Furto')
plt.xlabel('Faixa de Idade do Veículo (anos)')
plt.ylabel('Total de Indenizações')

# Gráfico para Colisão Parcial
plt.subplot(3, 2, 2)
sns.barplot(data=result_df, x='Faixa_Idade_Veiculo', y='Total_Indenizacoes_Colisao_Parcial', color='orange')
plt.title('Indenizações por Colisão Parcial')
plt.xlabel('Faixa de Idade do Veículo (anos)')
plt.ylabel('Total de Indenizações')

# Gráfico para Colisão Perdida Total
plt.subplot(3, 2, 3)
sns.barplot(data=result_df, x='Faixa_Idade_Veiculo', y='Total_Indenizacoes_Colisao_Perdida_Total', color='green')
plt.title('Indenizações por Colisão Perdida Total')
plt.xlabel('Faixa de Idade do Veículo (anos)')
plt.ylabel('Total de Indenizações')

# Gráfico para Incêndio
plt.subplot(3, 2, 4)
sns.barplot(data=result_df, x='Faixa_Idade_Veiculo', y='Total_Indenizacoes_Incendio', color='red')
plt.title('Indenizações por Incêndio')
plt.xlabel('Faixa de Idade do Veículo (anos)')
plt.ylabel('Total de Indenizações')

# Gráfico para Outras Indenizações
plt.subplot(3, 2, 5)
sns.barplot(data=result_df, x='Faixa_Idade_Veiculo', y='Total_Indenizacoes_Outras', color='purple')
plt.title('Indenizações por Outras Categorias')
plt.xlabel('Faixa de Idade do Veículo (anos)')
plt.ylabel('Total de Indenizações')

plt.tight_layout()
plt.show()

# Análise de Risco por Região

In [0]:
%sql
select distinct codigo, descricao from auto_reg_tratado
order by codigo asc

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW analise_regiao AS
WITH Risco_Regiao AS (
    SELECT 
        r.codigo AS Codigo_Regiao,
        COUNT(c.INDENIZ1) AS Total_Sinistros_Roubo_Furto,  
        COUNT(c.INDENIZ2) AS Total_Sinistros_Colisao_Parcial,  
        COUNT(c.INDENIZ3) AS Total_Sinistros_Colisao_Perdida_Total,  
        COUNT(c.INDENIZ4) AS Total_Sinistros_Incendio, 
        COUNT(c.INDENIZ9) AS Total_Sinistros_Outras, 
        SUM(c.INDENIZ1) AS Total_Indenizacoes_Roubo_Furto, 
        SUM(c.INDENIZ2) AS Total_Indenizacoes_Colisao_Parcial,  
        SUM(c.INDENIZ3) AS Total_Indenizacoes_Colisao_Perdida_Total,  
        SUM(c.INDENIZ4) AS Total_Indenizacoes_Incendio,  
        SUM(c.INDENIZ9) AS Total_Indenizacoes_Outras  
    FROM
        arq_casco_comp_tratado c
    JOIN 
        auto_reg_tratado r ON c.regiao = r.codigo
    JOIN 
        auto_cat_tratado a ON c.COD_TARIF = a.CODIGO  
    WHERE 
        a.CODIGO = 1  -- Filtra apenas veículos de passeio (Tipo 1)
        AND c.idade <= 10  -- Filtra veículos com idade <= 10 anos
    GROUP BY 
        r.codigo  
)

SELECT * FROM Risco_Regiao;

In [0]:
# Total Geral de Sinistros por Região e Total por Tipo de Sinistros
# Carregar bibliotecas
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Carregar os resultados da consulta SQL para um DataFrame
result_df = spark.sql("SELECT * FROM analise_regiao").toPandas()

# Calcular o total de sinistros geral por região
result_df['Total_Sinistros'] = (result_df['Total_Sinistros_Roubo_Furto'] + 
                                 result_df['Total_Sinistros_Colisao_Parcial'] + 
                                 result_df['Total_Sinistros_Colisao_Perdida_Total'] + 
                                 result_df['Total_Sinistros_Incendio'] + 
                                 result_df['Total_Sinistros_Outras'])

# Criar gráficos para cada tipo de sinistro
types_of_claims = [
    'Total_Sinistros_Roubo_Furto',
    'Total_Sinistros_Colisao_Parcial',
    'Total_Sinistros_Colisao_Perdida_Total',
    'Total_Sinistros_Incendio',
    'Total_Sinistros_Outras'
]

# Configuração do gráfico
plt.figure(figsize=(18, 12))

# Loop para criar um gráfico para cada tipo de sinistro
for i, claim_type in enumerate(types_of_claims, 1):
    plt.subplot(3, 2, i)
    sns.barplot(data=result_df, x='Codigo_Regiao', y=claim_type, palette='viridis')
    plt.title(f'Total de {claim_type.replace("_", " ").title()} por Região')
    plt.xlabel('Código da Região')
    plt.ylabel(f'Total de {claim_type.replace("_", " ").title()}')
    plt.xticks(rotation=45)

plt.tight_layout()
plt.show()

# Criar gráfico total de sinistros
plt.figure(figsize=(12, 6))
sns.barplot(data=result_df, x='Codigo_Regiao', y='Total_Sinistros', palette='viridis')

plt.title('Total Geral de Sinistros por Região')
plt.xlabel('Código da Região')
plt.ylabel('Total Geral de Sinistros')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

In [0]:
# Top 5 Regiões com Maior Número Total de Sinistros
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Carregar os resultados da consulta SQL para um DataFrame
result_df = spark.sql("SELECT * FROM analise_regiao").toPandas()

# Configurar o estilo do seaborn
sns.set(style="whitegrid")

# Calcular o total de sinistros para cada região
result_df['Total_Sinistros'] = (
    result_df['Total_Sinistros_Roubo_Furto'] +
    result_df['Total_Sinistros_Colisao_Parcial'] +
    result_df['Total_Sinistros_Colisao_Perdida_Total'] +
    result_df['Total_Sinistros_Incendio'] +
    result_df['Total_Sinistros_Outras']
)

# Filtrar e ordenar os dados para obter as top 5 regiões
top_5_regioes = result_df.nlargest(5, 'Total_Sinistros')

# Criar o gráfico
plt.figure(figsize=(12, 6))
sns.barplot(data=top_5_regioes, x='Codigo_Regiao', y='Total_Sinistros', palette='viridis')

plt.title('Top 5 Regiões com Maior Número Total de Sinistros')
plt.xlabel('Código da Região')
plt.ylabel('Total de Sinistros')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


# Tamanho da amostra

In [0]:
# Contar o número de registros que atendem aos critérios especificados
sample_size_query = """
SELECT COUNT(*) AS Numero_Registros
FROM arq_casco_comp_tratado v
JOIN auto_cat_tratado a ON v.COD_TARIF = a.CODIGO
WHERE a.CODIGO = 1  -- Apenas veículos de passeio
  AND (2024 - v.ANO_MODELO) <= 10  -- Idade do veículo <= 10 anos
  AND v.REGIAO NOT IN (11, 13)  -- Excluir regiões 11 e 13
"""

# Executar a consulta e carregar o resultado em um DataFrame
sample_size_df = spark.sql(sample_size_query).toPandas()

# Mostrar o número de registros
print(f"Número de registros na amostra: {sample_size_df['Numero_Registros'][0]}")