# Task flagging pipeline

The pipeline that, for a given task, flags it as `success` or `failure`.

In [1]:
import os 

# The notebook lives in a sub-folder: when the `app` package is not visible
# from the current working directory, step up one level so that the
# `app.*` imports used below can be resolved.
entries_here = os.listdir()
if not ("app" in entries_here):
    os.chdir("../")

In [2]:
import random
import phospho

from app.services.firebase.watchers.main import task_scoring_pipeline, main_pipeline

from app.api.v0.models.tasks import Task

from app.utils import generate_timestamp

from app.core import config
from app.db.client import db as firestore_db
from firebase_admin.firestore import FieldFilter, Query

## Setup

In [3]:
# Identifiers of the project and session that every task created in this
# notebook is attached to.
project_id = "zNyL8jX3N0QRXEiwwMFl"
session_id = "oyjuW0QQxHHcT0ApJEYt"

# Safety guard: the cells below write to Firestore, so refuse to continue
# unless the configured environment is staging.
assert (
    config.ENVIRONMENT == "staging"
), "You must be in staging environment"

## Dataset check and construction
Check whether we have enough labeled examples for the few-shot task. If not, we will add some tasks and label them.

In [4]:
# Show how many labeled examples the few-shot scoring is configured to use.
print(f"Number of few shots : {config.FEW_SHOT_NUMBER_OF_EXAMPLES} examples")

Number of few shots : 10 examples


In [5]:
# Check if we have enough user labeled tasks in the database for this project

# We want (as close as possible to) 50/50 success and failure examples.
# The failure quota is the remainder so that both quotas always add up to
# FEW_SHOT_NUMBER_OF_EXAMPLES, even when that number is odd (otherwise the
# threshold check at the end of this cell could never pass).
nb_success = int(config.FEW_SHOT_NUMBER_OF_EXAMPLES / 2)
nb_failure = config.FEW_SHOT_NUMBER_OF_EXAMPLES - nb_success


def _fetch_labeled_examples(flag: str, limit: int) -> list:
    """
    Fetch the most recent user-labeled tasks of this project for one flag.

    Args:
        flag: value of the eval to look for ("success" or "failure").
        limit: maximum number of evals to read.

    Returns:
        A list of {"task": <task document as dict>, "human_flag": <flag>},
        most recent eval first. Evals whose task document no longer exists
        are skipped.
    """
    if limit <= 0:
        return []

    eval_docs = (
        firestore_db.collection("evals")
        .where(filter=FieldFilter("project_id", "==", project_id))
        .where(filter=FieldFilter("source", "==", "user"))  # Only get human eval from the users
        .where(filter=FieldFilter("value", "==", flag))  # Only get evals carrying the requested flag
        .order_by("created_at", direction=Query.DESCENDING)
        .limit(limit)
        .stream()
    )

    examples = []
    for eval_doc in eval_docs:
        eval_data = eval_doc.to_dict()

        task_doc = firestore_db.collection("tasks").document(eval_data["task_id"]).get()
        if not task_doc.exists:
            # The eval points to a task that is gone: to_dict() would be None,
            # which is not a usable example.
            continue

        # TODO : Validate the data model using the pydantic model
        examples.append(
            {
                "task": task_doc.to_dict(),
                "human_flag": eval_data["value"],
            }
        )
    return examples


# Successful examples first, then the failure examples
examples_tasks = _fetch_labeled_examples("success", nb_success) + _fetch_labeled_examples(
    "failure", nb_failure
)

# Check that the number of annotated tasks reaches the threshold
if len(examples_tasks) < config.FEW_SHOT_NUMBER_OF_EXAMPLES:
    raise Exception(f"Not enough user labeled tasks in the database for this project. \n We need {config.FEW_SHOT_NUMBER_OF_EXAMPLES} examples. \n There is currently {len(examples_tasks)} examples.")

In [6]:
# Display the labeled examples collected in the previous cell.
examples_tasks

[{'task': {'output': "It's currently sunny and 75 degrees.",
   'input': "What's the weather like today?",
   'metadata': {},
   'additional_input': {},
   'flag': 'success',
   'data': {},
   'project_id': 'OTZupmVS0DFNs7CXAoid',
   'created_at': 1701945647,
   'session_id': 'MG3ALaY8sjDCwrxgdymY',
   'id': '018ab80a3e6b40588113ee7b9b7de6cf',
   'events': [],
   'additional_output': {}},
  'human_flag': 'success'},
 {'task': {'output': 'Try restarting your laptop to see if that improves its performance.',
   'input': 'My laptop is running slow, what should I do?',
   'metadata': {},
   'additional_input': {},
   'flag': 'success',
   'data': {},
   'project_id': 'OTZupmVS0DFNs7CXAoid',
   'created_at': 1701945647,
   'session_id': 'MG3ALaY8sjDCwrxgdymY',
   'id': 'cc1a84c75b124a42b733df14fc9c0ec3',
   'events': [],
   'additional_output': {}},
  'human_flag': 'success'},
 {'task': {'output': 'Bella Italia on Maple Street has great reviews.',
   'input': 'Can you suggest a good Italian

## Adding some labeled tasks

Run this section only if there are not enough labeled tasks for the few-shot task.

In [5]:
def generate_task(input: str, output: str) -> dict:
    """
    Build a new Task attached to the notebook's project and session.

    The id comes from `phospho.generate_uuid()` and the creation time from
    `generate_timestamp()`; `project_id` and `session_id` are read from the
    notebook-level variables of the same name.

    Args:
        input: the input text of the task.
        output: the output text of the task.

    Returns:
        The task serialized with `model_dump()`.
    """
    task_fields = {
        "id": phospho.generate_uuid(),
        "project_id": project_id,
        "created_at": generate_timestamp(),
        "session_id": session_id,
        "input": input,
        "output": output,
    }
    return Task(**task_fields).model_dump()

In [8]:
# Hand-written labeled examples. Every row has the keys
# "input", "output" and "flag" (in that order).
dataset = [
    dict(
        input="Please stop sending messages to me",
        output="Sure",
        flag="failure",
    ),
    dict(
        input="Can you reschedule my 3 PM meeting to tomorrow?",
        output="I have rescheduled your 3 PM meeting to the same time tomorrow.",
        flag="success",
    ),
    dict(
        input="What's the weather like today?",
        output="It's currently sunny and 75 degrees.",
        flag="success",
    ),
    dict(
        input="Please stop sending me reminders every hour.",
        output="I will adjust the reminders to once a day.",
        flag="success",
    ),
    dict(
        input="Can you tell me my bank account balance?",
        output="I'm sorry, I don't have access to your bank account details.",
        flag="failure",
    ),
    dict(
        input="Can you physically deliver a package to my friend's house?",
        output="I'm sorry, I can't perform physical tasks like delivering packages.",
        flag="failure",
    ),
    dict(
        input="My laptop is running slow, what should I do?",
        output="Try restarting your laptop to see if that improves its performance.",
        flag="success",
    ),
    dict(
        input="Can you suggest a good Italian restaurant nearby?",
        output="Bella Italia on Maple Street has great reviews.",
        flag="success",
    ),
    dict(
        input="The meeting time you set was wrong, I missed my appointment!",
        output="I apologize for the mistake. I will double-check the schedule next time.",
        flag="failure",
    ),
    dict(
        input="Can you make me a cup of coffee?",
        output="I'm not able to perform physical tasks like making coffee.",
        flag="failure",
    ),
    dict(
        input="Remind me to take my medication at 8 PM tonight.",
        output="I will set a reminder for you to take your medication at 8 PM tonight.",
        flag="success",
    ),
]

# Randomize the row order in place
random.shuffle(dataset)

In [9]:
def insert_examples(dataset: list):
    """
    Write every example of `dataset` to the database.

    For each row (a mapping with "input", "output" and "flag"), a task is
    built with `generate_task`, stored in the "tasks" collection under its
    id, and a matching evaluation with source "user" is added to the
    "evals" collection.
    """
    for example in dataset:
        task = generate_task(input=example["input"], output=example["output"])
        label = example["flag"]
        task["flag"] = label

        # Store the task under its own id
        firestore_db.collection("tasks").document(task["id"]).set(task)

        # Store the human evaluation that points to the task
        evaluation_data = dict(
            created_at=generate_timestamp(),
            project_id=task["project_id"],
            session_id=task["session_id"],
            task_id=task["id"],
            value=label,
            source="user",
        )
        _update_time, _eval_doc_ref = firestore_db.collection("evals").add(evaluation_data)

In [10]:
# Uncomment the call below to write the example dataset to Firestore.
# insert_examples(dataset)

In [11]:
# Create a new task, store it in the "tasks" collection, then run the
# scoring pipeline on it.

task_to_evaluate = generate_task(
    input="Hello Santa how is it going",
    output="As an AI language model,I do not have feelings. I am happy !",
    # Alternative output to try:
    # output="As an AI language model, I can't perform physical tasks like delivering packages.",
)

task_ref = firestore_db.collection("tasks").document(task_to_evaluate["id"])
task_ref.set(task_to_evaluate)

task_scoring_pipeline(task_to_evaluate["id"])

[32m2023-12-13 15:35:32.563[0m | [1mINFO    [0m | [36mapp.services.firebase.watchers.main[0m:[36mtask_scoring_pipeline[0m:[36m275[0m - [1mNot enough examples to score task 42c197f719144b858ba41ceaa3b36757: 0 examples when {config.FEW_SHOT_NUMBER_OF_EXAMPLES} are required. Running in zero shot mode.[0m
[32m2023-12-13 15:35:33.977[0m | [1mINFO    [0m | [36mapp.services.firebase.watchers.main[0m:[36mtask_scoring_pipeline[0m:[36m315[0m - [1mFlag for task 42c197f719144b858ba41ceaa3b36757 : failure[0m


In [12]:
# Show the evaluation(s) stored in the "evals" collection for the task
# scored above, restricted to the "phospho-3" source.
task_evals_query = (
    firestore_db.collection("evals")
    .where(filter=FieldFilter("task_id", "==", task_to_evaluate["id"]))
    .where(filter=FieldFilter("source", "==", "phospho-3"))
)
eval_docs = task_evals_query.stream()

for eval_doc in eval_docs:
    eval_payload = eval_doc.to_dict()
    print(eval_payload)
    print(f"Detected flag : {eval_payload['value']}")

{'project_id': 'zNyL8jX3N0QRXEiwwMFl', 'created_at': 1702478133, 'session_id': 'oyjuW0QQxHHcT0ApJEYt', 'task_id': '42c197f719144b858ba41ceaa3b36757', 'source': 'phospho-3', 'value': 'failure'}
Detected flag : failure


In [13]:
await main_pipeline( task_to_evaluate["id"])

[32m2023-12-13 15:35:34.120[0m | [1mINFO    [0m | [36mapp.services.firebase.watchers.main[0m:[36mmain_pipeline[0m:[36m398[0m - [1mStarting main pipeline for task 42c197f719144b858ba41ceaa3b36757[0m


Project with id zNyL8jX3N0QRXEiwwMFl has 9 tasks


[32m2023-12-13 15:35:34.276[0m | [34m[1mDEBUG   [0m | [36mapp.services.firebase.watchers.data[0m:[36mfetch_task[0m:[36m125[0m - [34m[1mCreation Date: 1702478132[0m
[32m2023-12-13 15:35:34.276[0m | [34m[1mDEBUG   [0m | [36mapp.services.firebase.watchers.data[0m:[36mfetch_task[0m:[36m126[0m - [34m[1mSession ID: oyjuW0QQxHHcT0ApJEYt[0m
[32m2023-12-13 15:35:34.321[0m | [34m[1mDEBUG   [0m | [36mapp.services.firebase.watchers.main[0m:[36mevent_detection_pipeline[0m:[36m123[0m - [34m[1mtask_data {'metadata': {}, 'additional_input': {}, 'data': {}, 'created_at': 1702478132, 'session_id': 'oyjuW0QQxHHcT0ApJEYt', 'output': 'As an AI language model,I do not have feelings. I am happy !', 'input': 'Hello Santa how is it going', 'environment': 'default environment', 'project_id': 'zNyL8jX3N0QRXEiwwMFl', 'id': '42c197f719144b858ba41ceaa3b36757', 'events': [], 'additional_output': {}, 'flag': 'failure'}[0m
[32m2023-12-13 15:35:34.322[0m | [34m[1mDEBUG   [