# Lesson 3: Data Assistant with Agentic AI

## Import package

In [1]:
from crewai import Agent, Task, Crew, LLM

## Load ollama model

In [2]:
ollama_llama = LLM(
    model="ollama/llama3.2",
    base_url="http://localhost:11434",
    temperature=0
)

In [3]:
ollama_qwen_coder = LLM(
    model="ollama/qwen2.5-coder:14b",
    base_url="http://localhost:11434",
    temperature=0
)

## Create Pydantic Models for Structured Output

In [4]:
from pydantic import BaseModel, Field
from typing import Dict, Optional, List, Set, Tuple

class FinalQuery(BaseModel):
    old_query: str = Field(..., description='the query from senior data engineer')
    adjusted_query: str = Field(..., description='the query which is adjusted by senior data engineering manager')
    explaination: str = Field(..., description='the explanation review from the senior data engineering manager')

## Create Agent

In [5]:
senior_product_manager = Agent(
    role="Senior Product Manager",
    goal="Define clear and precise text logic that aligns with business requirements.",
    backstory=(
        "You are a Top Product Manager tasked with rewriting the business requirements "
        "which is more easier to be understood by data engineer based on the business requirement. "
        "Do not directly generate sql query."        
    ),
    verbose=True,
    allow_delegation=False,
    llm=ollama_llama
)


In [6]:
senior_data_engineer = Agent(
    role="Senior Data Engineer",
    goal="Develop optimized, well-structured SQL queries",
    backstory=(
        "As a Senior Data Engineer, your task is to generate syntactically correct GoogleSQL queries "
        "based on given business requirements. Ensure queries are clear, efficient, and only "
        "retrieve necessary columns—never select all columns from a table. Use only column names "
        "explicitly available in the schema and avoid referencing non-existent columns. "
        "For string column comparisons, always use UPPER case to ensure case insensitivity."
    ),
    verbose=True,
    allow_delegation=False,
    llm=ollama_qwen_coder
)


In [7]:
senior_data_engineer_manager = Agent(
    role="Senior Data Engineering Manager",
    goal="Review and refine BigSQL queries from data engineers",
    backstory=(
        "As a Senior Data Engineering Manager, your task is to review SQL queries provided by senior data engineers. "
        "Verify their correctness, optimize them if necessary, and ensure they adhere to best practices. "
        "Keep queries up-to-date, accurate, and as simple as possible while maintaining efficiency. "
        "Ensure that only the columns defined in the schema are used. "
    ),
    verbose=True,
    allow_delegation=False,
    llm=ollama_qwen_coder
)


## Create Tasks

In [8]:
req = "Fetch all customer details along with their submitted reviews"

understand_requirement_task = Task(
    description=(
        "Translate the business requirement into a clear and structured format for data engineers. "
        "The requirement is to retrieve all customer details along with their history reviews. Ensure the requirement "
        "is precise, unambiguous, and easy to understand. and refer all table infomation which we have in the columns_information"
        "refer below columns_information, only those infomation can be used. "

    """
    <columns_information>\n
table: customer_reviews
{
    "review_id": "A unique identifier for each review (review id). This is typically the primary key.",\n
    "customer_id": "Refers to the unique ID of the customer who wrote the review. This is a foreign key linking to a customers table",\n
    "review_text": "Contains the actual review content written by the customer.",\n
    "rating": "Stores the rating score given by the customer. this would be a value between 1 to 5. 1 is lowest and 5 is highest."
}
table: customers
{
    "customer_id": "A unique identifier for each customer. This is the primary key.",
    "customer_name": "Stores the full name of the customer.",
    "email": "The email address of the customer. This is unique and not null.",
    "phone": "The phone number of the customer. It should be unique to prevent duplicate contacts.",
    "address": "The physical address of the customer.",
    "age": "Stores the age of the customer. This should be a non-negative integer.",
    "gender": "The gender of the customer. Allowed values are 'Male', 'Female', 'Non-binary', or 'Other'.",
    "income_level": "Represents the income category of the customer. Allowed values are 'Low', 'Medium', or 'High'.",
    "membership_status": "Defines the loyalty membership status. Allowed values are 'Bronze', 'Silver', 'Gold', or 'Platinum'."
}
</columns_information>
    """
    ),
    agent=senior_product_manager,
    expected_output="Well-defined, clear text requirement"
)

In [9]:
write_query_task = Task(
    description=(
        "Generate a clear and precise SQL query based on the requirement provided by the Product Manager."
        " Ensure the query adheres to best practices and retrieves only the necessary columns."
        " if the requirement (from product mamanger) has some columns which are not in the schema, just ignore it."
        "\n\n"
    """
     <table_schema>\n\n
    CREATE TABLE IF NOT EXISTS "customer_reviews" (
        "review_id" INTEGER,
        "customer_id" INTEGER,
        "review_text" TEXT,
        "rating" INTEGER
        );

    CREATE TABLE IF NOT EXISTS "customers" (
        "customer_id" INTEGER,
        "customer_name" TEXT,
        "email" TEXT,
        "phone" TEXT,
        "address" TEXT,
        "age" INTEGER,
        "gender" TEXT,
        "income_level" TEXT,
        "membership_status" TEXT
        );
        </table_schema>\n\n
    """ 
    """
    <columns_information>\n
table: customer_reviews
{
    "review_id": "A unique identifier for each review (review id). This is typically the primary key.",\n
    "customer_id": "Refers to the unique ID of the customer who wrote the review. This is a foreign key linking to a customers table",\n
    "review_text": "Contains the actual review content written by the customer.",\n
    "rating": "Stores the rating score given by the customer. this would be a value between 1 to 5. 1 is lowest and 5 is highest."
}
table: customers
{
    "customer_id": "A unique identifier for each customer. This is the primary key.",
    "customer_name": "Stores the full name of the customer.",
    "email": "The email address of the customer. This is unique and not null.",
    "phone": "The phone number of the customer. It should be unique to prevent duplicate contacts.",
    "address": "The physical address of the customer.",
    "age": "Stores the age of the customer. This should be a non-negative integer.",
    "gender": "The gender of the customer. Allowed values are 'Male', 'Female', 'Non-binary', or 'Other'.",
    "income_level": "Represents the income category of the customer. Allowed values are 'Low', 'Medium', or 'High'.",
    "membership_status": "Defines the loyalty membership status. Allowed values are 'Bronze', 'Silver', 'Gold', or 'Platinum'."
}
</columns_information>
    """
    ),
    agent=senior_data_engineer,
    expected_output="Optimized SQL Query"
)

In [10]:
code_review_task = Task(
    description=(
        "Review the SQL query provided by the data engineer, ensuring it aligns with the business "
        "and product requirements. Verify correctness, efficiency, and adherence to best practices. "
        "Ensure the query is accurate, optimized, and as simple as possible."
        "\n\n"
    """
     <table_schema>\n\n
    CREATE TABLE IF NOT EXISTS "customer_reviews" (
        "review_id" INTEGER,
        "customer_id" INTEGER,
        "review_text" TEXT,
        "rating" INTEGER
        );

    CREATE TABLE IF NOT EXISTS "customers" (
        "customer_id" INTEGER,
        "customer_name" TEXT,
        "email" TEXT,
        "phone" TEXT,
        "address" TEXT,
        "age" INTEGER,
        "gender" TEXT,
        "income_level" TEXT,
        "membership_status" TEXT
        );
        </table_schema>\n\n
    """ 

        ""
    """
    <columns_information>\n
table: customer_reviews
{
    "review_id": "A unique identifier for each review. This is typically the primary key.",\n
    "customer_id": "Refers to the unique ID of the customer who wrote the review. This is a foreign key linking to a customers table",\n
    "review_text": "Contains the actual review content written by the customer.",\n
    "rating": "Stores the rating score given by the customer. this would be a value between 1 to 5. 1 is lowest and 5 is highest."
}
table: customers
{
    "customer_id": "A unique identifier for each customer. This is the primary key.",
    "customer_name": "Stores the full name of the customer.",
    "email": "The email address of the customer. This is unique and not null.",
    "phone": "The phone number of the customer. It should be unique to prevent duplicate contacts.",
    "address": "The physical address of the customer.",
    "age": "Stores the age of the customer. This should be a non-negative integer.",
    "gender": "The gender of the customer. Allowed values are 'Male', 'Female', 'Non-binary', or 'Other'.",
    "income_level": "Represents the income category of the customer. Allowed values are 'Low', 'Medium', or 'High'.",
    "membership_status": "Defines the loyalty membership status. Allowed values are 'Bronze', 'Silver', 'Gold', or 'Platinum'."
}
</columns_information>
    """
    ),
    agent=senior_data_engineer_manager,
    expected_output="Reviewed and optimized SQL query",
    context=[understand_requirement_task, write_query_task],
    output_pydantic=FinalQuery
)


## Create Crew

In [11]:
crew = Crew(
    agents=[senior_product_manager, senior_data_engineer, senior_data_engineer_manager],
    tasks=[understand_requirement_task, write_query_task, code_review_task],
    verbose=True
)

In [12]:
result = crew.kickoff()

[1m[95m# Agent:[00m [1m[92mSenior Product Manager[00m
[95m## Task:[00m [92mTranslate the business requirement into a clear and structured format for data engineers. The requirement is to retrieve all customer details along with their history reviews. Ensure the requirement is precise, unambiguous, and easy to understand. and refer all table infomation which we have in the columns_informationrefer below columns_information, only those infomation can be used. 
    <columns_information>

table: customer_reviews
{
    "review_id": "A unique identifier for each review (review id). This is typically the primary key.",

    "customer_id": "Refers to the unique ID of the customer who wrote the review. This is a foreign key linking to a customers table",

    "review_text": "Contains the actual review content written by the customer.",

    "rating": "Stores the rating score given by the customer. this would be a value between 1 to 5. 1 is lowest and 5 is highest."
}
table: customers
{

In [13]:
print(result["old_query"])

SELECT 
    c.customer_id,
    c.customer_name,
    c.email,
    c.phone,
    c.address,
    c.age,
    c.gender,
    c.income_level,
    c.membership_status,
    r.review_id,
    r.review_text,
    r.rating
FROM 
    customers c
LEFT JOIN 
    customer_reviews r ON UPPER(c.customer_id) = UPPER(r.customer_id);


In [14]:
print(result["adjusted_query"])

SELECT 
    c.customer_id,
    c.customer_name,
    c.email,
    c.phone,
    c.address,
    c.age,
    c.gender,
    c.income_level,
    c.membership_status,
    r.review_id,
    r.review_text,
    r.rating
FROM 
    customers c
LEFT JOIN 
    customer_reviews r ON c.customer_id = r.customer_id;


# Save data

In [15]:
import sqlite3
import pandas as pd

In [16]:
import sqlite3
import csv

def run_query_and_save_csv(db_path, query, csv_path):
    """
    Runs a query on a SQLite database and saves the result to a CSV file.

    Args:
        db_path (str): Path to the SQLite database file.
        query (str): SQL query to execute.
        csv_path (str): Path to save the CSV file.
    """
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()

        cursor.execute(query)
        rows = cursor.fetchall()
        # Get column names
        column_names = [description[0] for description in cursor.description]
        with open(csv_path, 'w', newline='') as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow(column_names)
            csv_writer.writerows(rows)
        print(f"Query results saved to {csv_path}")

    except sqlite3.Error as e:
        print(f"Database error: {e}")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        if conn:
            conn.close()


In [17]:
q = """
SELECT 
    c.customer_id,
    c.customer_name,
    c.email,
    c.phone,
    c.address,
    c.age,
    c.gender,
    c.income_level,
    c.membership_status,
    r.review_id,
    r.review_text,
    r.rating
FROM 
    customers c
LEFT JOIN 
    customer_reviews r ON c.customer_id = r.customer_id;
"""


path = "/Users/jiazhenzhu/Desktop/Agentic AI/customer_reviews.db"


csv_path = "/Users/jiazhenzhu/Desktop/Agentic AI/result.csv"

In [18]:
run_query_and_save_csv(path, q, csv_path)

Query results saved to /Users/jiazhenzhu/Desktop/Agentic AI/result.csv
