diff --git a/Makefile b/Makefile index 93d57997..9169c7b7 100644 --- a/Makefile +++ b/Makefile @@ -8,6 +8,9 @@ PYTHON_REGISTRY = pypi run: ## Run the service locally uv run src/lightspeed_stack.py +run-data-collector: ## Run the data collector service locally + uv run src/lightspeed_stack.py --data-collector + test-unit: ## Run the unit tests @echo "Running unit tests..." @echo "Reports will be written to ${ARTIFACT_DIR}" diff --git a/README.md b/README.md index 31832279..a8e253a1 100644 --- a/README.md +++ b/README.md @@ -157,6 +157,7 @@ Usage: make ... Available targets are: run Run the service locally +run-data-collector Run the data collector service test-unit Run the unit tests test-integration Run integration tests tests test-e2e Run BDD tests for the service @@ -308,3 +309,46 @@ This script re-generated OpenAPI schema for the Lightspeed Service REST API. make schema ``` +## Data Collector Service + +The data collector service is a standalone service that runs separately from the main web service. It is responsible for collecting and sending user data including feedback and transcripts to an ingress server for analysis and archival. 
+ +### Features + +- **Periodic Collection**: Runs at configurable intervals +- **Data Packaging**: Packages feedback and transcript files into compressed tar.gz archives +- **Secure Transmission**: Sends data to a configured ingress server with optional authentication +- **File Cleanup**: Optionally removes local files after successful transmission +- **Error Handling**: Includes retry logic and comprehensive error handling + +### Configuration + +The data collector service is configured through the `user_data_collection.data_collector` section in your configuration file: + +```yaml +user_data_collection: + feedback_disabled: false + feedback_storage: "/tmp/data/feedback" + transcripts_disabled: false + transcripts_storage: "/tmp/data/transcripts" + data_collector: + enabled: true + ingress_server_url: "https://your-ingress-server.com" + ingress_server_auth_token: "your-auth-token" + ingress_content_service_name: "lightspeed-team" + collection_interval: 7200 # 2 hours in seconds + cleanup_after_send: true + connection_timeout: 30 +``` + +### Running the Service + +To run the data collector service: + +```bash +# Using Python directly +uv run src/lightspeed_stack.py --data-collector + +# Using Make target +make run-data-collector +``` \ No newline at end of file diff --git a/lightspeed-stack.yaml index e729b5ee..39c81436 100644 --- a/lightspeed-stack.yaml +++ b/lightspeed-stack.yaml @@ -20,5 +20,13 @@ user_data_collection: feedback_storage: "/tmp/data/feedback" transcripts_disabled: false transcripts_storage: "/tmp/data/transcripts" + data_collector: + enabled: false + ingress_server_url: null + ingress_server_auth_token: null + ingress_content_service_name: null + collection_interval: 7200 # 2 hours in seconds + cleanup_after_send: true + connection_timeout: 30 authentication: module: "noop" diff --git a/pyproject.toml index 004b999c..3bfd84d1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,6 +47,7 @@ dev = 
[ "pydocstyle>=6.3.0", "mypy>=1.16.0", "types-PyYAML>=6.0.2", + "types-requests>=2.28.0", "ruff>=0.11.13", "aiosqlite", "behave>=1.2.6", diff --git a/src/constants.py b/src/constants.py index 9407a4ee..d4c5b02a 100644 --- a/src/constants.py +++ b/src/constants.py @@ -42,3 +42,8 @@ } ) DEFAULT_AUTHENTICATION_MODULE = AUTH_MOD_NOOP + +# Data collector constants +DATA_COLLECTOR_COLLECTION_INTERVAL = 7200 # 2 hours in seconds +DATA_COLLECTOR_CONNECTION_TIMEOUT = 30 +DATA_COLLECTOR_RETRY_INTERVAL = 300 # 5 minutes in seconds diff --git a/src/lightspeed_stack.py b/src/lightspeed_stack.py index c850ea36..4479e308 100644 --- a/src/lightspeed_stack.py +++ b/src/lightspeed_stack.py @@ -10,6 +10,7 @@ from rich.logging import RichHandler from runners.uvicorn import start_uvicorn +from runners.data_collector import start_data_collector from configuration import configuration from client import LlamaStackClientHolder, AsyncLlamaStackClientHolder @@ -47,6 +48,13 @@ def create_argument_parser() -> ArgumentParser: help="path to configuration file (default: lightspeed-stack.yaml)", default="lightspeed-stack.yaml", ) + parser.add_argument( + "--data-collector", + dest="start_data_collector", + help="start data collector service instead of web service", + action="store_true", + default=False, + ) return parser @@ -70,6 +78,10 @@ def main() -> None: if args.dump_configuration: configuration.configuration.dump() + elif args.start_data_collector: + start_data_collector( + configuration.user_data_collection_configuration.data_collector + ) else: start_uvicorn(configuration.service_configuration) logger.info("Lightspeed stack finished") diff --git a/src/models/config.py b/src/models/config.py index 957afe42..8e2d36e3 100644 --- a/src/models/config.py +++ b/src/models/config.py @@ -2,7 +2,7 @@ from typing import Optional -from pydantic import BaseModel, model_validator, FilePath, AnyHttpUrl +from pydantic import BaseModel, model_validator, FilePath, AnyHttpUrl, PositiveInt from 
typing_extensions import Self import constants @@ -85,6 +85,31 @@ def check_llama_stack_model(self) -> Self: return self +class DataCollectorConfiguration(BaseModel): + """Data collector configuration for sending data to ingress server.""" + + enabled: bool = False + ingress_server_url: Optional[str] = None + ingress_server_auth_token: Optional[str] = None + ingress_content_service_name: Optional[str] = None + collection_interval: PositiveInt = constants.DATA_COLLECTOR_COLLECTION_INTERVAL + cleanup_after_send: bool = True # Remove local files after successful send + connection_timeout: PositiveInt = constants.DATA_COLLECTOR_CONNECTION_TIMEOUT + + @model_validator(mode="after") + def check_data_collector_configuration(self) -> Self: + """Check data collector configuration.""" + if self.enabled and self.ingress_server_url is None: + raise ValueError( + "ingress_server_url is required when data collector is enabled" + ) + if self.enabled and self.ingress_content_service_name is None: + raise ValueError( + "ingress_content_service_name is required when data collector is enabled" + ) + return self + + class UserDataCollection(BaseModel): """User data collection configuration.""" @@ -92,6 +117,7 @@ class UserDataCollection(BaseModel): feedback_storage: Optional[str] = None transcripts_disabled: bool = True transcripts_storage: Optional[str] = None + data_collector: DataCollectorConfiguration = DataCollectorConfiguration() @model_validator(mode="after") def check_storage_location_is_set_when_needed(self) -> Self: diff --git a/src/runners/data_collector.py b/src/runners/data_collector.py new file mode 100644 index 00000000..7bf05e8f --- /dev/null +++ b/src/runners/data_collector.py @@ -0,0 +1,26 @@ +"""Data collector runner.""" + +import logging + +from models.config import DataCollectorConfiguration +from services.data_collector import DataCollectorService + +logger: logging.Logger = logging.getLogger(__name__) + + +def start_data_collector(configuration: 
DataCollectorConfiguration) -> None: + """Start the data collector service as a standalone process.""" + logger.info("Starting data collector runner") + + if not configuration.enabled: + logger.info("Data collection is disabled") + return + + try: + service = DataCollectorService() + service.run() + except Exception as e: + logger.error( + "Data collector service encountered an exception: %s", e, exc_info=True + ) + raise diff --git a/src/services/__init__.py b/src/services/__init__.py new file mode 100644 index 00000000..c7775ec9 --- /dev/null +++ b/src/services/__init__.py @@ -0,0 +1 @@ +"""Services package.""" diff --git a/src/services/data_collector.py b/src/services/data_collector.py new file mode 100644 index 00000000..75ca74a7 --- /dev/null +++ b/src/services/data_collector.py @@ -0,0 +1,258 @@ +"""Data archival service for packaging and sending feedback and transcripts.""" + +import tarfile +import tempfile +import time +from datetime import datetime, UTC +from pathlib import Path +from typing import List + +import requests +import constants +from configuration import configuration +from log import get_logger + +logger = get_logger(__name__) + + +class DataCollectorService: # pylint: disable=too-few-public-methods + """Service for collecting and sending user data to ingress server. + + This service handles the periodic collection and transmission of user data + including feedback and transcripts to the configured ingress server. 
+ """ + + def run(self) -> None: + """Run the periodic data collection loop.""" + collector_config = ( + configuration.user_data_collection_configuration.data_collector + ) + + logger.info("Starting data collection service") + + while True: + try: + self._perform_collection() + logger.info( + "Next collection scheduled in %s seconds", + collector_config.collection_interval, + ) + if collector_config.collection_interval is not None: + time.sleep(collector_config.collection_interval) + except KeyboardInterrupt: + logger.info("Data collection service stopped by user") + break + except (OSError, requests.RequestException) as e: + logger.error("Error during collection process: %s", e, exc_info=True) + time.sleep( + constants.DATA_COLLECTOR_RETRY_INTERVAL + ) # Wait 5 minutes before retrying on error + + def _perform_collection(self) -> None: + """Perform a single collection operation.""" + logger.info("Starting data collection process") + + # Collect files to archive + feedback_files = self._collect_feedback_files() + transcript_files = self._collect_transcript_files() + + if not feedback_files and not transcript_files: + logger.info("No files to collect") + return + + logger.info( + "Found %s feedback files and %s transcript files to collect", + len(feedback_files), + len(transcript_files), + ) + + # Create and send archives + collections_sent = 0 + try: + if feedback_files: + udc_config = configuration.user_data_collection_configuration + if udc_config.feedback_storage: + feedback_base = Path(udc_config.feedback_storage) + collections_sent += self._create_and_send_tarball( + feedback_files, "feedback", feedback_base + ) + if transcript_files: + udc_config = configuration.user_data_collection_configuration + if udc_config.transcripts_storage: + transcript_base = Path(udc_config.transcripts_storage) + collections_sent += self._create_and_send_tarball( + transcript_files, "transcripts", transcript_base + ) + + logger.info( + "Successfully sent %s collections to ingress 
server", collections_sent + ) + except (OSError, requests.RequestException, tarfile.TarError) as e: + logger.error("Failed to create or send collections: %s", e, exc_info=True) + raise + + def _collect_feedback_files(self) -> List[Path]: + """Collect all feedback files that need to be collected.""" + udc_config = configuration.user_data_collection_configuration + + if udc_config.feedback_disabled or not udc_config.feedback_storage: + return [] + + feedback_dir = Path(udc_config.feedback_storage) + if not feedback_dir.exists(): + return [] + + return list(feedback_dir.glob("*.json")) + + def _collect_transcript_files(self) -> List[Path]: + """Collect all transcript files that need to be collected.""" + udc_config = configuration.user_data_collection_configuration + + if udc_config.transcripts_disabled or not udc_config.transcripts_storage: + return [] + + transcripts_dir = Path(udc_config.transcripts_storage) + if not transcripts_dir.exists(): + return [] + + # Recursively find all JSON files in the transcript directory structure + return list(transcripts_dir.rglob("*.json")) + + def _create_and_send_tarball( + self, files: List[Path], data_type: str, base_directory: Path + ) -> int: + """Create a single tarball from all files and send to ingress server.""" + if not files: + return 0 + + collector_config = ( + configuration.user_data_collection_configuration.data_collector + ) + + # Create one tarball with all files + tarball_path = self._create_tarball(files, data_type, base_directory) + try: + self._send_tarball(tarball_path) + if collector_config.cleanup_after_send: + self._cleanup_files(files) + self._cleanup_empty_directories() + return 1 + finally: + self._cleanup_tarball(tarball_path) + + def _create_tarball( + self, files: List[Path], data_type: str, base_directory: Path + ) -> Path: + """Create a tarball containing the specified files.""" + timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S") + tarball_name = f"{data_type}_{timestamp}.tar.gz" + + # 
Create tarball in a temporary directory + temp_dir = Path(tempfile.gettempdir()) + tarball_path = temp_dir / tarball_name + + logger.info("Creating tarball %s with %s files", tarball_path, len(files)) + + with tarfile.open(tarball_path, "w:gz") as tar: + for file_path in files: + try: + # Add file with relative path to maintain directory structure + arcname = str(file_path.relative_to(base_directory)) + tar.add(file_path, arcname=arcname) + except (OSError, ValueError) as e: + logger.warning("Failed to add %s to tarball: %s", file_path, e) + + logger.info( + "Created tarball %s (%s bytes)", tarball_path, tarball_path.stat().st_size + ) + return tarball_path + + def _send_tarball(self, tarball_path: Path) -> None: + """Send the tarball to the ingress server.""" + collector_config = ( + configuration.user_data_collection_configuration.data_collector + ) + + if collector_config.ingress_server_url is None: + raise ValueError("Ingress server URL is not configured") + + # pylint: disable=line-too-long + headers = { + "Content-Type": f"application/vnd.redhat.{collector_config.ingress_content_service_name}.periodic+tar", + } + + if collector_config.ingress_server_auth_token: + headers["Authorization"] = ( + f"Bearer {collector_config.ingress_server_auth_token}" + ) + + with open(tarball_path, "rb") as f: + data = f.read() + + logger.info( + "Sending tarball %s to %s", + tarball_path.name, + collector_config.ingress_server_url, + ) + + response = requests.post( + collector_config.ingress_server_url, + data=data, + headers=headers, + timeout=collector_config.connection_timeout, + ) + + if response.status_code >= 400: + raise requests.HTTPError( + f"Failed to send tarball to ingress server. 
" + f"Status: {response.status_code}, Response: {response.text}" + ) + + logger.info("Successfully sent tarball %s to ingress server", tarball_path.name) + + def _cleanup_files(self, files: List[Path]) -> None: + """Remove files after successful transmission.""" + for file_path in files: + try: + file_path.unlink() + logger.debug("Removed file %s", file_path) + except OSError as e: + logger.warning("Failed to remove file %s: %s", file_path, e) + + def _cleanup_empty_directories(self) -> None: + """Remove empty directories from transcript storage.""" + udc_config = configuration.user_data_collection_configuration + + if udc_config.transcripts_disabled or not udc_config.transcripts_storage: + return + + transcripts_dir = Path(udc_config.transcripts_storage) + if not transcripts_dir.exists(): + return + + # Remove empty directories (conversation and user directories) + for user_dir in transcripts_dir.iterdir(): + if user_dir.is_dir(): + for conv_dir in user_dir.iterdir(): + if conv_dir.is_dir() and not any(conv_dir.iterdir()): + try: + conv_dir.rmdir() + logger.debug("Removed empty directory %s", conv_dir) + except OSError: + pass + + # Remove user directory if empty + if not any(user_dir.iterdir()): + try: + user_dir.rmdir() + logger.debug("Removed empty directory %s", user_dir) + except OSError: + pass + + def _cleanup_tarball(self, tarball_path: Path) -> None: + """Remove the temporary tarball file.""" + try: + tarball_path.unlink() + logger.debug("Removed temporary tarball %s", tarball_path) + except OSError as e: + logger.warning("Failed to remove temporary tarball %s: %s", tarball_path, e) diff --git a/tests/unit/models/test_config.py b/tests/unit/models/test_config.py index 254ae783..950dbc38 100644 --- a/tests/unit/models/test_config.py +++ b/tests/unit/models/test_config.py @@ -5,7 +5,13 @@ from pathlib import Path -from constants import AUTH_MOD_NOOP, AUTH_MOD_K8S +from constants import ( + AUTH_MOD_NOOP, + AUTH_MOD_K8S, + DATA_COLLECTOR_COLLECTION_INTERVAL, 
+ DATA_COLLECTOR_CONNECTION_TIMEOUT, +) + from models.config import ( Configuration, LLamaStackConfiguration, @@ -13,6 +19,7 @@ UserDataCollection, TLSConfiguration, ModelContextProtocolServer, + DataCollectorConfiguration, ) @@ -129,6 +136,53 @@ def test_user_data_collection_transcripts_disabled() -> None: UserDataCollection(transcripts_disabled=False, transcripts_storage=None) +def test_user_data_collection_data_collector_enabled() -> None: + """Test the UserDataCollection constructor for data collector.""" + # correct configuration + cfg = UserDataCollection( + data_collector=DataCollectorConfiguration( + enabled=True, + ingress_server_url="http://localhost:8080", + ingress_server_auth_token="xyzzy", + ingress_content_service_name="lightspeed-core", + collection_interval=60, + ) + ) + assert cfg is not None + assert cfg.data_collector.enabled is True + + +def test_user_data_collection_data_collector_wrong_configuration() -> None: + """Test the UserDataCollection constructor for data collector.""" + # incorrect configuration + with pytest.raises( + ValueError, + match="ingress_server_url is required when data collector is enabled", + ): + UserDataCollection( + data_collector=DataCollectorConfiguration( + enabled=True, + ingress_server_url=None, + ingress_server_auth_token="xyzzy", + ingress_content_service_name="lightspeed-core", + collection_interval=60, + ) + ) + with pytest.raises( + ValueError, + match="ingress_content_service_name is required when data collector is enabled", + ): + UserDataCollection( + data_collector=DataCollectorConfiguration( + enabled=True, + ingress_server_url="http://localhost:8080", + ingress_server_auth_token="xyzzy", + ingress_content_service_name=None, + collection_interval=60, + ) + ) + + def test_tls_configuration() -> None: """Test the TLS configuration.""" cfg = TLSConfiguration( @@ -366,6 +420,15 @@ def test_dump_configuration(tmp_path) -> None: "feedback_storage": None, "transcripts_disabled": True, "transcripts_storage": 
None, + "data_collector": { + "enabled": False, + "ingress_server_url": None, + "ingress_server_auth_token": None, + "ingress_content_service_name": None, + "collection_interval": DATA_COLLECTOR_COLLECTION_INTERVAL, + "cleanup_after_send": True, + "connection_timeout": DATA_COLLECTOR_CONNECTION_TIMEOUT, + }, }, "mcp_servers": [], "authentication": { @@ -434,6 +497,15 @@ def test_dump_configuration_with_one_mcp_server(tmp_path) -> None: "feedback_storage": None, "transcripts_disabled": True, "transcripts_storage": None, + "data_collector": { + "enabled": False, + "ingress_server_url": None, + "ingress_server_auth_token": None, + "ingress_content_service_name": None, + "collection_interval": DATA_COLLECTOR_COLLECTION_INTERVAL, + "cleanup_after_send": True, + "connection_timeout": DATA_COLLECTOR_CONNECTION_TIMEOUT, + }, }, "mcp_servers": [ { @@ -516,6 +588,15 @@ def test_dump_configuration_with_more_mcp_servers(tmp_path) -> None: "feedback_storage": None, "transcripts_disabled": True, "transcripts_storage": None, + "data_collector": { + "enabled": False, + "ingress_server_url": None, + "ingress_server_auth_token": None, + "ingress_content_service_name": None, + "collection_interval": DATA_COLLECTOR_COLLECTION_INTERVAL, + "cleanup_after_send": True, + "connection_timeout": DATA_COLLECTOR_CONNECTION_TIMEOUT, + }, }, "mcp_servers": [ { diff --git a/tests/unit/runners/test_data_collector_runner.py b/tests/unit/runners/test_data_collector_runner.py new file mode 100644 index 00000000..5fb623a5 --- /dev/null +++ b/tests/unit/runners/test_data_collector_runner.py @@ -0,0 +1,60 @@ +"""Unit tests for runners.""" + +from unittest.mock import patch + +from models.config import DataCollectorConfiguration +from runners.data_collector import start_data_collector + + +def test_start_data_collector() -> None: + """Test the function to start data collector service.""" + configuration = DataCollectorConfiguration( + enabled=True, + ingress_server_url="http://localhost:8080", + 
ingress_server_auth_token="xyzzy", + ingress_content_service_name="lightspeed-core", + collection_interval=60, + ) + + # don't start real data collector service + with patch("services.data_collector.DataCollectorService.run") as mocked_run: + start_data_collector(configuration) + mocked_run.assert_called_once() + + +def test_start_data_collector_disabled() -> None: + """Test the function to start data collector service.""" + configuration = DataCollectorConfiguration( + enabled=False, + ingress_server_url="http://localhost:8080", + ingress_server_auth_token="xyzzy", + ingress_content_service_name="lightspeed-core", + collection_interval=60, + ) + + # don't start real data collector service + with patch("services.data_collector.DataCollectorService.run") as mocked_run: + start_data_collector(configuration) + mocked_run.assert_not_called() + + +def test_start_data_collector_exception() -> None: + """Test the function to start data collector service when an exception occurs.""" + configuration = DataCollectorConfiguration( + enabled=True, + ingress_server_url="http://localhost:8080", + ingress_server_auth_token="xyzzy", + ingress_content_service_name="lightspeed-core", + collection_interval=60, + ) + + # Mock the DataCollectorService to raise an exception + with patch("services.data_collector.DataCollectorService.run") as mocked_run: + mocked_run.side_effect = Exception("Test exception") + + try: + start_data_collector(configuration) + assert False, "Expected exception to be raised" + except Exception as e: + assert str(e) == "Test exception" + mocked_run.assert_called_once() diff --git a/tests/unit/services/test_data_collector.py b/tests/unit/services/test_data_collector.py new file mode 100644 index 00000000..03448c4a --- /dev/null +++ b/tests/unit/services/test_data_collector.py @@ -0,0 +1,587 @@ +"""Unit tests for data collector service.""" + +from pathlib import Path +from unittest.mock import patch, MagicMock +import requests +import tarfile + +from 
services.data_collector import DataCollectorService + + +def test_data_collector_service_creation() -> None: + """Test that DataCollectorService can be created.""" + service = DataCollectorService() + assert service is not None + + +@patch("services.data_collector.time.sleep") +@patch("services.data_collector.configuration") +def test_run_normal_operation(mock_config, mock_sleep) -> None: + """Test normal operation of the run method.""" + service = DataCollectorService() + mock_config.user_data_collection_configuration.data_collector.collection_interval = ( + 60 + ) + + with patch.object(service, "_perform_collection") as mock_perform: + mock_perform.side_effect = [None, KeyboardInterrupt()] + + service.run() + + assert mock_perform.call_count == 2 + mock_sleep.assert_called_once_with(60) + + +@patch("services.data_collector.time.sleep") +@patch("services.data_collector.configuration") +def test_run_with_exception(mock_config, mock_sleep) -> None: + """Test run method with exception handling.""" + service = DataCollectorService() + + with patch.object(service, "_perform_collection") as mock_perform: + mock_perform.side_effect = [OSError("Test error"), KeyboardInterrupt()] + + service.run() + + assert mock_perform.call_count == 2 + mock_sleep.assert_called_once_with(300) + + +@patch("services.data_collector.configuration") +def test_collect_feedback_files_disabled(mock_config) -> None: + """Test collecting feedback files when disabled.""" + service = DataCollectorService() + mock_config.user_data_collection_configuration.feedback_disabled = True + + result = service._collect_feedback_files() + assert result == [] + + +@patch("services.data_collector.configuration") +def test_collect_feedback_files_no_storage(mock_config) -> None: + """Test collecting feedback files when no storage configured.""" + service = DataCollectorService() + mock_config.user_data_collection_configuration.feedback_disabled = False + 
mock_config.user_data_collection_configuration.feedback_storage = None + + result = service._collect_feedback_files() + assert result == [] + + +@patch("services.data_collector.configuration") +def test_collect_feedback_files_directory_not_exists(mock_config) -> None: + """Test collecting feedback files when directory doesn't exist.""" + service = DataCollectorService() + mock_config.user_data_collection_configuration.feedback_disabled = False + mock_config.user_data_collection_configuration.feedback_storage = "/tmp/feedback" + + with patch("services.data_collector.Path") as mock_path: + mock_path.return_value.exists.return_value = False + + result = service._collect_feedback_files() + assert result == [] + + +@patch("services.data_collector.configuration") +def test_collect_feedback_files_success(mock_config) -> None: + """Test collecting feedback files successfully.""" + service = DataCollectorService() + mock_config.user_data_collection_configuration.feedback_disabled = False + mock_config.user_data_collection_configuration.feedback_storage = "/tmp/feedback" + + mock_files = [Path("/tmp/feedback/file1.json")] + + with patch("services.data_collector.Path") as mock_path: + mock_path.return_value.exists.return_value = True + mock_path.return_value.glob.return_value = mock_files + + result = service._collect_feedback_files() + assert result == mock_files + + +@patch("services.data_collector.configuration") +def test_collect_transcript_files_disabled(mock_config) -> None: + """Test collecting transcript files when disabled.""" + service = DataCollectorService() + mock_config.user_data_collection_configuration.transcripts_disabled = True + + result = service._collect_transcript_files() + assert result == [] + + +@patch("services.data_collector.configuration") +def test_collect_transcript_files_directory_not_exists(mock_config) -> None: + """Test collecting transcript files when directory doesn't exist.""" + service = DataCollectorService() + 
mock_config.user_data_collection_configuration.transcripts_disabled = False + mock_config.user_data_collection_configuration.transcripts_storage = ( + "/tmp/transcripts" + ) + + with patch("services.data_collector.Path") as mock_path: + mock_path.return_value.exists.return_value = False + + result = service._collect_transcript_files() + assert result == [] + + +@patch("services.data_collector.configuration") +def test_collect_transcript_files_success(mock_config) -> None: + """Test collecting transcript files successfully.""" + service = DataCollectorService() + mock_config.user_data_collection_configuration.transcripts_disabled = False + mock_config.user_data_collection_configuration.transcripts_storage = ( + "/tmp/transcripts" + ) + + mock_files = [Path("/tmp/transcripts/user1/conv1/file1.json")] + + with patch("services.data_collector.Path") as mock_path: + mock_path.return_value.exists.return_value = True + mock_path.return_value.rglob.return_value = mock_files + + result = service._collect_transcript_files() + assert result == mock_files + + +@patch("services.data_collector.configuration") +def test_perform_collection_no_files(mock_config) -> None: + """Test _perform_collection when no files are found.""" + service = DataCollectorService() + + with ( + patch.object(service, "_collect_feedback_files", return_value=[]), + patch.object(service, "_collect_transcript_files", return_value=[]), + ): + service._perform_collection() + + +@patch("services.data_collector.configuration") +def test_perform_collection_with_files(mock_config) -> None: + """Test _perform_collection when files are found.""" + service = DataCollectorService() + + feedback_files = [Path("/tmp/feedback/file1.json")] + + with ( + patch.object(service, "_collect_feedback_files", return_value=feedback_files), + patch.object(service, "_collect_transcript_files", return_value=[]), + patch.object(service, "_create_and_send_tarball", return_value=1), + ): + service._perform_collection() + + 
+@patch("services.data_collector.configuration") +def test_perform_collection_with_exception(mock_config) -> None: + """Test _perform_collection when an exception occurs.""" + service = DataCollectorService() + + with ( + patch.object( + service, "_collect_feedback_files", return_value=[Path("/tmp/test.json")] + ), + patch.object(service, "_collect_transcript_files", return_value=[]), + patch.object( + service, "_create_and_send_tarball", side_effect=Exception("Test error") + ), + ): + try: + service._perform_collection() + assert False, "Expected exception" + except Exception as e: + assert str(e) == "Test error" + + +@patch("services.data_collector.configuration") +def test_create_and_send_tarball_no_files(mock_config) -> None: + """Test creating tarball with no files.""" + service = DataCollectorService() + + result = service._create_and_send_tarball([], "test", Path("/tmp")) + assert result == 0 + + +@patch("services.data_collector.configuration") +def test_create_and_send_tarball_success(mock_config) -> None: + """Test creating and sending tarball successfully.""" + service = DataCollectorService() + mock_config.user_data_collection_configuration.data_collector.cleanup_after_send = ( + True + ) + + files = [Path("/tmp/test/file1.json")] + tarball_path = Path("/tmp/test_tarball.tar.gz") + + with ( + patch.object(service, "_create_tarball", return_value=tarball_path), + patch.object(service, "_send_tarball"), + patch.object(service, "_cleanup_files"), + patch.object(service, "_cleanup_empty_directories"), + patch.object(service, "_cleanup_tarball"), + ): + result = service._create_and_send_tarball(files, "test", Path("/tmp")) + assert result == 1 + + +@patch("services.data_collector.configuration") +def test_create_and_send_tarball_no_cleanup(mock_config) -> None: + """Test creating and sending tarball without cleanup.""" + service = DataCollectorService() + mock_config.user_data_collection_configuration.data_collector.cleanup_after_send = ( + False + ) + + 
files = [Path("/tmp/test/file1.json")] + + with ( + patch.object(service, "_create_tarball", return_value=Path("/tmp/test.tar.gz")), + patch.object(service, "_send_tarball"), + patch.object(service, "_cleanup_tarball"), + ): + result = service._create_and_send_tarball(files, "test", Path("/tmp")) + assert result == 1 + + +@patch("services.data_collector.datetime") +@patch("services.data_collector.tempfile.gettempdir") +@patch("services.data_collector.tarfile.open") +def test_create_tarball_success(mock_tarfile, mock_gettempdir, mock_datetime) -> None: + """Test creating tarball successfully.""" + service = DataCollectorService() + mock_datetime.now.return_value.strftime.return_value = "20230101_120000" + mock_gettempdir.return_value = "/tmp" + + mock_tar = MagicMock() + mock_tarfile.return_value.__enter__.return_value = mock_tar + + files = [Path("/data/test/file1.json")] + + with patch.object(Path, "stat") as mock_stat: + mock_stat.return_value.st_size = 1024 + + result = service._create_tarball(files, "test", Path("/data")) + + expected_path = Path("/tmp/test_20230101_120000.tar.gz") + assert result == expected_path + mock_tar.add.assert_called_once() + + +@patch("services.data_collector.datetime") +@patch("services.data_collector.tempfile.gettempdir") +@patch("services.data_collector.tarfile.open") +def test_create_tarball_file_add_error( + mock_tarfile, mock_gettempdir, mock_datetime +) -> None: + """Test creating tarball with file add error.""" + service = DataCollectorService() + mock_datetime.now.return_value.strftime.return_value = "20230101_120000" + mock_gettempdir.return_value = "/tmp" + + mock_tar = MagicMock() + mock_tar.add.side_effect = OSError("File error") + mock_tarfile.return_value.__enter__.return_value = mock_tar + + files = [Path("/data/test/file1.json")] + + with patch.object(Path, "stat") as mock_stat: + mock_stat.return_value.st_size = 1024 + + result = service._create_tarball(files, "test", Path("/data")) + + expected_path = 
Path("/tmp/test_20230101_120000.tar.gz") + assert result == expected_path + + +@patch("services.data_collector.configuration") +@patch("services.data_collector.requests.post") +def test_send_tarball_success(mock_post, mock_config) -> None: + """Test successful tarball sending.""" + service = DataCollectorService() + + mock_config.user_data_collection_configuration.data_collector.ingress_server_url = ( + "http://test.com" + ) + mock_config.user_data_collection_configuration.data_collector.ingress_server_auth_token = ( + "token" + ) + mock_config.user_data_collection_configuration.data_collector.connection_timeout = ( + 30 + ) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_post.return_value = mock_response + + with patch("builtins.open", create=True) as mock_open: + mock_open.return_value.__enter__.return_value.read.return_value = b"test data" + service._send_tarball(Path("/tmp/test.tar.gz")) + + mock_post.assert_called_once() + + +@patch("services.data_collector.configuration") +@patch("services.data_collector.requests.post") +def test_send_tarball_no_auth_token(mock_post, mock_config) -> None: + """Test sending tarball without auth token.""" + service = DataCollectorService() + + mock_config.user_data_collection_configuration.data_collector.ingress_server_url = ( + "http://test.com" + ) + mock_config.user_data_collection_configuration.data_collector.ingress_server_auth_token = ( + None + ) + mock_config.user_data_collection_configuration.data_collector.connection_timeout = ( + 30 + ) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_post.return_value = mock_response + + with patch("builtins.open", create=True): + service._send_tarball(Path("/tmp/test.tar.gz")) + mock_post.assert_called_once() + + +@patch("services.data_collector.configuration") +@patch("services.data_collector.requests.post") +def test_send_tarball_http_error(mock_post, mock_config) -> None: + """Test tarball sending with HTTP error.""" + service = 
DataCollectorService() + + mock_config.user_data_collection_configuration.data_collector.ingress_server_url = ( + "http://test.com" + ) + mock_config.user_data_collection_configuration.data_collector.connection_timeout = ( + 30 + ) + + mock_response = MagicMock() + mock_response.status_code = 500 + mock_response.text = "Server Error" + mock_post.return_value = mock_response + + with patch("builtins.open", create=True): + try: + service._send_tarball(Path("/tmp/test.tar.gz")) + assert False, "Expected exception" + except Exception as e: + assert "Failed to send tarball" in str(e) + + +@patch("services.data_collector.configuration") +def test_send_tarball_missing_url(mock_config) -> None: + """Test tarball sending when ingress server URL is None.""" + service = DataCollectorService() + + mock_config.user_data_collection_configuration.data_collector.ingress_server_url = ( + None + ) + + try: + service._send_tarball(Path("/tmp/test.tar.gz")) + assert False, "Expected ValueError" + except ValueError as e: + assert "Ingress server URL is not configured" in str(e) + + +@patch("services.data_collector.configuration") +def test_perform_collection_with_specific_exceptions(mock_config) -> None: + """Test _perform_collection with specific exception types that should be caught.""" + service = DataCollectorService() + + # Test with OSError + with ( + patch.object( + service, "_collect_feedback_files", return_value=[Path("/tmp/test.json")] + ), + patch.object(service, "_collect_transcript_files", return_value=[]), + patch.object( + service, "_create_and_send_tarball", side_effect=OSError("OS Error") + ), + ): + try: + service._perform_collection() + assert False, "Expected OSError" + except OSError as e: + assert str(e) == "OS Error" + + # Test with requests.RequestException + with ( + patch.object( + service, "_collect_feedback_files", return_value=[Path("/tmp/test.json")] + ), + patch.object(service, "_collect_transcript_files", return_value=[]), + patch.object( + service, + 
"_create_and_send_tarball", + side_effect=requests.RequestException("Request Error"), + ), + ): + try: + service._perform_collection() + assert False, "Expected RequestException" + except requests.RequestException as e: + assert str(e) == "Request Error" + + # Test with tarfile.TarError + with ( + patch.object( + service, "_collect_feedback_files", return_value=[Path("/tmp/test.json")] + ), + patch.object(service, "_collect_transcript_files", return_value=[]), + patch.object( + service, + "_create_and_send_tarball", + side_effect=tarfile.TarError("Tar Error"), + ), + ): + try: + service._perform_collection() + assert False, "Expected TarError" + except tarfile.TarError as e: + assert str(e) == "Tar Error" + + +def test_cleanup_files_success() -> None: + """Test successful file cleanup.""" + service = DataCollectorService() + files = [Path("/tmp/test1.json"), Path("/tmp/test2.json")] + + with patch.object(Path, "unlink") as mock_unlink: + service._cleanup_files(files) + assert mock_unlink.call_count == 2 + + +def test_cleanup_files_with_error() -> None: + """Test file cleanup with error.""" + service = DataCollectorService() + files = [Path("/tmp/test1.json")] + + with patch.object(Path, "unlink") as mock_unlink: + mock_unlink.side_effect = OSError("Permission denied") + service._cleanup_files(files) + mock_unlink.assert_called_once() + + +def test_cleanup_tarball_success() -> None: + """Test successful tarball cleanup.""" + service = DataCollectorService() + + with patch.object(Path, "unlink") as mock_unlink: + service._cleanup_tarball(Path("/tmp/test.tar.gz")) + mock_unlink.assert_called_once() + + +def test_cleanup_tarball_with_error() -> None: + """Test tarball cleanup with error.""" + service = DataCollectorService() + + with patch.object(Path, "unlink") as mock_unlink: + mock_unlink.side_effect = OSError("Permission denied") + service._cleanup_tarball(Path("/tmp/test.tar.gz")) + mock_unlink.assert_called_once() + + 
+@patch("services.data_collector.configuration") +def test_cleanup_empty_directories_disabled(mock_config) -> None: + """Test directory cleanup when transcripts disabled.""" + service = DataCollectorService() + mock_config.user_data_collection_configuration.transcripts_disabled = True + + service._cleanup_empty_directories() + + +@patch("services.data_collector.configuration") +def test_cleanup_empty_directories_success(mock_config) -> None: + """Test successful directory cleanup.""" + service = DataCollectorService() + mock_config.user_data_collection_configuration.transcripts_disabled = False + mock_config.user_data_collection_configuration.transcripts_storage = ( + "/tmp/transcripts" + ) + + transcripts_dir = MagicMock() + user_dir = MagicMock() + conv_dir = MagicMock() + + transcripts_dir.exists.return_value = True + transcripts_dir.iterdir.return_value = [user_dir] + user_dir.is_dir.return_value = True + user_dir.iterdir.side_effect = [ + [conv_dir], + [], + ] # First call returns conv_dir, second call empty + conv_dir.is_dir.return_value = True + conv_dir.iterdir.return_value = [] # Empty directory + + with patch("services.data_collector.Path", return_value=transcripts_dir): + service._cleanup_empty_directories() + + conv_dir.rmdir.assert_called_once() + user_dir.rmdir.assert_called_once() + + +@patch("services.data_collector.configuration") +def test_cleanup_empty_directories_with_errors(mock_config) -> None: + """Test directory cleanup when rmdir operations fail.""" + service = DataCollectorService() + mock_config.user_data_collection_configuration.transcripts_disabled = False + mock_config.user_data_collection_configuration.transcripts_storage = ( + "/tmp/transcripts" + ) + + transcripts_dir = MagicMock() + user_dir = MagicMock() + conv_dir = MagicMock() + + transcripts_dir.exists.return_value = True + transcripts_dir.iterdir.return_value = [user_dir] + user_dir.is_dir.return_value = True + user_dir.iterdir.side_effect = [[conv_dir], []] + 
conv_dir.is_dir.return_value = True + conv_dir.iterdir.return_value = [] + + # Both rmdir operations fail + conv_dir.rmdir.side_effect = OSError("Permission denied") + user_dir.rmdir.side_effect = OSError("Permission denied") + + with patch("services.data_collector.Path", return_value=transcripts_dir): + # Should not raise exception + service._cleanup_empty_directories() + + conv_dir.rmdir.assert_called_once() + user_dir.rmdir.assert_called_once() + + +@patch("services.data_collector.configuration") +def test_cleanup_empty_directories_directory_not_exists(mock_config) -> None: + """Test directory cleanup when transcripts directory doesn't exist.""" + service = DataCollectorService() + mock_config.user_data_collection_configuration.transcripts_disabled = False + mock_config.user_data_collection_configuration.transcripts_storage = ( + "/tmp/transcripts" + ) + + with patch("services.data_collector.Path") as mock_path: + mock_path.return_value.exists.return_value = False + + service._cleanup_empty_directories() + + +@patch("services.data_collector.configuration") +def test_perform_collection_with_transcript_files(mock_config) -> None: + """Test _perform_collection with transcript files only.""" + service = DataCollectorService() + + transcript_files = [Path("/tmp/transcripts/file1.json")] + + with ( + patch.object(service, "_collect_feedback_files", return_value=[]), + patch.object( + service, "_collect_transcript_files", return_value=transcript_files + ), + patch.object(service, "_create_and_send_tarball", return_value=1), + ): + service._perform_collection() diff --git a/uv.lock b/uv.lock index f5143115..c57c7f4e 100644 --- a/uv.lock +++ b/uv.lock @@ -112,11 +112,11 @@ wheels = [ [[package]] name = "astroid" -version = "3.3.10" +version = "3.3.11" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/00/c2/9b2de9ed027f9fe5734a6c0c0a601289d796b3caaf1e372e23fa88a73047/astroid-3.3.10.tar.gz", hash = 
"sha256:c332157953060c6deb9caa57303ae0d20b0fbdb2e59b4a4f2a6ba49d0a7961ce", size = 398941, upload-time = "2025-05-10T13:33:10.405Z" } +sdist = { url = "https://files.pythonhosted.org/packages/18/74/dfb75f9ccd592bbedb175d4a32fc643cf569d7c218508bfbd6ea7ef9c091/astroid-3.3.11.tar.gz", hash = "sha256:1e5a5011af2920c7c67a53f65d536d65bfa7116feeaf2354d8b94f29573bb0ce", size = 400439, upload-time = "2025-07-13T18:04:23.177Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/15/58/5260205b9968c20b6457ed82f48f9e3d6edf2f1f95103161798b73aeccf0/astroid-3.3.10-py3-none-any.whl", hash = "sha256:104fb9cb9b27ea95e847a94c003be03a9e039334a8ebca5ee27dafaf5c5711eb", size = 275388, upload-time = "2025-05-10T13:33:08.391Z" }, + { url = "https://files.pythonhosted.org/packages/af/0f/3b8fdc946b4d9cc8cc1e8af42c4e409468c84441b933d037e101b3d72d86/astroid-3.3.11-py3-none-any.whl", hash = "sha256:54c760ae8322ece1abd213057c4b5bba7c49818853fc901ef09719a60dbf9dec", size = 275612, upload-time = "2025-07-13T18:04:21.07Z" }, ] [[package]] @@ -215,11 +215,11 @@ wheels = [ [[package]] name = "certifi" -version = "2025.7.9" +version = "2025.7.14" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/de/8a/c729b6b60c66a38f590c4e774decc4b2ec7b0576be8f1aa984a53ffa812a/certifi-2025.7.9.tar.gz", hash = "sha256:c1d2ec05395148ee10cf672ffc28cd37ea0ab0d99f9cc74c43e588cbd111b079", size = 160386, upload-time = "2025-07-09T02:13:58.874Z" } +sdist = { url = "https://files.pythonhosted.org/packages/b3/76/52c535bcebe74590f296d6c77c86dabf761c41980e1347a2422e4aa2ae41/certifi-2025.7.14.tar.gz", hash = "sha256:8ea99dbdfaaf2ba2f9bac77b9249ef62ec5218e7c2b2e903378ed5fccf765995", size = 163981, upload-time = "2025-07-14T03:29:28.449Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/66/f3/80a3f974c8b535d394ff960a11ac20368e06b736da395b551a49ce950cce/certifi-2025.7.9-py3-none-any.whl", hash = 
"sha256:d842783a14f8fdd646895ac26f719a061408834473cfc10203f6a575beb15d39", size = 159230, upload-time = "2025-07-09T02:13:57.007Z" }, + { url = "https://files.pythonhosted.org/packages/4f/52/34c6cf5bb9285074dc3531c437b3919e825d976fde097a7a73f79e726d03/certifi-2025.7.14-py3-none-any.whl", hash = "sha256:6b31f564a415d79ee77df69d757bb49a5bb53bd9f756cbbe24394ffd6fc1f4b2", size = 162722, upload-time = "2025-07-14T03:29:26.863Z" }, ] [[package]] @@ -858,6 +858,7 @@ dev = [ { name = "pytest-mock" }, { name = "ruff" }, { name = "types-pyyaml" }, + { name = "types-requests" }, ] [package.metadata] @@ -889,6 +890,7 @@ dev = [ { name = "pytest-mock", specifier = ">=3.14.0" }, { name = "ruff", specifier = ">=0.11.13" }, { name = "types-pyyaml", specifier = ">=6.0.2" }, + { name = "types-requests", specifier = ">=2.28.0" }, ] [[package]] @@ -2190,6 +2192,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/99/5f/e0af6f7f6a260d9af67e1db4f54d732abad514252a7a378a6c4d17dd1036/types_pyyaml-6.0.12.20250516-py3-none-any.whl", hash = "sha256:8478208feaeb53a34cb5d970c56a7cd76b72659442e733e268a94dc72b2d0530", size = 20312, upload-time = "2025-05-16T03:08:04.019Z" }, ] +[[package]] +name = "types-requests" +version = "2.32.4.20250611" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "urllib3" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/6d/7f/73b3a04a53b0fd2a911d4ec517940ecd6600630b559e4505cc7b68beb5a0/types_requests-2.32.4.20250611.tar.gz", hash = "sha256:741c8777ed6425830bf51e54d6abe245f79b4dcb9019f1622b773463946bf826", size = 23118, upload-time = "2025-06-11T03:11:41.272Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3d/ea/0be9258c5a4fa1ba2300111aa5a0767ee6d18eb3fd20e91616c12082284d/types_requests-2.32.4.20250611-py3-none-any.whl", hash = "sha256:ad2fe5d3b0cb3c2c902c8815a70e7fb2302c4b8c1f77bdcd738192cdb3878072", size = 20643, upload-time = "2025-06-11T03:11:40.186Z" }, +] + [[package]] name = 
"typing-extensions" version = "4.14.1"