Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion anomaly_detection/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
Random Cut Forest and Isolation Forest algorithms.
"""

__version__ = "0.1.0"
__version__ = "0.1.0"
128 changes: 115 additions & 13 deletions anomaly_detection/api/main.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,160 @@
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import os
from typing import Any, List, Optional

import numpy as np
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, ValidationError, field_validator, validator

from anomaly_detection.data.nab_loader import NABLoader
from anomaly_detection.models.factory import ModelFactory

# Load environment variables
load_dotenv()

app = FastAPI(
title="Anomaly Detection Service",
description="A service for detecting anomalies in time series data",
version="1.0.0"
version="1.0.0",
)

# Store trained models in memory (in production, use a proper database)
trained_models = {}


class TrainingRequest(BaseModel):
algorithm: str
data_path: str
parameters: Optional[dict] = None


class PredictionRequest(BaseModel):
algorithm: str
data: List[float]
data: List[Any] # Accept any type of data and validate in the endpoint
model_id: Optional[str] = None


class PredictionResponse(BaseModel):
is_anomaly: bool
score: float
threshold: float


@app.post("/train")
async def train_model(request: TrainingRequest):
"""
Train an anomaly detection model using the specified algorithm and data.

Args:
request: TrainingRequest containing:
- algorithm: The algorithm to use (isolation_forest or random_cut_forest)
- data_path: Path to the training data
- parameters: Optional parameters for the model

Returns:
dict: Status and model ID
"""
try:
# TODO: Implement training logic
return {"status": "success", "model_id": "model_123"}
# Load data
loader = NABLoader()
X, _ = loader.load_dataset(request.data_path)

# Create model with parameters
model_params = request.parameters or {}
if request.algorithm == "random_cut_forest":
# Add AWS credentials if using RCF
model_params.update(
{
"aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID"),
"aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY"),
"region_name": os.getenv("AWS_REGION", "us-west-2"),
}
)

model = ModelFactory.create_model(request.algorithm, **model_params)

# Train model
model.fit(X)

# Generate unique model ID
model_id = f"{request.algorithm}_{len(trained_models)}"

# Store model
trained_models[model_id] = model

return {
"status": "success",
"model_id": model_id,
"message": f"Model trained successfully with {len(X)} samples",
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))


@app.post("/predict")
async def predict(request: PredictionRequest) -> PredictionResponse:
"""
Make predictions using a trained anomaly detection model.

Args:
request: PredictionRequest containing:
- algorithm: The algorithm used
- data: List of values to predict
- model_id: ID of the trained model to use

Returns:
PredictionResponse: Prediction results including anomaly status and score
"""
# Validate input data
if not request.data:
raise HTTPException(status_code=500, detail="Empty data provided")

try:
# Try converting all values to float
data_values = [float(x) for x in request.data]
except (ValueError, TypeError):
raise HTTPException(status_code=500, detail="All values must be numeric")

# Get model first to fail fast if model doesn't exist
if request.model_id not in trained_models:
raise HTTPException(
status_code=404,
detail=f"Model {request.model_id} not found. Please train a model first.",
)

try:
# Convert input data to numpy array and reshape to 2D
data = np.array(data_values, dtype=float)
if len(data.shape) == 1:
data = data.reshape(-1, 1) # Convert to 2D array with shape (n_samples, 1)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error processing data: {str(e)}")

try:
# TODO: Implement prediction logic
model = trained_models[request.model_id]

# Make prediction
score = float(model.predict(data)[0]) # Convert to float for JSON serialization
threshold = float(model.threshold) # Convert to float for JSON serialization
is_anomaly = score > threshold

return PredictionResponse(
is_anomaly=False,
score=0.5,
threshold=0.7
is_anomaly=bool(is_anomaly), score=score, threshold=threshold
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
raise HTTPException(status_code=500, detail=f"Prediction error: {str(e)}")


@app.exception_handler(ValidationError)
async def validation_exception_handler(request: Request, exc: ValidationError):
"""Handle Pydantic validation errors with 500 status code."""
return JSONResponse(status_code=500, content={"detail": str(exc)})


@app.get("/health")
async def health_check():
"""
Health check endpoint.
"""
return {"status": "healthy"}
return {"status": "healthy"}
44 changes: 26 additions & 18 deletions anomaly_detection/data/nab_loader.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,56 @@
import pandas as pd
import numpy as np
from typing import Tuple, Optional
import os
from typing import Optional, Tuple

import numpy as np
import pandas as pd


class NABLoader:
"""Loader for NAB (Numenta Anomaly Benchmark) datasets."""

def __init__(self, data_dir: str = "data"):
self.data_dir = data_dir

def load_dataset(self, dataset_name: str) -> Tuple[np.ndarray, Optional[np.ndarray]]:

def load_dataset(
self, dataset_name: str
) -> Tuple[np.ndarray, Optional[np.ndarray]]:
"""
Load a NAB dataset and return features and labels.

Args:
dataset_name: Name of the dataset to load

Returns:
Tuple containing:
- features: numpy array of shape (n_samples, n_features)
- labels: numpy array of shape (n_samples,) or None if no labels
"""
# Construct path to dataset
dataset_path = os.path.join(self.data_dir, dataset_name)

if not os.path.exists(dataset_path):
raise FileNotFoundError(f"Dataset {dataset_name} not found in {self.data_dir}")

raise FileNotFoundError(
f"Dataset {dataset_name} not found in {self.data_dir}"
)

# Load data
df = pd.read_csv(dataset_path)

# Extract features (assuming first column is timestamp)
features = df.iloc[:, 1:].values

# Check if labels exist (they might be in a separate file)
labels_path = os.path.join(self.data_dir, "labels", f"{dataset_name}_labels.csv")
labels_path = os.path.join(
self.data_dir, "labels", f"{dataset_name}_labels.csv"
)
labels = None

if os.path.exists(labels_path):
labels_df = pd.read_csv(labels_path)
labels = labels_df.iloc[:, 1].values

return features, labels

def get_available_datasets(self) -> list:
"""Get list of available datasets in the data directory."""
return [f for f in os.listdir(self.data_dir) if f.endswith('.csv')]
return [f for f in os.listdir(self.data_dir) if f.endswith(".csv")]
18 changes: 10 additions & 8 deletions anomaly_detection/models/base.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,36 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional

import numpy as np
from typing import Dict, Any, Optional


class AnomalyDetector(ABC):
"""Base class for anomaly detection models."""

def __init__(self, **kwargs):
self.model = None
self.threshold = None
self.parameters = kwargs

@abstractmethod
def fit(self, X: np.ndarray) -> None:
"""Fit the model to the training data."""
pass

@abstractmethod
def predict(self, X: np.ndarray) -> np.ndarray:
"""Predict anomaly scores for the input data."""
pass

def is_anomaly(self, X: np.ndarray) -> np.ndarray:
"""Determine if samples are anomalies based on the threshold."""
scores = self.predict(X)
return scores > self.threshold

def set_threshold(self, threshold: float) -> None:
"""Set the anomaly detection threshold."""
self.threshold = threshold

def get_parameters(self) -> Dict[str, Any]:
"""Get the model parameters."""
return self.parameters
return self.parameters
20 changes: 11 additions & 9 deletions anomaly_detection/models/factory.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,39 @@
from typing import Dict, Type

from .base import AnomalyDetector
from .isolation_forest import IsolationForestDetector
from .random_cut_forest import RandomCutForestDetector


class ModelFactory:
"""Factory class for creating anomaly detection models."""

_models: Dict[str, Type[AnomalyDetector]] = {
"isolation_forest": IsolationForestDetector,
"random_cut_forest": RandomCutForestDetector
"random_cut_forest": RandomCutForestDetector,
}

@classmethod
def create_model(cls, model_type: str, **kwargs) -> AnomalyDetector:
"""
Create an anomaly detection model.

Args:
model_type: Type of model to create
**kwargs: Additional arguments to pass to the model constructor

Returns:
An instance of the requested anomaly detection model

Raises:
ValueError: If the requested model type is not supported
"""
if model_type not in cls._models:
raise ValueError(f"Unsupported model type: {model_type}")

return cls._models[model_type](**kwargs)

@classmethod
def get_supported_models(cls) -> list:
"""Get list of supported model types."""
return list(cls._models.keys())
return list(cls._models.keys())
16 changes: 8 additions & 8 deletions anomaly_detection/models/isolation_forest.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
from sklearn.ensemble import IsolationForest
import numpy as np
from sklearn.ensemble import IsolationForest

from .base import AnomalyDetector


class IsolationForestDetector(AnomalyDetector):
"""Isolation Forest based anomaly detector."""

def __init__(self, contamination: float = 0.1, **kwargs):
super().__init__(**kwargs)
self.contamination = contamination
self.model = IsolationForest(
contamination=contamination,
random_state=42,
**kwargs
contamination=contamination, random_state=42, **kwargs
)

def fit(self, X: np.ndarray) -> None:
"""Fit the Isolation Forest model."""
self.model.fit(X)
# Set threshold based on contamination
scores = self.model.score_samples(X)
self.threshold = np.percentile(scores, 100 * self.contamination)

def predict(self, X: np.ndarray) -> np.ndarray:
"""Predict anomaly scores."""
return -self.model.score_samples(X) # Convert to positive scores
return -self.model.score_samples(X) # Convert to positive scores
Loading