<img src="https://raw.githubusercontent.com/andre-marcos-perez/ebac-course-utils/main/media/logo/newebac_logo_black_half.png" alt="ebac-logo">

---

# **Módulo** | Computação em Nuvem III
Caderno de **Exercícios**<br>
Professor [André Perez](https://www.linkedin.com/in/andremarcosperez/)

Aluno: Raul Pichinin

---

# **Tópicos**

<ol type="1">
  <li>Introdução;</li>
  <li>Apache Spark;</li>
  <li>Data Wrangling com Spark.</li>
</ol>

---

# **Exercícios**

## 1\. Apache Spark

 **Download** do Spark, versão 3.0.0.

In [None]:
%%capture

!wget -q https://archive.apache.org/ + dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz

!tar xf spark-3.0.0-bin-hadoop2.7.tgz !rm spark-3.0.0-bin-hadoop2.7.tgz

2 - Download e instalação do Java, versão 8.

In [None]:
%%capture

!apt-get remove openjdk*
!apt-get update --fix-missing
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

**Nota:** Verificando a versão do PySpark deve ser a mesma que a versão da aplicação Spark.

In [None]:
!pip install -q pyspark==3.0.0

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m204.7/204.7 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m18.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


**Configuração**

Na etapa de configuração, é necessário configurar as máquinas (nós) do cluster para que tanto a aplicação do Spark quanto a instalação do Java possam ser encontrados pelo PySpark e, consequentemente, pelo Python. Para isso, basta preencher as variáveis de ambiente JAVA_HOME e SPARK_HOME com o seus respectivos caminhos de instalação.

In [None]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop2.7"

**Conexão**

Para conectar o PySpark (e o Python) ao Spark e ao Java, pode-se utilizar o pacote Python FindSpark.

In [None]:
!pip install -q findspark==2.0.0

Já o método init() injeta as variáveis de ambiente JAVA_HOME e SPARK_HOME no ambiente de execução Python, permitindo assim a correta conexão entre o pacote PySpark com a aplicação Spark.

In [None]:
import findspark

findspark.init()

Com o cluster devidamente configurado, vamos criar uma aplicação Spark. O objeto SparkSession do pacote PySpark (e seu atributo builder auxiliam na criação da aplicação: • master: endereço (local ou remoto) do cluster; • appName: nome da aplicação; • getOrCreate: método que de fato cria os recursos e instância a aplicação.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("pyspark-notebook").getOrCreate()

Com o objeto SparkSession devidamente instanciado, podemos começar a **interagir com os dados** utilizando os recursos do cluster através de uma estrutura de dados que já conhecemos: DataFrames

## 2\. Data Wrangling

**2.1\. Data**

Faça o download dos dados utilizando a máquina virutal do Google Colab com o código abaixo.

A base de dados presente neste [link](https://www.kaggle.com/datasets/bank-of-england/a-millennium-of-macroeconomic-data) contem dados macroeconômicos sobre o Reino Unido desde o século 13.

In [None]:
!wget -q "https://raw.githubusercontent.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker/master/build/workspace/data/uk-macroeconomic-data.csv" -O "uk-macroeconomic-data.csv"

In [None]:
data_macroeconomicos = spark.read.csv(path="uk-macroeconomic-data.csv" , sep=",", header=True)

In [None]:
data_macroeconomicos.show()

+-----------+------------------------------------+-----------------------------------+-------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------+------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------+--------------------+--------------------+--------------------------+-------------------------------------------------+--------------------+--------------------+---------------------------------------+-------------------------------+---------------------------------+------------------+--------------------+----------+-----------------+---------------------------+------------------------------

In [None]:
data_macroeconomicos.printSchema()

root
 |-- Description: string (nullable = true)
 |-- Real GDP of England at market prices: string (nullable = true)
 |-- Real GDP of England at factor cost : string (nullable = true)
 |-- Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Index of real UK GDP at factor cost - based on changing political boundaries, : string (nullable = true)
 |-- Composite estimate of English and (geographically-consistent) UK real GDP at factor cost: string (nullable = true)
 |-- HP-filter of log of real composite estimate of English and UK real GDP at factor cost: string (nullable = true)
 |-- Real UK gross disposable national income at market prices, constant border estimate: string (nullable = true)
 |-- Real consumption: string (nullable = true)
 |-- Real investment: string (nullable = true)
 |-- Stockbuildi

**2.2. Wrangling / Exploração**

Processe os dados para que a base de dados final apresente os valores da taxa de desemprego (`Unemployment rate`) e população (`Population (GB+NI)`) estejam ordenados por ano decrescente:

Utilizando a API Python do Spark, o pacote PySpark, para limpar os dados.
**Nota: ** Sempre nos atentando à natureza distribuída das operações

**2.3. Limpeza com PySpark**

O método *select* seleciona colunas do DataFrame. Já o método *withColumnRenamed* renomeia colunas.

In [None]:
data_macroeconomicos = data_macroeconomicos.select([ "Description", "Population (GB+NI)", "Unemployment rate" ])

In [None]:
data_macroeconomicos = data_macroeconomicos.withColumnRenamed ( "Description", 'year' ).withColumnRenamed( "Population (GB+NI)", "population" ).withColumnRenamed( "Unemployment rate", "unemployment_rate" )

In [None]:
data_macroeconomicos.show(n=10)

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
| 1209|      null|             null|
| 1210|      null|             null|
| 1211|      null|             null|
| 1212|      null|             null|
| 1213|      null|             null|
| 1214|      null|             null|
| 1215|      null|             null|
| 1216|      null|             null|
| 1217|      null|             null|
+-----+----------+-----------------+
only showing top 10 rows



Aqui método *filter* seleciona linhas do DataFrame baseado no conteúdo de uma coluna

In [None]:
data_macroeconomicos_description = data_macroeconomicos.filter(data_macroeconomicos['year'] == 'Units')

In [None]:
data_macroeconomicos_description.show(n=10)

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
+-----+----------+-----------------+



**2.4. Junção**

In [None]:
(data_macroeconomicos.count(), len(data_macroeconomicos.columns))

(841, 3)

In [None]:
(data_macroeconomicos_description.count(), len(data_macroeconomicos_description.columns))

(1, 3)

O método *join* faz a junção de dois DataFrames. Já o método *broadcast*  "marca" um DataFrame como "pequeno" e força o Spark a trafegá-lo pela rede.

In [None]:
from pyspark.sql.functions import broadcast

In [None]:
data_macroeconomicos = data_macroeconomicos.join( other=broadcast(data_macroeconomicos_description) , on=['year'], how='left_anti' )

**left_anti:** compara se o registro esquerdo existe no DataFrame direito, porém, o seu diferencial é que, ao invés de manter os valores correspondentes no DataFrame direito, ele mantém apenas os valores que não possuem uma chave correspondente.

In [None]:
data_macroeconomicos.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1209|      null|             null|
|1210|      null|             null|
|1211|      null|             null|
|1212|      null|             null|
|1213|      null|             null|
|1214|      null|             null|
|1215|      null|             null|
|1216|      null|             null|
|1217|      null|             null|
|1218|      null|             null|
+----+----------+-----------------+
only showing top 10 rows



O método *dropna* remove todas as linhas que apresentarem ao menos um valor nulo.

In [None]:
data_macroeconomicos = data_macroeconomicos.dropna()

In [None]:
data_macroeconomicos.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1855|     23241|             3.73|
|1856|     23466|             3.52|
|1857|     23689|             3.95|
|1858|     23914|             5.23|
|1859|     24138|             3.27|
|1860|     24360|             2.94|
|1861|     24585|             3.72|
|1862|     24862|             4.68|
|1863|     25142|             4.15|
|1864|     25425|             2.99|
+----+----------+-----------------+
only showing top 10 rows



Agora com o método *withColumn* que ajuda a criar novas colunas.

In [None]:
data_macroeconomicos = data_macroeconomicos.withColumn( 'century', 1 + (data_macroeconomicos['year']/100).cast('int') )

In [None]:
data_macroeconomicos.select(['century', 'year']).groupBy('century').agg({'year': 'count'}).show()

+-------+-----------+
|century|count(year)|
+-------+-----------+
|     20|        100|
|     19|         45|
|     21|         17|
+-------+-----------+



O método *colletc* é uma ação que coleta os resultados dos nós e retorna para o Python.

In [None]:
timing = data_macroeconomicos.select(['century', 'year']).groupBy('century').agg({'year': 'count'}).collect()

In [None]:
timing

[Row(century=20, count(year)=100),
 Row(century=19, count(year)=45),
 Row(century=21, count(year)=17)]

In [None]:
timing[0].asDict()

{'century': 20, 'count(year)': 100}

**2.5. Escrita**

O método *write.csv *persiste o DataFrame em formato csv .Já o método *repartition* controla o número de partições da escrita.

In [None]:
data_macroeconomicos.repartition('century').write.csv(path="uk-macroeconomic-data-clean" , sep=",", header=True, mode="overwrite")

**2.6. Limpeza com Pandas**

In [None]:
import pandas as pd

dados_macroeconomicos = pd.read_csv('uk-macroeconomic-data.csv')
dados_macroeconomicos.head(10)

Unnamed: 0,Description,Real GDP of England at market prices,Real GDP of England at factor cost,"Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders","Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders","Index of real UK GDP at factor cost - based on changing political boundaries,",Composite estimate of English and (geographically-consistent) UK real GDP at factor cost,HP-filter of log of real composite estimate of English and UK real GDP at factor cost,"Real UK gross disposable national income at market prices, constant border estimate",Real consumption,...,UK Public sector debt.1,UK Public sector debt.2,Central Government Gross Debt,Central Government Gross Debt.1,Trade deficit,Trade deficit.1,Current account,Current account .1,Current account deficit including estimated non-monetary bullion flows,Current account deficit including estimated non-monetary bullion flows.1
0,Units,"£mn, Chained Volume measure, 2013 prices","£mn, Chained Volume measure, 2013 prices","£mn, Chained Volume measure, 2013 prices","£mn, Chained Volume measure, 2013 prices","GB before 1801, GB+Ireland 1801-1920, GB + Nor...",2013=100,approx. % difference from trend,"£mn, Chained Volume measure, 2013 prices. Nom...","£mn, Chained Volume measure, 2013 prices",...,as a % of nominal GDP: measure 1,as a % of nominal GDP: measure 2,"Financial year end, nominal par value £mn","Financial year end, market value £mn",£mn,as a % of nominal GDP,£mn,as a % of nominal GDP,£mn,as a % of nominal GDP
1,1209,,,,,,,,,,...,,,,,,,,,,
2,1210,,,,,,,,,,...,,,,,,,,,,
3,1211,,,,,,,,,,...,,,,,,,,,,
4,1212,,,,,,,,,,...,,,,,,,,,,
5,1213,,,,,,,,,,...,,,,,,,,,,
6,1214,,,,,,,,,,...,,,,,,,,,,
7,1215,,,,,,,,,,...,,,,,,,,,,
8,1216,,,,,,,,,,...,,,,,,,,,,
9,1217,,,,,,,,,,...,,,,,,,,,,


*Selecionando colunas e colocando em ordem crescente

In [None]:
dados_macroeconomicos = dados_macroeconomicos.loc[1:]
colunas_dados_macroeconomicos = ['Description', 'Population (GB+NI)', 'Unemployment rate']
dados_macroeconomicos = dados_macroeconomicos[colunas_dados_macroeconomicos].dropna().sort_values('Description', ascending=True)
dados_macroeconomicos

Unnamed: 0,Description,Population (GB+NI),Unemployment rate
647,1855,23241,3.73
648,1856,23466,3.52
649,1857,23689,3.95
650,1858,23914,5.23
651,1859,24138,3.27
...,...,...,...
804,2012,63705,7.97
805,2013,64106,7.61
806,2014,64597,6.18
807,2015,65110,5.38


In [None]:
dados_macroeconomicos.columns = ['Year', 'Population', 'Unemployment rate']
dados_macroeconomicos.head(10)

Unnamed: 0,Year,Population,Unemployment rate
647,1855,23241,3.73
648,1856,23466,3.52
649,1857,23689,3.95
650,1858,23914,5.23
651,1859,24138,3.27
652,1860,24360,2.94
653,1861,24585,3.72
654,1862,24862,4.68
655,1863,25142,4.15
656,1864,25425,2.99


* Aplicando séculos aos anos

In [None]:
dados_macroeconomicos['Year'] = pd.to_numeric(dados_macroeconomicos['Year'], errors='coerce')
dados_macroeconomicos['Century'] = (dados_macroeconomicos['Year'] / 100).astype(int) + 1

dados_macroeconomicos

Unnamed: 0,Year,Population,Unemployment rate,Century
647,1855,23241,3.73,19
648,1856,23466,3.52,19
649,1857,23689,3.95,19
650,1858,23914,5.23,19
651,1859,24138,3.27,19
...,...,...,...,...
804,2012,63705,7.97,21
805,2013,64106,7.61,21
806,2014,64597,6.18,21
807,2015,65110,5.38,21


**2.7. Escrita com Pandas**

In [None]:
dados_macroeconomicos.to_csv('uk-macroeconomic-data-cleaned.csv', index=False)
