flow example, save and load state
kyegomez committed Nov 3, 2023
1 parent f53236a commit 7d888c6
Showing 9 changed files with 329 additions and 80 deletions.
9 changes: 7 additions & 2 deletions flow.py
@@ -23,7 +23,12 @@
# dynamic_temperature=False, # Set to 'True' for dynamic temperature handling.
)


+# out = flow.load_state("flow_state.json")
+# temp = flow.dynamic_temperature()
+# filter = flow.add_response_filter("Trump")
out = flow.run("Generate a 10,000 word blog on health and wellness.")

+# out = flow.validate_response(out)
+# out = flow.analyze_feedback(out)
+# out = flow.print_history_and_memory()
+# out = flow.save_state("flow_state.json")
print(out)
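
Taken together, these examples exercise the new persistence API. A minimal sketch of the intended round trip, assuming a configured llm instance and the Flow class from swarms.structs.flow:

from swarms.structs.flow import Flow

# First session: run a task, then persist memory and config to JSON.
flow = Flow(llm=llm, max_loops=5, dashboard=True)
out = flow.run("Generate a 10,000 word blog on health and wellness.")
flow.save_state("flow_state.json")

# Later session: restore state into a fresh Flow and continue.
flow = Flow(llm=llm, max_loops=5)
flow.load_state("flow_state.json")
out = flow.run("Continue with the task")
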
14 changes: 14 additions & 0 deletions flow_state.json
@@ -0,0 +1,14 @@
{
"memory": [
[
"Human: Generate a 10,000 word blog on health and wellness."
]
],
"llm_params": {},
"loop_interval": 1,
"retry_attempts": 3,
"retry_interval": 1,
"interactive": false,
"dashboard": true,
"dynamic_temperature": false
}
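
Since the state file is plain JSON, it can be inspected or hand-edited before reloading. A small sketch using only the standard library:

import json

with open("flow_state.json") as f:
    state = json.load(f)

print(state["memory"])          # nested list of per-run conversation histories
print(state["retry_attempts"])  # 3
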
1 change: 1 addition & 0 deletions swarms/agents/__init__.py
@@ -1,6 +1,7 @@
from swarms.agents.omni_modal_agent import OmniModalAgent
from swarms.agents.hf_agents import HFAgent
from swarms.agents.message import Message

# from swarms.agents.stream_response import stream
from swarms.agents.base import AbstractAgent
from swarms.agents.registry import Registry
2 changes: 1 addition & 1 deletion swarms/models/distilled_whisperx.py
@@ -1,3 +1,3 @@
"""
"""
"""
2 changes: 1 addition & 1 deletion swarms/models/huggingface.py
@@ -294,7 +294,7 @@ def print_dashboard(self, task: str):
)

print(dashboard)

def set_device(self, device):
"""
Changes the device used for inference.
233 changes: 183 additions & 50 deletions swarms/structs/flow.py
@@ -2,18 +2,16 @@
TODO:
- Add tools
- Add open interpreter style conversation
- Add configurable save and restore so the user can restore from previous flows
- Add memory vector database retrieval
"""

import json
import logging
import time
-from typing import Any, Callable, Dict, List, Optional, Tuple, Generator
+from typing import Any, Callable, Dict, List, Optional, Tuple
from termcolor import colored
import inspect
import random
# from swarms.tools.tool import BaseTool


# Constants
@@ -36,7 +34,6 @@
This will enable you to leave the flow loop.
"""


# Custom stopping condition
def stop_when_repeats(response: str) -> bool:
# Stop if the word stop appears in the response
@@ -209,7 +206,7 @@ def print_dashboard(self, task: str):

print(dashboard)

-    def run(self, task: str, **kwargs):
+    def run(self, task: str, save: bool = True, **kwargs):
"""
Run the autonomous agent loop
@@ -223,17 +220,29 @@ def run(self, task: str, **kwargs):
        4. If stopping condition is not met, generate a response
        5. Repeat until stopping condition is met or max_loops is reached

+        Example:
+        >>> out = flow.run("Generate a 10,000 word blog on health and wellness.")
        """
+        # Start with a new history or continue from the last saved state
+        if not self.memory or not self.memory[-1]:
+            history = [f"Human: {task}"]
+        else:
+            history = self.memory[-1]

        response = task
-        history = [f"Human: {task}"]

        # If dashboard = True then print the dashboard
        if self.dashboard:
            self.print_dashboard(task)

-        for i in range(self.max_loops):
+        # Start or continue the loop process
+        for i in range(len(history), self.max_loops):
            print(colored(f"\nLoop {i+1} of {self.max_loops}", "blue"))
            print("\n")
+            response = history[-1].split(": ", 1)[-1]  # Get the last response

            if self._check_stopping_condition(response) or parse_done_token(response):
                break

@@ -245,15 +254,8 @@ def run(self, task: str, **kwargs):
            while attempt < self.retry_attempts:
                try:
                    response = self.llm(
-                        f"""
-                        SYSTEM_PROMPT:
-                        {FLOW_SYSTEM_PROMPT}

-                        History: {response}
-                        """,
-                        **kwargs,
+                        self.agent_history_prompt(FLOW_SYSTEM_PROMPT, response),
+                        **kwargs,
                    )
-                    # print(f"Next query: {response}")
-                    # break
@@ -274,6 +276,10 @@ def run(self, task: str, **kwargs):
history.append(response)
time.sleep(self.loop_interval)
self.memory.append(history)

+        if save:
+            self.save("flow_history.json")

return response # , history

def _run(self, **kwargs: Any) -> str:
@@ -283,32 +289,31 @@ def _run(self, **kwargs: Any) -> str:
logging.info(f"Message history: {history}")
return response

-    def bulk_run(self, inputs: List[Dict[str, Any]]) -> List[str]:
-        """Generate responses for multiple input sets."""
-        return [self.run(**input_data) for input_data in inputs]
-
-    def run_dynamically(self, task: str, max_loops: Optional[int] = None):
+    def agent_history_prompt(
+        self,
+        system_prompt: str = FLOW_SYSTEM_PROMPT,
+        history=None,
+    ):
        """
-        Run the autonomous agent loop dynamically based on the <DONE>
-
-        # Usage Example
-
-        # Initialize the Flow
-        flow = Flow(llm=lambda x: x, max_loops=5)
-
-        # Run dynamically based on <DONE> token and optional max loops
-        response = flow.run_dynamically("Generate a report <DONE>", max_loops=3)
-        print(response)
-
-        response = flow.run_dynamically("Generate a report <DONE>")
-        print(response)
+        Generate the agent history prompt
+
+        Args:
+            system_prompt (str): The system prompt
+            history (List[str]): The history of the conversation
+
+        Returns:
+            str: The agent history prompt
        """
-        if "<DONE>" in task:
-            self.stopping_condition = parse_done_token
-        self.max_loops = max_loops or float("inf")
-        response = self.run(task)
-        return response
+        agent_history_prompt = f"""
+        SYSTEM_PROMPT: {system_prompt}
+        History: {history}
+        """
+        return agent_history_prompt
+
+    def bulk_run(self, inputs: List[Dict[str, Any]]) -> List[str]:
+        """Generate responses for multiple input sets."""
+        return [self.run(**input_data) for input_data in inputs]
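
For illustration: agent_history_prompt interpolates both arguments into a single f-string, so passing a list renders its Python repr. A hypothetical call (values assumed):

prompt = flow.agent_history_prompt(
    FLOW_SYSTEM_PROMPT,
    ["Human: Generate a report", "AI: Here is an outline..."],
)
# prompt now contains the system prompt followed by:
#     History: ['Human: Generate a report', 'AI: Here is an outline...']
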

@staticmethod
def from_llm_and_template(llm: Any, template: str) -> "Flow":
@@ -339,6 +344,60 @@ def validate_response(self, response: str) -> bool:
return False
return True

def print_history_and_memory(self):
"""
Prints the entire history and memory of the flow.
Each message is colored and formatted for better readability.
"""
print(colored("Flow History and Memory", "cyan", attrs=["bold"]))
print(colored("========================", "cyan", attrs=["bold"]))
for loop_index, history in enumerate(self.memory, start=1):
print(colored(f"\nLoop {loop_index}:", "yellow", attrs=["bold"]))
for message in history:
speaker, _, message_text = message.partition(": ")
if "Human" in speaker:
print(colored(f"{speaker}:", "green") + f" {message_text}")
else:
print(colored(f"{speaker}:", "blue") + f" {message_text}")
print(colored("------------------------", "cyan"))
print(colored("End of Flow History", "cyan", attrs=["bold"]))

def step(self, task: str, **kwargs):
"""
Executes a single step in the flow interaction, generating a response
from the language model based on the given input text.
Args:
task (str): The input text to prompt the language model with.
Returns:
str: The language model's generated response.
Raises:
Exception: If an error occurs during response generation.
"""
try:
# Generate the response using the llm
response = self.llm(task, **kwargs)

# Update the flow's history with the new interaction
if self.interactive:
self.memory.append(f"AI: {response}")
self.memory.append(f"Human: {task}")
else:
self.memory.append(f"AI: {response}")

return response
except Exception as error:
logging.error(f"Error generating response: {error}")
raise
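
A single-step usage sketch (assuming a configured llm instance); in interactive mode both sides of the exchange are recorded in memory:

flow = Flow(llm=llm, max_loops=5, interactive=True)
reply = flow.step("Summarize the plan in one sentence.")
print(reply)
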

def graceful_shutdown(self):
"""Gracefully shutdown the system saving the state"""
return self.save_state("flow_state.json")
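
One way to wire this up, as an assumption rather than part of this commit, is to register it with the standard library's atexit hook:

import atexit

atexit.register(flow.graceful_shutdown)  # saves flow_state.json when the interpreter exits
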

def run_with_timeout(self, task: str, timeout: int = 60) -> str:
"""Run the loop but stop if it takes longer than the timeout"""
start_time = time.time()
@@ -455,23 +514,97 @@ def streamed_generation(self, prompt: str) -> str:
print()
return response

-    def streamed_token_generation(self, prompt: str) -> Generator[str, None, None]:
-        """
-        Generate tokens in real-time for a given prompt.
-
-        This method simulates the real-time generation of each token.
-        For simplicity, we treat each character of the input as a token
-        and yield them with a slight delay. In a real-world scenario,
-        this would involve using the LLM's internal methods to generate
-        the response token by token.
-
-        Args:
-            prompt (str): The input prompt for which the tokens should be generated.
-
-        Yields:
-            str: The next token (character) from the generated response.
-        """
-        tokens = list(prompt)
-        for token in tokens:
-            time.sleep(0.1)
-            yield token
+    def get_llm_params(self):
+        """
+        Extracts and returns the parameters of the llm object for serialization.
+        It assumes that the llm object has an __init__ method with parameters that can be used to recreate it.
+        """
+        if not hasattr(self.llm, "__init__"):
+            return None
+
+        init_signature = inspect.signature(self.llm.__init__)
+        params = init_signature.parameters
+        llm_params = {}
+
+        for name, param in params.items():
+            if name == "self":
+                continue
+            if hasattr(self.llm, name):
+                value = getattr(self.llm, name)
+                if isinstance(
+                    value, (str, int, float, bool, list, dict, tuple, type(None))
+                ):
+                    llm_params[name] = value
+                else:
+                    # For non-serializable objects, save their string representation.
+                    llm_params[name] = str(value)
+
+        return llm_params
+
+    def save_state(self, file_path: str) -> None:
+        """
+        Saves the current state of the flow to a JSON file, including the llm parameters.
+
+        Args:
+            file_path (str): The path to the JSON file where the state will be saved.
+
+        Example:
+        >>> flow.save_state('saved_flow.json')
+        """
+        state = {
+            "memory": self.memory,
+            # "llm_params": self.get_llm_params(),
+            "loop_interval": self.loop_interval,
+            "retry_attempts": self.retry_attempts,
+            "retry_interval": self.retry_interval,
+            "interactive": self.interactive,
+            "dashboard": self.dashboard,
+            "dynamic_temperature": self.dynamic_temperature,
+        }
+
+        with open(file_path, "w") as f:
+            json.dump(state, f, indent=4)
+
+        saved = colored("Saved flow state to", "green")
+        print(f"{saved} {file_path}")
def load_state(self, file_path: str):
"""
Loads the state of the flow from a JSON file and restores the configuration and memory.
Example:
>>> flow = Flow(llm=llm_instance, max_loops=5)
>>> flow.load_state('saved_flow.json')
>>> flow.run("Continue with the task")
"""
with open(file_path, "r") as f:
state = json.load(f)

# Assuming 'llm_class' is a class reference to the language model
# llm_params = state.get("llm_params", {})
# self.llm = self.llm(**llm_params)

# Restore other saved attributes
self.memory = state.get("memory", [])
self.max_loops = state.get("max_loops", 5)
self.loop_interval = state.get("loop_interval", 1)
self.retry_attempts = state.get("retry_attempts", 3)
self.retry_interval = state.get("retry_interval", 1)
self.interactive = state.get("interactive", False)

print(f"Flow state loaded from {file_path}")

def retry_on_failure(self, function, retries: int = 3, retry_delay: int = 1):
"""Retry wrapper for LLM calls."""
attempt = 0
while attempt < retries:
try:
return function()
except Exception as error:
logging.error(f"Error generating response: {error}")
attempt += 1
time.sleep(retry_delay)
raise Exception("All retry attempts failed")
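
A usage sketch for the wrapper (wrapping the call in a zero-argument lambda is an assumption):

result = flow.retry_on_failure(
    lambda: flow.llm("Draft an outline"),  # retried up to 3 times, 1s apart
    retries=3,
    retry_delay=1,
)
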