# Module 2 Assignment: Adding functionalities to your Automatic Code Review Crew

Welcome to Module 2's assignment! In this lab, you will build upon the Automatic Code Review Crew you created in the assignment in Module 1. This time, you will add the new functionalities you learned in this lesson to take your crew's performance to the next level.

## Background
Your first try at the automation tool for code reviewing was a success. You want to go one step further by adding memory, guardrails and execution hooks to ensure you get the best possible results out of your tool.

Here is a visual summary of the structure of your crew, as well as the new elements you will be adding: 
<div style="text-align: center;">
<img src="./images/agents-tasks-diagram.png" width=600>
</div>

## General instructions for grading
- Replace all `None` instances with your own solution.
- You can add new cells to experiment, but these will be omitted by the grader. Only use the provided cells for your solution code.
- Before submitting, make sure all the cells in your lab work correctly.
- **Do not change variable names**: if you modify variable names, the grader won't be able to find your solutions
- **Use the provided configuration**: for grading, please use all provided configurations. Don't change the configuration files or settings. You can experiment after submitting your lab.
- To submit your notebook, save it and then click on the red **Submit Assignment** button at the top right of the page.

**<font color='#5DADEC'>Please make sure to save your work periodically, so you don't lose any progress.</font>**

## Table of contents

1. [Set up your notebook](#1)
2. [Agents](#2)
3. [Guardrails](#3)
   - [Exercise 1: Write the `security_review_output_guardrail` guardrail](#ex1)
   - [Exercise 2: Write the `review_decision_guardrail` guardrail](#ex2)
4. [Tasks](#4)
   - [Exercise 3: Create the Analyze Code Quality task](#ex3)
   - [Exercise 4: Create the Review Security task](#ex4)
   - [Exercise 5: Create the Review Decision task](#ex5)
5. [Execution Hooks](#5)
   - [Exercise 6: Create a hook to read the PR file](#ex6)
6. [Creating the Crew](#6)
   - [Exercise 7: Define the Crew](#ex7)
   - [Exercise 8: Kickoff the Crew](#ex8)

<a id="1"></a>

## 1 - Set up your notebook

Begin by importing all necessary modules and configuring your environment variables to connect to the LLM APIs.

The libraries are already installed in the classroom. If you're running this notebook on your own machine, you can install the following:

`!pip install crewai[tools]==1.3.0`

<a id="1-1"></a>

### 1.1 - Import modules
Run the following cell to import all the modules you will need for this lab.

In [1]:
!pip install crewai[tools]==1.3.0



In [2]:
from crewai import Agent, Task, Crew
from crewai_tools import SerperDevTool, ScrapeWebsiteTool
from pydantic import BaseModel
from utils import get_openai_api_key, get_serper_api_key, clean_markdown
from IPython.display import Markdown, display
import yaml
import os
os.environ["CREWAI_TESTING"] = "true"

In [3]:
import unittests
import dill

<a id="1-2"></a>

### 1.2 - Set up the environment variables

Next, set up the environment variables to connect to the APIs and select the LLM your agents will use.

In [4]:
# set up the OpenAI model
os.environ["MODEL"] = "gpt-4o-mini"

# set up the OpenAI API key 
os.environ["OPENAI_API_KEY"] = get_openai_api_key()

# set the Serper API key for the SerperDevTool
os.environ["SERPER_API_KEY"] = get_serper_api_key()

<a id="1-3"></a>

### 1.3 - Import configuration files

Since you already defined the agents and tasks in Module 1's assignment, this time you will just load their parameters from YAML files. These files contain the `role`, `goal` and `backstory` for the Agents, and the `description` and `expected_output` for the Tasks. This way, you only need to set up the new parameters and functionalities.

Run the next cell to define the configurations.

In [5]:
# Define file paths for YAML configurations
files = {
    'agents': 'config/agents.yaml',
    'tasks': 'config/tasks.yaml'
}

# Load configurations from YAML files
configs = {}
for config_type, file_path in files.items():
    with open(file_path, 'r') as file:
        configs[config_type] = yaml.safe_load(file)

# Assign loaded configurations to specific variables
agents_config = configs['agents']
tasks_config = configs['tasks']

<a id="2"></a>

## 2 - Agents

You will begin by creating your agents. This time you don't have to write any code, because you will be using exactly the same agents as in the previous assignment. The only difference is that this time the `role`, `goal` and `backstory` for each agent are given to you in the YAML configuration file. 

Run the cell below to create all the agent instances.

In [6]:
# Create the tool instances for the security engineer agent
serper_search_tool = SerperDevTool(search_url="https://owasp.org", 
                                   base_url=os.getenv("DLAI_SERPER_BASE_URL")) 
scrape_website_tool = ScrapeWebsiteTool()


# create the Senior Developer agent
senior_developer = Agent( 
    # load role, goal, and backstory from the YAML configuration
    config=agents_config['senior_developer'],
    # set verbose
    verbose=True 
)

# create the Security Engineer agent
security_engineer = Agent( 
    # load role, goal, and backstory from the YAML configuration
    config=agents_config['security_engineer'],
    # add the web search and scraping tools as a list
    tools=[serper_search_tool, scrape_website_tool],
    # set verbose 
    verbose=True,
)

# create the Tech Lead agent
tech_lead = Agent(
    # load role, goal, and backstory from the YAML configuration
    config=agents_config['tech_lead'],
    # set verbose
    verbose=True
)

<a id="3"></a>

## 3 - Guardrails
You need to guarantee security reviews have proper structure and standardized risk levels to prevent vulnerabilities from being misclassified or overlooked. Guardrails provide this essential validation!

You will define two guardrails:


| Name             | Functionality      | Task          |
|------------------|--------------------|---------------|
| `security_review_output_guardrail`| Ensures the security risks are within the specified categories| `review_security`|
| `review_decision_guardrail` | Ensures output includes an actionable decision (approve, etc.) | `make_review_decision`|

Each guardrail must return a tuple with two elements: a `bool` indicating whether the checks passed, and either the validated output (on success) or an error message (on failure). For more details about guardrails in CrewAI, see the [docs](https://docs.crewai.com/en/concepts/tasks#task-guardrails).
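
As a minimal sketch of that return contract, the example below uses plain Python with no CrewAI objects involved. The function name `non_empty_guardrail` and the check itself are hypothetical and only illustrate the shape of the tuple:

```python
def non_empty_guardrail(output: str) -> tuple[bool, str]:
    """Pass only when the output contains non-whitespace text."""
    text = output.strip()
    if not text:
        # Failure: the second element explains what went wrong
        return (False, "Output is empty.")
    # Success: the second element carries the validated output
    return (True, text)


print(non_empty_guardrail("   "))     # (False, 'Output is empty.')
print(non_empty_guardrail(" LGTM "))  # (True, 'LGTM')
```

In this lab the guardrails receive a `TaskOutput` object rather than a plain string, which is why they read `output.raw` or `output.json_dict` before running their checks.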

<a id="ex1"></a>

### Exercise 1: Write the `security_review_output_guardrail` guardrail

Complete the cell below to define the function to create the guardrail for the security review output. This guardrail needs to validate: 
- The `risk_level` for each vulnerability is one of the three accepted categories: `low`, `medium` or `high`.
- The `highest_risk` actually matches the highest value in `risk_level`.

Some of the structure is already given to you; you only need to fill in the `None` placeholders.

In [7]:
# GRADED CELL: Exercise 1

def security_review_output_guardrail(output):
    
    # get the (JSON) output from the TaskOutput object
    try: 
        json_output = output if type(output)==dict else output.json_dict 
    except Exception as e:
        return (False, ("Error retrieving the `json_dict` argument: "
                        f"\n{str(e)}\n"
                        "Make sure you set the output_json parameter in the Task."
                        )
                )

    # define risk levels
    valid_risk_levels = ['low', 'medium', 'high']
    
    # Check if security_vulnerabilities key exists
    if 'security_vulnerabilities' not in json_output:
        return (False, f"Missing 'security_vulnerabilities' key in output. Got keys: {list(json_output.keys())}")
    
    # validate that each of the risk levels has a valid value
    for vuln in json_output['security_vulnerabilities']:
        # validate the risk level
        if vuln['risk_level'].lower() not in valid_risk_levels: 
            error_message = f"Invalid risk level: {vuln['risk_level']}"
            return (False, error_message) 
    
    ### START CODE HERE ###
    # validate that the highest risk level matches the highest risk level in the vulnerabilities
    
    # if the highest risk level is not valid risk level, return an error message
    if json_output["highest_risk"].lower() not in valid_risk_levels:
        error_message = f"Invalid highest_risk: {json_output['highest_risk']}"
        return (False, error_message)
    
    # if it is one of the valid risk levels, then check if it matches the highest 
    # risk level in the vulnerabilities
    else:
        # get all risk_level values
        risk_levels = [vuln['risk_level'].lower() for vuln in json_output['security_vulnerabilities']] 
        
        # if "high" in risk levels, then highest risk level should be high
        if "high" in risk_levels: 
            if json_output["highest_risk"].lower() != "high":
                error_message = "Highest risk level does not match the highest risk level in the vulnerabilities." 
                return (False, error_message)
            
        # if high is not present and medium is in risk levels, then highest risk level should be medium
        elif "medium" in risk_levels: 
            if json_output["highest_risk"].lower() != "medium":
                error_message = "Highest risk level does not match the highest risk level in the vulnerabilities." 
                return (False, error_message)
            
        # if neither high nor medium is present, then highest risk level should be low
        elif "low" in risk_levels: 
            if json_output["highest_risk"].lower() != "low":
                error_message = "Highest risk level does not match the highest risk level in the vulnerabilities." 
                return (False, error_message)
    
    ### END CODE HERE ###

    # return the parsed dictionary (also works when a plain dict was passed in)
    return (True, json_output)

Try the guardrail with an invalid JSON dictionary, where the highest risk is not correctly identified.

In [8]:
# GRADED CELL: Exercise 1

def security_review_output_guardrail(output):
    
    # get the (JSON) output from the TaskOutput object
    try: 
        json_output = output if type(output)==dict else output.json_dict 
    except Exception as e:
        return (False, ("Error retrieving the `json_dict` argument: "
                        f"\n{str(e)}\n"
                        "Make sure you set the output_json parameter in the Task."
                        )
                )

    # define risk levels
    valid_risk_levels = ['low', 'medium', 'high']
    
    # Check if security_vulnerabilities key exists
    if 'security_vulnerabilities' not in json_output:
        return (False, f"Missing 'security_vulnerabilities' key in output. Got keys: {list(json_output.keys())}")
    
    # validate that each of the risk levels has a valid value
    for vuln in json_output['security_vulnerabilities']:
        # validate the risk level
        if vuln['risk_level'].lower() not in valid_risk_levels: 
            error_message = f"Invalid risk level: {vuln['risk_level']}"
            return (False, error_message) 
    
    ### START CODE HERE ###
    # validate that the highest risk level matches the highest risk level in the vulnerabilities
    
    # if the highest risk level is not valid risk level, return an error message
    if json_output["highest_risk"].lower() not in valid_risk_levels:
        error_message = f"Invalid highest_risk: {json_output['highest_risk']}"
        return (False, error_message)
    
    # if it is one of the valid risk levels, then check if it matches the highest 
    # risk level in the vulnerabilities
    else:
        # get all risk_level values
        risk_levels = [vuln['risk_level'].lower() for vuln in json_output['security_vulnerabilities']] 
        
        # if "high" in risk levels, then highest risk level should be high
        if "high" in risk_levels: 
            if json_output["highest_risk"].lower() != "high":
                error_message = "Highest risk level does not match the highest risk level in the vulnerabilities." 
                return (False, error_message)
            
        # if high is not present and medium is in risk levels, then highest risk level should be medium
        elif "medium" in risk_levels: 
            if json_output["highest_risk"].lower() != "medium":
                error_message = "Highest risk level does not match the highest risk level in the vulnerabilities." 
                return (False, error_message)
            
        # if neither high nor medium is present, then highest risk level should be low
        elif "low" in risk_levels: 
            if json_output["highest_risk"].lower() != "low":
                error_message = "Highest risk level does not match the highest risk level in the vulnerabilities." 
                return (False, error_message)
    
    ### END CODE HERE ###

    # return the parsed dictionary (also works when a plain dict was passed in)
    return (True, json_output)

##### **Expected output**:
```
(False,
 'Highest risk level does not match the highest risk level in the vulnerabilities.')
```
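
The check behind this expected output can be reproduced without CrewAI. The sketch below is a compact, hypothetical re-implementation, not the graded solution: `RISK_ORDER` and `highest_risk_matches` are illustrative names, and the sample dictionary is made up. It ranks the risk levels and compares the declared `highest_risk` with the most severe `risk_level`:

```python
# Hypothetical severity ranking used only for this sketch
RISK_ORDER = {"low": 0, "medium": 1, "high": 2}


def highest_risk_matches(review: dict) -> tuple[bool, str]:
    """Return (True, "") when `highest_risk` equals the most severe `risk_level`."""
    levels = [v["risk_level"].lower() for v in review["security_vulnerabilities"]]
    declared = review["highest_risk"].lower()
    # Reject any value outside the three accepted categories
    if declared not in RISK_ORDER or any(level not in RISK_ORDER for level in levels):
        return (False, "Invalid risk level.")
    # Compare the declared value with the most severe level actually present
    if levels and max(levels, key=RISK_ORDER.get) != declared:
        return (False, "Highest risk level does not match the highest risk level in the vulnerabilities.")
    return (True, "")


# Made-up sample: one vulnerability is "high", but highest_risk claims "medium"
invalid_review = {
    "security_vulnerabilities": [
        {"description": "Raw SQL built from user input", "risk_level": "High", "evidence": "string concatenation"},
        {"description": "Verbose error messages", "risk_level": "low", "evidence": "stack trace in response"},
    ],
    "blocking": True,
    "highest_risk": "medium",
    "security_recommendations": ["Use parameterized queries"],
}

result = highest_risk_matches(invalid_review)
print(result)
```

Ranking the levels in a dictionary avoids the chain of `if`/`elif` branches, at the cost of being slightly less explicit about each case.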

In [9]:
# test the guardrail
unittests.test_security_review_output_guardrail(security_review_output_guardrail)

All tests passed!



<a id="ex2"></a>

### Exercise 2: Write the `review_decision_guardrail` guardrail

Complete the cell below to define the function to create the guardrail for the review decision output. This guardrail needs to make sure the output includes one of the required decision values: "approve", "request changes" or "escalate".

Some of the structure is already given to you; you only need to fill in the `None` placeholders.

In [10]:
# GRADED CELL: Exercise 2

def review_decision_guardrail(output):
    # get the raw output from the TaskOutput object
    try:
        output_str = output if type(output)==str else output.raw
    except Exception as e:
        return (False, ("Error retrieving the `raw` argument: "
                        f"\n{str(e)}\n"
                        "Make sure you set the raw parameter in the Task."
                        )
                )

    # define the keywords to check for in the output
    keywords = ["approve", "request changes", "escalate"]
    
    ### START CODE HERE ###

    # check if any of the keywords are present in the output
    if not any(keyword in output_str.lower() for keyword in keywords):
        # build a descriptive error message to return with the failure
        error_message = "Output does not include one of the valid actionable decisions: 'approve', 'request changes', or 'escalate'."
        return (False, error_message)

    # if all checks pass, return True, and the output
    return (True, output_str)

    ### END CODE HERE ###

Try the guardrail with an output missing the possible decision values.

In [11]:
# create the raw output for testing (avoid shadowing the built-in `input`)
sample_output = 'Final decision: Elevate to human'

# test the guardrail
review_decision_guardrail(sample_output)

(False,
 "Output does not include one of the valid actionable decisions: 'approve', 'request changes', or 'escalate'.")

##### **Expected output**:
In this case, the message will vary depending on what you chose
```
(False, 'Output does not include one of the valid actionable decisions.')
```
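
Substring matching, as used in the guardrail above, also accepts words that merely contain a keyword, such as "disapprove". If that matters for your use case, one stricter option is to match whole words with a regular expression. The sketch below is only an illustration and not part of the graded solution; `DECISION_PATTERN` and `find_decisions` are hypothetical names:

```python
import re

# Whole-word, case-insensitive match for the three accepted decisions
DECISION_PATTERN = re.compile(r"\b(approve|request changes|escalate)\b", re.IGNORECASE)


def find_decisions(text: str) -> list[str]:
    """Return every accepted decision keyword found in `text`, lowercased."""
    return [match.lower() for match in DECISION_PATTERN.findall(text)]


print(find_decisions("Final decision: Request Changes"))   # ['request changes']
print(find_decisions("The reviewers disapprove of this"))  # []
print(find_decisions("Final decision: Elevate to human"))  # []
```

Note that this variant is stricter in both directions: inflected forms such as "approved" no longer match either, so the pattern would need extending if the task's expected output allows them.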

In [12]:
# test the guardrail
unittests.test_review_decision_guardrail(review_decision_guardrail)

All tests passed!



<a id="4"></a>

## 4 - Tasks
Now that you have defined the guardrails, you are ready to define the tasks. You will recreate the three original tasks. For each one, you will load the `description`, `expected_output` and `name` from the YAML file, and then add the guardrails and output types. The contents of the diff (PR) file will reach the tasks through the `file_content` key of the crew's inputs, which the before-kickoff hook in Section 5 fills in.
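
To illustrate how a value in the crew's inputs ends up inside a task, the sketch below assumes the task description contains a `{file_content}` placeholder. The YAML file is not shown in this notebook, so the template text and the sample diff are made up, and plain `str.format` is used here only as a rough stand-in for the substitution CrewAI performs:

```python
# Hypothetical task description with a placeholder for the PR contents
description_template = "Review the following code changes:\n{file_content}"

# Inputs as they would look after the file has been read (sample values)
inputs = {
    "file_path": "files/code_changes.txt",
    "file_content": "+ print('hello')",
}

# Keys without a matching placeholder (file_path here) are simply ignored
rendered = description_template.format(**inputs)
print(rendered)
```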

<a id="ex3"></a>

### Exercise 3: Create the Analyze Code Quality task
Start by defining the Analyze Code Quality task. The structure of the JSON output is defined for you. It includes the following keys:
* `critical_issues`: a list of issues that need fixing
* `minor_issues`: a list of suggested improvements
* `reasoning`: text with the explanation of the reasoning

You should:
1. Define the Task:
    * Use the `config` parameter to load the task configuration from the YAML file
    * Set the `output_json` parameter with the corresponding structure
    * Assign to the corresponding `agent`

In [15]:
# GRADED CELL: Exercise 3

# Define the pydantic model for the code quality analysis output
class CodeQualityJSON(BaseModel):
    critical_issues: list[str]
    minor_issues: list[str]
    reasoning: str

### START CODE HERE ###

# Create the quality analysis task
analyze_code_quality = Task(
    # Load the description, expected output, and name from the YAML configuration
    config=tasks_config['analyze_code_quality'], 
    # Define the output type as a pydantic model
    output_json=CodeQualityJSON,
    # Define the agent that will perform this task
    agent=senior_developer, 
)

### END CODE HERE ###

In [16]:
# test the analyze_code_quality task
unittests.test_analyze_code_quality(analyze_code_quality)

All tests passed!



<a id="ex4"></a>

### Exercise 4: Create the Review Security task

1. Define the JSON structure of the Security vulnerabilities. You need the following keys:
    * `description`: string with the description
    * `risk_level`: a string indicating the level
    * `evidence`: a string showing the evidence for the risk

2. Define the JSON structure for the output. You need the following keys:
    * `security_vulnerabilities`: a list of Security vulnerabilities (structure defined before)
    * `blocking`: a boolean indicating if security issues should block approval
    * `highest_risk`: a string with the most severe risk level found
    * `security_recommendations`: a list of strings with specific fixes for identified vulnerabilities

3. Define the Task:
    * Use the `config` parameter to load the task configuration from the YAML file
    * Set the `output_json` parameter
    * Add the guardrail
    * Assign to the corresponding agent

In [17]:
# GRADED CELL: Exercise 4

### START CODE HERE ###

# Define the pydantic model for the security vulnerabilities
class SecurityVulnerability(BaseModel):
    description:str
    risk_level:str
    evidence:str

# Define the pydantic model for the security review output
class ReviewSecurityJSON(BaseModel):
    security_vulnerabilities: list[SecurityVulnerability] 
    blocking:bool
    highest_risk:str
    security_recommendations:list[str]

# Create the security review task
review_security = Task(
    # Load the expected output, agent, and name from the YAML configuration
    config=tasks_config['review_security'], 
    # Define the output type as a pydantic model
    output_json=ReviewSecurityJSON,
    # Add the security_review_output_guardrail guardrail
    guardrails=[security_review_output_guardrail],
    # Define the agent that will perform this task
    agent=security_engineer 
)

### END CODE HERE ###

In [18]:
# test the review_security task
unittests.test_review_security(review_security, SecurityVulnerability, ReviewSecurityJSON)

All tests passed!



<a id="ex5"></a>

### Exercise 5: Create the Review Decision task

Next, define the Review Decision task. For this task, you will need to set the corresponding `guardrail`. 

1. Define the Task:
    * Use the `config` parameter to load the task configuration from the YAML file
    * Set the `markdown` parameter to `True` to get the final report in Markdown format
    * Add the guardrail
    * Assign to the corresponding agent

In [19]:
# GRADED CELL: Exercise 5

### START CODE HERE ###

# Create the review decision task
make_review_decision = Task(
    # Load the expected output, agent, and name from the YAML configuration
    config=tasks_config['make_review_decision'], 
    # Set the markdown parameter to get the final report in Markdown format
    markdown=True,
    # Add the review_decision_guardrail guardrail
    guardrails=[review_decision_guardrail],
    # Set the context with the previous task objects
    context=[analyze_code_quality, review_security], 
    # Define the agent that will perform this task
    agent=tech_lead 
)

### END CODE HERE ###

In [20]:
# test the make_review_decision task
unittests.test_make_review_decision(make_review_decision)

All tests passed!



<a id="5"></a>

## 5 - Execution Hooks

All your tasks require access to the code difference (PR file), but reading the file isn't really an "agentic" task that requires intelligent decision-making. Instead of having your agents read the file directly, you can create a **before-kickoff hook** that handles this automatically. This hook will read the file and add its content to the crew's inputs before the agents begin their work.

<a id="ex6"></a>

### Exercise 6: Create a hook to read the PR file
Write a function to be used as a hook. This function should:
1. Receive the file path from the `file_path` parameter in the crew's inputs
2. Read the contents of the PR file
3. Add a new key to the `inputs` dictionary called `file_content` containing the loaded file contents

In [21]:
# GRADED CELL: Exercise 6

# Define the execution hook to read the PR file
def read_file_hook(inputs):
    
    ### START CODE HERE ###
    
    # get the file_path from the inputs
    filename = inputs.get("file_path")
    
    ### END CODE HERE ###

    # if the filename is not provided, raise an error
    if not filename:
        raise ValueError("Missing 'file_path' in inputs")

    # try reading the file
    try:
        with open(filename, "r") as f:
            file_contents = f.read()
    # if there are any issues, raise an error
    except Exception as e:
        raise RuntimeError(f"Failed to read file {filename}: {e}") from e

    
    ### START CODE HERE ###

    # add the file contents to the inputs
    inputs["file_content"] = file_contents
    
    # return the modified inputs
    return inputs

    ### END CODE HERE ###
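
To see what such a hook does to the inputs without kicking off a crew, the sketch below calls a stand-in function directly on a temporary file. `demo_read_file_hook` is a hypothetical, shortened mirror of the function above, and the file contents are sample data:

```python
import os
import tempfile


def demo_read_file_hook(inputs: dict) -> dict:
    """Hypothetical stand-in that mirrors the hook's behavior."""
    filename = inputs.get("file_path")
    if not filename:
        raise ValueError("Missing 'file_path' in inputs")
    with open(filename, "r") as f:
        inputs["file_content"] = f.read()
    return inputs


# Write a throwaway diff so the example does not depend on files/code_changes.txt
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("+ added line\n- removed line\n")
    tmp_path = tmp.name

try:
    updated = demo_read_file_hook({"file_path": tmp_path})
    print(sorted(updated.keys()))  # ['file_content', 'file_path']

    # A missing file_path is reported before any file access happens
    missing_error = None
    try:
        demo_read_file_hook({})
    except ValueError as err:
        missing_error = str(err)
    print(missing_error)  # Missing 'file_path' in inputs
finally:
    os.remove(tmp_path)
```

The hook returns the same dictionary it received, now with the extra `file_content` key, which is the form the crew expects from a before-kickoff callback in this lab.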

In [22]:
# test the read_file_hook
unittests.test_read_file_hook(read_file_hook)

All tests passed!



<a id="6"></a>

## 6 - Creating the Crew

Now that all the elements are in place, you can define the Crew and kick it off to get the assessment for your PR.

<a id="ex7"></a>

### Exercise 7: Define the Crew
In this step, you will define the crew. You need to set up agents and tasks, just like in the previous module, but this time you will also need to add memory. Adding memory enables agents to remember previously identified security vulnerabilities and coding patterns across multiple pull requests, improving consistency and allowing them to recognize recurring issues without starting from scratch each time.

In [23]:
# GRADED CELL: Exercise 7

### START CODE HERE ###

# Create the code review crew
crew = Crew(
    # add the list of agents
    agents=[senior_developer, security_engineer, tech_lead],
    # add the list of tasks (in order of execution) 
    tasks=[analyze_code_quality, review_security, make_review_decision],
    # add memory to the crew
    memory=True,
    # add the before-kickoff hook to read the PR file
    # you need to pass a list with the hook function from Exercise 6
    before_kickoff_callbacks=[read_file_hook]
)

### END CODE HERE ###

In [24]:
# test the crew
unittests.test_crew(crew)

All tests passed!



<a id="ex8"></a>

### Exercise 8: Kickoff the crew

If all the tests from the previous exercises passed, you are ready to kick off the crew and save the results in a `dill` file.

In [25]:
# GRADED CELL: Exercise 8

# define the file path for the PR file
file_path = "files/code_changes.txt" 

### START CODE HERE ###

# kickoff the crew
result = crew.kickoff(
    # add the inputs dictionary with the file path
    inputs={'file_path':file_path}
    )

### END CODE HERE ###

# Save the result to a dill file
with open('result.dill', 'wb') as f:
    dill.dump(result, f)

Let's check out the final report!

In [26]:
# clean the markdown output, in case the report is wrapped with code fences
clean_report = clean_markdown(result.raw)

# display the final report as markdown
display(Markdown(clean_report))

# Code Review Decision

## Final Decision
**Request Changes**

## Summary of Key Findings
The code changes introduced in the pull request present several concerns that, while not critical, require attention to enhance security, maintainability, and overall code quality. The following minor issues were identified:

- **SQL Injection Risk**: The code constructs raw SQL queries with direct user input, which poses a medium risk of SQL injection attacks. This can be mitigated by implementing parameterized queries or using an Object-Relational Mapping (ORM) library.
- **Handling Date/Time in SQL**: Using `NOW()` directly in the SQL statement for updating the `last_login` timestamp may not work uniformly across different database systems. A better practice would be to handle date/time in Python or use compatible database functions.
- **Logging for Security**: The current implementation of login attempts employs print statements, which is not secure. It is recommended to use a logging framework to manage logs correctly and securely.

## Changes Required
To align the code with best practices and eliminate vulnerabilities, please address the following:

1. **SQL Injection Prevention**: 
   - Modify the way user input is handled in SQL queries. Use parameterized queries or an ORM to prevent SQL injection.

   Example:
   ```python
   user = db.query("SELECT * FROM users WHERE username = %s", (username,))
   ```

2. **Date/Time Handling**:
   - Refactor the `last_login` update logic to either use Python's datetime capabilities or compatible SQL functions rather than relying on `NOW()`.
   
   Example:
   ```python
   now = datetime.now()
   db.execute("UPDATE users SET last_login = %s WHERE id = %s", (now, user.id))
   ```

3. **Logging Strategy**:
   - Replace print statements with a structured logging framework that properly handles sensitive information.
   
   Example:
   ```python
   import logging
   logger = logging.getLogger(__name__)
   logger.info(f"User {username} logged in successfully")
   logger.warning(f"Failed login attempt for user {username}")
   ```

## Additional Recommendations
- Ensure that comprehensive unit tests cover these changes to validate that both authentication and logging work as expected without introducing additional vulnerabilities.
- Consider performing a broader review of the entire user authentication process to maintain consistency and security across the application.

By addressing these issues, the code will not only conform to security best practices but also enhance maintainability, ensuring a more robust application in future development cycles.

You have reached the end of the assignment. At this point, you are ready to submit it for grading.

Once you have submitted and are satisfied with your grade, take some time to experiment: change the guardrails, add new URLs to the website search tool, or upload a different pull request file with code differences. Don't be afraid to shake things up!