## Connecting to MongoDB

In [56]:
from pymongo import MongoClient, UpdateOne
from time import perf_counter

# Connect to the MongoDB server at host "mongodb", port 27017, and select its
# "demo" database; `db` is the handle every later cell works through.
client = MongoClient('mongodb://mongodb:27017/')
db = client.demo

# Create ascending single-field indexes on "customer" for faster querying.
# NOTE(review): dropping a collection also removes its indexes, so these only
# help while the collections have not been reset (or once they are re-created).
db.subscriptions.create_index([('customer', 1)])
db.addresses.create_index([('customer', 1)])
db.payments.create_index([('customer', 1)])

'customer_1'

In [47]:
from mongo_helper import load_jsonl

In [57]:
# Start from a clean slate so the load below can be re-run without duplicating data.
for collection_name in ("customers", "subscriptions", "addresses", "payments"):
    db[collection_name].drop()

# Collection.drop() also discards the collection's indexes, so the "customer"
# indexes have to be created again here; otherwise the freshly loaded data
# would be queried without them.
for collection_name in ("subscriptions", "addresses", "payments"):
    db[collection_name].create_index([('customer', 1)])

## Approach 1: Load data into separate entities

In [58]:
from bson.objectid import ObjectId
from bson.dbref import DBRef

def insert_separate_entities(file_path):
    """Load every record of a JSONL file into four linked collections.

    Each record yielded by ``load_jsonl(file_path)`` is expected to carry the
    keys ``customer``, ``addresses``, ``payments`` and ``subscriptions``.
    The customer is written to ``customers``; each address, payment and
    subscription is written to its own collection with a ``DBRef`` back to
    the customer plus the customer's ``merchant_user_id`` as
    ``customer_external_id``.

    Payments point at their billing address and subscriptions at their
    shipping address and payment through ``DBRef`` values, resolved by the
    ``origin`` ids found in the source data. The lookup tables live for the
    whole call, so a reference may resolve to a document inserted for an
    earlier record. An origin id that was never seen is stored as ``None``.

    Args:
        file_path: Path handed unchanged to ``load_jsonl``.
    """
    # origin id -> DBRef of the document already inserted for it
    address_ref_by_origin = {}
    payment_ref_by_origin = {}

    for entry in load_jsonl(file_path):
        customer_data = entry["customer"]
        external_id = customer_data["merchant_user_id"]
        customer_oid = ObjectId()
        customer_ref = DBRef("customers", customer_oid)
        db.customers.insert_one({
            "_id": customer_oid,
            "merchant_user_id": external_id,
            "data": customer_data,
        })

        # Addresses go first so payments and subscriptions can reference them.
        for address_data in entry["addresses"]:
            address_origin_id = address_data["origin"]["id"]
            address_oid = ObjectId()
            address_ref_by_origin[address_origin_id] = DBRef("addresses", address_oid)
            db.addresses.insert_one({
                "_id": address_oid,
                "customer": customer_ref,
                "customer_external_id": external_id,
                "data": address_data,
            })

        # Payments come before subscriptions, which may reference them.
        for payment_data in entry["payments"]:
            payment_origin_id = payment_data["origin"]["id"]
            payment_oid = ObjectId()
            billing_origin_id = payment_data["origin"].get("billing_address")
            new_payment = {
                "_id": payment_oid,
                "customer": customer_ref,
                "customer_external_id": external_id,
                "billing_address": address_ref_by_origin.get(billing_origin_id),
                "data": payment_data,
            }
            payment_ref_by_origin[payment_origin_id] = DBRef("payments", payment_oid)
            db.payments.insert_one(new_payment)

        for subscription_data in entry["subscriptions"]:
            subscription_oid = ObjectId()
            origin = subscription_data["origin"]
            shipping_origin_id = origin.get("shipping_address")
            linked_payment_id = origin.get("payment")
            db.subscriptions.insert_one({
                "_id": subscription_oid,
                "customer": customer_ref,
                "customer_external_id": external_id,
                "shipping_address": address_ref_by_origin.get(shipping_origin_id),
                "payment": payment_ref_by_origin.get(linked_payment_id),
                "data": subscription_data,
            })

In [59]:
# Time one complete load of '10k.jsonl' into the separate collections.
start_time = perf_counter()
insert_separate_entities('10k.jsonl')
end_time = perf_counter()

# perf_counter() returns fractional seconds, so the printed difference is in seconds.
print(f"Data inserted into separate collections with relationships in: {end_time - start_time}")

Data inserted into separate collections with relationships in: 10.846546921995468


### Aggregate data

In [60]:
import json

# External customer IDs (merchant_user_id values) to report on
customer_ids = ["036e25cf-bd92-4a87-bdfe-5834b6fac428", "972cf6c4-f689-46b9-a210-c0783432e894"]

# Start timing the aggregation. perf_counter is the clock imported in the
# connection cell; the `time` module itself is never imported by this
# notebook, so calling time.time() here fails on a fresh kernel.
start_time = perf_counter()

# Find live subscriptions for the selected customers and attach the customer's name
pipeline = [
    # Filter first, on both conditions, so the join below only has to process
    # live subscriptions that belong to the requested customers.
    {"$match": {
        "customer_external_id": {"$in": customer_ids},
        "data.live": True,
    }},

    # Resolve the DBRef stored in "customer" against customers._id
    {"$lookup": {
        "from": "customers",
        "localField": "customer.$id",
        "foreignField": "_id",
        "as": "customer_data"
    }},

    # $lookup always produces an array; $unwind replaces it with the single
    # matched customer document (subscriptions with no match are dropped).
    {"$unwind": "$customer_data"},

    {"$project": {
        "_id": 0,
        "subscription": "$data",
        "customer_first_name": "$customer_data.data.first_name",
        "customer_last_name": "$customer_data.data.last_name"
    }}
]

# Execute the aggregation pipeline and materialise the cursor inside the timed section
live_subscriptions = list(db.subscriptions.aggregate(pipeline))

# End timing the aggregation
end_time = perf_counter()

# Print one example result (if any) and the time taken
if live_subscriptions:
    print("Live Subscriptions example:", json.dumps(live_subscriptions[0]))
else:
    print("No live subscriptions found for the selected customers.")
print("Aggregation Time:", end_time - start_time)


Live Subscriptions example: {"subscription": {"live": true, "offer": "66efa00a6d5d11ec9c47ce4cb48ea2ce", "customer": "036e25cf-bd92-4a87-bdfe-5834b6fac428", "start_date": "2023-06-01", "next_order_date": "2024-12-25", "cancelled": null, "merchant_order_id": "56108", "product": "product2", "components": null, "every": 10, "every_period": "week", "price": "51.45", "quantity": 4, "origin": {"id": "plan-item-4626103", "payment": "payment-8405522", "shipping_address": "shipping_address-3390904"}}, "customer_first_name": "Kiara", "customer_last_name": "Roman"}
Aggregation Time: 0.028088092803955078
