In [230]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('ETL_LAB03')
    .getOrCreate()
)

In [231]:
bank_df = spark.read.csv('../data/bronze/banks/EnquadramentoInicia_v2.tsv', header=True, sep='\t')

# Data Transformation for bank dataset
for column in bank_df.columns:
    bank_df = bank_df.withColumnRenamed(
        existing=column, 
        new=column.lower()
    )

for replacement_action in [
    ("nome", "- PRUDENCIAL", ""),
    ("nome","(\.+|\/+|\-+)", ""),
    ("nome"," INSTITUIÇÃO DE PAGAMENTO", ""),
    ("nome","SOCIEDADE DE CRÉDITO, FINANCIAMENTO E INVESTIMENTO", "SCFI"),
    ("nome"," SA", ""),
]:
    bank_df = bank_df.withColumn(
        "nome", regexp_replace(
            replacement_action[0],
            replacement_action[1],
            replacement_action[2]
        )
    )
bank_df = bank_df.withColumn('nome_fantasia', split(col('nome'),'  ').getItem(1))
# print(bank_df.count())
# bank_df = bank_df.na.drop()
# bank_df = bank_df.dropDuplicates()
# bank_df.show(5, truncate=False)
# print(f"Number of rows x columns - Bank Data: {bank_df.count()} x {len(bank_df.columns)}")
bank_df.write.mode("overwrite").csv("../data/silver/banks", header=True, sep=";")
#bank_df.select('nome', 'nome_fantasia').distinct().orderBy('nome').show(100, truncate=False)

In [232]:
# Lendo separado devido a estrutura diferente
employee_df_1 = spark.read.format("csv").option("header", "true").option('delimiter','|').load("../data/bronze/employees/glassdoor_consolidado_join_match_less_v2.csv")
employee_df_2 = spark.read.format("csv").option("header", "true").option('delimiter','|').load("../data/bronze/employees/glassdoor_consolidado_join_match_v2.csv")

# Criando colunas
employee_df_1 = employee_df_1.withColumn('Segmento', lit(''))
employee_df_2 = employee_df_2.withColumn('CNPJ', lit(''))

# Ordenando as colunas
columns = ["employer_name", "reviews_count", "culture_count", "salaries_count", "benefits_count", "employer-website", "employer-headquarters", "employer-founded", "employer-industry", "employer-revenue", "url", "Geral", "Cultura e valores", "Diversidade e inclusão", "Qualidade de vida", "Alta liderança", "Remuneração e benefícios", "Oportunidades de carreira", "Recomendam para outras pessoas(%)", "Perspectiva positiva da empresa(%)", "CNPJ", "Segmento", "Nome", "match_percent"]

employee_df_1 = employee_df_1.select(columns)
employee_df_2 = employee_df_2.select(columns)

# unindo os dados
employee_df = employee_df_1.union(employee_df_2)

# Data Transformation for employee dataset
for column in employee_df.columns:
    employee_df = employee_df.withColumnRenamed(
        column, 
        column.replace("-","_").replace(" ","_").lower()
    )

employee_df.cache()

for replacement_action in [
    ("nome", "- PRUDENCIAL", ""),
    ("nome","(\.+|\/+|\-+)", ""),
    ("nome"," INSTITUIÇÃO DE PAGAMENTO", ""),
    ("nome","SOCIEDADE DE CRÉDITO, FINANCIAMENTO E INVESTIMENTO", "SCFI"),
    ("nome"," SA", ""),
]:
    employee_df = employee_df.withColumn(
        "nome", regexp_replace(
            replacement_action[0],
            replacement_action[1],
            replacement_action[2]
        )
    )
employee_df = employee_df.withColumn('employer_name', upper(col('employer_name')))

# employee_df.select('nome','employer_name').distinct().orderBy('nome').show(100, truncate=False)
#print(f"Number of rows x columns - Employee Data: {employee_df.count()} x {len(employee_df.columns)}")
employee_df.write.mode("overwrite").csv("../data/silver/employee", header=True, sep=";")

23/08/29 21:25:13 WARN CacheManager: Asked to cache already cached data.


In [241]:
#Claims
claims_df = spark.read.format("csv").option("header", "true").option('delimiter',',').load("../data/bronze/claims")

# Data Transformation for employee dataset
for column in claims_df.columns:
    claims_df = claims_df.withColumnRenamed(
        column, 
        column.replace("-","_").replace(" ","_").lower()
    )

claims_df = claims_df.withColumnRenamed('cnpj_if', 'cnpj')\
    .withColumnRenamed('instituição_financeira', 'nome')

for replacement_action in [
    ("nome", "- PRUDENCIAL", ""),
    ("nome","(\.+|\/+|\-+)", ""),
    ("nome"," \(conglomerado\)", ""),
    ("nome"," INSTITUIÇÃO DE PAGAMENTO", ""),
    ("nome","SOCIEDADE DE CRÉDITO, FINANCIAMENTO E INVESTIMENTO", "SCFI"),
    ("nome"," SA", ""),
]:
    claims_df = claims_df.withColumn(
        "nome", regexp_replace(
            replacement_action[0],
            replacement_action[1],
            replacement_action[2]
        )
    )

# claims_df = claims_df.na.drop()
# claims_df = claims_df.dropDuplicates()
# print(f"Number of rows x columns - Employee Data: {employee_df.count()} x {len(employee_df.columns)}")
claims_df.write.mode("overwrite").csv("../data/silver/claims", header=True, sep=";")

In [242]:
# reading data for joining
claims_df = spark.read.format("csv").option("header", "true").option('delimiter',';').load("../data/silver/claims")
employee_df = spark.read.format("csv").option("header", "true").option('delimiter',';').load("../data/silver/employee")
#employee_df = employee_df.drop('cnpj','segmento')
banks_df = spark.read.format("csv").option("header", "true").option('delimiter',';').load("../data/silver/banks")

In [243]:
banks_df = banks_df.alias('banks_df')
claims_df = claims_df.alias('claims_df')
join_df = claims_df.join(banks_df, 'nome', 'inner').select(col('banks_df.cnpj').alias('cnpj_banks'),'claims_df.*').drop('cnpj').withColumnRenamed('cnpj_banks','cnpj')

In [244]:
employee_df = employee_df.alias('employee_df')
join_df = join_df.alias('join_df')
# tratando a coluna indice
join_df.withColumn('índice', regexp_replace('índice', ',', '.'))

# Unindo os dados
join_df = join_df.join( employee_df, 'nome' , 'inner').select('join_df.nome',
                                                              'join_df.cnpj',
                                                              'join_df.categoria',
                                                              'join_df.quantidade_total_de_clientes_–_ccs_e_scr',
                                                              regexp_replace('join_df.índice', ',', '.').alias('índice'),
                                                              'join_df.quantidade_total_de_reclamações',
                                                              'employee_df.geral',
                                                              'employee_df.remuneração_e_benefícios')

In [245]:
# Estrutura final esperada:
# Nome do Banco
# CNPJ
# Classificação do Banco
# Quantidade de Clientes do Bancos
# Índice de reclamações
# Quantidade de reclamações
# Índice de satisfação dos funcionários dos bancos
# Índice de satisfação com salários dos funcionários dos bancos.

final_df = join_df.select(col('nome').alias('Nome do Banco'), 
                           col('cnpj').alias('CNPJ'),
                           col('categoria').alias('Classificação'),
                           col('quantidade_total_de_clientes_–_ccs_e_scr').alias('Quantidade de Clientes do Bancos'),
                           col('índice').cast('integer').alias('Índice de reclamações'),
                           col('quantidade_total_de_reclamações').alias('Quantidade de reclamações'),
                           col('geral').alias('Índice de satisfação dos funcionários dos bancos'),
                           col('remuneração_e_benefícios').alias('Índice de satisfação com salários dos funcionários dos bancos'))

final_df = final_df.groupBy('Nome do Banco',
                            'CNPJ',
                            'Classificação')\
                   .agg(round(avg('Quantidade de Clientes do Bancos')).alias('Quantidade de Clientes do Bancos'),
                              avg('Índice de reclamações').alias('Índice de reclamações'),
                              avg('Quantidade de reclamações').alias('Quantidade de reclamações'),
                              avg('Índice de satisfação dos funcionários dos bancos').alias('Índice de satisfação dos funcionários dos bancos'),
                              avg('Índice de satisfação com salários dos funcionários dos bancos').alias('Índice de satisfação com salários dos funcionários dos bancos'))

In [248]:
final_df.write.mode("overwrite").csv("../data/gold/final_table", header=True, sep=";")