Imports

In [3]:
import os
from pyspark.sql import DataFrame, SparkSession
from abc import ABC, abstractmethod
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pandas as pd
import glob
import logging
import boto3
from dotenv import load_dotenv

Rename Columns

In [8]:
# Column-rename tables, keyed by transformation class name. Each inner dict
# maps a source column header (as it appears in the loaded DataFrame) to the
# snake_case name used by the transformation classes.
# NOTE(review): several keys below contain U+FFFD replacement characters where
# accented letters would be expected. A rename only happens when a key matches
# the loaded header exactly, so confirm these keys against the headers Spark
# actually produces with the encoding passed to the loader.
columns_rename = {
    # Used by BanksTransformation.
    "BanksTransformation": {
        "Segmento" : "segmento",
        "CNPJ": "cnpj",
        "Nome": "nome"
    },
    # Used by EmployeesTransformation.
    "EmployeesTransformation": {
        "employer-website" : "employer_website",
        "employer-headquarters": "employer_headquarters",
        "employer-founded": "employer_founded",
        "employer-industry": "employer_industry",
        "employer-revenue": "employer_revenue",
        "Geral": "geral",
        "Cultura e valores": "cultura_valores",
        "Diversidade e inclus�o": "diversidade_inclusao",
        "Qualidade de vida": "qualidade_vida",
        "Alta lideran�a": "alta_lideranca",
        "Remunera��o e benef�cios": "remuneracao_beneficios",
        "Oportunidades de carreira": "oportunidades_carreira",
        "Recomendam para outras pessoas(%)": "percentual_recomendam_para_outras_pessoas",
        "Perspectiva positiva da empresa(%)": "percentual_perspectiva_positiva_empresa",
        # The CNPJ rename is disabled for this dataset (kept for reference):
#         "CNPJ": "cnpj",
        "Nome": "nome",
        "Segmento": "segmento"
    },
    # Used by ComplaintsTransformation.
    "ComplaintsTransformation": {
        "Ano" : "ano",
        "Trimestre": "trimestre",
        "Categoria": "categoria",
        "Tipo": "tipo",
        "CNPJ IF": "cnpj_if",
        "Institui��o financeira": "instituicao_financeira",
        "�ndice": "indice",
        "Quantidade de reclama��es reguladas procedentes": "qtd_reclamacoes_reguladas_procedentes",
        "Quantidade de reclama��es reguladas - outras": "qtd_reclamacoes_reguladas_outras",
        "Quantidade de reclama��es n�o reguladas": "qtd_reclamacoes_nao_reguladas",
        "Quantidade total de reclama��es": "qtd_total_reclamacoes",
        "Quantidade total de clientes - CCS e SCR": "qtd_total_clientes_ccs_scr",
        "Quantidade de clientes - CCS": "qtd_clientes_ccs",
        "Quantidade de clientes - SCR": "qtd_clientes_scr"
    }
}

Load Data

In [9]:
class UnsupportedFileType(Exception):
    """
    Raised when a loader is asked to read a file type it does not handle.

    Attributes:
        file_type: the rejected file type, as given by the caller.
        message: the human-readable text also used as the exception message.
    """
    def __init__(self, file_type):
        description = f"File(s) of type {file_type} not supported"
        super().__init__(description)
        self.file_type = file_type
        self.message = description

class LoadData:
    """
    Thin wrapper around the Spark CSV reader for delimited text files.

    Args:
        spark: session object exposing `.read` (a SparkSession in this notebook).
        file_directory: location handed unchanged to the reader's `.csv(...)`.
            NOTE(review): annotated as `list`, but the calls in this notebook
            pass a single path string - confirm which is intended.
        file_type: 'csv' or 'tsv'; anything else is rejected by `load`.
        separator: field delimiter passed to the reader.
        header: whether the first line of each file holds column names.
        encoding: character encoding passed to the reader.
    """
    def __init__(self, spark, file_directory: list, file_type: str, separator: str = ';', header: bool = True, encoding='utf-8'):
        self.spark = spark
        self.file_directory = file_directory
        self.file_type = file_type
        self.separator = separator
        self.header = header
        self.encoding = encoding

    def load(self) -> DataFrame:
        """
        Read the configured location and return the resulting DataFrame.

        Raises:
            UnsupportedFileType: if `file_type` is neither 'csv' nor 'tsv'.
        """
        # Guard first so the reader is never touched for an unsupported type.
        if self.file_type not in ('csv', 'tsv'):
            raise UnsupportedFileType(self.file_type)

        reader = self.spark.read.options(
            delimiter=self.separator,
            header=self.header,
            encoding=self.encoding
        )
        return reader.csv(self.file_directory)

Fixing Column Names and Data Types

In [10]:
class TransformData(ABC):
    """
    Base class gathering the helpers shared by all transformations.

    Subclasses must implement `transform`.
    """
    def format_cnpj(self, value):
        """
        Build a column expression that left-pads a CNPJ value with zeros to 8 characters.

        Values equal to '' or ' ' are passed through unchanged; every other
        value goes through `lpad(value, 8, '0')`.
        NOTE(review): per Spark's `lpad`, values longer than 8 characters are
        shortened to 8 - confirm that is acceptable for the source data.

        Args:
            value: a Spark Column holding the CNPJ.

        Returns:
            A Spark Column expression.
        """
        return F.when((value == ' ') | (value == ''), value).otherwise(F.lpad(value, 8, '0'))


    def load_column_rename_mappings(self, transformation_name):
        """
        Return the rename table registered for `transformation_name`.

        Args:
            transformation_name: key into the notebook-level `columns_rename` dict.

        Returns:
            The {old_name: new_name} dict for that key, or an empty dict when
            the key is not registered.
        """
        # The notebook-level table is named `columns_rename` (plural); the
        # previous code read an undefined global `column_rename`.
        return columns_rename.get(transformation_name, {})


    def rename_columns(self, df: DataFrame, column_rename) -> DataFrame:
        """
        Apply every {old_name: new_name} pair of `column_rename` to `df`.

        Args:
            df: the DataFrame to rename.
            column_rename: dict mapping current column names to new names.

        Returns:
            The DataFrame after all `withColumnRenamed` calls.
        """
        for old_name, new_name in column_rename.items():
            df = df.withColumnRenamed(old_name, new_name)
        return df


    @abstractmethod
    def transform(self) -> DataFrame:
        """Return the transformed DataFrame; implemented by each subclass."""
        pass


class BanksTransformation(TransformData):
    """
    Transformation of the banks Spark DataFrame.
    """
    def __init__(self, df: DataFrame):
        """
        Store the raw banks DataFrame and look up this class's rename table.
        """
        self.column_rename = self.load_column_rename_mappings('BanksTransformation')
        self.df = df


    def transform(self) -> DataFrame:
        """
        Rename the columns to snake_case, zero-pad 'cnpj' and strip the
        ' - PRUDENCIAL' suffix from 'nome'.

        Returns:
            The transformed DataFrame.
        """
        renamed = self.rename_columns(self.df, self.column_rename)

        # Column expressions are built up front; they are only evaluated by Spark later.
        padded_cnpj = self.format_cnpj(F.col("cnpj"))
        clean_name = F.regexp_replace(F.col("nome").cast(StringType()), ' - PRUDENCIAL', '')

        return renamed.withColumn("cnpj", padded_cnpj).withColumn("nome", clean_name)


class EmployeesTransformation(TransformData):
    """
    Transformation and aggregation of the employees Spark DataFrame.
    """
    def __init__(self, df: DataFrame):
        """
        Store the raw employees DataFrame and look up this class's rename table.
        """
        self.column_rename = self.load_column_rename_mappings('EmployeesTransformation')
        self.df = df


    def transform(self) -> DataFrame:
        """
        Rename the columns to snake_case and cast each known column to its target type.

        Returns:
            The transformed DataFrame.
        """
        text, whole, score = StringType(), IntegerType(), DecimalType(20,2)

        # Target type per column, in the order the casts are applied.
        # The 'cnpj' column is intentionally not handled for this dataset.
        target_types = {
            "employer_name": text,
            "reviews_count": whole,
            "culture_count": whole,
            "salaries_count": whole,
            "benefits_count": whole,
            "employer_website": text,
            "employer_headquarters": text,
            "employer_founded": whole,
            "employer_industry": text,
            "employer_revenue": text,
            "url": text,
            "geral": score,
            "cultura_valores": score,
            "diversidade_inclusao": score,
            "qualidade_vida": score,
            "alta_lideranca": score,
            "remuneracao_beneficios": score,
            "oportunidades_carreira": score,
            "percentual_recomendam_para_outras_pessoas": score,
            "percentual_perspectiva_positiva_empresa": score,
            "nome": text,
            "segmento": text,
            "match_percent": FloatType(),
        }

        result = self.rename_columns(self.df, self.column_rename)
        for column_name, spark_type in target_types.items():
            result = result.withColumn(column_name, F.col(column_name).cast(spark_type))
        return result


    def calculate_aggregates(self, df) -> DataFrame:
        """
        Group `df` by 'nome' and return the mean of 'geral' and of
        'remuneracao_beneficios', each rounded to 2 decimals.
        """
        averaged_columns = ('geral', 'remuneracao_beneficios')
        aggregations = [F.round(F.mean(name), 2).alias(name) for name in averaged_columns]
        return df.groupby('nome').agg(*aggregations)


class ComplaintsTransformation(TransformData):
    """
    Transformation and aggregation of the complaints Spark DataFrame.
    """
    def __init__(self, df: DataFrame):
        """
        Store the raw complaints DataFrame and look up this class's rename table.
        """
        self.df = df
        self.column_rename = self.load_column_rename_mappings('ComplaintsTransformation')


    def transform(self) -> DataFrame:
        """
        Rename the columns to snake_case, derive 'cnpj' and 'nome', and cast data types.

        'cnpj' is the zero-padded form of 'cnpj_if'; 'nome' is
        'instituicao_financeira' with the ' (conglomerado)' suffix removed;
        'indice' has its decimal comma replaced by a dot before the cast.

        Returns:
            The transformed DataFrame.
        """
        transformed_df = self.rename_columns(self.df, self.column_rename)

        transformed_df = (
            transformed_df
            .withColumn("ano", F.col('ano').cast(IntegerType()))
            .withColumn('trimestre', F.col("trimestre").cast(StringType()))
            .withColumn('categoria', F.col("categoria").cast(StringType()))
            .withColumn('tipo', F.col("tipo").cast(StringType()))
            .withColumn("cnpj", self.format_cnpj(F.col("cnpj_if")))
            # Raw string: the parentheses are escaped for the regex engine, and a
            # plain literal would make `\(` an invalid Python escape sequence.
            .withColumn("nome", F.regexp_replace(F.col("instituicao_financeira").cast(StringType()), r' \(conglomerado\)', ''))
            .withColumn("indice", F.regexp_replace(F.col('indice').cast(StringType()), ',', '.').cast(DecimalType(20,2)))
            .withColumn("qtd_reclamacoes_reguladas_procedentes", F.col('qtd_reclamacoes_reguladas_procedentes').cast(DecimalType(20,2)))
            .withColumn("qtd_reclamacoes_reguladas_outras", F.col('qtd_reclamacoes_reguladas_outras').cast(DecimalType(20,2)))
            .withColumn("qtd_reclamacoes_nao_reguladas", F.col('qtd_reclamacoes_nao_reguladas').cast(DecimalType(20,2)))
            .withColumn("qtd_total_reclamacoes", F.col('qtd_total_reclamacoes').cast(DecimalType(20,2)))
            .withColumn("qtd_total_clientes_ccs_scr", F.col('qtd_total_clientes_ccs_scr').cast(DecimalType(20,2)))
            .withColumn("qtd_clientes_ccs", F.col('qtd_clientes_ccs').cast(DecimalType(20,2)))
            .withColumn("qtd_clientes_scr", F.col('qtd_clientes_scr').cast(DecimalType(20,2)))
        )

        return transformed_df


    def calculate_aggregates(self, df) -> DataFrame:
        """
        Group `df` by 'nome' and aggregate three columns.

        'indice' and 'qtd_total_reclamacoes' are averaged and rounded to 2
        decimals; 'qtd_total_clientes_ccs_scr' takes the maximum (not the mean).
        """
        aggregated_df = df.groupby('nome').agg(
            F.round(F.mean('indice'), 2).alias("indice"),
            F.round(F.mean('qtd_total_reclamacoes'), 2).alias("qtd_total_reclamacoes"),
            F.max('qtd_total_clientes_ccs_scr').alias("qtd_total_clientes_ccs_scr")
        )
        return aggregated_df

Starting Spark

In [8]:
# Load credentials from the .env file inside the 'credencials' folder.
# The path is built with os.path.join so the separator matches the host OS;
# a hard-coded backslash only resolves on Windows.
load_dotenv(os.path.join('credencials', '.env'))

False

In [9]:
# S3 client built from credentials read from the process environment.
# The variable names are looked up exactly as written here (lower case).
# NOTE(review): os.getenv returns None for an unset variable; presumably boto3
# then falls back to its default credential chain - confirm.
A = boto3.client(
    's3',
    aws_access_key_id = os.getenv('aws_access_key_id'),
    aws_secret_access_key = os.getenv('aws_secret_access_key'),
    aws_session_token = os.getenv('aws_session_token')
)

In [None]:
# Displays the `spark` session object.
# NOTE(review): `spark` is only assigned in a later cell of this notebook, so
# this cell depends on out-of-order execution.
spark

In [12]:
pip3 install --upgrade jupyter boto3 aws-glue-sessions   

Note: you may need to restart the kernel to use updated packages.


ERROR: Ignored the following versions that require a different python version: 0.9.1 Requires-Python >=2.7,<3.0; 1.0.2 Requires-Python >=3,<3.8
ERROR: Could not find a version that satisfies the requirement awsglue-local (from versions: none)
ERROR: No matching distribution found for awsglue-local


In [4]:
pip install awsglue-local

Note: you may need to restart the kernel to use updated packages.


ERROR: Ignored the following versions that require a different python version: 0.9.1 Requires-Python >=2.7,<3.0; 1.0.2 Requires-Python >=3,<3.8
ERROR: Could not find a version that satisfies the requirement awsglue-local (from versions: none)
ERROR: No matching distribution found for awsglue-local


In [14]:
# Shell escape that starts a Jupyter Notebook server from inside this notebook.
# NOTE(review): the recorded output is `^C`, i.e. the command was interrupted;
# this looks like leftover experimentation rather than a pipeline step - confirm.
!jupyter notebook

^C


In [5]:
from pyspark.context import SparkContext
from awsglue.context import GlueContext

ModuleNotFoundError: No module named 'awsglue'

In [7]:
import boto3
# S3 resource handle; no explicit credentials are passed here, unlike the
# client created earlier in this notebook.
s3 = boto3.resource('s3')

In [9]:

# Line magic selecting Glue version 3.0.
# NOTE(review): the recorded output reports that this magic was not found in the
# kernel used; presumably it requires the AWS Glue interactive-sessions kernel - confirm.
%glue_version 3.0

UsageError: Line magic function `%glue_version` not found.


In [4]:
# Reuse the active SparkContext when one exists so this cell can be re-run;
# calling SparkContext() again while a context is alive raises an error.
sc = SparkContext.getOrCreate()
# Wrap the context in a GlueContext and expose its SparkSession as `spark`,
# the name the loading code below expects.
glueContext = GlueContext(sc)
spark = glueContext.spark_session

RuntimeError: Java gateway process exited before sending its port number

Running fixes

In [19]:
def read_and_set_datatype() -> tuple:
    """
    Load the three raw datasets from S3 and apply their transformations.

    Configures root logging at INFO level, reads the banks (tab-separated),
    employees ('|'-separated) and complaints (';'-separated) files with
    `LoadData`, and runs the matching transformation class on each one.
    Joining and writing the result happen outside this function.

    Relies on the notebook-level `spark` session.

    Returns:
        A tuple (df_banks, df_employees, df_complaints) of transformed DataFrames.
    """
    logging.basicConfig(level=logging.INFO)

    load_banks = LoadData(
        spark=spark,
        file_directory='s3://bucket-boto-gabrafur-usp/raw_files/banco/',
        file_type='tsv',
        separator='\t'
    )
    df_raw_banks = load_banks.load()

    load_employees = LoadData(
        spark=spark,
        file_directory='s3://bucket-boto-gabrafur-usp/raw_files/empregados/',
        file_type='csv',
        separator='|'
    )
    df_raw_employees = load_employees.load()

    load_complaints = LoadData(
        spark=spark,
        file_directory='s3://bucket-boto-gabrafur-usp/raw_files/reclamacoes/',
        file_type='csv',
        separator=';'
    )
    df_raw_complaints = load_complaints.load()

    df_banks = BanksTransformation(df_raw_banks).transform()
    df_employees = EmployeesTransformation(df_raw_employees).transform()
    df_complaints = ComplaintsTransformation(df_raw_complaints).transform()

    return df_banks, df_employees, df_complaints

df_banks, df_employees, df_complaints = read_and_set_datatype()

NameError: name 'spark' is not defined

In [None]:
   
    # Aggregate per bank name. The transformation objects created inside
    # read_and_set_datatype() are local to that function, so new instances are
    # built here from the already-transformed DataFrames.
    df_grouped_employees = EmployeesTransformation(df_employees).calculate_aggregates(df_employees)

    df_grouped_complaints = ComplaintsTransformation(df_complaints).calculate_aggregates(df_complaints)

    # Inner joins on 'nome': only banks present in every dataset are kept.
    df_complaints_banks = df_banks.join(df_grouped_complaints, on=['nome'], how='inner')
    logging.info(f'Count Reclama��es x Bancos: \n{df_complaints_banks.count()}')

    df_complaints_banks_employees = df_complaints_banks.join(df_grouped_employees, on=['nome'], how='inner')
    logging.info(f'Count Reclama��es x Bancos x Empregados: \n{df_complaints_banks_employees.count()}')

    # NOTE(review): DataWriter is defined in a later cell of this notebook;
    # that cell must have been run before this one.
    output_directory = 's3://805766217211-transformed/atividade03'
    write_data = DataWriter()
    write_data.write_parquet(df_complaints_banks_employees, output_directory)

In [None]:
class DataWriter:
    """
    Writes DataFrames to parquet.
    """
    def write_parquet(self, df, output_directory, mode="overwrite", repartition=True):
        """
        Write `df` as parquet to `output_directory`.

        Args:
            df: the DataFrame to write.
            output_directory: destination passed to the parquet writer.
            mode: save mode passed to the writer (default "overwrite").
            repartition: when truthy, `df.repartition(1)` is applied before writing.
        """
        frame_to_write = df.repartition(1) if repartition else df
        frame_to_write.write.mode(mode).parquet(output_directory)
            
