## PoC: Pipeline to MongoDB with PySpark as an ETL strategy for segmentationDB

Author: Pedro Cândido do Nascimento Fiho 

Sponsor: Rafael Buck; Venkat Karum

Partner: Nicolas Silva

Related works: https://docs.google.com/document/d/1QQcbm9fwU-l2XokGvoEEXA3lLvBxHU3rDxGZ5KHPS8M/edit

Repository:

Report:

Impacts: Increase in qualified customers (MEI, SU, Credit users, ViraCredito users)

Metrics: Experiment-dependent, e.g. client lift or profit.

References:
- PyMongo: https://pymongo.readthedocs.io/en/stable/tutorial.html
- MongoDB drivers: https://mvnrepository.com/artifact/org.mongodb
- Spark: https://dlcdn.apache.org/spark/

How to use this notebook:

- Run all cells below.
- You'll be prompted for your user ID and password to run a query on the data lake.


In [1]:
# Libraries

from getpass import getpass
import json

import pandas as pd
import pymongo
from pyhive import trino
from pyspark import SparkConf
from pyspark.sql import SparkSession
# Namespaced imports: a wildcard import of pyspark.sql.functions shadows built-ins such as sum, max and round
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Custom framework

from utils import MongoPipeline

### 1. Ingestion example from the data lake

In [2]:
# Connection object to the data lake (Trino)

conn = trino.connect(
    host='trino.de.in.devneon.com.br',
    port=443,
    protocol='https',
    username=getpass('Insert your User u00xxxx: '),
    password=getpass('Insert password: '),
)
print(conn)

Insert your User u00xxxx: ········
Insert password: ········
<pyhive.trino.Connection object at 0x7effe9ed56f0>


In [3]:
# Simple extraction query on the dimension_client table.
# IMPORTANT: the LIMIT drives execution time: 1,000 rows ≈ 4 s, 10,000 rows ≈ 7 s, 100,000 rows ≈ 49 s.
# Beyond that, ingestion takes much longer (from 5 to 30 min).

query_sql = """
SELECT *
FROM neondw_bi.dimension_client
LIMIT 1000
"""

In [4]:
%%time
# Data ingestion to DataFrame

df_datalake = pd.read_sql(query_sql, conn)



CPU times: user 75.6 ms, sys: 22.1 ms, total: 97.7 ms
Wall time: 3.7 s
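The timing note in the query cell (1,000 rows ≈ 4 s, 10,000 ≈ 7 s, 100,000 ≈ 49 s) is easier to interpret as throughput plus a fixed per-query overhead. The worked sketch below uses only those three quoted measurements (no new data); with just three points, the fit is indicative at best, and the function names are illustrative.

```python
# (rows fetched, wall-clock seconds), as quoted in the query cell's comment.
TIMINGS = [(1_000, 4.0), (10_000, 7.0), (100_000, 49.0)]


def throughput(timings):
    """Rows per second for each (rows, seconds) measurement."""
    return [rows / seconds for rows, seconds in timings]


def fit_linear(timings):
    """Ordinary least-squares fit of seconds = intercept + slope * rows."""
    n = len(timings)
    mean_rows = sum(rows for rows, _ in timings) / n
    mean_secs = sum(secs for _, secs in timings) / n
    sxy = sum((rows - mean_rows) * (secs - mean_secs) for rows, secs in timings)
    sxx = sum((rows - mean_rows) ** 2 for rows, _ in timings)
    slope = sxy / sxx
    intercept = mean_secs - slope * mean_rows
    return intercept, slope


if __name__ == "__main__":
    print("rows/s:", [round(value) for value in throughput(TIMINGS)])
    intercept, slope = fit_linear(TIMINGS)
    print(f"fixed overhead ~{intercept:.1f} s, marginal cost ~{slope * 1000:.2f} ms/row")
```

On these numbers, throughput rises from 250 to about 2,040 rows/s, and the least-squares line is roughly 3 s of fixed cost plus about 0.46 ms per row, so small LIMITs are dominated by per-query overhead rather than by row transfer.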


In [5]:
%%time
# Save the data to CSV

df_datalake.to_csv("databases/query_results.csv", index=False)

CPU times: user 25.9 ms, sys: 4.32 ms, total: 30.3 ms
Wall time: 32.8 ms
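The timing note in the query cell indicates that, beyond 100,000 rows, loading everything with `pd.read_sql` into a single DataFrame becomes slow. One alternative, not benchmarked in this PoC, is to stream rows from the cursor straight to the CSV in fixed-size chunks. This bounds memory use, but it does not make the Trino query itself faster. The following is a minimal sketch, assuming the pyhive connection follows PEP 249 (DB-API 2.0). The function name `export_query_in_chunks` is hypothetical, and the demo uses an in-memory SQLite table as a stand-in for the data lake.

```python
import csv
import os
import sqlite3
import tempfile


def export_query_in_chunks(conn, query, csv_path, chunk_size=10_000):
    """Stream a query result to CSV, holding at most `chunk_size` rows in memory.

    `conn` can be any PEP 249 (DB-API 2.0) connection: the sketch only uses
    cursor(), execute(), description, fetchmany() and close().
    Returns the number of data rows written (header excluded).
    """
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        header = [column[0] for column in cursor.description]
        written = 0
        with open(csv_path, "w", newline="", encoding="utf-8") as handle:
            writer = csv.writer(handle)
            writer.writerow(header)
            while True:
                rows = cursor.fetchmany(chunk_size)
                if not rows:
                    break
                writer.writerows(rows)
                written += len(rows)
        return written
    finally:
        cursor.close()


if __name__ == "__main__":
    # Local stand-in for the data lake: an in-memory SQLite table.
    demo = sqlite3.connect(":memory:")
    demo.execute("CREATE TABLE dimension_client (skclient INTEGER, persontype TEXT)")
    demo.executemany(
        "INSERT INTO dimension_client VALUES (?, ?)",
        [(i, "PF") for i in range(25)],
    )
    path = os.path.join(tempfile.mkdtemp(), "query_results.csv")
    print(export_query_in_chunks(demo, "SELECT * FROM dimension_client", path, chunk_size=10))
```

In the notebook, the call would look like `export_query_in_chunks(conn, query_sql, "databases/query_results.csv")`.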


### 2. Writing and Reading data with PyMongo and PySpark

<b>Variables:</b>

- url: address of the MongoDB container in Docker
- db_name: name of the database to be created in MongoDB
- collection_name: name of the document collection inside that database
- input_uri / output_uri: full addresses (database and collection) that Spark reads from and writes to


In [6]:
# Database name, collection and URL

url = "mongodb://mongodb:27017/"
db_name = "dimension"
collection_name = "public"
input_uri = f"{url}{db_name}.{collection_name}"
output_uri = f"{url}{db_name}.{collection_name}"

print("Complete address to input/output:", input_uri)

Complete address to input/output: mongodb://mongodb:27017/dimension.public
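Building the URIs by concatenation only works if `url` ends with a slash. A small helper can make that explicit and also split a URI back into database and collection. The sketch below uses hypothetical names and assumes `url` carries no query-string options such as `?authSource=...`.

```python
from urllib.parse import urlsplit


def spark_mongo_uri(base_url, db_name, collection_name):
    """Return '<base_url>/<db>.<collection>', the URI form used in the Spark connector config.

    Assumes base_url has no query-string options (e.g. '?authSource=admin').
    """
    if not base_url.startswith(("mongodb://", "mongodb+srv://")):
        raise ValueError(f"not a MongoDB connection string: {base_url!r}")
    return f"{base_url.rstrip('/')}/{db_name}.{collection_name}"


def split_namespace(uri):
    """Return (database, collection) parsed from the URI path.

    Splits on the first dot: database names cannot contain dots,
    but collection names can.
    """
    namespace = urlsplit(uri).path.lstrip("/")
    db_name, _, collection_name = namespace.partition(".")
    return db_name, collection_name


if __name__ == "__main__":
    uri = spark_mongo_uri("mongodb://mongodb:27017/", "dimension", "public")
    print(uri)
    print(split_namespace(uri))
```

Splitting on the first dot rather than the last keeps collection names such as `a.b` intact.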


In [8]:
# PySpark session configured to access MongoDB through the mongo-spark-connector

spark = (
    SparkSession.builder
    .appName("Spark_pipe")
    .config("spark.mongodb.input.uri", input_uri)
    .config("spark.mongodb.output.uri", output_uri)
    # mongo-spark-connector 2.4.x targets Spark 2.4; for Spark 3.x the 3.0.x line keeps these same config keys
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:2.4.2")
    .getOrCreate()
)

spark

:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3e3a006d-bbc5-47e2-aa04-b039bb63edc7;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;2.4.2 in central
	found org.mongodb#mongo-java-driver;3.12.5 in central
downloading https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/2.4.2/mongo-spark-connector_2.12-2.4.2.jar ...
	[SUCCESSFUL ] org.mongodb.spark#mongo-spark-connector_2.12;2.4.2!mongo-spark-connector_2.12.jar (1225ms)
downloading https://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/3.12.5/mongo-java-driver-3.12.5.jar ...
	[SUCCESSFUL ] org.mongodb#mongo-java-driver;3.12.5!mongo-java-driver.jar (1083ms)
:: resolution report :: resolve 3995ms :: artifacts dl 2311ms
	:: modules in use:
	org.mongodb#mongo-java-driver;3.12.5 from central in [default]
	or

<b>Framework options:</b>

- db_address: database URL
- database: database name
- collection: database collection
- spark_obj: SparkSession instance
- mode: applies only to the non-Spark (PyMongo) methods; 1 for single-document operations, 0 for multi-document operations
- input_uri: URI that Spark reads from
- output_uri: URI that Spark writes to
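The `mode` option is only described in prose, and the source of `utils.MongoPipeline` isn't shown. The sketch below illustrates how such a switch could map onto PyMongo, assuming `mode=1` means `Collection.insert_one` and `mode=0` means `Collection.insert_many` (both real PyMongo methods). The function and stub-class names are hypothetical.

```python
def insert_documents(collection, documents, mode=0):
    """Insert through PyMongo-style methods, switching on `mode`.

    mode=1 -> collection.insert_one(document)   (exactly one document)
    mode=0 -> collection.insert_many(documents)  (one or more documents)
    `collection` only needs insert_one/insert_many, so a stub works in tests.
    """
    documents = list(documents)
    if not documents:
        raise ValueError("nothing to insert")
    if mode == 1:
        if len(documents) != 1:
            raise ValueError("mode=1 expects exactly one document")
        return collection.insert_one(documents[0])
    if mode == 0:
        return collection.insert_many(documents)
    raise ValueError(f"mode must be 0 or 1, got {mode!r}")


class RecordingCollection:
    """Stand-in for pymongo.collection.Collection that just records calls."""

    def __init__(self):
        self.calls = []

    def insert_one(self, document):
        self.calls.append(("insert_one", document))
        return "InsertOneResult"

    def insert_many(self, documents):
        self.calls.append(("insert_many", list(documents)))
        return "InsertManyResult"


if __name__ == "__main__":
    stub = RecordingCollection()
    print(insert_documents(stub, [{"clientid": 0, "persontype": "PF"}], mode=1))
    print(insert_documents(stub, [{"clientid": 1}, {"clientid": 2}], mode=0))
```

Passing the collection in, rather than opening a `MongoClient` inside the function, keeps the switch testable without a running MongoDB. Note that the single-row insert in section 2.2 still returns an `InsertManyResult`, which would fit a many-documents default.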

#### 2.1 - Simple test: inserting/reading many documents

In [9]:
# Create an instance of the framework (the pipe object)

pipe = MongoPipeline(
    url,
    db_name,
    collection_name,
    spark,
    input_uri,
    output_uri,
)

23/06/13 12:34:49 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [10]:
%%time
# Getting persisted data

df = pd.read_csv("databases/query_results.csv")
df.head(5)

CPU times: user 18.1 ms, sys: 8.81 ms, total: 26.9 ms
Wall time: 28.7 ms


,skclient,cpf_cnpj,name,gender,email,device,platform,osversion,appversion,installsource,...,segment,flagclientemei,situacaoclientemei,flagcnpjmei,flagmeibaseativa30d,flagclientemeiregistrado,contaconductorid,fund,expurgado,persontype
0,4655424,5031130cb6ba1ddb37c32363ddaec0aeca7caeb37bde23...,79104a52867892da60007a1a8e7705dfde3ed224cbe072...,137c92788c178987a9d55e695a37bb0acfda93cc08fcd2...,4a287b8d5dbfbfbe1d608df88ec68ecf84df0e1888c968...,motorola+moto+g(6)+play,Android,9,3.4.3,,...,DIA A DIA - JOVENS,0,NULA,0,0,0,,BV_PF,False,PF
1,4655426,ab82004fb6b8c2591d8ee0eb555a6babf3249487ba23c9...,85e606ba2f17de7ff58baa7b837d5ec79e21708f369d09...,137c92788c178987a9d55e695a37bb0acfda93cc08fcd2...,f2e0dac9c95f26d8037b58c0105431638d45d6db7455f4...,iPhone,iPhone+OS,12.3.1,3.6.3,Facebook Ads,...,EMPREENDEDORES,0,ATIVA,1,0,0,1476748.0,FIDC_PF,False,PF
2,4655429,532e6966f1a633df0ccb4cbc293fabd71613d7e7880193...,a0a1326a86ede67716cc0868183e05cb0ff4e104be3a3b...,6750b6ca63857bc14c538590d1322785aff98193987532...,8b6e92290be8c79aa586535a6325d175273362530e62f9...,LENOVO+Lenovo+K33b36,Android,7.0,,CRM,...,EMPREENDEDORES,0,ATIVA,1,0,0,1464513.0,BV_PF,False,PF
3,4655430,af2131093fbd1a3270da092a0d473ffd280568a06e373c...,e9c007ca47d38ad0e0a3b86663b4a698ea50c077f14c24...,137c92788c178987a9d55e695a37bb0acfda93cc08fcd2...,0bd722df6a8278842d96e9ccba45acd740228988020b4d...,samsung+SM-G3812B,Android,4.2.2,3.4.3,googleadwords_int,...,DIA A DIA - JOVENS,0,NULA,0,0,0,,BV_PF,False,PF
4,4655431,e2041df2053961f1dd87fbc62fb74170d2c9855f55387f...,061e6d40558ddc79ace3b99efcbbcc7a0f4094e0ea027e...,137c92788c178987a9d55e695a37bb0acfda93cc08fcd2...,a65dc88da88e73aa84edcc1b9ca76a5d5dce1f0ee44c4f...,samsung+SM-G610M,Android,8.1.0,3.4.3,Facebook Ads,...,DIA A DIA - JOVENS,0,NULA,0,0,0,,,False,PF


<b>PyMongo: Writing data to MongoDB</b>

In [11]:
%%time
pipe.pymongo_create(df)

Success on DataFrame to dic conversion
Success writing data on mongoDB
CPU times: user 84.3 ms, sys: 1.62 ms, total: 85.9 ms
Wall time: 113 ms


<pymongo.results.InsertManyResult at 0x7effa82519f0>
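The log line "Success on DataFrame to dic conversion" suggests that `pymongo_create` turns the DataFrame into a list of dicts before inserting. `DataFrame.to_dict("records")` is the usual pandas call for that, though the framework's exact method isn't shown. One detail such a conversion may need to handle: pandas marks missing cells (e.g. the empty `contaconductorid` values above) as NaN. The minimal sketch below, using a hypothetical helper name, turns them into `None` so MongoDB stores null.

```python
import math


def records_for_mongo(records):
    """Return copies of `records` with float NaN replaced by None.

    PyMongo would store a NaN as a BSON double NaN, so a query such as
    {"contaconductorid": None} would not match it; None is stored as null.
    """
    return [
        {
            key: None if isinstance(value, float) and math.isnan(value) else value
            for key, value in record.items()
        }
        for record in records
    ]


if __name__ == "__main__":
    rows = [
        {"skclient": 4655424, "contaconductorid": float("nan"), "fund": "BV_PF"},
        {"skclient": 4655426, "contaconductorid": 1476748.0, "fund": "FIDC_PF"},
    ]
    print(records_for_mongo(rows))
```

In the notebook this would wrap the conversion, e.g. `records_for_mongo(df.to_dict("records"))`. Because `numpy.float64` is a subclass of `float`, the `isinstance` check also covers values that come straight out of pandas.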

<b>PyMongo: Extracting data from MongoDB</b>

For more information on MongoDB query operators, see: https://www.mongodb.com/docs/manual/reference/operator/
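The filter passed to `pipe.pymongo_read` is a MongoDB query document: a plain field/value pair means equality, and operators such as `$in` or `$gte` go in a nested dict. To make those semantics concrete without a server, here is a toy evaluator for a handful of comparison operators on top-level fields. It is an illustration only, not how MongoDB executes queries, since arrays, nested paths and BSON type ordering are ignored. The name `matches` is hypothetical, and the sample documents are made up, shaped like `dimension_client` rows.

```python
# Toy evaluator for a subset of MongoDB query operators (illustration only).
OPERATORS = {
    "$eq": lambda value, arg: value == arg,
    "$ne": lambda value, arg: value != arg,
    "$gt": lambda value, arg: value is not None and value > arg,
    "$gte": lambda value, arg: value is not None and value >= arg,
    "$lt": lambda value, arg: value is not None and value < arg,
    "$lte": lambda value, arg: value is not None and value <= arg,
    "$in": lambda value, arg: value in arg,
    "$nin": lambda value, arg: value not in arg,
}


def matches(document, query):
    """True if `document` satisfies every top-level condition in `query`."""
    for field, condition in query.items():
        value = document.get(field)  # a missing field behaves like None/null
        is_operator_dict = isinstance(condition, dict) and bool(condition) and all(
            key.startswith("$") for key in condition
        )
        if is_operator_dict:
            for operator, arg in condition.items():
                if operator not in OPERATORS:
                    raise ValueError(f"operator not supported in this sketch: {operator}")
                if not OPERATORS[operator](value, arg):
                    return False
        elif value != condition:
            return False
    return True


if __name__ == "__main__":
    clients = [
        {"skclient": 1, "clientstatus": "Ativo", "segment": "DIA A DIA - JOVENS", "flagcnpjmei": 0},
        {"skclient": 2, "clientstatus": "Ativo", "segment": "EMPREENDEDORES", "flagcnpjmei": 1},
        {"skclient": 3, "clientstatus": "Inativo", "segment": "EMPREENDEDORES", "flagcnpjmei": 1},
    ]
    active = {"clientstatus": "Ativo"}
    active_entrepreneurs = {
        "clientstatus": "Ativo",
        "segment": {"$in": ["EMPREENDEDORES"]},
        "flagcnpjmei": {"$gte": 1},
    }
    print([c["skclient"] for c in clients if matches(c, active)])
    print([c["skclient"] for c in clients if matches(c, active_entrepreneurs)])
```

The same dicts (`active`, `active_entrepreneurs`) are valid filters for `pipe.pymongo_read`, assuming it forwards them to PyMongo's `Collection.find`.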

In [12]:
%%time
# A simple query filter that returns only active clients

df_result_pymongo = pipe.pymongo_read({'clientstatus': 'Ativo'})
df_result_pymongo.head(5)

Success reading mongoDB data


,_id,skclient,cpf_cnpj,name,gender,email,device,platform,osversion,appversion,...,segment,flagclientemei,situacaoclientemei,flagcnpjmei,flagmeibaseativa30d,flagclientemeiregistrado,contaconductorid,fund,expurgado,persontype
0,64886284302cd61254d01cb1,4655424,5031130cb6ba1ddb37c32363ddaec0aeca7caeb37bde23...,79104a52867892da60007a1a8e7705dfde3ed224cbe072...,137c92788c178987a9d55e695a37bb0acfda93cc08fcd2...,4a287b8d5dbfbfbe1d608df88ec68ecf84df0e1888c968...,motorola+moto+g(6)+play,Android,9,3.4.3,...,DIA A DIA - JOVENS,0,NULA,0,0,0,,BV_PF,False,PF
1,64886284302cd61254d01cb2,4655426,ab82004fb6b8c2591d8ee0eb555a6babf3249487ba23c9...,85e606ba2f17de7ff58baa7b837d5ec79e21708f369d09...,137c92788c178987a9d55e695a37bb0acfda93cc08fcd2...,f2e0dac9c95f26d8037b58c0105431638d45d6db7455f4...,iPhone,iPhone+OS,12.3.1,3.6.3,...,EMPREENDEDORES,0,ATIVA,1,0,0,1476748.0,FIDC_PF,False,PF
2,64886284302cd61254d01cb3,4655429,532e6966f1a633df0ccb4cbc293fabd71613d7e7880193...,a0a1326a86ede67716cc0868183e05cb0ff4e104be3a3b...,6750b6ca63857bc14c538590d1322785aff98193987532...,8b6e92290be8c79aa586535a6325d175273362530e62f9...,LENOVO+Lenovo+K33b36,Android,7.0,,...,EMPREENDEDORES,0,ATIVA,1,0,0,1464513.0,BV_PF,False,PF
3,64886284302cd61254d01cb4,4655430,af2131093fbd1a3270da092a0d473ffd280568a06e373c...,e9c007ca47d38ad0e0a3b86663b4a698ea50c077f14c24...,137c92788c178987a9d55e695a37bb0acfda93cc08fcd2...,0bd722df6a8278842d96e9ccba45acd740228988020b4d...,samsung+SM-G3812B,Android,4.2.2,3.4.3,...,DIA A DIA - JOVENS,0,NULA,0,0,0,,BV_PF,False,PF
4,64886284302cd61254d01cb6,4655432,e06b1d4db33c0fb25659f7a00ac0b30172de3eeaf500bb...,6d7d73fa453c0397f3eaad749773dde3bd399e9d35aa85...,6750b6ca63857bc14c538590d1322785aff98193987532...,491ba7b967480fc2d870da068936fdab0e8d44b469bd66...,TCL+5085N,Android,6.0,3.4.3,...,DIA A DIA - FAMILIA,0,NULA,0,0,0,1662054.0,BV_PF,False,PF


<b>PySpark: Writing data to MongoDB</b>

In [13]:
%%time
# The framework writes with Spark in overwrite mode: this erases the documents
# inserted earlier with PyMongo and replaces them with the same data.

pipe.pyspark_write(df)

Success on Spark DataFrame creation!


23/06/13 12:35:38 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

Success writing data on mongoDB
CPU times: user 427 ms, sys: 7.74 ms, total: 435 ms
Wall time: 4.24 s


                                                                                

<b>PySpark: Reading data from MongoDB</b>

In [14]:
%%time
# Read every document in the collection into a Spark DataFrame

df_result_pyspark = pipe.pyspark_read()

Success reading mongoDB data!
CPU times: user 3.88 ms, sys: 1.84 ms, total: 5.72 ms
Wall time: 438 ms


In [14]:
%%time
# Convert the Spark DataFrame to pandas (this collects every row to the driver)

df_result_pandas = df_result_pyspark.toPandas()
df_result_pandas.head(5)

23/06/12 02:16:23 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

CPU times: user 364 ms, sys: 0 ns, total: 364 ms
Wall time: 2.82 s


,_id,address,addresscity,addresscomplement,addressneighborhood,addressnumber,addresspostalcode,addressstate,adsetid,advertisementid,...,questionscore,registerdate,registersmalldate,registerstatus,segment,situacaoclientemei,skclient,skproductfirsttransaction,source,startdepositvalue
0,"(648675bfa7196e4d12e75170,)",8cfe6572b9cdb32fd1ad6cd0580c578111bad061cd4acc...,0b9dd47e772147501462c08457860ce63945669128bd60...,bde98b4430acf4d570a47f3864656b0d4adf5ea6bebfeb...,a1714351684ec7749ff2036aa4a093de680019fdd6d527...,5902938793f62130b4a348c08f1e748dd965fc5068f5f6...,14160420.0,75cf6d111463179065170655a25c9250b22ce1a12e44a7...,,,...,0.0,2019-09-16 21:53:48.340,2019-09-16,Aprovado,DIA A DIA - JOVENS,NULA,5864282.0,14.0,APP,10.0
1,"(648675bfa7196e4d12e75167,)",81ec9d1dac5fd01fb54b4ec37144f971776e4fdc2cca0d...,6c3752d765bce76f97e10e11f02655033a40f3cb2411a6...,,df7f5eae433254982b58a09e2a117464b7156660f38eb3...,2b0f4665de113c4e5a7fbaf7b241ec63dd3b57d3623a13...,35530000.0,4132cefb25e2ceb2c5fe70011571b035ee9f4c24bef1f0...,6104422000000.0,6104421873871.0,...,0.0,2019-09-16 21:53:19.730,2019-09-16,Aprovado,DIA A DIA - JOVENS,NULA,5864275.0,,APP,0.01
2,"(648675bfa7196e4d12e75168,)",b898b0e20150a98ef9e7c9e27f553caf2da52cb4f92c08...,58b4407996f00e1636011fb957333c47e40737d6787ae8...,,21de34848f53ececf704dda98678af7e84db97027a5993...,64ccd2b38891e3800f3d1cc9920b073b0da081351470b1...,8588440.0,75cf6d111463179065170655a25c9250b22ce1a12e44a7...,6131521000000.0,6131520742271.0,...,0.0,2019-09-16 21:53:49.170,2019-09-16,Aprovado,DIA A DIA - JOVENS,NULA,5864283.0,,APP,0.01
3,"(648675bfa7196e4d12e7516d,)",664bf43f58c217523d380eefac8d8261470d522f149f44...,f085b7b2d4e38e8015dcbf5eeeb9737cc3d7d6dacf58d4...,,3c0fdc3a33878a0904ea5466a3c064336a783078500119...,26897851cef92f1f0384ec66fbeda06b770fb5f1df585f...,7085373.0,75cf6d111463179065170655a25c9250b22ce1a12e44a7...,,,...,0.0,2019-09-16 21:53:16.657,2019-09-16,Aprovado,DIA A DIA - JOVENS,NULA,5864274.0,,APP,0.01
4,"(648675bfa7196e4d12e7516e,)",f49defe9f6967b32a5065bd78581d2b0523112b28087e9...,b93fb435d8da6a9f22da22a88eea101e48ee55adf5df4e...,bde98b4430acf4d570a47f3864656b0d4adf5ea6bebfeb...,51d33e411581015962d552e9fe86e113a1c2353c10acf6...,6200d9a9723891d9ff1f001f42c9bd6a1944f9b33d2614...,41750370.0,48f6b6507078084e1d9c2eff0b31c46aa7d539a1fecee8...,,,...,100.0,2019-09-16 21:53:42.993,2019-09-16,Aprovado,EMPREENDEDORES,ATIVA,5864278.0,2.0,APP,10.0
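In the Spark result, the `_id` column is displayed as `(648675bfa7196e4d12e75170,)`. The connector reads the ObjectId as a struct with a single field (assumed here to be named `oid`, as in the 2.x connector's ObjectId mapping), and `toPandas()` turns each struct into a `pyspark.sql.Row`, which is a tuple subclass. Below is a small helper, with hypothetical names, that unwraps such values into plain hex strings. On the pandas side it could be applied as `df["_id"] = df["_id"].map(oid_to_str)`, where `df` is the converted DataFrame. On the Spark side, selecting `col("_id.oid")` before `toPandas()` should give the same result, under that field-name assumption.

```python
def oid_to_str(value):
    """Unwrap a one-field struct value such as ('648675bf...',) into its string.

    pyspark.sql.Row is a tuple subclass, so Row(oid=...) is handled too.
    Other values are returned unchanged.
    """
    if isinstance(value, tuple) and len(value) == 1:
        return value[0]
    return value


def normalize_ids(records, field="_id"):
    """Return copies of `records` with `field` unwrapped by oid_to_str."""
    return [
        {**record, field: oid_to_str(record[field])} if field in record else dict(record)
        for record in records
    ]


if __name__ == "__main__":
    rows = [{"_id": ("648675bfa7196e4d12e75170",), "skclient": 5864282.0}]
    print(normalize_ids(rows))
```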


#### 2.2 - Simple test: inserting/reading a single document with PyMongo

In [15]:
%%time
# Creating a simple customer object

client_zero = {
    "clientid": 0,
    "persontype": "PF",
    "clientstatus": "Inativo",
}

df_client_0_spark = spark.createDataFrame([client_zero])

df_client_0_pandas = pd.DataFrame([client_zero])



CPU times: user 11.4 ms, sys: 483 µs, total: 11.9 ms
Wall time: 27.8 ms


In [16]:
# Spark Dataframe 

df_client_0_spark.show()

+--------+------------+----------+
|clientid|clientstatus|persontype|
+--------+------------+----------+
|       0|     Inativo|        PF|
+--------+------------+----------+



In [17]:
# Pandas Dataframe 

df_client_0_pandas

,clientid,persontype,clientstatus
0,0,PF,Inativo


In [18]:
%%time
# Insert a single document

pipe.pymongo_create(df_client_0_pandas)

Success on DataFrame to dic conversion
Success writing data on mongoDB
CPU times: user 2.49 ms, sys: 1.2 ms, total: 3.69 ms
Wall time: 2.01 ms


<pymongo.results.InsertManyResult at 0x7effc465aaa0>

In [19]:
%%time
# Read back the single document

query_one = {"clientid": 0}

pipe.pymongo_read(query_one)

Success reading mongoDB data
CPU times: user 2.43 ms, sys: 0 ns, total: 2.43 ms
Wall time: 2.4 ms


,_id,clientid,persontype,clientstatus
0,648862e2302cd61254d02099,0,PF,Inativo


In [20]:
%%time
# Add a new attribute to a client: $inc on a missing field creates it with the increment value (0 here)

where = {'clientid': 0}
what_include = {'$inc': {'x': 0}}

pipe.pymongo_update(where, what_include)
pipe.pymongo_read(query_one)

Success writing data on mongoDB
Success reading mongoDB data
CPU times: user 3.27 ms, sys: 235 µs, total: 3.51 ms
Wall time: 5.51 ms


,_id,clientid,persontype,clientstatus,x
0,648862e2302cd61254d02099,0,PF,Inativo,0


In [22]:
%%time
# Change one attribute of a client

where = {'clientid': 0}
what_change = {'$set': {'x': 1}}

pipe.pymongo_update(where, what_change)
pipe.pymongo_read(query_one)

Success writing data on mongoDB
Success reading mongoDB data
CPU times: user 3.07 ms, sys: 1.44 ms, total: 4.5 ms
Wall time: 4.7 ms


,_id,clientid,persontype,clientstatus,x
0,648862e2302cd61254d02099,0,PF,Inativo,1
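The two update cells depend on documented MongoDB behavior: `$inc` on a field that does not exist creates it with the increment value, and `$set` replaces (or creates) a field's value. Below is a toy in-memory model of just those two operators, with a hypothetical `apply_update` name; it reproduces the `x` column seen in both outputs. With a real server, `pipe.pymongo_update` presumably issues something like PyMongo's `Collection.update_one(where, update)`, but that is an assumption, since the framework code isn't shown.

```python
import copy


def apply_update(document, update):
    """Toy, in-memory model of MongoDB's $set and $inc on top-level fields.

    $inc on a missing field creates it with the increment value;
    $set creates the field or replaces its value. The input is not modified.
    """
    result = copy.deepcopy(document)
    for operator, fields in update.items():
        if operator == "$set":
            result.update(fields)
        elif operator == "$inc":
            for key, amount in fields.items():
                result[key] = result.get(key, 0) + amount
        else:
            raise ValueError(f"operator not supported in this sketch: {operator}")
    return result


if __name__ == "__main__":
    client_zero = {"clientid": 0, "persontype": "PF", "clientstatus": "Inativo"}
    step_1 = apply_update(client_zero, {"$inc": {"x": 0}})  # x created with 0
    step_2 = apply_update(step_1, {"$set": {"x": 1}})       # x replaced by 1
    print(step_1)
    print(step_2)
```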
