# Finanlzr Predictor Service on Colab

This notebook runs the predictor Flask service for the Finanlzr app. It uses a small TensorFlow (Keras) LSTM for forecasting, falling back to scikit-learn LinearRegression for very short histories, when linear mode is forced, or when the LSTM fails.

Steps:
1. Install dependencies
2. Write predictor.py (a cell below generates it; you can also upload your own copy from the repo)
3. Run the predictor service
4. Expose with ngrok
5. Use the public URL in your app's .env as PREDICTOR_URL

In [None]:
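# Install dependencies (TensorFlow for the LSTM, Flask + flask-cors for the API, scikit-learn for the linear fallback, pyngrok for the tunnel, psutil for process cleanup)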
!pip install --quiet tensorflow==2.12.0 flask flask-cors numpy pandas scikit-learn pyngrok psutil

In [None]:
# Write a predictor.py that exposes a /predict endpoint using an LSTM model for short sequences.
predictor = r'''
import json
import os
import traceback
from flask import Flask, request, jsonify
from flask_cors import CORS
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
import threading
import hashlib
import pickle

# Flask application object; the route decorators further down register handlers on it.
app = Flask(__name__)
# Enable cross-origin requests on the app (flask-cors, called with no extra options).
CORS(app)

# Simple persistent cache on-disk to survive Colab cell restarts (small key/value store).
# Maps a cache_key() digest to {'model': <name>, 'predictions': [<float>, ...]}.
# NOTE(security): pickle.load can execute arbitrary code from the file. This is
# acceptable only because the file is written by this service itself; never
# point CACHE_PATH at a file obtained from an untrusted source.
CACHE_PATH = 'prediction_cache.pkl'
try:
    with open(CACHE_PATH, 'rb') as f:
        prediction_cache = pickle.load(f)
    # A readable pickle that is not a dict would break every later lookup/store,
    # so treat it like a missing or corrupt file.
    if not isinstance(prediction_cache, dict):
        raise TypeError('cache file does not contain a dict')
except Exception:
    # Best-effort: a missing, unreadable or corrupt cache just means a cold start.
    prediction_cache = {}


def save_cache():
    """Persist the in-memory prediction cache to CACHE_PATH (best-effort).

    The pickle is written to a temporary file next to CACHE_PATH and then moved
    into place with os.replace, so a process that is killed mid-write leaves the
    previous cache file intact instead of a truncated one. Failures are printed
    and otherwise ignored; the service keeps running with its in-memory cache.
    """
    # Per-process / per-thread temp name so concurrent request threads never
    # write into each other's temporary file.
    tmp_path = f'{CACHE_PATH}.{os.getpid()}.{threading.get_ident()}.tmp'
    try:
        with open(tmp_path, 'wb') as f:
            pickle.dump(prediction_cache, f)
        os.replace(tmp_path, CACHE_PATH)
    except Exception as e:
        print('Failed to save cache:', e)
        # Do not leave a partial temp file behind after a failed write.
        try:
            os.remove(tmp_path)
        except OSError:
            pass


def cache_key(history, periods, mode):
    """Return a deterministic SHA-1 hex digest identifying one prediction request.

    The digest covers every history value (normalised through float()), the
    number of periods and the mode string, so equal requests map to equal keys.
    """
    serialized_history = ','.join(str(float(value)) for value in history)
    digest = hashlib.sha1()
    digest.update(f'{serialized_history}|{periods}|{mode}'.encode())
    return digest.hexdigest()


def build_lstm_model(input_len):
    """Build and compile the small LSTM regressor used for short series.

    input_len is the number of timesteps per sample; each timestep carries one
    feature. The network is LSTM(32) -> Dropout(0.08) -> Dense(16, relu) ->
    Dense(1), compiled with Adam (learning rate 0.01) and MSE loss.
    """
    # small, efficient stack suitable for short series (e.g., 7-60 points)
    net = Sequential()
    net.add(LSTM(32, input_shape=(input_len, 1), return_sequences=False))
    net.add(Dropout(0.08))
    net.add(Dense(16, activation='relu'))
    net.add(Dense(1))
    net.compile(optimizer=Adam(learning_rate=0.01), loss='mse')
    return net


def prepare_sequences(history, lookback):
    """Turn a price history into supervised (X, y) samples of percent changes.

    Prices are converted to step-over-step percent changes so the model is
    scale-invariant. Each sample is `lookback` consecutive changes and its
    target is the change that follows.

    Args:
        history: sequence of numeric values, oldest first.
        lookback: number of percent changes per input window (must be >= 1).

    Returns:
        (X, y) where X has shape (samples, lookback) and y has shape (samples,).
        When the history is too short to form even one full sample, a single
        window is returned: the available changes left-padded with zeros, with
        a target of 0.0.

    Raises:
        ValueError: if lookback < 1 or history has fewer than 2 points.
    """
    if lookback < 1:
        raise ValueError('lookback must be at least 1')
    arr = np.array(history, dtype=float)
    if len(arr) < 2:
        raise ValueError('Need at least 2 points')
    # The 1e-9 offset keeps a zero price from causing a division by zero.
    pct = (arr[1:] - arr[:-1]) / (arr[:-1] + 1e-9)
    n_samples = len(pct) - lookback
    if n_samples <= 0:
        # Insufficient history: one zero-padded window with a neutral target.
        seq = np.concatenate([np.zeros(lookback - len(pct)), pct])
        return np.array([seq]), np.array([0.0])
    # Sliding windows over pct; the target is the change right after each window.
    X = np.array([pct[i:i + lookback] for i in range(n_samples)])
    y = pct[lookback:].copy()
    return X, y


def predict_lstm(history, periods=1, fast=True):
    """Forecast the next `periods` values with a small LSTM trained on the fly.

    The model is trained on percent changes of `history` and then rolled
    forward one step at a time, feeding each predicted change back into the
    input window.

    Args:
        history: sequence of numeric values, oldest first.
        periods: number of future values to predict.
        fast: when True train for 6 epochs, otherwise 20.

    Returns:
        List of `periods` predicted values as floats. If the training
        sequences cannot be prepared, the result of predict_linear is returned
        instead.
    """
    hist = np.array(history, dtype=float)
    n = len(hist)
    lookback = 7 if n >= 14 else max(3, n//2)
    # prepare sequences
    try:
        X, y = prepare_sequences(hist, lookback)
    except Exception:
        # fallback to linear regression prediction if we can't prepare sequences
        return predict_linear(history, periods)
    input_len = X.shape[1]
    # reshape for LSTM: (samples, timesteps, features)
    X_train = X.reshape((X.shape[0], X.shape[1], 1))
    # build and train small model
    model = build_lstm_model(input_len)
    epochs = 6 if fast else 20
    batch = 8 if X_train.shape[0] >= 8 else 1
    model.fit(X_train, y, epochs=epochs, batch_size=batch, verbose=0)
    # Seed the forecast with the MOST RECENT percent changes. The last training
    # window (X_train[-1]) stops one step short of the end of the history,
    # because its target is the final known change, so starting from it would
    # re-predict a step that is already known.
    pct = (hist[1:] - hist[:-1]) / (hist[:-1] + 1e-9)
    recent = pct[-input_len:]
    if len(recent) < input_len:
        # Short history: left-pad with zeros, matching the training window layout.
        recent = np.concatenate([np.zeros(input_len - len(recent)), recent])
    last_window = recent.reshape(1, input_len, 1)
    # iterative prediction in pct-change space
    preds = []
    base_last = hist[-1]
    for _ in range(periods):
        next_pct = float(model.predict(last_window, verbose=0).flatten()[0])
        preds.append(float(base_last * (1 + next_pct)))
        # Slide the window: drop the oldest change, append the predicted one.
        window = last_window.flatten().tolist()[1:] + [next_pct]
        last_window = np.array(window).reshape(1, input_len, 1)
        # The next percent change is applied to the value just predicted.
        base_last = preds[-1]
    return preds


def predict_linear(history, periods=1):
    """Extrapolate `periods` future values with a straight-line fit.

    A LinearRegression is fitted on (index, value) pairs of `history` and
    evaluated at the next `periods` indices. If anything in that path fails,
    the last observed value is repeated `periods` times instead.
    """
    try:
        values = np.array(history, dtype=float)
        n_obs = len(values)
        observed_idx = np.arange(n_obs).reshape(-1, 1)
        regressor = LinearRegression()
        regressor.fit(observed_idx, values)
        future_idx = np.arange(n_obs, n_obs + periods).reshape(-1, 1)
        return [float(v) for v in regressor.predict(future_idx).tolist()]
    except Exception:
        # worst-case: repeat last value
        return [float(history[-1])] * periods


@app.route('/')
def index():
    """Root endpoint: return a small JSON status document describing the service."""
    status = {'ok': True, 'model': 'lstm', 'note': 'LSTM fast-mode enabled by default'}
    return jsonify(status)


@app.route('/predict', methods=['POST'])
def predict():
    """Handle POST /predict: forecast future values from a numeric history.

    Request JSON object:
        historical   (required) non-empty list of finite numbers, oldest first.
        periods      (optional, default 1) number of values to predict, >= 1.
        fast         (optional, default true) use the short LSTM training run.
        force_linear (optional) skip the LSTM and use linear regression.

    Response JSON: {'model': 'lstm' | 'linear', 'predictions': [...],
    'cached': bool}. Invalid input yields a 400 with an 'error' message and
    unexpected failures a 500. Results are cached per (history, periods, mode)
    and persisted with save_cache().
    """
    try:
        # silent=True returns None for malformed JSON so it can be reported as
        # a client error (400) instead of falling into the 500 handler below.
        body = request.get_json(force=True, silent=True)
        if not isinstance(body, dict):
            return jsonify({'error': 'request body must be a JSON object'}), 400
        history = body.get('historical')
        if not history or not isinstance(history, list):
            return jsonify({'error': 'provide `historical` as list of numbers'}), 400
        try:
            history = [float(x) for x in history]
            periods = int(body.get('periods', 1))
        except (TypeError, ValueError):
            return jsonify({'error': '`historical` must contain only numbers and `periods` must be an integer'}), 400
        if not np.all(np.isfinite(history)):
            return jsonify({'error': '`historical` must contain only finite numbers'}), 400
        if periods < 1:
            return jsonify({'error': '`periods` must be at least 1'}), 400
        fast = bool(body.get('fast', True))
        force_linear = bool(body.get('force_linear'))
        # Very short histories go straight to the linear model.
        use_linear = force_linear or len(history) < 4
        # The cache mode must distinguish fast from full LSTM runs, otherwise a
        # fast result would be served for a fast=false request. The default
        # (fast) case keeps the plain 'lstm' label so existing cache entries
        # stay valid.
        if force_linear:
            mode = 'linear'
        elif fast or use_linear:
            mode = 'lstm'
        else:
            mode = 'lstm-full'
        key = cache_key(history, periods, mode)
        cached = prediction_cache.get(key)
        if cached is not None:
            return jsonify({'model': cached['model'], 'predictions': cached['predictions'], 'cached': True})
        model_name = 'linear'
        if use_linear:
            preds = predict_linear(history, periods)
        else:
            # Try LSTM (fast mode by default); fall back to linear on any failure.
            try:
                preds = predict_lstm(history, periods=periods, fast=fast)
                model_name = 'lstm'
            except Exception:
                traceback.print_exc()
                preds = predict_linear(history, periods)
        prediction_cache[key] = {'model': model_name, 'predictions': preds}
        save_cache()
        return jsonify({'model': model_name, 'predictions': preds, 'cached': False})
    except Exception as e:
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500


if __name__ == '__main__':
    # Run the Flask development server when this file is executed as a script.
    # It listens on all interfaces (0.0.0.0) at port 5000 with debug mode off,
    # and threaded=True lets it serve requests concurrently.
    app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)
'''

# Materialise the service source held in `predictor` as predictor.py in the
# current working directory, replacing any previous copy.
with open('predictor.py', 'w') as source_file:
    source_file.write(predictor)

print('Wrote predictor.py with LSTM-based predictor (fast-mode).')

In [None]:
# Run the predictor as a background subprocess (so the notebook cell doesn't block)
import subprocess, time, sys, os
# Kill any existing predictor process (colab env might have one)
try:
    import psutil
    for p in psutil.process_iter():
        try:
            cmd = ' '.join(p.cmdline() or [])
            if 'predictor.py' in cmd:
                p.kill()
        except Exception:
            # Process vanished or is not accessible; skip it.
            continue
except Exception:
    # Cleanup is best-effort: without psutil we simply start a new process.
    pass
# Send the server's output to a log file instead of a pipe. Nothing in this
# notebook reads from the pipe, so once its buffer filled up with request logs
# the server would block on its next write and stop answering.
with open('predictor.log', 'ab') as log_file:
    proc = subprocess.Popen([sys.executable, 'predictor.py'], stdout=log_file, stderr=subprocess.STDOUT)
# Give the process a moment, then check that it did not die on startup.
# The server may need longer than this to finish importing its dependencies;
# check predictor.log if the first requests fail.
time.sleep(2)
if proc.poll() is None:
    print('Predictor launched (pid=', proc.pid, ')')
else:
    print('Predictor exited early (code=', proc.returncode, '); see predictor.log')

In [None]:
# Expose the local server using pyngrok
from pyngrok import ngrok
# Optionally set your ngrok auth token here if needed:
# ngrok.set_auth_token('YOUR_NGROK_AUTH_TOKEN')
# Tunnel to port 5000, the port predictor.py listens on.
public_url = ngrok.connect(5000)
# ngrok.connect returns a tunnel object; its .public_url attribute is the plain
# address string (the test cell below reads the same attribute), so print that
# rather than the object's repr.
print('Public URL:', public_url.public_url)
print('Use this URL as PREDICTOR_URL in your app\'s .env')

## Testing the Predictor

Test the service with a sample request:

In [None]:
# Smoke-test the predictor through the public tunnel with a sample request
import requests, time
# `public_url` is the tunnel object created by the ngrok cell above (same session)
sample_payload = {'historical': [100, 101, 102, 99, 105, 108, 110, 112], 'periods': 1}
try:
    endpoint = public_url.public_url + '/predict'
    response = requests.post(endpoint, json=sample_payload, timeout=20)
    print('Response:', response.json())
except Exception as err:
    # Covers a missing tunnel variable, network errors and non-JSON replies alike.
    print('Test failed:', err)

## Notes
- Keep the notebook running to keep the service alive.
- If ngrok asks for auth, sign up at ngrok.com and set your token.
- Update your app's .env with the ngrok URL and restart the app.