<img src="https://raw.githubusercontent.com/andre-marcos-perez/ebac-course-utils/main/media/logo/newebac_logo_black_half.png" alt="ebac-logo">

---

# **Module** | Big Data I - Processing
**Exercise** Notebook<br>
Instructor [André Perez](https://www.linkedin.com/in/andremarcosperez/)<br>
Student [Rafael Barbosa](https://www.linkedin.com/in/barbosa89/)


---

# **Topics**

<ol type="1">
  <li>Introduction;</li>
  <li>Apache Spark;</li>
  <li>Data Wrangling with Spark.</li>
</ol>

---

# **Exercises**

## 1\. Apache Spark

Install and configure an Apache Spark cluster on the Google Colab virtual machine.

In [1]:
### Download Spark, version 3.0.0.

%%capture
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz && rm spark-3.0.0-bin-hadoop2.7.tgz

In [2]:
### Download and install Java, version 8.

%%capture
!apt-get remove openjdk*
!apt-get update --fix-missing
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
### Install PySpark at the same version as the Spark application

!pip install -q pyspark==3.0.0


In [4]:
### Configuration step: set the `JAVA_HOME` and `SPARK_HOME` environment variables to their respective installation paths.

import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"
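Before starting Spark, it can help to confirm that both variables point at directories that actually exist. The check below is a minimal sketch; `missing_env_dirs` is a hypothetical helper name, not part of Spark or Colab.

```python
import os


def missing_env_dirs(names):
    """Return the variable names that are unset or do not point to a directory."""
    missing = []
    for name in names:
        path = os.environ.get(name)
        # A variable counts as missing when it is unset or its path is not a directory.
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing


# Hypothetical usage: an empty list means both paths were found.
print(missing_env_dirs(["JAVA_HOME", "SPARK_HOME"]))
```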

In [5]:
### Connect PySpark, Python, Spark, and Java.

!pip install -q findspark==1.4.2

In [6]:
### Initialize the Python runtime environment

import findspark

findspark.init()
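For intuition, `findspark.init()` roughly amounts to putting Spark's bundled Python sources on `sys.path` so that `import pyspark` resolves. The sketch below approximates that idea, assuming the Spark distribution keeps its Python package in `python/` and a Py4J archive in `python/lib/`. `spark_python_paths` is a hypothetical helper for illustration, not findspark's API or its actual code.

```python
import glob
import os


def spark_python_paths(spark_home):
    """List the entries a Spark install would contribute to sys.path (approximation)."""
    spark_python = os.path.join(spark_home, "python")
    # Assumption: the Py4J bridge ships as a zip archive under python/lib.
    py4j_zips = sorted(glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip")))
    return [spark_python] + py4j_zips


# Hypothetical usage with the SPARK_HOME value set in this notebook.
print(spark_python_paths(os.environ.get("SPARK_HOME", "/content/spark-3.0.0-bin-hadoop2.7")))
```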

## 2\. Data Wrangling

The dataset available at this [link](https://www.kaggle.com/datasets/bank-of-england/a-millennium-of-macroeconomic-data) contains macroeconomic data on the United Kingdom since the 13th century.

**2.1\. Data**

Download the data using the Google Colab virtual machine.

In [7]:
!wget -q "https://raw.githubusercontent.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker/master/build/workspace/data/uk-macroeconomic-data.csv" -O "uk-macroeconomic-data.csv"

**2.2. Wrangling**

Process the data so that the final dataset contains the unemployment rate (`Unemployment rate`) and population (`Population (GB+NI)`) values, sorted by year in descending order:

```csv
year,population,unemployment_rate
...,...,...
```

To do so, use:

 - Pandas

In [68]:
import pandas as pd

# Load the CSV
data_pd = pd.read_csv("uk-macroeconomic-data.csv")

# Select and rename the columns of interest
data_pd = data_pd[["Description", "Population (GB+NI)", "Unemployment rate"]]
data_pd = data_pd.rename(columns={"Description": "year", "Population (GB+NI)": "population", "Unemployment rate": "unemployment_rate"})

# Select the row to be discarded (it holds the units, not data)
data_pd_description = data_pd[data_pd['year'] == 'Units']

# Exclude that row from the original DataFrame
data_pd = data_pd[~data_pd['year'].isin(data_pd_description['year'])]

# Remove null values and sort by year in descending order
data_pd = data_pd.dropna().sort_values(by='year', ascending=False)

data_pd.reset_index(drop=True, inplace=True)
data_pd.head(20)


Unnamed: 0,year,population,unemployment_rate
0,2016,65573,4.9
1,2015,65110,5.38
2,2014,64597,6.18
3,2013,64106,7.61
4,2012,63705,7.97
5,2011,63285,8.11
6,2010,62759,7.87
7,2009,62260,7.61
8,2008,61824,5.69
9,2007,61319,5.33
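Because of the `Units` row, pandas most likely reads the selected columns as text, so the sort above compares strings. The variant below is a minimal sketch that converts the columns to numbers before sorting. It runs on a small inline sample built from rows shown in this notebook's outputs rather than on the full file, and the `sample`, `columns` and `df` names are illustrative.

```python
import io

import pandas as pd

# Small inline sample mirroring the file layout: a units row, an empty year, two full years.
sample = io.StringIO(
    "Description,Population (GB+NI),Unemployment rate\n"
    "Units,000s,%\n"
    "1209,,\n"
    "2015,65110,5.38\n"
    "2016,65573,4.90\n"
)

columns = {
    "Description": "year",
    "Population (GB+NI)": "population",
    "Unemployment rate": "unemployment_rate",
}
df = pd.read_csv(sample)[list(columns)].rename(columns=columns)

# Non-numeric cells such as "Units", "000s" and "%" become NaN and are dropped with the empty rows.
df = df.apply(pd.to_numeric, errors="coerce").dropna()
df = df.astype({"year": int}).sort_values("year", ascending=False).reset_index(drop=True)
print(df)
```

With numeric columns, the descending sort no longer depends on every year having the same number of digits.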


 - PySpark

In [62]:
### Create the Spark application

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("pyspark-notebook").getOrCreate()

In [63]:
### Load the CSV into a Spark DataFrame

data = spark.read.csv(path="uk-macroeconomic-data.csv", sep=",", header=True)

In [64]:
### Select the columns of interest and rename them

data = data.select(["Description", "Population (GB+NI)", "Unemployment rate"])

data = data.\
  withColumnRenamed("Description", 'year').\
  withColumnRenamed("Population (GB+NI)", "population").\
  withColumnRenamed("Unemployment rate", "unemployment_rate")
data.show(n=10)

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
| 1209|      null|             null|
| 1210|      null|             null|
| 1211|      null|             null|
| 1212|      null|             null|
| 1213|      null|             null|
| 1214|      null|             null|
| 1215|      null|             null|
| 1216|      null|             null|
| 1217|      null|             null|
+-----+----------+-----------------+
only showing top 10 rows



In [65]:
### Select the row to be discarded and load it into a new DataFrame

data_description = data.filter(data['year'] == 'Units')
data_description.show(n=10)

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
+-----+----------+-----------------+



In [66]:
### Use a `left_anti` join to exclude from the original DataFrame the row stored in `data_description`
### `broadcast` marks a `DataFrame` as small, hinting Spark to send a full copy of it to every executor instead of shuffling the larger DataFrame

from pyspark.sql.functions import broadcast

data = data.join(other=broadcast(data_description), on=['year'], how='left_anti')
data.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1209|      null|             null|
|1210|      null|             null|
|1211|      null|             null|
|1212|      null|             null|
|1213|      null|             null|
|1214|      null|             null|
|1215|      null|             null|
|1216|      null|             null|
|1217|      null|             null|
|1218|      null|             null|
+----+----------+-----------------+
only showing top 10 rows
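To make the join semantics concrete: a `left_anti` join keeps only the left-side rows whose key has no match on the right side. The plain-Python sketch below imitates that behavior on a few rows from the outputs above. `left_anti` here is a hypothetical helper for illustration, not the Spark implementation.

```python
def left_anti(left_rows, right_rows, key):
    """Keep the rows of left_rows whose key value never appears in right_rows."""
    right_keys = {row[key] for row in right_rows}
    return [row for row in left_rows if row[key] not in right_keys]


left = [
    {"year": "Units", "population": "000s", "unemployment_rate": "%"},
    {"year": "1209", "population": None, "unemployment_rate": None},
    {"year": "1210", "population": None, "unemployment_rate": None},
]
right = [{"year": "Units", "population": "000s", "unemployment_rate": "%"}]

# Only the rows for 1209 and 1210 remain, since "Units" matches a key on the right.
result = left_anti(left, right, key="year")
print([row["year"] for row in result])
```

For a single unwanted key, a plain filter on `year` would likely give the same result; the join form generalizes to discarding many keys at once.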



In [67]:
### Remove rows with null values and show the final result

data = data.dropna()
data.orderBy("year", ascending=False).show()

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|2016|     65573|             4.90|
|2015|     65110|             5.38|
|2014|     64597|             6.18|
|2013|     64106|             7.61|
|2012|     63705|             7.97|
|2011|     63285|             8.11|
|2010|     62759|             7.87|
|2009|     62260|             7.61|
|2008|     61824|             5.69|
|2007|     61319|             5.33|
|2006|     60827|             5.42|
|2005|     60413|             4.83|
|2004|     59950|             4.75|
|2003|     59637|             5.01|
|2002|     59366|             5.19|
|2001|     59113|             5.10|
|2000|     58886|             5.46|
|1999|     58684|             5.98|
|1998|     58475|             6.26|
|1997|     58314|             6.97|
+----+----------+-----------------+
only showing top 20 rows
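Since the CSV was read with `header=True` and no schema, Spark treats every column, including `year`, as a string. The ordering above still comes out right because the years shown all have four digits, but string ordering compares character by character. The plain-Python sketch below shows how that differs from numeric ordering once lengths differ. The values `"999"` and `"1000"` are illustrative and not taken from the dataset.

```python
years = ["2016", "999", "1000"]

# String ordering compares character by character, so "999" sorts above "2016".
as_text = sorted(years, reverse=True)

# Casting to int first gives the chronological order.
as_numbers = sorted(years, key=int, reverse=True)

print(as_text)     # ['999', '2016', '1000']
print(as_numbers)  # ['2016', '1000', '999']
```

In PySpark, one way to get numeric ordering would be to cast the column before sorting, for example `data.orderBy(data["year"].cast("int"), ascending=False)`.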

