# Apache Spark
O Apache Spark é um mecanismo de análise unificado para processamento de dados em grande escala com módulos integrados para SQL, streaming, machine learning e processamento de grafos. O Apache Spark é uma estrutura de processamento paralelo de código aberto que oferece suporte ao processamento na memória para aumentar o desempenho de aplicativos que analisam big data. As soluções de big data são projetadas para lidar com dados muito grandes ou complexos para bancos de dados tradicionais. O Spark processa grandes quantidades de dados na memória, o que é muito mais rápido do que as alternativas baseadas em disco.

# Pipeline de dados com Apache Spark, Kafka e MongoDB

Neste tutorial iremos construir um pipeline de dados utilizando o Spark para ler dados do Kafka e escrever no MongoDB

## Utilizar o Pyspark como biblioteca

O PySpark já está instalado no ambiente, mas não está no sys.path por padrão, ou seja, não pode ser utilizado como uma biblioteca Python. Neste tutorial iremos levantar o Spark e iremos precisar da biblioteca PySpark. A biblioteca findspark tem como objetivo adicionar o pyspark ao sys.path em tempo de execução.

Por isso, iremos instalar e iniciar o findspark.

In [None]:
!pip install -q findspark

In [None]:
import findspark
findspark.init()

## Iniciando uma sessão Spark

Por padrão o MongoDB não vem instalado no cluster Spark e, por isso, iremos iniciar uma sessão Spark passando os argumentos necessários para importar a biblioteca do MongoDB no Spark. Aqui o SparkSession inicia com os seguintes parâmetros:

- **appName**: nome da aplicação Spark.
- **spark.mongodb.input.uri**: uri para acesso de leitura de dados do MongoDB
- **spark.mongodb.output.uri**: uri para acesso de escrita de dados do MongoDB
- **spark.jars.packages**: aqui passamos os pacotes do MongoDB para esta versão do Spark que iremos importar. Este parâmetro é importante para que o Spark seja capaz de se comunicar com o  MongoDB.

Repare que não precisamos configurar nenhuma biblioteca para acesso ao Kafka, pois já vem instalado neste cluster do Spark.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Aula Spark") \
    .config("spark.mongodb.input.uri", "mongodb://root:root@mongo/admin") \
    .config("spark.mongodb.output.uri", "mongodb://root:root@mongo/admin") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.4")\
    .getOrCreate()

## Leitura de dados do Kafka

Para leitura de dados do Kafka devemos definir:

- **kafka.bootstrap.servers**: o IP do broker do Kafka, ou seja, o endereço de onde está o cluster do Kafka.
- **subscribe**: tópico que iremos ler os dados.
- **startingOffsets**: offset de leitura do tópico do Kafka. Aqui definimos *earliest* que significa que iremos obter os dados desde o início do tópico. Outra opção é definir como *latest*, ou seja, que iremos ler dados a partir dos últimos dados que estão chegando no tópico.

In [None]:
kafka_df = spark.read.format("kafka")\
                .option("kafka.bootstrap.servers", "kafka:9092")\
                .option("subscribe", "usuarios")\
                .option("startingOffsets", "earliest")\
                .load()

Como pode ser visto no print abaixo, os dados são lidos do Kafka serializados do Kafka.

In [None]:
kafka_df.show(1, truncate=False)

Temos que converter os dados do Kafka em string. O conteúdo da mensagem do Kafka está na coluna *value* e, por isso, iremos converter o conteúdo desta coluna para string com o código abaixo:

In [None]:
kafka_df = kafka_df.selectExpr("CAST(value AS STRING)")
kafka_df.show(1, truncate=False)

Agora a mensagem está no formato string armazenada na coluna *value*. A mensagem foi enviada como um objeto JSON e desejamos transformar este objeto JSON em um formato tabular antes de inserir no MongoDB. Para isto, temos que definir um esquema dos dados do JSON, conforme código abaixo:

In [None]:
schema = "id_usuario string, nome_usuario string, endereco_usuario string, platforma string, data_cadastro timestamp"

O esquema criado no código anterior é utilizado na função `from_json` para traduzir a coluna value em uma lista com tipos para cada coluna. O resultado desta função é armazenado em uma nova coluna *jsonData* criada através da função *withColumn*.

In [None]:
from pyspark.sql.functions import from_json, col
df = kafka_df.withColumn("jsonData",from_json(col("value"),schema))

In [None]:
df.show(3, truncate=False, vertical=True)

In [None]:
df.printSchema()

Como queremos apenas os dados presentes na coluna *jsonData*, iremos fazer um 'select \*' nesta coluna para obter os dados no formato tabular:

In [None]:
df = df.select("jsonData.*")

In [None]:
df.show(3, truncate=False)

In [None]:
df.printSchema()

## Escrita no MongoDB

Depois de preparar os dados, iremos agora escrever estes dados no MongoDB com os seguintes parâmetros:

- **format**: aqui informamos ao Spark que iremos armazenar no MongoDB com o valor *mongo*.
- **mode("append")**: definimos que cada escrita irá adicionar dados ao MongoDB.
- **option("database","aula_db")**: informamos o banco de dados que iremos escrever.
- **option("collection", "usuarios")**: informamos a coleção do MongoDB que iremos escrever.

In [None]:
df.write.format("mongo")\
    .mode("append")\
    .option("database","aula_db")\
    .option("collection", "usuarios")\
    .save()