# Connection Pooling

<img src="https://s3.amazonaws.com/edu-static.mongodb.com/lessons/M220/notebook_assets/replica_set_connection_pooling_three_connections.png" style="margin: 0 auto;">

---

# Robust Client Configuration

### Always use Connection Pooling

<img src="https://s3.amazonaws.com/edu-static.mongodb.com/lessons/M220/notebook_assets/replica_set_connection_pooling_three_connections.png" style="margin: 0 auto;">

### Always specify a wtimeout with majority writes.

<img src="https://s3.amazonaws.com/edu-static.mongodb.com/lessons/M220/notebook_assets/replica_set_primary_secondary_highlighted_w_majority.png" style="margin: 0 auto;">

But if there's a problem on the secondary nodes, we might not get acknowledgements back from the server for a while. If more writes than reads are coming into the system and operations aren't being acknowledged, this will eventually lead to system gridlock.

To avoid this, follow a simple rule. For any write operation written with majority, always specify a write timeout. The specific length of the timeout will need to be determined based on your network and hardware, but you should always be setting timeouts on these (point) writes.

{ w: "majority", wtimeout: 5000 }

### Always configure for and handle serverSelectionTimeout errors.

<img src="https://s3.amazonaws.com/edu-static.mongodb.com/lessons/M220/notebook_assets/world_map.png" style="margin: 0 auto;">

And lastly, you should always handle a server selection timeout error.

This error will be thrown in the event a MongoDB server is unavailable for a write or for a read with a preference that the replica set can't currently fulfill. At the end of the day, MongoDB is a distributed database. So you should expect the system to be running on remote servers, along with all the benefits and constraints that it brings to your application logic.

By default, the time before a driver will raise this error is 30 seconds, but you should change this to suit your application's needs. By handling this error you also passively monitor the health of your application stack and can become very quickly aware of any hardware and software problems that haven't recovered in an adequate amount of time.

Each driver and programming language has a specific way to deal with errors, and we handle this error in particular in Mflix.

---

# Writes with Error Handling

In [1]:
from pymongo import MongoClient, errors
uri = "mongodb+srv://m220student:m220password@mflix.rncav.mongodb.net/test"
mc = MongoClient(uri)
lessons = mc.lessons
shipments = lessons.shipments

So here's a URI string connecting to our Atlas cluster, and I've initialized a client with that string.

We're using a new collection called shipments, and the scenario for this lesson is that our application is a clothing manufacturer that also handles the shipping for their clothing items.

In [2]:
import time
import random
from pprint import pprint

shipments.drop()

cities = [ "Atlanta", "New York", "Miami", "Chicago", "Los Angeles", "Seattle", "Dallas" ]
products = [ "shoes", "pants", "shirts", "hats", "socks" ]
quantities = [ 10, 20, 40, 80, 160, 320, 640, 1280, 2560 ]
docs = []

for truck_id in range(30):
    source = random.choice(cities)
    destination = random.choice([c for c in cities if c != source])
    product = random.choice(products)
    quantity = random.choice(quantities)
    
    doc = {
        "truck_id": truck_id,
        "source": source,
        "destination": destination,
        "product": product,
        "quantity": quantity
    }
    
    docs.append(doc)

In [3]:
insert_response = shipments.insert_many(docs)
shipments.count_documents({})

30

In [5]:
shipments.find_one()

{'_id': ObjectId('6274a2b8af11371d14f95e44'),
 'truck_id': 0,
 'source': 'Atlanta',
 'destination': 'Miami',
 'product': 'pants',
 'quantity': 2560}

In [6]:
shipments.create_index("truck_id", unique=True)

'truck_id_1'

In [7]:
doc = {
    "source": "New York",
    "destination": "Atlanta",
    "truck_id": 4,
    "product": "socks",
    "quantity": 40
}

try:
    res = shipments.insert_one(doc)
    print(res.inserted_id)
except errors.DuplicateKeyError:
    truck_id = doc["truck_id"]
    print(f"Truck #{truck_id} is currently performing a shipment. Please select another truck.")

Truck #4 is currently performing a shipment. Please select another truck.


In [8]:
import string

trucks = lessons.trucks
trucks.drop()

trucks.insert_many([
    { "_id": i, "license": "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(7)) } for i in range(50)
])
trucks.count_documents({})

50

In [17]:
trucks.find_one()

{'_id': 0, 'license': '1IQ1VC2'}

In [18]:
doc = {
    "source": "New York",
    "destination": "Atlanta",
    "truck_id": 4,
    "product": "socks",
    "quantity": 40
}

try:
    res = shipments.insert_one(doc)
    print(res.inserted_id)
except errors.DuplicateKeyError:
    busy_trucks = set(shipments.distinct("truck_id"))
    all_trucks = set(trucks.distinct("_id"))
    available_trucks = all_trucks.difference(busy_trucks)
    old_truck_id = doc["truck_id"]
    if available_trucks:
        chosen_truck = random.choice(list(available_trucks))
        new_truck_id = doc["truck_id"] = chosen_truck
        res = shipments.insert_one(doc)
        print(f"Truck #{old_truck_id} is currently performing a shipment. Truck #{new_truck_id} has been sent out instead.")
    else:
        print(f"Truck #{old_truck_id} is currently performing a shipment. Could not find another truck.")

Truck #4 is currently performing a shipment. Truck #48 has been sent out instead.


# Principle of Least Privilege

<img src="https://s3.amazonaws.com/edu-static.mongodb.com/lessons/M220/notebook_assets/polp.png" style="margin: 0 auto;">

![noun_lock](https://s3.amazonaws.com/edu-static.mongodb.com/lessons/M220/notebook_assets/noun_lock.png)

# Change Streams

In [22]:
from pymongo import MongoClient, errors
uri = "mongodb+srv://m220student:m220password@mflix.rncav.mongodb.net/test"
client = MongoClient(uri)

In [23]:
lessons = client.lessons
inventory = lessons.inventory
inventory.drop()

fruits = [ "strawberries", "bananas", "apples" ]
for fruit in fruits:
    inventory.insert_one( { "type": fruit, "quantity": 100 } )
    
list(inventory.find())

[{'_id': ObjectId('6274db9baf11371d14f95e65'),
  'type': 'strawberries',
  'quantity': 100},
 {'_id': ObjectId('6274db9baf11371d14f95e66'),
  'type': 'bananas',
  'quantity': 100},
 {'_id': ObjectId('6274db9baf11371d14f95e67'),
  'type': 'apples',
  'quantity': 100}]

In [24]:
try:
    with inventory.watch(full_document='updateLookup') as change_stream_cursor:
        for data_change in change_stream_cursor:
            print(data_change)
except pymongo.errors.PyMongoError:
    print('Change stream closed because of an error.')

{'_id': {'_data': '826274DD9F000000012B022C0100296E5A100472E17B54E28F4701BB8D81CB07547A4646645F696400646274DB9BAF11371D14F95E650004'}, 'operationType': 'update', 'clusterTime': Timestamp(1651826079, 1), 'fullDocument': {'_id': ObjectId('6274db9baf11371d14f95e65'), 'type': 'strawberries', 'quantity': 99}, 'ns': {'db': 'lessons', 'coll': 'inventory'}, 'documentKey': {'_id': ObjectId('6274db9baf11371d14f95e65')}, 'updateDescription': {'updatedFields': {'quantity': 99}, 'removedFields': [], 'truncatedArrays': []}}
{'_id': {'_data': '826274DDA0000000022B022C0100296E5A100472E17B54E28F4701BB8D81CB07547A4646645F696400646274DB9BAF11371D14F95E670004'}, 'operationType': 'update', 'clusterTime': Timestamp(1651826080, 2), 'fullDocument': {'_id': ObjectId('6274db9baf11371d14f95e67'), 'type': 'apples', 'quantity': 98}, 'ns': {'db': 'lessons', 'coll': 'inventory'}, 'documentKey': {'_id': ObjectId('6274db9baf11371d14f95e67')}, 'updateDescription': {'updatedFields': {'quantity': 98}, 'removedFields': []

{'_id': {'_data': '826274DDB1000000012B022C0100296E5A100472E17B54E28F4701BB8D81CB07547A4646645F696400646274DB9BAF11371D14F95E650004'}, 'operationType': 'update', 'clusterTime': Timestamp(1651826097, 1), 'fullDocument': {'_id': ObjectId('6274db9baf11371d14f95e65'), 'type': 'strawberries', 'quantity': 83}, 'ns': {'db': 'lessons', 'coll': 'inventory'}, 'documentKey': {'_id': ObjectId('6274db9baf11371d14f95e65')}, 'updateDescription': {'updatedFields': {'quantity': 83}, 'removedFields': [], 'truncatedArrays': []}}
{'_id': {'_data': '826274DDB2000000012B022C0100296E5A100472E17B54E28F4701BB8D81CB07547A4646645F696400646274DB9BAF11371D14F95E650004'}, 'operationType': 'update', 'clusterTime': Timestamp(1651826098, 1), 'fullDocument': {'_id': ObjectId('6274db9baf11371d14f95e65'), 'type': 'strawberries', 'quantity': 82}, 'ns': {'db': 'lessons', 'coll': 'inventory'}, 'documentKey': {'_id': ObjectId('6274db9baf11371d14f95e65')}, 'updateDescription': {'updatedFields': {'quantity': 82}, 'removedField

{'_id': {'_data': '826274DDC2000000012B022C0100296E5A100472E17B54E28F4701BB8D81CB07547A4646645F696400646274DB9BAF11371D14F95E650004'}, 'operationType': 'update', 'clusterTime': Timestamp(1651826114, 1), 'fullDocument': {'_id': ObjectId('6274db9baf11371d14f95e65'), 'type': 'strawberries', 'quantity': 59}, 'ns': {'db': 'lessons', 'coll': 'inventory'}, 'documentKey': {'_id': ObjectId('6274db9baf11371d14f95e65')}, 'updateDescription': {'updatedFields': {'quantity': 59}, 'removedFields': [], 'truncatedArrays': []}}
{'_id': {'_data': '826274DDC3000000012B022C0100296E5A100472E17B54E28F4701BB8D81CB07547A4646645F696400646274DB9BAF11371D14F95E650004'}, 'operationType': 'update', 'clusterTime': Timestamp(1651826115, 1), 'fullDocument': {'_id': ObjectId('6274db9baf11371d14f95e65'), 'type': 'strawberries', 'quantity': 55}, 'ns': {'db': 'lessons', 'coll': 'inventory'}, 'documentKey': {'_id': ObjectId('6274db9baf11371d14f95e65')}, 'updateDescription': {'updatedFields': {'quantity': 55}, 'removedField

{'_id': {'_data': '826274DDD3000000012B022C0100296E5A100472E17B54E28F4701BB8D81CB07547A4646645F696400646274DB9BAF11371D14F95E650004'}, 'operationType': 'update', 'clusterTime': Timestamp(1651826131, 1), 'fullDocument': {'_id': ObjectId('6274db9baf11371d14f95e65'), 'type': 'strawberries', 'quantity': 27}, 'ns': {'db': 'lessons', 'coll': 'inventory'}, 'documentKey': {'_id': ObjectId('6274db9baf11371d14f95e65')}, 'updateDescription': {'updatedFields': {'quantity': 27}, 'removedFields': [], 'truncatedArrays': []}}


NameError: name 'pymongo' is not defined

In [26]:
low_quantity_pipeline = [ { "$match": { "fullDocument.quantity": { "$lt": 20 } } } ]

try:
    with inventory.watch(pipeline=low_quantity_pipeline, full_document='updateLookup') as change_stream_cursor:
        for data_change in change_stream_cursor:
            current_quantity = data_change["fullDocument"].get("quantity")
            fruit = data_change["fullDocument"].get("type")
            msg = "There are only {0} units left of {1}!".format(current_quantity, fruit)
            print(msg)
except pymongo.errors.PyMongoError:
    logging.error('Change stream closed because of an error.')

There are only 19 units left of bananas!
There are only 11 units left of bananas!
There are only 3 units left of bananas!
There are only 17 units left of apples!
There are only 15 units left of apples!
There are only 11 units left of apples!
There are only 10 units left of apples!
There are only 18 units left of strawberries!
There are only 14 units left of strawberries!
There are only 10 units left of strawberries!


NameError: name 'pymongo' is not defined

---