<a href="https://colab.research.google.com/github/robsonbrandao/artigos/blob/main/03%20-%20Apache%20Spark%20-%20Usando%20RDD/ContadorPalavras.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# <span style="color:blue">Processamento Analítico de Dados em Larga Escala</span>

## <span style="color:blue">Aula 03: Spark: Fundamentos e Consultas em SQL</span>
## <span style="color:blue">Apache Spark RDD: Contador de Palavras</span>


**ICMC/USP**

O objetivo deste *notebook* é mostrar uma implementação do contador de palavras usando a classe pyspark.RDD do Apache Spark RDD. O contador de palavras é um exemplo clássico de explicação da funcionalidade do modelo de programação funcional MapReduce.

O detalhamento da classe `pyspark.RDD` pode ser encontrada na documentação oficial do Spark neste [link](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD). Neste notebook são explicados apenas o uso de alguns métodos para exemplificar o contador de palavras.

**IMPORTANTE: O uso do *framework* Spark requer diversas configurações no ambiente de desenvolvimento para executar o *notebook*. Dado que tal complexidade foge do escopo de nossa disciplina, recomenda-se que o *notebook* seja executado na plataforma de desenvolvimento COLAB. O uso do COLAB  proporciona um ambiente de desenvolvimento pré-configurado e remove a complexidade de instalação e configuração de pacotes e *frameworks* que são utilizados na disciplina.**

# 1 Apache Spark Cluster

## 1.1 Instalação

Neste *notebook* é criado um *cluster* Spark composto apenas por um **nó mestre**. Ou seja, o *cluster* não possui um ou mais **nós de trabalho** e o **gerenciador de cluster**. Nessa configuração, as tarefas (*tasks*) são realizadas no próprio *driver* localizado no **nó mestre**.

Para que o cluster possa ser criado, primeiramente é instalado o Java Runtime Environment (JRE) versão 8. 

In [1]:
# instalando Java Runtime Environment (JRE) versão 8
%%capture
!apt-get remove openjdk*
!apt-get update --fix-missing
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Na sequência, é feito o *download* do Apache Spark versão 3.0.0.

In [2]:
# baixando Apache Spark versão 3.0.0
%%capture
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz && rm spark-3.0.0-bin-hadoop2.7.tgz

Na sequência, são configuradas as variáveis de ambiente JAVA_HOME e SPARK_HOME. Isto permite que tanto o Java quanto o Spark possam ser encontrados.

In [3]:
import os

#configurando a variável de ambiente JAVA_HOME
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

#configurando a variável de ambiente SPARK_HOME
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"

Por fim, são instalados dois pacotes da linguagem de programação Python, cujas funcionalidades são descritas a seguir.

> **Pacote findspark:** Usado para ler a variável de ambiente SPARK_HOME e armazenar seu valor na variável dinâmica de ambiente PYTHONPATH. Como resultado, Python pode encontrar a instalação do Spark. 

> **Pacote pyspark:** PySpark é a API do Python para Spark. Ela possibilita o uso de Python, considerando que o *framework* Apache Spark encontra-se desenvolvido na linguagem de programação Scala. 

In [4]:
%%capture

# instalando o pacote findspark
!pip install -q findspark==1.4.2

# instalando o pacote pyspark
!pip install -q pyspark==3.0.0

## 1.2 Conexão

PySpark não é adicionado ao *sys.path* por padrão. Isso significa que não é possível importá-lo, pois o interpretador da linguagem Python não sabe onde encontrá-lo. 

Para resolver esse aspecto, é necessário instalar o módulo `findspark`. Esse módulo mostra onde PySpark está localizado. Os comandos a seguir têm essa finalidade.


In [5]:
# importando o módulo findspark
import findspark

# carregando a variávels SPARK_HOME na variável dinâmica PYTHONPATH
findspark.init()

Depois de configurados os pacotes e módulos e inicializadas as variáveis de ambiente, é possível criar o objeto *SparkContext*. Esse objeto é criado com o objetivo de manipular RDDs diretamente. 

No comando de criação a seguir, é definido que é utilizado o próprio sistema operacional deste *notebook* como **nó mestre** por meio do parâmetro **local** do método **setMaster**. O complemento do parametro **[*]** indica que são alocados todos os núcleos de processamento disponíveis para o objeto *driver* criado.

In [6]:
from pyspark import SparkConf, SparkContext

# O nó mestre é o local onde rodo
conf = SparkConf().setMaster("local[*]")
spark = SparkContext(conf=conf)

# 2 Contador de Palavras

### 2.1 Conjunto de Dados

Como conjunto de dados, é usado o arquivo `logs.txt`. Ele é um arquivo texto com 56.481 linhas de *log* de um servidor web. O arquivo fonte pode ser obtido neste ([link](https://github.com/logpai/loghub/tree/master/Apache)).

O comando a seguir obtém os dados do arquivo texto `logs.txt`.

In [7]:
# obtendo os dados do arquivo texto logs.txt
# Arquivo de log de um servidor Web com 56K entradas
%%capture
!wget -q https://zenodo.org/record/3227177/files/Apache.tar.gz?download=1 -O logs.tar.gz
!tar xf logs.tar.gz && rm -rf logs.tar.gz
!mv Apache.log logs.txt

In [8]:
# abrindo o arquivo logs.txt no modo leitura (mode="r") 
# exibindo os primeiros 10 registros do arquivo
with open(file="logs.txt", mode="r") as fp: 
  for _ in range(0, 10): print(fp.readline())

[Thu Jun 09 06:07:04 2005] [notice] LDAP: Built with OpenLDAP LDAP SDK

[Thu Jun 09 06:07:04 2005] [notice] LDAP: SSL support unavailable

[Thu Jun 09 06:07:04 2005] [notice] suEXEC mechanism enabled (wrapper: /usr/sbin/suexec)

[Thu Jun 09 06:07:05 2005] [notice] Digest: generating secret for digest authentication ...

[Thu Jun 09 06:07:05 2005] [notice] Digest: done

[Thu Jun 09 06:07:05 2005] [notice] LDAP: Built with OpenLDAP LDAP SDK

[Thu Jun 09 06:07:05 2005] [notice] LDAP: SSL support unavailable

[Thu Jun 09 06:07:05 2005] [error] env.createBean2(): Factory error creating channel.jni:jni ( channel.jni, jni)

[Thu Jun 09 06:07:05 2005] [error] config.update(): Can't create channel.jni:jni

[Thu Jun 09 06:07:05 2005] [error] env.createBean2(): Factory error creating vm: ( vm, )



### 2.2 Criação do RDD

Existem duas formas de se criar RDDs, conforme descrito a seguir:

- Paralelizando uma coleção de dados já existente no *driver*.

- Referenciando um conjunto de dados armazenado em um sistema de armazenamento externo, como um sistema de arquivo compartilhado, HDFS, HBase, Cassandra, ou qualquer outra fonte de dados que ofereça suporte para o formato de entrada do Hadoop. 

O arquivo `logs.txt` é um arquivo armazenado em um sistema de armazenamento externo. Para se criar o RDD referente a esse arquivo, é utilizado o método `textFile()`. 

O comando a seguir utiliza o método `textFile()` para armazenar no RDD chamado `lines_rdd` os registros do arquivo de texto `"logs.txt"`. Como resultado,  `lines_rdd` consiste de um RDD de *strings*. 

In [9]:
# criando o RDD lines_rdd
lines_rdd = spark.textFile("logs.txt")

In [10]:
# exibindo as 15 primeiras linhas de lines_rdd. Método take()
lines_rdd.take(15)

['[Thu Jun 09 06:07:04 2005] [notice] LDAP: Built with OpenLDAP LDAP SDK',
 '[Thu Jun 09 06:07:04 2005] [notice] LDAP: SSL support unavailable',
 '[Thu Jun 09 06:07:04 2005] [notice] suEXEC mechanism enabled (wrapper: /usr/sbin/suexec)',
 '[Thu Jun 09 06:07:05 2005] [notice] Digest: generating secret for digest authentication ...',
 '[Thu Jun 09 06:07:05 2005] [notice] Digest: done',
 '[Thu Jun 09 06:07:05 2005] [notice] LDAP: Built with OpenLDAP LDAP SDK',
 '[Thu Jun 09 06:07:05 2005] [notice] LDAP: SSL support unavailable',
 '[Thu Jun 09 06:07:05 2005] [error] env.createBean2(): Factory error creating channel.jni:jni ( channel.jni, jni)',
 "[Thu Jun 09 06:07:05 2005] [error] config.update(): Can't create channel.jni:jni",
 '[Thu Jun 09 06:07:05 2005] [error] env.createBean2(): Factory error creating vm: ( vm, )',
 "[Thu Jun 09 06:07:05 2005] [error] config.update(): Can't create vm:",
 '[Thu Jun 09 06:07:05 2005] [error] env.createBean2(): Factory error creating worker.jni:onStartup 

## Passo a Passo

O método `flatMap` aplica uma função `func` sobre todos os elementos de um RDD e nivela os resultados gerados. 

No comando a seguir, aplica-se o método nativo do Python `split`, por meio da transformação do método `flatMap`, sobre os elementos em `line_rdd` para separar as linhas do arquivo `logs.txt` em palavras. Uma palavra é identificada quando se encontra um espaço em branco. 

In [11]:
# Quebrar os textos em palavras
words_rdd = lines_rdd.flatMap(lambda line: line.split(" "))
words_rdd.take(7)

['[Thu', 'Jun', '09', '06:07:04', '2005]', '[notice]', 'LDAP:']

O método `map` aplica uma função `func` sobre todos os elementos de um RDD. 

No comando a seguir, usa-se o método `map` para produzir pares chave-valor. O método `map` faz o mapeamento de cada palavra presente no RDD words_rdd em um par chave-valor. O valor 1 é associado a cada palavra (ou chave).

In [12]:
keyValue_rdd = words_rdd.map(lambda word: (word, 1))
keyValue_rdd.take(7)

[('[Thu', 1),
 ('Jun', 1),
 ('09', 1),
 ('06:07:04', 1),
 ('2005]', 1),
 ('[notice]', 1),
 ('LDAP:', 1)]

O método `reduceByKey()`, quando aplicado a um RDD composto por pares chave-valor (C,V), agrega os valores V de forma que esses valores são computados dois a dois usando uma função `func` e agrupados de acordo com a chave C.

No comando a seguir, o método `reduceByKey()` é usado para agrupar palavras iguais, somando-se o número de vezes que cada palavra aparece.

In [13]:
contaPal_rdd = keyValue_rdd.reduceByKey(lambda x, y: x + y)
contaPal_rdd.take(7)

[('09', 1793),
 ('06:07:04', 7),
 ('LDAP:', 106),
 ('SDK', 53),
 ('SSL', 53),
 ('support', 53),
 ('unavailable', 53)]

Pode ser interessante ordenar o resultado final usando o método `sortBy()`, de forma que as palavras com maior ocorrência apareçam primeiro. 

No comando a seguir, as palavras que mais aparecem aparecem primeiro, desde que o método `sortBy()` realiza a ordenação de forma decrescente. 

In [14]:
ordContaPal = contaPal_rdd.sortBy(lambda word_count: word_count[1], ascending=False)
ordContaPal.take(7)

[('[error]', 38081),
 ('2005]', 32309),
 ('[client', 31115),
 ('not', 28808),
 ('File', 20861),
 ('does', 20861),
 ('exist:', 20861)]

## Todos os Passos 

Todos os passos anteriores podem ser agrupados, conforme descrito a seguir.



In [15]:
words_counts = spark.textFile("logs.txt"). \
               flatMap(lambda line: line.split(" ")). \
               map(lambda word: (word, 1)). \
               reduceByKey(lambda x, y: x + y). \
               sortBy(lambda word_count: word_count[1], ascending=False). \
               collect()

In [16]:
words_counts[0:7]

[('[error]', 38081),
 ('2005]', 32309),
 ('[client', 31115),
 ('not', 28808),
 ('File', 20861),
 ('does', 20861),
 ('exist:', 20861)]