### To Silver: Ler os JSONs salvos (bronze), transformar e salvar como Parquet na camada Silver

In [None]:
%pip install unidecode

Python interpreter will be restarted.
Collecting unidecode
  Downloading Unidecode-1.3.8-py3-none-any.whl (235 kB)
Installing collected packages: unidecode
Successfully installed unidecode-1.3.8
Python interpreter will be restarted.


In [None]:
import logging
import re
from unidecode import unidecode
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, length, lit, regexp_replace, lpad, upper, udf
from pyspark.sql.types import StringType


In [None]:
# Configuração do Logger
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Criar um logger
logger = logging.getLogger(__name__)

In [None]:
# Criar uma sessão Spark
# Camada Silver: Ler os JSONs salvos transformar e salvar como Parquet
spark = SparkSession.builder.appName("trasnform_glassdoor_raw_data").getOrCreate()
logger.info("Criando sessão Spark....")

try:
    df_bronze_bancos = spark.read.json("/FileStore/glassdoor/bronze/bancos/")
    df_bronze_empregados = spark.read.json("/FileStore/glassdoor/bronze/empregados/*.json")
    df_silver_reclamacoes = spark.read.json("/FileStore/glassdoor/bronze/reclamacoes/*.json")
    logger.info("Iniciando a leitura dos arquivos JSON.")

except Exception as e:
    logger.error(f"Ocorreu um erro ao ler o arquivo JSON: {e}")

2024-08-17 01:40:55,368 - INFO - Iniciando a leitura dos arquivos JSON.


In [None]:
try:
    path_silver_bancos = "/FileStore/glassdoor/silver/bancos"
    path_silver_empregados = "/FileStore/glassdoor/silver/emprepagos"
    path_silver_reclamacoes = "/FileStore/glassdoor/silver/reclamacoes"
    # Salvando o DataFrame df_bronze_bancos no diretório /silver em formato Parquet
    logger.info("Criando paths da camada Silver.")
except Exception as e:
    logger.error("Ocorreu um erro durante a criação dos paths da camada Silver.", exc_info=True)

In [None]:
try:
    # Transformações
    logger.info("Iniciando transformações no DataFrame.")
    # Removendo parte do texto da coluna 'Nome' e ajustando o tipo da coluna 'CNPJ'
    bancos = df_bronze_bancos.withColumn("Nome", regexp_replace("Nome", " - PRUDENCIAL", ""))
    # empregados = bancos.withColumn("CNPJ", col("CNPJ").cast("string"))

    # Filtro usando operações diretas do DataFrame
    bancos = bancos.filter(col("CNPJ").isNotNull())
    logger.info("Filtro aplicado para remover linhas onde CNPJ é nulo.")

    # Exibir as primeiras 10 linhas do DataFrame para verificar os dados
    bancos.show(10)
    logger.info("Exibição das primeiras 10 linhas completada.")

except Exception as e:
    logger.error("Ocorreu um erro durante o processamento dos dados.", exc_info=True)

2024-08-17 01:41:00,242 - INFO - Iniciando transformações no DataFrame.
2024-08-17 01:41:00,336 - INFO - Filtro aplicado para remover linhas onde CNPJ é nulo.
2024-08-17 01:41:01,558 - INFO - Exibição das primeiras 10 linhas completada.


+--------+--------------------+--------+
|    CNPJ|                Nome|Segmento|
+--------+--------------------+--------+
|       0|     BANCO DO BRASIL|      S1|
|60746948|            BRADESCO|      S1|
|30306294|         BTG PACTUAL|      S1|
|  360305|CAIXA ECONOMICA F...|      S1|
|60872504|                ITAU|      S1|
|90400888|           SANTANDER|      S1|
|92702067|            BANRISUL|      S2|
| 7237373|BANCO DO NORDESTE...|      S2|
|33657248|               BNDES|      S2|
|33479023|            CITIBANK|      S2|
+--------+--------------------+--------+
only showing top 10 rows



In [None]:
display(df_bronze_empregados)

Alta liderança,CNPJ,Cultura e valores,Diversidade e inclusão,Geral,Nome,Oportunidades de carreira,Perspectiva positiva da empresa(%),Qualidade de vida,Recomendam para outras pessoas(%),Remuneração e benefícios,Segmento,benefits_count,culture_count,employer_name,match_percent,reviews_count,salaries_count,url
3.4,,3.7,4.0,3.8,BNP PARIBAS,3.5,63.0,3.8,77.0,3.4,S3,3600,4100,BNP Paribas,100,13000,20000,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-BNP-Paribas-EI_IE10342.13,24.htm"
3.7,,3.7,3.5,4.0,BTG PACTUAL,4.2,73.0,2.8,78.0,4.4,S1,635,683,BTG Pactual,100,1600,2800,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-BTG-Pactual-EI_IE411540.13,24.htm"
2.8,,3.2,3.2,3.5,ALFA,2.7,47.0,3.1,68.0,3.8,S3,105,74,Banco Alfa,100,175,271,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-Banco-Alfa-EI_IE817306.13,23.htm"
3.6,,3.9,4.1,4.1,BMG,3.5,63.0,3.7,79.0,4.2,S3,277,232,Banco BMG,100,445,704,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-Banco-BMG-EI_IE2482923.13,22.htm"
3.4,,3.8,3.9,4.2,BRADESCO,3.8,66.0,3.4,81.0,4.3,S1,3200,3300,Banco Bradesco,100,11000,19000,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-Banco-Bradesco-EI_IE10997.13,27.htm"
3.3,,3.7,3.6,4.0,BANCO DAYCOVAL S.A,3.4,72.0,3.4,78.0,4.1,S3,107,157,Banco Daycoval,100,300,521,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-Banco-Daycoval-EI_IE1480352.13,27.htm"
3.7,,4.3,4.3,4.3,BANCO FIBRA S.A.,3.6,84.0,3.8,87.0,4.2,S3,37,41,Banco Fibra,100,119,172,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-Banco-Fibra-EI_IE744986.13,24.htm"
3.1,,3.4,3.5,3.8,ORIGINAL,3.3,53.0,3.4,71.0,4.0,S3,261,286,Banco Original,100,651,1100,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-Banco-Original-EI_IE1026577.13,27.htm"
3.9,,4.2,4.4,4.4,PAN,4.0,79.0,3.9,91.0,4.6,S3,493,520,Banco PAN,100,1100,1700,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-Banco-PAN-EI_IE1184444.13,22.htm"
3.0,,3.5,3.5,3.8,PINE,3.5,50.0,3.4,71.0,3.9,S3,63,45,Banco Pine,100,121,216,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-Banco-Pine-EI_IE1086238.13,23.htm"


In [None]:
# Lendo todos os arquivos JSON no diretório especificado
empregados_df = spark.read.json(df_bronze_empregados)
# Definindo as estruturas de colunas padrão
colunas_padrao_cnpj = [
    'employer_name', 'reviews_count', 'culture_count', 'salaries_count',
    'benefits_count', 'employer-website', 'employer-headquarters',
    'employer-founded', 'employer-industry', 'employer-revenue', 'url',
    'Geral', 'Cultura e valores', 'Diversidade e inclusão',
    'Qualidade de vida', 'Alta liderança', 'Remuneração e benefícios',
    'Oportunidades de carreira', 'Recomendam para outras pessoas(%)',
    'Perspectiva positiva da empresa(%)', 'CNPJ', 'Nome', 'match_percent'
]

colunas_padrao_seg = [
    'employer_name', 'reviews_count', 'culture_count', 'salaries_count',
    'benefits_count', 'employer-website', 'employer-headquarters',
    'employer-founded', 'employer-industry', 'employer-revenue', 'url',
    'Geral', 'Cultura e valores', 'Diversidade e inclusão',
    'Qualidade de vida', 'Alta liderança', 'Remuneração e benefícios',
    'Oportunidades de carreira', 'Recomendam para outras pessoas(%)',
    'Perspectiva positiva da empresa(%)', 'Segmento', 'Nome', 'match_percent'
]
try:
    # Unificar schemas adicionando colunas faltantes com tipos apropriados
    for coluna in set(colunas_padrao_cnpj + colunas_padrao_seg):
        if coluna not in empregados_df.columns:
            empregados_df = empregados_df.withColumn(coluna, lit(None).cast(StringType()))

    # Ajustar a coluna "Segmento" para conter apenas dois caracteres ou ser null
    empregados_df = empregados_df.withColumn("Segmento", when(length(col("Segmento")) == 2, col("Segmento")).otherwise(lit(None)))
    logger.info("Realizando a unificação das colunas faltantes em um único DataFrame.")

except Exception as e:
    logger.error("Ocorreu um erro durante a unificação das colunas.", exc_info=True)

In [None]:
logger.info(f"Iniciando o tratamento de dados nos DataFrames: {empregados_df}, {bancos}")
try:
    # Converter 'Nome' para maiúsculas em ambos os DataFrames
    empregados_df = empregados_df.withColumn("Nome", upper(col("Nome")))
    bancos_filtrados = bancos.withColumn("Nome", upper(col("Nome")))
    # Converter CNPJ para string em ambos os DataFrames para garantir compatibilidade de formato
    empregados_df = empregados_df.withColumn("CNPJ", col("CNPJ").cast(StringType()))
    bancos_filtrados = bancos.withColumn("CNPJ", col("CNPJ").cast(StringType()))
except Exception as e:
    logger.error(f"Ocorreu um erro durante tratamento: {e}.", exc_info=True)


logger.info(f"Iniciando o Merge dos DataFrames: {empregados_df}, {bancos_filtrados}")
try:
    # Realizar o join com a condição complexa de CNPJ e Nome
    final_df = empregados_df.join(
        bancos_filtrados,
        empregados_df.CNPJ == bancos_filtrados.CNPJ,
        "inner"
    )
    # Alias para os DataFrames
    empregados_df = empregados_df.alias("emp")
    bancos_filtrados = bancos_filtrados.alias("banco")

    # Realizando o join e selecionando apenas as colunas necessárias
    final_df = empregados_df.join(
        bancos_filtrados,
        on=empregados_df.Nome == bancos_filtrados.Nome,
        how="inner"
    ).select(
        "emp.*",  # Seleciona todas as colunas de empregados_df
        col("banco.CNPJ").alias("CNPJ_banco")  # Seleciona a coluna CNPJ de bancos_filtrados temporariamente para atualização
    )

    # Atualizando a coluna CNPJ onde for nula em empregados_df usando o valor de bancos_filtrados
    final_df = final_df.withColumn(
        "CNPJ",
        when(
            col("emp.CNPJ").isNull() & col("CNPJ_banco").isNotNull(),
            col("CNPJ_banco")
        ).otherwise(col("emp.CNPJ"))
    )

    # Removendo a coluna temporária CNPJ_banco após sua utilização
    final_df = final_df.drop("CNPJ_banco")
    final_df = final_df.withColumn("CNPJ", lpad(col("CNPJ"), 8, "0"))
    # Remover duplicatas se necessário
    final_df = final_df.dropDuplicates()
    logger.info(f"Merge realizado com sucesso!: {final_df}")
except Exception as e:
    logger.error(f"Ocorreu um erro durante tratamento: {e}.", exc_info=True)
    
try:
    # Salvando o DataFrame df_bronze_bancos no diretório /silver em formato Parquet
    final_df.write.mode("overwrite").parquet("/FileStore/glassdoor/silver/empregados")
    logger.info("Salvando 'Empregados' na Silver.")
except Exception as e:
    logger.error("Ocorreu um erro durante o Salvando os dados de 'Empregados'.", exc_info=True)

# Exibir o esquema e algumas linhas para verificar as alterações
final_df.printSchema()

2024-08-17 01:41:51,205 - INFO - Salvando 'Empregados' na Silver.


root
 |-- Alta liderança: string (nullable = true)
 |-- CNPJ: string (nullable = true)
 |-- Cultura e valores: string (nullable = true)
 |-- Diversidade e inclusão: string (nullable = true)
 |-- Geral: string (nullable = true)
 |-- Nome: string (nullable = true)
 |-- Oportunidades de carreira: string (nullable = true)
 |-- Perspectiva positiva da empresa(%): string (nullable = true)
 |-- Qualidade de vida: string (nullable = true)
 |-- Recomendam para outras pessoas(%): string (nullable = true)
 |-- Remuneração e benefícios: string (nullable = true)
 |-- Segmento: string (nullable = true)
 |-- benefits_count: string (nullable = true)
 |-- culture_count: string (nullable = true)
 |-- employer_name: string (nullable = true)
 |-- match_percent: string (nullable = true)
 |-- reviews_count: string (nullable = true)
 |-- salaries_count: string (nullable = true)
 |-- url: string (nullable = true)
 |-- employer-headquarters: string (nullable = true)
 |-- employer-industry: string (nullable = t

Alta liderança,CNPJ,Cultura e valores,Diversidade e inclusão,Geral,Nome,Oportunidades de carreira,Perspectiva positiva da empresa(%),Qualidade de vida,Recomendam para outras pessoas(%),Remuneração e benefícios,Segmento,benefits_count,culture_count,employer_name,match_percent,reviews_count,salaries_count,url,employer-headquarters,employer-industry,employer-revenue,employer-website,employer-founded
3.6,49336860,4.1,4.2,4.1,ING,3.8,73.0,4.1,83.0,3.8,S3,1100,1500,ING,100,4500,7600,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-ING-EI_IE4264.13,16.htm",,,,,
3.5,360305,3.8,4.2,4.4,CAIXA ECONOMICA FEDERAL,3.9,72.0,3.7,86.0,4.3,S1,857,717,Caixa Econômica Federal,98,4000,7300,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-Caixa-Econ%C3%B4mica-Federal-EI_IE41924.13,36.htm",,,,,
3.9,59588111,4.3,4.4,4.4,VOTORANTIM,3.9,79.0,4.1,93.0,4.6,S2,574,563,Banco Votorantim,100,1400,2100,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-banco-BV-EI_IE333951.13,21.htm",,,,,
3.9,1181521,4.4,4.2,4.4,SICREDI,4.1,80.0,4.3,89.0,4.1,S3,1100,1300,Sicredi,100,2900,4800,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-Sicredi-EI_IE661772.13,20.htm",,,,,
3.5,33479023,3.8,4.1,3.9,CITIBANK,3.9,68.0,3.5,76.0,3.8,S2,9900,8900,Citi,100,31000,52000,https://www.glassdoor.com.br/citi,,,,,
3.3,62232889,3.7,3.6,4.0,BANCO DAYCOVAL S.A,3.4,72.0,3.4,78.0,4.1,S3,107,157,Banco Daycoval,100,300,521,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-Banco-Daycoval-EI_IE1480352.13,27.htm",,,,,
3.5,62331228,3.8,4.0,3.9,DEUTSCHE,3.7,70.0,3.8,78.0,3.6,S3,3100,3200,Deutsche Bank,100,12000,21000,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-Deutsche-Bank-EI_IE3150.13,26.htm",,,,,
3.8,60814191,4.1,4.0,4.1,MERCEDES-BENZ,3.6,56.0,4.4,86.0,4.0,S3,118,90,Mercedes-Benz Mobility,73,282,513,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-Mercedes-Benz-Mobility-EI_IE38563.13,35.htm",,,,,
3.4,90400888,3.7,4.0,3.9,SANTANDER,3.8,65.0,3.4,75.0,4.0,S1,5100,5900,Santander,100,17000,29000,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-Santander-EI_IE828048.13,22.htm",,,,,
3.7,59588111,4.2,4.2,4.1,VOTORANTIM,3.8,79.0,3.8,85.0,3.3,S2,200,177,Votorantim,100,608,1000,"https://www.glassdoor.com.br/Vis%C3%A3o-geral/Trabalhar-na-Votorantim-EI_IE42662.13,23.htm",,,,,


In [None]:
try:
    # Salvando o DataFrame df_bronze_bancos no diretório /silver em formato Parquet
    bancos_filtrados.write.mode("overwrite").parquet(path_silver_bancos)
    logger.info("Salvando 'Bancos' na Silver.")
except Exception as e:
    logger.error("Ocorreu um erro durante o Salvando os dados de Bancos.", exc_info=True)

2024-08-17 01:41:15,104 - INFO - Salvando 'Bancos' na Silver.


In [None]:
logger.info("Iniciando a transformação do DataFrame: reclamacoes")
try:
    reclamacoes_df = df_silver_reclamacoes.withColumn("Índice", regexp_replace("Índice", ",", "."))
    reclamacoes_df = reclamacoes_df.withColumn("Instituição financeira", regexp_replace("Instituição financeira", " \(conglomerado\)", ""))

    def normalize_column_names(df):
        for col_name in df.columns:
            new_col_name = unidecode(col_name.replace(" – ", "_").replace(" ", "_").replace("-", "_"))
            df = df.withColumnRenamed(col_name, new_col_name)
        return df

    final_df = normalize_column_names(final_df)
    reclamacoes_df = normalize_column_names(reclamacoes_df)

    # Atualizar os sets de colunas após normalização
    final_df_columns = set(final_df.columns)
    logger.info("Transformação concluida com sucesso!")
# reclamacoes_df_columns = set(reclamacoes_df.columns)display(reclamacoes_df)
except Exception as e:
    logger.error("Ocorreu erro na transformação! Verificar: ", exc_info=True)

2024-08-17 03:45:37,912 - INFO - Tranformando dados...
2024-08-17 03:45:38,156 - ERROR - Ocorreu erro na transformação! Verificar: 
Traceback (most recent call last):
  File "<command-3436080636702238>", line 4, in <module>
    reclamacoes_df = reclamacoes_df.withColumn("Índice", regexp_replace("Índice", ",", "."))
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 48, in wrapper
    res = func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/sql/dataframe.py", line 4758, in withColumn
    return DataFrame(self._jdf.withColumn(colName, col._jc), self.sparkSession)
  File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/databricks/spark/python/pyspark/errors/exceptions.py", line 234, in deco
    raise converted from None
pyspark.errors.exceptions.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `Índice` cannot be res

In [None]:
logger.info("Salvando 'reclamacoes' na Silver.")
try:
    # Escrever o DataFrame resultante como Parquet na camada Gold
    reclamacoes_df.write.mode("overwrite").parquet(path_silver_bancos)
    logger.info(f" o DataFrame {path_silver_bancos} salvo com sucesso!.")
except Exception as e:
    logger.error("Ocorreu um erro durante o Salvando os dados de 'reclamacoes'.", exc_info=True)

In [None]:
logger.info("Iniciando a normalizando das colunas")
try:
    def normalize_column_names(df):
        for col_name in df.columns:
            new_col_name = unidecode(col_name.replace(" – ", "_").replace(" ", "_").replace("-", "_"))
            df = df.withColumnRenamed(col_name, new_col_name)
        return df

    final_df = normalize_column_names(final_df)
    reclamacoes_df = normalize_column_names(reclamacoes_df)

    # Atualizar os sets de colunas após normalização
    final_df_columns = set(final_df.columns)
    reclamacoes_df_columns = set(reclamacoes_df.columns)
    logger.info("Normalizando das colunas realizada com sucesso!")
    
except Exception as e:
    logger.error(f"Ocorreu um erro durante a normalização: {e}.", exc_info=True)

logger.info("Iniciando a JOIN e removendo colunas duplicadas")
try: 
    # Realizar o join
    joined_df = final_df.join(reclamacoes_df, final_df["CNPJ"] == reclamacoes_df["CNPJ_IF"], "inner")
    # Colunas únicas após normalização
    unique_final_df_columns = final_df_columns - reclamacoes_df_columns
    unique_reclamacoes_df_columns = reclamacoes_df_columns - final_df_columns
    unique_columns = list(unique_final_df_columns) + list(unique_reclamacoes_df_columns)
    # Selecionar colunas únicas
    joined_df = joined_df.select(*unique_columns)
    logger.info("Operaçao realizada com sucesso!")
except Exception as e:
    logger.error(f"Ocorreu um erro durante o JOIN e remoção de colunas duplicadas: {e}.", exc_info=True)



In [None]:
logger.info("Definando UDF")
try:
    # Definir a UDF
    def remove_accents_and_upper(input_str):
        if input_str is not None:
            return unidecode(input_str.upper())
        return None

    remove_accents_and_upper_udf = udf(remove_accents_and_upper, StringType())

    # Aplicar a UDF
    for column in unique_columns:
        joined_df = joined_df.withColumn(column, remove_accents_and_upper_udf(col(column)))

    logger.info("UDF Definido!")

except Exception as e:
    logger.error(f"Ocorreu um erroa definição de UDF: {e}.", exc_info=True)

In [None]:
logger.info("Salvando DataFrame Processado na GOLD.")
output_joined_df_path = "/FileStore/glassdoor/gold/dadosunificados.parquet"
try:
    # Escrever o DataFrame resultante como Parquet na camada Gold
    joined_df.write.mode("overwrite").parquet(output_joined_df_path)
    logger.info(f"Arquivo Parquet salvo com sucesso em: {output_joined_df_path}")

except Exception as e:
    logger.error("Ocorreu um erro durante o Salvando os dados na camada GOLD'.", exc_info=True)

Arquivo Parquet salvo com sucesso em: /FileStore/glassdoor/gold/dadosunificados.parquet


In [None]:
# Verifique se o DataFrame existe e tem colunas
logger.info(joined_df.columns)
# Contar o número de linhas para garantir que não está vazio
logger.info(joined_df.count())


['employer_name', 'Perspectiva_positiva_da_empresa(%)', 'employer_website', 'Recomendam_para_outras_pessoas(%)', 'Cultura_e_valores', 'Remuneracao_e_beneficios', 'culture_count', 'employer_revenue', 'Geral', 'salaries_count', 'match_percent', 'employer_founded', 'employer_headquarters', 'employer_industry', 'CNPJ', 'url', 'Segmento', 'Oportunidades_de_carreira', 'Diversidade_e_inclusao', 'Qualidade_de_vida', 'reviews_count', 'Alta_lideranca', 'benefits_count', 'Nome', 'CNPJ_IF', 'Quantidade_de_clientes_CCS', 'Instituicao_financeira', 'Quantidade_de_reclamacoes_reguladas_procedentes', 'Ano', 'Quantidade_de_clientes_SCR', 'Indice', 'Quantidade_de_reclamacoes_reguladas___outras', 'Quantidade_total_de_clientes_CCS_e_SCR', 'Quantidade_total_de_reclamacoes', 'Quantidade_de_reclamacoes_nao_reguladas', 'Categoria', 'Trimestre', 'Tipo']
18
