# TaskFlowr â€” Kaggle Demo Notebook

This notebook is a runnable demo of **TaskFlowr**, a 3-agent workflow automation system (Coordinator, Automation, Communication). It's built as a self-contained demo so you can run it directly on Kaggle or locally in a Jupyter environment.

## GitHub Repo Link
https://github.com/saad2134/taskflowr/

## Architecture

The architecture is: User -> Coordinator -> Automation Agent & Communication Agent -> Observability/Memory. See the repo for full diagrams.

In [1]:

# Agent implementations (minimal, runnable demo)
import json, time
from typing import Any, Dict

class DummyLogger:
    def __init__(self):
        self.events = []
    def log(self, name, payload):
        self.events.append((name, payload))
    def show(self):
        import pandas as pd
        return pd.DataFrame([{'event': e[0], **(e[1] if isinstance(e[1], dict) else {'data': e[1]})} for e in self.events])

class Memory:
    def __init__(self):
        self.session = []
        self.longterm = []
    def append_session(self, item):
        self.session.append(item)
    def append_longterm(self, item):
        self.longterm.append(item)
    def get_session(self):
        return self.session
    def get_longterm(self):
        return self.longterm

class AutomationAgent:
    def __init__(self, tools=None, logger=None):
        self.tools = tools or {}
        self.logger = logger
    def generate_report(self, payload: str):
        # Very simple synthetic report using no external data
        report = {
            "title": "Weekly Operations Report",
            "summary": "Auto-generated synthetic summary",
            "metrics": {"uptime_pct": 99.92, "tasks_processed": 128, "errors": 2}
        }
        if 'file_write' in self.tools:
            self.tools['file_write']('report.json', json.dumps(report, indent=2))
        if self.logger:
            self.logger.log('report_generated', {'size': len(json.dumps(report))})
        return {'report': report}
    def generic(self, payload: str):
        return {'echo': payload}

class CommunicationAgent:
    def __init__(self, memory=None, logger=None):
        self.memory = memory
        self.logger = logger
    def draft_email(self, payload: str):
        subject = "Weekly Ops Update"
        body = (
            "Hello team,\n\n"
            "Please find the weekly operations summary attached. Key highlights: uptime stable, tasks processed.\n\n"
            "Regards,\nOpsFlow"
        )
        if self.logger:
            self.logger.log('email_drafted', {'subject': subject})
        return {'subject': subject, 'body': body}
    def summarize(self, payload: str):
        summary = payload[:200] + ('...' if len(payload) > 200 else '')
        return {'summary': summary}

class DummyA2A:
    def __init__(self, automation, communication):
        self.automation = automation
        self.communication = communication
    def call(self, name, **kwargs):
        if name == 'automation_agent':
            fn = getattr(self.automation, kwargs.get('action'), None)
            if fn:
                return fn(kwargs.get('payload'))
        if name == 'communication_agent':
            fn = getattr(self.communication, kwargs.get('action'), None)
            if fn:
                return fn(kwargs.get('payload'))
        return {'error': 'missing_agent_or_action'}

class Coordinator:
    def __init__(self, memory=None, a2a_client=None, logger=None):
        self.memory = memory
        self.a2a = a2a_client
        self.logger = logger
    def interpret_intent(self, user_input: str):
        # minimal planner: simple keyword heuristics
        tasks = []
        if any(k in user_input.lower() for k in ['report', 'summary']):
            tasks.append({'agent':'automation','action':'generate_report','input':user_input})
            tasks.append({'agent':'communication','action':'draft_email','input':'send report to team'})
            return {'tasks': tasks}
        if any(k in user_input.lower() for k in ['email','announce','announcement']):
            tasks.append({'agent':'communication','action':'draft_email','input':user_input})
            return {'tasks': tasks}
        # fallback generic
        tasks.append({'agent':'automation','action':'generic','input':user_input})
        return {'tasks': tasks}
    def run(self, user_input: str):
        start = time.time()
        plan = self.interpret_intent(user_input)
        results = []
        for task in plan.get('tasks', []):
            agent = task.get('agent')
            action = task.get('action')
            payload = task.get('input')
            if agent == 'automation':
                out = self.a2a.call('automation_agent', action=action, payload=payload)
            elif agent == 'communication':
                out = self.a2a.call('communication_agent', action=action, payload=payload)
            else:
                out = {'error':'unknown_agent'}
            results.append({'task': task, 'result': out})
            if self.memory:
                self.memory.append_session({'task': task, 'result': out})
        latency = time.time() - start
        if self.logger:
            self.logger.log('coordinator_run', {'latency': latency, 'tasks': len(results)})
        return {'status':'ok','results': results}


In [2]:

# Demo: initialize system and run sample workflows
from pprint import pprint
from json import dumps
logger = DummyLogger()
memory = Memory()
automation = AutomationAgent(tools={'file_write': lambda p, c: open(p, 'w').write(c)}, logger=logger)
communication = CommunicationAgent(memory=memory, logger=logger)
a2a = DummyA2A(automation, communication)
coord = Coordinator(memory=memory, a2a_client=a2a, logger=logger)

# 1) Run a report workflow
print('--- Running: Create a weekly operations report and draft an email ---')
out1 = coord.run('Create a weekly operations report and draft an email to the team.')
pprint(out1)

# 2) Run an email-only workflow
print('\n--- Running: Draft an announcement email ---')
out2 = coord.run('Draft an announcement email for the new workflow changes.')
pprint(out2)

# Show logs captured
print('\n--- Logger events ---')
display(logger.show())

# Show session memory
print('\n--- Session memory ---')
pprint(memory.get_session())


--- Running: Create a weekly operations report and draft an email ---
{'results': [{'result': {'report': {'metrics': {'errors': 2,
                                                'tasks_processed': 128,
                                                'uptime_pct': 99.92},
                                    'summary': 'Auto-generated synthetic '
                                               'summary',
                                    'title': 'Weekly Operations Report'}},
              'task': {'action': 'generate_report',
                       'agent': 'automation',
                       'input': 'Create a weekly operations report and draft '
                                'an email to the team.'}},
             {'result': {'body': 'Hello team,\n'
                                 '\n'
                                 'Please find the weekly operations summary '
                                 'attached. Key highlights: uptime stable, '
                                 'tasks p

  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()


Unnamed: 0,event,size,subject,latency,tasks
0,report_generated,155.0,,,
1,email_drafted,,Weekly Ops Update,,
2,coordinator_run,,,0.00118,2.0
3,email_drafted,,Weekly Ops Update,,
4,coordinator_run,,,1.5e-05,1.0



--- Session memory ---
[{'result': {'report': {'metrics': {'errors': 2,
                                    'tasks_processed': 128,
                                    'uptime_pct': 99.92},
                        'summary': 'Auto-generated synthetic summary',
                        'title': 'Weekly Operations Report'}},
  'task': {'action': 'generate_report',
           'agent': 'automation',
           'input': 'Create a weekly operations report and draft an email to '
                    'the team.'}},
 {'result': {'body': 'Hello team,\n'
                     '\n'
                     'Please find the weekly operations summary attached. Key '
                     'highlights: uptime stable, tasks processed.\n'
                     '\n'
                     'Regards,\n'
                     'OpsFlow',
             'subject': 'Weekly Ops Update'},
  'task': {'action': 'draft_email',
           'agent': 'communication',
           'input': 'send report to team'}},
 {'result': {'body'

In [3]:

# Simple evaluator that uses the Coordinator to run predefined tests
import json
TESTS = [
    {'id':'checklist_generation','input':'Create a checklist for onboarding a new project manager.'},
    {'id':'summary_generation','input':'Summarize the update: We closed Q3 with 14% growth and hired 3 engineers.'},
    {'id':'report_workflow','input':'Create a weekly operational report template.'},
    {'id':'email_draft','input':'Draft an email announcing the new workflow changes.'},
    {'id':'multi_step','input':'Generate a checklist for performance review and draft an announcement.'}
]
results = []
for t in TESTS:
    out = coord.run(t['input'])
    ok = len(out.get('results', []))>0
    results.append({'id': t['id'], 'ok': ok, 'out': out})
print('\n--- Evaluation Results ---')
print(json.dumps(results, indent=2))



--- Evaluation Results ---
[
  {
    "id": "checklist_generation",
    "ok": true,
    "out": {
      "status": "ok",
      "results": [
        {
          "task": {
            "agent": "automation",
            "action": "generic",
            "input": "Create a checklist for onboarding a new project manager."
          },
          "result": {
            "echo": "Create a checklist for onboarding a new project manager."
          }
        }
      ]
    }
  },
  {
    "id": "summary_generation",
    "ok": true,
    "out": {
      "status": "ok",
      "results": [
        {
          "task": {
            "agent": "automation",
            "action": "generic",
            "input": "Summarize the update: We closed Q3 with 14% growth and hired 3 engineers."
          },
          "result": {
            "echo": "Summarize the update: We closed Q3 with 14% growth and hired 3 engineers."
          }
        }
      ]
    }
  },
  {
    "id": "report_workflow",
    "ok": true,
    "ou