Skip to content

Commit

Permalink
Made agent and tasks private but going to rever that change
Browse files Browse the repository at this point in the history
  • Loading branch information
bhancockio committed May 17, 2024
1 parent 9e8ccfe commit 0118c0f
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 184 deletions.
89 changes: 45 additions & 44 deletions src/crewai/cli/templates/crew.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,49 +7,50 @@
# Check our tools documentations for more information on how to use them
# from crewai_tools import SerperDevTool


@CrewBase
class {{crew_name}}Crew():
"""{{crew_name}} crew"""
agents_config = 'config/agents.yaml'
tasks_config = 'config/tasks.yaml'

@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
# tools=[MyCustomTool()], # Example of custom tool, loaded on the beginning of file
verbose=True
)

@agent
def reporting_analyst(self) -> Agent:
return Agent(
config=self.agents_config['reporting_analyst'],
verbose=True
)

@task
def research_task(self) -> Task:
return Task(
config=self.tasks_config['research_task'],
agent=self.researcher()
)

@task
def reporting_task(self) -> Task:
return Task(
config=self.tasks_config['reporting_task'],
agent=self.reporting_analyst(),
output_file='report.md'
)

@crew
def crew(self) -> Crew:
"""Creates the {{crew_name}} crew"""
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=2,
# process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/
)
"""{{crew_name}} crew"""
agents_config = 'config/agents.yaml'
tasks_config = 'config/tasks.yaml'

@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
# tools=[MyCustomTool()], # Example of custom tool, loaded on the beginning of file
verbose=True
)

@agent
def reporting_analyst(self) -> Agent:
return Agent(
config=self.agents_config['reporting_analyst'],
verbose=True
)

@task
def research_task(self) -> Task:
return Task(
config=self.tasks_config['research_task'],
agent=self.researcher()
)

@task
def reporting_task(self) -> Task:
return Task(
config=self.tasks_config['reporting_task'],
agent=self.reporting_analyst(),
output_file='report.md'
)

@crew
def crew(self) -> Crew:
"""Creates the {{crew_name}} crew"""
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=2,
# process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/
)
72 changes: 43 additions & 29 deletions src/crewai/crew.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ class Crew(BaseModel):

cache: bool = Field(default=True)
model_config = ConfigDict(arbitrary_types_allowed=True)
tasks: List[Task] = Field(default_factory=list)
agents: List[Agent] = Field(default_factory=list)
process: Process = Field(default=Process.sequential)
_tasks: List[Task] = PrivateAttr(default_factory=list)
_agents: List[Agent] = PrivateAttr(default_factory=list)
_process: Process = PrivateAttr(default=Process.sequential)
verbose: Union[int, bool] = Field(default=0)
memory: bool = Field(
default=False,
Expand Down Expand Up @@ -128,6 +128,12 @@ class Crew(BaseModel):
description="output_log_file",
)

def __init__(self, **data: Any):
super().__init__(**data)
self._agents = data.get('agents', [])
self._tasks = data.get('tasks', [])
self._process = data.get('process', Process.sequential)

@field_validator("id", mode="before")
@classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
Expand Down Expand Up @@ -183,7 +189,7 @@ def create_crew_memory(self) -> "Crew":
@model_validator(mode="after")
def check_manager_llm(self):
"""Validates that the language model is set when using hierarchical process."""
if self.process == Process.hierarchical:
if self._process == Process.hierarchical:
if not self.manager_llm and not self.manager_agent:
raise PydanticCustomError(
"missing_manager_llm_or_manager_agent",
Expand All @@ -205,7 +211,11 @@ def check_manager_llm(self):
@model_validator(mode="after")
def check_config(self):
"""Validates that the crew is properly configured with agents and tasks."""
if not self.config and not self.tasks and not self.agents:
print("Checking config", self.config)
print("tasks", self._tasks)
print("agents", self._agents)
# TODO: See if we can drop not self.tasks and not self.agents since moving to thread safe
if not self.config and not self._tasks and not self._agents:
raise PydanticCustomError(
"missing_keys",
"Either 'agents' and 'tasks' need to be set or 'config'.",
Expand All @@ -215,8 +225,10 @@ def check_config(self):
if self.config:
self._setup_from_config()

if self.agents:
for agent in self.agents:
# TODO: See if we can drop cache check since moving to thread safe
# TODO: See if we should pass thread_local to agents or self....
if self._agents:
for agent in self._agents:
if self.cache:
agent.set_cache_handler(self._cache_handler)
if self.max_rpm:
Expand All @@ -233,9 +245,10 @@ def _setup_from_config(self):
"missing_keys_in_config", "Config should have 'agents' and 'tasks'.", {}
)

self.process = self.config.get("process", self.process)
self.agents = [Agent(**agent) for agent in self.config["agents"]]
self.tasks = [self._create_task(task) for task in self.config["tasks"]]
self._process = self.config.get("process", self._process)
self._agents = [Agent(**agent) for agent in self.config["agents"]]
self._tasks = [self._create_task(task)
for task in self.config["tasks"]]

def _create_task(self, task_config: Dict[str, Any]) -> Task:
"""Creates a task instance from its configuration.
Expand All @@ -247,28 +260,31 @@ def _create_task(self, task_config: Dict[str, Any]) -> Task:
A task instance.
"""
task_agent = next(
agt for agt in self.agents if agt.role == task_config["agent"]
agt for agt in self._agents if agt.role == task_config["agent"]
)
del task_config["agent"]
return Task(**task_config, agent=task_agent)

def kickoff(self, inputs: Optional[Dict[str, Any]] = {}) -> str:
"""Starts the crew to work on its assigned tasks."""
self._thread_local.agents = copy.deepcopy(self._agents)
self._thread_local.tasks = copy.deepcopy(self._tasks)

# type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
self._interpolate_inputs(inputs)
self._set_tasks_callbacks

# Initialize thread-local agents and tasks
if not hasattr(self._thread_local, 'initialized'):
self._initialize_thread_specific_components()
self._thread_local.initialized = True

# type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
self._interpolate_inputs(inputs)
self._set_tasks_callbacks()

self._thread_local.execution_span = self._telemetry.crew_execution_span(
self)

i18n = I18N(prompt_file=self.prompt_file)

for agent in self.agents:
for agent in self._thread_local.agents:
agent.i18n = i18n
agent.crew = self

Expand All @@ -282,18 +298,18 @@ def kickoff(self, inputs: Optional[Dict[str, Any]] = {}) -> str:
metrics = []

try:
if self.process == Process.sequential:
if self._process == Process.sequential:
result = self._run_sequential_process(
thread_local=self._thread_local)
elif self.process == Process.hierarchical:
elif self._process == Process.hierarchical:
# type: ignore # Unpacking a string is disallowed
result, manager_metrics = self._run_hierarchical_process(
thread_local=self._thread_local)
# type: ignore # Cannot determine type of "manager_metrics"
metrics.append(manager_metrics)
else:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet.")
f"The process '{self._process}' is not implemented yet.")

metrics = metrics + [agent._token_process.get_summary()
for agent in self._thread_local.agents]
Expand Down Expand Up @@ -321,10 +337,6 @@ def _initialize_thread_specific_components(self):
self._long_term_memory)
self._thread_local.entity_memory = copy.deepcopy(
self._entity_memory)
self._thread_local.agents = [copy.deepcopy(
agent) for agent in self._shared_agents]
self._thread_local.tasks = [copy.deepcopy(
task) for task in self._shared_tasks]

def kickoff_for_each(self, inputs_list: List[Dict[str, Any]], use_threading: bool = True, max_threads: int = 10) -> List[str]:
"""Start multiple instances of crew to work on its assigned tasks for each input in a new thread if threading is enabled."""
Expand Down Expand Up @@ -457,11 +469,10 @@ def _interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
print("Interpolating inputs")
print(inputs)
# type: ignore # "interpolate_inputs" of "Task" does not return a value (it only ever returns None)
for task in self._thread_local.tasks:
task.interpolate_inputs(inputs)
[task.interpolate_inputs(inputs) for task in self._thread_local.tasks]
# type: ignore # "interpolate_inputs" of "Agent" does not return a value (it only ever returns None)
for agent in self._thread_local.agents:
agent.interpolate_inputs(inputs)
[agent.interpolate_inputs(inputs)
for agent in self._thread_local.agents]

def _format_output(self, output: str) -> str:
"""Formats the output of the crew execution."""
Expand All @@ -475,12 +486,15 @@ def _format_output(self, output: str) -> str:

def _finish_execution(self, output, thread_local=None) -> None:
if self.max_rpm:
thread_local.rpm_controller.stop_rpm_counter()
if thread_local:
thread_local.rpm_controller.stop_rpm_counter()
else:
self._rpm_controller.stop_rpm_counter()
self._telemetry.end_crew(self, output)

def _reset_thread_local(self):
"""Resets the thread-local storage."""
self._thread_local.initialized = False

def __repr__(self):
return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})"
return f"Crew(id={self.id}, process={self._process}, number_of_agents={len(self._agents)}, number_of_tasks={len(self._tasks)})"
Loading

0 comments on commit 0118c0f

Please sign in to comment.