# Case Nestlé - Data Engineer

In [None]:
# Caso não tenha o  pyspark instalado executar o seguinte comando:
# pip install pyspark

In [92]:
# Bibliotecas utilizadas
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, when, col, sha2, udf, StringType

In [8]:
# Criando uma nova sessão Spark
spark = SparkSession.builder.appName("DE").getOrCreate()

## Análise exploratória das bases

In [71]:
# Input DataFrames
cargos_df = spark.read.format("csv").option("header", "true").option("delimiter", ";").load("./Base_case/BaseCargos.csv")
cep_df = spark.read.format("csv").option("header", "true").option("delimiter", "|").load("./Base_case/BaseCEP.csv")
clientes_df = spark.read.format("csv").option("header", "true").option("delimiter", ";").load("./Base_case/BaseClientes.csv")
funcionarios_df = spark.read.format("csv").option("header", "true").option("delimiter", "||").load("./Base_case/BaseFuncionarios.csv")
nivel_df = spark.read.format("csv").option("header", "true").option("delimiter", "%").load("./Base_case/BaseNivel.csv")
pq_df = spark.read.format("csv").option("header", "true").option("delimiter", ";").load("./Base_case/BasePQ.csv")

In [73]:
#Função para retornar os 5 primeiros valores de cada df
def show_df(df):
    return df.show(5)

In [74]:
show_df(cargos_df)

+-----+----------+--------------+--------+---------+--------+-----+-----------+
|Cargo|     Nível|          Área|COD Área|COD Nível|  Quadro|Bonus|Contratacao|
+-----+----------+--------------+--------+---------+--------+-----+-----------+
|  OPV|   Diretor|     Operações|     JAJ|       JE| Efetivo|    S|  Diretoria|
|  LOI|Estagiário|     Logísitca|     EDE|       JA| Efetivo|    N|    Gerente|
|  ADI|Estagiário|Administrativo|     BAC|       JA| Efetivo|    N|    Gerente|
| ADII|  Analista|Administrativo|     BAC|       DB|Terceiro|    N|         RH|
| OPII|  Analista|     Operações|     JAJ|       DB|Terceiro|    N|         RH|
+-----+----------+--------------+--------+---------+--------+-----+-----------+
only showing top 5 rows



In [75]:
show_df(cep_df)

+--------+-------------------+--------------+
|     CEP|             Estado|        Região|
+--------+-------------------+--------------+
|20125535|        Mato Grosso|Centro - Oeste|
|25995770|Rio Grande do Norte|      Nordeste|
|37278465|            Sergipe|      Nordeste|
|49897703|        Mato Grosso|Centro - Oeste|
|30149335|        Mato Grosso|Centro - Oeste|
+--------+-------------------+--------------+
only showing top 5 rows



In [76]:
show_df(clientes_df)

23/03/20 11:00:15 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Cliente, Valor Contrato Anual, Quantidade de Serviços, Cargo Responsável, CEP, Data Início Contrato, Nivel de Importancia, , , , , 
 Schema: Cliente, Valor Contrato Anual, Quantidade de Serviços, Cargo Responsável, CEP, Data Início Contrato, Nivel de Importancia, _c7, _c8, _c9, _c10, _c11
Expected: _c7 but found: 
CSV file: file:///Users/raquel.bustamante/Documents/Nestlé/Base_case/BaseClientes.csv
+------------------+--------------------+----------------------+-----------------+---------+--------------------+--------------------+----+----+----+----+----+
|           Cliente|Valor Contrato Anual|Quantidade de Serviços|Cargo Responsável|      CEP|Data Início Contrato|Nivel de Importancia| _c7| _c8| _c9|_c10|_c11|
+------------------+--------------------+----------------------+-----------------+---------+--------------------+--------------------+----+----+----+----+----+
|Teixeira Gonçalves|    

In [80]:
# show_df(funcionarios_df)

In [81]:
# show_df(nivel_df)

In [82]:
# show_df(pq_df)

In [86]:
# Função para análise de quantidade de registros e colunas em cada base
dfs = [
    ("Cargos", cargos_df), 
    ("Base CEP", cep_df), 
    ("Base Clientes", clientes_df,),
    ("Base Funcionarios", funcionarios_df), 
    ("Base Nivel", nivel_df), 
    ("Base PQ", pq_df)
]

for base, df in dfs:
    print(f'{base} tem {df.count()} registros e {len(df.columns)} colunas')

    print(f"Valores nulos em {base}:")
    df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

Cargos tem 25 registros e 8 colunas
Valores nulos em Cargos:
+-----+-----+----+--------+---------+------+-----+-----------+
|Cargo|Nível|Área|COD Área|COD Nível|Quadro|Bonus|Contratacao|
+-----+-----+----+--------+---------+------+-----+-----------+
|    0|    5|   0|       0|        0|     0|    0|          0|
+-----+-----+----+--------+---------+------+-----+-----------+

Base CEP tem 860 registros e 3 colunas
Valores nulos em Base CEP:
+---+------+------+
|CEP|Estado|Região|
+---+------+------+
|  0|    25|     0|
+---+------+------+

Base Clientes tem 325 registros e 12 colunas
Valores nulos em Base Clientes:
23/03/20 11:14:49 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Cliente, Valor Contrato Anual, Quantidade de Serviços, Cargo Responsável, CEP, Data Início Contrato, Nivel de Importancia, , , , , 
 Schema: Cliente, Valor Contrato Anual, Quantidade de Serviços, Cargo Responsável, CEP, Data Início Contrato, Nivel de Importancia, _c7, _c8, _c9, _c10, _

## Tratativa dados sensíveis

Foi escolhida a coluna CPF das bases funcionários e PQ para serem tratadas como dados sensíveis. Para isso, é possível utilizar a função sha2 para substituir os valores por hash.

In [93]:
def hash_cpf(cpf):
    return sha2(cpf, 256)

hash_cpf_udf = udf(hash_cpf, StringType())

In [97]:
funcionarios_df = funcionarios_df.withColumn("CPF", hash_cpf_udf("CPF"))
pq_df = pq_df.withColumn("CPF", hash_cpf_udf("CPF"))

## Insights

Baseado na análise exploratória feita sobre as tabelas quanto ao seus devidos conteúdos, foi escolhido gerar 5 insights sobre os dados
1. Quantidade de clientes e funcionários por estado
2. Distribuição de cargos por nível
3. 

In [98]:
# Quantidade de clientes e funcionários por estado

clientes_estado_df = clientes_df.join(cep_df, "CEP").groupBy("Estado").count().orderBy("count",ascending=False)
funcionarios_estado_df = funcionarios_df.join(cep_df, "CEP").groupBy("Estado").count().orderBy("count", ascending=False)


In [100]:
clientes_funcionarios_estado = clientes_estado_df.join(funcionarios_estado_df, "Estado", "outer").fillna[0]

TypeError: 'method' object is not subscriptable

In [None]:

# Une as duas tabelas em uma só
clientes_e_funcionarios_por_estado_df = clientes_por_estado_df.join(funcionarios_por_estado_df, "Estado", "outer").fillna(0)

# Renomeia as colunas
clientes_e_funcionarios_por_estado_df = clientes_e_funcionarios_por_estado_df.withColumnRenamed("count", "Num. Clientes").withColumnRenamed("count", "Num. Funcionários")

clientes_e_funcionarios_por_estado_df.show()
