# Upsert DataFrame to MongoDB
> MongoDB is a rich, document-oriented NoSQL database, and I have spent some time diving into it. In this post I share how to 'upsert' a Pandas DataFrame into a MongoDB collection.

- toc: true
- comments: true
- branch: master
- badges: true
- categories: [NoSQL, Python]
- image: images/upsert.png


# Requirement

Let's say we have a DataFrame, and we want to 'upsert' (insert/update) a MongoDB collection with its rows. More specifically, rows that don't exist in the collection yet should be inserted, and rows that already exist should be updated.
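Before touching a database, the requirement can be pinned down with a small in-memory model. In this sketch a plain Python list of dicts stands in for the collection, and a row counts as 'existing' when its key column matches a document; `upsert_rows` is a hypothetical helper for illustration, not part of pymongo or pandas.

```python
def upsert_rows(collection: list, rows: list, key: str) -> list:
    """Insert rows whose key is new; merge rows into documents whose key already exists."""
    index = {doc[key]: doc for doc in collection}  # key value -> existing document
    for row in rows:
        existing = index.get(row[key])
        if existing is None:
            new_doc = dict(row)          # insert: store a copy of the row
            collection.append(new_doc)
            index[row[key]] = new_doc
        else:
            existing.update(row)         # update: overwrite the row's fields
    return collection


docs = [{"name": "user1", "age": 25}]
upsert_rows(docs, [{"name": "user1", "age": 32}, {"name": "user4", "age": 32}], key="name")
print(docs)  # [{'name': 'user1', 'age': 32}, {'name': 'user4', 'age': 32}]
```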

```python
#hide
from pymongo import MongoClient, UpdateOne
from pandas import DataFrame
from datetime import datetime
from IPython.display import display  # built into Jupyter; imported explicitly here

CONN_STR = 'mongodb+srv://fin-market.x5tm4.mongodb.net/myFirstDatabase?authSource=%24external&authMechanism=MONGODB-X509&tls=true&tlsCertificateKeyFile=../../../drivers/X509-cert-fin-market.pem'
```

```python
#collapse_output
def show_collection(collection):
    # load every document of the collection into a DataFrame and render it
    display(DataFrame(list(collection.find())))
```

# Set Up a Free Cloud MongoDB

Set up a free cloud-based MongoDB database (MongoDB Atlas) [here](https://www.mongodb.com/cloud/atlas/lp/try2?utm_source=google&utm_campaign=gs_apac_hong_kong_search_core_brand_atlas_desktop&utm_term=mongodb%20atlas&utm_medium=cpc_paid_search&utm_ad=e&utm_ad_campaign_id=12212624344&adgroup=115749709143&gclid=Cj0KCQiAqbyNBhC2ARIsALDwAsApK8irJVZnesxewSitv8kTagWactqhvZPG4gpz5CKx0JV3Fh1pEIsaAttLEALw_wcB).

Connect to the cluster, then create the 'test' database and the 'employee' collection. MongoDB creates both lazily, when the first document is written.

```python
DB_NAME = 'test'
COLLECTION_NAME = 'employee'
```

```python
client = MongoClient(CONN_STR)
client.drop_database(DB_NAME)  # clear the database
db = client[DB_NAME]  # switch database
collection = db[COLLECTION_NAME]  # get the collection
```

```python
#collapse_output
now = datetime.now()
emplyee = [['user1', 25, 'male', now], ['user2', 55, 'male', now], ['user3', 43, 'male', now]]
df_emplyee = DataFrame(emplyee, columns=['name', 'age', 'sex', 'lastModifiedAt'])
# keep only name/age/sex; lastModifiedAt is set later by MongoDB via $currentDate
df_emplyee = df_emplyee[['name', 'age', 'sex']]
```

The cell above prepares a test DataFrame, `df_emplyee`, with three columns:
- name
- age
- sex

```python
df_emplyee
```

|   | name | age | sex |
|---|------|-----|-----|
| 0 | user1 | 25 | male |
| 1 | user2 | 55 | male |
| 2 | user3 | 43 | male |


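Once the records are inserted into the collection, every document has an `_id` field: MongoDB generates a unique ObjectId for any document inserted without one. [The `_id` field is potentially a good shard key for a sharded MongoDB cluster](https://www.mongodb.com/blog/post/on-selecting-a-shard-key-for-mongodb), although, because ObjectIds increase over time, it is generally better suited to hashed sharding than to ranged sharding.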

```python
collection.insert_many(df_emplyee.to_dict("records"))
show_collection(collection)
```

|   | _id | name | age | sex |
|---|-----|------|-----|-----|
| 0 | 61bb536d771f7760addc5d1c | user1 | 25 | male |
| 1 | 61bb536d771f7760addc5d1d | user2 | 55 | male |
| 2 | 61bb536d771f7760addc5d1e | user3 | 43 | male |
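An ObjectId is 12 bytes, and its first 4 bytes are a big-endian count of seconds since the Unix epoch, which is why ObjectIds grow over time. The stdlib-only sketch below decodes that prefix from one of the `_id` values in the output above; `objectid_timestamp` is a hypothetical helper (pymongo's `bson.ObjectId` exposes the same value as `generation_time`).

```python
from datetime import datetime, timezone


def objectid_timestamp(oid_hex: str) -> datetime:
    """Return the creation time embedded in a 24-hex-digit ObjectId string."""
    raw = bytes.fromhex(oid_hex)
    if len(raw) != 12:
        raise ValueError("an ObjectId is exactly 12 bytes")
    seconds = int.from_bytes(raw[:4], "big")  # 4-byte timestamp prefix
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


print(objectid_timestamp("61bb536d771f7760addc5d1c"))  # 2021-12-16 14:55:41+00:00
```

All three documents were inserted within the same second, so they share the prefix `61bb536d`; only the trailing counter bytes differ.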


# Update

Let's do an update: change the age of user1 to 32 with the [update_one](https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html) method.
- The `filter` parameter selects the document to update; `update_one` modifies only the first matching document.
- The [`$set`](https://docs.mongodb.com/manual/reference/operator/update/set/) operator sets the specified fields to the given values.
- The [`$currentDate`](https://docs.mongodb.com/manual/reference/operator/update/currentDate/) operator sets the `lastModifiedAt` field to the current time; `{"$type": "date"}` stores it as a date. Both operators add the field if it doesn't exist yet.
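To make the two operators concrete, here is a toy in-memory model that applies an update document to a Python dict. It is only a sketch of the observable effect, not how the server works: `apply_update` is hypothetical and supports just top-level `$set` and `$currentDate` fields.

```python
from datetime import datetime, timezone


def apply_update(doc: dict, update: dict) -> dict:
    """Apply a tiny subset of MongoDB update operators to a copy of doc."""
    result = dict(doc)
    # $set: assign each listed field, adding it if missing
    for field, value in update.get("$set", {}).items():
        result[field] = value
    # $currentDate: set each listed field to the current time (always a date in this toy)
    for field in update.get("$currentDate", {}):
        result[field] = datetime.now(timezone.utc)
    return result


doc = {"name": "user1", "age": 25, "sex": "male"}
update = {"$set": {"age": 32}, "$currentDate": {"lastModifiedAt": {"$type": "date"}}}
print(apply_update(doc, update))
```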


```python
myquery = {"name": "user1"}
newvalues = {"$set": {"age": 32},  # store age as an int, like the other documents
             "$currentDate": {"lastModifiedAt": {"$type": "date"}}}
res = collection.update_one(filter=myquery, update=newvalues)
```

```python
show_collection(collection)
```

|   | _id | name | age | sex | lastModifiedAt |
|---|-----|------|-----|-----|----------------|
| 0 | 61bb536d771f7760addc5d1c | user1 | 32 | male | 2021-12-16 14:55:41.593 |
| 1 | 61bb536d771f7760addc5d1d | user2 | 55 | male | NaT |
| 2 | 61bb536d771f7760addc5d1e | user3 | 43 | male | NaT |


# Upsert

As the name suggests, an 'upsert' updates the document that matches the filter if one exists, and inserts a new document otherwise. For instance, in the first example below, 'user1' exists in the collection, so the upsert performs an update. In the second example, 'user4' doesn't exist, so it is inserted into the collection. The `$setOnInsert` operator in the second example only takes effect when an insert happens.
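The insert branch is the less obvious half: when no document matches, MongoDB builds the new document from the equality conditions in the filter plus the update operators. The toy sketch below models that with a list of dicts; `upsert_one` is hypothetical and handles only plain equality filters, `$set`, and `$setOnInsert`, mirroring the two examples that follow.

```python
def upsert_one(collection: list, flt: dict, update: dict) -> str:
    """Toy upsert: update the first match, else insert a document seeded from the filter."""
    for doc in collection:
        if all(doc.get(k) == v for k, v in flt.items()):
            doc.update(update.get("$set", {}))  # $setOnInsert is ignored on update
            return "updated"
    new_doc = dict(flt)                          # equality fields from the filter
    new_doc.update(update.get("$set", {}))
    new_doc.update(update.get("$setOnInsert", {}))
    collection.append(new_doc)
    return "inserted"


employees = [{"name": "user1", "age": 32, "sex": "male"}]
update = {"$set": {"age": 32}, "$setOnInsert": {"sex": "female"}}
print(upsert_one(employees, {"name": "user1"}, update))  # updated
print(upsert_one(employees, {"name": "user4"}, update))  # inserted
print(employees[1])  # {'name': 'user4', 'age': 32, 'sex': 'female'}
```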


```python
collection.update_one({"name": "user1"},
                      {"$set": {"age": 32},
                       "$currentDate": {"lastModifiedAt": {"$type": "date"}}},
                      upsert=True)
show_collection(collection)
```

|   | _id | name | age | sex | lastModifiedAt |
|---|-----|------|-----|-----|----------------|
| 0 | 61bb536d771f7760addc5d1c | user1 | 32 | male | 2021-12-16 14:55:41.807 |
| 1 | 61bb536d771f7760addc5d1d | user2 | 55 | male | NaT |
| 2 | 61bb536d771f7760addc5d1e | user3 | 43 | male | NaT |


```python
collection.update_one({"name": "user4"},
                      {"$set": {"age": 32},
                       "$setOnInsert": {"sex": "female"},
                       "$currentDate": {"lastModifiedAt": {"$type": "date"}}},
                      upsert=True)
show_collection(collection)
```

|   | _id | name | age | sex | lastModifiedAt |
|---|-----|------|-----|-----|----------------|
| 0 | 61bb536d771f7760addc5d1c | user1 | 32 | male | 2021-12-16 14:55:41.807 |
| 1 | 61bb536d771f7760addc5d1d | user2 | 55 | male | NaT |
| 2 | 61bb536d771f7760addc5d1e | user3 | 43 | male | NaT |
| 3 | 61bb536eef8c67586582a4ea | user4 | 32 | female | 2021-12-16 14:55:42.012 |


# Bulk updates

Performing the updates/inserts one by one is inefficient, because each call is a separate round trip to the server. To improve this, we first collect all the `UpdateOne` operations and then send them in a single [`bulk_write`](https://docs.mongodb.com/manual/reference/method/db.collection.bulkWrite/) call, as follows.

```python
from pandas import concat  # DataFrame.append was removed in pandas 2.0

df_emplyee = concat([df_emplyee, DataFrame([{'name': 'user5', 'age': 65, 'sex': 'male'}])],
                    ignore_index=True)

# one UpdateOne per row: match on name, set every column, stamp lastModifiedAt
updates = [
    UpdateOne(
        {"name": record["name"]},
        {"$set": record,
         "$currentDate": {"lastModifiedAt": {"$type": "date"}}},
        upsert=True,
    )
    for record in df_emplyee.to_dict("records")
]
collection.bulk_write(updates)
show_collection(collection)
```

|   | _id | name | age | sex | lastModifiedAt |
|---|-----|------|-----|-----|----------------|
| 0 | 61bb536d771f7760addc5d1c | user1 | 25 | male | 2021-12-16 14:55:42.229 |
| 1 | 61bb536d771f7760addc5d1d | user2 | 55 | male | 2021-12-16 14:55:42.230 |
| 2 | 61bb536d771f7760addc5d1e | user3 | 43 | male | 2021-12-16 14:55:42.230 |
| 3 | 61bb536eef8c67586582a4ea | user4 | 32 | female | 2021-12-16 14:55:42.012 |
| 4 | 61bb536eef8c67586582a4f6 | user5 | 65 | male | 2021-12-16 14:55:42.230 |


# DataFrame Upsert

Back to our initial requirement: we need to upsert every individual DataFrame row. I wrapped the bulk pattern in a function, `df_upsert`, which matches documents on a list of key columns.

```python
def df_upsert(df: DataFrame, collection, keys: list):
    """Upsert every row of df into collection, matching documents on the key columns."""
    updates = [
        UpdateOne(
            {key: record[key] for key in keys},  # filter on the key columns only
            {"$set": record,
             "$currentDate": {"lastModifiedAt": {"$type": "date"}}},
            upsert=True,
        )
        for record in df.to_dict("records")
    ]
    if updates:  # bulk_write raises InvalidOperation when given no operations
        return collection.bulk_write(updates)
```
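Because `df_upsert` takes a list of key columns, the filter can span several columns. One thing it does not check is whether those columns are unique within the DataFrame: if two rows share a key, both `UpdateOne` operations target the same document and, with the default ordered `bulk_write`, the later row wins. The stdlib-only sketch below builds the same `(filter, update)` pairs that `df_upsert` passes to `UpdateOne(..., upsert=True)` and adds that duplicate check; `build_upsert_specs` and the `dept` column are hypothetical.

```python
def build_upsert_specs(records: list, keys: list) -> list:
    """Return one (filter, update) pair per record, matching on the key columns."""
    seen = set()
    specs = []
    for record in records:
        key_values = tuple(record[k] for k in keys)
        if key_values in seen:  # a second row would overwrite the first in the same document
            raise ValueError(f"duplicate key {key_values!r}")
        seen.add(key_values)
        flt = {k: record[k] for k in keys}
        update = {"$set": record,
                  "$currentDate": {"lastModifiedAt": {"$type": "date"}}}
        specs.append((flt, update))
    return specs


rows = [
    {"dept": "IT", "name": "user1", "age": 20},  # 'dept' is a hypothetical column
    {"dept": "HR", "name": "user1", "age": 30},
]
for flt, update in build_upsert_specs(rows, ["dept", "name"]):
    print(flt)
# {'dept': 'IT', 'name': 'user1'}
# {'dept': 'HR', 'name': 'user1'}
```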


Let's test it. We apply some changes to `df_emplyee`: user1's age changes from 25 to 20, and user6 is added. The collection and the DataFrame now differ, as shown below. Note that user4 doesn't exist in `df_emplyee`, so it will not be touched in the collection.

```python
from pandas import concat  # DataFrame.append was removed in pandas 2.0

df_emplyee.loc[0, 'age'] = 20
df_emplyee = concat([df_emplyee, DataFrame([{'name': 'user6', 'age': 37, 'sex': 'female'}])],
                    ignore_index=True)

show_collection(collection)
display(df_emplyee)
```

|   | _id | name | age | sex | lastModifiedAt |
|---|-----|------|-----|-----|----------------|
| 0 | 61bb536d771f7760addc5d1c | user1 | 25 | male | 2021-12-16 14:55:42.229 |
| 1 | 61bb536d771f7760addc5d1d | user2 | 55 | male | 2021-12-16 14:55:42.230 |
| 2 | 61bb536d771f7760addc5d1e | user3 | 43 | male | 2021-12-16 14:55:42.230 |
| 3 | 61bb536eef8c67586582a4ea | user4 | 32 | female | 2021-12-16 14:55:42.012 |
| 4 | 61bb536eef8c67586582a4f6 | user5 | 65 | male | 2021-12-16 14:55:42.230 |

|   | name | age | sex |
|---|------|-----|-----|
| 0 | user1 | 20 | male |
| 1 | user2 | 55 | male |
| 2 | user3 | 43 | male |
| 3 | user5 | 65 | male |
| 4 | user6 | 37 | female |
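Looking only at the key column, the outcome of the upsert can be predicted from the two tables above with plain set arithmetic. This is a sketch for reasoning about the result, not something `df_upsert` does; `plan_upsert` is a hypothetical helper.

```python
def plan_upsert(collection_keys: set, df_keys: set) -> dict:
    """Classify key values: matched keys are updated, new keys inserted, the rest untouched."""
    return {
        "update": sorted(df_keys & collection_keys),
        "insert": sorted(df_keys - collection_keys),
        "untouched": sorted(collection_keys - df_keys),
    }


in_collection = {"user1", "user2", "user3", "user4", "user5"}
in_dataframe = {"user1", "user2", "user3", "user5", "user6"}
print(plan_upsert(in_collection, in_dataframe))
# {'update': ['user1', 'user2', 'user3', 'user5'], 'insert': ['user6'], 'untouched': ['user4']}
```

Rows whose values did not change (such as user2) still count as updates here, which is why every matched document gets a new `lastModifiedAt`.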


Once we run `df_upsert` with `name` as the key, we get the expected result: user1's age is now 20, user6 is inserted, and user4 is left unchanged.

```python
df_upsert(df_emplyee, collection, ['name'])
show_collection(collection)
```

|   | _id | name | age | sex | lastModifiedAt |
|---|-----|------|-----|-----|----------------|
| 0 | 61bb536d771f7760addc5d1c | user1 | 20 | male | 2021-12-16 14:55:42.574 |
| 1 | 61bb536d771f7760addc5d1d | user2 | 55 | male | 2021-12-16 14:55:42.575 |
| 2 | 61bb536d771f7760addc5d1e | user3 | 43 | male | 2021-12-16 14:55:42.575 |
| 3 | 61bb536eef8c67586582a4ea | user4 | 32 | female | 2021-12-16 14:55:42.012 |
| 4 | 61bb536eef8c67586582a4f6 | user5 | 65 | male | 2021-12-16 14:55:42.575 |
| 5 | 61bb536eef8c67586582a51b | user6 | 37 | female | 2021-12-16 14:55:42.575 |


# Conclusion

In this post I showed some basic MongoDB operations in Python and built the function `df_upsert`. It can be polished further; for instance, we could:
- Accept a dictionary that maps collection names to DataFrames, so several collections can be upserted in one call.
- Support a choice of update strategy: drop and recreate the collection, or apply incremental updates.
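The first idea could be sketched roughly as follows. `upsert_all` is a hypothetical helper: it takes anything indexable by collection name (a pymongo `Database` is, via `db[name]`), a dict mapping collection names to DataFrames, a dict mapping collection names to key columns, and the upsert function to call, which would be `df_upsert`. Passing the function in keeps the wrapper independent of a live database.

```python
def upsert_all(db, frames: dict, keys: dict, upsert_fn) -> dict:
    """Upsert each DataFrame in frames into the collection with the same name."""
    results = {}
    for name, frame in frames.items():
        # with pymongo, db[name] returns the Collection called `name`
        results[name] = upsert_fn(frame, db[name], keys[name])
    return results


# Intended use (assumes the db and df_upsert from this post):
# upsert_all(db, {"employee": df_emplyee}, {"employee": ["name"]}, df_upsert)
```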

This is my first blog post. Thanks for reading!