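# MongoDB Query Generator

Two CrewAI agents collaborate in this notebook. A MongoDB expert writes an aggregation pipeline that turns sample input documents into an expected output. A QA agent then verifies that pipeline by running it against a live MongoDB collection with a custom tool.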

In [1]:
# Warning control
import warnings
warnings.filterwarnings('ignore')

## Install required libraries

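`%pip install crewai crewai_tools python-dotenv pymongo`

Use `%pip` rather than `!pip` so the packages are installed into the running kernel's environment. Pin the versions you tested with for reproducibility.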

## Set environment variables

The notebook reads the following keys from a `.env` file:
- OPENAI_API_KEY
- OPENAI_MODEL_NAME
- MONGODB_CONNECTION_STRING
- DB_NAME
- COLLECTION_NAME

In [2]:
import os
from dotenv import load_dotenv

# Load key=value pairs from .env into os.environ; the return value is unused.
_ = load_dotenv()

MONGODB_CONNECTION_STRING = os.environ.get("MONGODB_CONNECTION_STRING")
DB_NAME = os.environ.get("DB_NAME")
COLLECTION_NAME = os.environ.get("COLLECTION_NAME")

# Fail fast with a readable message. Otherwise the None values only surface
# later, less clearly, when MongoDBQueryTool is constructed with them.
_missing_env = [
    var_name
    for var_name, var_value in (
        ("MONGODB_CONNECTION_STRING", MONGODB_CONNECTION_STRING),
        ("DB_NAME", DB_NAME),
        ("COLLECTION_NAME", COLLECTION_NAME),
    )
    if not var_value
]
if _missing_env:
    raise RuntimeError(
        f"Missing required environment variables: {', '.join(_missing_env)}"
    )

In [3]:
from crewai import Agent, Crew, Task
from crewai_tools import BaseTool
import json
from pymongo import MongoClient

## Create Agents

In [4]:
# Agent that drafts the aggregation pipeline. It works alone (delegation disabled).
mongodb_expert_agent = Agent(
    role="MongoDB Expert",
    goal="Write MongoDB queries for a given input and expected output",
    backstory=(
        "You are a Principal engineer and an expert on MongoDB. "
        "You have a solid understanding of the NoSQL DB principles "
        "in general particularly in MongoDB. "
        "You are great at data modeling in MongoDB, "
        "querying and everything related to MongoDB"
    ),
    verbose=True,
    allow_delegation=False,
)

In [5]:
# Agent that validates the expert's pipeline. Delegation is enabled so it can
# hand a failing query back to the MongoDB Expert.
qa_agent = Agent(
    role="Quality Assurance Expert",
    # Typo fixed in the prompt: "validatity" -> "validity".
    goal="Review the MongoDB query for correctness and validity",
    backstory=("You are a tester who tests the MongoDB queries to "
               "ensure that it produces the expected output. "
               "You test and validate the work of MongoDB Expert "
               "and work with them to get to a proper query for the "
               "given problem."),
    allow_delegation=True,
    verbose=True
)

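## Create a custom Tool

**Warning:** before every check, the tool calls `delete_many({})` on `COLLECTION_NAME`, which removes every document in that collection. Point it at a scratch collection, never at real data.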

In [6]:
class MongoDBQueryTool(BaseTool):
    """CrewAI tool that checks an aggregation pipeline against sample data.

    Each call empties the configured collection, loads ``input_data`` into it,
    runs ``mongodb_query`` as an aggregation pipeline and reports whether the
    result equals ``expected_output``, ignoring document order.

    WARNING: every call runs ``delete_many({})`` on ``coll_name`` first, so it
    must point at a disposable scratch collection.
    """

    name: str = "MongoDB Query Tool"
    description: str = ("This tool verifies a MongoDB query "
                        "by executing it on a given input "
                        "and comparing the output with the expected output. ")

    # Connection settings supplied when the tool is constructed.
    mongodb_conn_str: str
    db_name: str
    coll_name: str

    # The arguments are annotated as ``list`` because the documents and the
    # aggregation pipeline are lists; the previous ``dict`` hints misdescribed
    # them. NOTE(review): CrewAI presumably derives the tool's argument schema
    # from these annotations, so keep them accurate.
    def _run(self, input_data: list, expected_output: list, mongodb_query: list):
        """Run ``mongodb_query`` on ``input_data`` and compare with ``expected_output``.

        Args:
            input_data: documents to insert into the scratch collection.
            expected_output: documents the pipeline is expected to return.
            mongodb_query: the aggregation pipeline (a list of stage dicts).
            Each argument may also arrive as a JSON string. A single
            document or stage may be passed as a bare dict.

        Returns:
            True if the pipeline's result matches ``expected_output`` as an
            unordered collection of documents, otherwise False.

        Raises:
            ValueError: if the pipeline is missing/empty or is invalid JSON.
        """
        documents = self._as_list(input_data)
        expected = self._as_list(expected_output)
        pipeline = self._as_list(mongodb_query)

        # The context manager closes the client (and its connection pool)
        # after each verification instead of leaking one client per call.
        with MongoClient(self.mongodb_conn_str) as client:
            collection = client[self.db_name][self.coll_name]
            self.delete_data(collection)
            self.insert_data(collection, documents)
            result_from_db = self.execute_query(collection, pipeline)

        print(f"Result: {result_from_db}")

        # Sort both sides on a canonical JSON form so document order is ignored.
        # ``default=str`` stops values that are not JSON-serialisable (e.g. an
        # ObjectId left in ``_id``) from raising TypeError during the sort.
        def canonical(doc):
            return json.dumps(doc, sort_keys=True, default=str)

        return sorted(expected, key=canonical) == sorted(result_from_db, key=canonical)

    @staticmethod
    def _as_list(value):
        """Normalise a tool argument to a list (accepts list, dict, JSON str or None)."""
        if value is None:
            return []
        if isinstance(value, str):
            value = json.loads(value)
        if isinstance(value, dict):
            return [value]
        return list(value)

    def get_database(self):
        """Return the configured Database from a newly created MongoClient.

        NOTE: the client is not closed here. The caller owns ``db.client`` and
        should close it (``_run`` manages its own client instead).
        """
        client = MongoClient(self.mongodb_conn_str)
        return client[self.db_name]

    def insert_data(self, collection, data_array):
        """Insert ``data_array`` into ``collection``; no-op if either is empty/None.

        Shallow copies are inserted because ``insert_many`` adds an ``_id`` key
        to each document it receives. Copying keeps the caller's data unchanged.
        """
        if collection is None or not data_array:
            return
        collection.insert_many([dict(doc) for doc in data_array])

    def delete_data(self, collection):
        """Remove *all* documents from ``collection`` (no-op when it is None)."""
        if collection is not None:
            collection.delete_many({})

    def execute_query(self, collection, aggregation_pipeline=None):
        """Run ``aggregation_pipeline`` on ``collection`` and return the documents as a list.

        Raises:
            ValueError: if no pipeline (or an empty one) is given.
        """
        if not aggregation_pipeline:
            raise ValueError("Aggregation pipeline must be provided.")
        return list(collection.aggregate(aggregation_pipeline))

In [7]:
# Tool instance bound to the collection configured in .env.
query_tool = MongoDBQueryTool(
    coll_name=COLLECTION_NAME,
    db_name=DB_NAME,
    mongodb_conn_str=MONGODB_CONNECTION_STRING,
)

## Create Tasks

In [8]:
# Task for the MongoDB expert: draft the pipeline from the sample data.
# The {input_data} / {expected_output} placeholders match the keys of the
# `inputs` dict passed to `kickoff` further down.
write_query = Task(
    description=("1. Write a MongoDB aggregation pipeline that would "
                 "produce the expected output for the given input. "
                 "2. The aggregation pipeline written should "
                 "only be the list of stages in the aggregation pipeline "
                 "and shouldn't contain the `db.collection.aggregate` prefix. "
                 "3. If the Quality Assurance Expert finds an error with the "
                 # Typo fixed in the prompt: "produce query" -> "produced query".
                 "produced query, then review the error and come up "
                 "with a query that could avoid that issue and produce "
                 "the expected output. "
                 "Input data: {input_data} "
                 "Expected output data: {expected_output} "),
    expected_output="The MongoDB aggregation pipeline without the `db.collection.aggregate` prefix.",
    agent=mongodb_expert_agent
)

In [9]:
# Task for the QA agent: run the expert's pipeline through `query_tool` and
# return the verified pipeline. The final answer is parsed with `json.loads`
# at the end of the notebook, so it must be a bare JSON array of stages.
verify_query = Task(
    description=("1. Verify the query produced by MongoDB Expert "
                 "to check if it produces the expected output "
                 "for the given input. "
                 "2. Utilize the `MongoDB Query Tool` to verify the query "
                 "by passing the input data, expected output and "
                 "the aggregation pipeline. "
                 "3. If the tool responds with True, then the query is correct, "
                 "else, it is not. In the event it is not a correct query "
                 "then work with the MongoDB Expert to produce an updated query. "
                 "4. Once the accurate query is identified, return that as the result. "
                 "Input data: {input_data} "
                 "Expected output data: {expected_output} "),
    # The old wording asked for "a dict", but an aggregation pipeline is a
    # list of stage objects; ask for exactly what json.loads can parse.
    expected_output=("The correct MongoDB aggregation pipeline that "
                     "produces the expected output for the given input. "
                     "The output should contain just the pipeline as a JSON "
                     "array of stage objects, without markdown code fences "
                     "or any other surrounding text. "),
    tools=[query_tool],
    agent=qa_agent
)

## Create Crew

In [10]:
# Assemble the crew; tasks are listed in write -> verify order.
mongodb_crew = Crew(
    tasks=[write_query, verify_query],
    agents=[mongodb_expert_agent, qa_agent],
    verbose=True,
)

## Data

In [11]:
# Sample documents: one per player, tagged with the player's team.
input_data = [
    {"name": "Sachin", "team": "India"},
    {"name": "Sourav", "team": "India"},
    {"name": "Lara", "team": "West Indies"},
]

# Result the generated pipeline must reproduce: one document per team with its player count.
output_data = [
    {"team": "India", "playerCount": 2},
    {"team": "West Indies", "playerCount": 1},
]

In [12]:
# Keys match the {input_data} / {expected_output} placeholders in the task descriptions.
inputs = dict(
    input_data=input_data,
    expected_output=output_data,
)

## Run the crew

In [13]:
# Kick off the crew; the sample data is interpolated into the task templates.
result = mongodb_crew.kickoff(
    inputs=inputs,
)

[1m[95m [DEBUG]: == Working Agent: MongoDB Expert[00m
[1m[95m [INFO]: == Starting Task: 1. Write a MongoDB aggregation pipeline that would produce the expected output for the given input. 2. The aggregation pipeline written should only be the list of stages in the aggregation pipeline and shouldn't contain the `db.collection.aggregate` prefix. 3. If the Quality Assurance Expert finds an error with the produce query, then review the error and come up with a query that could avoid that issue and produce the expected output. Input data: [{'name': 'Sachin', 'team': 'India'}, {'name': 'Sourav', 'team': 'India'}, {'name': 'Lara', 'team': 'West Indies'}] Expected output data: [{'team': 'India', 'playerCount': 2}, {'team': 'West Indies', 'playerCount': 1}] [00m


[1m> Entering new CrewAgentExecutor chain...[0m
[32;1m[1;3mI now can give a great answer.

Final Answer:
```json
[
    {
        "$group": {
            "_id": "$team",
            "playerCount": {"$sum": 1}
        }
    },

In [14]:
# Show the crew's final answer as plain text.
print(str(result))

[
    {
        "$group": {
            "_id": "$team",
            "playerCount": {"$sum": 1}
        }
    },
    {
        "$project": {
            "_id": 0,
            "team": "$_id",
            "playerCount": 1
        }
    }
]


In [17]:
import re

# The agent may wrap its answer in a Markdown fence (```json ... ```), as the
# execution log above shows. Strip any such fence so json.loads receives bare
# JSON. str() guards against kickoff returning a result object rather than a str.
pipeline_text = re.sub(r"^\s*```[a-zA-Z]*\s*|\s*```\s*$", "", str(result))
pipeline = json.loads(pipeline_text)
pipeline

[{'$group': {'_id': '$team', 'playerCount': {'$sum': 1}}},
 {'$project': {'_id': 0, 'team': '$_id', 'playerCount': 1}}]