diff --git a/.github/workflows/e2e_tests.yaml b/.github/workflows/e2e_tests.yaml index 3e6f5ea2..f31fbc7f 100644 --- a/.github/workflows/e2e_tests.yaml +++ b/.github/workflows/e2e_tests.yaml @@ -87,14 +87,7 @@ jobs: feedback_storage: "/tmp/data/feedback" transcripts_disabled: false transcripts_storage: "/tmp/data/transcripts" - data_collector: - enabled: false - ingress_server_url: null - ingress_server_auth_token: null - ingress_content_service_name: null - collection_interval: 7200 # 2 hours in seconds - cleanup_after_send: true - connection_timeout_seconds: 30 + authentication: module: "noop" diff --git a/Makefile b/Makefile index 6cbb4925..d6a1fc72 100644 --- a/Makefile +++ b/Makefile @@ -8,8 +8,7 @@ PYTHON_REGISTRY = pypi run: ## Run the service locally uv run src/lightspeed_stack.py -run-data-collector: ## Run the data collector service locally - uv run src/lightspeed_stack.py --data-collector + test-unit: ## Run the unit tests @echo "Running unit tests..." diff --git a/README.md b/README.md index 77509b4e..d120bd6d 100644 --- a/README.md +++ b/README.md @@ -42,10 +42,7 @@ Lightspeed Core Stack (LCS) is an AI-powered assistant that provides answers to * [Utility to generate OpenAPI schema](#utility-to-generate-openapi-schema) * [Path](#path) * [Usage](#usage-1) - * [Data Collector Service](#data-collector-service) - * [Features](#features) - * [Configuration](#configuration-1) - * [Running the Service](#running-the-service) + @@ -253,7 +250,6 @@ Usage: make ... Available targets are: run Run the service locally -run-data-collector Run the data collector service test-unit Run the unit tests test-integration Run integration tests tests test-e2e Run BDD tests for the service @@ -421,50 +417,6 @@ This script re-generated OpenAPI schema for the Lightspeed Service REST API. make schema ``` -## Data Collector Service - -The data collector service is a standalone service that runs separately from the main web service. It is responsible for collecting and sending user data including feedback and transcripts to an ingress server for analysis and archival. - -### Features - -- **Periodic Collection**: Runs at configurable intervals -- **Data Packaging**: Packages feedback and transcript files into compressed tar.gz archives -- **Secure Transmission**: Sends data to a configured ingress server with optional authentication -- **File Cleanup**: Optionally removes local files after successful transmission -- **Error Handling**: Includes retry logic and comprehensive error handling - -### Configuration - -The data collector service is configured through the `user_data_collection.data_collector` section in your configuration file: - -```yaml -user_data_collection: - feedback_enabled: true - feedback_storage: "/tmp/data/feedback" - transcripts_enabled: true - transcripts_storage: "/tmp/data/transcripts" - data_collector: - enabled: true - ingress_server_url: "https://your-ingress-server.com" - ingress_server_auth_token: "your-auth-token" - ingress_content_service_name: "lightspeed-team" - collection_interval: 7200 # 2 hours in seconds - cleanup_after_send: true - connection_timeout: 30 -``` - -### Running the Service - -To run the data collector service: - -```bash -# Using Python directly -uv run src/lightspeed_stack.py --data-collector - -# Using Make target -make run-data-collector -``` - # Project structure diff --git a/docs/config.puml b/docs/config.puml index 12af1399..135064a1 100644 --- a/docs/config.puml +++ b/docs/config.puml @@ -26,16 +26,7 @@ class "Customization" as src.models.config.Customization { system_prompt_path : Optional[FilePath] check_customization_model() -> Self } -class "DataCollectorConfiguration" as src.models.config.DataCollectorConfiguration { - cleanup_after_send : bool - collection_interval : Annotated - connection_timeout : Annotated - enabled : bool - ingress_content_service_name : Optional[str] - ingress_server_auth_token : Optional[str] - ingress_server_url : Optional[str] - check_data_collector_configuration() -> Self -} + class "InferenceConfiguration" as src.models.config.InferenceConfiguration { default_model : Optional[str] default_provider : Optional[str] @@ -78,14 +69,13 @@ class "TLSConfiguration" as src.models.config.TLSConfiguration { check_tls_configuration() -> Self } class "UserDataCollection" as src.models.config.UserDataCollection { - data_collector feedback_enabled : bool feedback_storage : Optional[str] transcripts_enabled : bool transcripts_storage : Optional[str] check_storage_location_is_set_when_needed() -> Self } -src.models.config.DataCollectorConfiguration --* src.models.config.UserDataCollection : data_collector + src.models.config.InferenceConfiguration --* src.models.config.Configuration : inference src.models.config.JwtConfiguration --* src.models.config.JwkConfiguration : jwt_configuration src.models.config.LlamaStackConfiguration --* src.models.config.Configuration : llama_stack diff --git a/docs/deployment_guide.md b/docs/deployment_guide.md index 6e6aed8f..b3467eef 100644 --- a/docs/deployment_guide.md +++ b/docs/deployment_guide.md @@ -1099,14 +1099,7 @@ user_data_collection: feedback_storage: "/tmp/data/feedback" transcripts_enabled: true transcripts_storage: "/tmp/data/transcripts" - data_collector: - enabled: false - ingress_server_url: null - ingress_server_auth_token: null - ingress_content_service_name: null - collection_interval: 7200 # 2 hours in seconds - cleanup_after_send: true - connection_timeout_seconds: 30 + authentication: module: "noop" ``` @@ -1261,14 +1254,7 @@ user_data_collection: feedback_storage: "/tmp/data/feedback" transcripts_enabled: true transcripts_storage: "/tmp/data/transcripts" - data_collector: - enabled: false - ingress_server_url: null - ingress_server_auth_token: null - ingress_content_service_name: null - collection_interval: 7200 # 2 hours in seconds - cleanup_after_send: true - connection_timeout_seconds: 30 + authentication: module: "noop" ``` diff --git a/docs/getting_started.md b/docs/getting_started.md index 11d719d0..cdc5e9d0 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -264,14 +264,7 @@ user_data_collection: feedback_storage: "/tmp/data/feedback" transcripts_enabled: true transcripts_storage: "/tmp/data/transcripts" - data_collector: - enabled: false - ingress_server_url: null - ingress_server_auth_token: null - ingress_content_service_name: null - collection_interval: 7200 # 2 hours in seconds - cleanup_after_send: true - connection_timeout_seconds: 30 + authentication: module: "noop" ``` diff --git a/docs/openapi.json b/docs/openapi.json index 43065685..4b591bcc 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -1101,67 +1101,32 @@ "title": "Customization", "description": "Service customization." }, - "DataCollectorConfiguration": { + "DatabaseConfiguration": { "properties": { - "enabled": { - "type": "boolean", - "title": "Enabled", - "default": false - }, - "ingress_server_url": { - "anyOf": [ - { - "type": "string" - }, - { - "type": "null" - } - ], - "title": "Ingress Server Url" - }, - "ingress_server_auth_token": { + "sqlite": { "anyOf": [ { - "type": "string" + "$ref": "#/components/schemas/SQLiteDatabaseConfiguration" }, { "type": "null" } - ], - "title": "Ingress Server Auth Token" + ] }, - "ingress_content_service_name": { + "postgres": { "anyOf": [ { - "type": "string" + "$ref": "#/components/schemas/PostgreSQLDatabaseConfiguration" }, { "type": "null" } - ], - "title": "Ingress Content Service Name" - }, - "collection_interval": { - "type": "integer", - "exclusiveMinimum": 0.0, - "title": "Collection Interval", - "default": 7200 - }, - "cleanup_after_send": { - "type": "boolean", - "title": "Cleanup After Send", - "default": true - }, - "connection_timeout": { - "type": "integer", - "exclusiveMinimum": 0.0, - "title": "Connection Timeout", - "default": 30 + ] } }, "type": "object", - "title": "DataCollectorConfiguration", - "description": "Data collector configuration for sending data to ingress server." + "title": "DatabaseConfiguration", + "description": "Database configuration." }, "DatabaseConfiguration": { "properties": { @@ -2122,15 +2087,6 @@ } ], "title": "Transcripts Storage" - }, - "data_collector": { - "$ref": "#/components/schemas/DataCollectorConfiguration", - "default": { - "enabled": false, - "collection_interval": 7200, - "cleanup_after_send": true, - "connection_timeout": 30 - } } }, "type": "object", diff --git a/docs/openapi.md b/docs/openapi.md index 07677950..cddc5fb3 100644 --- a/docs/openapi.md +++ b/docs/openapi.md @@ -577,21 +577,6 @@ Service customization. | system_prompt | | | -## DataCollectorConfiguration - - -Data collector configuration for sending data to ingress server. - - -| Field | Type | Description | -|-------|------|-------------| -| enabled | boolean | | -| ingress_server_url | | | -| ingress_server_auth_token | | | -| ingress_content_service_name | | | -| collection_interval | integer | | -| cleanup_after_send | boolean | | -| connection_timeout | integer | | ## DatabaseConfiguration @@ -1026,7 +1011,6 @@ User data collection configuration. | feedback_storage | | | | transcripts_enabled | boolean | | | transcripts_storage | | | -| data_collector | | | ## ValidationError diff --git a/docs/output.md b/docs/output.md index 20f4e99c..1d30763e 100644 --- a/docs/output.md +++ b/docs/output.md @@ -577,22 +577,6 @@ Service customization. | system_prompt | | | -## DataCollectorConfiguration - - -Data collector configuration for sending data to ingress server. - - -| Field | Type | Description | -|-------|------|-------------| -| enabled | boolean | | -| ingress_server_url | | | -| ingress_server_auth_token | | | -| ingress_content_service_name | | | -| collection_interval | integer | | -| cleanup_after_send | boolean | | -| connection_timeout | integer | | - ## DatabaseConfiguration @@ -1016,7 +1000,6 @@ User data collection configuration. | feedback_storage | | | | transcripts_enabled | boolean | | | transcripts_storage | | | -| data_collector | | | ## ValidationError diff --git a/docs/testing.md b/docs/testing.md index 2b78ce42..0b8e9660 100644 --- a/docs/testing.md +++ b/docs/testing.md @@ -105,11 +105,9 @@ As specified in Definition of Done, new changes need to be covered by tests. │   ├── test_requests.py │   └── test_responses.py ├── runners -│   ├── __init__.py -│   ├── test_data_collector_runner.py -│   └── test_uvicorn_runner.py +│ ├── __init__.py +│ └── test_uvicorn_runner.py ├── services -│   └── test_data_collector.py ├── test_client.py ├── test_configuration.py ├── test_lightspeed_stack.py diff --git a/lightspeed-stack.yaml b/lightspeed-stack.yaml index a1e77e38..d7d3f571 100644 --- a/lightspeed-stack.yaml +++ b/lightspeed-stack.yaml @@ -20,13 +20,6 @@ user_data_collection: feedback_storage: "/tmp/data/feedback" transcripts_enabled: true transcripts_storage: "/tmp/data/transcripts" - data_collector: - enabled: false - ingress_server_url: null - ingress_server_auth_token: null - ingress_content_service_name: null - collection_interval: 7200 # 2 hours in seconds - cleanup_after_send: true - connection_timeout_seconds: 30 + authentication: module: "noop" diff --git a/src/constants.py b/src/constants.py index b8380f33..595c6924 100644 --- a/src/constants.py +++ b/src/constants.py @@ -47,10 +47,6 @@ DEFAULT_JWT_UID_CLAIM = "user_id" DEFAULT_JWT_USER_NAME_CLAIM = "username" -# Data collector constants -DATA_COLLECTOR_COLLECTION_INTERVAL = 7200 # 2 hours in seconds -DATA_COLLECTOR_CONNECTION_TIMEOUT = 30 -DATA_COLLECTOR_RETRY_INTERVAL = 300 # 5 minutes in seconds # PostgreSQL connection constants # See: https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNECT-SSLMODE diff --git a/src/lightspeed_stack.py b/src/lightspeed_stack.py index bbe004d6..cf47c2f9 100644 --- a/src/lightspeed_stack.py +++ b/src/lightspeed_stack.py @@ -10,7 +10,6 @@ from rich.logging import RichHandler from runners.uvicorn import start_uvicorn -from runners.data_collector import start_data_collector from configuration import configuration from client import AsyncLlamaStackClientHolder @@ -48,13 +47,7 @@ def create_argument_parser() -> ArgumentParser: help="path to configuration file (default: lightspeed-stack.yaml)", default="lightspeed-stack.yaml", ) - parser.add_argument( - "--data-collector", - dest="start_data_collector", - help="start data collector service instead of web service", - action="store_true", - default=False, - ) + return parser @@ -76,10 +69,6 @@ def main() -> None: if args.dump_configuration: configuration.configuration.dump() - elif args.start_data_collector: - start_data_collector( - configuration.user_data_collection_configuration.data_collector - ) else: start_uvicorn(configuration.service_configuration) logger.info("Lightspeed stack finished") diff --git a/src/models/config.py b/src/models/config.py index d0b37412..c03eec30 100644 --- a/src/models/config.py +++ b/src/models/config.py @@ -3,7 +3,7 @@ from pathlib import Path from typing import Optional -from pydantic import BaseModel, model_validator, FilePath, AnyHttpUrl, PositiveInt +from pydantic import BaseModel, model_validator, FilePath, AnyHttpUrl from typing_extensions import Self, Literal import constants @@ -173,31 +173,6 @@ def check_llama_stack_model(self) -> Self: return self -class DataCollectorConfiguration(BaseModel): - """Data collector configuration for sending data to ingress server.""" - - enabled: bool = False - ingress_server_url: Optional[str] = None - ingress_server_auth_token: Optional[str] = None - ingress_content_service_name: Optional[str] = None - collection_interval: PositiveInt = constants.DATA_COLLECTOR_COLLECTION_INTERVAL - cleanup_after_send: bool = True # Remove local files after successful send - connection_timeout: PositiveInt = constants.DATA_COLLECTOR_CONNECTION_TIMEOUT - - @model_validator(mode="after") - def check_data_collector_configuration(self) -> Self: - """Check data collector configuration.""" - if self.enabled and self.ingress_server_url is None: - raise ValueError( - "ingress_server_url is required when data collector is enabled" - ) - if self.enabled and self.ingress_content_service_name is None: - raise ValueError( - "ingress_content_service_name is required when data collector is enabled" - ) - return self - - class UserDataCollection(BaseModel): """User data collection configuration.""" @@ -205,7 +180,6 @@ class UserDataCollection(BaseModel): feedback_storage: Optional[str] = None transcripts_enabled: bool = False transcripts_storage: Optional[str] = None - data_collector: DataCollectorConfiguration = DataCollectorConfiguration() @model_validator(mode="after") def check_storage_location_is_set_when_needed(self) -> Self: diff --git a/src/runners/data_collector.py b/src/runners/data_collector.py deleted file mode 100644 index 7bf05e8f..00000000 --- a/src/runners/data_collector.py +++ /dev/null @@ -1,26 +0,0 @@ -"""Data collector runner.""" - -import logging - -from models.config import DataCollectorConfiguration -from services.data_collector import DataCollectorService - -logger: logging.Logger = logging.getLogger(__name__) - - -def start_data_collector(configuration: DataCollectorConfiguration) -> None: - """Start the data collector service as a standalone process.""" - logger.info("Starting data collector runner") - - if not configuration.enabled: - logger.info("Data collection is disabled") - return - - try: - service = DataCollectorService() - service.run() - except Exception as e: - logger.error( - "Data collector service encountered an exception: %s", e, exc_info=True - ) - raise diff --git a/src/services/data_collector.py b/src/services/data_collector.py deleted file mode 100644 index f87d2503..00000000 --- a/src/services/data_collector.py +++ /dev/null @@ -1,258 +0,0 @@ -"""Data archival service for packaging and sending feedback and transcripts.""" - -import tarfile -import tempfile -import time -from datetime import datetime, UTC -from pathlib import Path -from typing import List - -import requests -import constants -from configuration import configuration -from log import get_logger - -logger = get_logger(__name__) - - -class DataCollectorService: # pylint: disable=too-few-public-methods - """Service for collecting and sending user data to ingress server. - - This service handles the periodic collection and transmission of user data - including feedback and transcripts to the configured ingress server. - """ - - def run(self) -> None: - """Run the periodic data collection loop.""" - collector_config = ( - configuration.user_data_collection_configuration.data_collector - ) - - logger.info("Starting data collection service") - - while True: - try: - self._perform_collection() - logger.info( - "Next collection scheduled in %s seconds", - collector_config.collection_interval, - ) - if collector_config.collection_interval is not None: - time.sleep(collector_config.collection_interval) - except KeyboardInterrupt: - logger.info("Data collection service stopped by user") - break - except (OSError, requests.RequestException) as e: - logger.error("Error during collection process: %s", e, exc_info=True) - time.sleep( - constants.DATA_COLLECTOR_RETRY_INTERVAL - ) # Wait 5 minutes before retrying on error - - def _perform_collection(self) -> None: - """Perform a single collection operation.""" - logger.info("Starting data collection process") - - # Collect files to archive - feedback_files = self._collect_feedback_files() - transcript_files = self._collect_transcript_files() - - if not feedback_files and not transcript_files: - logger.info("No files to collect") - return - - logger.info( - "Found %s feedback files and %s transcript files to collect", - len(feedback_files), - len(transcript_files), - ) - - # Create and send archives - collections_sent = 0 - try: - if feedback_files: - udc_config = configuration.user_data_collection_configuration - if udc_config.feedback_storage: - feedback_base = Path(udc_config.feedback_storage) - collections_sent += self._create_and_send_tarball( - feedback_files, "feedback", feedback_base - ) - if transcript_files: - udc_config = configuration.user_data_collection_configuration - if udc_config.transcripts_storage: - transcript_base = Path(udc_config.transcripts_storage) - collections_sent += self._create_and_send_tarball( - transcript_files, "transcripts", transcript_base - ) - - logger.info( - "Successfully sent %s collections to ingress server", collections_sent - ) - except (OSError, requests.RequestException, tarfile.TarError) as e: - logger.error("Failed to create or send collections: %s", e, exc_info=True) - raise - - def _collect_feedback_files(self) -> List[Path]: - """Collect all feedback files that need to be collected.""" - udc_config = configuration.user_data_collection_configuration - - if not udc_config.feedback_enabled or not udc_config.feedback_storage: - return [] - - feedback_dir = Path(udc_config.feedback_storage) - if not feedback_dir.exists(): - return [] - - return list(feedback_dir.glob("*.json")) - - def _collect_transcript_files(self) -> List[Path]: - """Collect all transcript files that need to be collected.""" - udc_config = configuration.user_data_collection_configuration - - if not udc_config.transcripts_enabled or not udc_config.transcripts_storage: - return [] - - transcripts_dir = Path(udc_config.transcripts_storage) - if not transcripts_dir.exists(): - return [] - - # Recursively find all JSON files in the transcript directory structure - return list(transcripts_dir.rglob("*.json")) - - def _create_and_send_tarball( - self, files: List[Path], data_type: str, base_directory: Path - ) -> int: - """Create a single tarball from all files and send to ingress server.""" - if not files: - return 0 - - collector_config = ( - configuration.user_data_collection_configuration.data_collector - ) - - # Create one tarball with all files - tarball_path = self._create_tarball(files, data_type, base_directory) - try: - self._send_tarball(tarball_path) - if collector_config.cleanup_after_send: - self._cleanup_files(files) - self._cleanup_empty_directories() - return 1 - finally: - self._cleanup_tarball(tarball_path) - - def _create_tarball( - self, files: List[Path], data_type: str, base_directory: Path - ) -> Path: - """Create a tarball containing the specified files.""" - timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S") - tarball_name = f"{data_type}_{timestamp}.tar.gz" - - # Create tarball in a temporary directory - temp_dir = Path(tempfile.gettempdir()) - tarball_path = temp_dir / tarball_name - - logger.info("Creating tarball %s with %s files", tarball_path, len(files)) - - with tarfile.open(tarball_path, "w:gz") as tar: - for file_path in files: - try: - # Add file with relative path to maintain directory structure - arcname = str(file_path.relative_to(base_directory)) - tar.add(file_path, arcname=arcname) - except (OSError, ValueError) as e: - logger.warning("Failed to add %s to tarball: %s", file_path, e) - - logger.info( - "Created tarball %s (%s bytes)", tarball_path, tarball_path.stat().st_size - ) - return tarball_path - - def _send_tarball(self, tarball_path: Path) -> None: - """Send the tarball to the ingress server.""" - collector_config = ( - configuration.user_data_collection_configuration.data_collector - ) - - if collector_config.ingress_server_url is None: - raise ValueError("Ingress server URL is not configured") - - # pylint: disable=line-too-long - headers = { - "Content-Type": f"application/vnd.redhat.{collector_config.ingress_content_service_name}.periodic+tar", - } - - if collector_config.ingress_server_auth_token: - headers["Authorization"] = ( - f"Bearer {collector_config.ingress_server_auth_token}" - ) - - with open(tarball_path, "rb") as f: - data = f.read() - - logger.info( - "Sending tarball %s to %s", - tarball_path.name, - collector_config.ingress_server_url, - ) - - response = requests.post( - collector_config.ingress_server_url, - data=data, - headers=headers, - timeout=collector_config.connection_timeout, - ) - - if response.status_code >= 400: - raise requests.HTTPError( - f"Failed to send tarball to ingress server. " - f"Status: {response.status_code}, Response: {response.text}" - ) - - logger.info("Successfully sent tarball %s to ingress server", tarball_path.name) - - def _cleanup_files(self, files: List[Path]) -> None: - """Remove files after successful transmission.""" - for file_path in files: - try: - file_path.unlink() - logger.debug("Removed file %s", file_path) - except OSError as e: - logger.warning("Failed to remove file %s: %s", file_path, e) - - def _cleanup_empty_directories(self) -> None: - """Remove empty directories from transcript storage.""" - udc_config = configuration.user_data_collection_configuration - - if not udc_config.transcripts_enabled or not udc_config.transcripts_storage: - return - - transcripts_dir = Path(udc_config.transcripts_storage) - if not transcripts_dir.exists(): - return - - # Remove empty directories (conversation and user directories) - for user_dir in transcripts_dir.iterdir(): - if user_dir.is_dir(): - for conv_dir in user_dir.iterdir(): - if conv_dir.is_dir() and not any(conv_dir.iterdir()): - try: - conv_dir.rmdir() - logger.debug("Removed empty directory %s", conv_dir) - except OSError: - pass - - # Remove user directory if empty - if not any(user_dir.iterdir()): - try: - user_dir.rmdir() - logger.debug("Removed empty directory %s", user_dir) - except OSError: - pass - - def _cleanup_tarball(self, tarball_path: Path) -> None: - """Remove the temporary tarball file.""" - try: - tarball_path.unlink() - logger.debug("Removed temporary tarball %s", tarball_path) - except OSError as e: - logger.warning("Failed to remove temporary tarball %s: %s", tarball_path, e) diff --git a/tests/unit/models/test_config.py b/tests/unit/models/test_config.py index 4d9a29da..acdd9493 100644 --- a/tests/unit/models/test_config.py +++ b/tests/unit/models/test_config.py @@ -11,8 +11,6 @@ AUTH_MOD_NOOP, AUTH_MOD_K8S, AUTH_MOD_JWK_TOKEN, - DATA_COLLECTOR_COLLECTION_INTERVAL, - DATA_COLLECTOR_CONNECTION_TIMEOUT, ) from models.config import ( @@ -24,7 +22,6 @@ UserDataCollection, TLSConfiguration, ModelContextProtocolServer, - DataCollectorConfiguration, InferenceConfiguration, ) @@ -217,53 +214,6 @@ def test_user_data_collection_transcripts_disabled() -> None: UserDataCollection(transcripts_enabled=True, transcripts_storage=None) -def test_user_data_collection_data_collector_enabled() -> None: - """Test the UserDataCollection constructor for data collector.""" - # correct configuration - cfg = UserDataCollection( - data_collector=DataCollectorConfiguration( - enabled=True, - ingress_server_url="http://localhost:8080", - ingress_server_auth_token="xyzzy", - ingress_content_service_name="lightspeed-core", - collection_interval=60, - ) - ) - assert cfg is not None - assert cfg.data_collector.enabled is True - - -def test_user_data_collection_data_collector_wrong_configuration() -> None: - """Test the UserDataCollection constructor for data collector.""" - # incorrect configuration - with pytest.raises( - ValueError, - match="ingress_server_url is required when data collector is enabled", - ): - UserDataCollection( - data_collector=DataCollectorConfiguration( - enabled=True, - ingress_server_url=None, - ingress_server_auth_token="xyzzy", - ingress_content_service_name="lightspeed-core", - collection_interval=60, - ) - ) - with pytest.raises( - ValueError, - match="ingress_content_service_name is required when data collector is enabled", - ): - UserDataCollection( - data_collector=DataCollectorConfiguration( - enabled=True, - ingress_server_url="http://localhost:8080", - ingress_server_auth_token="xyzzy", - ingress_content_service_name=None, - collection_interval=60, - ) - ) - - def test_tls_configuration() -> None: """Test the TLS configuration.""" cfg = TLSConfiguration( @@ -528,15 +478,6 @@ def test_dump_configuration(tmp_path) -> None: "feedback_storage": None, "transcripts_enabled": False, "transcripts_storage": None, - "data_collector": { - "enabled": False, - "ingress_server_url": None, - "ingress_server_auth_token": None, - "ingress_content_service_name": None, - "collection_interval": DATA_COLLECTOR_COLLECTION_INTERVAL, - "cleanup_after_send": True, - "connection_timeout": DATA_COLLECTOR_CONNECTION_TIMEOUT, - }, }, "mcp_servers": [], "authentication": { diff --git a/tests/unit/runners/test_data_collector_runner.py b/tests/unit/runners/test_data_collector_runner.py deleted file mode 100644 index 4384b8ab..00000000 --- a/tests/unit/runners/test_data_collector_runner.py +++ /dev/null @@ -1,60 +0,0 @@ -"""Unit tests for runners.""" - -from unittest.mock import patch - -from models.config import DataCollectorConfiguration -from runners.data_collector import start_data_collector - - -def test_start_data_collector() -> None: - """Test the function to start data collector service.""" - configuration = DataCollectorConfiguration( - enabled=True, - ingress_server_url="http://localhost:8080", - ingress_server_auth_token="xyzzy", - ingress_content_service_name="lightspeed-core", - collection_interval=60, - ) - - # don't start real data collector service - with patch("services.data_collector.DataCollectorService.run") as mocked_run: - start_data_collector(configuration) - mocked_run.assert_called_once() - - -def test_start_data_collector_disabled() -> None: - """Test the function to start data collector service.""" - configuration = DataCollectorConfiguration( - enabled=False, - ingress_server_url="http://localhost:8080", - ingress_server_auth_token="xyzzy", - ingress_content_service_name="lightspeed-core", - collection_interval=60, - ) - - # don't start real data collector service - with patch("services.data_collector.DataCollectorService.run") as mocked_run: - start_data_collector(configuration) - mocked_run.assert_not_called() - - -def test_start_data_collector_exception() -> None: - """Test the function to start data collector service when an exception occurs.""" - configuration = DataCollectorConfiguration( - enabled=True, - ingress_server_url="http://localhost:8080", - ingress_server_auth_token="xyzzy", - ingress_content_service_name="lightspeed-core", - collection_interval=60, - ) - - # Mock the DataCollectorService to raise an exception - with patch("services.data_collector.DataCollectorService.run") as mocked_run: - mocked_run.side_effect = Exception("Test exception") - - try: - start_data_collector(configuration) - assert False, "Expected exception to be raised" - except Exception as e: # pylint: disable=broad-exception-caught - assert str(e) == "Test exception" - mocked_run.assert_called_once() diff --git a/tests/unit/services/test_data_collector.py b/tests/unit/services/test_data_collector.py deleted file mode 100644 index b2103ad5..00000000 --- a/tests/unit/services/test_data_collector.py +++ /dev/null @@ -1,587 +0,0 @@ -"""Unit tests for data collector service.""" - -from pathlib import Path -from unittest.mock import patch, MagicMock -import requests -import tarfile - -from services.data_collector import DataCollectorService - - -def test_data_collector_service_creation() -> None: - """Test that DataCollectorService can be created.""" - service = DataCollectorService() - assert service is not None - - -@patch("services.data_collector.time.sleep") -@patch("services.data_collector.configuration") -def test_run_normal_operation(mock_config, mock_sleep) -> None: - """Test normal operation of the run method.""" - service = DataCollectorService() - mock_config.user_data_collection_configuration.data_collector.collection_interval = ( - 60 - ) - - with patch.object(service, "_perform_collection") as mock_perform: - mock_perform.side_effect = [None, KeyboardInterrupt()] - - service.run() - - assert mock_perform.call_count == 2 - mock_sleep.assert_called_once_with(60) - - -@patch("services.data_collector.time.sleep") -@patch("services.data_collector.configuration") -def test_run_with_exception(mock_config, mock_sleep) -> None: - """Test run method with exception handling.""" - service = DataCollectorService() - - with patch.object(service, "_perform_collection") as mock_perform: - mock_perform.side_effect = [OSError("Test error"), KeyboardInterrupt()] - - service.run() - - assert mock_perform.call_count == 2 - mock_sleep.assert_called_once_with(300) - - -@patch("services.data_collector.configuration") -def test_collect_feedback_files_disabled(mock_config) -> None: - """Test collecting feedback files when disabled.""" - service = DataCollectorService() - mock_config.user_data_collection_configuration.feedback_enabled = False - - result = service._collect_feedback_files() - assert result == [] - - -@patch("services.data_collector.configuration") -def test_collect_feedback_files_no_storage(mock_config) -> None: - """Test collecting feedback files when no storage configured.""" - service = DataCollectorService() - mock_config.user_data_collection_configuration.feedback_enabled = True - mock_config.user_data_collection_configuration.feedback_storage = None - - result = service._collect_feedback_files() - assert result == [] - - -@patch("services.data_collector.configuration") -def test_collect_feedback_files_directory_not_exists(mock_config) -> None: - """Test collecting feedback files when directory doesn't exist.""" - service = DataCollectorService() - mock_config.user_data_collection_configuration.feedback_enabled = True - mock_config.user_data_collection_configuration.feedback_storage = "/tmp/feedback" - - with patch("services.data_collector.Path") as mock_path: - mock_path.return_value.exists.return_value = False - - result = service._collect_feedback_files() - assert result == [] - - -@patch("services.data_collector.configuration") -def test_collect_feedback_files_success(mock_config) -> None: - """Test collecting feedback files successfully.""" - service = DataCollectorService() - mock_config.user_data_collection_configuration.feedback_enabled = True - mock_config.user_data_collection_configuration.feedback_storage = "/tmp/feedback" - - mock_files = [Path("/tmp/feedback/file1.json")] - - with patch("services.data_collector.Path") as mock_path: - mock_path.return_value.exists.return_value = True - mock_path.return_value.glob.return_value = mock_files - - result = service._collect_feedback_files() - assert result == mock_files - - -@patch("services.data_collector.configuration") -def test_collect_transcript_files_disabled(mock_config) -> None: - """Test collecting transcript files when disabled.""" - service = DataCollectorService() - mock_config.user_data_collection_configuration.transcripts_enabled = False - - result = service._collect_transcript_files() - assert result == [] - - -@patch("services.data_collector.configuration") -def test_collect_transcript_files_directory_not_exists(mock_config) -> None: - """Test collecting transcript files when directory doesn't exist.""" - service = DataCollectorService() - mock_config.user_data_collection_configuration.transcripts_enabled = True - mock_config.user_data_collection_configuration.transcripts_storage = ( - "/tmp/transcripts" - ) - - with patch("services.data_collector.Path") as mock_path: - mock_path.return_value.exists.return_value = False - - result = service._collect_transcript_files() - assert result == [] - - -@patch("services.data_collector.configuration") -def test_collect_transcript_files_success(mock_config) -> None: - """Test collecting transcript files successfully.""" - service = DataCollectorService() - mock_config.user_data_collection_configuration.transcripts_enabled = True - mock_config.user_data_collection_configuration.transcripts_storage = ( - "/tmp/transcripts" - ) - - mock_files = [Path("/tmp/transcripts/user1/conv1/file1.json")] - - with patch("services.data_collector.Path") as mock_path: - mock_path.return_value.exists.return_value = True - mock_path.return_value.rglob.return_value = mock_files - - result = service._collect_transcript_files() - assert result == mock_files - - -@patch("services.data_collector.configuration") -def test_perform_collection_no_files(mock_config) -> None: - """Test _perform_collection when no files are found.""" - service = DataCollectorService() - - with ( - patch.object(service, "_collect_feedback_files", return_value=[]), - patch.object(service, "_collect_transcript_files", return_value=[]), - ): - service._perform_collection() - - -@patch("services.data_collector.configuration") -def test_perform_collection_with_files(mock_config) -> None: - """Test _perform_collection when files are found.""" - service = DataCollectorService() - - feedback_files = [Path("/tmp/feedback/file1.json")] - - with ( - patch.object(service, "_collect_feedback_files", return_value=feedback_files), - patch.object(service, "_collect_transcript_files", return_value=[]), - patch.object(service, "_create_and_send_tarball", return_value=1), - ): - service._perform_collection() - - -@patch("services.data_collector.configuration") -def test_perform_collection_with_exception(mock_config) -> None: - """Test _perform_collection when an exception occurs.""" - service = DataCollectorService() - - with ( - patch.object( - service, "_collect_feedback_files", return_value=[Path("/tmp/test.json")] - ), - patch.object(service, "_collect_transcript_files", return_value=[]), - patch.object( - service, "_create_and_send_tarball", side_effect=Exception("Test error") - ), - ): - try: - service._perform_collection() - assert False, "Expected exception" - except Exception as e: - assert str(e) == "Test error" - - -@patch("services.data_collector.configuration") -def test_create_and_send_tarball_no_files(mock_config) -> None: - """Test creating tarball with no files.""" - service = DataCollectorService() - - result = service._create_and_send_tarball([], "test", Path("/tmp")) - assert result == 0 - - -@patch("services.data_collector.configuration") -def test_create_and_send_tarball_success(mock_config) -> None: - """Test creating and sending tarball successfully.""" - service = DataCollectorService() - mock_config.user_data_collection_configuration.data_collector.cleanup_after_send = ( - True - ) - - files = [Path("/tmp/test/file1.json")] - tarball_path = Path("/tmp/test_tarball.tar.gz") - - with ( - patch.object(service, "_create_tarball", return_value=tarball_path), - patch.object(service, "_send_tarball"), - patch.object(service, "_cleanup_files"), - patch.object(service, "_cleanup_empty_directories"), - patch.object(service, "_cleanup_tarball"), - ): - result = service._create_and_send_tarball(files, "test", Path("/tmp")) - assert result == 1 - - -@patch("services.data_collector.configuration") -def test_create_and_send_tarball_no_cleanup(mock_config) -> None: - """Test creating and sending tarball without cleanup.""" - service = DataCollectorService() - mock_config.user_data_collection_configuration.data_collector.cleanup_after_send = ( - False - ) - - files = [Path("/tmp/test/file1.json")] - - with ( - patch.object(service, "_create_tarball", return_value=Path("/tmp/test.tar.gz")), - patch.object(service, "_send_tarball"), - patch.object(service, "_cleanup_tarball"), - ): - result = service._create_and_send_tarball(files, "test", Path("/tmp")) - assert result == 1 - - -@patch("services.data_collector.datetime") -@patch("services.data_collector.tempfile.gettempdir") -@patch("services.data_collector.tarfile.open") -def test_create_tarball_success(mock_tarfile, mock_gettempdir, mock_datetime) -> None: - """Test creating tarball successfully.""" - service = DataCollectorService() - mock_datetime.now.return_value.strftime.return_value = "20230101_120000" - mock_gettempdir.return_value = "/tmp" - - mock_tar = MagicMock() - mock_tarfile.return_value.__enter__.return_value = mock_tar - - files = [Path("/data/test/file1.json")] - - with patch.object(Path, "stat") as mock_stat: - mock_stat.return_value.st_size = 1024 - - result = service._create_tarball(files, "test", Path("/data")) - - expected_path = Path("/tmp/test_20230101_120000.tar.gz") - assert result == expected_path - mock_tar.add.assert_called_once() - - -@patch("services.data_collector.datetime") -@patch("services.data_collector.tempfile.gettempdir") -@patch("services.data_collector.tarfile.open") -def test_create_tarball_file_add_error( - mock_tarfile, mock_gettempdir, mock_datetime -) -> None: - """Test creating tarball with file add error.""" - service = DataCollectorService() - mock_datetime.now.return_value.strftime.return_value = "20230101_120000" - mock_gettempdir.return_value = "/tmp" - - mock_tar = MagicMock() - mock_tar.add.side_effect = OSError("File error") - mock_tarfile.return_value.__enter__.return_value = mock_tar - - files = [Path("/data/test/file1.json")] - - with patch.object(Path, "stat") as mock_stat: - mock_stat.return_value.st_size = 1024 - - result = service._create_tarball(files, "test", Path("/data")) - - expected_path = Path("/tmp/test_20230101_120000.tar.gz") - assert result == expected_path - - -@patch("services.data_collector.configuration") -@patch("services.data_collector.requests.post") -def test_send_tarball_success(mock_post, mock_config) -> None: - """Test successful tarball sending.""" - service = DataCollectorService() - - mock_config.user_data_collection_configuration.data_collector.ingress_server_url = ( - "http://test.com" - ) - mock_config.user_data_collection_configuration.data_collector.ingress_server_auth_token = ( - "token" - ) - mock_config.user_data_collection_configuration.data_collector.connection_timeout = ( - 30 - ) - - mock_response = MagicMock() - mock_response.status_code = 200 - mock_post.return_value = mock_response - - with patch("builtins.open", create=True) as mock_open: - mock_open.return_value.__enter__.return_value.read.return_value = b"test data" - service._send_tarball(Path("/tmp/test.tar.gz")) - - mock_post.assert_called_once() - - -@patch("services.data_collector.configuration") -@patch("services.data_collector.requests.post") -def test_send_tarball_no_auth_token(mock_post, mock_config) -> None: - """Test sending tarball without auth token.""" - service = DataCollectorService() - - mock_config.user_data_collection_configuration.data_collector.ingress_server_url = ( - "http://test.com" - ) - mock_config.user_data_collection_configuration.data_collector.ingress_server_auth_token = ( - None - ) - mock_config.user_data_collection_configuration.data_collector.connection_timeout = ( - 30 - ) - - mock_response = MagicMock() - mock_response.status_code = 200 - mock_post.return_value = mock_response - - with patch("builtins.open", create=True): - service._send_tarball(Path("/tmp/test.tar.gz")) - mock_post.assert_called_once() - - -@patch("services.data_collector.configuration") -@patch("services.data_collector.requests.post") -def test_send_tarball_http_error(mock_post, mock_config) -> None: - """Test tarball sending with HTTP error.""" - service = DataCollectorService() - - mock_config.user_data_collection_configuration.data_collector.ingress_server_url = ( - "http://test.com" - ) - mock_config.user_data_collection_configuration.data_collector.connection_timeout = ( - 30 - ) - - mock_response = MagicMock() - mock_response.status_code = 500 - mock_response.text = "Server Error" - mock_post.return_value = mock_response - - with patch("builtins.open", create=True): - try: - service._send_tarball(Path("/tmp/test.tar.gz")) - assert False, "Expected exception" - except Exception as e: - assert "Failed to send tarball" in str(e) - - -@patch("services.data_collector.configuration") -def test_send_tarball_missing_url(mock_config) -> None: - """Test tarball sending when ingress server URL is None.""" - service = DataCollectorService() - - mock_config.user_data_collection_configuration.data_collector.ingress_server_url = ( - None - ) - - try: - service._send_tarball(Path("/tmp/test.tar.gz")) - assert False, "Expected ValueError" - except ValueError as e: - assert "Ingress server URL is not configured" in str(e) - - -@patch("services.data_collector.configuration") -def test_perform_collection_with_specific_exceptions(mock_config) -> None: - """Test _perform_collection with specific exception types that should be caught.""" - service = DataCollectorService() - - # Test with OSError - with ( - patch.object( - service, "_collect_feedback_files", return_value=[Path("/tmp/test.json")] - ), - patch.object(service, "_collect_transcript_files", return_value=[]), - patch.object( - service, "_create_and_send_tarball", side_effect=OSError("OS Error") - ), - ): - try: - service._perform_collection() - assert False, "Expected OSError" - except OSError as e: - assert str(e) == "OS Error" - - # Test with requests.RequestException - with ( - patch.object( - service, "_collect_feedback_files", return_value=[Path("/tmp/test.json")] - ), - patch.object(service, "_collect_transcript_files", return_value=[]), - patch.object( - service, - "_create_and_send_tarball", - side_effect=requests.RequestException("Request Error"), - ), - ): - try: - service._perform_collection() - assert False, "Expected RequestException" - except requests.RequestException as e: - assert str(e) == "Request Error" - - # Test with tarfile.TarError - with ( - patch.object( - service, "_collect_feedback_files", return_value=[Path("/tmp/test.json")] - ), - patch.object(service, "_collect_transcript_files", return_value=[]), - patch.object( - service, - "_create_and_send_tarball", - side_effect=tarfile.TarError("Tar Error"), - ), - ): - try: - service._perform_collection() - assert False, "Expected TarError" - except tarfile.TarError as e: - assert str(e) == "Tar Error" - - -def test_cleanup_files_success() -> None: - """Test successful file cleanup.""" - service = DataCollectorService() - files = [Path("/tmp/test1.json"), Path("/tmp/test2.json")] - - with patch.object(Path, "unlink") as mock_unlink: - service._cleanup_files(files) - assert mock_unlink.call_count == 2 - - -def test_cleanup_files_with_error() -> None: - """Test file cleanup with error.""" - service = DataCollectorService() - files = [Path("/tmp/test1.json")] - - with patch.object(Path, "unlink") as mock_unlink: - mock_unlink.side_effect = OSError("Permission denied") - service._cleanup_files(files) - mock_unlink.assert_called_once() - - -def test_cleanup_tarball_success() -> None: - """Test successful tarball cleanup.""" - service = DataCollectorService() - - with patch.object(Path, "unlink") as mock_unlink: - service._cleanup_tarball(Path("/tmp/test.tar.gz")) - mock_unlink.assert_called_once() - - -def test_cleanup_tarball_with_error() -> None: - """Test tarball cleanup with error.""" - service = DataCollectorService() - - with patch.object(Path, "unlink") as mock_unlink: - mock_unlink.side_effect = OSError("Permission denied") - service._cleanup_tarball(Path("/tmp/test.tar.gz")) - mock_unlink.assert_called_once() - - -@patch("services.data_collector.configuration") -def test_cleanup_empty_directories_disabled(mock_config) -> None: - """Test directory cleanup when transcripts disabled.""" - service = DataCollectorService() - mock_config.user_data_collection_configuration.transcripts_enabled = False - - service._cleanup_empty_directories() - - -@patch("services.data_collector.configuration") -def test_cleanup_empty_directories_success(mock_config) -> None: - """Test successful directory cleanup.""" - service = DataCollectorService() - mock_config.user_data_collection_configuration.transcripts_enabled = True - mock_config.user_data_collection_configuration.transcripts_storage = ( - "/tmp/transcripts" - ) - - transcripts_dir = MagicMock() - user_dir = MagicMock() - conv_dir = MagicMock() - - transcripts_dir.exists.return_value = True - transcripts_dir.iterdir.return_value = [user_dir] - user_dir.is_dir.return_value = True - user_dir.iterdir.side_effect = [ - [conv_dir], - [], - ] # First call returns conv_dir, second call empty - conv_dir.is_dir.return_value = True - conv_dir.iterdir.return_value = [] # Empty directory - - with patch("services.data_collector.Path", return_value=transcripts_dir): - service._cleanup_empty_directories() - - conv_dir.rmdir.assert_called_once() - user_dir.rmdir.assert_called_once() - - -@patch("services.data_collector.configuration") -def test_cleanup_empty_directories_with_errors(mock_config) -> None: - """Test directory cleanup when rmdir operations fail.""" - service = DataCollectorService() - mock_config.user_data_collection_configuration.transcripts_enabled = True - mock_config.user_data_collection_configuration.transcripts_storage = ( - "/tmp/transcripts" - ) - - transcripts_dir = MagicMock() - user_dir = MagicMock() - conv_dir = MagicMock() - - transcripts_dir.exists.return_value = True - transcripts_dir.iterdir.return_value = [user_dir] - user_dir.is_dir.return_value = True - user_dir.iterdir.side_effect = [[conv_dir], []] - conv_dir.is_dir.return_value = True - conv_dir.iterdir.return_value = [] - - # Both rmdir operations fail - conv_dir.rmdir.side_effect = OSError("Permission denied") - user_dir.rmdir.side_effect = OSError("Permission denied") - - with patch("services.data_collector.Path", return_value=transcripts_dir): - # Should not raise exception - service._cleanup_empty_directories() - - conv_dir.rmdir.assert_called_once() - user_dir.rmdir.assert_called_once() - - -@patch("services.data_collector.configuration") -def test_cleanup_empty_directories_directory_not_exists(mock_config) -> None: - """Test directory cleanup when transcripts directory doesn't exist.""" - service = DataCollectorService() - mock_config.user_data_collection_configuration.transcripts_enabled = True - mock_config.user_data_collection_configuration.transcripts_storage = ( - "/tmp/transcripts" - ) - - with patch("services.data_collector.Path") as mock_path: - mock_path.return_value.exists.return_value = False - - service._cleanup_empty_directories() - - -@patch("services.data_collector.configuration") -def test_perform_collection_with_transcript_files(mock_config) -> None: - """Test _perform_collection with transcript files only.""" - service = DataCollectorService() - - transcript_files = [Path("/tmp/transcripts/file1.json")] - - with ( - patch.object(service, "_collect_feedback_files", return_value=[]), - patch.object( - service, "_collect_transcript_files", return_value=transcript_files - ), - patch.object(service, "_create_and_send_tarball", return_value=1), - ): - service._perform_collection()