In [104]:
import json
import math
import re
from typing import Dict, List, Tuple

import nest_asyncio
from metagpt.actions import Action
from metagpt.logs import logger

nest_asyncio.apply()

In [103]:
# Function to fetch restaurant reviews from a data file
def fetch_restaurant_data(
    restaurant_name: str,
    data_path: str = "restaurant-data.txt",
) -> Dict[str, List[str]]:
    """Look up the reviews stored for a restaurant in a plain-text data file.

    Every non-empty line of the file is expected to have the form
    ``<RestaurantName>. <review text>``: the text before the first ". " is the
    restaurant name and everything after it is a single review. Lines without
    that separator are skipped.

    Args:
        restaurant_name: Name to search for. Matching is case-insensitive and
            by substring, so the reviews of every restaurant whose name
            contains ``restaurant_name`` are returned. Surrounding whitespace
            is ignored for matching.
        data_path: Path of the review file. Defaults to "restaurant-data.txt"
            in the current working directory.

    Returns:
        A single-entry dict keyed by ``restaurant_name`` exactly as passed in.
        The value is the list of matching reviews (grouped by the restaurant
        name as spelled in the file, groups in order of first appearance), or
        ``["No reviews found for this restaurant."]`` when nothing matches or
        the name is blank.

    Raises:
        OSError: If the data file cannot be opened (e.g. ``FileNotFoundError``).
    """
    not_found = {restaurant_name: ["No reviews found for this restaurant."]}

    needle = restaurant_name.strip().lower()
    if not needle:
        # An empty string is a substring of every name, so without this guard
        # a blank query would return the reviews of every restaurant.
        return not_found

    # Matching reviews grouped under the name exactly as written in the file.
    reviews_by_name: Dict[str, List[str]] = {}

    # Explicit encoding so the result does not depend on the platform locale.
    with open(data_path, "r", encoding="utf-8") as file:
        for line in file:
            name, separator, review = line.strip().partition(". ")
            if not separator:
                continue  # Blank line or line not in "<name>. <review>" form
            name = name.strip()
            if needle in name.lower():
                reviews_by_name.setdefault(name, []).append(review.strip())

    reviews = [review for group in reviews_by_name.values() for review in group]
    return {restaurant_name: reviews} if reviews else not_found

# Function to extract scores from the review analysis
def extract_scores(reviews_data: Dict) -> Tuple[List[int], List[int]]:
    """Split the parsed review analysis into parallel score lists.

    Args:
        reviews_data: Dict with a "reviews" key holding a list of dicts, each
            of which may carry "food_score" and "customer_service_score".

    Returns:
        ``(food_scores, customer_service_scores)``: two lists of equal length,
        one entry per review, in input order. A score missing from a review is
        recorded as 0. Both lists are empty when "reviews" is absent, empty,
        or ``None``.
    """
    food_scores: List[int] = []
    customer_service_scores: List[int] = []

    # `or []` also covers an explicit `"reviews": null` in the parsed JSON,
    # which `.get("reviews", [])` would hand back as None.
    for review in reviews_data.get("reviews") or []:
        food_scores.append(review.get("food_score", 0))
        customer_service_scores.append(review.get("customer_service_score", 0))

    return food_scores, customer_service_scores

# Function to calculate the overall score of a restaurant
def calculate_overall_score(
    restaurant_name: str,
    food_scores: List[int],
    customer_service_scores: List[int],
) -> Dict[str, float]:
    """Combine per-review food and service scores into one overall score.

    Each review contributes ``sqrt(food_score ** 2 * service_score)``, so the
    food score weighs more heavily than the service score. The contributions
    are averaged, divided by ``sqrt(125)`` (the contribution of a review that
    scores 5 on both) and multiplied by 10.

    Args:
        restaurant_name: Key used in the returned dict.
        food_scores: Food score of each review.
        customer_service_scores: Customer-service score of each review, in the
            same order as ``food_scores``.

    Returns:
        ``{restaurant_name: score}`` with the score rounded to 3 decimal
        places, or ``{restaurant_name: 0.0}`` when either list is empty or the
        two lists differ in length.
    """
    both_present = bool(food_scores) and bool(customer_service_scores)
    if not both_present or len(food_scores) != len(customer_service_scores):
        return {restaurant_name: 0.0}

    review_count = len(food_scores)

    # Accumulate the per-review contributions one at a time, in order.
    accumulated = 0.0
    for idx in range(review_count):
        accumulated += math.sqrt(food_scores[idx] ** 2 * customer_service_scores[idx])

    scaled = (accumulated / (review_count * math.sqrt(125))) * 10
    return {restaurant_name: round(scaled, 3)}

In [None]:
class Extract(Action):
    """Action that asks the LLM to pull the restaurant name out of a user query."""

    # Prompt template; `{query}` is replaced with the user's question. The reply
    # format requested here is the one `extract_name` parses.
    PROMPT_TEMPLATE: str = """
    Extract the name of the restaurant from ```{query}```. 
    Return the extracted name following this output format ```the name is: XXX``` with NO other texts.
    """

    # Default name for the action
    name: str = "ExtractName"

    async def run(self, query: str):
        """Ask the model for the restaurant name mentioned in ``query``.

        Args:
            query: Free-form user question.

        Returns:
            The name parsed from the model's reply by `extract_name`.
        """
        prompt = self.PROMPT_TEMPLATE.format(query=query)

        # Send the prompt to the AI model and await the response
        rsp = await self._aask(prompt)

        return Extract.extract_name(rsp)

    @staticmethod
    def extract_name(rsp):
        """Parse the restaurant name out of a model reply.

        Looks for ``the name is: <name>`` anywhere in ``rsp``. The match is
        case-insensitive, and backticks or quotes wrapped around the name are
        removed, because the model sometimes capitalises the phrase or echoes
        the fenced format shown in the prompt.

        Args:
            rsp: Raw reply text from the model.

        Returns:
            The cleaned-up name, or the cleaned-up whole reply when the
            expected phrase is not present.
        """
        # `[ \t]*` keeps the capture on the same line as the phrase.
        match = re.search(r"the name is:[ \t]*(.*)", rsp, flags=re.IGNORECASE)

        # Fall back to the whole reply if the model ignored the format.
        name = match.group(1) if match else rsp

        # Drop surrounding whitespace, then any wrapping backticks/quotes.
        return name.strip().strip("`\"'").strip()
    
class Compare(Action):
    """Action that asks the LLM which entry of a name list best matches a query."""

    # Prompt template; `{names}` is the candidate list and `{query}` the name to
    # match. The reply format requested here is the one `compare_names` parses.
    PROMPT_TEMPLATE: str = """
    Consider the following list of names:\n
    ```
    {names}
    ```\n
    Select the entry which is most similar to ```{query}```.
    Return the selected entry following this output format ```the entry is: XXX``` with NO other texts.
    """

    # Default name for the action
    name: str = "CompareNames"

    async def run(self, query: str, names: str):
        """Ask the model for the entry of ``names`` most similar to ``query``.

        Args:
            query: Name to look up (possibly misspelled).
            names: Candidate names, inserted verbatim into the prompt.

        Returns:
            The entry parsed from the model's reply by `compare_names`.
        """
        prompt = self.PROMPT_TEMPLATE.format(query=query, names=names)

        # Send the prompt to the AI model and await the response
        rsp = await self._aask(prompt)

        return Compare.compare_names(rsp)

    @staticmethod
    def compare_names(rsp):
        """Parse the selected entry out of a model reply.

        Looks for ``the entry is: <entry>`` anywhere in ``rsp``. The match is
        case-insensitive, and backticks, quotes or a comma wrapped around the
        entry are removed, so that an entry copied from a quoted,
        comma-separated candidate list comes back as the bare name.

        Args:
            rsp: Raw reply text from the model.

        Returns:
            The cleaned-up entry, or the cleaned-up whole reply when the
            expected phrase is not present.
        """
        # `[ \t]*` keeps the capture on the same line as the phrase.
        match = re.search(r"the entry is:[ \t]*(.*)", rsp, flags=re.IGNORECASE)

        # Fall back to the whole reply if the model ignored the format.
        name = match.group(1) if match else rsp

        # Drop surrounding whitespace, then wrapping backticks/quotes/commas.
        return name.strip().strip("`\"',").strip()
    
class Analyze(Action):
    """Action that asks the LLM to score each review for food and customer service."""

    # Default name for the action
    name: str = "AnalyzeReviews"

    async def run(self, reviews: List[str]):
        """Send the scoring prompt for ``reviews`` to the model.

        Args:
            reviews: Review texts to be scored.

        Returns:
            The model's raw reply, unparsed.
        """
        return await self._aask(Analyze.get_prompt(reviews))

    @staticmethod
    def get_prompt(reviews: List[str]) -> str:
        """Build the scoring prompt: instructions, JSON format, then the reviews.

        Args:
            reviews: Review texts; each is appended as a quoted bullet line.

        Returns:
            The complete prompt text.
        """
        # Fixed instructions: the two scores, the adjective-to-score mapping
        # and the JSON shape the reply must follow.
        prompt_lines = [
            "Analyze the following restaurant reviews and extract two scores for each review:",
            "1. food_score (1-5)",
            "2. customer_service_score (1-5)",
            "",
            "Use the following adjective mappings to determine the scores:",
            "- Score 1: awful, horrible, disgusting",
            "- Score 2: bad, unpleasant, offensive",
            "- Score 3: average, uninspiring, forgettable",
            "- Score 4: good, enjoyable, satisfying",
            "- Score 5: awesome, incredible, amazing",
            "",
            "Each review contains exactly two adjectives, one for food and one for customer service.",
            "",
            "Provide the results in the following JSON format without adding any markdown:",
            "{",
            '  "reviews": [',
            "    {",
            '      "food_score": <int>,',
            '      "customer_service_score": <int>',
            "    },",
            "    ...",
            "  ]",
            "}",
        ]

        # One quoted bullet per review, directly after the format description.
        prompt_lines.extend(f'- "{review}"' for review in reviews)

        return "\n".join(prompt_lines)

In [None]:
# Define a string containing a list of restaurant names
names = """
"McDonald's",
"Subway",
"Taco Bell",
"Chick-fil-A",
"Applebee's",
"Olive Garden",
"Cheesecake Factory",
"Buffalo Wild Wings",
"Starbucks",
"Krispy Kreme",
"Panera Bread",
"Tim Horton's",
"Chipotle",
"In-n-Out",
"Five Guys",
"Panda Express",
"Pret A Manger",
"Cinnabon",
"IHOP",
"Burger King"
"""

# Define the user query
query = "What is the overall score IHOPP?"

# Create an instance of the Extract agent
role = Extract()
# Log the query for debugging purposes
logger.info(query)

# Run the Extract agent to extract the restaurant name from the query
extracted_name = await role.run(query)
# Print the extracted name
print(extracted_name)

# Create an instance of the Compare agent
role = Compare()
# Log the extracted name for debugging purposes
logger.info(extracted_name)
# Run the Compare agent to find the most similar name from the list of names
extracted_name = await role.run(extracted_name, names)

# Print the most similar name found
print(extracted_name)


2024-11-07 16:39:41.059 | INFO     | __main__:<module>:30 - What is the overall score IHOPP?


the name is: IHOP

2024-11-07 16:39:44.097 | INFO     | metagpt.utils.cost_manager:update_cost:108 - prompt_tokens: 68, completion_tokens: 8
2024-11-07 16:39:44.101 | INFO     | __main__:<module>:36 - What is the overall score IHOPP?


P
IHOPP
the entry is: IHOP

2024-11-07 16:39:50.831 | INFO     | metagpt.utils.cost_manager:update_cost:108 - prompt_tokens: 176, completion_tokens: 7



IHOP


In [None]:
# Look up the data file for the matched restaurant; the result is a dict keyed by the name
reviews_by_restaurant = fetch_restaurant_data(extracted_name)

# Pick the list of reviews stored under that name
reviews = reviews_by_restaurant.get(extracted_name)

# Display the fetched reviews as the cell output
reviews

["IHOP serves average breakfast fare that's uninspiring but gets the job done. The customer service is forgettable, with waitstaff who are sometimes attentive, sometimes not.",
 'The pancakes were average, nothing special. The service was unpleasant, with long wait times and inattentive staff.',
 'The food at IHOP was average, with standard breakfast fare. Unfortunately, the customer service was bad, with inattentive waitstaff and long waits for food.',
 "IHOP's food is forgettable, with pancakes that are nothing special. Unfortunately, the service was bad, with long wait times and inattentive staff.",
 'The food was average, and the service was bad. The pancakes were forgettable, and our server was unpleasant and inattentive throughout the meal.',
 "IHOP's food is uninspiring, with pancakes that are decent but not exceptional. The service is average, sometimes slow during busy breakfast hours.",
 "IHOP serves average breakfast fare that's uninspiring but gets the job done. The custome

In [None]:
# Create an instance of the Analyze agent
role = Analyze()
# Log the reviews for debugging purposes
logger.info(reviews)
# Run the Analyze agent to calculate the food and customer service scores for each review
scores = await role.run(reviews)

2024-11-07 16:39:55.747 | INFO     | __main__:<module>:2 - ["IHOP serves average breakfast fare that's uninspiring but gets the job done. The customer service is forgettable, with waitstaff who are sometimes attentive, sometimes not.", 'The pancakes were average, nothing special. The service was unpleasant, with long wait times and inattentive staff.', 'The food at IHOP was average, with standard breakfast fare. Unfortunately, the customer service was bad, with inattentive waitstaff and long waits for food.', "IHOP's food is forgettable, with pancakes that are nothing special. Unfortunately, the service was bad, with long wait times and inattentive staff.", 'The food was average, and the service was bad. The pancakes were forgettable, and our server was unpleasant and inattentive throughout the meal.', "IHOP's food is uninspiring, with pancakes that are decent but not exceptional. The service is average, sometimes slow during busy breakfast hours.", "IHOP serves average breakfast fare 

{
  "reviews": [
    {
      "food_score": 3,
      "customer_service_score": 2
    },
    {
      "food_score": 2,
      "customer_service_score": 1
    },
    {
      "food_score": 4,
      "customer_service_score": 2
    },
    {
      "food_score": 2,
      "customer_service_score": 1
    },
    {
      "food_score": 3,
      "customer_service_score": 2
    },
    {
      "food_score": 2,
      "customer_service_score": 1
    },
    {
      "food_score": 4,
      "customer_service_score": 2
    },
    {
      "food_score": 3,
      "customer_service_score": 2
    },
    {
      "food_score": 2,
      "customer_service_score": 1
    },
    {
      "food_score": 3,
      "customer_service_score": 2
    },
    {
      "food_score": 4,
      "customer_service_score": 2
    },
    {
      "food_score": 2,
      "customer_service_score": 1
    },
    {
      "food_score": 3,
      "customer_service_score": 2
    },
    {
      "food_score": 2,
      "customer_service_score": 1
    },
   

2024-11-07 16:42:50.304 | INFO     | metagpt.utils.cost_manager:update_cost:108 - prompt_tokens: 1507, completion_tokens: 997





In [None]:
# Parse the model's JSON reply into a dict
parsed_scores = json.loads(scores)

# Split the parsed reply into the food and customer service score lists
food_scores, customer_service_scores = extract_scores(parsed_scores)

In [None]:
# Combine the per-review scores into one overall score for the matched restaurant
score = calculate_overall_score(
    restaurant_name=extracted_name,
    food_scores=food_scores,
    customer_service_scores=customer_service_scores,
)

# Print the resulting {restaurant name: overall score} dict
print(score)

{'IHOP': 3.179}
