In [None]:
"""
Enterprise-level Technical Analysis Framework
Provides fluent interface for financial data analysis with comprehensive error handling
"""

import numpy as np
import pandas as pd
import mplfinance as mpf
from typing import Union, Optional, Dict, List, Callable, Any
from abc import ABC, abstractmethod
import logging
from dataclasses import dataclass
from enum import Enum
import re

# Configure logging: request INFO-level output on the root logger and create the
# module-scoped logger that QueryParser and TechnicalAnalyzer report through.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
class ComparisonType(Enum):
    """Enumeration of supported comparison operations.

    Each member's value is the lowercase keyword string for that operation.
    """
    ABOVE = "above"
    BELOW = "below"
    CROSSED_UP = "crossed_up"
    # NOTE: the value is abbreviated ("crossed_dn"), unlike the member name.
    CROSSED_DOWN = "crossed_dn"
    # NOTE(review): the visible ComparatorFactory defaults register no comparator
    # for the three members below — confirm whether they are implemented elsewhere.
    EQUALS = "equals"
    GREATER_EQUAL = "greater_equal"
    LESS_EQUAL = "less_equal"

In [None]:
@dataclass
class AnalysisResult:
    """Data class to encapsulate the outcome of one analysis operation."""
    # Name of the column holding the result; the analyzer stores "" for failures.
    column_name: str
    # Human-readable description of the operation, e.g. "Close above EMA_21".
    operation: str
    # True when the operation completed, False when it raised.
    success: bool
    # Status text: a success note or the error message.
    message: str = ""
    # The produced result Series; stays None when the operation failed.
    data: Optional[pd.Series] = None

class TAException(Exception):
    """Error type raised by the Technical Analysis classes in this module."""

In [None]:
class ColumnValidator:
    """Static checks for column presence, column dtype, and scalar numeric input"""

    @staticmethod
    def validate_column_exists(df: pd.DataFrame, column: str) -> bool:
        """Return True when the column is present; raise TAException otherwise"""
        if column in df.columns:
            return True
        raise TAException(f"Column '{column}' not found in DataFrame. Available columns: {list(df.columns)}")

    @staticmethod
    def validate_numeric_column(df: pd.DataFrame, column: str) -> bool:
        """Return True when the column exists and has a numeric dtype; raise TAException otherwise"""
        # Existence is checked first so a missing column reports the clearer message
        ColumnValidator.validate_column_exists(df, column)
        series = df[column]
        if pd.api.types.is_numeric_dtype(series):
            return True
        raise TAException(f"Column '{column}' must contain numeric data, got {df[column].dtype}")

    @staticmethod
    def validate_numeric_value(value: Union[str, int, float]) -> float:
        """Return the value as a float; raise TAException when it cannot be converted"""
        try:
            number = float(value)
        except (ValueError, TypeError):
            raise TAException(f"Value '{value}' cannot be converted to numeric")
        return number

In [None]:
class BaseComparator(ABC):
    """Common contract and naming helpers shared by all comparison operations"""

    @abstractmethod
    def compare(self, df: pd.DataFrame, x: str, y: Union[str, float],
                new_col: Optional[str] = None) -> pd.DataFrame:
        """Run the comparison of column x against y and return the resulting frame"""

    def _generate_column_name(self, x: str, y: Union[str, float], operation: str) -> str:
        """Build the default result name "<x>_<operation>_<y>", with dots in y replaced by underscores"""
        y_token = str(y).replace('.', '_')
        return "{}_{}_{}".format(x, operation, y_token)

    def _add_constant_column(self, df: pd.DataFrame, name: str, value: float) -> pd.DataFrame:
        """Ensure df has a column called name, filling it with value only when it is absent"""
        if name in df.columns:
            # An existing column of that name is left untouched
            return df
        df[name] = value
        return df

In [None]:
class AboveComparator(BaseComparator):
    """Flag rows where column X is strictly greater than Y"""

    def compare(self, df: pd.DataFrame, x: str, y: Union[str, float],
                new_col: Optional[str] = None) -> pd.DataFrame:
        """Write a 1/0 column marking where df[x] > y and return the same frame"""
        ColumnValidator.validate_numeric_column(df, x)

        if not isinstance(y, (int, float)):
            # y names another column, which must itself be numeric
            ColumnValidator.validate_numeric_column(df, y)
            reference = y
        else:
            # y is a literal: expose it as a constant column named after the number
            reference = str(y)
            df = self._add_constant_column(df, reference, float(y))

        target = new_col if new_col else self._generate_column_name(x, y, "above")
        is_above = df[x] > df[reference]
        df[target] = is_above.astype(int)
        return df

In [None]:
class BelowComparator(BaseComparator):
    """Flag rows where column X is strictly less than Y"""

    def compare(self, df: pd.DataFrame, x: str, y: Union[str, float],
                new_col: Optional[str] = None) -> pd.DataFrame:
        """Write a 1/0 column marking where df[x] < y and return the same frame"""
        ColumnValidator.validate_numeric_column(df, x)

        if not isinstance(y, (int, float)):
            # y names another column, which must itself be numeric
            ColumnValidator.validate_numeric_column(df, y)
            reference = y
        else:
            # y is a literal: expose it as a constant column named after the number
            reference = str(y)
            df = self._add_constant_column(df, reference, float(y))

        target = new_col if new_col else self._generate_column_name(x, y, "below")
        is_below = df[x] < df[reference]
        df[target] = is_below.astype(int)
        return df

In [None]:
class CrossedUpComparator(BaseComparator):
    """Flag the rows on which X moves from at-or-below Y to above Y"""

    def compare(self, df: pd.DataFrame, x: str, y: Union[str, float],
                new_col: Optional[str] = None) -> pd.DataFrame:
        """Write a 1/0 column marking upward crossings of y by df[x] and return the same frame"""
        ColumnValidator.validate_numeric_column(df, x)

        if not isinstance(y, (int, float)):
            # y names another column, which must itself be numeric
            ColumnValidator.validate_numeric_column(df, y)
            reference = y
        else:
            # y is a literal: expose it as a constant column named after the number
            reference = str(y)
            df = self._add_constant_column(df, reference, float(y))

        target = new_col if new_col else self._generate_column_name(x, y, "crossed_up")
        spread = df[x] - df[reference]
        previous_spread = spread.shift(1)
        # A crossing needs the spread positive now and non-positive one row earlier
        crossed = (spread > 0) & (previous_spread <= 0)
        df[target] = crossed.astype(int)
        return df

In [None]:
class CrossedDownComparator(BaseComparator):
    """Flag the rows on which X moves from at-or-above Y to below Y"""

    def compare(self, df: pd.DataFrame, x: str, y: Union[str, float],
                new_col: Optional[str] = None) -> pd.DataFrame:
        """Write a 1/0 column marking downward crossings of y by df[x] and return the same frame"""
        ColumnValidator.validate_numeric_column(df, x)

        if not isinstance(y, (int, float)):
            # y names another column, which must itself be numeric
            ColumnValidator.validate_numeric_column(df, y)
            reference = y
        else:
            # y is a literal: expose it as a constant column named after the number
            reference = str(y)
            df = self._add_constant_column(df, reference, float(y))

        # The default name uses the spelled-out "crossed_down" label
        target = new_col if new_col else self._generate_column_name(x, y, "crossed_down")
        spread = df[x] - df[reference]
        previous_spread = spread.shift(1)
        # A crossing needs the spread negative now and non-negative one row earlier
        crossed = (spread < 0) & (previous_spread >= 0)
        df[target] = crossed.astype(int)
        return df

In [None]:
class ComparatorFactory:
    """Factory class mapping operation keywords to comparator instances.

    The registry is a class-level dict, so registrations are shared by every
    caller in the process.
    """

    _comparators: Dict[str, BaseComparator] = {
        ComparisonType.ABOVE.value: AboveComparator(),
        ComparisonType.BELOW.value: BelowComparator(),
        ComparisonType.CROSSED_UP.value: CrossedUpComparator(),
        ComparisonType.CROSSED_DOWN.value: CrossedDownComparator(),
    }
    # ComparisonType.CROSSED_DOWN's value is the abbreviated "crossed_dn"; also
    # accept the spelled-out keyword so "X crossed_down Y" queries resolve.
    _comparators.setdefault("crossed_down", _comparators[ComparisonType.CROSSED_DOWN.value])

    @staticmethod
    def _normalize(operation: str) -> str:
        """Return the registry key for an operation keyword (trimmed, lowercase)."""
        if not isinstance(operation, str):
            raise TAException(f"Operation must be a string, got {type(operation).__name__}")
        return operation.strip().lower()

    @classmethod
    def get_comparator(cls, operation: str) -> BaseComparator:
        """Get the comparator instance for the given operation.

        Args:
            operation: Operation keyword; matched case-insensitively.

        Returns:
            The registered comparator instance.

        Raises:
            TAException: If the keyword is not a string or nothing is registered for it.
        """
        comparator = cls._comparators.get(cls._normalize(operation))
        # Compare against None so a registered-but-falsy comparator is still returned
        if comparator is None:
            available = list(cls._comparators.keys())
            raise TAException(f"Unsupported operation '{operation}'. Available: {available}")
        return comparator

    @classmethod
    def register_comparator(cls, operation: str, comparator: BaseComparator):
        """Register (or replace) the comparator used for an operation keyword.

        Args:
            operation: Operation keyword; stored trimmed and lowercase.
            comparator: Comparator instance to return for that keyword.

        Raises:
            TAException: If the keyword is not a string.
        """
        cls._comparators[cls._normalize(operation)] = comparator

In [None]:
class QueryParser:
    """Turn plain-text query lines into structured operation records"""

    @staticmethod
    def parse_query(query: str) -> List[Dict[str, str]]:
        """Parse a multi-line query into a list of column1/operation/column2 dicts

        Each non-blank line is split on whitespace. The first token is the left
        column, the second (lowercased) is the operation and the last is the
        target; any tokens in between are ignored. Lines with fewer than three
        tokens are skipped with a warning.
        """
        parsed = []

        for raw_line in query.strip().splitlines():
            line = raw_line.strip()
            if not line:
                continue

            tokens = line.split()
            if len(tokens) < 3:
                logger.warning(f"Skipping malformed query line: {line}")
                continue

            # Pattern: "Column1 operation ... Column2/Value"
            parsed.append({
                'column1': tokens[0],
                'operation': tokens[1].lower(),
                'column2': tokens[-1],
            })

        return parsed

In [None]:
class TechnicalAnalyzer:
    """Main class providing fluent interface for technical analysis"""

    def __init__(self, df: pd.DataFrame):
        """Initialize with DataFrame"""
        if not isinstance(df, pd.DataFrame):
            raise TAException("Input must be a pandas DataFrame")

        self._df = df.copy()  # Work with copy to avoid modifying original
        self._operations_log: List[AnalysisResult] = []
        logger.info(f"Initialized TechnicalAnalyzer with DataFrame shape: {self._df.shape}")

    @property
    def df(self) -> pd.DataFrame:
        """Get the current DataFrame"""
        return self._df

    @property
    def operations_log(self) -> List[AnalysisResult]:
        """Get log of all operations performed"""
        return self._operations_log

    def above(self, x: str, y: Union[str, float], new_col: Optional[str] = None) -> 'TechnicalAnalyzer':
        """Fluent interface for above comparison"""
        return self._execute_operation(ComparisonType.ABOVE.value, x, y, new_col)

    def below(self, x: str, y: Union[str, float], new_col: Optional[str] = None) -> 'TechnicalAnalyzer':
        """Fluent interface for below comparison"""
        return self._execute_operation(ComparisonType.BELOW.value, x, y, new_col)

    def crossed_up(self, x: str, y: Union[str, float], new_col: Optional[str] = None) -> 'TechnicalAnalyzer':
        """Fluent interface for crossed up detection"""
        return self._execute_operation(ComparisonType.CROSSED_UP.value, x, y, new_col)

    def crossed_down(self, x: str, y: Union[str, float], new_col: Optional[str] = None) -> 'TechnicalAnalyzer':
        """Fluent interface for crossed down detection"""
        return self._execute_operation(ComparisonType.CROSSED_DOWN.value, x, y, new_col)

    def _execute_operation(self, operation: str, x: str, y: Union[str, float],
                          new_col: Optional[str] = None) -> 'TechnicalAnalyzer':
        """Execute a comparison operation and log the result"""
        try:
            comparator = ComparatorFactory.get_comparator(operation)
            self._df = comparator.compare(self._df, x, y, new_col)

            # Log successful operation
            result_col = new_col or comparator._generate_column_name(x, y, operation)
            result = AnalysisResult(
                column_name=result_col,
                operation=f"{x} {operation} {y}",
                success=True,
                message="Operation completed successfully",
                data=self._df[result_col]
            )
            self._operations_log.append(result)
            logger.info(f"✓ {result.operation} -> {result.column_name}")

        except Exception as e:
            # Log failed operation
            result = AnalysisResult(
                column_name="",
                operation=f"{x} {operation} {y}",
                success=False,
                message=str(e)
            )
            self._operations_log.append(result)
            logger.error(f"✗ {result.operation}: {result.message}")
            raise TAException(f"Operation failed: {e}")

        return self

    def execute_query(self, query: str) -> 'TechnicalAnalyzer':
        """Execute a natural language query"""
        operations = QueryParser.parse_query(query)

        for op in operations:
            try:
                # Convert numeric strings to float
                try:
                    col2 = float(op['column2'])
                except ValueError:
                    col2 = op['column2']

                self._execute_operation(op['operation'], op['column1'], col2)

            except Exception as e:
                logger.error(f"Failed to execute query operation {op}: {e}")
                continue

        return self

    def get_signals(self, column: str) -> pd.Series:
        """Get signal series for a specific column"""
        if column not in self._df.columns:
            raise TAException(f"Column '{column}' not found")
        return self._df[column]

    def get_active_signals(self, column: str) -> pd.DataFrame:
        """Get only rows where signal is active (value = 1)"""
        if column not in self._df.columns:
            raise TAException(f"Column '{column}' not found")
        return self._df[self._df[column] == 1]

    def summary(self) -> pd.DataFrame:
        """Get summary of all operations performed"""
        summary_data = []
        for result in self._operations_log:
            summary_data.append({
                'Operation': result.operation,
                'Column': result.column_name,
                'Success': result.success,
                'Message': result.message,
                'Active_Signals': result.data.sum() if result.data is not None else 0
            })
        return pd.DataFrame(summary_data)

    def reset(self) -> 'TechnicalAnalyzer':
        """Reset to original DataFrame state"""
        # Keep only original columns (remove generated ones)
        original_cols = [col for col in self._df.columns
                        if not any(op in col.lower() for op in ['above', 'below', 'cross'])]
        self._df = self._df[original_cols]
        self._operations_log.clear()
        logger.info("Reset analyzer to original state")
        return self

In [None]:
def cabr(df: pd.DataFrame) -> TechnicalAnalyzer:
    """Shorthand entry point: wrap the DataFrame in a new TechnicalAnalyzer and return it"""
    analyzer = TechnicalAnalyzer(df)
    return analyzer

In [None]:
# Demo: build a synthetic price frame and drive the analyzer through its fluent API
if __name__ == "__main__":
    # Seed the generator so the synthetic series are repeatable
    np.random.seed(42)
    dates = pd.date_range('2023-01-01', periods=100, freq='D')
    # Entry order matters: each series draws from the seeded generator in turn
    sample_data = dict(
        DateTime=dates,
        Close=100 + np.cumsum(np.random.randn(100) * 0.5),
        EMA_21=100 + np.cumsum(np.random.randn(100) * 0.3),
        RSI_14=30 + 40 * np.random.rand(100),
        Volume=1000 + 500 * np.random.rand(100),
    )
    df = pd.DataFrame(sample_data)

    print("=== Enterprise Technical Analysis Framework Demo ===\n")

    # Wrap the frame in an analyzer
    analyzer = cabr(df)

    # Every call hands back the analyzer, so the steps can be applied one by one
    result = analyzer.above('Close', 'EMA_21')
    result = result.below('RSI_14', 70)
    result = result.crossed_up('Close', 'EMA_21')
    result = result.above('Volume', 1200)

In [None]:
# Text-query form: one "<column> <operation> <column-or-number>" entry per line
query = """
Close above EMA_21
RSI_14 below 30
Volume above 1500
"""

analyzer.execute_query(query)

# Report what has been run so far and which columns it added to the frame
print("Operations Summary:")
summary_table = analyzer.summary()
print(summary_table)
print(f"\nDataFrame shape: {analyzer.df.shape}")
generated_cols = [col for col in analyzer.df.columns if col not in df.columns]
print(f"\nGenerated columns: {generated_cols}")

# Count the rows where one particular signal fired, if that column was produced
signal_col = 'Close_above_EMA_21'
if signal_col in analyzer.df.columns:
    active_signals = analyzer.get_active_signals(signal_col)
    print(f"\nActive 'Close above EMA_21' signals: {len(active_signals)} out of {len(df)}")

In [None]:
# Load your data
df = pd.read_csv('auto-indig.csv')

# Fluent interface: each call adds one signal column and returns the analyzer
analyzer = (cabr(df)
    .above('Close', 'EMA_21')
    .below('RSI_14', 70)
    .crossed_up('Close', 'EMA_21'))

# Preview the frame as code; inside the query string this line would only be
# parsed as a malformed query entry and skipped
print(analyzer.df.head())

# Natural language queries: one "<column> <operation> <column-or-number>" per line
query = """
Close above EMA_21
RSI_14 below 30
Volume crossed_up 1000
"""
analyzer.execute_query(query)

# Get results
print(analyzer.summary())
active_signals = analyzer.get_active_signals('Close_above_EMA_21')

In [None]:
# Show the working frame transposed, so each column becomes a row
analyzer.df.transpose()