<img src="https://raw.githubusercontent.com/andre-marcos-perez/ebac-course-utils/main/media/logo/newebac_logo_black_half.png" alt="ebac-logo">

---

# **Módulo** | Computação em Nuvem III
Caderno de **Exercícios**<br>
Professor [André Perez](https://www.linkedin.com/in/andremarcosperez/)

---

# **Tópicos**

<ol type="1">
  <li>Introdução;</li>
  <li>Apache Spark;</li>
  <li>Data Wrangling com Spark.</li>
</ol>

---

# **Exercícios**

## 1\. Apache Spark

Replique as atividades do item 2.1 e 2.2 para instalar e configurar um cluster Apache Spark na máquina virtual do Google Colab.

In [40]:
%%capture
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz && rm spark-3.0.0-bin-hadoop2.7.tgz

In [41]:
%%capture
!apt-get remove openjdk*
!apt-get update --fix-missing
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [42]:
!pip install -q pyspark==3.0.0

In [43]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"

In [44]:
!pip install -q findspark==1.4.2

In [45]:
import findspark

findspark.init()

## 2\. Data Wrangling

A base de dados presente neste [link](https://www.kaggle.com/datasets/bank-of-england/a-millennium-of-macroeconomic-data) contem dados macroeconômicos sobre o Reino Unido desde o século 13.

**2.1\. Data**

Faça o download dos dados utilizando a máquina virutal do Google Colab com o código abaixo.

In [46]:
!wget -q "https://raw.githubusercontent.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker/master/build/workspace/data/uk-macroeconomic-data.csv" -O "uk-macroeconomic-data.csv"

**2.2. Wrangling**

Processe os dados para que a base de dados final apresente os valores da taxa de desemprego (`Unemployment rate`) e população (`Population (GB+NI)`) estejam ordenados por ano decrescente:

```csv
year,population,unemployment_rate
...,...,...
```

Para isso, utilize:

 - Pandas

In [47]:
import pandas as pd

dados = pd.read_csv('uk-macroeconomic-data.csv')

In [48]:
dados.head()

Unnamed: 0,Description,Real GDP of England at market prices,Real GDP of England at factor cost,"Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders","Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders","Index of real UK GDP at factor cost - based on changing political boundaries,",Composite estimate of English and (geographically-consistent) UK real GDP at factor cost,HP-filter of log of real composite estimate of English and UK real GDP at factor cost,"Real UK gross disposable national income at market prices, constant border estimate",Real consumption,...,UK Public sector debt.1,UK Public sector debt.2,Central Government Gross Debt,Central Government Gross Debt.1,Trade deficit,Trade deficit.1,Current account,Current account .1,Current account deficit including estimated non-monetary bullion flows,Current account deficit including estimated non-monetary bullion flows.1
0,Units,"£mn, Chained Volume measure, 2013 prices","£mn, Chained Volume measure, 2013 prices","£mn, Chained Volume measure, 2013 prices","£mn, Chained Volume measure, 2013 prices","GB before 1801, GB+Ireland 1801-1920, GB + Nor...",2013=100,approx. % difference from trend,"£mn, Chained Volume measure, 2013 prices. Nom...","£mn, Chained Volume measure, 2013 prices",...,as a % of nominal GDP: measure 1,as a % of nominal GDP: measure 2,"Financial year end, nominal par value £mn","Financial year end, market value £mn",£mn,as a % of nominal GDP,£mn,as a % of nominal GDP,£mn,as a % of nominal GDP
1,1209,,,,,,,,,,...,,,,,,,,,,
2,1210,,,,,,,,,,...,,,,,,,,,,
3,1211,,,,,,,,,,...,,,,,,,,,,
4,1212,,,,,,,,,,...,,,,,,,,,,


In [49]:
dados = dados.loc[1:]

In [50]:
dados_colums = ['Description', 'Population (GB+NI)', 'Unemployment rate']

In [51]:
dados = dados[dados_colums].dropna().sort_values('Description', ascending=False)
dados.head()

Unnamed: 0,Description,Population (GB+NI),Unemployment rate
808,2016,65573,4.9
807,2015,65110,5.38
806,2014,64597,6.18
805,2013,64106,7.61
804,2012,63705,7.97


In [74]:
dados.to_csv('uk-macroeconomic-data-cleaned.csv', index=False)

 - PySpark

In [52]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("pyspark-notebook").getOrCreate()

In [53]:
data = spark.read.csv(path="uk-macroeconomic-data.csv", sep=",", header=True)

In [54]:
data.show()

+-----------+------------------------------------+-----------------------------------+-------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------+------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------+--------------------+--------------------+--------------------------+-------------------------------------------------+--------------------+--------------------+---------------------------------------+-------------------------------+---------------------------------+------------------+--------------------+----------+-----------------+---------------------------+------------------------------

In [55]:
data.printSchema()

root
 |-- Description: string (nullable = true)
 |-- Real GDP of England at market prices: string (nullable = true)
 |-- Real GDP of England at factor cost : string (nullable = true)
 |-- Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Index of real UK GDP at factor cost - based on changing political boundaries, : string (nullable = true)
 |-- Composite estimate of English and (geographically-consistent) UK real GDP at factor cost: string (nullable = true)
 |-- HP-filter of log of real composite estimate of English and UK real GDP at factor cost: string (nullable = true)
 |-- Real UK gross disposable national income at market prices, constant border estimate: string (nullable = true)
 |-- Real consumption: string (nullable = true)
 |-- Real investment: string (nullable = true)
 |-- Stockbuildi

In [56]:
data = data.select(["Description", "Population (GB+NI)", "Unemployment rate"])

In [57]:
data = data.\
  withColumnRenamed("Description", 'year').\
  withColumnRenamed("Population (GB+NI)", "population").\
  withColumnRenamed("Unemployment rate", "unemployment_rate")

In [58]:
data.show(n=10)

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
| 1209|      null|             null|
| 1210|      null|             null|
| 1211|      null|             null|
| 1212|      null|             null|
| 1213|      null|             null|
| 1214|      null|             null|
| 1215|      null|             null|
| 1216|      null|             null|
| 1217|      null|             null|
+-----+----------+-----------------+
only showing top 10 rows



In [59]:
data_description = data.filter(data['year'] == 'Units')

In [60]:
data_description.show(n=10)

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
+-----+----------+-----------------+



In [61]:
(data.count(), len(data.columns))

(841, 3)

In [62]:
(data_description.count(), len(data_description.columns))

(1, 3)

O método `join` faz a junção distribuída de dois `DataFrames`. Já o método `broadcast` "marca" um `DataFrame` como "pequeno" e força o Spark a trafega-lo pela rede.

In [63]:
from pyspark.sql.functions import broadcast

In [64]:
data = data.join(other=broadcast(data_description), on=['year'], how='left_anti')

In [65]:
data.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1209|      null|             null|
|1210|      null|             null|
|1211|      null|             null|
|1212|      null|             null|
|1213|      null|             null|
|1214|      null|             null|
|1215|      null|             null|
|1216|      null|             null|
|1217|      null|             null|
|1218|      null|             null|
+----+----------+-----------------+
only showing top 10 rows



O método `dropna` remove todas as linhas que apresentarem ao menos um valor nulo.

In [66]:
data = data.dropna()

In [67]:
data.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1855|     23241|             3.73|
|1856|     23466|             3.52|
|1857|     23689|             3.95|
|1858|     23914|             5.23|
|1859|     24138|             3.27|
|1860|     24360|             2.94|
|1861|     24585|             3.72|
|1862|     24862|             4.68|
|1863|     25142|             4.15|
|1864|     25425|             2.99|
+----+----------+-----------------+
only showing top 10 rows



O método `withColumn` ajuda a criar novas colunas.

In [68]:
data = data.withColumn('century', 1 + (data['year']/100).cast('int'))

In [69]:
data.select(['century', 'year']).groupBy('century').agg({'year': 'count'}).show()

+-------+-----------+
|century|count(year)|
+-------+-----------+
|     20|        100|
|     19|         45|
|     21|         17|
+-------+-----------+



O método `collect` é uma ação que coleta os resultados dos nós e retorna para o Python.

In [70]:
timing = data.select(['century', 'year']).groupBy('century').agg({'year': 'count'}).collect()

In [71]:
timing

[Row(century=20, count(year)=100),
 Row(century=19, count(year)=45),
 Row(century=21, count(year)=17)]

In [72]:
timing[0].asDict()

{'century': 20, 'count(year)': 100}

In [73]:
data.repartition('century').write.csv(path="uk-macroeconomic-data-clean", sep=",", header=True, mode="overwrite")