# Streaming 1-minute bars with technical analysis triggers

This notebook demonstrates how to build 1-minute bars with the Data Library streaming services and enrich them with technical analysis (TA) indicators and signals. It introduces two options: one based on price snapshots and another based on the stream recorder.

#### Learn more

To learn more about the Data Library for Python, please join the LSEG Developer Community. By [registering](https://developers.lseg.com/iam/register) and [logging in](https://developers.lseg.com/content/devportal/en_us/initCookie.html) to the LSEG Developer Community portal, you get free access to a number of learning materials such as
 [Quick Start guides](https://developers.lseg.com/en/api-catalog/refinitiv-data-platform/refinitiv-data-library-for-python/quick-start), 
 [Tutorials](https://developers.lseg.com/en/api-catalog/refinitiv-data-platform/refinitiv-data-library-for-python/learning), 
 [Documentation](https://developers.lseg.com/en/api-catalog/refinitiv-data-platform/refinitiv-data-library-for-python/docs)
 and much more.

#### Getting Help and Support

If you have any questions regarding using the API, please post them on 
this [Q&A Forum](https://community.developers.refinitiv.com/spaces/321/index.html). 
The LSEG Developer Community will be happy to help. 

----

Below we import the required packages and open a session:

In [None]:
import refinitiv.data as rd
import pandas as pd
import numpy as np
import talib as ta
import time
import re
from datetime import datetime, timedelta

pd.options.mode.chained_assignment = None

rd.open_session()

## Defining functions for producing TA signals

Below we define functions that calculate the moving average, RSI and stochastic indicators, together with their buy and sell signals.

In [None]:
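# Define SMA function
def get_sma(close, short_period, long_period):
    # Keep only the last two values: the current bar and the previous one
    short_sma = ta.SMA(close, short_period)[-2:]
    long_sma = ta.SMA(close, long_period)[-2:]
    # A crossover means the two SMAs swap order between the previous and the current bar
    sma_sell = (short_sma <= long_sma) & (short_sma.shift(1) >= long_sma.shift(1))
    sma_buy = (short_sma >= long_sma) & (short_sma.shift(1) <= long_sma.shift(1))
    # Positional access, because the index holds times rather than integers
    return short_sma.iloc[-1], long_sma.iloc[-1], sma_sell.iloc[-1], sma_buy.iloc[-1]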
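
To see the crossover rule in isolation, the sketch below reproduces it with plain pandas rolling means on a small synthetic series, so it runs without TA-Lib or a data session. The helper name `sma_cross_signals` and the sample prices are illustrative assumptions, not part of the notebook.

```python
import pandas as pd

def sma_cross_signals(close, short_period, long_period):
    """Return both SMAs and boolean crossover flags for every bar."""
    short_sma = close.rolling(short_period).mean()
    long_sma = close.rolling(long_period).mean()
    # Buy: short SMA was at or below the long SMA and is now at or above it
    buy = (short_sma >= long_sma) & (short_sma.shift(1) <= long_sma.shift(1))
    # Sell: short SMA was at or above the long SMA and is now at or below it
    sell = (short_sma <= long_sma) & (short_sma.shift(1) >= long_sma.shift(1))
    return pd.DataFrame({'short_sma': short_sma, 'long_sma': long_sma,
                         'sma_buy': buy, 'sma_sell': sell})

# Made-up prices: a dip followed by a recovery
close = pd.Series([10, 9, 8, 7, 8, 9, 10, 11], dtype=float)
signals = sma_cross_signals(close, short_period=2, long_period=4)
print(signals)
```

Because the comparisons are non-strict, two consecutive bars on which both averages are exactly equal would raise the buy and the sell flag together; comparisons against `NaN` during the warm-up bars evaluate to `False`.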

In [2]:
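# Define RSI function
def get_rsi(close, period):
    # Keep only the last two values: the current bar and the previous one
    rsi = ta.RSI(close, period)[-2:]
    # Sell when RSI crosses above 70, buy when it crosses below 30
    rsi_sell = (rsi > 70) & (rsi.shift(1) <= 70)
    rsi_buy = (rsi < 30) & (rsi.shift(1) >= 30)
    # Positional access, because the index holds times rather than integers
    return rsi.iloc[-1], rsi_sell.iloc[-1], rsi_buy.iloc[-1]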
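
The threshold-crossing rule can be checked by hand with a small pure-Python sketch. It uses Wilder's smoothing, which is the standard RSI definition; the helper names, the short period of 3 and the sample prices are assumptions for illustration, and returning 0 for a completely flat series is a convention chosen here.

```python
def wilder_rsi(close, period):
    """RSI with Wilder smoothing; entries are None until enough data exists."""
    rsi = [None] * len(close)
    if len(close) <= period:
        return rsi
    changes = [b - a for a, b in zip(close, close[1:])]
    # Seed the averages with a simple mean of the first `period` changes
    avg_gain = sum(max(c, 0.0) for c in changes[:period]) / period
    avg_loss = sum(max(-c, 0.0) for c in changes[:period]) / period
    for i in range(period, len(close)):
        if i > period:
            change = changes[i - 1]
            avg_gain = (avg_gain * (period - 1) + max(change, 0.0)) / period
            avg_loss = (avg_loss * (period - 1) + max(-change, 0.0)) / period
        total = avg_gain + avg_loss
        # Assumption: a flat series (no gains, no losses) is reported as 0
        rsi[i] = 100.0 * avg_gain / total if total else 0.0
    return rsi

def rsi_signals(rsi, upper=70, lower=30):
    """Return (sell, buy) for the last bar, using the crossing rule above."""
    prev, last = rsi[-2], rsi[-1]
    if prev is None or last is None:
        return False, False
    return (last > upper and prev <= upper), (last < lower and prev >= lower)

rsi = wilder_rsi([10, 11, 10, 11, 12], period=3)
print(rsi[-2:], rsi_signals(rsi))
```

Here the RSI moves from about 66.7 to about 77.8 on the last bar, so the sell flag is raised exactly once, at the moment of the crossing rather than on every bar above 70.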

In [4]:
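# Define Stoch function
def get_stoch(close, high, low):
    # Default TA-Lib periods are used for the slow stochastic
    slowk, slowd = ta.STOCH(high, low, close)
    # Sell: %K crosses below %D while %D is above 80 (overbought)
    stoch_sell = ((slowk < slowd) & (slowk.shift(1) > slowd.shift(1))) & (slowd > 80)
    # Buy: %K crosses above %D while %D is below 20 (oversold)
    stoch_buy = ((slowk > slowd) & (slowk.shift(1) < slowd.shift(1))) & (slowd < 20)
    # Positional access, because the index holds times rather than integers
    return slowk.iloc[-1], slowd.iloc[-1], stoch_sell.iloc[-1], stoch_buy.iloc[-1]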
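
Since `ta.STOCH` is called without explicit periods, it may help to see what a slow stochastic computes. The pandas sketch below assumes a 5-bar %K window smoothed twice with 3-bar simple averages, which to my understanding matches the TA-Lib defaults; treat the periods, the helper name `stoch_sketch` and the synthetic prices as assumptions to verify against the TA-Lib documentation.

```python
import pandas as pd

def stoch_sketch(high, low, close, fastk_period=5, slowk_period=3, slowd_period=3):
    """Slow stochastic built from rolling extremes and simple moving averages."""
    lowest_low = low.rolling(fastk_period).min()
    highest_high = high.rolling(fastk_period).max()
    # Position of the close within the recent high-low range, in percent
    fastk = 100 * (close - lowest_low) / (highest_high - lowest_low)
    slowk = fastk.rolling(slowk_period).mean()
    slowd = slowk.rolling(slowd_period).mean()
    return slowk, slowd

# Made-up steadily rising prices with a constant high-low band around the close
close = pd.Series(range(1, 11), dtype=float)
slowk, slowd = stoch_sketch(close + 1, close - 1, close)
print(slowk.iloc[-1], slowd.iloc[-1])
```

With these periods the first usable %D value appears on the ninth bar, which is worth keeping in mind when deciding how much history to load before reading the signals.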

## Functions for running technical analysis on streaming data

Here we provide two functions that build 1-minute streaming bars along with technical analysis triggers. The first uses the `get_snapshot` function from the Data Library, and the second uses `stream.recorder`.

### Function using snapshots

This function combines historical data with 1-minute pricing snapshots. We first request historical data at a 1-minute interval for the requested assets, then open a stream and take a snapshot of it every minute. After appending the latest snapshot to the historical data points, we calculate the TA indicators and produce the triggers.
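
The append step can be sketched on synthetic data, without a live session. All values below are made up, and selecting only the history columns before concatenating is a choice made for this sketch so that the result keeps a uniform set of columns.

```python
import pandas as pd

# Synthetic stand-in for the 1-minute history of one asset
history = pd.DataFrame(
    {'BID': [1.2470, 1.2472], 'BID_HIGH_1': [1.2473, 1.2474], 'BID_LOW_1': [1.2468, 1.2470]},
    index=['10:44', '10:45'],
)

# Synthetic stand-in for one row of a streaming snapshot
snapshot = pd.DataFrame(
    {'Instrument': ['GBP='], 'CF_DATE': ['2023-11-17'], 'CF_TIME': ['10:46:32'],
     'BID': [1.2475], 'BID_HIGH_1': [1.2476], 'BID_LOW_1': [1.2471]}
)

# Work on a copy, trim the time to HH:MM, and use it as the row label
row = snapshot.copy()
row['CF_TIME'] = row['CF_TIME'].str[:5]
prices = pd.concat([history, row.set_index('CF_TIME')[history.columns]])
print(prices)
```

The indicator functions then read the most recent values from the end of `prices`, so the snapshot row always acts as the current bar.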

In [5]:
def run_ta_stream_snap(assets, asset_dict, assets_prices, stream, tech_indicators):
    new_assets_prices = {}

    price_snap = stream.get_snapshot()
    for asset in assets:
        prices_asset = assets_prices[asset]
        price_snap_asset = price_snap.loc[price_snap['Instrument'] == asset]

        date = price_snap_asset.at[price_snap_asset.index[0], 'CF_DATE']
        time = price_snap_asset.at[price_snap_asset.index[0], 'CF_TIME']
        print(f'Received price snapshot for {asset} as of {time}')

        price_snap_asset.loc[:, 'CF_TIME'] = price_snap_asset['CF_TIME'].str[:5]
        prices = pd.concat([prices_asset, price_snap_asset.set_index('CF_TIME')])

        new_assets_prices[asset] = prices

        short_sma, long_sma, sma_sell, sma_buy = get_sma(prices['BID'], tech_indicators['s_ma'], tech_indicators['l_ma'])
        print(f'Short SMA: {short_sma} Long SMA: {long_sma} \nSMA sell: {sma_sell}, SMA buy: {sma_buy}')

        slowk, slowd, stoch_sell, stoch_buy = get_stoch(prices['BID'], prices['BID_HIGH_1'], prices['BID_LOW_1'])
        print(f'SlowK: {slowk} SlowD: {slowd} \nStoch sell: {stoch_sell}, Stoch buy: {stoch_buy}')

        rsi, rsi_sell, rsi_buy = get_rsi(prices['BID'], tech_indicators['rsi_period'])
        print(f'RSI: {rsi} \nRSI sell: {rsi_sell}, RSI buy: {rsi_buy}\n')

        # Store the indicator values and signals for this snapshot
        asset_dict[asset].append({'Date': date, 'Time': time, 'short_sma': short_sma, 'long_sma': long_sma,
                                  'sma_sell': sma_sell, 'sma_buy': sma_buy, 'rsi': rsi, 'rsi_sell': rsi_sell,
                                  'rsi_buy': rsi_buy, 'slowK': slowk, 'slowD': slowd, 'stoch_sell': stoch_sell,
                                  'stoch_buy': stoch_buy})

    assets_prices = pd.concat([df for df in new_assets_prices.values()], axis=1, keys=assets)
    return assets_prices


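Now, let's initialize the parameters: the assets to track, the start time of the history request, the bar interval, the indicator periods, and a five-minute timeout for the loop.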

In [6]:
asset_dict = {'GBP=':[], 'EUR=':[], 'JPY=':[]}
assets = list(asset_dict.keys())
start = datetime.now() - timedelta(minutes = 20)
interval = '1min'
tech_indicators = {'s_ma':10, 'l_ma':20, 'rsi_period': 14}
timeout = time.time() + 300


In [7]:
assets_prices  = rd.get_history(assets, ['BID', 'BID_HIGH_1', 'BID_LOW_1'], start = start, 
                             interval = interval, count=max(tech_indicators['l_ma'], tech_indicators['rsi_period']) + 10).dropna()
stream = rd.open_pricing_stream(universe= assets,
                                fields=['CF_DATE', 'CF_TIME', 'BID', 'BID_HIGH_1', 'BID_LOW_1'])

while time.time() < timeout:
    time.sleep(60)
    assets_prices = run_ta_stream_snap(assets, asset_dict, assets_prices, stream, tech_indicators)
stream.close()

Received price snapshot for GBP= as of 10:46:32
Short SMA: 1.2475799999999997 Long SMA: 1.2477649999999998 
SMA sell: False, SMA buy: False
SlowK: 21.506734006734874 SlowD: 15.269360269360563 
Stoch sell: False, Stoch buy: True
RSI: 38.69923390003167 
RSI sell: False, RSI buy: False

Received price snapshot for EUR= as of 10:46:31
Short SMA: 1.09282 Long SMA: 1.0927950000000002 
SMA sell: False, SMA buy: False
SlowK: 28.636363636363683 SlowD: 12.878787878787895 
Stoch sell: False, Stoch buy: False
RSI: 56.04370094172866 
RSI sell: False, RSI buy: False

Received price snapshot for JPY= as of 10:46:32
Short SMA: 148.35699999999997 Long SMA: 148.3485 
SMA sell: False, SMA buy: True
SlowK: 38.23959956556079 SlowD: 57.14349425214013 
Stoch sell: False, Stoch buy: False
RSI: 47.862080843542664 
RSI sell: False, RSI buy: False

Received price snapshot for GBP= as of 10:47:32
Short SMA: 1.24748 Long SMA: 1.247735 
SMA sell: False, SMA buy: False
SlowK: 31.43939393939567 SlowD: 21.282267115601

<OpenState.Closed: 'Closed'>

In [111]:
pd.DataFrame(asset_dict['GBP='])

| | Date | Time | short_sma | long_sma | sma_sell | sma_buy | rsi | rsi_sell | rsi_buy | slowK | slowD | stoch_sell | stoch_buy |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2023-11-17 | 16:29:50 | 1.24221 | 1.24251 | False | False | 39.28553 | False | False | 49.154589 | 40.992046 | False | False |
| 1 | 2023-11-17 | 16:30:48 | 1.24217 | 1.24246 | False | False | 35.728327 | False | False | 61.231884 | 50.17933 | False | False |
| 2 | 2023-11-17 | 16:31:50 | 1.24213 | 1.2424 | False | False | 35.728327 | False | False | 70.531401 | 60.305958 | False | False |
| 3 | 2023-11-17 | 16:32:51 | 1.24206 | 1.242315 | False | False | 32.332931 | False | False | 68.599034 | 66.78744 | False | False |
| 4 | 2023-11-17 | 16:33:50 | 1.24201 | 1.242255 | False | False | 41.338423 | False | False | 69.082126 | 69.404187 | False | False |
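
The snapshot times recorded above fall at varying seconds (16:29:50, 16:30:48, 16:31:50), because the loop sleeps a fixed 60 seconds on top of the time each iteration takes. If snapshots aligned to the minute boundary are preferred, one possible refinement is to sleep only until the next full minute. The helper below is a hypothetical addition, not part of the notebook.

```python
from datetime import datetime, timedelta

def seconds_until_next_minute(now):
    """Seconds remaining from `now` until the next full minute."""
    next_minute = now.replace(second=0, microsecond=0) + timedelta(minutes=1)
    return (next_minute - now).total_seconds()

# Example with a fixed timestamp so the result is reproducible
print(seconds_until_next_minute(datetime(2023, 11, 17, 10, 46, 32)))

# In the loop, time.sleep(60) could then become:
# time.sleep(seconds_until_next_minute(datetime.now()))
```

The wait is recomputed from the clock on every iteration, so processing time no longer accumulates into drift.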


### Function with stream recording

This function does not combine historical and snapshot data. Instead, it records the stream at the provided interval and duration and builds the indicators on the recorded bars. We first run the recorder long enough to calculate the provided indicators (e.g. for the length of the long moving average), and then record at a 1-minute interval with a 1-minute duration.
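
To make the idea of bars built from recorded ticks concrete, the sketch below aggregates a few synthetic bid updates into 1-minute open/high/low/close bars with pandas `resample`. This is a stand-in technique for illustration: the notebook does not show how the recorder builds its bars internally, and the tick values are made up.

```python
import pandas as pd

# Synthetic bid updates spread over two minutes
ticks = pd.DataFrame(
    {'BID': [1.2430, 1.2433, 1.2431, 1.2434, 1.2429, 1.2432]},
    index=pd.to_datetime([
        '2023-11-17 12:57:05', '2023-11-17 12:57:30', '2023-11-17 12:57:55',
        '2023-11-17 12:58:10', '2023-11-17 12:58:40', '2023-11-17 12:58:59',
    ]),
)

# One row per minute with open, high, low and close columns
bars = ticks['BID'].resample('1min').ohlc()
print(bars)
```

The resulting `close`, `high` and `low` columns play the same role as the ones passed to the indicator functions in the recorder-based function.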

In [93]:
def run_ta_stream_recorder(assets, asset_dict, stream, tech_indicators, price_metric, bar, duration, first_run):
    # Record the stream into bars of length `bar` for `duration`, then fetch the recorded bars
    stream.recorder.record(frequency=bar, duration=duration)
    assets_prices = stream.recorder.get_history()
    if not first_run:
        for asset in assets:
            prices_asset = assets_prices[asset][price_metric]
            timestamp = prices_asset.last_valid_index()
            print(f'Build bar for {asset} with {timestamp} timestamp')

            short_sma, long_sma, sma_sell, sma_buy = get_sma(prices_asset['close'], tech_indicators['s_ma'], tech_indicators['l_ma'])
            print(f' Short SMA: {short_sma} Long SMA: {long_sma} \n sma_sell: {sma_sell}, sma_buy: {sma_buy}')

            slowk, slowd, stoch_sell, stoch_buy = get_stoch(prices_asset['close'], prices_asset['high'], prices_asset['low'])
            print(f' SlowK: {slowk} SlowD: {slowd} \n stoch_sell: {stoch_sell}, stoch_buy: {stoch_buy}')

            rsi, rsi_sell, rsi_buy = get_rsi(prices_asset['close'], tech_indicators['rsi_period'])
            print(f' RSI: {rsi} \n rsi_sell: {rsi_sell}, rsi_buy: {rsi_buy}\n')

            # Store the indicator values and signals for this bar
            asset_dict[asset].append({'Timestamp': timestamp, 'short_sma': short_sma, 'long_sma': long_sma,
                                      'sma_sell': sma_sell, 'sma_buy': sma_buy,
                                      'rsi': rsi, 'rsi_sell': rsi_sell, 'rsi_buy': rsi_buy,
                                      'slowK': slowk, 'slowD': slowd, 'stoch_sell': stoch_sell, 'stoch_buy': stoch_buy})
    # Return the recorded bars so that the caller's assignment receives them
    return assets_prices


Now, let's initialize the parameters for this function.

In [None]:
asset_dict = {'GBP=':[], 'EUR=':[], 'JPY=':[]}
assets = list(asset_dict.keys())
first_run = True
bar = '1min'
price_metric = 'BID'
tech_indicators = {'s_ma':5, 'l_ma':10, 'rsi_period': 14}
timeout = time.time() + 1080
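
With these parameters, the first recording has to cover the longest look-back window among the indicators. The arithmetic can be made explicit with a small helper; the name `warmup_duration` is hypothetical, and the sketch assumes the bar string always starts with a whole number of minutes, as in `'1min'`.

```python
import re

def warmup_duration(bar, tech_indicators):
    """Duration string covering the longest indicator look-back window."""
    # Extract the number of minutes per bar, e.g. 1 from '1min'
    minutes_per_bar = int(re.findall(r'\d+', bar)[0])
    bars_needed = max(tech_indicators['l_ma'], tech_indicators['rsi_period'])
    return f"{minutes_per_bar * bars_needed}min"

print(warmup_duration('1min', {'s_ma': 5, 'l_ma': 10, 'rsi_period': 14}))
```

For the values above this gives a 14-minute warm-up. An RSI of period n is computed from n price changes, that is n + 1 closes, so it may be worth recording one extra bar if the RSI must be defined from the first pass.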

Finally, we open the stream and record it, building the minute bars along with the TA triggers.

In [95]:
stream = rd.open_pricing_stream(universe= assets, fields=['BID', 'ASK', 'MID_PRICE', 'CF_LAST'])

while time.time() < timeout:
    duration = bar
    if first_run:
        multiplier = int(re.findall(r'\d+', bar)[0])
        duration = f"{multiplier*max(tech_indicators['l_ma'], tech_indicators['rsi_period'])}min"
        first_run = False
    assets_prices = run_ta_stream_recorder(assets, asset_dict, stream, tech_indicators, price_metric, bar, duration, first_run)
stream.recorder.stop()
stream.close()

1min 20min
Build bar for GBP= with 2023-11-17 12:57:23.350985 timstamp
 Short SMA: 1.2432100000000001 Long SMA: 1.2433299999999998 
 sma_sell: False, sma_buy: False
 SlowK: 30.7142857142872 SlowD: 26.843033509704373 
 stoch_sell: False, stoch_buy: False
 RSI: 54.763826360881026 
 rsi_sell: False, rsi_buy: False

Build bar for EUR= with 2023-11-17 12:57:23.350985 timstamp
 Short SMA: 1.08644 Long SMA: 1.0864300000000005 
 sma_sell: False, sma_buy: False
 SlowK: 22.22222222222221 SlowD: 17.407407407407405 
 stoch_sell: False, stoch_buy: True
 RSI: 50.05357710802165 
 rsi_sell: False, rsi_buy: False

Build bar for JPY= with 2023-11-17 12:57:23.350985 timstamp
 Short SMA: 149.25500000000002 Long SMA: 149.28750000000002 
 sma_sell: False, sma_buy: False
 SlowK: 44.781144781149855 SlowD: 58.383838383843425 
 stoch_sell: False, stoch_buy: False
 RSI: 44.22114177715731 
 rsi_sell: False, rsi_buy: False

1min 1min
Build bar for GBP= with 2023-11-17 12:58:23.446779 timstamp
 Short SMA: 1.2431500

<OpenState.Closed: 'Closed'>

In [96]:
pd.DataFrame(asset_dict['GBP='])

| | Timestamp | short_sma | long_sma | sma_sell | sma_buy | rsi | rsi_sell | rsi_buy | slowK | slowD | stoch_sell | stoch_buy |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2023-11-17 12:57:23.350985 | 1.24321 | 1.24333 | False | False | 54.763826 | False | False | 30.714286 | 26.843034 | False | False |
| 1 | 2023-11-17 12:58:23.446779 | 1.24315 | 1.243335 | False | False | 50.297729 | False | False | 44.047619 | 32.142857 | False | False |
| 2 | 2023-11-17 12:59:23.511672 | 1.2431 | 1.243325 | False | False | 46.23696 | False | False | 46.825397 | 40.529101 | False | False |
| 3 | 2023-11-17 13:00:23.581040 | 1.243 | 1.243295 | False | False | 44.310664 | False | False | 36.111111 | 42.328042 | False | False |
| 4 | 2023-11-17 13:01:23.647899 | 1.24292 | 1.243265 | False | False | 46.70194 | False | False | 31.944444 | 38.293651 | False | False |
| 5 | 2023-11-17 13:02:23.756639 | 1.24285 | 1.243215 | False | False | 44.637773 | False | False | 31.944444 | 33.333333 | False | False |
| 6 | 2023-11-17 13:03:23.856850 | 1.24281 | 1.24317 | False | False | 49.449997 | False | False | 42.65873 | 35.515873 | False | False |
| 7 | 2023-11-17 13:04:23.934189 | 1.2428 | 1.24313 | False | False | 51.710175 | False | False | 53.968254 | 42.857143 | False | False |
| 8 | 2023-11-17 13:05:24.026766 | 1.2428 | 1.24308 | False | False | 49.334657 | False | False | 55.357143 | 50.661376 | False | False |
| 9 | 2023-11-17 13:06:24.118362 | 1.24282 | 1.24305 | False | False | 55.88251 | False | False | 56.309524 | 55.21164 | False | False |


### Further Resources for Refinitiv Data Libraries API on Developer Community Portal

* [Overview](https://developers.refinitiv.com/en/api-catalog/refinitiv-data-platform/refinitiv-data-library-for-python) 
* [Quick Start](https://developers.refinitiv.com/en/api-catalog/refinitiv-data-platform/refinitiv-data-library-for-python/quick-start)
* [Documentation](https://developers.refinitiv.com/en/api-catalog/refinitiv-data-platform/refinitiv-data-library-for-python/documentation)
* [Tutorials](https://developers.refinitiv.com/en/api-catalog/refinitiv-data-platform/refinitiv-data-library-for-python/tutorials)
* [Q&A Forums](https://community.developers.refinitiv.com/spaces/321/index.html)