<a href="https://colab.research.google.com/github/stephenfrein/csc8491/blob/main/MongoClusterExamples.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# install a Python library that interacts with MongoDB
!pip install pymongo


Collecting pymongo
  Downloading pymongo-4.7.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (669 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m669.1/669.1 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.6.1-py3-none-any.whl (307 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m307.7/307.7 kB[0m [31m16.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dnspython, pymongo
Successfully installed dnspython-2.6.1 pymongo-4.7.3


In [2]:
# MongoDB credentials - the same ones you use for Oracle
# (fill both in before connecting)
username, password = '', ''

In [3]:
# test your connection - if it fails, check username/password
from urllib.parse import quote_plus

from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi

# credentials must be percent-escaped before they go into the URI; characters
# such as '@', ':', '/' or '%' in a password would otherwise break the string
uri = ("mongodb+srv://" + quote_plus(username) + ":" + quote_plus(password)
       + "@cluster0.yrg2exm.mongodb.net")
# create a new client and connect to the server (Stable API version '1')
client = MongoClient(uri, server_api=ServerApi('1'))
# send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    # report instead of raising so the driver's message is shown to the user
    print(e)

Pinged your deployment. You successfully connected to MongoDB!


In [6]:
# OBSERVE THE PROPAGATION DELAY IN A CLUSTER

import datetime
import pandas as pd
import threading
import time
from pymongo import ReadPreference

# transactions in a CSV file in an AWS S3 bucket
transactions_url = 'https://csc8491.s3.amazonaws.com/mongo_transactions.csv'
# name of collection
trans_collection = 'transactions'

# you have a database that matches your username
db = client[username]
collection = db[trans_collection]

# get rid of collection if it already exists
try:
    collection.drop()
except Exception as e:
    # bind the exception explicitly: a bare `except e:` looks up an undefined
    # name and turns any driver error into a NameError
    print(e)

# function to load our collection from the S3 file - we'll run this in a thread
def load_collection(collection, filename):
    """Bulk-insert every row of a CSV file into a MongoDB collection.

    Args:
        collection: target collection; only its ``insert_many`` method is used.
        filename: path, URL or file-like object accepted by ``pd.read_csv``;
            the first row is treated as the header.
    """
    data = pd.read_csv(filename, header=0)
    records = data.to_dict('records')
    # pymongo's insert_many rejects an empty document list, so a header-only
    # file would otherwise raise instead of simply loading nothing
    if not records:
        return
    collection.insert_many(records)

# function to get the count in a collection - we'll read this for both the primary and a secondary
def get_count(collection, type, reads=20):
    """Print the collection's document count ``reads`` times in a row.

    Each line shows the label, the 1-based read number, the current
    ``count_documents({})`` result and a timestamp, so loading progress seen
    by different nodes can be compared side by side.

    Args:
        collection: collection to count; only ``count_documents`` is used.
        type: label printed at the start of each line, e.g. 'Primary'
            (name kept for compatibility with existing callers).
        reads: number of consecutive reads; defaults to 20.
    """
    for i in range(1, reads + 1):
        # count first, then timestamp, so the time reflects when the count returned
        count = collection.count_documents({})
        print('\n' + type + ' ' + str(i) + ': ' + str(count) + ' ' + str(datetime.datetime.now()))

# reference to the collection for loading
trans_load = db.get_collection(trans_collection)
# reference to the collection as it exists on the primary node - we will use for reading
trans_primary = db.get_collection(trans_collection, read_preference=ReadPreference.PRIMARY)
# reference to the collection as it exists on a secondary node - we will use for reading
trans_secondary = db.get_collection(trans_collection, read_preference=ReadPreference.SECONDARY)

# seconds to let the load get moving before the readers start
LOAD_HEAD_START_SECONDS = 15

# thread for loading data
t_load_data = threading.Thread(target=load_collection, args=(trans_load, transactions_url))
# thread for reading from primary
t_count_primary = threading.Thread(target=get_count, args=(trans_primary, 'Primary'))
# thread for reading from secondary
t_count_secondary = threading.Thread(target=get_count, args=(trans_secondary, 'Secondary'))

# start the clock - perf_counter is monotonic, so system clock adjustments
# cannot skew the measured duration the way they can with time.time()
start = time.perf_counter()

# starting load thread
t_load_data.start()
# give load time to get moving
time.sleep(LOAD_HEAD_START_SECONDS)
# start reading from primary
t_count_primary.start()
# start reading from secondary
t_count_secondary.start()

# wait until threads are completely executed
t_load_data.join()
t_count_primary.join()
t_count_secondary.join()

# stop the clock
end = time.perf_counter()
print("Run took: " + str(end - start) + ' seconds')

# what are our final numbers once load is complete?
print('Final Primary: ' + str(trans_primary.count_documents({})))
print('Final Secondary: ' + str(trans_secondary.count_documents({})))



Secondary 1: 69536 2024-06-17 17:53:46.332463

Secondary 2: 69536 2024-06-17 17:53:46.547600

Secondary 3: 69536 2024-06-17 17:53:46.788059

Secondary 4: 69536 2024-06-17 17:53:47.002561

Secondary 5: 69536 2024-06-17 17:53:47.215122

Secondary 6: 69536 2024-06-17 17:53:47.446126

Secondary 7: 69536 2024-06-17 17:53:47.683865

Secondary 8: 69536 2024-06-17 17:53:47.898946

Secondary 9: 69536 2024-06-17 17:53:48.112116

Secondary 10: 69536 2024-06-17 17:53:48.358785

Secondary 11: 69536 2024-06-17 17:53:48.603022

Secondary 12: 69536 2024-06-17 17:53:48.849076

Secondary 13: 69536 2024-06-17 17:53:49.061923

Secondary 14: 69536 2024-06-17 17:53:49.273749

Secondary 15: 69536 2024-06-17 17:53:49.538568

Secondary 16: 69536 2024-06-17 17:53:49.755037

Secondary 17: 69536 2024-06-17 17:53:49.967534

Secondary 18: 69536 2024-06-17 17:53:50.179251

Secondary 19: 69536 2024-06-17 17:53:50.440857

Secondary 20: 69536 2024-06-17 17:53:50.705804

Primary 1: 91104 2024-06-17 17:53:55.282224

Pri

Exception in thread Thread-16 (load_collection):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-6-5834830b892e>", line 27, in load_collection
  File "/usr/local/lib/python3.10/dist-packages/pymongo/_csot.py", line 108, in csot_wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/pymongo/collection.py", line 742, in insert_many
    blk.execute(write_concern, session, _Op.INSERT)
  File "/usr/local/lib/python3.10/dist-packages/pymongo/bulk.py", line 595, in execute
    return self.execute_command(generator, write_concern, session, operation)
  File "/usr/local/lib/python3.10/dist-packages/pymongo/bulk.py", line 452, in execute_command
    client._retryable_write(
  File "/usr/local/lib/python3.10/dist-packages/pymongo/mongo_client.py", line 156

Run took: 340.94284987449646 seconds
Final Primary: 1533065
Final Secondary: 1533065


In [None]:
# OBSERVE THE PERFORMANCE IMPACTS OF DIFFERENT WRITE CONCERNS

import pandas as pd
import time
from pymongo import WriteConcern

# transactions in a CSV file in an AWS S3 bucket
transactions_url = 'https://csc8491.s3.amazonaws.com/mongo_transactions.csv'
# name of collection
trans_collection = 'transactions'

# you have a database that matches your username
db = client[username]
collection = db[trans_collection]

# get rid of collection if it already exists
try:
    collection.drop()
except Exception as e:
    # bind the exception explicitly: a bare `except e:` looks up an undefined
    # name and turns any driver error into a NameError
    print(e)

# function to load our collection from the S3 file
# do it in a loop to better see performance impacts
def load_collection(collection, filename, limit=1000):
    """Insert the first ``limit`` CSV rows into a collection one document at a time.

    Calling ``insert_one`` once per row is deliberate: it makes the cost of the
    collection's write concern show up in the total run time.

    Args:
        collection: target collection; only its ``insert_one`` method is used.
        filename: path, URL or file-like object accepted by ``pd.read_csv``;
            the first row is the header.
        limit: maximum number of data rows to insert (default 1000);
            ``None`` inserts every row.
    """
    # nrows stops parsing after `limit` rows instead of parsing the whole
    # file and slicing it afterwards
    df = pd.read_csv(filename, header=0, nrows=limit)
    # to_dict('records') keeps each column's own type; iterrows() builds a
    # Series per row, which coerces ints to float in mixed int/float rows
    for record in df.to_dict('records'):
        collection.insert_one(record)

# reference to the collection for loading
# manipulate write concern to affect speed by changing WRITE_CONCERN:
#   None               -> database default: majority - 2 nodes in this 3-node
#                         cluster (primary + 1 secondary)
#   WriteConcern(w=0)  -> no write acknowledgement needed - super fast
#   WriteConcern(w=1)  -> just the primary has to acknowledge
WRITE_CONCERN = None
trans_load = db.get_collection(trans_collection, write_concern=WRITE_CONCERN)

# start the clock - perf_counter is monotonic, so system clock adjustments
# cannot skew the measured duration the way they can with time.time()
start = time.perf_counter()
# load the data
load_collection(trans_load, transactions_url)
# stop the clock
end = time.perf_counter()
print("Run took: " + str(end - start) + ' seconds')