# Introdução ao Big Data com Apache Spark

Este tutorial tem como objetivo facilitar o entendimento do conceito de Big Data utilizando o Apache Spark.
    
# Arquitetura do Apache Spark

Antes de partir para o código, vamos ver uma visão geral da arquitetura do Apache Spark. Esta arquitetura permite que você possa processar seus códigos em várias máquinas como se fosse uma só através da arquitetura master-worker, onde existe um `driver` ou nó master no cluster, acompanhado pelos nós `worker`. O master envia o trabalho para os workers com instruções para carregar os dados da memória ou do disco.

O diagrama abaixo apresenta um exemplo de um cluster com Apache Spark, onde basicamente existe um nó Driver que comunica com os nós executors. Cada um destes nós executors tem slots que são logicamente como núcleos de processsamento.

![spark-architecture](https://intellipaat.com/mediaFiles/2017/02/Spark-Arch.jpg)



*   https://www.johnsnowlabs.com/unlocking-the-power-of-sentiment-analysis-with-deep-learning/
*   https://sparknlp.org/api/python/reference/autosummary/sparknlp/annotator/classifier_dl/sentiment_dl/index.html




# SparkSession - Um ponto de entrada unificado no Apache Spark 2.0

No Spark 2.0, foi introduzido o [SparkSession](https://spark.apache.org/docs/preview/api/python/pyspark.sql.html#pyspark.sql.SparkSession), um novo ponto de entrada que agrega o SparkContext, SQLContext, StreamingContext, and HiveContext. SparkSession possui muitas funcionalidades e nesse notebook vamos apresentar as mais importantes para ilustrar o acesso às funcionalidades do Spark.

## Spark NLP

Neste tutorial vamos fazer uso de uma técnica denominada Processamento de Linguagem Natural, mais conhecido pelo termo em inglês Natural Language Processing (NLP). O processamento de linguagem natural usa machine learning para revelar a estrutura e o significado do texto. Com aplicativos de processamento de linguagem natural, as organizações podem analisar textos e extrair informações sobre pessoas, lugares e eventos para entender melhor as opiniões em mídias sociais e conversas de clientes.

Spark NLP é uma biblioteca de processamento de linguagem natural de última geração que foi construída sobre o Apache Spark. Ele fornece bibliotecas de NLP simples, eficientes e precisas para pipelines de aprendizado de máquina que podem ser implantadas facilmente em um ambiente distribuído. Ele oferece suporte a quase todas as tarefas e módulos de NLP que podem ser usados sem problemas em um cluster Spark.

<img src="https://www.johnsnowlabs.com/wp-content/uploads/2023/05/23_05_2023.jpg"  width="90%" height="70%">


## Tutorial com Spark NLP
Neste tutorial iremos usar a análise de sentimentos para localizar e rotular campos em reviews de usuários do Google Play para entender melhor as opiniões dos clientes e encontrar insights sobre o produto e a experiência do usuário.

O Google Colab é talvez a maneira mais fácil de começar a usar o spark-nlp. Ele não requer nenhuma instalação ou configuração além de ter uma conta do Google.

Execute o código a seguir no notebook do Google Colab e comece a usar o Spark NLP imediatamente. O Spark NLP é compatível com Python 3.7.x e superior, dependendo da versão principal do PySpark.

In [None]:
# This is only to setup PySpark and Spark NLP on Colab
!wget https://setup.johnsnowlabs.com/colab.sh -O - | bash

Para iniciar um cluster Spark com suporte a NLP, basta importar a biblioteca e iniciar a sessão executando o comando abaixo. O parâmetro gpu=True indica para o Spark NLP que irá usar a GPU no processamento dos textos, caso tenha alguma disponível.

In [None]:
import sparknlp
spark = sparknlp.start(gpu=True)

print("Spark NLP version: {}".format(sparknlp.version()))
print("Apache Spark version: {}".format(spark.version))

# Análise de Sentimento com Spark NLP

Neste tutorial iremos utilizar modelos de Deep Learning (DL) com Large Language Models (LLM) para análise de sentimentos.

Os modelos de DL para análise de sentimentos podem aprender representações de texto automaticamente sem a necessidade de engenharia de feature. Esses modelos podem capturar padrões mais complexos nos dados e podem ter um desempenho melhor em tarefas com mais nuances, como detecção de sarcasmo ou reconhecimento de emoções. No entanto, eles podem exigir conjuntos de dados maiores para treinamento e podem ser computacionalmente caros, exigindo recursos de computação de alto desempenho (GPUs).

## Salvando o modelo do HuggingFace

Em muitos casos, você pode obter resultados de alta qualidade com modelos de aprendizado de máquina que foram treinados anteriormente em grandes conjuntos de dados de texto. Muitos desses modelos pré-treinados estão disponíveis em código aberto e são de uso gratuito. A HuggingFace é uma excelente fonte desses modelos, e sua biblioteca Transformers é uma ferramenta fácil de usar para aplicar os modelos e também adaptá-los aos seus próprios dados.

Neste artigo iremos mostrar como utilizar modelos do HuggingFace pré-treinados para carregar no Spark.

O primeiro passo é instalar e importar as bibliotecas transformers e tensorflow:

In [None]:
!pip install -q transformers==4.25.1 tensorflow==2.11.0

In [None]:
from transformers import TFBertForSequenceClassification, BertTokenizer
import tensorflow as tf

Iremos utilizar um modelo (https://huggingface.co/ramonmedeiro1/bertimbau-products-reviews-pt-br) treinado a partir do Bertimbau da Neuralmind (https://huggingface.co/neuralmind/bert-base-portuguese-cased) em um dataset chamado B2W-Reviews01. Este é um corpus aberto de reviews de produtos contendo mais de 130 mil avaliações de clientes de comércio eletrônico, coletadas no site da Americanas.com.

In [None]:
MODEL_NAME = "ramonmedeiro1/bertimbau-products-reviews-pt-br"

Para carregar o modelo iremos utilizar a função `from_pretrained`:

In [None]:
tokenizer = BertTokenizer.from_pretrained(MODEL_NAME)

O HuggingFace vem com uma função `save_pretrained` para modelos baseados no TensorFlow. Usaremos isso para salvar o vocabulário do modelo.

In [None]:
tokenizer.save_pretrained('./{}_tokenizer/'.format(MODEL_NAME))

Iremos carregar o modelo usando o `TFBertForSequenceClassification` que é usado para classificação de sentenças:

In [None]:
try:
  model = TFBertForSequenceClassification.from_pretrained(MODEL_NAME)
except:
  model = TFBertForSequenceClassification.from_pretrained(MODEL_NAME, from_pt=True)

O trecho de código abaixo salva o modelo completo que será usado, posteriormente, para análise de sentimento:

In [None]:
# Define TF Signature
@tf.function(
  input_signature=[
      {
          "input_ids": tf.TensorSpec((None, None), tf.int32, name="input_ids"),
          "attention_mask": tf.TensorSpec((None, None), tf.int32, name="attention_mask"),
          "token_type_ids": tf.TensorSpec((None, None), tf.int32, name="token_type_ids"),
      }
  ]
)
def serving_fn(input):
    return model(input)

model.save_pretrained("./{}".format(MODEL_NAME), saved_model=True, signatures={"serving_default": serving_fn})

A pasta de assets está vazia e iremos copiar o vocabulário para esta pasta. Agora o modelo estará completo:

In [None]:
asset_path = '{}/saved_model/1/assets'.format(MODEL_NAME)

!cp {MODEL_NAME}_tokenizer/vocab.txt {asset_path}

Carrega o dicionário label2id com os labels de sentimentos dos reviews:

In [None]:
labels = model.config.label2id

Ordena o dicionário baseado no id

In [None]:
labels = sorted(labels, key=labels.get)

Salva o dicionário com os labels de sentimentos na pasta de assets:

In [None]:
with open(asset_path+'/labels.txt', 'w') as f:
    f.write('\n'.join(labels))

## Coletando os reviews

In [None]:
#Instala a biblioteca Google play scraper: https://github.com/JoMingyu/google-play-scraper
!pip install google_play_scraper

In [None]:
from google_play_scraper import Sort, reviews, app
rvs, _ = reviews(
                'com.amazon.mShop.android.shopping',
                lang='pt',
                country='br',
                sort='most_relevant',
                count= 20
            )

In [None]:
import pandas as pd
app_reviews_df = pd.DataFrame(rvs)

In [None]:
reviews_df = spark.createDataFrame(app_reviews_df[['reviewId', 'content', 'score', 'at']])

## Criando o Pipeline de NLP
O Spark NLP processa os dados usando Pipelines, estrutura que contém todas as etapas a serem executadas nos dados de entrada.

Cada etapa contém um `Annotator` que executa uma tarefa específica, como tokenização, normalização e análise de dependência. Cada `Annotator` tem anotação(ões) de entrada e produz nova anotação. Um `Annotator` pega um documento de texto de entrada e produz um documento de saída com metadados adicionais, que podem ser usados para processamento ou análise posterior.

In [None]:
from sparknlp.annotator import *
from sparknlp.base import *

Para passar pelo pipeline de NLP, precisamos anotar os dados brutos. O `DocumentAssembler` prepara os textos de entrada em um formato que pode ser processado pelo Spark NLP. Esse é o ponto de entrada para todos os pipelines do Spark NLP.

In [None]:
document_assembler = DocumentAssembler() \
    .setInputCol('content') \
    .setOutputCol('document')

A classe Tokenizer irá gerar os tokens a partir do documento de entrada:

In [None]:
tokenizer = Tokenizer() \
    .setInputCols(['document']) \
    .setOutputCol('token')

* Vamos usar a função `loadSavedModel` da classe `BertForSequenceClassification`, que nos permite carregar o modelo do TensorFlow no formato SavedModel
* A maioria dos parâmetros pode ser definida posteriormente, quando você estiver carregando esse modelo no `BertForSequenceClassification` em tempo de execução, como `setMaxSentenceLength`, portanto, não se preocupe com o que está definindo agora.
* `loadSavedModel` aceita dois parâmetros:
  * o primeiro é o caminho para o TF SavedModel.
  * o segundo é a SparkSession, que é a variável Spark que iniciamos anteriormente por meio de `sparknlp.start()`

In [None]:
sequenceClassifier = BertForSequenceClassification.loadSavedModel(
     '{}/saved_model/1'.format(MODEL_NAME),
     spark
 )\
  .setInputCols(["document",'token'])\
  .setOutputCol("class")\
  .setMaxSentenceLength(128)

In [None]:
# introduzido no spark-nlp==3.4.0
sequenceClassifier.getClasses()

Agora criamos o Pipeline com os seguintes estágios:


1.   `Document Assembler`: recebe o texto de entrada e gera um objeto  `document` que contém os dados e metadados para o modelo de IA.
2.   `Tokenizer`: transforma o texto em tokens (objeto `token`) que serão utilizados pelo Transformer na análise de sentimento.
3. `SequenceClassifier`: recebe os objetos `document` e `token` e gera como saída o resultado final da análise de sentimento.



In [None]:
pipeline = Pipeline(stages=[
    document_assembler,
    tokenizer,
    sequenceClassifier
])

# Fazendo a análise de sentimento dos reviews

O código abaixo executa a inferência e determina o sentimento de cada linha do Dataframe:

In [None]:
result = pipeline.fit(reviews_df).transform(reviews_df)

O resultado da análise de sentimento é colocada em um vetor e temos que transformar em uma String:

In [None]:
result = result.withColumn("result", result["class.result"].getItem(0))

Aqui fazemos um tratamento para poder comparar o resultado do modelo com o review do usuário. Por isso, iremos ter três classes de resultados:

* **0.0**: Negativo e Muito Negativo
* **1.0**: Neutro
* **2.0**: Positivo e Muito Positivo

In [None]:
from pyspark.sql.functions import when, col, lit
result = result.withColumn("prediction",
       when((col('result') == lit('Negativo')) | (col('result') == lit('Muito Negativo')), 0.0)
      .when(col('result') == lit('Neutro'), 1.0)
      .otherwise(2.0))

Aqui fazemos um tratamento para poder comparar o resultado do modelo com o review do usuário. O score do usuário no Google Play vai de 1 a 5 e iremos mapear em três classes de resultados:

* **0.0**: Score 1 e 2
* **1.0**: Score 3
* **2.0**: Score 4 e 5

In [None]:
result = result.withColumn("label",
       when((col('score') == 1) | (col('score') == 2), 0.0)
      .when(col('score') == 3, 1.0)
      .otherwise(2.0))

In [None]:
result.select('content', 'prediction', 'label', 'score', 'result').filter("label != prediction").show(truncate=False)

## Predizendo reviews

Aqui fazemos a análise de sentimento em lote a partir do arquivo csv que baixamos dos reviews do Google Play.

In [None]:
!gdown https://drive.google.com/uc?id=1oquPsLdKqAUphOLXPbQ2YAZ15xELtXUu

Esta linha abaixo lê o arquivo csv com o Apache Spark:

In [None]:
test_dataset = spark.read.csv("reviews.csv", header=True, quote="\"", escape="\"", multiLine=True)

Iremos utilizar na análise de sentimento apenas as colunas "reviewId", "content", "score", "sentiment". Por isso, utilizamos a função `select` do Spark para selecionar apenas estas colunas:

In [None]:
test_dataset = test_dataset.select("reviewId", "content", "score", "sentiment")

Aqui fazemos a análise de sentimento de cada texto do csv:

In [None]:
result = pipeline.fit(test_dataset).transform(test_dataset)

O resultado da análise de sentimento é colocada em um vetor e temos que transformar em uma String:

In [None]:
result = result.withColumn("result", result["class.result"].getItem(0))

Nas duas células abaixo fazemos a transformação das colunas de resultado do modelo e do review do usuário para saber a acurácia do modelo:

In [None]:
from pyspark.sql.functions import when, col, lit
result = result.withColumn("prediction",
       when((col('result') == lit('Negativo')) | (col('result') == lit('Muito Negativo')), 0.0)
      .when(col('result') == lit('Neutro'), 1.0)
      .otherwise(2.0))

In [None]:
result = result.withColumn("label",
       when((col('score') == 1) | (col('score') == 2), 0.0)
      .when(col('score') == 3, 1.0)
      .otherwise(2.0))

A classe `MulticlassClassificationEvaluator` do Apache Spark ML faz o cálculo automático da acurácia do modelo utilizando os rótulos esperados (reviews do usuário) e o resultado do modelo (análise de sentimento).

In [None]:
# Select (prediction, true label) and compute test error
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(result)
print(f"Accuray = {accuracy}")