# Conectando MongoDB ao PySpark

O Spark é uma das grandes ferramentas de Big Data sendo utilizado por diversas companhias no mundo todo.  
Diferentemente de outras tecnologias de manipulação de dados como Python Pandas, o Spark permite trabalhar com conjuntos grandes de dados sem que seja necessário carregá-los na memória.

Para quem já utiliza Python para tratar e analisar dados, o PySpark oferece as vantagens do Spark com uma interface bastante familiar e intuitiva do Python.

Se você trabalha com análise de dados no Linux, muito provavelmente um simples `pip install pyspark` é o suficiente para seguir este notebook.

## MongoDB

Quando falamos em grandes quantidades de dados, muito provavelmente estarão localizados em bancos de dados.  
O [MongoDB](https://www.mongodb.com/) é uma das opções mais populares de DBs NoSQL, por isso escrevi este notebook para mostrar como é simples acessar os dados armazenados no MongoDB.  
Instruções para criar/acessar um cluster do MongoDB podem ser encontradas facilmente. Não as incluí neste notebook.   
(Caso tenha outras dúvidas, usei como base a [documentação oficial](https://docs.mongodb.com/spark-connector/master/python-api/))

## Configurando o PySpark

Com o Spark e o PySpark instalados, simplesmente importamos o PySpark:

In [1]:
import pyspark

Então, inicializamos o `SparkSession` que vai fazer toda a ponte entre o que escrevermos aqui com toda a infraestrutura Spark que roda por baixo dos panos.

In [2]:
spark = pyspark.sql.SparkSession \
        .builder\
        .appName("PySpark - MongoDB Integration Example") \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0") \
        .getOrCreate()

Para que o Spark possa se comunicar com o MongoDB, são necessários os drivers apropriados.  
A linha 
```python
.config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0")
``` 
se encarrega de indicar o conector a ser utilizado e suas dependências.

## Configurando a conexão ao MongoDB

Tenha em mãos a URI para se conectar ao Cluster desejado.  
Neste notebook, utilizei um cluster pessoal para estudos.

In [3]:
mongo_base_uri = "mongodb+srv://{user}:{pw}@cluster0-6ftoe.gcp.mongodb.net/{dataset}.{collection}"

In [4]:
user = input()
pw = input()

## Lendo os dados

Ler uma collection como DataFrame é bem simples:

In [5]:
dataset = "sample_supplies"
collection = "sales"

mongo_uri = mongo_base_uri.format(user=user, pw=pw, dataset=dataset, collection=collection)

df = spark.read.format("mongo")\
    .option("uri", mongo_uri)\
    .load()

df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- couponUsed: boolean (nullable = true)
 |-- customer: struct (nullable = true)
 |    |-- gender: string (nullable = true)
 |    |-- age: integer (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- satisfaction: integer (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- price: decimal(6,2) (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- purchaseMethod: string (nullable = true)
 |-- saleDate: timestamp (nullable = true)
 |-- storeLocation: string (nullable = true)



## Manipulando os dados

A sintaxe do PySpark é bastante similar ao do Pandas. Veja uma operação simples de GroupBy:

In [6]:
import pyspark.sql.functions as F

purchases_per_location = df.groupBy("storeLocation", "couponUsed").agg(F.count("_id").alias("count")).sort("count", ascending=False)

purchases_per_location.show()

+-------------+----------+-----+
|storeLocation|couponUsed|count|
+-------------+----------+-----+
|       Denver|     false| 1392|
|      Seattle|     false| 1031|
|       London|     false|  718|
|       Austin|     false|  618|
|     New York|     false|  445|
|    San Diego|     false|  319|
|       Denver|      true|  157|
|      Seattle|      true|  103|
|       London|      true|   76|
|       Austin|      true|   58|
|     New York|      true|   56|
|    San Diego|      true|   27|
+-------------+----------+-----+



Note a similaridade com a sintaxe do Pandas :smile:

## Visualização dos dados

Uma das conveniências de se utilizar o PySpark é a facilidade de gerar visualizações com sua ferramenta de preferência.

Vamos gerar um gráfico interativo pelo Plotly:

In [7]:
import plotly.express as px

fig = px.bar(
    data_frame=purchases_per_location.toPandas(),
    x='storeLocation',
    y='count',
    color='couponUsed',
    )

fig.update_layout(
    title='How are purchases and coupons distributed?',
    xaxis_title='Store location',
    yaxis_title='Total purchases',
    legend=dict(
        title='Used coupon?',
        xanchor='right',
    )
)

fig.show()

Note que o Plotly trabalha bem com objetos pandas.DataFrame, porém nosso DataFrame é do PySpark, não do Pandas.

O método `.toPandas()` é uma maneira muito simples de converter um DataFrame do PySpark para Pandas.

## Colunas com arrays

Vamos olhar o Schema do nosso DataFrame novamente:

In [8]:
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- couponUsed: boolean (nullable = true)
 |-- customer: struct (nullable = true)
 |    |-- gender: string (nullable = true)
 |    |-- age: integer (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- satisfaction: integer (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- price: decimal(6,2) (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- purchaseMethod: string (nullable = true)
 |-- saleDate: timestamp (nullable = true)
 |-- storeLocation: string (nullable = true)



Note que a coluna "items" armazena arrays. Ou seja, para cada compra, ela armazena todos os itens (com suas propriedades "name", "price"...) como uma lista de objetos.

Para gerar uma linha para cada produto comprado, podemos utilizar a função `explode`:

In [9]:
df.select('_id', F.explode('items')).show(10)

+--------------------+--------------------+
|                 _id|                 col|
+--------------------+--------------------+
|[5bd761dcae323e45...|[printer paper, 4...|
|[5bd761dcae323e45...|[notepad, 35.29, ...|
|[5bd761dcae323e45...|[pens, 56.12, 5, ...|
|[5bd761dcae323e45...|[backpack, 77.71,...|
|[5bd761dcae323e45...|[notepad, 18.47, ...|
|[5bd761dcae323e45...|[envelopes, 19.95...|
|[5bd761dcae323e45...|[envelopes, 8.08,...|
|[5bd761dcae323e45...|[binder, 14.16, 3...|
|[5bd761dcae323e45...|[envelopes, 8.05,...|
|[5bd761dcae323e45...|[binder, 28.31, 9...|
+--------------------+--------------------+
only showing top 10 rows



Olhando novamente o Schema, observe que os objetos explodidos são Structs. Conseguimos referenciar os campos pelos nomes, como indicado no Schema.

In [10]:
df.select('_id', F.explode('items')).select('col.name', 'col.price').show(10)

+-------------+-----+
|         name|price|
+-------------+-----+
|printer paper|40.01|
|      notepad|35.29|
|         pens|56.12|
|     backpack|77.71|
|      notepad|18.47|
|    envelopes|19.95|
|    envelopes| 8.08|
|       binder|14.16|
|    envelopes| 8.05|
|       binder|28.31|
+-------------+-----+
only showing top 10 rows



Podemos nos referenciar a todos os campos com o wildcard \*.  
Também podemos aproveitar e criar uma nova coluna a partir das outras colunas:

In [11]:
sales = df.select('_id', F.explode('items'))\
    .select('_id','col.*')\
    .withColumn('total', F.col('price') * F.col('quantity'))

sales.show(10)

+--------------------+-------------+-----+--------+--------------------+------+
|                 _id|         name|price|quantity|                tags| total|
+--------------------+-------------+-----+--------+--------------------+------+
|[5bd761dcae323e45...|printer paper|40.01|       2|[office, stationary]| 80.02|
|[5bd761dcae323e45...|      notepad|35.29|       2|[office, writing,...| 70.58|
|[5bd761dcae323e45...|         pens|56.12|       5|[writing, office,...|280.60|
|[5bd761dcae323e45...|     backpack|77.71|       2|[school, travel, ...|155.42|
|[5bd761dcae323e45...|      notepad|18.47|       2|[office, writing,...| 36.94|
|[5bd761dcae323e45...|    envelopes|19.95|       8|[stationary, offi...|159.60|
|[5bd761dcae323e45...|    envelopes| 8.08|       3|[stationary, offi...| 24.24|
|[5bd761dcae323e45...|       binder|14.16|       3|[school, general,...| 42.48|
|[5bd761dcae323e45...|    envelopes| 8.05|      10|[stationary, offi...| 80.50|
|[5bd761dcae323e45...|       binder|28.3

## Outro exemplo de visualização

No gráfico anterior, vimos uma contagem de compras por loja. Vamos ver como as lojas se comparam por ganho em \$.

In [12]:
earnings = sales.join(df, on='_id').groupBy('storeLocation').agg(F.sum(sales['total']).alias('totalEarnings')).sort('totalEarnings', ascending=False)

In [13]:
earnings.show()

+-------------+-------------+
|storeLocation|totalEarnings|
+-------------+-------------+
|       Denver|   2921009.92|
|      Seattle|   2255947.69|
|       London|   1583066.79|
|       Austin|   1445603.11|
|     New York|   1016059.59|
|    San Diego|    672885.17|
+-------------+-------------+



In [14]:
fig = px.pie(
    data_frame=earnings.toPandas(),
    names='storeLocation',
    values='totalEarnings',
    hole=0.7,
)

fig.update_layout(
    title='How are earnings distributed?',
    annotations=[dict(
        font=dict(size=30),
        showarrow=False,
        text=f'$ {float(earnings.groupBy().sum().collect()[0][0])}',
        x=0.5,
        y=0.5
    )]
)

fig.show()

Podemos ver facilmente qual foi a arrecadação total e as participações de cada região.

Um dos motivos pelo qual gosto muito do Plotly é a interatividade. Se quisermos ver a arrecadação de cada região, podemos simplesmente passar o cursor por cima da região de escolha. 

## Salvando dados no MongoDB

Depois de filtrarmos e transformarmos os dados, pode ser que seja vantajoso salvarmos esses dados em uma nova collection para futuras análises e pouparmos um pouco de tempo.

In [15]:
database = 'sample_training'
collection = 'items_sales'

mongo_uri = mongo_base_uri.format(user=user, pw=pw, dataset=dataset, collection=collection)
sales.write.format('mongo').mode('overwrite')\
    .option('uri', f"{mongo_uri}{dataset}.{collection}")\
    .save()

Se quisermos apenas adicionar dados novos sem sobreescrevê-los, utilizamos `.mode('append')`.

# Conclusão

Com o URI em mãos, ler e salvar os dados no MongoDB é tão fácil quanto acessar arquivos locais.  
Ter em mente esta funcionalidade pode vir a ser muito útil quando for mexer com aplicações na nuvem.