## Integração do SPARK com Mongo DB e API do IBGE

### Dependencias
Primeiro é necessário que seja importado o packge do Mongo DB para realizarmos a conexão

- Em **Workspace**, clicar com o botão direito>create>Library
- Em Library Source, escolher **Maven**
- Em Coordinates, preencher:
    >org.mongodb.spark:mongo-spark-connector_2.12:3.0.1
- Então **Create**

<img src="https://i.ibb.co/2tvDPZq/mongo-databricks1.png" alt="Mongo Config 1" width="300">

<img src="https://i.ibb.co/dkwc4HH/mongo-databricks2.png" alt="Mongo Config 1" width="400">

- Com o cluster ligado, abra a biblioteca criada e selecione a instalação para o cluster

##### Importando bibliotecas necessárias

In [0]:
import json
import requests
from pyspark.sql.functions import *

##### Lendo os dados do Mongo DB

In [0]:
user = "estudante_igti"
password = "SRwkJTDz2nA28ME9"

uri = f"mongodb+srv://{user}:{password}@unicluster.ixhvw.mongodb.net/ibge.pnadc20203?retryWrites=true&w=majority"

ibge_mongo = spark.read.format("mongo")\
                  .option("uri", uri)\
                  .load()

##### Informações do dataframe

In [0]:
ibge_mongo.printSchema()
ibge_mongo.show(truncate=False)

##### Lendo os dados do API

In [0]:
def parse_json_dataframe(json_list):
    string_list = [json.dumps(i) for i in json_list]
    rdd = sc.parallelize(string_list)
    return spark.read.json(rdd)

In [0]:
url = 'https://servicodados.ibge.gov.br/api/v1/localidades/estados/MG/mesorregioes'
r = requests.get(url)
j = r.json()
ibge_api = parse_json_dataframe(j)

##### Informações do dataframe

In [0]:
ibge_api.printSchema()
ibge_api.show(truncate=False)

##### Gravação
Executada a gravação no file storage do DataBricks, no caso de estar conectado a um data lake, seria apenas necessário modificar o caminho

In [0]:
ibge_mongo.write.parquet("/FileStore/ibge-mongo", mode="overwrite")

In [0]:
ibge_api.write.parquet("/FileStore/ibge-api", mode="overwrite")

##### Leitura
Na leitura iremos realizar algumas limpezas limpeza dos dados

In [0]:
ibge_mongo = spark.read.parquet("/FileStore/ibge-mongo")\
                  .drop("_id")\
                  .na.drop(subset=["renda"])\
                  .filter((col("sexo")=="Mulher")\
                         &(col("idade")>=20)\
                         &(col("idade")<=40))

ibge_api = spark.read.parquet("/FileStore/ibge-api")\
                .select(col('id')\
                      , col('nome').alias('nome_mesorregioes')\
                      , col('UF.nome').alias('nome_uf')\
                      , col('UF.regiao.nome').alias('nome_regiao'))

##### Para utilizar com liguagem SQL é possivel criar views temporárias dos dataframes

In [0]:
ibge_mongo.createOrReplaceTempView('vw_ibge_mongo')
ibge_api.createOrReplaceTempView('vw_ibge_api')

In [0]:
spark.sql(
"""
SELECT * FROM vw_ibge_mongo LIMIT 5
"""
).show()

spark.sql(
"""
SELECT * FROM vw_ibge_api LIMIT 5
"""
).show()

##### Gravando em um database os dados tratados
Utilizando o hive como database porem é possivel criar uma conexão jdbc para o banco de preferencia

##### Criando database

In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS integraDb")

##### É possivel criar a table com mais recursos e opções no Hive, será usado nesse exemplo uma forma mais simples

In [0]:
ibge_mongo.write.mode("overwrite").saveAsTable("integraDb.ibge_mongo")

In [0]:
ibge_api.write.mode("overwrite").saveAsTable("integraDb.ibge_api")

In [0]:
spark.sql(
"""
SELECT * FROM integraDb.ibge_mongo LIMIT 5
"""
).show()

spark.sql(
"""
SELECT * FROM integraDb.ibge_api LIMIT 5
"""
).show()