# demo
> some simple signal processing

In [None]:
#|default_exp demo

We will design a system that consumes market data. The market data is processed and sybmol ticks are pushed to the system. 

The system will process data and calculate a volatility of market.  Volatility of the market is defined as a mean volatility across the price volatility of individual symbols. 

We define a volatility of symbol as a standard deviation of last 20 trades in log scale.


In [None]:
#| export
from mana_signals.core import *
from mana_signals.reactive import *
from mana_signals.data import *
import numpy as np
from typing import List
from pathlib import Path
import argparse


In [None]:
#| export
class RVolCalculator(RValue): 
    """
    Vol calculator for given price input.

    Calculates vol of last 20 price ticks for a given input.
    """
    def __init__(self, priceInput: RValue):
        super().__init__()
        self.last_trades = RLastn(priceInput, n=20)
    def calc(self): return np.std(np.log1p(self.last_trades.value))

In [None]:
#| export
class RMarketVol(RValue): 
    """
    calculates average volatility of the given price inputs
    """
    def __init__(self,symbol_prices: List[RValue]):
        super().__init__()
        self.deps = [RVolCalculator(v) for v in symbol_prices]
    def calc(self): 
        vols = np.array([d.value for d in self.deps])
        return np.nanmean(vols)

In [None]:
#| export
class ExecEngine():
    """Execution engine for the market"""
    def load_all_market_data_files(self, directory: Path):
        syms = get_symbols(directory)
        frames = [load_symbol(directory, sym) for sym in syms]
        self.market_data = make_sequential_stream(frames)

    def load_market_data_files(self, files: List[Path]): 
        frames = [load_market_data_file(file) for file in files]
        self.market_data = make_sequential_stream(frames)

    def run(self): 
        "Runs the engine by feeding market data to the system"
        for ts, row in self.market_data.iterrows():
            self.process(ts, row)
    def process(self, ts, row): raise NotImplementedError('Override engine process function')
        
        

# Simple Exec Engine

In [None]:
engine = ExecEngine()
engine.load_market_data_files([Path('../marketdata/20230101.MATIC.csv.gz')])

In [None]:
engine.market_data.index

DatetimeIndex(['2023-01-01 00:00:02.789350', '2023-01-01 00:00:06.276571',
               '2023-01-01 00:00:07.272223', '2023-01-01 00:00:07.872568',
               '2023-01-01 00:00:11.472638', '2023-01-01 00:00:11.472638',
               '2023-01-01 00:00:11.472638', '2023-01-01 00:00:11.472638',
               '2023-01-01 00:00:11.472638', '2023-01-01 00:00:11.472638',
               ...
               '2023-01-01 23:59:10.772233', '2023-01-01 23:59:29.672451',
               '2023-01-01 23:59:29.672451', '2023-01-01 23:59:29.672451',
               '2023-01-01 23:59:56.772372', '2023-01-01 23:59:56.772372',
               '2023-01-01 23:59:56.772372', '2023-01-01 23:59:56.772372',
               '2023-01-01 23:59:57.772425', '2023-01-01 23:59:57.772425'],
              dtype='datetime64[ns]', name='date', length=26151, freq=None)

# VolSignalEngine

In [None]:
#| export
class SignalEngine(ExecEngine):
    def __init__(self): 
        super().__init__()
        self.market_price_inputs = {
            'MATICUSDT': RInput(np.nan),
            'OP': RInput(np.nan),
            'XRP': RInput(np.nan)
        }
        self.market_vol = RMarketVol(self.market_price_inputs.values())
        self.result=[]

    def process(self, ts, row):
        input = self.market_price_inputs[row.symbol]
        input.set_value(row.price)
        self.output(f'{ts}, {self.market_vol.value}')

    def output(self, line): 
        self.result.append(line)
        
    

In [None]:
engine = SignalEngine()

In [None]:
engine.load_market_data_files(['../marketdata/20230101.MATIC.csv.gz'])

In [None]:
engine.run()

  return np.nanmean(vols)


In [None]:
engine.result[-10:]

['2023-01-01 23:59:10.772233, 6.0864435782985807e-05',
 '2023-01-01 23:59:29.672451, 5.472028913292919e-05',
 '2023-01-01 23:59:29.672451, 4.7112437455448106e-05',
 '2023-01-01 23:59:29.672451, 4.4377844789784614e-05',
 '2023-01-01 23:59:56.772372, 5.042174862790632e-05',
 '2023-01-01 23:59:56.772372, 5.56702216927174e-05',
 '2023-01-01 23:59:56.772372, 6.19808456245811e-05',
 '2023-01-01 23:59:56.772372, 6.3460650664703e-05',
 '2023-01-01 23:59:57.772425, 6.224075795312596e-05',
 '2023-01-01 23:59:57.772425, 6.086400637705373e-05']

And we got results collected to the output


# Command line utility for real time processing

For the demo, we also create a command line utility for processing data

Invocation: 

> simulate_signal_processing --data ./marketdata/20230101.MATIC.csv.gz
> 

In [None]:
#| export
class SignalEngineLive(SignalEngine):
    def output(self, line): print(line)

In [None]:
engine = SignalEngineLive()
engine.load_market_data_files(['../marketdata/20230101.MATIC.csv.gz'])
engine.run()

  return np.nanmean(vols)


In [None]:
#|export

def simulate_signal_processing():
    """Script entry point for signal processing"""
    parser = argparse.ArgumentParser(description='Simulate Signal Processing')
    parser.add_argument('data', type=str, help='input data file in csv format')
    args = parser.parse_args()
    engine = SignalEngineLive()
    engine.load_market_data_files([args.data])
    engine.run()


In [None]:
#| hide
import nbdev; nbdev.nbdev_export()