<p style="text-align:center">
    <a href="https://skills.network" target="_blank">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/assets/logos/SN_web_lightmode.png" width="200" alt="Skills Network Logo">
    </a>
</p>


# Projeto Final: Análise de Dados usando Spark

O presente projeto implica na criação de um DataFrame mediante o carregamento de dados provenientes de um arquivo CSV, seguido pela aplicação de transformações e ações por meio do Spark SQL. Para atingir tal objetivo, é necessário realizar as seguintes tarefas:

- Tarefa 1: Gerar DataFrame a partir dos dados CSV.
- Tarefa 2: Definir um esquema para os dados.
- Tarefa 3: Exibir o esquema do DataFrame.
- Tarefa 4: Criar uma visão temporária.
- Tarefa 5: Executar uma consulta SQL.
- Tarefa 6: Calcular o Salário Médio por Departamento.
- Tarefa 7: Filtrar e Exibir Funcionários do Departamento de TI.
- Tarefa 8: Adicionar Bônus de 10% aos Salários.
- Tarefa 9: Encontrar o Salário Máximo por Idade.
- Tarefa 10: Auto-Junção nos Dados dos Funcionários.
- Tarefa 11: Calcular a Idade Média do Funcionário.
- Tarefa 12: Calcular o Salário Total por Departamento.
- Tarefa 13: Ordenar Dados por Idade e Salário.
- Tarefa 14: Contar Funcionários em Cada Departamento.
- Tarefa 15: Filtrar Funcionários com a letra "o" no Nome.

### Pré-requisitos

1. Para esta tarefa de laboratório, você estará usando Python e Spark (PySpark). Portanto, é essencial garantir que as seguintes bibliotecas estejam instaladas em seu ambiente de laboratório

In [1]:
# Instalando pacotes necessários
!pip install pyspark  findspark wget



In [2]:
import findspark
findspark.init()

In [3]:
# PySpark é a API Spark para Python. Neste laboratório, utilizamos o PySpark para inicializar o SparkContext.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [4]:
# Criando um objeto SparkContext

sc = SparkContext.getOrCreate()

# Criando uma SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

24/01/15 15:50:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


#### Download dos dados CSV


In [5]:
# Faça o download dos dados CSV primeiro para um arquivo local chamado `employees.csv`
import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/data/employees.csv")

'employees (2).csv'

### Tarefas

#### Tarefa 1: Gerar um DataFrame Spark a partir dos dados CSV

Leia os dados do arquivo CSV fornecido, `employees.csv`, e importe-os para uma variável DataFrame Spark chamada `employees_df`.

In [5]:
# Leia os dados do arquivo CSV "emp" e importe-os para uma variável DataFrame chamada "employees_df"
employees_df = spark.read.csv('employees.csv')
employees_df.show()

                                                                                

+------+---------+------+---+----------+
|   _c0|      _c1|   _c2|_c3|       _c4|
+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
|   198|   Donald|  2600| 29|        IT|
|   199|  Douglas|  2600| 34|     Sales|
|   200| Jennifer|  4400| 36| Marketing|
|   201|  Michael| 13000| 32|        IT|
|   202|      Pat|  6000| 39|        HR|
|   203|    Susan|  6500| 36| Marketing|
|   204|  Hermann| 10000| 29|   Finance|
|   205|  Shelley| 12008| 33|   Finance|
|   206|  William|  8300| 37|        IT|
|   100|   Steven| 24000| 39|        IT|
|   101|    Neena| 17000| 27|     Sales|
|   102|      Lex| 17000| 37| Marketing|
|   103|Alexander|  9000| 39| Marketing|
|   104|    Bruce|  6000| 38|        IT|
|   105|    David|  4800| 39|        IT|
|   106|    Valli|  4800| 38|     Sales|
|   107|    Diana|  4200| 35|     Sales|
|   108|    Nancy| 12008| 28|     Sales|
|   109|   Daniel|  9000| 35|        HR|
+------+---------+------+---+----------+
only showing top

#### Tarefa 2: Definir um esquema para os dados

Construa um esquema para os dados de entrada e, em seguida, utilize o esquema definido para ler o arquivo CSV e criar um DataFrame chamado `employees_df`.

In [6]:
# Defina um esquema para os dados de entrada e leia o arquivo usando o Esquema definido pelo usuário
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("age", IntegerType(), True),
    StructField("Department", StringType(), True),
])

# Carrega o arquivo CSV, aplica o esquema
employees_df = spark.read.csv('employees.csv', schema=schema)

# Elimina linhas nulas
employees_df = employees_df.na.drop()

# Exibe o DataFrame resultante
employees_df.show()

+---+---------+------+---+----------+
| id|     name|salary|age|Department|
+---+---------+------+---+----------+
|198|   Donald|  2600| 29|        IT|
|199|  Douglas|  2600| 34|     Sales|
|200| Jennifer|  4400| 36| Marketing|
|201|  Michael| 13000| 32|        IT|
|202|      Pat|  6000| 39|        HR|
|203|    Susan|  6500| 36| Marketing|
|204|  Hermann| 10000| 29|   Finance|
|205|  Shelley| 12008| 33|   Finance|
|206|  William|  8300| 37|        IT|
|100|   Steven| 24000| 39|        IT|
|101|    Neena| 17000| 27|     Sales|
|102|      Lex| 17000| 37| Marketing|
|103|Alexander|  9000| 39| Marketing|
|104|    Bruce|  6000| 38|        IT|
|105|    David|  4800| 39|        IT|
|106|    Valli|  4800| 38|     Sales|
|107|    Diana|  4200| 35|     Sales|
|108|    Nancy| 12008| 28|     Sales|
|109|   Daniel|  9000| 35|        HR|
|110|     John|  8200| 31| Marketing|
+---+---------+------+---+----------+
only showing top 20 rows



#### Tarefa 3: Exibir o esquema do DataFrame

Exiba o esquema do DataFrame `employees_df`, mostrando todas as colunas e seus respectivos tipos de dados.

In [7]:
# Exiba todas as colunas do DataFrame, juntamente com seus respectivos tipos de dados
employees_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- Department: string (nullable = true)



#### Tarefa 4: Criar uma visualização temporária

Crie uma visualização temporária chamada `employees` para o DataFrame `employees_df`, permitindo consultas Spark SQL nos dados.

In [8]:
# Crie uma visualização temporária chamada "employees" para o DataFrame
employees_df.createTempView('employees')

#### Tarefa 5: Executar uma consulta SQL

Elabore e execute uma consulta SQL para recuperar os registros da visualização `employees` onde a idade dos funcionários ultrapassa 30 anos. Em seguida, exiba o resultado da consulta SQL, mostrando os registros filtrados.

In [9]:
# Consulta SQL para recuperar apenas os registros da Visualização onde a idade ultrapassa 30
spark.sql("SELECT * FROM employees WHERE age > 30").show()

+---+-----------+------+---+----------+
| id|       name|salary|age|Department|
+---+-----------+------+---+----------+
|199|    Douglas|  2600| 34|     Sales|
|200|   Jennifer|  4400| 36| Marketing|
|201|    Michael| 13000| 32|        IT|
|202|        Pat|  6000| 39|        HR|
|203|      Susan|  6500| 36| Marketing|
|205|    Shelley| 12008| 33|   Finance|
|206|    William|  8300| 37|        IT|
|100|     Steven| 24000| 39|        IT|
|102|        Lex| 17000| 37| Marketing|
|103|  Alexander|  9000| 39| Marketing|
|104|      Bruce|  6000| 38|        IT|
|105|      David|  4800| 39|        IT|
|106|      Valli|  4800| 38|     Sales|
|107|      Diana|  4200| 35|     Sales|
|109|     Daniel|  9000| 35|        HR|
|110|       John|  8200| 31| Marketing|
|111|     Ismael|  7700| 32|        IT|
|112|Jose Manuel|  7800| 34|        HR|
|113|       Luis|  6900| 34|     Sales|
|116|     Shelli|  2900| 37|   Finance|
+---+-----------+------+---+----------+
only showing top 20 rows



#### Tarefa 6: Calcular Salário Médio por Departamento

Elabore uma consulta SQL para recuperar o salário médio dos funcionários agrupados por departamento. Exiba o resultado.

In [10]:
# Consulta SQL para calcular o salário médio dos funcionários agrupados por departamento
spark.sql("SELECT Department, ROUND(AVG(Salary), 2) AS AvgSalary FROM employees GROUP BY Department").show()



+----------+---------+
|Department|AvgSalary|
+----------+---------+
|     Sales|  5492.92|
|        HR|   5837.5|
|   Finance|   5730.8|
| Marketing|  6633.33|
|        IT|   7400.0|
+----------+---------+



                                                                                

#### Tarefa 7: Filtrar e Exibir Funcionários do Departamento de TI

Aplique um filtro no DataFrame `employees_df` para selecionar registros onde o departamento é 'TI'. Exiba o DataFrame filtrado.

In [11]:
# Aplique um filtro para selecionar registros onde o departamento é 'TI'
employees_df.filter(employees_df.Department == 'IT').show()

+---+-------+------+---+----------+
| id|   name|salary|age|Department|
+---+-------+------+---+----------+
|198| Donald|  2600| 29|        IT|
|201|Michael| 13000| 32|        IT|
|206|William|  8300| 37|        IT|
|100| Steven| 24000| 39|        IT|
|104|  Bruce|  6000| 38|        IT|
|105|  David|  4800| 39|        IT|
|111| Ismael|  7700| 32|        IT|
|129|  Laura|  3300| 38|        IT|
|132|     TJ|  2100| 34|        IT|
|136|  Hazel|  2200| 29|        IT|
+---+-------+------+---+----------+



#### Tarefa 8: Adicionar Bônus de 10% aos Salários

Realize uma transformação para adicionar uma nova coluna chamada "SalaryAfterBonus" ao DataFrame. Calcule o novo salário adicionando um bônus de 10% ao salário de cada funcionário.

In [12]:
from pyspark.sql.functions import col, round

#Adicione uma nova coluna "SalaryAfterBonus" com um bônus de 10% adicionado ao salário original, arredondando
employees_df.withColumn("SalaryAfterBonus", round(col("Salary") * 1.1, 2)).show()

+---+---------+------+---+----------+----------------+
| id|     name|salary|age|Department|SalaryAfterBonus|
+---+---------+------+---+----------+----------------+
|198|   Donald|  2600| 29|        IT|          2860.0|
|199|  Douglas|  2600| 34|     Sales|          2860.0|
|200| Jennifer|  4400| 36| Marketing|          4840.0|
|201|  Michael| 13000| 32|        IT|         14300.0|
|202|      Pat|  6000| 39|        HR|          6600.0|
|203|    Susan|  6500| 36| Marketing|          7150.0|
|204|  Hermann| 10000| 29|   Finance|         11000.0|
|205|  Shelley| 12008| 33|   Finance|         13208.8|
|206|  William|  8300| 37|        IT|          9130.0|
|100|   Steven| 24000| 39|        IT|         26400.0|
|101|    Neena| 17000| 27|     Sales|         18700.0|
|102|      Lex| 17000| 37| Marketing|         18700.0|
|103|Alexander|  9000| 39| Marketing|          9900.0|
|104|    Bruce|  6000| 38|        IT|          6600.0|
|105|    David|  4800| 39|        IT|          5280.0|
|106|    V

#### Tarefa 9: Encontrar Salário Máximo por Idade

Agrupe os dados por idade e calcule o salário máximo para cada grupo de idade. Exiba o resultado.

In [13]:
from pyspark.sql.functions import max

# Agrupe os dados por idade e calcule o salário máximo para cada grupo de idade
spark.sql("SELECT age, max(Salary) FROM employees GROUP BY age").show()



+---+-----------+
|age|max(Salary)|
+---+-----------+
| 31|       8200|
| 34|       7800|
| 28|      12008|
| 27|      17000|
| 26|       3600|
| 37|      17000|
| 35|       9000|
| 39|      24000|
| 38|       6000|
| 29|      10000|
| 32|      13000|
| 33|      12008|
| 30|       8000|
| 36|       7900|
+---+-----------+



                                                                                

#### Tarefa 10: Auto-Junção nos Dados dos Funcionários

Faça uma junção do DataFrame "employees_df" consigo mesmo com base na coluna "id". Exiba o resultado.

In [14]:
# Faça uma junção do DataFrame consigo mesmo com base na coluna "id"
employees_df.join(employees_df, on="id").show()

+---+---------+------+---+----------+---------+------+---+----------+
| id|     name|salary|age|Department|     name|salary|age|Department|
+---+---------+------+---+----------+---------+------+---+----------+
|198|   Donald|  2600| 29|        IT|   Donald|  2600| 29|        IT|
|199|  Douglas|  2600| 34|     Sales|  Douglas|  2600| 34|     Sales|
|200| Jennifer|  4400| 36| Marketing| Jennifer|  4400| 36| Marketing|
|201|  Michael| 13000| 32|        IT|  Michael| 13000| 32|        IT|
|202|      Pat|  6000| 39|        HR|      Pat|  6000| 39|        HR|
|203|    Susan|  6500| 36| Marketing|    Susan|  6500| 36| Marketing|
|204|  Hermann| 10000| 29|   Finance|  Hermann| 10000| 29|   Finance|
|205|  Shelley| 12008| 33|   Finance|  Shelley| 12008| 33|   Finance|
|206|  William|  8300| 37|        IT|  William|  8300| 37|        IT|
|100|   Steven| 24000| 39|        IT|   Steven| 24000| 39|        IT|
|101|    Neena| 17000| 27|     Sales|    Neena| 17000| 27|     Sales|
|102|      Lex| 1700

#### Tarefa 11: Calcular a Idade Média dos Funcionários

Calcule a idade média dos funcionários usando a função de agregação incorporada. Exiba o resultado.

In [15]:
# Calcular a idade média dos funcionários
from pyspark.sql.functions import avg 
employees_df.agg(avg("age")).show()

+--------+
|avg(age)|
+--------+
|   33.56|
+--------+



#### Tarefa 12: Calcular Salário Total por Departamento

Calcule o salário total para cada departamento utilizando a função de agregação incorporada. Exiba o resultado.

In [16]:
# Calcule o salário total para cada departamento. Dica - Use as funções GroupBy e Aggregate
from pyspark.sql.functions import sum 

# Calcular o salário total por departamento
employees_df.groupBy("Department").agg(sum("Salary")).show()



+----------+-----------+
|Department|sum(Salary)|
+----------+-----------+
|     Sales|      71408|
|        HR|      46700|
|   Finance|      57308|
| Marketing|      59700|
|        IT|      74000|
+----------+-----------+



                                                                                

#### Tarefa 13: Ordenar Dados por Idade e Salário

Aplique uma transformação para ordenar o DataFrame por idade em ordem ascendente e, em seguida, por salário em ordem descendente. Exiba o DataFrame ordenado.

In [26]:
# Ordene o DataFrame por idade em ordem ascendente e, em seguida, por salário em ordem descendente
employees_df.orderBy(col("age").asc()).show()
employees_df.orderBy(col("salary").desc()).show()

+---+---------+------+---+----------+
| id|     name|salary|age|Department|
+---+---------+------+---+----------+
|137|   Renske|  3600| 26| Marketing|
|114|      Den| 11000| 27|   Finance|
|101|    Neena| 17000| 27|     Sales|
|108|    Nancy| 12008| 28|     Sales|
|130|    Mozhe|  2800| 28| Marketing|
|126|    Irene|  2700| 28|        HR|
|140|   Joshua|  2500| 29|   Finance|
|204|  Hermann| 10000| 29|   Finance|
|136|    Hazel|  2200| 29|        IT|
|198|   Donald|  2600| 29|        IT|
|115|Alexander|  3100| 29|   Finance|
|134|  Michael|  2900| 29|     Sales|
|120|  Matthew|  8000| 30|        HR|
|127|    James|  2400| 31|        HR|
|110|     John|  8200| 31| Marketing|
|111|   Ismael|  7700| 32|        IT|
|119|    Karen|  2500| 32|   Finance|
|201|  Michael| 13000| 32|        IT|
|205|  Shelley| 12008| 33|   Finance|
|117|    Sigal|  2800| 33|     Sales|
+---+---------+------+---+----------+
only showing top 20 rows

+---+-----------+------+---+----------+
| id|       name|salar

#### Tarefa 14: Contar Funcionários em Cada Departamento

Calcule o número de funcionários em cada departamento. Exiba o resultado.

In [37]:
from pyspark.sql.functions import count

# Calcule o número de funcionários em cada departamento
employees_df.groupBy("Department").agg(count("*").alias("EmployeeCount")).show()

+----------+-------------+
|Department|EmployeeCount|
+----------+-------------+
|     Sales|           13|
|        HR|            8|
|   Finance|           10|
| Marketing|            9|
|        IT|           10|
+----------+-------------+



#### Tarefa 15: Filtrar Funcionários com a Letra 'o' no Nome

Aplique um filtro para selecionar registros onde o nome do funcionário contém a letra 'o'. Exiba o DataFrame filtrado.

In [31]:
# Aplique um filtro para selecionar registros onde o nome do funcionário contém a letra 'o'
employees_df.filter(col("name").like("%o%")).show()

+---+-----------+------+---+----------+
| id|       name|salary|age|Department|
+---+-----------+------+---+----------+
|198|     Donald|  2600| 29|        IT|
|199|    Douglas|  2600| 34|     Sales|
|110|       John|  8200| 31| Marketing|
|112|Jose Manuel|  7800| 34|        HR|
|130|      Mozhe|  2800| 28| Marketing|
|133|      Jason|  3300| 38|     Sales|
|139|       John|  2700| 36|     Sales|
|140|     Joshua|  2500| 29|   Finance|
+---+-----------+------+---+----------+



# Parabéns! Você concluiu o projeto.

Agora você sabe como criar um DataFrame a partir de um arquivo de dados CSV e realizar uma variedade de transformações e ações de DataFrame usando o Spark SQL.

## Authors


Raghul Ramesh


Lavanya T S


## Change Log


|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-09-01|0.1|Lavanya T S|Initial version|
|2023-09-11|0.2|Pornima More|QA pass with edits|


Copyright © 2023 IBM Corporation. All rights reserved.
