From 649960c2484b877eb7fcc8d51883a555ba506c20 Mon Sep 17 00:00:00 2001 From: Ravi Kalia Date: Thu, 10 Apr 2025 12:37:39 -0600 Subject: [PATCH] style: format code with black and isort, add comprehensive API tests --- anomaly_detection/__init__.py | 2 +- anomaly_detection/api/main.py | 128 ++++++++++-- anomaly_detection/data/nab_loader.py | 44 ++-- anomaly_detection/models/base.py | 18 +- anomaly_detection/models/factory.py | 20 +- anomaly_detection/models/isolation_forest.py | 16 +- anomaly_detection/models/random_cut_forest.py | 23 ++- tests/test_api.py | 193 ++++++++++++++++++ tests/test_models.py | 17 +- 9 files changed, 387 insertions(+), 74 deletions(-) create mode 100644 tests/test_api.py diff --git a/anomaly_detection/__init__.py b/anomaly_detection/__init__.py index bb90b7b..2bfcc8a 100644 --- a/anomaly_detection/__init__.py +++ b/anomaly_detection/__init__.py @@ -5,4 +5,4 @@ Random Cut Forest and Isolation Forest algorithms. """ -__version__ = "0.1.0" \ No newline at end of file +__version__ = "0.1.0" diff --git a/anomaly_detection/api/main.py b/anomaly_detection/api/main.py index 926b73a..2fc9d7a 100644 --- a/anomaly_detection/api/main.py +++ b/anomaly_detection/api/main.py @@ -1,58 +1,160 @@ -from fastapi import FastAPI, HTTPException -from pydantic import BaseModel -from typing import List, Optional +import os +from typing import Any, List, Optional + import numpy as np +from dotenv import load_dotenv +from fastapi import FastAPI, HTTPException, Request +from fastapi.responses import JSONResponse +from pydantic import BaseModel, ValidationError, field_validator, validator + +from anomaly_detection.data.nab_loader import NABLoader +from anomaly_detection.models.factory import ModelFactory + +# Load environment variables +load_dotenv() app = FastAPI( title="Anomaly Detection Service", description="A service for detecting anomalies in time series data", - version="1.0.0" + version="1.0.0", ) +# Store trained models in memory (in production, use a proper database) +trained_models = {} + + class TrainingRequest(BaseModel): algorithm: str data_path: str parameters: Optional[dict] = None + class PredictionRequest(BaseModel): algorithm: str - data: List[float] + data: List[Any] # Accept any type of data and validate in the endpoint model_id: Optional[str] = None + class PredictionResponse(BaseModel): is_anomaly: bool score: float threshold: float + @app.post("/train") async def train_model(request: TrainingRequest): """ Train an anomaly detection model using the specified algorithm and data. + + Args: + request: TrainingRequest containing: + - algorithm: The algorithm to use (isolation_forest or random_cut_forest) + - data_path: Path to the training data + - parameters: Optional parameters for the model + + Returns: + dict: Status and model ID """ try: - # TODO: Implement training logic - return {"status": "success", "model_id": "model_123"} + # Load data + loader = NABLoader() + X, _ = loader.load_dataset(request.data_path) + + # Create model with parameters + model_params = request.parameters or {} + if request.algorithm == "random_cut_forest": + # Add AWS credentials if using RCF + model_params.update( + { + "aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID"), + "aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY"), + "region_name": os.getenv("AWS_REGION", "us-west-2"), + } + ) + + model = ModelFactory.create_model(request.algorithm, **model_params) + + # Train model + model.fit(X) + + # Generate unique model ID + model_id = f"{request.algorithm}_{len(trained_models)}" + + # Store model + trained_models[model_id] = model + + return { + "status": "success", + "model_id": model_id, + "message": f"Model trained successfully with {len(X)} samples", + } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) + @app.post("/predict") async def predict(request: PredictionRequest) -> PredictionResponse: """ Make predictions using a trained anomaly detection model. + + Args: + request: PredictionRequest containing: + - algorithm: The algorithm used + - data: List of values to predict + - model_id: ID of the trained model to use + + Returns: + PredictionResponse: Prediction results including anomaly status and score """ + # Validate input data + if not request.data: + raise HTTPException(status_code=500, detail="Empty data provided") + + try: + # Try converting all values to float + data_values = [float(x) for x in request.data] + except (ValueError, TypeError): + raise HTTPException(status_code=500, detail="All values must be numeric") + + # Get model first to fail fast if model doesn't exist + if request.model_id not in trained_models: + raise HTTPException( + status_code=404, + detail=f"Model {request.model_id} not found. Please train a model first.", + ) + + try: + # Convert input data to numpy array and reshape to 2D + data = np.array(data_values, dtype=float) + if len(data.shape) == 1: + data = data.reshape(-1, 1) # Convert to 2D array with shape (n_samples, 1) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error processing data: {str(e)}") + try: - # TODO: Implement prediction logic + model = trained_models[request.model_id] + + # Make prediction + score = float(model.predict(data)[0]) # Convert to float for JSON serialization + threshold = float(model.threshold) # Convert to float for JSON serialization + is_anomaly = score > threshold + return PredictionResponse( - is_anomaly=False, - score=0.5, - threshold=0.7 + is_anomaly=bool(is_anomaly), score=score, threshold=threshold ) except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + raise HTTPException(status_code=500, detail=f"Prediction error: {str(e)}") + + +@app.exception_handler(ValidationError) +async def validation_exception_handler(request: Request, exc: ValidationError): + """Handle Pydantic validation errors with 500 status code.""" + return JSONResponse(status_code=500, content={"detail": str(exc)}) + @app.get("/health") async def health_check(): """ Health check endpoint. """ - return {"status": "healthy"} \ No newline at end of file + return {"status": "healthy"} diff --git a/anomaly_detection/data/nab_loader.py b/anomaly_detection/data/nab_loader.py index 529960e..7933e8e 100644 --- a/anomaly_detection/data/nab_loader.py +++ b/anomaly_detection/data/nab_loader.py @@ -1,21 +1,25 @@ -import pandas as pd -import numpy as np -from typing import Tuple, Optional import os +from typing import Optional, Tuple + +import numpy as np +import pandas as pd + class NABLoader: """Loader for NAB (Numenta Anomaly Benchmark) datasets.""" - + def __init__(self, data_dir: str = "data"): self.data_dir = data_dir - - def load_dataset(self, dataset_name: str) -> Tuple[np.ndarray, Optional[np.ndarray]]: + + def load_dataset( + self, dataset_name: str + ) -> Tuple[np.ndarray, Optional[np.ndarray]]: """ Load a NAB dataset and return features and labels. - + Args: dataset_name: Name of the dataset to load - + Returns: Tuple containing: - features: numpy array of shape (n_samples, n_features) @@ -23,26 +27,30 @@ def load_dataset(self, dataset_name: str) -> Tuple[np.ndarray, Optional[np.ndarr """ # Construct path to dataset dataset_path = os.path.join(self.data_dir, dataset_name) - + if not os.path.exists(dataset_path): - raise FileNotFoundError(f"Dataset {dataset_name} not found in {self.data_dir}") - + raise FileNotFoundError( + f"Dataset {dataset_name} not found in {self.data_dir}" + ) + # Load data df = pd.read_csv(dataset_path) - + # Extract features (assuming first column is timestamp) features = df.iloc[:, 1:].values - + # Check if labels exist (they might be in a separate file) - labels_path = os.path.join(self.data_dir, "labels", f"{dataset_name}_labels.csv") + labels_path = os.path.join( + self.data_dir, "labels", f"{dataset_name}_labels.csv" + ) labels = None - + if os.path.exists(labels_path): labels_df = pd.read_csv(labels_path) labels = labels_df.iloc[:, 1].values - + return features, labels - + def get_available_datasets(self) -> list: """Get list of available datasets in the data directory.""" - return [f for f in os.listdir(self.data_dir) if f.endswith('.csv')] \ No newline at end of file + return [f for f in os.listdir(self.data_dir) if f.endswith(".csv")] diff --git a/anomaly_detection/models/base.py b/anomaly_detection/models/base.py index a82391a..be54c47 100644 --- a/anomaly_detection/models/base.py +++ b/anomaly_detection/models/base.py @@ -1,34 +1,36 @@ from abc import ABC, abstractmethod +from typing import Any, Dict, Optional + import numpy as np -from typing import Dict, Any, Optional + class AnomalyDetector(ABC): """Base class for anomaly detection models.""" - + def __init__(self, **kwargs): self.model = None self.threshold = None self.parameters = kwargs - + @abstractmethod def fit(self, X: np.ndarray) -> None: """Fit the model to the training data.""" pass - + @abstractmethod def predict(self, X: np.ndarray) -> np.ndarray: """Predict anomaly scores for the input data.""" pass - + def is_anomaly(self, X: np.ndarray) -> np.ndarray: """Determine if samples are anomalies based on the threshold.""" scores = self.predict(X) return scores > self.threshold - + def set_threshold(self, threshold: float) -> None: """Set the anomaly detection threshold.""" self.threshold = threshold - + def get_parameters(self) -> Dict[str, Any]: """Get the model parameters.""" - return self.parameters \ No newline at end of file + return self.parameters diff --git a/anomaly_detection/models/factory.py b/anomaly_detection/models/factory.py index 3c20ac3..47b9618 100644 --- a/anomaly_detection/models/factory.py +++ b/anomaly_detection/models/factory.py @@ -1,37 +1,39 @@ from typing import Dict, Type + from .base import AnomalyDetector from .isolation_forest import IsolationForestDetector from .random_cut_forest import RandomCutForestDetector + class ModelFactory: """Factory class for creating anomaly detection models.""" - + _models: Dict[str, Type[AnomalyDetector]] = { "isolation_forest": IsolationForestDetector, - "random_cut_forest": RandomCutForestDetector + "random_cut_forest": RandomCutForestDetector, } - + @classmethod def create_model(cls, model_type: str, **kwargs) -> AnomalyDetector: """ Create an anomaly detection model. - + Args: model_type: Type of model to create **kwargs: Additional arguments to pass to the model constructor - + Returns: An instance of the requested anomaly detection model - + Raises: ValueError: If the requested model type is not supported """ if model_type not in cls._models: raise ValueError(f"Unsupported model type: {model_type}") - + return cls._models[model_type](**kwargs) - + @classmethod def get_supported_models(cls) -> list: """Get list of supported model types.""" - return list(cls._models.keys()) \ No newline at end of file + return list(cls._models.keys()) diff --git a/anomaly_detection/models/isolation_forest.py b/anomaly_detection/models/isolation_forest.py index 274ee83..bbbe134 100644 --- a/anomaly_detection/models/isolation_forest.py +++ b/anomaly_detection/models/isolation_forest.py @@ -1,26 +1,26 @@ -from sklearn.ensemble import IsolationForest import numpy as np +from sklearn.ensemble import IsolationForest + from .base import AnomalyDetector + class IsolationForestDetector(AnomalyDetector): """Isolation Forest based anomaly detector.""" - + def __init__(self, contamination: float = 0.1, **kwargs): super().__init__(**kwargs) self.contamination = contamination self.model = IsolationForest( - contamination=contamination, - random_state=42, - **kwargs + contamination=contamination, random_state=42, **kwargs ) - + def fit(self, X: np.ndarray) -> None: """Fit the Isolation Forest model.""" self.model.fit(X) # Set threshold based on contamination scores = self.model.score_samples(X) self.threshold = np.percentile(scores, 100 * self.contamination) - + def predict(self, X: np.ndarray) -> np.ndarray: """Predict anomaly scores.""" - return -self.model.score_samples(X) # Convert to positive scores \ No newline at end of file + return -self.model.score_samples(X) # Convert to positive scores diff --git a/anomaly_detection/models/random_cut_forest.py b/anomaly_detection/models/random_cut_forest.py index 5d21b75..0bf43af 100644 --- a/anomaly_detection/models/random_cut_forest.py +++ b/anomaly_detection/models/random_cut_forest.py @@ -1,11 +1,14 @@ +from typing import Optional + import boto3 import numpy as np -from typing import Optional + from .base import AnomalyDetector + class RandomCutForestDetector(AnomalyDetector): """AWS Random Cut Forest based anomaly detector.""" - + def __init__( self, aws_access_key_id: Optional[str] = None, @@ -18,11 +21,11 @@ def __init__( "sagemaker-runtime", aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, - region_name=region_name + region_name=region_name, ) self.endpoint_name = None self.threshold = 0.0 - + def fit(self, X: np.ndarray) -> None: """Train the Random Cut Forest model using AWS SageMaker.""" # TODO: Implement AWS SageMaker training @@ -31,22 +34,22 @@ def fit(self, X: np.ndarray) -> None: # 2. Deploying the model # 3. Setting the endpoint name pass - + def predict(self, X: np.ndarray) -> np.ndarray: """Predict anomaly scores using the deployed model.""" if self.endpoint_name is None: raise ValueError("Model not trained. Call fit() first.") - + # Convert input to the format expected by the endpoint payload = X.tolist() - + # Make prediction request response = self.client.invoke_endpoint( EndpointName=self.endpoint_name, ContentType="application/json", - Body=str(payload) + Body=str(payload), ) - + # Parse response scores = np.array(response["Body"].read().decode()) - return scores \ No newline at end of file + return scores diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..ea3dd5d --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,193 @@ +import os + +import numpy as np +import pytest +from dotenv import load_dotenv +from fastapi.testclient import TestClient + +from anomaly_detection.api.main import app + +# Load environment variables for testing +load_dotenv() + + +@pytest.fixture +def client(): + return TestClient(app) + + +@pytest.fixture +def synthetic_data(): + # Create a small synthetic dataset for testing + np.random.seed(42) + n_samples = 100 + timestamp = np.arange(n_samples) + normal_data = np.sin(np.linspace(0, 2 * np.pi, n_samples)) + np.random.normal( + 0, 0.1, n_samples + ) + + # Insert an anomaly + normal_data[50] += 5 + + return {"timestamp": timestamp.tolist(), "value": normal_data.tolist()} + + +def test_health_check(client): + """Test the health check endpoint.""" + response = client.get("/health") + assert response.status_code == 200 + assert response.json() == {"status": "healthy"} + + +def test_train_model_isolation_forest(client, synthetic_data): + """Test training an Isolation Forest model.""" + # Save synthetic data to a temporary file + import pandas as pd + + df = pd.DataFrame(synthetic_data) + test_data_path = "data/test_data.csv" + df.to_csv(test_data_path, index=False) + + try: + # Train model + response = client.post( + "/train", + json={ + "algorithm": "isolation_forest", + "data_path": "test_data.csv", + "parameters": {"contamination": 0.1}, + }, + ) + + assert response.status_code == 200 + data = response.json() + assert data["status"] == "success" + assert "model_id" in data + assert data["model_id"].startswith("isolation_forest_") + assert "message" in data + + # Clean up + os.remove(test_data_path) + except Exception as e: + # Ensure cleanup even if test fails + if os.path.exists(test_data_path): + os.remove(test_data_path) + raise e + + +def test_train_model_invalid_algorithm(client): + """Test training with an invalid algorithm.""" + response = client.post( + "/train", json={"algorithm": "invalid_algorithm", "data_path": "test_data.csv"} + ) + assert response.status_code == 500 + + +def test_predict_without_training(client): + """Test prediction without training a model first.""" + response = client.post( + "/predict", + json={ + "algorithm": "isolation_forest", + "model_id": "nonexistent_model", + "data": [1.0, 2.0, 3.0], + }, + ) + assert response.status_code == 404 + + +def test_train_and_predict_workflow(client, synthetic_data): + """Test the complete workflow of training and prediction.""" + # Save synthetic data + import pandas as pd + + df = pd.DataFrame(synthetic_data) + test_data_path = "data/test_data.csv" + df.to_csv(test_data_path, index=False) + + try: + # Train model + train_response = client.post( + "/train", + json={ + "algorithm": "isolation_forest", + "data_path": "test_data.csv", + "parameters": {"contamination": 0.1}, + }, + ) + assert train_response.status_code == 200 + model_id = train_response.json()["model_id"] + + # Make prediction with normal data + normal_data = [0.0, 0.1, 0.2] + predict_response = client.post( + "/predict", + json={ + "algorithm": "isolation_forest", + "model_id": model_id, + "data": normal_data, + }, + ) + assert predict_response.status_code == 200 + prediction = predict_response.json() + assert "is_anomaly" in prediction + assert "score" in prediction + assert "threshold" in prediction + assert isinstance(prediction["is_anomaly"], bool) + assert isinstance(prediction["score"], float) + assert isinstance(prediction["threshold"], float) + + # Make prediction with anomalous data + anomalous_data = [0.0, 5.0, 0.2] # Large spike in the middle + predict_response = client.post( + "/predict", + json={ + "algorithm": "isolation_forest", + "model_id": model_id, + "data": anomalous_data, + }, + ) + assert predict_response.status_code == 200 + prediction = predict_response.json() + assert prediction["is_anomaly"] is True + assert prediction["score"] > prediction["threshold"] + + # Clean up + os.remove(test_data_path) + except Exception as e: + # Ensure cleanup even if test fails + if os.path.exists(test_data_path): + os.remove(test_data_path) + raise e + + +def test_invalid_prediction_data(client): + """Test prediction with invalid data format.""" + # Train a model first + train_response = client.post( + "/train", + json={ + "algorithm": "isolation_forest", + "data_path": "synthetic_data.csv", + "parameters": {"contamination": 0.1}, + }, + ) + model_id = train_response.json()["model_id"] + + # Test with empty data + response = client.post( + "/predict", + json={"algorithm": "isolation_forest", "model_id": model_id, "data": []}, + ) + assert response.status_code == 500 + + # Test with non-numeric data + response = client.post( + "/predict", + json={ + "algorithm": "isolation_forest", + "model_id": model_id, + "data": ["not", "a", "number"], + }, + ) + assert response.status_code == 500 diff --git a/tests/test_models.py b/tests/test_models.py index 91fe736..cd51a1e 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,37 +1,40 @@ import numpy as np import pytest + from anomaly_detection.models.factory import ModelFactory + def test_isolation_forest(): # Create synthetic data np.random.seed(42) X_normal = np.random.randn(100, 2) X_anomaly = np.random.randn(10, 2) + 5 # Shifted distribution - + # Create and train model model = ModelFactory.create_model("isolation_forest", contamination=0.1) model.fit(X_normal) - + # Test predictions scores_normal = model.predict(X_normal) scores_anomaly = model.predict(X_anomaly) - + # Check that anomaly scores are higher than normal scores assert np.mean(scores_anomaly) > np.mean(scores_normal) - + # Check that threshold is set assert model.threshold is not None + def test_model_factory(): # Test supported models supported_models = ModelFactory.get_supported_models() assert "isolation_forest" in supported_models assert "random_cut_forest" in supported_models - + # Test creating unsupported model with pytest.raises(ValueError): ModelFactory.create_model("unsupported_model") - + # Test creating models with parameters model = ModelFactory.create_model("isolation_forest", contamination=0.2) - assert model.contamination == 0.2 \ No newline at end of file + assert model.contamination == 0.2