# Document Database Layer 
## Project: Beauty Lakehouse Analytics Platform 
 
In this notebook, we implement the **Document-Oriented Database layer** of our multi-model architecture using MongoDB Atlas. 
 
### Objectives 
- Load curated Delta tables from the Data Lake (saved in a Unity Catalog Volume) 
- Transform structured Spark DataFrames into nested JSON documents 
- Store documents in MongoDB Atlas using PyMongo 
- Query MongoDB collections 
- Understand the role of document databases in multi-model analytics 
 
This version is adapted for a **SQL Warehouse environment**, where the MongoDB Spark connector cannot be installed, so documents are written with **PyMongo** instead.
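
To make the "nested JSON documents" objective concrete, here is a minimal pure-Python sketch of the embedding pattern: one document per order, with its line items stored as a list inside it. The sample values and the helper name `embed_items` are hypothetical; only the field names come from the columns used later in this notebook.

```python
from collections import defaultdict

# Hypothetical sample rows; the values are made up for illustration.
orders = [
    {"order_id": 1, "customer_id": 10, "status": "shipped"},
    {"order_id": 2, "customer_id": 11, "status": "pending"},  # has no items
]
order_items = [
    {"order_id": 1, "product_id": 100, "quantity": 2, "unit_price": 9.5, "line_total": 19.0},
    {"order_id": 1, "product_id": 101, "quantity": 1, "unit_price": 4.0, "line_total": 4.0},
]


def embed_items(orders, order_items):
    """Return one document per order with its line items embedded as a list."""
    items_by_order = defaultdict(list)
    for item in order_items:
        # Drop the join key from the embedded item; the parent already carries it.
        items_by_order[item["order_id"]].append(
            {k: v for k, v in item.items() if k != "order_id"}
        )
    return [
        {**order, "items": items_by_order[order["order_id"]]}
        for order in orders
        if order["order_id"] in items_by_order  # mirrors an inner join
    ]


order_docs = embed_items(orders, order_items)
```

Embedding suits this case because line items are normally read together with their order, so a single document lookup replaces a join. Note that, as with an inner join, orders without any items do not produce a document.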
 

In [0]:
%pip install pymongo certifi pandas

In [0]:
# Create widgets for MongoDB credentials
dbutils.widgets.text("mongo_user", "")
dbutils.widgets.text("mongo_pass", "")

print("Widgets created. Enter your MongoDB username and password above.")


In [0]:
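mongo_user = dbutils.widgets.get("mongo_user")
mongo_pass = dbutils.widgets.get("mongo_pass")

# The widgets default to empty strings, so fail early if nothing was entered
if not mongo_user or not mongo_pass:
    raise ValueError("Enter the MongoDB username and password in the widgets first.")

print("Credentials loaded from widgets.")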


In [0]:
%run ./config/settings


In [0]:
customers_df = spark.read.format("delta").load(curated_customers_path)
products_df = spark.read.format("delta").load(curated_products_path)
orders_df = spark.read.format("delta").load(curated_orders_path)
order_items_df = spark.read.format("delta").load(curated_order_items_path)

print("Curated Delta tables loaded from Unity Catalog Volume.")


In [0]:
print("Customers:", customers_df.count()) 
print("Products:", products_df.count()) 
print("Orders:", orders_df.count()) 
print("Order Items:", order_items_df.count())

In [0]:
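from urllib.parse import quote_plus

from pymongo import MongoClient
import certifi

# Percent-encode the credentials so characters such as "@", ":" or "/" do not break the URI
mongo_uri = (
    f"mongodb+srv://{quote_plus(mongo_user)}:{quote_plus(mongo_pass)}"
    "@cluster0.7yguf6a.mongodb.net/"
)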


client = MongoClient(mongo_uri, tlsCAFile=certifi.where())
db = client["beauty_lakehouse_db"]

client.admin.command("ping")
print("Connected to MongoDB Atlas!")


In [0]:
from pyspark.sql.functions import collect_list, struct

orders_with_items = (
    orders_df.join(order_items_df, "order_id")
    .groupBy(
        "order_id",
        "customer_id",
        "order_date",
        "total_amount",
        "payment_type",
        "status"
    )
    .agg(
        collect_list(
            struct(
                "product_id",
                "quantity",
                "unit_price",
                "line_total"
            )
        ).alias("items")
    )
)

display(orders_with_items.limit(5))


In [0]:
import numpy as np

customers_docs = customers_df.toPandas().to_dict("records")
products_docs = products_df.toPandas().to_dict("records")

orders_docs = orders_with_items.toPandas().to_dict("records")

for doc in orders_docs:
    if isinstance(doc["items"], np.ndarray):
        doc["items"] = doc["items"].tolist()
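
The conversion above only handles the case where `items` arrives as a NumPy array. Depending on column types and Spark settings, the records may also contain values PyMongo cannot encode or encodes poorly, such as bare `datetime.date` objects, NaN placeholders for nulls, or Spark `Row` objects. The following is a sketch of a recursive cleaner, not the notebook's original method; the helper name `to_bson_safe` is hypothetical, and `decimal.Decimal` values are deliberately left untouched and would need separate handling.

```python
import datetime as dt


def to_bson_safe(value):
    """Recursively convert a value into plain Python types for PyMongo."""
    # NumPy arrays and scalars expose tolist(); convert them to plain Python first.
    if hasattr(value, "tolist"):
        value = value.tolist()
    # Spark Row objects expose asDict(); keep field names instead of a bare tuple.
    if hasattr(value, "asDict"):
        value = value.asDict(recursive=True)
    if isinstance(value, dict):
        return {str(k): to_bson_safe(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_bson_safe(v) for v in value]
    # NaN-like values (float NaN, pandas NaT) are not equal to themselves.
    if value != value:
        return None
    # datetime.datetime is encodable, a bare datetime.date is not.
    if isinstance(value, dt.date) and not isinstance(value, dt.datetime):
        return dt.datetime(value.year, value.month, value.day)
    return value


# Usage with the lists built above (assumes they exist in the notebook session):
# orders_docs = [to_bson_safe(doc) for doc in orders_docs]
```

Storing missing values as `None` means they become BSON `null` rather than a floating-point NaN, which is usually easier to filter on in queries.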


In [0]:
db.customers.drop()
db.products.drop()
db.orders.drop()


In [0]:
db.customers.insert_many(customers_docs)
db.products.insert_many(products_docs)
db.orders.insert_many(orders_docs)
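
One caveat with the cell above: `insert_many` raises an error when given an empty list, so an empty curated table would stop the notebook here. A possible variant, sketched below, inserts in fixed-size batches and simply skips empty inputs. The helper names `chunked` and `insert_in_batches` and the default batch size are assumptions for illustration; PyMongo already splits large inserts at the wire level, so batching here is mainly about progress tracking and bounded retries.

```python
def chunked(docs, size=1000):
    """Yield successive slices of `docs` with at most `size` elements each."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(docs), size):
        yield docs[start:start + size]


def insert_in_batches(collection, docs, size=1000):
    """Insert `docs` batch by batch and return the number of inserted documents."""
    inserted = 0
    for batch in chunked(docs, size):
        # ordered=False lets the server continue past a failing document in a batch.
        result = collection.insert_many(batch, ordered=False)
        inserted += len(result.inserted_ids)
    return inserted


# Usage with the notebook's handles (assumes `db` and `orders_docs` exist):
# print("Orders inserted:", insert_in_batches(db.orders, orders_docs))
```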

print("Data successfully inserted into MongoDB!")


In [0]:
sample_order = db.orders.find_one()
sample_order
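
Because each order embeds its line items, queries can reach into the `items` array directly. The sketch below builds two filter documents: dot notation matches any order with at least one item for a given product, while `$elemMatch` requires a single item to satisfy several conditions at once. The helper names and the example values in the usage comments are hypothetical.

```python
def orders_with_product(product_id):
    """Filter for orders that contain at least one line item for `product_id`."""
    return {"items.product_id": product_id}


def orders_with_bulk_line(product_id, min_quantity):
    """Filter for orders where one single item matches both product and quantity."""
    return {
        "items": {
            "$elemMatch": {
                "product_id": product_id,
                "quantity": {"$gte": min_quantity},
            }
        }
    }


# Usage with the notebook's connection (assumes `db` exists; values are examples):
# for order in db.orders.find(orders_with_product(100)).limit(5):
#     print(order["order_id"], len(order["items"]))
# db.orders.count_documents(orders_with_bulk_line(100, 3))
```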


In [0]:
import pandas as pd

mongo_customers = list(db.customers.find())
mongo_customers_df = pd.DataFrame(mongo_customers)

mongo_customers_df.groupby("city").size().sort_values(ascending=False)
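
The cell above pulls every customer document into pandas before counting. As an alternative sketch, the same grouping can run inside MongoDB with an aggregation pipeline, so only one row per city travels over the network. The helper name `city_count_pipeline` and the output field `customers` are assumptions for illustration.

```python
def city_count_pipeline(limit=None):
    """Build a pipeline that counts customers per city, largest groups first."""
    pipeline = [
        {"$group": {"_id": "$city", "customers": {"$sum": 1}}},
        {"$sort": {"customers": -1}},
    ]
    if limit is not None:
        pipeline.append({"$limit": limit})
    return pipeline


# Usage with the notebook's connection (assumes `db` exists):
# for row in db.customers.aggregate(city_count_pipeline(limit=10)):
#     print(row["_id"], row["customers"])
```

One difference to keep in mind: pandas `groupby` drops missing cities by default, whereas `$group` collects documents without a `city` under a `null` key.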
