In [2]:
import numpy as np
import asyncio

# Create a fake trade feed

In [3]:
async def trade_feed(num_trade=10):
    price = 100
    for _ in range(num_trade):
        price *= np.exp(np.random.normal(scale=0.01))
        price = float(np.round(price, 2))
        volume = int(np.random.normal(scale=100))
        yield volume, price
        dt = np.random.uniform(0, 0.2)
        await asyncio.sleep(dt)

async for v, p in trade_feed():
    print(v, p)
    

20 99.96
107 99.41
-16 99.58
80 101.65
-20 102.21
149 101.21
-39 100.36
-233 99.85
52 98.6
152 97.48


In [5]:
async def price_feed(num_trade=10):
    price = 100
    for _ in range(num_trade):
        price *= np.exp(np.random.normal(scale=0.01))
        price = float(np.round(price, 2))
        yield price
        dt = np.random.uniform(0, 0.2)
        await asyncio.sleep(dt)

async for  p in price_feed():
    print(p)
    
from screamer import RollingMean

obj = RollingMean(10)
for x in obj(price_feed(30)):
    print(x)


100.56
101.51
100.62
100.99
100.52
98.6
100.0
101.15
102.37
102.62
we have an async_generator


ValueError: Unsupported input type for call: [<class 'async_generator'>]

### A simple Cashflow class: compute price * volume

In [15]:
from collections.abc import Iterable
import inspect

class Cashflow:

    def __init__(self):
        pass

    # main implementation
    def process_scalar(self, volume, price):
        return abs(volume * price)

    # generic code: handeling generator of tuples
    def process_generator(self, values_gen):
        for values in values_gen:
            # check that values has N elements, all convertable to double
            yield self.process_scalar(*values)

    # generic code: handeling numpy arrays
    def process_numpy_arrays(self, volumes, prices):
        ans = np.zeros_like(prices)
        for i in range(len(prices)):
            ans[i] = self.process_scalar(volumes[i], prices[i])
        return ans

    # generic code: handeling an asynchronous generator of tuples
    async def process_async_generator(self, async_values_gen):
        print(type(async_values_gen))
        async for values in async_values_gen:
            # check that values has N elements, all convertable to double
            yield self.process_scalar(*values)  
        
    # generic code: dispatcher
    def __call__(self, arg0, arg1=None):
        if isinstance(arg0, np.ndarray):
            # check that we have N args, all numpy array, all of the same shape
            return self.process_numpy_arrays(arg0, arg1)
        if isinstance(arg0, Iterable):
            # check that we have 1 arg
            return self.process_generator(arg0)
        if inspect.isasyncgen(arg0):
            # check that we have 1 arg
            return self.process_async_generator(arg0)
        # check that we have N args, all convertable to double
        return self.process_scalar(arg0, arg1)


### A Scalar

In [16]:
cf = Cashflow()

cf(3,4)

12

### Numpy arrays

In [17]:
prices = np.array([100,101,102,101])
volumes = np.array([3,-1,-2,1])

cf = Cashflow()
cf(volumes, prices)


array([300, 101, 204, 101])

### Generator

In [18]:
gen = iter([(2,300), (-1, 101), (-2, 102), (1, 101)])


list(cf(gen))

[600, 101, 204, 101]

### Async generator

In [19]:
cf = Cashflow()

async for c in cf(trade_feed()):
    print(c)


<class 'async_generator'>
4790.4
5604.8099999999995
7782.29
1486.3500000000001
20843.57
3115.5
24330.68
5608.96
13257.44
1665.66


In [23]:
from screamer import RollingMean

obj = RollingMean(30, start_policy='expanding')

async for c in obj(trade_feed()):
    print(c)


Unsupported input type for call: [<class 'async_generator'>]


ValueError: Unsupported input type for call

In [24]:
tmp = trade_feed()
print(type(tmp))
print(dir(tmp))

<class 'async_generator'>
['__aiter__', '__anext__', '__class__', '__class_getitem__', '__del__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__name__', '__ne__', '__new__', '__qualname__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'aclose', 'ag_await', 'ag_code', 'ag_frame', 'ag_running', 'asend', 'athrow']
