# MongoDB data processing pipeline (Proof of concept)

## Demo

### Mock entries

We created an example database to demonstrate the idea; one example entry is shown below.

In [1]:
from mock_entries import mock_entries

# Display the first mock document as an example of the entry structure used in this demo.
mock_entries[0]

{'id': 0,
 'name': 'Alice Johnson',
 'age': 30,
 'email': 'alice.johnson@example.com',
 'interests': ['python', 'data science', 'hiking'],
 'address': {'street': '123 Oak St',
  'city': 'San Francisco',
  'country': 'USA'}}

### Demo functions

For each entry, we perform the following operations, in sequence:
1. Copy the entire entry
2. Capitalize the interests
3. Add the state code for the entry

Some of these operations are hard-coded for now; we will add the ability to use LLMs later.

In [2]:
def identity(doc: dict) -> dict:
    """Pass-through pipeline op: hand back the incoming document as-is.

    No copy is made; the very same dict object that was passed in is returned.
    """
    unchanged = doc
    return unchanged

def cap_interests(doc: dict) -> dict:
    """Pipeline op: return a dict holding only the document's interests, capitalized.

    ``str.capitalize`` upper-cases the first character and lower-cases the rest,
    so "data science" becomes "Data science".

    Args:
        doc: Document containing an ``interests`` list of strings.

    Returns:
        ``{"interests": [...]}`` with each value capitalized, in the original order.

    Raises:
        KeyError: If ``doc`` has no ``interests`` field.
    """
    capitalized = []
    for item in doc["interests"]:
        capitalized.append(item.capitalize())
    return {"interests": capitalized}

# Hard-coded city -> state lookup, for demo purposes only.
def correct_city_and_state(doc: dict) -> dict:
    """Pipeline op: look up the US state code for the document's city.

    Args:
        doc: Document with an ``address`` sub-document containing ``city``.

    Returns:
        ``{"address": {"state": code}}`` where ``code`` is the two-letter state for a
        city in the built-in table, or ``"Unknown"`` for any other city.

    Raises:
        KeyError: If ``address`` or ``address["city"]`` is missing.
    """
    city_to_state = dict(
        [
            ("San Francisco", "CA"),
            ("New York", "NY"),
            ("Chicago", "IL"),
            ("Los Angeles", "CA"),
            ("Seattle", "WA"),
            ("Boston", "MA"),
            ("Houston", "TX"),
            ("Miami", "FL"),
            ("Austin", "TX"),
            ("Portland", "OR"),
        ]
    )
    address = doc["address"]
    return {"address": {"state": city_to_state.get(address["city"], "Unknown")}}

### Demoing the pipeline

#### Running the pipeline

Start MongoDB first; the cells below connect to `mongodb://localhost:27017`.

In [5]:
from pymongo import MongoClient
from mongopipe.mongopipe import MongoPipe

# Connect to the local MongoDB server (it must already be running).
client = MongoClient("mongodb://localhost:27017")

# Source holds the raw mock entries; destination receives the processed documents.
source_db = client["source_db"]
dest_db = client["dest_db"]
source_collection = source_db["source_collection"]
dest_collection = dest_db["processed_collection"]

# Empty both collections so re-running this cell starts from the same state,
# then load the mock entries as the pipeline input.
source_collection.delete_many({})
dest_collection.delete_many({})
source_collection.insert_many(mock_entries)

# Operations handed to the pipeline (all hard-coded, no LLM involved yet).
ops = [identity, cap_interests, correct_city_and_state]
pipe_instance = MongoPipe(
    source=source_collection,
    destination=dest_collection,
    ops=ops,
)
pipe_instance.run()

print("Pipeline processing complete.")

Pipeline processing complete.


### Comparing entries

In [6]:
def compare_entries(source_collection, dest_collection, entry_id):
    """Print an entry before and after the pipeline, followed by its top-level differences.

    Looks up the document whose ``id`` field equals ``entry_id`` in both collections.
    It prints the source ("old") and destination ("new") versions, then one line per
    top-level key that was added, removed, or changed. Nested values (e.g. ``address``)
    are compared as a whole, so a change anywhere inside them is reported as a change
    to the top-level key.

    Args:
        source_collection: Collection holding the original documents (anything with a
            pymongo-style ``find_one(filter)`` method).
        dest_collection: Collection holding the processed documents.
        entry_id: Value of the documents' ``id`` field to look up (not the Mongo ``_id``).

    Returns:
        None. All output is printed. If the entry is missing from either collection,
        a message is printed and no comparison is made.
    """
    old_entry = source_collection.find_one({"id": entry_id})
    new_entry = dest_collection.find_one({"id": entry_id})

    if not old_entry or not new_entry:
        print(f"Entry with id {entry_id} not found in one or both collections.")
        return

    print(f"Comparing entries with ID: {entry_id}")
    print("Old entry:")
    print(old_entry)
    print("\nNew entry:")
    print(new_entry)
    print("\nDifferences:")

    # Visit keys in a stable order: source keys first, then keys that exist only in the
    # destination. Iterating a set of strings gives an order that changes between
    # interpreter runs (string hash randomization), which made this output non-reproducible.
    for key in dict.fromkeys([*old_entry, *new_entry]):
        if key not in old_entry:
            print(f"Added {key}: {new_entry[key]}")
        elif key not in new_entry:
            print(f"Removed {key}: {old_entry[key]}")
        elif old_entry[key] != new_entry[key]:
            print(f"Changed {key}:")
            print(f"  From: {old_entry[key]}")
            print(f"  To:   {new_entry[key]}")

In [8]:
compare_entries(source_collection=source_collection, dest_collection=dest_collection, entry_id=0)

Comparing entries with ID: 0
Old entry:
{'_id': ObjectId('67162986f4f320d38153fb2e'), 'id': 0, 'name': 'Alice Johnson', 'age': 30, 'email': 'alice.johnson@example.com', 'interests': ['python', 'data science', 'hiking'], 'address': {'street': '123 Oak St', 'city': 'San Francisco', 'country': 'USA'}}

New entry:
{'_id': ObjectId('67162986f4f320d38153fb2e'), 'id': 0, 'name': 'Alice Johnson', 'age': 30, 'email': 'alice.johnson@example.com', 'interests': ['Python', 'Data science', 'Hiking'], 'address': {'street': '123 Oak St', 'city': 'San Francisco', 'country': 'USA', 'state': 'CA'}}

Differences:
Changed address:
  From: {'street': '123 Oak St', 'city': 'San Francisco', 'country': 'USA'}
  To:   {'street': '123 Oak St', 'city': 'San Francisco', 'country': 'USA', 'state': 'CA'}
Changed interests:
  From: ['python', 'data science', 'hiking']
  To:   ['Python', 'Data science', 'Hiking']


In [9]:
compare_entries(source_collection=source_collection, dest_collection=dest_collection, entry_id=5)

Comparing entries with ID: 5
Old entry:
{'_id': ObjectId('67162986f4f320d38153fb33'), 'id': 5, 'name': 'Fiona Gallagher', 'age': 25, 'email': 'fiona.gallagher@example.com', 'interests': ['bartending', 'family', 'survival skills'], 'address': {'street': '303 Birch Ln', 'city': 'Boston', 'country': 'USA'}}

New entry:
{'_id': ObjectId('67162986f4f320d38153fb33'), 'id': 5, 'name': 'Fiona Gallagher', 'age': 25, 'email': 'fiona.gallagher@example.com', 'interests': ['Bartending', 'Family', 'Survival skills'], 'address': {'street': '303 Birch Ln', 'city': 'Boston', 'country': 'USA', 'state': 'MA'}}

Differences:
Changed address:
  From: {'street': '303 Birch Ln', 'city': 'Boston', 'country': 'USA'}
  To:   {'street': '303 Birch Ln', 'city': 'Boston', 'country': 'USA', 'state': 'MA'}
Changed interests:
  From: ['bartending', 'family', 'survival skills']
  To:   ['Bartending', 'Family', 'Survival skills']


### Demoing LLM capabilities

#### Use an LLM to help format (and add) the city and state

In [None]:
from dotenv import load_dotenv
import os
import instructor
from openai import OpenAI

from mongopipe.demo_func.get_city_and_state_llm import get_city_and_state_func

# Load variables from a local .env file (e.g. OPENROUTER_API_KEY) into the environment.
load_dotenv()

# Model and sampling settings passed to get_city_and_state_func along with the client.
client_args = {
    "model": "meta-llama/llama-3.2-3b-instruct",
    "temperature": 0.3,  # To tune
}

# OpenRouter exposes an OpenAI-compatible API, and the key is required to authenticate.
# os.environ[...] fails fast with a KeyError when the key is missing. With os.getenv the
# value would be None, and the OpenAI client would silently fall back to OPENAI_API_KEY.
# The client is named `llm_client` so it neither shadows nor is shadowed by the
# MongoClient bound to `client` in the pipeline cells.
llm_client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.environ["OPENROUTER_API_KEY"],
)

# Wrap the client with instructor in JSON mode so responses are parsed as structured output.
structured_client = instructor.from_openai(
    llm_client,
    mode=instructor.Mode.JSON,
)

# Build the LLM-backed op used by the second pipeline run.
correct_city_and_state_llm = get_city_and_state_func(structured_client, client_args)

#### Importing mock entries (again)

This time, we add an extra entry whose city is misspelled ("Scranten" instead of "Scranton") to see whether the LLM step corrects it.

In [None]:
from mock_entries import mock_entries

# Extra entry with a deliberately misspelled city, used to exercise the LLM op.
wrong_city_spelling = {
    "id": 10,
    "name": "Kevin Malone",
    "age": 42,
    "email": "kevin.malone@example.com",
    "interests": ["accounting", "cooking", "basketball"],
    "address": {
        "street": "808 Maple Drive",
        "city": "Scranten",  # Misspelled, should be "Scranton"
        "country": "USA"
    }
}

# Build a new list instead of calling `mock_entries.append(...)`:
# - `list.append` returns None, so `new_mock_entries` used to be None and
#   `new_mock_entries[-1]` raised TypeError;
# - appending mutates the module-level list cached by the import system, so every
#   re-run of this cell added another copy of the entry.
new_mock_entries = [*mock_entries, wrong_city_spelling]

# The next pipeline cell inserts `mock_entries`, so point that name at the extended
# list. Re-running this cell re-imports the untouched original list first.
mock_entries = new_mock_entries

new_mock_entries[-1]

In [None]:
from pymongo import MongoClient
from mongopipe.mongopipe import MongoPipe

# Connect to the local MongoDB server (it must already be running).
client = MongoClient("mongodb://localhost:27017")

# Same source/destination layout as the first run.
source_db = client["source_db"]
dest_db = client["dest_db"]
source_collection = source_db["source_collection"]
dest_collection = dest_db["processed_collection"]

# Reset both collections and reload the input documents, including the extra entry.
source_collection.delete_many({})
dest_collection.delete_many({})
source_collection.insert_many(mock_entries)

# Same ops as before, except that the city/state step now comes from the LLM.
ops = [identity, cap_interests, correct_city_and_state_llm]
pipe_instance = MongoPipe(
    source=source_collection,
    destination=dest_collection,
    ops=ops,
)
pipe_instance.run()

print("Pipeline processing complete.")

In [None]:
compare_entries(source_collection=source_collection, dest_collection=dest_collection, entry_id=10)