Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,8 @@ llama_stack:
url: http://localhost:8321
user_data_collection:
feedback_enabled: true
feedback_storage: "/tmp/data/feedback"
transcripts_enabled: true
transcripts_storage: "/tmp/data/transcripts"
user_data_dir: "/tmp/data"
```

### Llama Stack project and configuration
Expand Down Expand Up @@ -186,9 +185,8 @@ llama_stack:
library_client_config_path: <path-to-llama-stack-run.yaml-file>
user_data_collection:
feedback_enabled: true
feedback_storage: "/tmp/data/feedback"
transcripts_enabled: true
transcripts_storage: "/tmp/data/transcripts"
user_data_dir: "/tmp/data"
```

## System prompt
Expand Down Expand Up @@ -438,9 +436,8 @@ The data collector service is configured through the `user_data_collection.data_
```yaml
user_data_collection:
feedback_enabled: true
feedback_storage: "/tmp/data/feedback"
transcripts_enabled: true
transcripts_storage: "/tmp/data/transcripts"
user_data_dir: "/tmp/data"
data_collector:
enabled: true
ingress_server_url: "https://your-ingress-server.com"
Expand Down
3 changes: 1 addition & 2 deletions lightspeed-stack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ llama_stack:
api_key: xyzzy
user_data_collection:
feedback_enabled: true
feedback_storage: "/tmp/data/feedback"
transcripts_enabled: true
transcripts_storage: "/tmp/data/transcripts"
user_data_dir: "/tmp/data"
data_collector:
enabled: false
ingress_server_url: null
Expand Down
14 changes: 5 additions & 9 deletions src/app/endpoints/feedback.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import logging
from typing import Any
from pathlib import Path
import json
from datetime import datetime, UTC

Expand Down Expand Up @@ -109,19 +108,16 @@ def store_feedback(user_id: str, feedback: dict) -> None:
feedback: The feedback to store.
"""
logger.debug("Storing feedback for user %s", user_id)
# Creates storage path only if it doesn't exist. The `exist_ok=True` prevents
# race conditions in case of multiple server instances trying to set up storage
# at the same location.
storage_path = Path(
configuration.user_data_collection_configuration.feedback_storage or ""
)
storage_path.mkdir(parents=True, exist_ok=True)

# Create feedback directory if it doesn't exist
feedback_dir = configuration.user_data_collection_configuration.feedback_storage
feedback_dir.mkdir(parents=True, exist_ok=True)

current_time = str(datetime.now(UTC))
data_to_store = {"user_id": user_id, "timestamp": current_time, **feedback}

# stores feedback in a file under unique uuid
feedback_file_path = storage_path / f"{get_suid()}.json"
feedback_file_path = feedback_dir / f"{get_suid()}.json"
with open(feedback_file_path, "w", encoding="utf-8") as feedback_file:
json.dump(data_to_store, feedback_file)

Expand Down
4 changes: 1 addition & 3 deletions src/app/endpoints/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,7 @@ def construct_transcripts_path(user_id: str, conversation_id: str) -> Path:
# this Path sanitization pattern
uid = os.path.normpath("/" + user_id).lstrip("/")
cid = os.path.normpath("/" + conversation_id).lstrip("/")
file_path = (
configuration.user_data_collection_configuration.transcripts_storage or ""
)
file_path = configuration.user_data_collection_configuration.transcripts_storage
return Path(file_path, uid, cid)


Expand Down
4 changes: 1 addition & 3 deletions src/lightspeed_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ def main() -> None:
if args.dump_configuration:
configuration.configuration.dump()
elif args.start_data_collector:
start_data_collector(
configuration.user_data_collection_configuration.data_collector
)
start_data_collector(configuration.user_data_collection_configuration)
else:
start_uvicorn(configuration.service_configuration)
logger.info("Lightspeed stack finished")
Expand Down
65 changes: 33 additions & 32 deletions src/models/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,48 +101,49 @@ def check_llama_stack_model(self) -> Self:
return self


class DataCollectorConfiguration(BaseModel):
"""Data collector configuration for sending data to ingress server."""
class UserDataCollection(BaseModel):
"""User data collection configuration."""

enabled: bool = False
ingress_server_url: Optional[str] = None
ingress_server_auth_token: Optional[str] = None
ingress_content_service_name: Optional[str] = None
collection_interval: PositiveInt = constants.DATA_COLLECTOR_COLLECTION_INTERVAL
cleanup_after_send: bool = True # Remove local files after successful send
connection_timeout: PositiveInt = constants.DATA_COLLECTOR_CONNECTION_TIMEOUT
# data collection
feedback_enabled: bool = False
transcripts_enabled: bool = False
user_data_dir: Path = Path("user_data")

# data export
export_enabled: bool = False
ingress_server_url: str = ""
ingress_server_auth_token: str = ""
ingress_content_service_name: str = ""
export_collection_interval: PositiveInt = (
constants.DATA_COLLECTOR_COLLECTION_INTERVAL
)
export_connection_timeout: PositiveInt = constants.DATA_COLLECTOR_CONNECTION_TIMEOUT
cleanup_after_send: bool = True # Remove local files after successful export

@property
def feedback_storage(self) -> Path:
"""Feedback storage directory path."""
return self.user_data_dir / "feedback"

@property
def transcripts_storage(self) -> Path:
"""Transcripts storage directory path."""
return self.user_data_dir / "transcripts"

@model_validator(mode="after")
def check_data_collector_configuration(self) -> Self:
"""Check data collector configuration."""
if self.enabled and self.ingress_server_url is None:
if self.export_enabled and self.ingress_server_url == "":
raise ValueError(
"ingress_server_url is required when data collector is enabled"
"ingress_server_url is required when data export is enabled"
)
if self.enabled and self.ingress_content_service_name is None:
if self.export_enabled and self.ingress_server_auth_token == "":
raise ValueError(
"ingress_content_service_name is required when data collector is enabled"
"ingress_server_auth_token is required when data export is enabled"
)
return self


class UserDataCollection(BaseModel):
"""User data collection configuration."""

feedback_enabled: bool = False
feedback_storage: Optional[str] = None
transcripts_enabled: bool = False
transcripts_storage: Optional[str] = None
data_collector: DataCollectorConfiguration = DataCollectorConfiguration()

@model_validator(mode="after")
def check_storage_location_is_set_when_needed(self) -> Self:
"""Check that storage_location is set when enabled."""
if self.feedback_enabled and self.feedback_storage is None:
raise ValueError("feedback_storage is required when feedback is enabled")
if self.transcripts_enabled and self.transcripts_storage is None:
if self.export_enabled and self.ingress_content_service_name == "":
raise ValueError(
"transcripts_storage is required when transcripts is enabled"
"ingress_content_service_name is required when data export is enabled"
)
return self

Expand Down
17 changes: 13 additions & 4 deletions src/runners/data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,31 @@

import logging

from models.config import DataCollectorConfiguration
from models.config import UserDataCollection
from services.data_collector import DataCollectorService

logger: logging.Logger = logging.getLogger(__name__)


def start_data_collector(configuration: DataCollectorConfiguration) -> None:
def start_data_collector(configuration: UserDataCollection) -> None:
"""Start the data collector service as a standalone process."""
logger.info("Starting data collector runner")

if not configuration.enabled:
if not configuration.export_enabled:
logger.info("Data collection is disabled")
return

try:
service = DataCollectorService()
service = DataCollectorService(
feedback_dir=configuration.feedback_storage,
transcripts_dir=configuration.transcripts_storage,
collection_interval=configuration.export_collection_interval,
cleanup_after_send=configuration.cleanup_after_send,
ingress_server_url=configuration.ingress_server_url,
ingress_server_auth_token=configuration.ingress_server_auth_token,
ingress_content_service_name=configuration.ingress_content_service_name,
connection_timeout=configuration.export_connection_timeout,
)
service.run()
except Exception as e:
logger.error(
Expand Down
Loading