<center>

<h1>Lab 2: Document Stores</h1>

<i>

Course: 23D020 Big Data Management for Data Science <br>

Author(s): Maria Simakova, Moritz Peist<br>

**Group: L2-T05**<br>

Programme: DSDM

<hr>

Note: This is a hands-on lab on Document Stores. We will be using one of the most popular document databases: MongoDB. We will practice how to import, create and model document databases, as well as how to query them.

</i>

</center>

<hr>

## Imports

In [1]:
# Libraries
import datetime
import time
import json
from pymongo import MongoClient
from faker import Faker
from time import perf_counter  # Monotonic, high-resolution clock, better suited to timing than time.time()
import random
from tqdm.notebook import tqdm
import pandas as pd

## B.1. Data Generation

In [None]:
class Lab2models:
    """
    A class for managing and operating on MongoDB database models.

    This class handles the creation of test data for different MongoDB data models,
    establishes connections to MongoDB, and provides methods for data generation
    and collection management.

    Parameters
    ----------
    num_companies : int
        Number of company documents to generate
    connection_string : str
        MongoDB connection string
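    db_name : str
        Name of the MongoDB database to use
    faker : Faker, optional
        Faker instance used to generate fake data (defaults to ``Faker(["es_ES"])``)
    random_generator : module or random.Random, optional
        Source of randomness for the number of employees per company
        (defaults to the ``random`` module)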

    Attributes
    ----------
    num_companies : int
        Number of companies to generate
    connection_string : str
        MongoDB connection string
    db_name : str
        MongoDB database name
    client : MongoClient
        MongoDB client instance
    db : Database
        MongoDB database instance
    """

    def __init__(
        self,
        num_companies,
        connection_string,
        db_name,
        faker=None,
        random_generator=None,
    ):
        # Set random seed for reproducibility
        Faker.seed(42)  # same people
        random.seed(42)  # for generating the same number of employees per company
        # Faker instance for generating fake data
        self.faker = faker or Faker(["es_ES"])
        self.random_generator = random_generator or random
        # Initialize the number of companies, employees will be generated accordingly
        self.num_companies = num_companies
        # MongoDB connection string and database name
        self.connection_string = connection_string
        self.db_name = db_name
        # Initialize MongoDB client and database
        self.client = None
        self.db = None

    def connect_to_mongo(self) -> None:
        """
        Connect to MongoDB using the provided connection string.

        Returns
        -------
        None
        """
        # MongoClient connects lazily, so ping the server to confirm it is reachable
        try:
            self.client = MongoClient(self.connection_string)
            self.client.admin.command("ping")
            self.db = self.client[self.db_name]
            print("✅ Connected to MongoDB\n")
        except Exception as e:
            print(f"❌ Error connecting to MongoDB: {e}\n")

    def list_collections(self) -> list:
        """
        List all collections in the MongoDB database.

        Returns
        -------
        list
            A list of collection names in the database
        """
        # Make sure we're connected first
        if self.db is None:
            self.connect_to_mongo()

        # List all collections in the database
        collections = self.db.list_collection_names()
        # print(f"Collections in {self.db_name}: {collections}\n")
        return collections

    def delete_collections(self) -> None:
        """
        Drop every collection in the database.

        This method ensures we start with a clean database. Note that it removes
        *all* collections in ``db_name``, not only the four model collections.

        Returns
        -------
        None
        """
        # Make sure we're connected first
        if self.db is None:
            self.connect_to_mongo()

        collections = self.db.list_collection_names()
        print(f"Deleting collections: {collections}")
        for collection in collections:
            self.db.drop_collection(collection)

    ######################################################################## B1+2 ########################################################################
    def data_generator(self) -> tuple:
        """
        Generate test data for three different data models in MongoDB.

        This method orchestrates the entire data generation process by:
        1. Connecting to MongoDB and preparing collections
        2. Generating company and people data
        3. Storing data according to three different data models:
        - Model 1: Normalized model with people referencing companies
        - Model 2: Embedded model with companies embedded in people documents
        - Model 3: Embedded model with people embedded in company documents

        Returns
        -------
        tuple
            A tuple containing (number of companies, total number of employees)
        """
        # Initialize database and collections
        self._prepare_database()

        # Generate data
        company_data = self._generate_companies()
        company_ids, person_data = self._generate_people(company_data)
        total_employees = sum(len(persons) for _, persons in person_data.items())

        # Insert data into different models
        self._insert_model1_data(company_ids, person_data)
        self._insert_model2_data(company_data, person_data)
        self._insert_model3_data(company_data, person_data)

        return self.num_companies, total_employees

    def _prepare_database(self) -> dict:
        """
        Connect to MongoDB and prepare collections for data insertion.

        This method connects to MongoDB, deletes existing collections,
        and creates new collections for all three data models.

        Returns
        -------
        dict
            A dictionary mapping the four collection names to their Collection objects
        """
        # Connect to MongoDB
        self.connect_to_mongo()
        # Delete collection data if exists from all of the 3 models
        self.delete_collections()
        # Create and obtain collections
        m1_people = self.db.create_collection("m1_people")
        m1_companies = self.db.create_collection("m1_companies")
        m2_people = self.db.create_collection("m2_people")
        m3_companies = self.db.create_collection("m3_companies")

        self.collections = {
            "m1_people": m1_people,
            "m1_companies": m1_companies,
            "m2_people": m2_people,
            "m3_companies": m3_companies,
        }

        return self.collections

    def _generate_companies(self) -> list:
        """
        Generate company data for the specified number of companies.

        Uses Faker to create realistic company information including
        name, domain, email, URL, and VAT number.

        Returns
        -------
        list
            A list of dictionaries containing company data
        """
        # Set up faker
        fake = self.faker
        # Generate company data
        company_data = []
        for _ in range(self.num_companies):
            company_name = fake.company()
            domain = fake.domain_name()
            company = {
                "domain": domain,
                "email": f"info@{domain}",
                "name": company_name,
                "url": f"www.{domain}",
                "vatNumber": fake.bothify(text="??######"),
            }
            company_data.append(company)

        return company_data

    def _generate_people(self, company_data: list) -> tuple:
        """
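        Insert the companies for Model 1 and generate employee data for each company.

        Creates a random number of employees (25 to 50) for each company with
        realistic personal information including name, email, age, etc.
        Note that ``insert_many`` adds an ``_id`` field to every dictionary in
        ``company_data`` in place, so the company documents embedded in Models 2
        and 3 carry the same ``_id`` as the documents in ``m1_companies``.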

        Parameters
        ----------
        company_data : list
            List of company dictionaries generated by _generate_companies

        Returns
        -------
        tuple
            A tuple containing (company MongoDB IDs, dictionary mapping
            company indices to lists of person data)
        """
        fake = self.faker
        # Insert companies for Model 1 and get their IDs
        company_ids = (
            self.collections["m1_companies"].insert_many(company_data).inserted_ids
        )

        # Generate random number of employees per company
        employees_per_company = [
            self.random_generator.randint(25, 50) for _ in range(self.num_companies)
        ]

        # Organize people by company
        person_data = {company_idx: [] for company_idx in range(self.num_companies)}

        # For each company, generate the specific number of employees
        for company_idx in range(self.num_companies):
            for _ in range(employees_per_company[company_idx]):
                if fake.random_element(elements=(True, False)):
                    first_name = fake.first_name_male()
                    sex = "M"
                else:
                    first_name = fake.first_name_female()
                    sex = "F"

                last_name = fake.last_name()
                full_name = f"{first_name} {last_name}"
                birth_date = fake.date_of_birth(minimum_age=20, maximum_age=65)
                birth_datetime = datetime.datetime.combine(birth_date, datetime.time())
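                # Exact age in whole years (dividing the day count by 365 ignores leap days)
                today = datetime.date.today()
                age = (
                    today.year
                    - birth_date.year
                    - ((today.month, today.day) < (birth_date.month, birth_date.day))
                )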

                username_full = f"{first_name.lower()}.{last_name.lower()}".replace(
                    " ", ""
                )
                username_short = f"{first_name.lower()[0]}{last_name.lower()}".replace(
                    " ", ""
                )

                person = {
                    "age": age,
                    "companyEmail": f"{username_full}@{company_data[company_idx]['domain']}",
                    "dateOfBirth": birth_datetime,
                    "email": f"{username_short}@{fake.free_email_domain()}",
                    "firstName": first_name,
                    "fullName": full_name,
                    "sex": sex,
                }
                person_data[company_idx].append(person)

        return company_ids, person_data

    def _insert_model1_data(self, company_ids: list, person_data: dict) -> None:
        """
        Insert data using Model 1: Normalized model with people referencing companies.

        In this model, people documents contain a reference to their company.

        Parameters
        ----------
        company_ids : list
            List of MongoDB ObjectIDs for the companies
        person_data : dict
            Dictionary mapping company indices to lists of person data

        Returns
        -------
        None
        """
        m1_people = self.collections["m1_people"]
        for company_idx, persons in tqdm(
            person_data.items(), desc="Inserting data into Model 1: "
        ):
            # Copy each person, add a reference to its company and insert the
            # whole company's staff in one round trip instead of one per person
            m1_batch = [
                {**person, "worksIn": company_ids[company_idx]} for person in persons
            ]
            if m1_batch:
                m1_people.insert_many(m1_batch)

    def _insert_model2_data(self, company_data: list, person_data: dict) -> None:
        """
        Insert data using Model 2: Embedded model with companies embedded in people documents.

        In this model, each person document contains the full company document.

        Parameters
        ----------
        company_data : list
            List of company dictionaries
        person_data : dict
            Dictionary mapping company indices to lists of person data

        Returns
        -------
        None
        """
        m2_people = self.collections["m2_people"]
        for company_idx, persons in tqdm(
            person_data.items(), desc="Inserting data into Model 2: "
        ):
            # Copy each person, embed the full company document and insert the
            # whole company's staff in one round trip instead of one per person
            m2_batch = [
                {**person, "worksIn": company_data[company_idx]} for person in persons
            ]
            if m2_batch:
                m2_people.insert_many(m2_batch)

    def _insert_model3_data(self, company_data: list, person_data: dict) -> None:
        """
        Insert data using Model 3: Embedded model with people embedded in company documents.

        In this model, each company document contains a list of all its employees.

        Parameters
        ----------
        company_data : list
            List of company dictionaries
        person_data : dict
            Dictionary mapping company indices to lists of person data

        Returns
        -------
        None
        """
        m3_companies = self.collections["m3_companies"]
        for company_idx, company in enumerate(
            tqdm(company_data, desc="Inserting data into Model 3: ")
        ):
            m3_company = company.copy()
            m3_company["staff"] = person_data.get(company_idx, [])
            m3_companies.insert_one(m3_company)

    ######################################################################## B4 ########################################################################
    def run_query(self, collection: str, pipeline: list, model_name: str) -> float:
        """
        Execute a MongoDB aggregation query and measure its performance.

        This method runs an aggregation pipeline against the specified collection,
        measures the execution time, and prints the results.

        Parameters
        ----------
        collection : str
            The name of the MongoDB collection to query
        pipeline : list
            MongoDB aggregation pipeline to execute
        model_name : str
            Name of the model being queried (for display purposes)

        Returns
        -------
        float
            Execution time of the query in seconds

        Notes
        -----
        The method prints the first 3 results and shows the total count of results
        along with the query execution time.
        """
        # Make sure we're connected first
        if self.db is None:
            self.connect_to_mongo()
        print(f"\n# {model_name}")
        start_time = perf_counter()
        results = list(self.db[collection].aggregate(pipeline))
        end_time = perf_counter()
        query_time = end_time - start_time
        # Show the first 3 results
        for result in results[:3]:
            print(result)
        if len(results) > 3:
            print(f"... and {len(results) - 3} other results")

        print(f"Time taken: {query_time:.4f} seconds")

        return query_time

    def run_update_query(
        self,
        collection: str,
        filter_query: dict,
        update_query: dict,
        array_filters: list,
        model_name: str,
    ) -> float:
        """
        Execute a MongoDB update operation and measure its performance.

        Parameters
        ----------
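        collection : str
            The name of the MongoDB collection to update
        filter_query : dict
            MongoDB filter to select documents to update
        update_query : dict or list
            MongoDB update document (or update aggregation pipeline) to apply
        array_filters : list or None
            Conditions selecting the array elements matched by ``$[<identifier>]``;
            ``None`` when the update does not use them
        model_name : str
            Name of the model being updated (for display purposes)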

        Returns
        -------
        float
            Execution time of the update in seconds
        """
        # Make sure we're connected
        if self.db is None:
            self.connect_to_mongo()

        print(f"\n# {model_name}")

        # Start timing
        start_time = perf_counter()

        # Perform the update
        result = self.db[collection].update_many(
            filter_query, update_query, array_filters=array_filters
        )

        # End timing
        end_time = perf_counter()
        query_time = end_time - start_time

        print(f"Updated {result.modified_count} documents")
        print(f"Time taken: {query_time:.4f} seconds")

        return query_time

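    def run_batch(
        self, models: list[dict], is_update: bool = False
    ) -> dict[str, float]:
        """
        Run a batch of aggregation or update queries on MongoDB and measure performance.

        Parameters
        ----------
        models : list of dict
            Model configurations. Each dict needs "name" and "collection", plus
            "pipeline" for aggregations, or "filter", "update" and "array_filters"
            for updates
        is_update : bool, default False
            If True, run the configurations as update operations instead of aggregations

        Returns
        -------
        dict
            Execution time in seconds for each model, keyed by model name
        """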
        # Connect if needed
        if self.db is None:
            self.connect_to_mongo()

        # Execute queries with dictionary comprehension
        if is_update:
            results = {
                model["name"]: self.run_update_query(
                    collection=model["collection"],
                    filter_query=model["filter"],
                    update_query=model["update"],
                    array_filters=model["array_filters"],
                    model_name=model["name"],
                )
                for model in models
            }
        else:
            results = {
                model["name"]: self.run_query(
                    collection=model["collection"],
                    pipeline=model["pipeline"],
                    model_name=model["name"],
                )
                for model in models
            }

        # Find fastest model in one operation
        fastest_model = min(results, key=results.get)
        fastest_time = results[fastest_model]

        # Print results
        print(f"\n{30 * '='} Model comparison: {30 * '='}")

        for name, t in results.items():
            print(f"{name}: {t:.4}s")

        print(f"The fastest model is {fastest_model} with: {fastest_time:.4}s")

        return results

    def close_connection(self):
        """
        Close the MongoDB connection.
        """
        if self.client:
            self.client.close()
            # Reset so the helper methods reconnect if they are called again
            self.client = None
            self.db = None
            print("✅ MongoDB connection closed")
        else:
            print("❌ No MongoDB connection to close")
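To make the three layouts produced by `data_generator` concrete, here is a minimal sketch that builds one toy company and two employees as plain Python dictionaries in each shape. It does not touch MongoDB; the helper `build_models` and the integer `_id` are illustrative assumptions (the real collections use `ObjectId`s).

```python
def build_models(company: dict, people: list[dict]) -> dict[str, list[dict]]:
    """Return the documents each model would store for one company (toy sketch)."""
    # Model 1: normalized -- people hold a reference to the company's _id
    m1_companies = [dict(company)]
    m1_people = [{**p, "worksIn": company["_id"]} for p in people]
    # Model 2: the whole company is embedded (duplicated) in every person
    m2_people = [{**p, "worksIn": dict(company)} for p in people]
    # Model 3: people are embedded as an array inside their company
    m3_companies = [{**company, "staff": [dict(p) for p in people]}]
    return {
        "m1_companies": m1_companies,
        "m1_people": m1_people,
        "m2_people": m2_people,
        "m3_companies": m3_companies,
    }


company = {"_id": 1, "name": "Talleres Salinas S.L.", "domain": "example.com"}
people = [{"fullName": "Ramiro Cifuentes"}, {"fullName": "Cruz Palomino"}]
models = build_models(company, people)

for name, docs in models.items():
    print(name, docs)
```

Model 2 repeats the company fields once per employee, which is what makes its reads join-free and its company updates touch many documents.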

## Setup 

In [3]:
# Setup parameters
connection_string = "mongodb://localhost:27017/"
db_name = "lab2"
num_companies = 3000

# Create an instance of the Lab2models class
lab = Lab2models(
    num_companies=num_companies, connection_string=connection_string, db_name=db_name
)

## B.2. Data Insertion

In [4]:
# Generate data
num_companies, actual_people = lab.data_generator()

# Print the total number of companies and people generated
print(
    f"Actual number of documents: {num_companies} companies and {actual_people} people"
)

✅ Connected to MongoDB

Deleting collections: ['m3_companies', 'm2_people', 'm1_companies', 'm1_people']

Actual number of documents: 3000 companies and 113101 people
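As a quick sanity check on this total: `randint(25, 50)` draws uniformly from 26 integers, so a company has 37.5 employees on average with a standard deviation of 7.5. The sketch below computes the expected total for 3,000 companies, its standard deviation and how far the observed 113,101 people lie from it, assuming the per-company draws are independent.

```python
import math

low, high = 25, 50          # bounds passed to randint (both inclusive)
num_companies = 3000
observed_people = 113101    # total reported by data_generator above

n = high - low + 1                              # 26 possible head counts
mean = (low + high) / 2                         # 37.5 employees per company
variance = (n**2 - 1) / 12                      # variance of a discrete uniform: 56.25
expected_total = num_companies * mean           # 112,500 people
total_sd = math.sqrt(num_companies * variance)  # sd of a sum of independent draws
z = (observed_people - expected_total) / total_sd

print(f"expected {expected_total:,.0f} ± {total_sd:.1f}, observed {observed_people:,} (z = {z:.2f})")
```

A deviation of roughly one and a half standard deviations is unremarkable, so the generated head count is consistent with the generator's parameters.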


## B.3. Queries

### Q1: For each person, retrieve their full name and their company’s name.

In [5]:
q1_1 = [
    {
        "$lookup": {
            "from": "m1_companies",
            "localField": "worksIn",
            "foreignField": "_id",
            "as": "companyName",
        }
    },
    {"$unwind": "$companyName"},
    {"$project": {"_id": 0, "fullName": 1, "companyName": "$companyName.name"}},
]

q1_2 = [{"$project": {"_id": 0, "fullName": 1, "companyName": "$worksIn.name"}}]

q1_3 = [
    {"$unwind": "$staff"},
    {"$project": {"_id": 0, "fullName": "$staff.fullName", "companyName": "$name"}},
]
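In Model 1, `$lookup` works like a left outer join: for each person it gathers the `m1_companies` documents whose `_id` equals `worksIn` into an array. The following `$unwind` (without `preserveNullAndEmptyArrays`) drops people whose array is empty, so the pipeline behaves like an inner join. A rough pure-Python equivalent, using toy data and the hypothetical helper `lookup_unwind_project`, is:

```python
def lookup_unwind_project(people: list[dict], companies: list[dict]) -> list[dict]:
    """Mimic q1_1: $lookup on worksIn -> _id, then $unwind, then $project."""
    # Index companies by _id (m1_companies._id is always indexed in MongoDB)
    by_id: dict = {}
    for company in companies:
        by_id.setdefault(company["_id"], []).append(company)

    results = []
    for person in people:
        matches = by_id.get(person.get("worksIn"), [])  # the "as" array of $lookup
        for match in matches:  # $unwind emits one document per array element
            results.append({"fullName": person["fullName"], "companyName": match["name"]})
    return results


companies = [{"_id": 1, "name": "Banca Privada OLMJ S.L.N.E"}]
people = [
    {"fullName": "Ramiro Cifuentes", "worksIn": 1},
    {"fullName": "Orphan Person", "worksIn": 99},  # no matching company, dropped
]
print(lookup_unwind_project(people, companies))
# [{'fullName': 'Ramiro Cifuentes', 'companyName': 'Banca Privada OLMJ S.L.N.E'}]
```

Models 2 and 3 need no join at all, because the company name is already stored next to each person.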

In [6]:
queries_q1 = [
    {
        "name": "Model 1",
        "collection": "m1_people",
        "pipeline": q1_1,
    },
    {
        "name": "Model 2",
        "collection": "m2_people",
        "pipeline": q1_2,
    },
    {
        "name": "Model 3",
        "collection": "m3_companies",
        "pipeline": q1_3,
    },
]

In [7]:
q1_res = lab.run_batch(queries_q1)


# Model 1
{'fullName': 'Ramiro Cifuentes', 'companyName': 'Banca Privada OLMJ S.L.N.E'}
{'fullName': 'Jose Luis Gutiérrez', 'companyName': 'Banca Privada OLMJ S.L.N.E'}
{'fullName': 'Cruz Palomino', 'companyName': 'Banca Privada OLMJ S.L.N.E'}
... and 113098 other results
Time taken: 7.0472 seconds

# Model 2
{'fullName': 'Ramiro Cifuentes', 'companyName': 'Banca Privada OLMJ S.L.N.E'}
{'fullName': 'Jose Luis Gutiérrez', 'companyName': 'Banca Privada OLMJ S.L.N.E'}
{'fullName': 'Cruz Palomino', 'companyName': 'Banca Privada OLMJ S.L.N.E'}
... and 113098 other results
Time taken: 0.4007 seconds

# Model 3
{'fullName': 'Ramiro Cifuentes', 'companyName': 'Banca Privada OLMJ S.L.N.E'}
{'fullName': 'Jose Luis Gutiérrez', 'companyName': 'Banca Privada OLMJ S.L.N.E'}
{'fullName': 'Cruz Palomino', 'companyName': 'Banca Privada OLMJ S.L.N.E'}
... and 113098 other results
Time taken: 0.2809 seconds

Model 1: 7.047s
Model 2: 0.4007s
Model 3: 0.2809s
The fastest model is Model 3 with: 0.2809s


### Q2: For each company, retrieve its name and the number of employees.

In [8]:
q2_1 = [
    {"$group": {"_id": "$worksIn", "employeeCount": {"$sum": 1}}},
    {
        "$lookup": {
            "from": "m1_companies",
            "localField": "_id",
            "foreignField": "_id",
            "as": "companyName",
        }
    },
    {"$unwind": "$companyName"},
    {"$project": {"_id": 0, "companyName": "$companyName.name", "employeeCount": 1}},
    # Reorder the fields so that companyName comes before employeeCount in the output
    {
        "$replaceRoot": {
            "newRoot": {
                "companyName": "$companyName",
                "employeeCount": "$employeeCount",
            }
        }
    },
]

q2_2 = [
    {
        "$group": {
            "_id": "$worksIn._id",  # _id that insert_many added to each embedded company
            "companyName": {"$first": "$worksIn.name"},
            "employeeCount": {"$sum": 1},
        }
    },
    {"$project": {"_id": 0, "companyName": 1, "employeeCount": 1}},
]

q2_3 = [
    {
        "$project": {
            "_id": 0,
            "companyName": "$name",
            "employeeCount": {"$size": "$staff"},
        }
    }
]
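Models 1 and 2 must scan every person and count them per company with `$group`, whereas Model 3 only reads the length of each `staff` array with `$size`. The sketch below checks on toy data that both routes give the same counts; for brevity it groups by company name, whereas `q2_2` groups by the embedded `_id`. Since `$group` does not guarantee any output order (which is why the first rows printed for Model 3 differ from those of Models 1 and 2), the helpers sort by company name before comparing; appending a `{"$sort": {"companyName": 1}}` stage to each pipeline would make the printed results line up as well.

```python
from collections import Counter


def counts_from_people(people: list[dict]) -> list[dict]:
    """Mimic q2_2: group people by their embedded company and count them."""
    counter = Counter(p["worksIn"]["name"] for p in people)  # grouping by name here
    rows = [{"companyName": name, "employeeCount": n} for name, n in counter.items()]
    return sorted(rows, key=lambda row: row["companyName"])  # like a $sort stage


def counts_from_companies(companies: list[dict]) -> list[dict]:
    """Mimic q2_3: read the length of each company's staff array."""
    rows = [{"companyName": c["name"], "employeeCount": len(c["staff"])} for c in companies]
    return sorted(rows, key=lambda row: row["companyName"])


a = {"name": "Grupo Rocamora S.Com."}
b = {"name": "Restauración CR S.Coop."}
people = [{"worksIn": a}, {"worksIn": b}, {"worksIn": a}]
companies = [{**a, "staff": [{}, {}]}, {**b, "staff": [{}]}]

print(counts_from_people(people) == counts_from_companies(companies))  # True
```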

In [9]:
queries_q2 = [
    {
        "name": "Model 1",
        "collection": "m1_people",
        "pipeline": q2_1,
    },
    {
        "name": "Model 2",
        "collection": "m2_people",
        "pipeline": q2_2,
    },
    {
        "name": "Model 3",
        "collection": "m3_companies",
        "pipeline": q2_3,
    },
]

In [10]:
q2_res = lab.run_batch(queries_q2)


# Model 1
{'companyName': 'Talleres Salinas S.L.', 'employeeCount': 47}
{'companyName': 'Montesinos y Canales S.L.N.E', 'employeeCount': 34}
{'companyName': 'Inversiones Internacionales S.L.L.', 'employeeCount': 26}
... and 2997 other results
Time taken: 0.3310 seconds

# Model 2
{'companyName': 'Talleres Salinas S.L.', 'employeeCount': 47}
{'companyName': 'Montesinos y Canales S.L.N.E', 'employeeCount': 34}
{'companyName': 'Inversiones Internacionales S.L.L.', 'employeeCount': 26}
... and 2997 other results
Time taken: 0.1424 seconds

# Model 3
{'companyName': 'Banca Privada OLMJ S.L.N.E', 'employeeCount': 25}
{'companyName': 'Grupo Rocamora S.Com.', 'employeeCount': 48}
{'companyName': 'Restauración CR S.Coop.', 'employeeCount': 33}
... and 2997 other results
Time taken: 0.0455 seconds

Model 1: 0.331s
Model 2: 0.1424s
Model 3: 0.0455s
The fastest model is Model 3 with: 0.0455s


### Q3: For each person born before 1988, update their age to “30”.

In [None]:
# Model 1 params (age is stored as an integer, so set 30 rather than the string "30")
q3_1_filter = {"dateOfBirth": {"$lt": datetime.datetime(1988, 1, 1)}}
q3_1_update = {"$set": {"age": 30}}
# Model 2 params
q3_2_filter = {"dateOfBirth": {"$lt": datetime.datetime(1988, 1, 1)}}
q3_2_update = {"$set": {"age": 30}}
# Model 3 params -- the positional $ operator would update only the first matching
# staff member, so the filtered positional operator $[elem] with arrayFilters is
# used to update ALL matching staff members
q3_3_filter = {
    "staff": {"$elemMatch": {"dateOfBirth": {"$lt": datetime.datetime(1988, 1, 1)}}}
}
q3_3_update = {"$set": {"staff.$[elem].age": 30}}
q3_3_array_filters = [{"elem.dateOfBirth": {"$lt": datetime.datetime(1988, 1, 1)}}]
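The difference between the positional `$` operator and the filtered positional operator `$[elem]` can be sketched in plain Python: `$` rewrites only the first array element that matched the query, while `$[elem]` rewrites every element satisfying the `arrayFilters` condition. The helper functions below are hypothetical stand-ins for the server-side behaviour, not pymongo calls.

```python
import datetime

CUTOFF = datetime.datetime(1988, 1, 1)


def born_before_cutoff(member: dict) -> bool:
    return member["dateOfBirth"] < CUTOFF


def update_first_match(staff: list[dict], value) -> int:
    """Like {"$set": {"staff.$.age": value}}: only the first matching element."""
    for member in staff:
        if born_before_cutoff(member):
            member["age"] = value
            return 1
    return 0


def update_all_matches(staff: list[dict], value) -> int:
    """Like {"$set": {"staff.$[elem].age": value}} with an arrayFilters condition."""
    changed = 0
    for member in staff:
        if born_before_cutoff(member):
            member["age"] = value
            changed += 1
    return changed


def make_staff() -> list[dict]:
    return [
        {"dateOfBirth": datetime.datetime(1970, 5, 1), "age": 55},
        {"dateOfBirth": datetime.datetime(1995, 3, 2), "age": 29},
        {"dateOfBirth": datetime.datetime(1980, 7, 9), "age": 44},
    ]


print(update_first_match(make_staff(), 30))  # 1
print(update_all_matches(make_staff(), 30))  # 2
```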

In [13]:
queries_q3 = [
    {
        "name": "Model 1",
        "collection": "m1_people",
        "filter": q3_1_filter,
        "update": q3_1_update,
        "array_filters": None,
    },
    {
        "name": "Model 2",
        "collection": "m2_people",
        "filter": q3_2_filter,
        "update": q3_2_update,
        "array_filters": None,
    },
    {
        "name": "Model 3",
        "collection": "m3_companies",
        "filter": q3_3_filter,
        "update": q3_3_update,
        "array_filters": q3_3_array_filters,
    },
]

In [14]:
q3_res = lab.run_batch(queries_q3, is_update=True)


# Model 1
Updated 70649 documents
Time taken: 1.5385 seconds

# Model 2
Updated 70649 documents
Time taken: 1.4808 seconds

# Model 3
Updated 3000 documents
Time taken: 0.3657 seconds

Model 1: 1.538s
Model 2: 1.481s
Model 3: 0.3657s
The fastest model is Model 3 with: 0.3657s
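The Model 3 figure is not directly comparable with the other two: `modified_count` counts top-level documents, so it reports the 3,000 company documents touched rather than the employees updated inside their `staff` arrays. A minimal sketch of an aggregation that counts the embedded employees matching the Q3 filter follows; the name `q3_3_check` is an assumption. Since all three models hold the same generated people, its count should match the 70,649 documents updated in Models 1 and 2.

```python
import datetime

# Count the staff members that the Q3 filter selects inside m3_companies
q3_3_check = [
    {"$unwind": "$staff"},  # one document per embedded employee
    {"$match": {"staff.dateOfBirth": {"$lt": datetime.datetime(1988, 1, 1)}}},
    {"$count": "matchingStaff"},  # a single document: {"matchingStaff": <n>}
]

# Usage with the helper class defined above (requires the running MongoDB instance):
# lab.run_query("m3_companies", q3_3_check, "Model 3 staff check")
```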


### Q4: For each company, update its name to include the word “Company”.

In [None]:
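# Model 1 params -- company names live only in m1_companies
# The $not/$regex filter skips names that already contain "Company",
# so re-running the cell does not append the word twice
q4_1_filter = {"name": {"$not": {"$regex": "Company"}}}
# Pipeline-style update (MongoDB 4.2+) so the new value can reuse the current name
q4_1_update = [{"$set": {"name": {"$concat": ["$name", " Company"]}}}]
# Model 2 params -- the company is embedded in every person, so each copy is updated
q4_2_filter = {"worksIn.name": {"$not": {"$regex": "Company"}}}
q4_2_update = [
    {"$set": {"worksIn.name": {"$concat": ["$worksIn.name", " Company"]}}}
]
# Model 3 params -- one document per company
q4_3_filter = {"name": {"$not": {"$regex": "Company"}}}
q4_3_update = [{"$set": {"name": {"$concat": ["$name", " Company"]}}}]

In [None]:
queries_q4 = [
    {
        "name": "Model 1",
        "collection": "m1_companies",
        "filter": q4_1_filter,
        "update": q4_1_update,
        "array_filters": None,
    },
    {
        "name": "Model 2",
        "collection": "m2_people",
        "filter": q4_2_filter,
        "update": q4_2_update,
        "array_filters": None,
    },
    {
        "name": "Model 3",
        "collection": "m3_companies",
        "filter": q4_3_filter,
        "update": q4_3_update,
        "array_filters": None,
    },
]

In [None]:
q4_res = lab.run_batch(queries_q4, is_update=True)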
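Q4 is where the models differ most on the write side: renaming a company touches one document in Models 1 and 3, but in Model 2 the company is copied into every employee, so every one of those person documents must be rewritten. A back-of-the-envelope sketch using the sizes generated above (the helper name `documents_to_rewrite` is hypothetical):

```python
def documents_to_rewrite(num_companies: int, num_people: int) -> dict[str, int]:
    """Documents an update renaming every company must modify, per model."""
    return {
        "Model 1": num_companies,  # only m1_companies holds the name
        "Model 2": num_people,     # one embedded copy per person in m2_people
        "Model 3": num_companies,  # one m3_companies document per company
    }


print(documents_to_rewrite(3000, 113101))
# {'Model 1': 3000, 'Model 2': 113101, 'Model 3': 3000}
```

The count ignores document size: each Model 3 document also carries its whole `staff` array.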

In [None]:
# Finally close the connection
lab.close_connection()

## C. Performance Comparison
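One way to start the comparison is to express every timing relative to the fastest model for the same query. The sketch below hard-codes the times printed by `run_batch` for Q1 to Q3 in this run (they will differ between machines and runs); in the notebook, the dictionaries `q1_res`, `q2_res` and `q3_res` returned by `run_batch` could be passed in directly. The helper name `relative_to_fastest` is hypothetical.

```python
def relative_to_fastest(times: dict[str, float]) -> dict[str, float]:
    """Divide each model's time by the fastest time for the same query."""
    fastest = min(times.values())
    return {model: t / fastest for model, t in times.items()}


# Times in seconds as printed above for this run
recorded = {
    "Q1": {"Model 1": 7.0472, "Model 2": 0.4007, "Model 3": 0.2809},
    "Q2": {"Model 1": 0.3310, "Model 2": 0.1424, "Model 3": 0.0455},
    "Q3": {"Model 1": 1.5385, "Model 2": 1.4808, "Model 3": 0.3657},
}

for query, times in recorded.items():
    ratios = relative_to_fastest(times)
    summary = ", ".join(f"{model}: {r:.1f}x" for model, r in ratios.items())
    print(f"{query}: {summary}")
```

For Q3, keep in mind that Model 3's update modified 3,000 company documents rather than individual people, so its ratio compares different units of work.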