Conversation
…ization This change enables isolated browser instances to share data and coordinate tasks through the Scheduler. Key changes include: - Extended TaskState to support communication channels and shared data. - Added share_data and get_shared_data tools for agents. - Implemented channel density prioritization in ResourceManager to optimize concurrent execution of related tasks. - Added automatic channel inheritance for subtasks. - Included comprehensive tests for the new functionality. Co-authored-by: ngoiyaeric <115367894+ngoiyaeric@users.noreply.github.com>
|
👋 Jules, reporting for duty! I'm here to lend a hand with this pull request. When you start a review, I'll add a 👀 emoji to each comment to let you know I've read it. I'll focus on feedback directed at me and will do my best to stay out of conversations between you and other bots or reviewers to keep the noise down. I'll push a commit with your requested changes shortly after. Please note there might be a delay between these steps, but rest assured I'm on the job! For more direct control, you can switch me to Reactive Mode. When this mode is on, I will only act on comments where you specifically mention me with New to Jules? Learn more at jules.google/docs. For security, I will only act on instructions from the user who triggered this task. |
📝 WalkthroughWalkthroughThis PR introduces a channel-based task communication system. It adds communication_channel metadata to tasks, prioritizes executor allocation by channel density, provides inter-task data sharing utilities, and includes comprehensive integration tests for the new orchestration capabilities. Changes
Sequence DiagramsequenceDiagram
participant Client
participant Scheduler
participant ResourceMgr
participant Tools
participant Task as Task (with channel)
Client->>Scheduler: schedule_task(communication_channel='ch1')
Scheduler->>Scheduler: create TaskState with channel
Scheduler->>Scheduler: group by channel priority
Client->>ResourceMgr: allocate_executors()
ResourceMgr->>ResourceMgr: _prioritize_tasks_by_channel()
ResourceMgr->>ResourceMgr: order channels by density
ResourceMgr->>Scheduler: fetch prioritized task_ids
Task->>Tools: share_data('ch1', data)
Tools->>Scheduler: find all tasks with channel='ch1'
Scheduler->>Scheduler: update shared_data on each task
Tools-->>Task: success
Task->>Tools: get_shared_data('ch1')
Tools->>Scheduler: aggregate shared_data from channel
Tools-->>Task: combined_data
Estimated Code Review Effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
PR Compliance Guide 🔍Below is a summary of compliance checks for this PR:
Compliance status legend🟢 - Fully Compliant🟡 - Partial Compliant 🔴 - Not Compliant ⚪ - Requires Further Human Verification 🏷️ - Compliance label |
||||||||||||||||||||||||||
PR Code Suggestions ✨Explore these optional code suggestions:
|
|||||||||||||||
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
blastai/scheduler.py (1)
244-278:⚠️ Potential issue | 🟠 MajorSubtasks don’t automatically inherit the parent channel when callers omit the parameter.
Directschedule_subtask(...)calls will passNoneeven if the parent has a channel, which breaks the “automatic inheritance” guarantee.🛠️ Suggested fix: default to parent channel when omitted
def schedule_subtask( self, description: str, parent_task_id: str, cache_control: str = "", interactive_queues: Optional[Dict[str, asyncio.Queue]] = None, initial_url: Optional[str] = None, communication_channel: Optional[str] = None, ) -> str: """Schedule a subtask of an existing task. @@ if parent_task_id not in self.tasks: raise ValueError(f"Parent task {parent_task_id} not found") + if communication_channel is None: + communication_channel = self.tasks[parent_task_id].communication_channel @@ task_id = self.schedule_task( description=description, parent_task_id=parent_task_id, prerequisite_task_id=None, # Explicitly set no prerequisite cache_control=cache_control, interactive_queues=interactive_queues, initial_url=initial_url, communication_channel=communication_channel, )
🤖 Fix all issues with AI agents
In `@blastai/tools.py`:
- Around line 152-195: The share_data and get_shared_data handlers allow any
task to access any channel by accepting an arbitrary channel string; to enforce
channel isolation, look up the caller task via self.task_id in scheduler.tasks
and verify its communication_channel matches the provided channel, and if not
reject the request (e.g., return an error ActionResult or raise) before
proceeding; apply this check at the start of both share_data and
get_shared_data, keeping existing logic (target filtering, updating
task.shared_data, and json.dumps) only after the authorization check.
In `@tests/test_browser_communication.py`:
- Around line 72-90: The test should stub the cache manager's get_result and
get_plan to return falsy values to prevent implicit cache hits that let
schedule_task auto-complete; locate the cache manager used in the test setup
(the MagicMock passed into Scheduler) and set its get_result and get_plan to
return None (or AsyncMock returning None if awaited) before calling
scheduler.schedule_task in test_share_data_with_targets, and apply the same
stubbing to the other test block referenced (lines 92-110) so tasks do not
auto-complete due to truthy MagicMock defaults.
| @self.controller.action("Share data with related browsers") | ||
| async def share_data(channel: str, data: str, target_task_ids: Optional[str] = None) -> ActionResult: | ||
| """Share data across browsers in the same problem space. | ||
|
|
||
| Args: | ||
| channel: Communication channel ID | ||
| data: Data to share (string) | ||
| target_task_ids: Optional comma-separated list of task IDs to share with. | ||
| If not provided, shares with all tasks in the channel. | ||
| """ | ||
| # Get all tasks in the same communication channel | ||
| related_tasks = [ | ||
| task | ||
| for task in scheduler.tasks.values() | ||
| if task.communication_channel == channel and task.id != self.task_id | ||
| ] | ||
|
|
||
| # Filter by target_task_ids if provided | ||
| if target_task_ids: | ||
| targets = [tid.strip() for tid in target_task_ids.split(",")] | ||
| related_tasks = [task for task in related_tasks if task.id in targets] | ||
|
|
||
| # Update shared data for all related tasks | ||
| for task in related_tasks: | ||
| if not task.shared_data: | ||
| task.shared_data = {} | ||
| task.shared_data[self.task_id] = data | ||
|
|
||
| return ActionResult(extracted_content=f"Data shared with {len(related_tasks)} browsers") | ||
|
|
||
| @self.controller.action("Get shared data from problem space") | ||
| async def get_shared_data(channel: str) -> ActionResult: | ||
| """Retrieve data shared by other browsers in the problem space. | ||
|
|
||
| Args: | ||
| channel: Communication channel ID | ||
| """ | ||
| shared_data = {} | ||
| for task in scheduler.tasks.values(): | ||
| if task.communication_channel == channel and task.shared_data: | ||
| shared_data.update(task.shared_data) | ||
|
|
||
| return ActionResult(extracted_content=f"Shared data: {json.dumps(shared_data)}") | ||
|
|
There was a problem hiding this comment.
Enforce channel isolation in share_data/get_shared_data.
Right now any task can read/write any channel by passing a channel string, which bypasses boundaries.
🔒 Suggested fix: reject cross-channel access
`@self.controller.action`("Share data with related browsers")
async def share_data(channel: str, data: str, target_task_ids: Optional[str] = None) -> ActionResult:
"""Share data across browsers in the same problem space.
@@
- # Get all tasks in the same communication channel
+ if self.communication_channel is None:
+ return ActionResult(success=False, error="Task is not in a communication channel")
+ if channel != self.communication_channel:
+ return ActionResult(success=False, error="Channel mismatch")
+
+ # Get all tasks in the same communication channel
related_tasks = [
task
for task in scheduler.tasks.values()
if task.communication_channel == channel and task.id != self.task_id
]
@@
`@self.controller.action`("Get shared data from problem space")
async def get_shared_data(channel: str) -> ActionResult:
"""Retrieve data shared by other browsers in the problem space.
@@
- shared_data = {}
+ if self.communication_channel is None:
+ return ActionResult(success=False, error="Task is not in a communication channel")
+ if channel != self.communication_channel:
+ return ActionResult(success=False, error="Channel mismatch")
+
+ shared_data = {}
for task in scheduler.tasks.values():
if task.communication_channel == channel and task.shared_data:
shared_data.update(task.shared_data)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @self.controller.action("Share data with related browsers") | |
| async def share_data(channel: str, data: str, target_task_ids: Optional[str] = None) -> ActionResult: | |
| """Share data across browsers in the same problem space. | |
| Args: | |
| channel: Communication channel ID | |
| data: Data to share (string) | |
| target_task_ids: Optional comma-separated list of task IDs to share with. | |
| If not provided, shares with all tasks in the channel. | |
| """ | |
| # Get all tasks in the same communication channel | |
| related_tasks = [ | |
| task | |
| for task in scheduler.tasks.values() | |
| if task.communication_channel == channel and task.id != self.task_id | |
| ] | |
| # Filter by target_task_ids if provided | |
| if target_task_ids: | |
| targets = [tid.strip() for tid in target_task_ids.split(",")] | |
| related_tasks = [task for task in related_tasks if task.id in targets] | |
| # Update shared data for all related tasks | |
| for task in related_tasks: | |
| if not task.shared_data: | |
| task.shared_data = {} | |
| task.shared_data[self.task_id] = data | |
| return ActionResult(extracted_content=f"Data shared with {len(related_tasks)} browsers") | |
| @self.controller.action("Get shared data from problem space") | |
| async def get_shared_data(channel: str) -> ActionResult: | |
| """Retrieve data shared by other browsers in the problem space. | |
| Args: | |
| channel: Communication channel ID | |
| """ | |
| shared_data = {} | |
| for task in scheduler.tasks.values(): | |
| if task.communication_channel == channel and task.shared_data: | |
| shared_data.update(task.shared_data) | |
| return ActionResult(extracted_content=f"Shared data: {json.dumps(shared_data)}") | |
| `@self.controller.action`("Share data with related browsers") | |
| async def share_data(channel: str, data: str, target_task_ids: Optional[str] = None) -> ActionResult: | |
| """Share data across browsers in the same problem space. | |
| Args: | |
| channel: Communication channel ID | |
| data: Data to share (string) | |
| target_task_ids: Optional comma-separated list of task IDs to share with. | |
| If not provided, shares with all tasks in the channel. | |
| """ | |
| if self.communication_channel is None: | |
| return ActionResult(success=False, error="Task is not in a communication channel") | |
| if channel != self.communication_channel: | |
| return ActionResult(success=False, error="Channel mismatch") | |
| # Get all tasks in the same communication channel | |
| related_tasks = [ | |
| task | |
| for task in scheduler.tasks.values() | |
| if task.communication_channel == channel and task.id != self.task_id | |
| ] | |
| # Filter by target_task_ids if provided | |
| if target_task_ids: | |
| targets = [tid.strip() for tid in target_task_ids.split(",")] | |
| related_tasks = [task for task in related_tasks if task.id in targets] | |
| # Update shared data for all related tasks | |
| for task in related_tasks: | |
| if not task.shared_data: | |
| task.shared_data = {} | |
| task.shared_data[self.task_id] = data | |
| return ActionResult(extracted_content=f"Data shared with {len(related_tasks)} browsers") | |
| `@self.controller.action`("Get shared data from problem space") | |
| async def get_shared_data(channel: str) -> ActionResult: | |
| """Retrieve data shared by other browsers in the problem space. | |
| Args: | |
| channel: Communication channel ID | |
| """ | |
| if self.communication_channel is None: | |
| return ActionResult(success=False, error="Task is not in a communication channel") | |
| if channel != self.communication_channel: | |
| return ActionResult(success=False, error="Channel mismatch") | |
| shared_data = {} | |
| for task in scheduler.tasks.values(): | |
| if task.communication_channel == channel and task.shared_data: | |
| shared_data.update(task.shared_data) | |
| return ActionResult(extracted_content=f"Shared data: {json.dumps(shared_data)}") |
🤖 Prompt for AI Agents
In `@blastai/tools.py` around lines 152 - 195, The share_data and get_shared_data
handlers allow any task to access any channel by accepting an arbitrary channel
string; to enforce channel isolation, look up the caller task via self.task_id
in scheduler.tasks and verify its communication_channel matches the provided
channel, and if not reject the request (e.g., return an error ActionResult or
raise) before proceeding; apply this check at the start of both share_data and
get_shared_data, keeping existing logic (target filtering, updating
task.shared_data, and json.dumps) only after the authorization check.
| async def test_share_data_with_targets(): | ||
| constraints = Constraints() | ||
| scheduler = Scheduler(constraints, MagicMock(), MagicMock()) | ||
| resource_manager = MagicMock() | ||
|
|
||
| tid_1 = scheduler.schedule_task("T1", communication_channel="ch") | ||
| tid_2 = scheduler.schedule_task("T2", communication_channel="ch") | ||
| tid_3 = scheduler.schedule_task("T3", communication_channel="ch") | ||
|
|
||
| tools_1 = Tools(scheduler=scheduler, task_id=tid_1, resource_manager=resource_manager) | ||
|
|
||
| # Share only with tid_2 | ||
| await tools_1.controller.registry.registry.actions["share_data"].function( | ||
| channel="ch", data="secret", target_task_ids=tid_2 | ||
| ) | ||
|
|
||
| assert scheduler.tasks[tid_2].shared_data[tid_1] == "secret" | ||
| assert scheduler.tasks[tid_3].shared_data is None or tid_1 not in scheduler.tasks[tid_3].shared_data | ||
|
|
There was a problem hiding this comment.
Stub cache_manager.get_result/get_plan to avoid implicit cache hits.
MagicMock is truthy by default, so schedule_task can auto-complete tasks and mask future regressions.
🧪 Suggested test hardening
async def test_share_data_with_targets():
constraints = Constraints()
- scheduler = Scheduler(constraints, MagicMock(), MagicMock())
+ cache_manager = MagicMock()
+ cache_manager.get_result.return_value = None
+ cache_manager.get_plan.return_value = None
+ scheduler = Scheduler(constraints, cache_manager, MagicMock())
resource_manager = MagicMock()
@@
async def test_channel_inheritance():
constraints = Constraints()
- scheduler = Scheduler(constraints, MagicMock(), MagicMock())
+ cache_manager = MagicMock()
+ cache_manager.get_result.return_value = None
+ cache_manager.get_plan.return_value = None
+ scheduler = Scheduler(constraints, cache_manager, MagicMock())
resource_manager = MagicMock()Also applies to: 92-110
🤖 Prompt for AI Agents
In `@tests/test_browser_communication.py` around lines 72 - 90, The test should
stub the cache manager's get_result and get_plan to return falsy values to
prevent implicit cache hits that let schedule_task auto-complete; locate the
cache manager used in the test setup (the MagicMock passed into Scheduler) and
set its get_result and get_plan to return None (or AsyncMock returning None if
awaited) before calling scheduler.schedule_task in test_share_data_with_targets,
and apply the same stubbing to the other test block referenced (lines 92-110) so
tasks do not auto-complete due to truthy MagicMock defaults.
…ution - Extended TaskState with communication_channel and shared_data - Added share_data and get_shared_data tools for cross-browser coordination - Implemented density-based task prioritization in ResourceManager - Added automatic communication channel inheritance for subtasks - Added comprehensive unit tests for all new features - Ensured GIL-safe concurrent access to shared scheduler state Co-authored-by: ngoiyaeric <115367894+ngoiyaeric@users.noreply.github.com>
User description
Implemented inter-browser communication by extending the Scheduler to act as a data hub and updating the ResourceManager to prioritize tasks in the same communication channel. Added new tools for agents to share and retrieve data within these channels, and ensured subtasks inherit their parent's channel for seamless coordination.
PR created automatically by Jules for task 15229832897328717329 started by @ngoiyaeric
PR Type
Enhancement
Description
Implement inter-browser communication via communication channels
Prioritize tasks by channel density in ResourceManager
Enable automatic channel inheritance for subtasks
Add comprehensive test coverage for new functionality
Diagram Walkthrough
File Walkthrough
scheduler.py
Add communication channel support to task schedulingblastai/scheduler.py
dataclass
communication_channel parameter
3)
tasks
resource_manager.py
Add channel-aware task prioritization logicblastai/resource_manager.py
channel density
last
tools.py
Add inter-browser communication tools and channel inheritanceblastai/tools.py
channel
test_browser_communication.py
Add comprehensive tests for inter-browser communicationtests/test_browser_communication.py
workflow
prioritization
Summary by CodeRabbit
New Features
✏️ Tip: You can customize this high-level summary in your review settings.