# MBA em Ciência de Dados - Análise de Dados com Base em Processamento Massivo em Paralelo ##

### Aula 06: Processamento Paralelo e Distribuído

## Lista de Exercícios com Respostas ##



---



**Material Produzido por:**<br>
>**Profa. Dra. Cristina Dutra de Aguiar**<br>

**CEMEAI - ICMC/USP São Carlos**

Esta lista possui 11 exercícios, sendo possível navegar pelos mesmos utilizando o sumário na esquerda. Os primeiros exercícios contam com dicas para auxiliar na sua resolução. Leiam-os com atenção e desenvolvam as respostas nos blocos indicados com 
```
# Resposta do exercício
```

Por completude, o *notebook* possui todas as descrições apresentadas na parte prática da Aula 06. **Recomenda-se fortemente** que esta lista seja respondida antes de se consultar o material com as respostas.


# 1 Introdução

A aplicação de *data warehousing* da BI Solutions utiliza como base uma contelação de fatos que une dois esquemas estrela, conforme descrito a seguir.

**Tabelas de dimensão**

- data (`dataPK, dataCompleta, dataDia, dataMes, dataBimestre, dataTrimestre, dataSemestre, dataAno`)
- funcionario (`funcPK, funcMatricula, funcNome, funcSexo, funcDataNascimento, funcDiaNascimento, funcMesNascimento, funcAnoNascimento, funcCidade, funcEstadoNome, funcEstadoSigla, funcRegiaoNome, funcRegiaoSigla, funcPaisNome, funcPaisSigla`)
- equipe (`equipePK, equipeNome, filialNome, filialCidade, filialEstadoNome, filialEstadoSigla, filialRegiaoNome, filialRegiaoSigla, filialPaisNome, filialPaisSigla`)
- cargo (`cargoPK, cargoNome, cargoRegimeTrabalho, cargoEscolaridadeMinima, cargoNivel`)
- cliente (`clientePK, clienteNomeFantasia, clienteSetor, clienteCidade, clienteEstadoNome, clienteEstadoSigla, clienteRegiaoNome, clienteRegiaoSigla, clientePaisNome, clientePaisSigla`)

**Tabelas de fatos**
- pagamento (`dataPK, funcPK, equipePK, cargoPK, salario, quantidadeLancamento`)
- negociacao (`dataPK, equipePK, clientePK, receita, quantidadeNegociacao`)

Primeiramente, são definidos `paths`, sendo que cada `path` se refere a uma tabela de fatos ou uma tabela de dimensão. 

In [None]:
# Tabelas de dimensão
pathData = 'dados/data.csv'
pathFuncionario = 'dados/funcionario.csv'
pathEquipe = 'dados/equipe.csv'
pathCargo = 'dados/cargo.csv'
pathCliente = 'dados/cliente.csv'

# Tabelas de fato
pathPagamento = 'dados/pagamento.csv'
pathNegociacao = 'dados/negociacao.csv'

Na sequência,  todos os arquivos referentes às tabelas de fatos e às tabelas de dimensão são baixados, sendo armazenados na pasta `dados`.

In [None]:
%%capture
!git clone https://github.com/GuiMuzziUSP/Data_Mart_BI_Solutions.git dados

# 2 Apache Spark Cluster: instalação e configuração

**2.1 Instalação**

Neste *notebook* é criado um *cluster* Spark composto apenas por um **nó mestre**. Ou seja, o *cluster* não possui um ou mais **nós de trabalho** e o **gerenciador de cluster**. Nessa configuração, as tarefas (*tasks*) são realizadas no próprio *driver* localizado no **nó mestre**.

In [None]:
#instalando Java Runtime Environment (JRE) versão 8
%%capture
!apt-get remove openjdk*
!apt-get update --fix-missing
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Na sequência, é feito o *download* do Apache Spark versão 3.0.0.

In [None]:
#baixando Apache Spark versão 3.0.0
%%capture
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz && rm spark-3.0.0-bin-hadoop2.7.tgz

Na sequência, são configuradas as variáveis de ambiente JAVA_HOME e SPARK_HOME. Isto permite que tanto o Java quanto o Spark possam ser encontrados.

In [None]:
import os
#configurando a variável de ambiente JAVA_HOME
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
#configurando a variável de ambiente SPARK_HOME
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"

Por fim, são instalados dois pacotes da linguagem de programação Python, cujas funcionalidades são descritas a seguir.

Pacote findspark: Usado para ler a variável de ambiente SPARK_HOME e armazenar seu valor na variável dinâmica de ambiente PYTHONPATH. Como resultado, Python pode encontrar a instalação do Spark.

Pacote pyspark: PySpark é a API do Python para Spark. Ela possibilita o uso de Python, considerando que o framework Apache Spark encontra-se desenvolvido na linguagem de programação Scala.

In [None]:
%%capture
#instalando o pacote findspark
!pip install -q findspark==1.4.2
#instalando o pacote pyspark
!pip install -q pyspark==3.0.0

**2.2 Conexão**

PySpark não é adicionado ao *sys.path* por padrão. Isso significa que não é possível importá-lo, pois o interpretador da linguagem Python não sabe onde encontrá-lo. 

Para resolver esse aspecto, é necessário instalar o módulo `findspark`. Esse módulo mostra onde PySpark está localizado. Os comandos a seguir têm essa finalidade.

In [None]:
#importando o módulo findspark
import findspark
#carregando a variávels SPARK_HOME na variável dinâmica PYTHONPATH
findspark.init()

Depois de configurados os pacotes e módulos e inicializadas as variáveis de ambiente, é possível criar o objeto SparkContext. No comando de criação a seguir, é definido que é utilizado o próprio sistema operacional deste notebook como nó mestre por meio do parâmetro local do método setMaster. O complemento do parametro [*] indica que são alocados todos os núcleos de processamento disponíveis para o objeto driver criado.

In [None]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]")
spark = SparkContext(conf=conf)

# 3 Carregamento dos Dados da Aplicação da BI Solutions

**3.1 Carregamento da tabela de dimensão** *data*

O comando a seguir utiliza o método `textFile()` para armazenar no RDD chamado `data` os registros do arquivo de texto `"data.csv"`, os quais possuem os dados da tabela de dimensão `data`.

In [None]:
data_rdd = spark.textFile(pathData)

Os comandos a seguir realizam alterações no RDD `data` de forma que seus elementos representem linhas (ou tuplas) da tabela.

In [None]:
#imprimindo as 3 primeiras linhas de "data" e verificando que a primeira linha contém metadados (ou seja, o esquema referente aos dados)
data_rdd.take(3)

['dataPK,dataCompleta,dataDia,dataMes,dataBimestre,dataTrimestre,dataSemestre,dataAno',
 '1,1/1/2016,1,1,1,1,1,2016',
 '2,2/1/2016,2,1,1,1,1,2016']

In [None]:
#removendo a primeira linha de "data", desde que ela se refere a metadados
#capturando o cabeçalho
firstRow = data_rdd.first()
#removendo o cabeçalho
data_rdd = data_rdd.filter(lambda line: line != firstRow)

In [None]:
#imprimindo as 3 primeiras linhas de "data" e verificando que elas contêm apenas dados
data_rdd.take(3)

['1,1/1/2016,1,1,1,1,1,2016',
 '2,2/1/2016,2,1,1,1,1,2016',
 '3,3/1/2016,3,1,1,1,1,2016']

In [None]:
#imprimindo o cabeçalho de "data" e verificando os metadados 
data_header = firstRow[:].split(",")
display(data_header)

['dataPK',
 'dataCompleta',
 'dataDia',
 'dataMes',
 'dataBimestre',
 'dataTrimestre',
 'dataSemestre',
 'dataAno']

Desde que o arquivo lido encontra-se no formato `.csv`, utiliza-se o método `()` para transformar os elementos do RDD `data` em uma lista de valores divididos por `","`.

In [None]:
#mapeando os valores do RDD "data" utilizando o separador vírgula
data_rdd = data_rdd.map(lambda line: tuple(line.split(",")))
#imprimindo as 3 primeiras linhas de "data"
data_rdd.take(3)

[('1', '1/1/2016', '1', '1', '1', '1', '1', '2016'),
 ('2', '2/1/2016', '2', '1', '1', '1', '1', '2016'),
 ('3', '3/1/2016', '3', '1', '1', '1', '1', '2016')]

# **Exercícios**

#### **Exercício 1**

Dê continuidade ao exemplo anterior e realize o carregamento dos demais arquivos referentes à constelação de fatos da BI solution, a saber: (i) tabelas de dimensão `funcionario`, `equipe`, `cargo` e `cliente`; e (ii) tabelas de fato `pagamento` e `negociação`.

In [None]:
# Resposta do exercício

def processaRdd(spark, path):
  rddCsv = spark.textFile(path)
  #capturando o cabeçalho
  firstRow = rddCsv.first()
  rddCsv = rddCsv.filter(lambda line: line != firstRow)
  header = firstRow[:].split(",")

  #processando o rddCsv
  rddCsv = rddCsv.map(lambda x: tuple(x.split(",")))

  return header, rddCsv


#realizando o carregamento das tabelas de dimensão
funcionario_header, funcionario_rdd = processaRdd(spark, pathFuncionario)
equipe_header, equipe_rdd = processaRdd(spark, pathEquipe)
cargo_header, cargo_rdd = processaRdd(spark, pathCargo)
cliente_header, cliente_rdd = processaRdd(spark, pathCliente)

#realizando o carregamento das tabelas de fatos
pagamento_header, pagamento_rdd = processaRdd(spark, pathPagamento)
negociacao_header, negociacao_rdd = processaRdd(spark, pathNegociacao)

#Checando o resultado
display(equipe_header)

['equipePK',
 'equipeNome',
 'filialNome',
 'filialCidade',
 'filialEstadoNome',
 'filialEstadoSigla',
 'filialRegiaoNome',
 'filialRegiaoSigla',
 'filialPaisNome',
 'filialPaisSigla']

**Dica 1 para o exercício**: Crie uma função para executar este procedimento repetidas vezes. Utilize o esqueleto a seguir como base.
```python
def processaRdd(spark, path):
  ...
  return header, rddCsv
```
**Dica 2**: Se tiver dificuldades em criar a função, replique os comandos descritos para o carregamento da tabela de dimensão data para todas as tabelas restantes.

### **Exercício 2**

Selecione as seguintes colunas do RDD `cliente`: primeira, segunda, terceira e sexta. Exiba as 5 primeiras linhas resultantes. Consulte `cliente_header` caso necessário. 

In [None]:
# Resposta do exercício

# Verificando quais colunas estão presentes no RDD cliente
display(cliente_header)

# Selecionando as colunas solicitadas
cliente_rdd.map(lambda x: (x[0], x[1], x[2], x[5])).take(5)

['clientePK',
 'clienteNomeFantasia',
 'clienteSetor',
 'clienteCidade',
 'clienteEstadoNome',
 'clienteEstadoSigla',
 'clienteRegiaoNome',
 'clienteRegiaoSigla',
 'clientePaisNome',
 'clientePaisSigla']

[('1', 'VIA FOOD', 'BEBIDAS E ALIMENTOS', 'SP'),
 ('2', 'VIA PIZZA', 'BEBIDAS E ALIMENTOS', 'SP'),
 ('3', 'VIA JAPA', 'BEBIDAS E ALIMENTOS', 'SP'),
 ('4', 'VIA VEG', 'BEBIDAS E ALIMENTOS', 'SP'),
 ('5', 'VIA DRINK', 'BEBIDAS E ALIMENTOS', 'SP')]

**Dica para o exercício:** método ***map()***

O método `map()` pode ser utilizado para selecionar colunas. Veja o exemplo abaixo:

In [None]:
#selecionando as seguintes colunas do RDD "funcionario":  segunda, terceira, décima
funcionario_rdd \
  .map(lambda x: (x[1], x[2], x[9])) \
  .take(5)

[('M-1', 'ALINE ALMEIDA', 'SAO PAULO'),
 ('M-2', 'ARAO ALVES', 'SAO PAULO'),
 ('M-3', 'ARON ANDRADE', 'SAO PAULO'),
 ('M-4', 'ADA BARBOSA', 'SAO PAULO'),
 ('M-5', 'ABADE BATISTA', 'SAO PAULO')]

### **Exercício 3**

Recupere os clientes que moram no estado de Minas Gerais.

In [None]:
# Resposta do exercício

# Verificando quais colunas estão presentes no RDD cliente
display(cliente_header)

# Mostrando os 5 primeiros elementos do RDD que contém os clientes que moram no estado de Minas Gerais
cliente_rdd.filter(lambda x: x[5] == "MG").take(5)

['clientePK',
 'clienteNomeFantasia',
 'clienteSetor',
 'clienteCidade',
 'clienteEstadoNome',
 'clienteEstadoSigla',
 'clienteRegiaoNome',
 'clienteRegiaoSigla',
 'clientePaisNome',
 'clientePaisSigla']

[('10',
  'VIA LIFE',
  'SAUDE',
  'BELO HORIZONTE',
  'MINAS GERAIS',
  'MG',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('11',
  'VIA MED',
  'SAUDE',
  'UBERLANDIA',
  'MINAS GERAIS',
  'MG',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('30',
  'SR. HAPPY HOUR',
  'BEBIDAS E ALIMENTOS',
  'BELO HORIZONTE',
  'MINAS GERAIS',
  'MG',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('31',
  'SR. LIFE',
  'SAUDE',
  'UBERLANDIA',
  'MINAS GERAIS',
  'MG',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('50',
  'SR. FRIENDS',
  'BEBIDAS E ALIMENTOS',
  'BELO HORIZONTE',
  'MINAS GERAIS',
  'MG',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR')]

**Dica para o exercício**: Método ***filter()***

O método `filter()` pode ser utilizado para filtrar valores de acordo com  critérios de seleção. No exemplo a seguir, o método `filter()` é utilizado para filtrar os funcionários que **não** são do estado de `'SAO PAULO'`.

In [None]:
cliente_rdd.filter(lambda x: x[5] != "SP").take(7)

[('7',
  'VIA PASTA',
  'BEBIDAS E ALIMENTOS',
  'RIO DE JANEIRO',
  'RIO DE JANEIRO',
  'RJ',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('8',
  'VIA FRIENDS',
  'BEBIDAS E ALIMENTOS',
  'RIO DE JANEIRO',
  'RIO DE JANEIRO',
  'RJ',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('9',
  'VIA HAPPY HOUR',
  'BEBIDAS E ALIMENTOS',
  'RIO DE JANEIRO',
  'RIO DE JANEIRO',
  'RJ',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('10',
  'VIA LIFE',
  'SAUDE',
  'BELO HORIZONTE',
  'MINAS GERAIS',
  'MG',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('11',
  'VIA MED',
  'SAUDE',
  'UBERLANDIA',
  'MINAS GERAIS',
  'MG',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('12',
  'VIA PREVENT',
  'SAUDE',
  'SALVADOR',
  'CEARA',
  'CE',
  'NORDESTE',
  'NE',
  'BRASIL',
  'BR'),
 ('13',
  'VIA SENIOR',
  'SAUDE',
  'RECIFE',
  'PERNAMBUCO',
  'PE',
  'NORDESTE',
  'NE',
  'BRASIL',
  'BR')]

### **Exercício 4**

Realize a junção da tabela cliente com a tabela negociacao, considerando a integridade referencial definida em termos de clientePK (ou seja, `cliente.clientePK = negociacao.clientePK`). 

In [None]:
# Resposta do exercício

# Verificando quais colunas estão presentes no RDD cliente
display(cliente_header)

# Verificando quais colunas estão presentes no RDD negociacao
display(negociacao_header)

# Realizando a junção dos RDDs cliente e negociacao
cliente_rdd.join(negociacao_rdd).take(5)

['clientePK',
 'clienteNomeFantasia',
 'clienteSetor',
 'clienteCidade',
 'clienteEstadoNome',
 'clienteEstadoSigla',
 'clienteRegiaoNome',
 'clienteRegiaoSigla',
 'clientePaisNome',
 'clientePaisSigla']

['equipePK', 'clientePK', 'dataPK', 'receita', 'quantidadeNegociacoes']

[('4', ('VIA VEG', '4')),
 ('4', ('VIA VEG', '18')),
 ('4', ('VIA VEG', '36')),
 ('4', ('VIA VEG', '135')),
 ('4', ('VIA VEG', '142'))]

**Dica para exercício:** Método ***join()***

O método `join()` pode ser utilizado para juntar duas tabelas de acordo com a integridade referencial, ou seja, de acordo com a chave primária presente em uma primeira tabela e a chave secundária presente em uma segunda tabela.

No exemplo a seguir, o método `join()` é utilizado para juntar dados da tabela de dimensão funcionario com dados da tabela de dimensão pagamento, utilizando a junção estrela definida em termos de `funcionario.funcPK = pagamento.funcPK`

In [None]:
#listando os metadados de funcionário
display(funcionario_header)

#criando um RDD temporário para funcionário, contendo apenas algumas colunas 
funcionario_temp = funcionario_rdd \
  .map(lambda x: (x[0], x[2], x[9]))

#listando os 5 primeiros elementos de "funcionario_temp"
funcionario_temp \
  .take(5)

['funcPK',
 'funcMatricula',
 'funcNome',
 'funcSexo',
 'funcDataNascimento',
 'funcDiaNascimento',
 'funcMesNascimento',
 'funcAnoNascimento',
 'funcCidade',
 'funcEstadoNome',
 'funcEstadoSigla',
 'funcRegiaoNome',
 'funcRegiaoSigla',
 'funcPaisNome',
 'funcPaisSigla']

[('1', 'ALINE ALMEIDA', 'SAO PAULO'),
 ('2', 'ARAO ALVES', 'SAO PAULO'),
 ('3', 'ARON ANDRADE', 'SAO PAULO'),
 ('4', 'ADA BARBOSA', 'SAO PAULO'),
 ('5', 'ABADE BATISTA', 'SAO PAULO')]

In [None]:
#listando os metadados de pagamento
display(pagamento_header)

#criando um RDD temporário para pagamento, contendo apenas as colunas referentes à funcPK e às medidas numéricas
pagamento_temp = pagamento_rdd\
  .map(lambda x: (x[0], x[4], x[1]))

#listando os 5 primeiros elementos de "pagamento_temp" para o funcionário com funcPK = 5
def filterPagamentoFuncionario(func):
  return True if func[0] == '4' else False
pagamento_temp.filter(filterPagamentoFuncionario).take(5)

['funcPK', 'equipePK', 'dataPK', 'cargoPK', 'salario', 'quantidadeLancamentos']

[('4', '10498.14', '5'),
 ('4', '10498.14', '5'),
 ('4', '10498.14', '5'),
 ('4', '10498.14', '5'),
 ('4', '10498.14', '5')]

In [None]:
#realizando a junção de funcionario_temp com pagamento_temp, para o funcionário com funcPK igual a 5
#note que a juncao é feita considerando a igualdade na primeira coluna
#note também que somente os valores da segunda coluna de funcionario e de pagamento são retornados 
funcionario_temp \
  .join(pagamento_temp) \
  .take(5)

[('4', ('ADA BARBOSA', '10498.14')),
 ('4', ('ADA BARBOSA', '10498.14')),
 ('4', ('ADA BARBOSA', '10498.14')),
 ('4', ('ADA BARBOSA', '10498.14')),
 ('4', ('ADA BARBOSA', '10498.14'))]

### **Exercício 5** 

Liste a quantidade de clientes por região. Ordene o resultado pelo nome da região em ordem crescente.

In [None]:
# Resposta do exercício

# Verificando quais colunas estão presentes no RDD cliente
display(cliente_header)

# Calculando a quantidade de clientes por região e ordenando o resultado
cliente_rdd.map(lambda x: (x[6], 1)) \
  .reduceByKey(lambda x, y: x + y) \
  .sortBy(lambda element: element[0]) \
  .collect()


['clientePK',
 'clienteNomeFantasia',
 'clienteSetor',
 'clienteCidade',
 'clienteEstadoNome',
 'clienteEstadoSigla',
 'clienteRegiaoNome',
 'clienteRegiaoSigla',
 'clientePaisNome',
 'clientePaisSigla']

[('CENTRO-OESTE', 10),
 ('NORDESTE', 30),
 ('NORTE', 10),
 ('SUDESTE', 130),
 ('SUL', 30)]

**Dica para o exercício**: Método ***reduceByKey()***

O método `reduceByKey()` pode ser utilizado para calcular valores agregados para cada valor de chave presente em uma coluna. 

No exemplo a seguir, é listada a quantidade de funcionários por estado.

Para responder a essa consulta, é necessário utilizar os dados de `funcionario_rdd`, usando a coluna `funcEstadoNome` e contando quantas vezes o mesmo nome de estado aparece. São executados os seguintes passos:

- Utilização do método `map()` para selecionar a coluna desejada `funcEstadoNome`, que é a décima coluna de `funcionario_rdd`, e para criar pares chave-valor da seguinte forma: a chave corresponde à coluna `funcEstadoNome` e o valor corresponde a 1.
- Utilização do método `reduceByKey()` para calcular, para cada chave a quantidade de vezes que ela aparece.
- Utilização do método `collect()` para exibir os pares chave-valor obtidos.

In [None]:
# Verificando quais colunas estão presentes no RDD funcionario
display(funcionario_header)

# Executando os passos necessários para responder à consulta
funcionario_rdd \
  .map(lambda x: (x[9], 1)) \
  .reduceByKey(lambda x, y: x + y) \
  .collect()

['funcPK',
 'funcMatricula',
 'funcNome',
 'funcSexo',
 'funcDataNascimento',
 'funcDiaNascimento',
 'funcMesNascimento',
 'funcAnoNascimento',
 'funcCidade',
 'funcEstadoNome',
 'funcEstadoSigla',
 'funcRegiaoNome',
 'funcRegiaoSigla',
 'funcPaisNome',
 'funcPaisSigla']

[('MINAS GERAIS', 28),
 ('PARANA', 28),
 ('PERNAMBUCO', 21),
 ('SAO PAULO', 95),
 ('RIO DE JANEIRO', 28)]

### **Exercício 6** 

Qual foi o maior e o menor salário pago em 2019?



**Dica para o exercício**: Para responder a essa consulta, é necessário utilizar os dados de pagamento_rdd, usando a coluna salario e aplicando o método min() e max(), considerando a seguinte sequência de passos.

- Utilização do método `map()` para transformar o tipo de dados da coluna salario em um número de ponto flutuante (ou seja, float)

- Utilização do método `min()` para descobrir o menor salário.

- Utilização do método `max()` para descobrir o maior salário.

In [None]:
# Resposta do exercício

# Verificando quais colunas estão presentes no RDD pagamento
display(pagamento_header)

['funcPK', 'equipePK', 'dataPK', 'cargoPK', 'salario', 'quantidadeLancamentos']

In [None]:
# Obtendo o menor salário
pagamento_rdd.map(lambda x: float(x[4])).min()

1501.57

In [None]:
# Obtendo o maior salário
pagamento_rdd.map(lambda x: float(x[4])).max()

47140.17

### **Exercício 7** 

Qual a idade média dos funcionários? Para fazer este exercício, não precisa considerar a idade exata, ou seja, não é necessário considerar o dia no qual o funcionário nasceu. Considere apenas o ano de nascimento do funcionário.  

**Exercício 7a** Resolva o exercício utilizando o método *mean()*.

A seguinte sequência de passos deve ser realizada:

- Utilização do método `map()` para obter os anos de nascimento dos funcionários.
- Utilização do método `map()` para calcular a idade do funcionário, considerando o ano atual de 2020 e o ano no qual o funcionário nasceu. 
- Utilização do método `mean()` para gerar o resultado final.

In [None]:
# Resposta do exercício

# Criando uma função que calcula a idade a partir do ano
def idadeApartirDeAno(ano):
  anoAtual = 2020
  return anoAtual - int(ano)

# Calculando a idade média dos funcionários usando o método mean()
funcionario_rdd \
  .map(lambda x: (x[7])) \
  .map(lambda x: idadeApartirDeAno(x)) \
  .mean()

37.595

**Exercício 7b** Resolva o exercício **sem** usar o método *mean().*

A seguinte sequência de passos deve ser realizada:

- Utilização do método `map()` para obter os anos de nascimento dos funcionários.
- Utilização do método `map()` para calcular a idade do funcionário, considerando o ano atual de 2020 e o ano no qual o funcionário nasceu. 
- Utilização do método `map()` para gerar pares chave-valor, de forma que a chave seja a idade do funcionário e o valor seja 1.
- Utilização do método `reduceByKey()` reduzindo pelas chaves e somando as idades e as quantidades de linhas.
- Utilização do método `mapValues()` para mapear os valores e para dividir as somas das idades pelas quantidades de linhas.
- Utilização do método `map()` para mapear a idade média dos funcionários.

In [None]:
# Resposta do exercício

# Criando uma função que calcula idade a partir de ano
def idadeApartirDeAno(ano):
  anoAtual = 2020
  return anoAtual - int(ano)

# Calculando a idade média dos funcionários usando o método mean()
funcionario_rdd \
  .map(lambda x: (x[7])) \
  .map(lambda x: idadeApartirDeAno(x)) \
  .map(lambda x: (1, (x, 1))) \
  .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) \
  .mapValues(lambda x: x[0]/x[1]) \
  .map(lambda x: x[1]) \
  .collect()

[37.595]

### **Exercício 8**

Liste a quantidade de vezes que cada palavra individual aparece nos nomes de cidades que os funcionários moram. Por exemplo, na cidade de SAO JOSE DO RIO PRETO, existem 5 palavras individuais. Ordene o resultado de forma que as palavras individuais que mais se repetem sejam mostradas primeiro. 


In [None]:
# Resposta do exercício

# Verificando quais colunas estão presentes no RDD pagamento
display(funcionario_header)

# Calculando a quantidade de vezes que cada palavra individual aparece
funcionario_rdd.map(lambda x: x[8]). \
    flatMap(lambda line: line.split(" ")). \
    map(lambda word: (word, 1)). \
    reduceByKey(lambda x, y: x + y). \
    sortBy(lambda word_count: word_count[1], ascending=False). \
    collect()

['funcPK',
 'funcMatricula',
 'funcNome',
 'funcSexo',
 'funcDataNascimento',
 'funcDiaNascimento',
 'funcMesNascimento',
 'funcAnoNascimento',
 'funcCidade',
 'funcEstadoNome',
 'funcEstadoSigla',
 'funcRegiaoNome',
 'funcRegiaoSigla',
 'funcPaisNome',
 'funcPaisSigla']

[('SAO', 24),
 ('PRETO', 23),
 ('RECIFE', 21),
 ('RIO', 15),
 ('CAMPINAS', 8),
 ('RIBEIRAO', 8),
 ('ILHA', 8),
 ('JOSE', 8),
 ('PAULO', 8),
 ('SANTOS', 8),
 ('SANTO', 8),
 ('ANDRE', 8),
 ('PIRACICABA', 8),
 ('CARLOS', 8),
 ('BELA', 8),
 ('DO', 8),
 ('OSASCO', 8),
 ('ARARAQUARA', 8),
 ('BARUERI', 7),
 ('DE', 7),
 ('REDONDA', 7),
 ('ANGRA', 7),
 ('DOS', 7),
 ('PRAIA', 7),
 ('SECA', 7),
 ('HORIZONTE', 7),
 ('ARAGUARI', 7),
 ('MONTE', 7),
 ('CURITIBA', 7),
 ('GUARATUBA', 7),
 ('MORRETES', 7),
 ('JANEIRO', 7),
 ('VOLTA', 7),
 ('REIS', 7),
 ('BELO', 7),
 ('OURO', 7),
 ('VERDE', 7),
 ('LONDRINA', 7)]

### **Exercício 9** 

Considere o código Python a seguir, o qual tem como objetivo contar quantas
vezes uma determinada palavra aparece em um arquivo texto com 4 linhas de log.

```python
error_lines = ["""(2021-11-01 06:58:43) ERROR - Event with job id 1abc Failed,
                  (2021-11-01 06:58:47) ERROR - Event with job id 2abc Failed,
                  (2021-11-01 06:58:50) ERROR - Event with job id 3abc Failed,
                  (2021-11-01 06:59:43) INFO - Number of table lines is 102345."""]

output = spark.\
parallelize(error_lines).\
flatMap(lambda element: element.split(" ")).\
filter(lambda element: True if element == 'ERROR' else False).\
count()
```
Explique qual é a função dos métodos *flatMap(), filter() e count(). Explique também qual o resultado da variável `output`.


**Resposta do Exercício:**

**Método `flatMap()`**: Aplica a função lambda para todos os elementos presentes no RDD gerado pelo método `parallelize()` e nivela os resultados. Da forma como está especificada,a função lambda realiza a separação das palavras por espaço em branco usando o método `split()` nativo do Python.

**Método `filter()`:** Aplica a função booleana lambda sobre todos os elementos do RDD e retorna apenas os elementos que são verdadeiros baseados em uma condição. Da forma como está especificada, a função lambda retorna todas as palavras que são iguais à string "`ERROR`".

**Método `count()`:** Retorna o número de elementos do RDD. 

**Resultado da variável `output`:** Da forma como está especificado, o método `count()` retorna quantas vezes a palavra “ERROR” aparece. O valor da variável `output` é 3, pois considerando o arquivo de log representado por `error_lines` e a forma como está especificado o código Python, a palavra "`ERROR`" aparece 3 vezes.

### **Exercício 10**

Considere a figura a seguir, a qual representa um cluster de computadores que segue
a arquitetura do sistema de arquivos distribuídos HDFS (*Hadoop Distributed File System*).
Ao lado de cada quadrado, existe uma numeração que representa o componente dentro
daquele quadrado. Responda às questões a seguir, utilizando como base essa numeração.



<br>

<p align="center"><img src="https://raw.githubusercontent.com/kenjitakatuzi/MBA-ICMC-2022/main/images/CLUSTER.png" width="500" height="300"></p>
<!-- <p align="center">Figura 1 - Representação de um cluster de computadores que segue a arquitetura do HDFS. </p> -->

<br>

**Resposta do Exercício:**

(1) Componente 1

(2) Componentes 2, 3 e 4

(3) Componente 5

(4) Componentes 2, 3 e 4

(5) Componentes 1 e 5

(6) Componentes 2, 3 e 4

### **Exercício 11**

Transformações são operações que transformam um RDD (Resilient and Distributed Datasets) em outro RDD. No Spark, as transformações não são executadas imediatamente sobre os dados do RDD. Elas são anexadas ao grafo acíclico direcionado de transformações, o qual é executado apenas quando uma ação sobre o RDD em questão for solicitada. Essa característica é conhecida como lazy-evaluation.

Como resultado, operações podem ser executadas em conjunto, possibilitando otimizações. Por exemplo, os métodos `map()` e `reduceByKey()` são transformações, e eles somente são executados quando ocorrer uma ação, como quando o método collect() for chamado. Portanto, `map()`, `reduceByKey()` e `collect()` podem ser executados em conjunto.

No exemplo do contador de palavras, comumente utiliza-se o que é conhecido como side-map reducer, indicando que internamente o Spark usa uma estrutura de HashMap e faz o método de `reduceByKey()` localmente antes de enviar o resultado para a etapa de reducing propriamente dita. Isso é ilustrado por meio da seta azul na figura a seguir.

Por outro lado, o framework Hadoop não provê operações com a característica de *lazy-evaluation*. Portanto, operações não são executadas em conjunto. Refaça a figura a seguir de forma que o exemplo do contador de palavras não considere essa característica, ou seja, que o contador de palavras não considere a característica de *lazy-evaluation*.

**Dica para o exercício:** Ao invés de refazer a figura, é possível discutir as alterações que precisam ser feitas.

<br>

<p align="center"><img src="https://raw.githubusercontent.com/kenjitakatuzi/MBA-ICMC-2022/main/images/FIGURA1.jpeg" width="600" height="300"></p>
<!-- <p align="center">Figura 2 - Exemplo de contador de palavras.</p> -->

<br>



**Resposta do exercício:**

<br>

<p align="center"><img src="https://raw.githubusercontent.com/kenjitakatuzi/MBA-ICMC-2022/main/images/FIGURA2.jpeg" width="600" height="300"></p>
<!-- <p align="center">Figura 1. Exemplo de contador de palavras. </p> -->

<br>