#Desafio técnico MINEHR

### Por: Katia Cardoso

## Importação de diferentes bases

In [0]:
import pandas as pd
import numpy as np
import os

# URL do arquivo CSV
url_csv = 'https://storage.googleapis.com/desafio-ed/ingestion/base_colaboradores_2022.csv'
# URL do arquivo Excel (.xlsx)
url_excel = 'https://storage.googleapis.com/desafio-ed/ingestion/base_colaboradores_2023.xlsx'

# Importando dados do Excel Online
df_xlsx = pd.read_excel(url_excel)

# Importando dados do CSV com codificação "latin-1" e separador ";"
df_csv = pd.read_csv(url_csv, encoding='latin-1', sep=';')

## Padronização dos nomes das colunas


Visualização do nome das colunas de cada DataFrame

In [0]:
# Imprimir os nomes das colunas dos DataFrames
print("Nomes das colunas de df_csv antes dos ajustes:")
print(df_csv.columns)

print("\nNomes das colunas de df_xlsx antes dos ajustes:")
print(df_xlsx.columns)


Nomes das colunas de df_csv antes dos ajustes:
Index(['Matrícula    ', 'MÊS REFERENCIA', 'FUNCIONARIO', 'DATA ADMISSAO',
       'NACIONALIDADE', 'DATA DEMISSAO', 'ESTADO CIVIL', 'SEXO', 'CARGO',
       'EMPRESA', 'DATA NASCIMENTO'],
      dtype='object')

Nomes das colunas de df_xlsx antes dos ajustes:
Index(['mês referência', 'matricula', 'data admissao', 'funcionario',
       'nacionalidade', 'estado civil', 'sexo', 'cargo', 'empresa',
       'data de nascimento', 'data demissao'],
      dtype='object')



Realização da padronização dos nomes das colunas

In [0]:
from unidecode import unidecode

# Função para padronizar nomes de colunas
def normalize_column_names(df):
    # Padronizar nomes das colunas (acentos, minúsculas e espaços)
    normalized_columns = [unidecode(col).lower().strip().replace(' ', '_') for col in df.columns]
    df.columns = normalized_columns
    return df

# Padronizar nomes das colunas para ambos os DataFrames
df_csv = normalize_column_names(df_csv)
df_xlsx = normalize_column_names(df_xlsx)

# Modificar o nome da coluna "data_de_nascimento" no DataFrame do Excel
df_xlsx.rename(columns={'data_de_nascimento': 'data_nascimento'}, inplace=True)

# Verificar se os nomes das colunas são idênticos
if df_csv.columns.equals(df_xlsx.columns):
    # Se os nomes das colunas são idênticos, reorganize as colunas
    df_csv = df_csv[df_xlsx.columns]
else:
    # Imprimir as colunas que são diferentes
    different_columns = [col for col in df_csv.columns if col not in df_xlsx.columns]
    print(f"Colunas diferentes nos DataFrames: {different_columns}")

# se o resultado for [], isto quer dizer que nao tem nenhuma coluna com nome diferente. Caso contrário, irá aparecer o nome da coluna que ainda necessite de algum processo de padronização 


Colunas diferentes nos DataFrames: []



Visualização dos nomes das colunas após a padronização

In [0]:
# Imprimir os nomes das colunas dos DataFrames
print("Nomes das colunas de df_csv apos ajustes:")
print(df_csv.columns)

print("\nNomes das colunas de df_xlsx apos ajustes:")
print(df_xlsx.columns)



Nomes das colunas de df_csv apos ajustes:
Index(['matricula', 'mes_referencia', 'funcionario', 'data_admissao',
       'nacionalidade', 'data_demissao', 'estado_civil', 'sexo', 'cargo',
       'empresa', 'data_nascimento'],
      dtype='object')

Nomes das colunas de df_xlsx apos ajustes:
Index(['mes_referencia', 'matricula', 'data_admissao', 'funcionario',
       'nacionalidade', 'estado_civil', 'sexo', 'cargo', 'empresa',
       'data_nascimento', 'data_demissao'],
      dtype='object')



É importante checar se todas as colunas estão com nomes idênticos, pois caso contrário, no momento da união, serão geradas colunas a mais 

## União das bases

In [0]:
# Concatenar os DataFrames por linhas (um abaixo do outro)
df_final = pd.concat([df_csv, df_xlsx], ignore_index=True)

# Salvando o DataFrame como um arquivo CSV
df_final.to_csv('df_final.csv', index=False, encoding='utf-8')

# Visualizando as primeiras 5 linhas do DataFrame
df_final.head()

Unnamed: 0,matricula,mes_referencia,funcionario,data_admissao,nacionalidade,data_demissao,estado_civil,sexo,cargo,empresa,data_nascimento
0,27,28/02/2022,Colaborador 27,05/12/2005,brasil,,Casado,Masculino,Eletr Lv Transmissao Iii,Empresa 6,01/01/1977
1,28,31/01/2022,Colaborador 28,14/04/2003,brasil,,Solteiro,Feminino,Coord Tesouraria,Empresa 5,01/04/1979
2,28,28/02/2022,Colaborador 28,14/04/2003,brasil,,Casado,Feminino,Coord Tesouraria,Empresa 5,01/04/1979
3,28,31/03/2022,Colaborador 28,14/04/2003,brasil,,Solteiro,Feminino,Coord Tesouraria,Empresa 2,01/04/1979
4,28,30/04/2022,Colaborador 28,14/04/2003,brasil,,Solteiro,Feminino,Coord Tesouraria,Empresa 2,01/04/1979


In [0]:
#Verificação do numero de linhas e colunas arquivo .csv
num_linhas_csv, num_colunas_csv = df_csv.shape
print(f'Número de linhas csv : {num_linhas_csv}')
print(f'Número de colunas csv: {num_colunas_csv} \n')


#Verificação do numero de linhas e colunas arquivo .xlsx
num_linhas_xlsx, num_colunas_xlsx = df_xlsx.shape
print(f'Número de linhas xlsx: {num_linhas_xlsx}')
print(f'Número de colunas xlsx : {num_colunas_xlsx} \n ')

#Verificação do numero de linhas e colunas arquivo final
num_linhas, num_colunas = df_final.shape
print(f'Número de linhas final: {num_linhas}')
print(f'Número de colunas final: {num_colunas}')


Número de linhas csv : 20054
Número de colunas csv: 11 

Número de linhas xlsx: 27297
Número de colunas xlsx : 11 
 
Número de linhas final: 47351
Número de colunas final: 11



Com esta última célula de verificação de contagem de linhas e colunas até o momento é possível notar que neste caso a união teve êxito, pois a soma das linhas de ambos os DataFrames separados é compativel com a quantidade do DataFrame resultante da junção dos dois. Bem como, é possível notar que a quantidade de colunas foi mantida, o que indica que não houve percas nem acréscimo de colunas no decorrer do processo


## Conversão para um DataFrame Spark

In [0]:
# Verificação dos tipos de dados das colunas
print(df_final.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 47351 entries, 0 to 47350
Data columns (total 11 columns):
 #   Column           Non-Null Count  Dtype 
---  ------           --------------  ----- 
 0   matricula        47351 non-null  int64 
 1   mes_referencia   47351 non-null  object
 2   funcionario      47351 non-null  object
 3   data_admissao    47351 non-null  object
 4   nacionalidade    47346 non-null  object
 5   data_demissao    436 non-null    object
 6   estado_civil     47116 non-null  object
 7   sexo             47351 non-null  object
 8   cargo            47351 non-null  object
 9   empresa          47351 non-null  object
 10  data_nascimento  47351 non-null  object
dtypes: int64(1), object(10)
memory usage: 4.0+ MB
None


Esta conversão abaixo das colunas com o tipo 'object' para 'str' foi necessária para garantir que todas as entradas sejam tratadas como strings. Pois o DataFrame Spark, tem um sistema de tipos mais estruturado e requer tipos de dados consistentes em cada coluna

In [0]:
import pyspark.sql.types as t

# Conversão do tipo de dados das colunas com o tipo 'object' para 'str'
for col in df_final.columns:
    if df_final[col].dtype == 'object':
        df_final[col] = df_final[col].astype(str)

In [0]:
from pyspark.sql import SparkSession

# Conversão do DataFrame do Pandas para um DataFrame do Spark
spark_df = spark.createDataFrame(df_final)

# Exiba o esquema do DataFrame Spark
spark_df.printSchema()

root
 |-- matricula: long (nullable = true)
 |-- mes_referencia: string (nullable = true)
 |-- funcionario: string (nullable = true)
 |-- data_admissao: string (nullable = true)
 |-- nacionalidade: string (nullable = true)
 |-- data_demissao: string (nullable = true)
 |-- estado_civil: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- cargo: string (nullable = true)
 |-- empresa: string (nullable = true)
 |-- data_nascimento: string (nullable = true)



In [0]:
# Exiba o DataFrame Spark
spark_df.show()

+---------+--------------+--------------+-------------+-------------+-------------+------------+---------+--------------------+---------+---------------+
|matricula|mes_referencia|   funcionario|data_admissao|nacionalidade|data_demissao|estado_civil|     sexo|               cargo|  empresa|data_nascimento|
+---------+--------------+--------------+-------------+-------------+-------------+------------+---------+--------------------+---------+---------------+
|       27|    28/02/2022|Colaborador 27|   05/12/2005|     brasil  |          nan|      Casado|Masculino|Eletr Lv Transmis...|Empresa 6|     01/01/1977|
|       28|    31/01/2022|Colaborador 28|   14/04/2003|     brasil  |          nan|    Solteiro| Feminino|    Coord Tesouraria|Empresa 5|     01/04/1979|
|       28|    28/02/2022|Colaborador 28|   14/04/2003|     brasil  |          nan|      Casado| Feminino|    Coord Tesouraria|Empresa 5|     01/04/1979|
|       28|    31/03/2022|Colaborador 28|   14/04/2003|     brasil  |       

In [0]:
# Verificação do tipo de dados do DataFrame do Spark
print(type(spark_df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [0]:
# Imprime o esquema do DataFrame do Spark
print(spark_df.dtypes)

[('matricula', 'bigint'), ('mes_referencia', 'string'), ('funcionario', 'string'), ('data_admissao', 'string'), ('nacionalidade', 'string'), ('data_demissao', 'string'), ('estado_civil', 'string'), ('sexo', 'string'), ('cargo', 'string'), ('empresa', 'string'), ('data_nascimento', 'string')]


verificação do número de linhas e colunas após a conversão para Dataframe do Spark

In [0]:
# Número de linhas
num_linhas = spark_df.count()

# Número de colunas
num_colunas = len(spark_df.columns)

# Exibir o número de linhas e colunas
print(f"Número de linhas: {num_linhas}")
print(f"Número de colunas: {num_colunas}")

Número de linhas: 47351
Número de colunas: 11


## Padronização dos dados


 Para as colunas textuais, retire os leading and trailing whitespace e coloque as iniciais maiúsculas.
 
 Resumo de algumas funções importadas:
 - trim(col(coluna)) remove os espaços em branco no início e no final de cada string na coluna
 - initcap() coloca as iniciais das palavras em maiúsculas. 

Como adicional, utilizei a função translate para retirada de acentuação também. Mais para frente para contagem da coluna "estado_civil" estava ocorrendo duplicidade dos dados 

In [0]:
from pyspark.sql.functions import trim, initcap, col, translate

# Lista de todas as colunas do DataFrame
columns = spark_df.columns

# Lista de colunas textuais 
textual_columns = ['mes_referencia', 'funcionario', 'data_admissao', 'nacionalidade', 'data_demissao', 'estado_civil', 'sexo', 'cargo', 'empresa', 'data_nascimento']  

# Aplicação das transformações nas colunas textuais
for column in textual_columns:
    spark_df = spark_df.withColumn(column, initcap(trim(translate(col(column), 'áéíóúãõâêîôûàèìòùäëïöü', 'aeiouaoaeiouaeiou'))))

# Mostra o DataFrame após as transformações
spark_df.show()

+---------+--------------+--------------+-------------+-------------+-------------+------------+---------+--------------------+---------+---------------+
|matricula|mes_referencia|   funcionario|data_admissao|nacionalidade|data_demissao|estado_civil|     sexo|               cargo|  empresa|data_nascimento|
+---------+--------------+--------------+-------------+-------------+-------------+------------+---------+--------------------+---------+---------------+
|       27|    28/02/2022|Colaborador 27|   05/12/2005|       Brasil|          Nan|      Casado|Masculino|Eletr Lv Transmis...|Empresa 6|     01/01/1977|
|       28|    31/01/2022|Colaborador 28|   14/04/2003|       Brasil|          Nan|    Solteiro| Feminino|    Coord Tesouraria|Empresa 5|     01/04/1979|
|       28|    28/02/2022|Colaborador 28|   14/04/2003|       Brasil|          Nan|      Casado| Feminino|    Coord Tesouraria|Empresa 5|     01/04/1979|
|       28|    31/03/2022|Colaborador 28|   14/04/2003|       Brasil|       


Para as colunas numéricas, certifique-se que tenha no máximo 4 casa decimais. 

Neste código abaixo, é uma verificação de comprimento da coluna "matricula" única coluna numérica até o momento e aqui foi levado em consideração 4 "digitos" e não "casas decimais". Contudo, adiante terão algumas outras colunas numéricas com virgulas (media_idade) e será realizado um round (arrendondamento) para 4 cadas decimais

In [0]:
from pyspark.sql.functions import col, length

# Nome da coluna numérica 
numeric_columns = 'matricula'

# Verificação do comprimento dos valores na coluna numérica
spark_df.withColumn('comprimento', length(col(numeric_columns))).filter(col('comprimento') > 4).show()

+---------+--------------+-----------+-------------+-------------+-------------+------------+----+-----+-------+---------------+-----------+
|matricula|mes_referencia|funcionario|data_admissao|nacionalidade|data_demissao|estado_civil|sexo|cargo|empresa|data_nascimento|comprimento|
+---------+--------------+-----------+-------------+-------------+-------------+------------+----+-----+-------+---------------+-----------+
+---------+--------------+-----------+-------------+-------------+-------------+------------+----+-----+-------+---------------+-----------+




Abordagem de valores nulos escolhida

Para dados textuais: Substituição dos valores nulos e NaN por "Nao preenchido" nas colunas estado_civil e nacionalidade

Para dados numéricos: Não mexer com os nulos e deixar como estão, pois se estes dados forem excluidos agora podem tendenciar analises futuras e caso eles não sejam importantes lá, é recomendado a realização de uma filtragem por dados não nulos



- Contagem de valores nulos

In [0]:
from pyspark.sql.functions import isnan, when, count

# Verificação de valores nulos em todas as colunas
spark_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in spark_df.columns]).show()

+---------+--------------+-----------+-------------+-------------+-------------+------------+----+-----+-------+---------------+
|matricula|mes_referencia|funcionario|data_admissao|nacionalidade|data_demissao|estado_civil|sexo|cargo|empresa|data_nascimento|
+---------+--------------+-----------+-------------+-------------+-------------+------------+----+-----+-------+---------------+
|        0|             0|          0|            0|            5|        19886|         235|   0|    0|      0|              0|
+---------+--------------+-----------+-------------+-------------+-------------+------------+----+-----+-------+---------------+



- Substituição dos valores textuais

In [0]:
# Substituição dos valores nulos e NaN por "Nao preenchido" nas colunas estado_civil e nacionalidade
spark_df = spark_df.withColumn("estado_civil", when(col("estado_civil").isNull() | isnan(col("estado_civil")), "Nao preenchido").otherwise(col("estado_civil")))
spark_df = spark_df.withColumn("nacionalidade", when(col("nacionalidade").isNull() | isnan(col("nacionalidade")), "Nao preenchido").otherwise(col("nacionalidade")))

# Verificação de valores nulos em todas as colunas
spark_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in spark_df.columns]).show()

+---------+--------------+-----------+-------------+-------------+-------------+------------+----+-----+-------+---------------+
|matricula|mes_referencia|funcionario|data_admissao|nacionalidade|data_demissao|estado_civil|sexo|cargo|empresa|data_nascimento|
+---------+--------------+-----------+-------------+-------------+-------------+------------+----+-----+-------+---------------+
|        0|             0|          0|            0|            0|        19886|           0|   0|    0|      0|              0|
+---------+--------------+-----------+-------------+-------------+-------------+------------+----+-----+-------+---------------+



- verificação da porcentagem de nulos no DataFrame

In [0]:
from pyspark.sql.functions import col, isnan, when

# Calcula o número total de linhas no DataFrame
total_linhas = spark_df.count()

# Calculo da porcentagem de dados nulos ou NaN para cada coluna
porcentagens_nulos_nan = []

for coluna in spark_df.columns:
    # Calculo do número de linhas com valores nulos ou NaN na coluna atual
    nulos_ou_nan_na_coluna = spark_df.where((col(coluna).isNull()) | (isnan(col(coluna)))).count()
    
    # Calculo da porcentagem de dados nulos ou NaN na coluna atual
    porcentagem_nulos_nan = (nulos_ou_nan_na_coluna / total_linhas) * 100
    
    # Adiciona o nome da coluna e a porcentagem de nulos ou NaN à lista
    porcentagens_nulos_nan.append((coluna, porcentagem_nulos_nan))

# Mostra as porcentagens de dados nulos ou NaN para todas as colunas
for coluna, porcentagem in porcentagens_nulos_nan:
    print(f"A coluna '{coluna}' tem {porcentagem:.2f}% de dados nulos ou NaN.")


A coluna 'matricula' tem 0.00% de dados nulos ou NaN.
A coluna 'mes_referencia' tem 0.00% de dados nulos ou NaN.
A coluna 'funcionario' tem 0.00% de dados nulos ou NaN.
A coluna 'data_admissao' tem 0.00% de dados nulos ou NaN.
A coluna 'nacionalidade' tem 0.00% de dados nulos ou NaN.
A coluna 'data_demissao' tem 42.00% de dados nulos ou NaN.
A coluna 'estado_civil' tem 0.00% de dados nulos ou NaN.
A coluna 'sexo' tem 0.00% de dados nulos ou NaN.
A coluna 'cargo' tem 0.00% de dados nulos ou NaN.
A coluna 'empresa' tem 0.00% de dados nulos ou NaN.
A coluna 'data_nascimento' tem 0.00% de dados nulos ou NaN.



Após a visualização das porcentagens de números nulos de cada coluna é possível observar que a coluna "data_demissão" possui uma quantidade bastante expresiva de dados nulos, o que indica apenas por esta analise que um pouco mais da metade dos funcionarios presentes nesta base estão empregados na empresa e apartir dai, abrir margens para  diversas técnicas e abordagens para análise desses dados



## Criação de coluna via de-para (grupo cargo)



Importação da base via URL

In [0]:
# URL do arquivo Excel (.xlsx)
url= 'https://storage.googleapis.com/desafio-ed/ingestion/de-para.xlsx'

# Importação dos dados do Excel Online
df_de_para = pd.read_excel(url)

# Visualzação do DataFrame
df_de_para.head()


Unnamed: 0,grupo cargo,cargo
0,Analista,Adm Contratos
1,Analista,Administrador
2,Analista,Administrador Tecnico
3,Analista,Analista Administ Jr
4,Analista,Anl Administrativo I


Padronização dos nomes das colunas

In [0]:
#Padronização do nome das colunas
df_de_para = normalize_column_names(df_de_para)

# Converção do DataFrame Pandas para um DataFrame PySpark
spark_de_para = spark.createDataFrame(df_de_para)

#Visualização do Dataframe Spark 
spark_de_para.printSchema()

root
 |-- grupo_cargo: string (nullable = true)
 |-- cargo: string (nullable = true)



União da base de_para com o DataFrame original, utilizando a função de broadcast (envia uma cópia local dessa base de_para para os clusters para quando for utilizar no join ficar mais perto do que o contato com um servidor central)



In [0]:
from pyspark.sql.functions import broadcast

# Realização do broadcast do DataFrame de_para
broadcast = broadcast(spark_de_para)

# Realização do join entre o DataFrame original e o DataFrame de_para utilizando a coluna "cargo" como chave 
spark_df = spark_df.join(broadcast, "cargo", "left_outer")

# Reorganização das colunas para ter "grupo_cargo" à esquerda de "Cargo"
spark_df = spark_df.select(*[coluna for coluna in spark_df.columns if coluna not in ["cargo", "grupo_cargo"]], "grupo_cargo", "cargo")

# Mostra o DataFrame resultante
spark_df.show()

+---------+--------------+--------------+-------------+-------------+-------------+------------+---------+---------+---------------+----------------+--------------------+
|matricula|mes_referencia|   funcionario|data_admissao|nacionalidade|data_demissao|estado_civil|     sexo|  empresa|data_nascimento|     grupo_cargo|               cargo|
+---------+--------------+--------------+-------------+-------------+-------------+------------+---------+---------+---------------+----------------+--------------------+
|       27|    28/02/2022|Colaborador 27|   05/12/2005|       Brasil|          Nan|      Casado|Masculino|Empresa 6|     01/01/1977|         Tecnico|Eletr Lv Transmis...|
|       28|    31/01/2022|Colaborador 28|   14/04/2003|       Brasil|          Nan|    Solteiro| Feminino|Empresa 5|     01/04/1979|     Coordenador|    Coord Tesouraria|
|       28|    28/02/2022|Colaborador 28|   14/04/2003|       Brasil|          Nan|      Casado| Feminino|Empresa 5|     01/04/1979|     Coordena


## Criação das colunas inteiras: idade e tempo_empresa


- idade: idade da pessoa naquele mês referência;
- tempo_empresa: o tempo, em meses, em que a pessoa pessoa está ativa na empresa até aquele mês;


In [0]:
from pyspark.sql.functions import year, months_between, when, current_date
import pyspark.sql.functions as f
from pyspark.sql.functions import datediff, to_date, lit, floor

# Conversão das strings de data para o formato de data internacional
spark_df = spark_df.withColumn("data_nascimento", to_date(col("data_nascimento"), "dd/MM/yyyy"))
spark_df = spark_df.withColumn("mes_referencia", to_date(col("mes_referencia"), "dd/MM/yyyy"))

# Calculo da diferença em dias entre a data de referência e a data de nascimento
diferenca_dias = datediff(col("mes_referencia"), col("data_nascimento"))

# Calculo da idade em anos, com a divisão por 365 e arredondamento para baixo
idade = floor(diferenca_dias / lit(365))

# Adição da coluna de idade ao DataFrame original
spark_df = spark_df.withColumn("idade", idade)

# Mostrar o DataFrame resultante
#spark_df.show()


# Filtro das linhas onde data_demissao é nula (ainda não foram demitidos)
funcionarios_ativos = spark_df.filter(col("data_demissao").isNull())

# Conversão das strings de data para o formato de data internacional
spark_df = spark_df.withColumn("data_admissao", to_date(col("data_admissao"), "dd/MM/yyyy"))
spark_df = spark_df.withColumn("mes_referencia", to_date(col("mes_referencia"), "dd/MM/yyyy"))

# Calculo da diferença em meses entre a data de demissão e a data de admissão
diferenca_meses = months_between(col("mes_referencia"), col("data_admissao"))

# Arredondamento para baixo do resultado da diferença em meses
tempo_empresa_meses = floor(diferenca_meses)

# Adição da coluna de tempo na empresa ao DataFrame original
spark_df = spark_df.withColumn("tempo_empresa", tempo_empresa_meses)

# Mostra o DataFrame resultante
spark_df.show()


+---------+--------------+--------------+-------------+-------------+-------------+------------+---------+---------+---------------+----------------+--------------------+-----+-------------+
|matricula|mes_referencia|   funcionario|data_admissao|nacionalidade|data_demissao|estado_civil|     sexo|  empresa|data_nascimento|     grupo_cargo|               cargo|idade|tempo_empresa|
+---------+--------------+--------------+-------------+-------------+-------------+------------+---------+---------+---------------+----------------+--------------------+-----+-------------+
|       27|    2022-02-28|Colaborador 27|   2005-12-05|       Brasil|          Nan|      Casado|Masculino|Empresa 6|     1977-01-01|         Tecnico|Eletr Lv Transmis...|   45|          194|
|       28|    2022-01-31|Colaborador 28|   2003-04-14|       Brasil|          Nan|    Solteiro| Feminino|Empresa 5|     1979-04-01|     Coordenador|    Coord Tesouraria|   42|          225|
|       28|    2022-02-28|Colaborador 28|   2


## Criação de duas colunas categóricas: ds_idade_cat e ds_tempo_empresa_cat




In [0]:
from pyspark.sql.functions import when

# Definição das condições para categorias de idade
condicoes_idade = [
    (col("idade") < 21, "Menos de 21 Anos"),
    (col("idade").between(21, 25), "Entre 21 E 25 Anos"),
    (col("idade").between(26, 30), "Entre 26 E 30 Anos"),
    (col("idade").between(31, 40), "Entre 31 E 40 Anos"),
    (col("idade").between(41, 50), "Entre 41 E 50 Anos"),
    (col("idade") > 50, "Acima De 50 Anos")
]

# Definição das condições para categorias de tempo de empresa (em meses)
condicoes_tempo_empresa = [
    (col("tempo_empresa") < 3, "Até 3 meses"),
    (col("tempo_empresa").between(3, 6), "Entre 3 E 6 Meses"),
    (col("tempo_empresa").between(2, 12), "Entre 6 Meses E 1 Ano"),
    (col("tempo_empresa").between(12, 24), "Entre 1 E 2 Anos"),
    (col("tempo_empresa").between(24, 60), "Entre 2 E 5 Anos"),
    (col("tempo_empresa").between(60, 120), "Entre 5 E 10 Anos"),
    (col("tempo_empresa") > 120, "Acima De 10 Anos")
]

# Aplicação das condições e criação das colunas categóricas
spark_df = spark_df.withColumn("ds_idade_cat", 
                                                   when(condicoes_idade[0][0], condicoes_idade[0][1])
                                                    .when(condicoes_idade[1][0], condicoes_idade[1][1])
                                                    .when(condicoes_idade[2][0], condicoes_idade[2][1])
                                                    .when(condicoes_idade[3][0], condicoes_idade[3][1])
                                                    .when(condicoes_idade[4][0], condicoes_idade[4][1])
                                                    .when(condicoes_idade[5][0], condicoes_idade[5][1])
                                                    .otherwise("Desconhecido"))
                                            
                                            
                                            
spark_df = spark_df.withColumn("ds_tempo_empresa_cat", 
                                                 when(condicoes_tempo_empresa[0][0], condicoes_tempo_empresa[0][1])
                                                    .when(condicoes_tempo_empresa[1][0], condicoes_tempo_empresa[1][1])
                                                    .when(condicoes_tempo_empresa[2][0], condicoes_tempo_empresa[2][1])
                                                    .when(condicoes_tempo_empresa[3][0], condicoes_tempo_empresa[3][1])
                                                    .when(condicoes_tempo_empresa[4][0], condicoes_tempo_empresa[4][1])
                                                    .when(condicoes_tempo_empresa[5][0], condicoes_tempo_empresa[5][1])
                                                    .when(condicoes_tempo_empresa[6][0], condicoes_tempo_empresa[6][1])
                                                    .otherwise("Desconhecido"))

# Visualizar o DataFrame resultante com as novas colunas categóricas
spark_df.select("ds_idade_cat", "ds_tempo_empresa_cat").show()

spark_df.show()


+------------------+--------------------+
|      ds_idade_cat|ds_tempo_empresa_cat|
+------------------+--------------------+
|Entre 41 E 50 Anos|    Acima De 10 Anos|
|Entre 41 E 50 Anos|    Acima De 10 Anos|
|Entre 41 E 50 Anos|    Acima De 10 Anos|
|Entre 41 E 50 Anos|    Acima De 10 Anos|
|Entre 41 E 50 Anos|    Acima De 10 Anos|
|Entre 41 E 50 Anos|    Acima De 10 Anos|
|Entre 41 E 50 Anos|    Acima De 10 Anos|
|Entre 41 E 50 Anos|    Acima De 10 Anos|
|Entre 41 E 50 Anos|    Acima De 10 Anos|
|Entre 41 E 50 Anos|    Acima De 10 Anos|
|Entre 41 E 50 Anos|    Acima De 10 Anos|
|Entre 41 E 50 Anos|    Acima De 10 Anos|
|Entre 41 E 50 Anos|    Acima De 10 Anos|
|Entre 31 E 40 Anos|    Acima De 10 Anos|
|Entre 31 E 40 Anos|    Acima De 10 Anos|
|Entre 31 E 40 Anos|    Acima De 10 Anos|
|Entre 31 E 40 Anos|    Acima De 10 Anos|
|Entre 31 E 40 Anos|    Acima De 10 Anos|
|Entre 31 E 40 Anos|    Acima De 10 Anos|
|Entre 31 E 40 Anos|    Acima De 10 Anos|
+------------------+--------------


### Salvar o dataFrame neste ponto para o arquivo "mensalizada.parquet"


Apesar de muito esforço, encontrei algumas dificuldades técnicas que me impediu de entregar o arquivo no formato parquet solicitado. As dificuldades encontradas envolveram principalmente a forma como esta plataforma trata seus arquivos e como retirar eles dela :) . Apesar desse impedimento, continuei a exploração e transformação dos dados conforme as especificações do case.

Abaixo, até a proxima seção de agrupamento dos dados, seguem as minhas tentativas:


In [0]:
#from pyspark_sql_utils import to_pandas

# Salva o dataframe em formato Parquet
#spark_df.write.parquet("mensalizada.parquet")

# Baixa o arquivo Parquet
#spark_df.download("/tmp/mensalizada.parquet")
#download(spark_df, "/tmp/mensalizada.parquet", format="parquet")


# Converte o dataframe Spark para um DataFrame Pandas
#df_pandas = to_pandas(spark_df)
# Salva o DataFrame Pandas em um arquivo Parquet
#df_pandas.to_parquet("mensalizada.parquet")

# Salva o dataframe Spark em um arquivo Parquet
spark_df.write.mode("append").parquet("/tmp/mensalizada.parquet")     

#ultima tentativa que deu +-, porém o arquivo fica com tamanho zerado


In [0]:
%fs ls /tmp/

path,name,size,modificationTime
dbfs:/tmp/arquivo.parquet/,arquivo.parquet/,0,1698860782664
dbfs:/tmp/mensalizada.parquet/,mensalizada.parquet/,0,1698860782664


In [0]:
#display(dbutils.fs.ls("dbfs:/tmp/"))
# Lê o dataframe Spark de um arquivo Parquet
#df = spark_df.read.parquet("/tmp/arquivo.parquet")

# Mostra o dataframe Spark
#df.show()

In [0]:
#display(dbutils.fs.ls("dbfs:/tmp/"))

# Especifica o caminho onde deseja salvar o arquivo Parquet
#caminho_parquet = "/dbfs/FileStore"

# Salva o DataFrame como um arquivo Parquet
#spark_df.write.mode("overwrite").parquet(caminho_parquet)

# Verifica se o arquivo existe
#if dbutils.fs.rm(caminho_parquet, True):
#    print(f"Arquivo {caminho_parquet} foi excluído com sucesso!")
#else:
#    print(f"O arquivo {caminho_parquet} não existe ou não pode ser excluído.")

# Escreve o novo arquivo Parquet
#spark_df.write.parquet(caminho_parquet)

In [0]:

# Especifica o caminho onde  deseja salvar o arquivo Parquet
caminho_parquet = "/dbfs/FileStore/mensalizada.parquet"

# Salva o DataFrame como um arquivo Parquet
#spark_df.write.mode("overwrite").parquet(caminho_parquet)

# Verifica se o arquivo existe
#if dbutils.fs.rm(caminho_parquet, True):
#    print(f"Arquivo {caminho_parquet} foi excluído com sucesso!")
#else:
#    print(f"O arquivo {caminho_parquet} não existe ou não pode ser excluído.")

# Escreve o novo arquivo Parquet
#spark_df.write.parquet(caminho_parquet)



In [0]:
# Registra o DataFrame como uma tabela temporária
spark_df.createOrReplaceTempView("mensalizada")

# Executar consultas SQL na tabela temporária
mensalizada = spark.sql("SELECT * FROM mensalizada")

# Mostra o resultado da consulta
mensalizada.show()


+---------+--------------+--------------+-------------+-------------+-------------+------------+---------+---------+---------------+----------------+--------------------+-----+-------------+------------------+--------------------+
|matricula|mes_referencia|   funcionario|data_admissao|nacionalidade|data_demissao|estado_civil|     sexo|  empresa|data_nascimento|     grupo_cargo|               cargo|idade|tempo_empresa|      ds_idade_cat|ds_tempo_empresa_cat|
+---------+--------------+--------------+-------------+-------------+-------------+------------+---------+---------+---------------+----------------+--------------------+-----+-------------+------------------+--------------------+
|       27|    2022-02-28|Colaborador 27|   2005-12-05|       Brasil|          Nan|      Casado|Masculino|Empresa 6|     1977-01-01|         Tecnico|Eletr Lv Transmis...|   45|          194|Entre 41 E 50 Anos|    Acima De 10 Anos|
|       28|    2022-01-31|Colaborador 28|   2003-04-14|       Brasil|       


## Agrupamento de dados por mês referência e cargo e contagens específicas


Agrupamento de dados por mês referência

In [0]:

# Agrupamento dos dados por mes_referencia e cargo e contagem de registros em cada grupo
spark_df_group = spark_df.groupBy("mes_referencia", "cargo").agg(count("*").alias("total_registros"))


# Visualização do DataFrame resultante
spark_df_group.show()


+--------------+--------------------+---------------+
|mes_referencia|               cargo|total_registros|
+--------------+--------------------+---------------+
|    2022-08-31|Anl Serv Suprimen...|             13|
|    2022-08-31|Anl Desenvolv Sis...|             33|
|    2022-08-31|Anl Serv Contabil Ii|              9|
|    2022-04-30|    Instrutor Tec Ii|              2|
|    2022-01-31| Sup Distribuicao Ii|             13|
|    2022-10-31|Eletr Distribuica...|             62|
|    2022-11-30|   Eletr Inspecao Ii|              6|
|    2022-07-31|Anl Telecomunicac...|              1|
|    2022-11-30|         Anl Teste I|              4|
|    2022-04-30|    Anl Comercial Ii|              2|
|    2022-09-30|            Anl Rh I|              2|
|    2022-03-31|Ger Compras Servicos|              1|
|    2022-07-31|   Anl Financeiro Ii|              2|
|    2022-04-30|      Anl Financas I|              2|
|    2022-06-30|Anl Automacao Tel...|              1|
|    2022-11-30|       Aux C


Contagem do número de mulheres e homens

  - Verificação de valores da coluna "sexo"

In [0]:
# Verificação dos valores únicos na coluna 'sexo'
valores_sexos = spark_df.select('sexo').distinct()

# Visualização do resultado
valores_sexos.show()

+---------+
|     sexo|
+---------+
|      Fem|
|     Masc|
| Feminino|
|Masculino|
+---------+



- Ajuste de dados da coluna: Masc -> Masculino e Fem -> Feminino

In [0]:
from pyspark.sql.functions import when

# Padronização dos valores na coluna 'sexo'
spark_df = spark_df.withColumn('sexo', 
                               when((spark_df.sexo == 'Fem') | (spark_df.sexo == 'Feminino'), 'Feminino')
                               .when((spark_df.sexo == 'Masc') | (spark_df.sexo == 'Masculino'), 'Masculino')
                               .otherwise('Outros'))


spark_df.show()
spark_df_group.show()

+---------+--------------+--------------+-------------+-------------+-------------+------------+---------+---------+---------------+----------------+--------------------+-----+-------------+------------------+--------------------+
|matricula|mes_referencia|   funcionario|data_admissao|nacionalidade|data_demissao|estado_civil|     sexo|  empresa|data_nascimento|     grupo_cargo|               cargo|idade|tempo_empresa|      ds_idade_cat|ds_tempo_empresa_cat|
+---------+--------------+--------------+-------------+-------------+-------------+------------+---------+---------+---------------+----------------+--------------------+-----+-------------+------------------+--------------------+
|       27|    2022-02-28|Colaborador 27|   2005-12-05|       Brasil|          Nan|      Casado|Masculino|Empresa 6|     1977-01-01|         Tecnico|Eletr Lv Transmis...|   45|          194|Entre 41 E 50 Anos|    Acima De 10 Anos|
|       28|    2022-01-31|Colaborador 28|   2003-04-14|       Brasil|       

- Contagem das quantidades e visualização

In [0]:

# Agrupamento dos dados por 'mes_referencia', 'cargo' e 'sexo' e contagem da quantidade de ocorrências
spark_df_group= spark_df.groupBy('mes_referencia', 'cargo', 'sexo').count()

# Visualização do resultado da contagem
spark_df_group.show()


+--------------+--------------------+---------+-----+
|mes_referencia|               cargo|     sexo|count|
+--------------+--------------------+---------+-----+
|    2022-09-30|   Coord Servicos Rh|Masculino|    1|
|    2022-09-30|       Anl Compras I|Masculino|    1|
|    2022-12-31|Tec Monitor Operacao|Masculino|    4|
|    2022-07-31|   Eletr Inspecao Ii|Masculino|    6|
|    2022-03-31|Eletr Distribuica...|Masculino|   54|
|    2022-07-31|    Anl Projetos Iii| Feminino|    1|
|    2022-02-28|Mecanico Manutenc...|Masculino|    2|
|    2022-06-30| Anl Banco Dados Iii|Masculino|    6|
|    2022-08-31|Sup Distribuicao Iii|Masculino|    4|
|    2022-09-30|          Secretaria| Feminino|    4|
|    2022-12-31|     Ger Suprimentos|Masculino|    1|
|    2022-06-30| Tec Transmissao Iii|Masculino|    5|
|    2022-05-31| Anl Faturamento Iii| Feminino|    2|
|    2022-06-30|Esp Proc Anl De N...|Masculino|    9|
|    2022-12-31|      Ger Patrimonio|Masculino|    1|
|    2022-01-31|        Advo


Contagem do número de estado civil

- Verificação de valores da coluna "estado_civil"

In [0]:
# Verificação dos valores únicos na coluna 'estado_civil'
valores_estado_civil = spark_df.select('estado_civil').distinct()

# Visualização do resultado
valores_estado_civil.show()

+--------------+
|  estado_civil|
+--------------+
|    Divorciado|
|      Solteiro|
| Uniao Estavel|
|        Casado|
|Nao preenchido|
|         Viuvo|
|      Separado|
+--------------+



- Contagem das quantidades e visualização

In [0]:
# Agrupamento dos dados por 'mes_referencia', 'cargo' e 'estado_civil' e contagem da quantidade de ocorrências
spark_df_group = spark_df.groupBy('mes_referencia', 'cargo', 'estado_civil').count()

# Visualizaçao do resultado da contagem
spark_df_group.show()


+--------------+--------------------+--------------+-----+
|mes_referencia|               cargo|  estado_civil|count|
+--------------+--------------------+--------------+-----+
|    2022-12-31| Sup Distribuicao Ii|        Casado|    5|
|    2022-07-31|Coord Processos A...|        Casado|    2|
|    2022-02-28|        Ger Juridico|        Casado|    1|
|    2022-11-30|Assist Administra...|      Solteiro|   23|
|    2022-06-30|Assist Administra...|      Solteiro|   40|
|    2022-07-31|Anl Infraestrutur...|      Solteiro|    3|
|    2022-01-31|Anl Seg Informaca...|    Divorciado|    1|
|    2022-01-31|        Assist Ti Ii|      Solteiro|   32|
|    2022-10-31|        Esp Contabil|      Solteiro|    2|
|    2022-10-31|          Anl Rh  Ii|Nao preenchido|    1|
|    2022-12-31|   Esp Serv Contabil| Uniao Estavel|    1|
|    2022-02-28|    Assist Tecnico I|      Solteiro|    1|
|    2022-06-30|Ger Corp Rec Fina...|      Solteiro|    1|
|    2022-08-31|Ger Compras Mater...|        Casado|    


Visualização da Média das idades

In [0]:
from pyspark.sql.functions import avg, round

# Agrupamento pelos campos mes_referencia e cargo e calculo da média das idades
spark_df_group= spark_df.groupBy("mes_referencia", "cargo").agg(round(avg("idade"), 4).alias("media_idade"))

# Visualização do resultado
spark_df_group.show()

+--------------+--------------------+-----------+
|mes_referencia|               cargo|media_idade|
+--------------+--------------------+-----------+
|    2022-08-31|Anl Serv Suprimen...|    36.3077|
|    2022-08-31|Anl Desenvolv Sis...|    38.5455|
|    2022-08-31|Anl Serv Contabil Ii|    38.5556|
|    2022-04-30|    Instrutor Tec Ii|       44.0|
|    2022-01-31| Sup Distribuicao Ii|    43.2308|
|    2022-10-31|Eletr Distribuica...|    38.9194|
|    2022-11-30|   Eletr Inspecao Ii|    39.3333|
|    2022-07-31|Anl Telecomunicac...|       59.0|
|    2022-11-30|         Anl Teste I|      27.25|
|    2022-04-30|    Anl Comercial Ii|       34.0|
|    2022-09-30|            Anl Rh I|       30.0|
|    2022-03-31|Ger Compras Servicos|       47.0|
|    2022-07-31|   Anl Financeiro Ii|       37.5|
|    2022-04-30|      Anl Financas I|       26.5|
|    2022-06-30|Anl Automacao Tel...|       33.0|
|    2022-11-30|       Aux Comercial|     31.527|
|    2022-12-31|       Aux Comercial|    31.5753|


Visualização da Idade máxima e mínima;

In [0]:
from pyspark.sql.functions import max, min

# Agrupamento pelos campos mes_referencia e cargo, calculo da idade máxima e mínima
spark_df_group = spark_df.groupBy("mes_referencia", "cargo").agg(
    max("idade").alias("idade_maxima"), 
    min("idade").alias("idade_minima"))

# Visualização do resultado
spark_df_group.show()

+--------------+--------------------+------------+------------+
|mes_referencia|               cargo|idade_maxima|idade_minima|
+--------------+--------------------+------------+------------+
|    2022-08-31|Ger Compras Mater...|          40|          40|
|    2022-08-31|Anl Desenvolv Sis...|          56|          28|
|    2022-08-31|Anl Serv Suprimen...|          42|          28|
|    2022-01-31| Sup Distribuicao Ii|          56|          29|
|    2022-10-31|Eletr Distribuica...|          48|          31|
|    2022-08-31|Anl Serv Contabil Ii|          53|          28|
|    2022-07-31|   Anl Financeiro Ii|          39|          36|
|    2022-12-31|      Anl Serv Rh Ii|          36|          27|
|    2022-12-31|Ger Plan Suprimentos|          40|          40|
|    2022-11-30|         Anl Teste I|          33|          23|
|    2022-10-31|       Anl Teste Iii|          44|          39|
|    2022-04-30|Anl Gestao Desemp...|          35|          31|
|    2022-11-30|       Aux Comercial|   

Visualização da admissão mais antiga por cargo

In [0]:
# Agrupamento pelos campos mes_referencia e cargo, calculo da data de admissão mais antiga
spark_df_group = spark_df.groupBy("mes_referencia", "cargo").agg(
    min("data_admissao").alias("data_admissao_mais_antiga"))

# Visualização do resultado
spark_df_group.show()

+--------------+--------------------+-------------------------+
|mes_referencia|               cargo|data_admissao_mais_antiga|
+--------------+--------------------+-------------------------+
|    2022-08-31|Anl Serv Suprimen...|               2011-09-05|
|    2022-08-31|Anl Desenvolv Sis...|               2006-08-21|
|    2022-08-31|Anl Serv Contabil Ii|               2011-02-01|
|    2022-04-30|    Instrutor Tec Ii|               1997-08-06|
|    2022-01-31| Sup Distribuicao Ii|               1989-11-20|
|    2022-10-31|Eletr Distribuica...|               2005-01-03|
|    2022-11-30|   Eletr Inspecao Ii|               2008-11-03|
|    2022-07-31|Anl Telecomunicac...|               1984-01-02|
|    2022-11-30|         Anl Teste I|               2013-10-10|
|    2022-04-30|    Anl Comercial Ii|               2010-04-15|
|    2022-09-30|            Anl Rh I|               2013-12-11|
|    2022-03-31|Ger Compras Servicos|               2019-04-09|
|    2022-07-31|   Anl Financeiro Ii|   

Após os ajustes e análises, segue uma panorama do estado do DataFrame original

In [0]:
# Visualização do esquema do DataFrame
print(spark_df.dtypes)

# Número de linhas
num_linhas = spark_df.count()

# Número de colunas
num_colunas = len(spark_df.columns)

# Exibição do número de linhas e colunas
print(f"Número de linhas: {num_linhas}")
print(f"Número de colunas: {num_colunas}")

[('matricula', 'bigint'), ('mes_referencia', 'date'), ('funcionario', 'string'), ('data_admissao', 'date'), ('nacionalidade', 'string'), ('data_demissao', 'string'), ('estado_civil', 'string'), ('sexo', 'string'), ('empresa', 'string'), ('data_nascimento', 'date'), ('grupo_cargo', 'string'), ('cargo', 'string'), ('idade', 'bigint'), ('tempo_empresa', 'bigint'), ('ds_idade_cat', 'string'), ('ds_tempo_empresa_cat', 'string')]
Número de linhas: 47351
Número de colunas: 16



É possível notar a adição de mais 5 colunas e manutenção da quantidade de linhas
