Skip to content

Commit

Permalink
AutoGen cache using Azure Cosmos DB (#2327)
Browse files Browse the repository at this point in the history
* Create cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update cosmos_db_cache.py

* Create test_cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update test_cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update test_cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update test_cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update test_cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update cache.py

* Update cache_factory.py

* Update cache.py

* Update cache_factory.py

* Update test_cache.py

* Update test_cache.py

* Update cache.py

* Update llm-caching.md

* Update cache.py

* Update cache.py

* Update cache.py

* Update cache_factory.py

* Update cosmos_db_cache.py

* Update cache.py

* Update cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update build.yml

* Update build.yml

* Update test_cosmos_db_cache.py

* Update test_cosmos_db_cache.py

* Update test_cosmos_db_cache.py

* Update test_cosmos_db_cache.py

* Update test_cosmos_db_cache.py

* Update autogen/cache/cache_factory.py

Co-authored-by: Chi Wang <wang.chi@microsoft.com>

* Update cache_factory.py

* Update cosmos_db_cache.py

* Update cache.py

* Update cache_factory.py

* Update cosmos_db_cache.py

* Update .github/workflows/build.yml

Co-authored-by: Chi Wang <wang.chi@microsoft.com>

* Update cache.py

* Update cache.py

* Update cache.py

* Update cache_factory.py

* Update cosmos_db_cache.py

* Update cache.py

* Update cache_factory.py

* Update cosmos_db_cache.py

* Update cache.py

* Update cache_factory.py

* Update cache_factory.py

* Update cache.py

* Update cosmos_db_cache.py

* Update cache.py

* Update cache.py

* Update cache_factory.py

* Update cache.py

* Update cache_factory.py

* Update cosmos_db_cache.py

* Update cache.py

* Update cache_factory.py

* Update cosmos_db_cache.py

* Update cache.py

* Update cache_factory.py

* Update cosmos_db_cache.py

* Update test_cache.py

* Update test_cache.py

* Update test_cache.py

* Update cache.py

* Update cache.py

* Update cache_factory.py

* Update cache.py

* Update cache_factory.py

* Update test_cache.py

* Update test_cache.py

* Update cache.py

* Update cache.py

* Update test_cache.py

* Update cache.py

* Update cache.py

* Update cache_factory.py

* Update cache_factory.py

* Update cache_factory.py

* Update cache_factory.py

* Update cache_factory.py

* Update build.yml

* Update test_cache.py

* Update test_cosmos_db_cache.py

* Update test_cache.py

* Update cache.py

* Update cache_factory.py

* Update cosmos_db_cache.py

* Update test_cache.py

* Update test_cosmos_db_cache.py

* Update test_cache.py

* Update test_cosmos_db_cache.py

* Update build.yml

* Update build.yml

* Update build.yml

* Update build.yml

* Update cache_factory.py

* Update cache.py

* Update cosmos_db_cache.py

* Update cache.py

* Update build.yml

* Update test_cache.py

* Update test_cache.py

* Update test_cache.py

* Update test_cache.py

* Update cache_factory.py

* Update cosmos_db_cache.py

* Update test_cache.py

* Update test_cache.py

* Update test_cache.py

* Update test_cache.py

* Update test_cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update test_cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update test_cache.py

* Update test_cosmos_db_cache.py

* Update test_cosmos_db_cache.py

* Update test_cosmos_db_cache.py

* Update test_cosmos_db_cache.py

* Update test_cosmos_db_cache.py

* Update test_cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update test_cache.py

* Update test_cosmos_db_cache.py

* Update cache.py

* Update cache.py

* Update cache.py

* Update test_cosmos_db_cache.py

* Update test_cosmos_db_cache.py

* Update cache.py

* Update test_cosmos_db_cache.py

* Update cosmos_db_cache.py

* Update cache.py

* Update test_cosmos_db_cache.py

* Update test_cosmos_db_cache.py

---------

Co-authored-by: Chi Wang <wang.chi@microsoft.com>
  • Loading branch information
wmwxwa and sonichi committed Apr 26, 2024
1 parent 0d29cfb commit fbcc56c
Show file tree
Hide file tree
Showing 7 changed files with 391 additions and 51 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
- name: Install packages and dependencies
run: |
python -m pip install --upgrade pip wheel
pip install -e .
pip install -e .[cosmosdb]
python -c "import autogen"
pip install pytest mock
- name: Install optional dependencies for code executors
Expand All @@ -67,12 +67,16 @@ jobs:
if: matrix.python-version != '3.10' && matrix.os != 'ubuntu-latest'
run: |
pytest test --ignore=test/agentchat/contrib --skip-openai --skip-docker --durations=10 --durations-min=1.0
- name: Coverage
- name: Coverage with Redis
if: matrix.python-version == '3.10'
run: |
pip install -e .[test,redis,websockets]
coverage run -a -m pytest test --ignore=test/agentchat/contrib --skip-openai --durations=10 --durations-min=1.0
coverage xml
- name: Test with Cosmos DB
run: |
pip install -e .[test,cosmosdb]
coverage run -a -m pytest test/cache/test_cosmos_db_cache.py --skip-openai --durations=10 --durations-min=1.0
- name: Upload coverage to Codecov
if: matrix.python-version == '3.10'
uses: codecov/codecov-action@v3
Expand Down
45 changes: 40 additions & 5 deletions autogen/cache/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import sys
from types import TracebackType
from typing import Any, Dict, Optional, Type, Union
from typing import Any, Dict, Optional, Type, TypedDict, Union

from .abstract_cache_base import AbstractCache
from .cache_factory import CacheFactory
Expand All @@ -26,7 +26,12 @@ class Cache(AbstractCache):
cache: The cache instance created based on the provided configuration.
"""

ALLOWED_CONFIG_KEYS = ["cache_seed", "redis_url", "cache_path_root"]
ALLOWED_CONFIG_KEYS = [
"cache_seed",
"redis_url",
"cache_path_root",
"cosmos_db_config",
]

@staticmethod
def redis(cache_seed: Union[str, int] = 42, redis_url: str = "redis://localhost:6379/0") -> "Cache":
Expand Down Expand Up @@ -56,6 +61,32 @@ def disk(cache_seed: Union[str, int] = 42, cache_path_root: str = ".cache") -> "
"""
return Cache({"cache_seed": cache_seed, "cache_path_root": cache_path_root})

@staticmethod
def cosmos_db(
connection_string: Optional[str] = None,
container_id: Optional[str] = None,
cache_seed: Union[str, int] = 42,
client: Optional[any] = None,
) -> "Cache":
"""
Create a Cosmos DB cache instance with 'autogen_cache' as database ID.
Args:
connection_string (str, optional): Connection string to the Cosmos DB account.
container_id (str, optional): The container ID for the Cosmos DB account.
cache_seed (Union[str, int], optional): A seed for the cache.
client: Optional[CosmosClient]: Pass an existing Cosmos DB client.
Returns:
Cache: A Cache instance configured for Cosmos DB.
"""
cosmos_db_config = {
"connection_string": connection_string,
"database_id": "autogen_cache",
"container_id": container_id,
"client": client,
}
return Cache({"cache_seed": str(cache_seed), "cosmos_db_config": cosmos_db_config})

def __init__(self, config: Dict[str, Any]):
"""
Initialize the Cache with the given configuration.
Expand All @@ -69,15 +100,19 @@ def __init__(self, config: Dict[str, Any]):
ValueError: If an invalid configuration key is provided.
"""
self.config = config
# Ensure that the seed is always treated as a string before being passed to any cache factory or stored.
self.config["cache_seed"] = str(self.config.get("cache_seed", 42))

# validate config
for key in self.config.keys():
if key not in self.ALLOWED_CONFIG_KEYS:
raise ValueError(f"Invalid config key: {key}")
# create cache instance
self.cache = CacheFactory.cache_factory(
self.config.get("cache_seed", "42"),
self.config.get("redis_url", None),
self.config.get("cache_path_root", None),
seed=self.config["cache_seed"],
redis_url=self.config.get("redis_url"),
cache_path_root=self.config.get("cache_path_root"),
cosmosdb_config=self.config.get("cosmos_db_config"),
)

def __enter__(self) -> "Cache":
Expand Down
58 changes: 41 additions & 17 deletions autogen/cache/cache_factory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Optional, Union
from typing import Any, Dict, Optional, Union

from .abstract_cache_base import AbstractCache
from .disk_cache import DiskCache
Expand All @@ -8,25 +8,28 @@
class CacheFactory:
@staticmethod
def cache_factory(
seed: Union[str, int], redis_url: Optional[str] = None, cache_path_root: str = ".cache"
seed: Union[str, int],
redis_url: Optional[str] = None,
cache_path_root: str = ".cache",
cosmosdb_config: Optional[Dict[str, Any]] = None,
) -> AbstractCache:
"""
Factory function for creating cache instances.
Based on the provided redis_url, this function decides whether to create a RedisCache
or DiskCache instance. If RedisCache is available and redis_url is provided,
a RedisCache instance is created. Otherwise, a DiskCache instance is used.
This function decides whether to create a RedisCache, DiskCache, or CosmosDBCache instance
based on the provided parameters. If RedisCache is available and a redis_url is provided,
a RedisCache instance is created. If connection_string, database_id, and container_id
are provided, a CosmosDBCache is created. Otherwise, a DiskCache instance is used.
Args:
seed (Union[str, int]): A string or int used as a seed or namespace for the cache.
This could be useful for creating distinct cache instances
or for namespacing keys in the cache.
redis_url (str or None): The URL for the Redis server. If this is None
or if RedisCache is not available, a DiskCache instance is created.
seed (Union[str, int]): Used as a seed or namespace for the cache.
redis_url (Optional[str]): URL for the Redis server.
cache_path_root (str): Root path for the disk cache.
cosmosdb_config (Optional[Dict[str, str]]): Dictionary containing 'connection_string',
'database_id', and 'container_id' for Cosmos DB cache.
Returns:
An instance of either RedisCache or DiskCache, depending on the availability of RedisCache
and the provided redis_url.
An instance of RedisCache, DiskCache, or CosmosDBCache.
Examples:
Expand All @@ -40,14 +43,35 @@ def cache_factory(
```python
disk_cache = cache_factory("myseed", None)
```
Creating a Cosmos DB cache:
```python
cosmos_cache = cache_factory("myseed", cosmosdb_config={
"connection_string": "your_connection_string",
"database_id": "your_database_id",
"container_id": "your_container_id"}
)
```
"""
if redis_url is not None:
if redis_url:
try:
from .redis_cache import RedisCache

return RedisCache(seed, redis_url)
except ImportError:
logging.warning("RedisCache is not available. Creating a DiskCache instance instead.")
return DiskCache(f"./{cache_path_root}/{seed}")
else:
return DiskCache(f"./{cache_path_root}/{seed}")
logging.warning(
"RedisCache is not available. Checking other cache options. The last fallback is DiskCache."
)

if cosmosdb_config:
try:
from .cosmos_db_cache import CosmosDBCache

return CosmosDBCache.create_cache(seed, cosmosdb_config)

except ImportError:
logging.warning("CosmosDBCache is not available. Fallback to DiskCache.")

# Default to DiskCache if neither Redis nor Cosmos DB configurations are provided
return DiskCache(f"./{cache_path_root}/{seed}")
144 changes: 144 additions & 0 deletions autogen/cache/cosmos_db_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# Install Azure Cosmos DB SDK if not already

import pickle
from typing import Any, Optional, TypedDict, Union

from azure.cosmos import CosmosClient, PartitionKey, exceptions
from azure.cosmos.exceptions import CosmosResourceNotFoundError

from autogen.cache.abstract_cache_base import AbstractCache


class CosmosDBConfig(TypedDict, total=False):
connection_string: str
database_id: str
container_id: str
cache_seed: Optional[Union[str, int]]
client: Optional[CosmosClient]


class CosmosDBCache(AbstractCache):
"""
Synchronous implementation of AbstractCache using Azure Cosmos DB NoSQL API.
This class provides a concrete implementation of the AbstractCache
interface using Azure Cosmos DB for caching data, with synchronous operations.
Attributes:
seed (Union[str, int]): A seed or namespace used as a partition key.
client (CosmosClient): The Cosmos DB client used for caching.
container: The container instance used for caching.
"""

def __init__(self, seed: Union[str, int], cosmosdb_config: CosmosDBConfig):
"""
Initialize the CosmosDBCache instance.
Args:
seed (Union[str, int]): A seed or namespace for the cache, used as a partition key.
connection_string (str): The connection string for the Cosmos DB account.
container_id (str): The container ID to be used for caching.
client (Optional[CosmosClient]): An existing CosmosClient instance to be used for caching.
"""
self.seed = str(seed)
self.client = cosmosdb_config.get("client") or CosmosClient.from_connection_string(
cosmosdb_config["connection_string"]
)
database_id = cosmosdb_config.get("database_id", "autogen_cache")
self.database = self.client.get_database_client(database_id)
container_id = cosmosdb_config.get("container_id")
self.container = self.database.create_container_if_not_exists(
id=container_id, partition_key=PartitionKey(path="/partitionKey")
)

@classmethod
def create_cache(cls, seed: Union[str, int], cosmosdb_config: CosmosDBConfig):
"""
Factory method to create a CosmosDBCache instance based on the provided configuration.
This method decides whether to use an existing CosmosClient or create a new one.
"""
if "client" in cosmosdb_config and isinstance(cosmosdb_config["client"], CosmosClient):
return cls.from_existing_client(seed, **cosmosdb_config)
else:
return cls.from_config(seed, cosmosdb_config)

@classmethod
def from_config(cls, seed: Union[str, int], cosmosdb_config: CosmosDBConfig):
return cls(str(seed), cosmosdb_config)

@classmethod
def from_connection_string(cls, seed: Union[str, int], connection_string: str, database_id: str, container_id: str):
config = {"connection_string": connection_string, "database_id": database_id, "container_id": container_id}
return cls(str(seed), config)

@classmethod
def from_existing_client(cls, seed: Union[str, int], client: CosmosClient, database_id: str, container_id: str):
config = {"client": client, "database_id": database_id, "container_id": container_id}
return cls(str(seed), config)

def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]:
"""
Retrieve an item from the Cosmos DB cache.
Args:
key (str): The key identifying the item in the cache.
default (optional): The default value to return if the key is not found.
Returns:
The deserialized value associated with the key if found, else the default value.
"""
try:
response = self.container.read_item(item=key, partition_key=str(self.seed))
return pickle.loads(response["data"])
except CosmosResourceNotFoundError:
return default
except Exception as e:
# Log the exception or rethrow after logging if needed
# Consider logging or handling the error appropriately here
raise e

def set(self, key: str, value: Any) -> None:
"""
Set an item in the Cosmos DB cache.
Args:
key (str): The key under which the item is to be stored.
value: The value to be stored in the cache.
Notes:
The value is serialized using pickle before being stored.
"""
try:
serialized_value = pickle.dumps(value)
item = {"id": key, "partitionKey": str(self.seed), "data": serialized_value}
self.container.upsert_item(item)
except Exception as e:
# Log or handle exception
raise e

def close(self) -> None:
"""
Close the Cosmos DB client.
Perform any necessary cleanup, such as closing network connections.
"""
# CosmosClient doesn"t require explicit close in the current SDK
# If you created the client inside this class, you should close it if necessary
pass

def __enter__(self):
"""
Context management entry.
Returns:
self: The instance itself.
"""
return self

def __exit__(self, exc_type: Optional[type], exc_value: Optional[Exception], traceback: Optional[Any]) -> None:
"""
Context management exit.
Perform cleanup actions such as closing the Cosmos DB client.
"""
self.close()
Loading

0 comments on commit fbcc56c

Please sign in to comment.