# D-LOGIC QUANT CORE v1.0
## Professional Quantitative Analytics Engine for MetaTrader 5
---
**Author:** RafaB Dembski  
**Architecture:** Google Colab + ngrok + Flask API  
**Purpose:** Real-time quantitative analysis without external AI APIs

### Features:
- Statistical Analysis (Returns, Volatility, Correlations)
- Risk Metrics (VaR, CVaR, Sharpe, Sortino, Calmar)
- Regime Detection (Hidden Markov Model)
- Monte Carlo Simulations
- Probability Distributions
- Pattern Recognition (ML-based)


In [None]:
#@title 1. INSTALLATION & SETUP {display-mode: "form"}
#@markdown ### Install required packages and configure ngrok

!pip install flask flask-cors pyngrok numpy pandas scipy scikit-learn hmmlearn --quiet

print("[OK] All packages installed successfully")

In [None]:
#@title 2. NGROK AUTHENTICATION {display-mode: "form"}
#@markdown ### Enter your ngrok authtoken (free at ngrok.com)

NGROK_TOKEN = ""  #@param {type:"string"}

from pyngrok import ngrok

if NGROK_TOKEN:
    ngrok.set_auth_token(NGROK_TOKEN)
    print("[OK] ngrok authenticated")
else:
    print("[!] No token provided - using free tier (limited)")

In [None]:
#@title 3. QUANT CORE ENGINE {display-mode: "form"}
#@markdown ### Core quantitative analysis module

import numpy as np
import pandas as pd
from scipy import stats
from scipy.stats import norm, skew, kurtosis
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.ensemble import IsolationForest
import warnings
warnings.filterwarnings('ignore')

class QuantCore:
    """
    Professional Quantitative Analysis Engine
    Optimized for real-time trading decisions
    """
    
    def __init__(self):
        self.scaler = StandardScaler()
        self.regime_model = None
        self.anomaly_detector = IsolationForest(contamination=0.1, random_state=42)
        
    # ==================== STATISTICAL ANALYSIS ====================
    
    def calculate_returns(self, prices, method='log'):
        """Calculate returns from price series"""
        prices = np.array(prices)
        if method == 'log':
            return np.diff(np.log(prices))
        return np.diff(prices) / prices[:-1]
    
    def realized_volatility(self, returns, annualize=True):
        """Calculate realized volatility"""
        vol = np.std(returns)
        return vol * np.sqrt(252) if annualize else vol
    
    def rolling_volatility(self, returns, window=20):
        """Calculate rolling volatility"""
        returns = np.array(returns)
        if len(returns) < window:
            return np.std(returns) * np.sqrt(252)
        return np.std(returns[-window:]) * np.sqrt(252)
    
    def distribution_stats(self, returns):
        """Calculate distribution statistics"""
        returns = np.array(returns)
        return {
            'mean': np.mean(returns),
            'std': np.std(returns),
            'skewness': skew(returns),
            'kurtosis': kurtosis(returns),
            'min': np.min(returns),
            'max': np.max(returns)
        }
    
    # ==================== RISK METRICS ====================
    
    def value_at_risk(self, returns, confidence=0.95, method='historical'):
        """Calculate Value at Risk"""
        returns = np.array(returns)
        if method == 'historical':
            return np.percentile(returns, (1 - confidence) * 100)
        elif method == 'parametric':
            mu, sigma = np.mean(returns), np.std(returns)
            return mu + sigma * norm.ppf(1 - confidence)
        return np.percentile(returns, (1 - confidence) * 100)
    
    def expected_shortfall(self, returns, confidence=0.95):
        """Calculate Expected Shortfall (CVaR)"""
        returns = np.array(returns)
        var = self.value_at_risk(returns, confidence)
        return np.mean(returns[returns <= var])
    
    def sharpe_ratio(self, returns, risk_free_rate=0.02):
        """Calculate Sharpe Ratio (annualized)"""
        returns = np.array(returns)
        excess_returns = returns - risk_free_rate / 252
        if np.std(excess_returns) == 0:
            return 0
        return np.mean(excess_returns) / np.std(excess_returns) * np.sqrt(252)
    
    def sortino_ratio(self, returns, risk_free_rate=0.02):
        """Calculate Sortino Ratio (annualized)"""
        returns = np.array(returns)
        excess_returns = returns - risk_free_rate / 252
        downside_returns = excess_returns[excess_returns < 0]
        if len(downside_returns) == 0 or np.std(downside_returns) == 0:
            return 0
        return np.mean(excess_returns) / np.std(downside_returns) * np.sqrt(252)
    
    def max_drawdown(self, prices):
        """Calculate Maximum Drawdown"""
        prices = np.array(prices)
        peak = np.maximum.accumulate(prices)
        drawdown = (prices - peak) / peak
        return np.min(drawdown)
    
    def calmar_ratio(self, returns, prices):
        """Calculate Calmar Ratio"""
        annual_return = np.mean(returns) * 252
        mdd = abs(self.max_drawdown(prices))
        return annual_return / mdd if mdd != 0 else 0
    
    # ==================== REGIME DETECTION ====================
    
    def detect_regime(self, returns, n_regimes=3):
        """
        Detect market regime using K-Means clustering
        Returns: 0=LOW_VOL, 1=NORMAL, 2=HIGH_VOL
        """
        returns = np.array(returns).reshape(-1, 1)
        if len(returns) < n_regimes * 2:
            return 1, 0.5  # Default: NORMAL
        
        # Feature engineering
        features = np.column_stack([
            returns,
            np.abs(returns),  # Magnitude
        ])
        
        scaled = self.scaler.fit_transform(features)
        kmeans = KMeans(n_clusters=n_regimes, random_state=42, n_init=10)
        kmeans.fit(scaled)
        
        current_regime = kmeans.predict(scaled[-1:])[0]
        
        # Calculate confidence based on distance to cluster center
        distances = kmeans.transform(scaled[-1:])[0]
        confidence = 1 - (distances[current_regime] / np.sum(distances))
        
        return int(current_regime), float(confidence)
    
    def detect_trend_strength(self, prices, window=20):
        """
        Calculate trend strength using linear regression R-squared
        Returns: strength (0-1), direction (1=UP, -1=DOWN)
        """
        prices = np.array(prices[-window:])
        if len(prices) < 5:
            return 0, 0
        
        x = np.arange(len(prices))
        slope, intercept, r_value, p_value, std_err = stats.linregress(x, prices)
        
        strength = r_value ** 2  # R-squared
        direction = 1 if slope > 0 else -1
        
        return float(strength), int(direction)
    
    # ==================== PROBABILITY ANALYSIS ====================
    
    def monte_carlo_simulation(self, returns, n_simulations=1000, n_days=5):
        """
        Run Monte Carlo simulation for price prediction
        Returns: probability of profit, expected return, VaR
        """
        returns = np.array(returns)
        mu = np.mean(returns)
        sigma = np.std(returns)
        
        # Generate random paths
        simulated_returns = np.random.normal(mu, sigma, (n_simulations, n_days))
        cumulative_returns = np.sum(simulated_returns, axis=1)
        
        prob_profit = np.mean(cumulative_returns > 0)
        expected_return = np.mean(cumulative_returns)
        var_95 = np.percentile(cumulative_returns, 5)
        
        return {
            'prob_profit': float(prob_profit),
            'expected_return': float(expected_return),
            'var_95': float(var_95),
            'best_case': float(np.percentile(cumulative_returns, 95)),
            'worst_case': float(np.percentile(cumulative_returns, 5))
        }
    
    def calculate_probability_zones(self, prices, returns):
        """
        Calculate probability zones based on distribution
        """
        current_price = prices[-1]
        daily_vol = np.std(returns)
        
        # 1 sigma zones (68% probability)
        zone_1_up = current_price * (1 + daily_vol)
        zone_1_down = current_price * (1 - daily_vol)
        
        # 2 sigma zones (95% probability)
        zone_2_up = current_price * (1 + 2 * daily_vol)
        zone_2_down = current_price * (1 - 2 * daily_vol)
        
        return {
            'current': current_price,
            'zone_1_up': zone_1_up,
            'zone_1_down': zone_1_down,
            'zone_2_up': zone_2_up,
            'zone_2_down': zone_2_down,
            'daily_vol_pct': daily_vol * 100
        }
    
    # ==================== ANOMALY DETECTION ====================
    
    def detect_anomaly(self, returns):
        """
        Detect market anomalies using Isolation Forest
        Returns: is_anomaly (bool), anomaly_score (-1 to 1)
        """
        returns = np.array(returns).reshape(-1, 1)
        if len(returns) < 10:
            return False, 0.0
        
        self.anomaly_detector.fit(returns)
        score = self.anomaly_detector.decision_function(returns[-1:])[0]
        prediction = self.anomaly_detector.predict(returns[-1:])[0]
        
        return prediction == -1, float(score)
    
    # ==================== MAIN ANALYSIS ====================
    
    def analyze(self, data):
        """
        Main analysis function - processes market data and returns QUANT metrics
        
        Input data format:
        {
            'prices': [list of recent prices],
            'h4_bias': 'BULLISH/BEARISH/NEUTRAL',
            'h1_bias': 'BULLISH/BEARISH/NEUTRAL',
            'zone': 'zone interaction string',
            'liquidity': 'liquidity status',
            'signals': 'entry signals'
        }
        """
        try:
            prices = np.array(data.get('prices', []))
            
            if len(prices) < 20:
                return self._default_response("INSUFFICIENT_DATA")
            
            # Calculate returns
            returns = self.calculate_returns(prices)
            
            # Risk metrics
            var_95 = self.value_at_risk(returns, 0.95)
            cvar = self.expected_shortfall(returns, 0.95)
            sharpe = self.sharpe_ratio(returns)
            sortino = self.sortino_ratio(returns)
            mdd = self.max_drawdown(prices)
            volatility = self.rolling_volatility(returns)
            
            # Regime detection
            regime, regime_conf = self.detect_regime(returns)
            regime_names = ['LOW_VOLATILITY', 'NORMAL', 'HIGH_VOLATILITY']
            
            # Trend analysis
            trend_strength, trend_dir = self.detect_trend_strength(prices)
            
            # Monte Carlo
            mc_result = self.monte_carlo_simulation(returns)
            
            # Anomaly detection
            is_anomaly, anomaly_score = self.detect_anomaly(returns)
            
            # Distribution stats
            dist_stats = self.distribution_stats(returns)
            
            # Generate trading decision
            decision, confidence, comment = self._generate_decision(
                data, sharpe, sortino, var_95, regime, trend_strength, 
                trend_dir, mc_result, is_anomaly, volatility
            )
            
            return {
                'decision': decision,
                'confidence': confidence,
                'comment': comment,
                
                # Risk Metrics
                'var_95': round(var_95 * 10000, 2),  # in pips
                'cvar': round(cvar * 10000, 2),
                'sharpe': round(sharpe, 2),
                'sortino': round(sortino, 2),
                'max_dd': round(mdd * 100, 2),
                'volatility': round(volatility * 100, 2),
                
                # Regime
                'regime': regime_names[regime],
                'regime_conf': round(regime_conf * 100, 0),
                
                # Trend
                'trend_strength': round(trend_strength * 100, 0),
                'trend_dir': 'UP' if trend_dir == 1 else 'DOWN',
                
                # Probability
                'prob_profit': round(mc_result['prob_profit'] * 100, 0),
                'expected_ret': round(mc_result['expected_return'] * 10000, 2),
                
                # Anomaly
                'is_anomaly': is_anomaly,
                'anomaly_score': round(anomaly_score, 2),
                
                # Distribution
                'skewness': round(dist_stats['skewness'], 2),
                'kurtosis': round(dist_stats['kurtosis'], 2)
            }
            
        except Exception as e:
            return self._default_response(f"ERROR: {str(e)}")
    
    def _generate_decision(self, data, sharpe, sortino, var, regime, 
                           trend_str, trend_dir, mc, is_anomaly, vol):
        """
        Generate trading decision based on quant metrics
        """
        h4 = data.get('h4_bias', 'NEUTRAL')
        h1 = data.get('h1_bias', 'NEUTRAL')
        zone = data.get('zone', '')
        
        score = 0
        reasons = []
        
        # 1. Risk-adjusted returns
        if sharpe > 1.5:
            score += 20
            reasons.append("Strong Sharpe")
        elif sharpe > 0.5:
            score += 10
        elif sharpe < 0:
            score -= 15
            reasons.append("Negative Sharpe")
        
        # 2. Regime analysis
        if regime == 0:  # Low volatility
            score += 15
            reasons.append("Low Vol regime")
        elif regime == 2:  # High volatility
            score -= 10
            reasons.append("High Vol regime")
        
        # 3. Trend alignment
        bias_aligned = (h4 == h1 and h4 != 'NEUTRAL')
        if bias_aligned:
            score += 15
            if trend_str > 0.6:
                score += 10
                reasons.append("Strong aligned trend")
        else:
            reasons.append("Mixed structure")
        
        # 4. Monte Carlo probability
        if mc['prob_profit'] > 0.6:
            score += 15
            reasons.append(f"{int(mc['prob_profit']*100)}% profit prob")
        elif mc['prob_profit'] < 0.4:
            score -= 15
        
        # 5. Zone interaction
        if 'INSIDE' in zone:
            score += 10
            reasons.append("In POI zone")
        
        # 6. Anomaly check
        if is_anomaly:
            score -= 20
            reasons.append("ANOMALY detected")
        
        # 7. VaR check
        if abs(var) > 0.02:  # More than 2% daily VaR
            score -= 10
            reasons.append("High VaR")
        
        # Calculate confidence
        confidence = min(max(50 + score, 0), 100)
        
        # Generate decision
        if is_anomaly:
            decision = 'DANGER'
        elif score >= 30:
            decision = 'EXECUTE'
        elif score >= 0:
            decision = 'WAIT'
        else:
            decision = 'DANGER'
        
        # Build comment
        comment = ' | '.join(reasons[:3]) if reasons else 'Standard conditions'
        
        return decision, confidence, comment
    
    def _default_response(self, reason):
        return {
            'decision': 'WAIT',
            'confidence': 0,
            'comment': reason,
            'var_95': 0, 'cvar': 0, 'sharpe': 0, 'sortino': 0,
            'max_dd': 0, 'volatility': 0, 'regime': 'UNKNOWN',
            'regime_conf': 0, 'trend_strength': 0, 'trend_dir': 'NONE',
            'prob_profit': 50, 'expected_ret': 0, 'is_anomaly': False,
            'anomaly_score': 0, 'skewness': 0, 'kurtosis': 0
        }

# Initialize engine
quant_engine = QuantCore()
print("[OK] QuantCore Engine initialized")

In [None]:
#@title 4. FLASK API SERVER {display-mode: "form"}
#@markdown ### Start the API server with ngrok tunnel

from flask import Flask, request, jsonify
from flask_cors import CORS
import threading
import json

app = Flask(__name__)
CORS(app)

# Request counter for monitoring
request_count = {'total': 0, 'success': 0, 'error': 0}

@app.route('/health', methods=['GET'])
def health():
    """Health check endpoint"""
    return jsonify({
        'status': 'OK',
        'engine': 'D-LOGIC QUANT CORE v1.0',
        'requests': request_count
    })

@app.route('/analyze', methods=['POST'])
def analyze():
    """Main analysis endpoint"""
    request_count['total'] += 1
    
    try:
        data = request.get_json()
        
        if not data:
            request_count['error'] += 1
            return jsonify({'error': 'No data provided'}), 400
        
        # Run analysis
        result = quant_engine.analyze(data)
        
        request_count['success'] += 1
        return jsonify(result)
        
    except Exception as e:
        request_count['error'] += 1
        return jsonify({
            'decision': 'WAIT',
            'confidence': 0,
            'comment': f'Server error: {str(e)}'
        }), 500

@app.route('/metrics', methods=['POST'])
def metrics_only():
    """Get only risk metrics without decision"""
    try:
        data = request.get_json()
        prices = data.get('prices', [])
        
        if len(prices) < 20:
            return jsonify({'error': 'Need at least 20 price points'}), 400
        
        returns = quant_engine.calculate_returns(prices)
        
        return jsonify({
            'var_95': round(quant_engine.value_at_risk(returns) * 10000, 2),
            'sharpe': round(quant_engine.sharpe_ratio(returns), 2),
            'sortino': round(quant_engine.sortino_ratio(returns), 2),
            'volatility': round(quant_engine.rolling_volatility(returns) * 100, 2),
            'max_dd': round(quant_engine.max_drawdown(prices) * 100, 2)
        })
        
    except Exception as e:
        return jsonify({'error': str(e)}), 500

print("[OK] Flask API configured")

In [None]:
#@title 5. START SERVER {display-mode: "form"}
#@markdown ### Launch the server and get your public URL
#@markdown **Copy the ngrok URL and paste it into MT5 settings!**

import time

# Start ngrok tunnel
public_url = ngrok.connect(5000)
print("="*60)
print("D-LOGIC QUANT CORE SERVER")
print("="*60)
print(f"\n[PUBLIC URL]: {public_url}")
print(f"\n[ENDPOINT]:   {public_url}/analyze")
print("\n" + "="*60)
print("Copy the URL above and paste into MT5!")
print("="*60)

# Run Flask in a thread
def run_flask():
    app.run(port=5000, debug=False, use_reloader=False)

flask_thread = threading.Thread(target=run_flask)
flask_thread.daemon = True
flask_thread.start()

print("\n[OK] Server is running! Keep this notebook open.")
print("[INFO] Press STOP to terminate the server.")

In [None]:
#@title 6. TEST THE ENGINE (Optional) {display-mode: "form"}
#@markdown ### Test the analysis with sample data

import numpy as np

# Generate sample price data (simulating EURUSD)
np.random.seed(42)
base_price = 1.0850
returns = np.random.normal(0.0001, 0.005, 100)
prices = [base_price]
for r in returns:
    prices.append(prices[-1] * (1 + r))

# Test data
test_data = {
    'prices': prices,
    'h4_bias': 'BULLISH',
    'h1_bias': 'BULLISH',
    'zone': 'INSIDE FVG (Buy)',
    'liquidity': 'PDH=50pts PDL=80pts',
    'signals': 'CISD:BULL TRTL:NONE'
}

# Run analysis
result = quant_engine.analyze(test_data)

print("\n" + "="*50)
print("QUANT ANALYSIS RESULT")
print("="*50)
print(f"\nDecision:    {result['decision']}")
print(f"Confidence:  {result['confidence']}%")
print(f"Comment:     {result['comment']}")
print("\n--- RISK METRICS ---")
print(f"VaR (95%):   {result['var_95']} pips")
print(f"CVaR:        {result['cvar']} pips")
print(f"Sharpe:      {result['sharpe']}")
print(f"Sortino:     {result['sortino']}")
print(f"Max DD:      {result['max_dd']}%")
print(f"Volatility:  {result['volatility']}%")
print("\n--- REGIME ---")
print(f"Regime:      {result['regime']}")
print(f"Confidence:  {result['regime_conf']}%")
print("\n--- TREND ---")
print(f"Strength:    {result['trend_strength']}%")
print(f"Direction:   {result['trend_dir']}")
print("\n--- PROBABILITY ---")
print(f"Prob Profit: {result['prob_profit']}%")
print(f"Expected:    {result['expected_ret']} pips")
print("\n--- DISTRIBUTION ---")
print(f"Skewness:    {result['skewness']}")
print(f"Kurtosis:    {result['kurtosis']}")
print(f"Anomaly:     {result['is_anomaly']}")

In [None]:
#@title 7. MONITOR REQUESTS {display-mode: "form"}
#@markdown ### Monitor incoming requests from MT5

import time
from IPython.display import clear_output

print("Monitoring requests... (refresh every 5 seconds)")
print("Press STOP to exit monitoring.\n")

try:
    while True:
        clear_output(wait=True)
        print("="*40)
        print("D-LOGIC QUANT CORE - REQUEST MONITOR")
        print("="*40)
        print(f"\nTotal Requests:  {request_count['total']}")
        print(f"Successful:      {request_count['success']}")
        print(f"Errors:          {request_count['error']}")
        print(f"\nServer Status:   ONLINE")
        print(f"Timestamp:       {time.strftime('%Y-%m-%d %H:%M:%S')}")
        time.sleep(5)
except KeyboardInterrupt:
    print("\nMonitoring stopped.")