# Ingest Real-Time Stock Data to Iguazio NoSQL and Time-series DB
The following example function ingests real-time stock information from an internet service (the Yahoo Finance API) into the Iguazio platform.<br>
Every time the data is updated, the function updates a NoSQL table with the latest metadata and writes the new metrics (price and volume) to the time-series DB.

The same code can run inside a nuclio (serverless) function and be triggered automatically on a predefined schedule (cron) or through HTTP requests.<br>

The example demonstrates the use of `%nuclio` magic commands to specify environment variables, package dependencies, and configurations (such as the cron schedule), and to deploy functions automatically onto a cluster.

In [4]:
# if the nuclio-jupyter package is not installed run !pip install nuclio-jupyter
import nuclio 

## Environment

Copy the local credentials to the nuclio function configuration (the `-c` option sets them in the function configuration only, not in the local environment).

In [5]:
%nuclio env -c V3IO_ACCESS_KEY=${V3IO_ACCESS_KEY}
%nuclio env -c V3IO_USERNAME=${V3IO_USERNAME}
%nuclio env -c V3IO_API=${V3IO_API}
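
Because the values above are taken from the local session, it can help to confirm that they exist before copying them into the function configuration. A minimal sketch, assuming the `${...}` references are expanded from the local environment; `missing_env` is a hypothetical helper and not part of nuclio-jupyter:

```python
import os

REQUIRED_VARS = ("V3IO_ACCESS_KEY", "V3IO_USERNAME", "V3IO_API")


def missing_env(names, env=None):
    """Return the names that are unset or empty in the given environment mapping."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


# Report any credentials that would reach the function as empty values.
missing = missing_env(REQUIRED_VARS)
if missing:
    print("missing local credentials:", ", ".join(missing))
```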

### Set function configuration 
Use a cron trigger with a 5-minute interval and define the base image.<br>
For more details, see the [nuclio function configuration reference](https://github.com/nuclio/nuclio/blob/master/docs/reference/function-configuration/function-configuration-reference.md).

In [6]:
%%nuclio config 
spec.triggers.secs.kind = "cron"
spec.triggers.secs.attributes.interval = "300s"
spec.build.baseImage = "python:3.6-jessie"

%nuclio: setting spec.triggers.secs.kind to 'cron'
%nuclio: setting spec.triggers.secs.attributes.interval to '300s'
%nuclio: setting spec.build.baseImage to 'python:3.6-jessie'
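
Each line in the `%%nuclio config` cell assigns a value at a dotted path in the function specification. The sketch below only illustrates how those paths nest into a structure; `set_path` is a hypothetical helper and does not reflect how nuclio-jupyter is implemented.

```python
def set_path(config, dotted_key, value):
    """Assign value at a dotted path, creating intermediate dicts as needed."""
    node = config
    *parents, leaf = dotted_key.split(".")
    for part in parents:
        node = node.setdefault(part, {})
    node[leaf] = value
    return config


config = {}
set_path(config, "spec.triggers.secs.kind", "cron")
set_path(config, "spec.triggers.secs.attributes.interval", "300s")
set_path(config, "spec.build.baseImage", "python:3.6-jessie")

# The trigger named "secs" ends up with a kind and an attributes mapping.
print(config["spec"]["triggers"]["secs"])
```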


### Install required packages
`%nuclio cmd` allows you to run image build instructions and install packages.<br>
Note: the `-c` option installs the packages only in the nuclio function image, not locally.

In [7]:
%%nuclio cmd -c 
pip install lxml
pip install yfinance
pip install requests
pip install v3io_frames

## Nuclio function implementation
This function can run in Jupyter or in nuclio (real-time serverless).

In [8]:
# nuclio: start-code

In [9]:
import json
import requests
import yfinance as yf
import os
import pandas as pd
import datetime
import v3io_frames as v3f
import ast

In [62]:
def init_context(context):
    # Setup V3IO Client
    client = v3f.Client('framesd:8081',container=os.getenv('V3IO_CONTAINER', 'bigdata'))
    setattr(context, 'v3c', client)
    
    # Create V3IO Tables and add reference to context
    setattr(context, 'stocks_kv_table', os.getenv('STOCKS_KV_TABLE', 'stocks_kv'))
    setattr(context, 'stocks_tsdb_table', os.getenv('STOCKS_TSDB_TABLE', 'stocks_tsdb'))
    context.v3c.create(backend='tsdb', table=context.stocks_tsdb_table, rate='1/m', if_exists=1)
    
    stocks = os.getenv('STOCK_LIST','GOOG,MSFT,AMZN,AAPL,INTC')
    if stocks.startswith('['):
        stock_syms = ast.literal_eval(stocks)
    else: 
        stock_syms = stocks.split(',')
    setattr(context, 'stock_syms', stock_syms)
    

    # v3io update expression template 
    expr_template = os.getenv('EXPRESSION_TEMPLATE', "symbol='{symbol}';price={price};volume={volume};last_updated='{last_updated}'")
    setattr(context, 'expr_template', expr_template)

    last_trade_times = {}
    setattr(context, 'last_trade_times', last_trade_times)
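
The update expression is produced with plain `str.format`, so the default template can be checked without a platform connection. A sketch using made-up sample values (the price, volume, and timestamp below are illustrative, not real market data):

```python
DEFAULT_TEMPLATE = (
    "symbol='{symbol}';price={price};volume={volume};"
    "last_updated='{last_updated}'"
)

# Hypothetical sample record with the same keys the template expects.
stock = {
    "symbol": "GOOG",
    "price": 1500.5,
    "volume": 1200,
    "last_updated": "2020-09-14 15:59:00-04:00",
}

# Fill the template; each key in the record replaces the matching placeholder.
expr = DEFAULT_TEMPLATE.format(**stock)
print(expr)
```

A custom `EXPRESSION_TEMPLATE` has to use the same placeholder names, otherwise `format` raises a `KeyError`.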

In [63]:
def handler(context, event):
    
    stocks=[]; times=[]; volumes=[]; prices=[]
    
    for sym in context.stock_syms:
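        hist = yf.Ticker(sym).history(period='5m', interval='1m')
        # skip symbols for which no candles were returned (indexing an empty frame would raise IndexError)
        if hist.empty:
            context.logger.info(f'No data received for {sym}')
            continue
        time = hist.index[-1]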
        record = hist.loc[time]
        last = context.last_trade_times.get(sym)
        context.logger.info(f'Received {sym} data from yfinance, including {len(hist)} candles ending at {last}')

        
        # update the stocks table and TSDB metrics in case of new data 
        if not last or time > last:
            
            # update NoSQL table with stock data
            stock = {'symbol': sym, 'price': record['Close'], 'volume': record['Volume'], 'last_updated': time}
            expr = context.expr_template.format(**stock)
            context.logger.debug_with('update expression', symbol=sym, expr=expr)
            context.v3c.execute('kv', context.stocks_kv_table, 'update', args={'key': sym, 'expression': expr})
            context.logger.info(f'Updated records from {last} to {time}')
            # remember the latest trade time and collect the row for the batched TSDB write below
            context.last_trade_times[sym] = time 
            stocks += [sym]
            times +=[time]
            volumes += [record['Volume']]
            prices += [record['Close']]
        else:
            context.logger.info(f'No update was made, current TS: {last} vs. new data {time}')
               
    # write price and volume metrics to the time-series DB, indexed by time and symbol
    if len(stocks)>0:
        df = pd.DataFrame({'volume':volumes,'price': prices}, index=[times,stocks], columns=['volume','price'])
        df.index.names=['time','symbol']
        context.logger.debug_with('writing data to TSDB', stocks=stocks)
        context.v3c.write(backend='tsdb', table=context.stocks_tsdb_table, dfs=df)
        
    return 'done'
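
The handler writes a symbol only when its newest candle is later than the one recorded on a previous invocation. The sketch below isolates that check with plain `datetime` values; `is_new` and the sample timestamps are illustrative and not part of the function.

```python
from datetime import datetime, timedelta


def is_new(seen, symbol, candle_time):
    """Return True and record candle_time if it is newer than the stored one."""
    last = seen.get(symbol)
    if last is None or candle_time > last:
        seen[symbol] = candle_time
        return True
    return False


last_trade_times = {}  # symbol -> timestamp of the last candle already written
t0 = datetime(2020, 9, 14, 15, 59)

first = is_new(last_trade_times, "GOOG", t0)                         # nothing stored yet
repeat = is_new(last_trade_times, "GOOG", t0)                        # same candle again
later = is_new(last_trade_times, "GOOG", t0 + timedelta(minutes=1))  # a newer candle
print(first, repeat, later)
```

Because this state lives in memory, a function that restarts begins with an empty dictionary and treats the latest candle as new again.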

In [49]:
# nuclio: end-code
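
The TSDB write in the handler receives one DataFrame for all updated symbols, indexed by time and symbol. A minimal sketch of that frame with two made-up rows; the values are placeholders, the index is built explicitly with `pd.MultiIndex.from_arrays` for readability, and nothing is written to the platform here.

```python
import pandas as pd

# Placeholder rows standing in for the lists collected in the handler.
times = [pd.Timestamp("2020-09-14 15:59:00"), pd.Timestamp("2020-09-14 15:59:00")]
stocks = ["GOOG", "MSFT"]
volumes = [1200, 3400]
prices = [1500.5, 205.25]

# Two index levels: the sample time and the stock symbol.
index = pd.MultiIndex.from_arrays([times, stocks], names=["time", "symbol"])
df = pd.DataFrame({"volume": volumes, "price": prices}, index=index)
print(df)
```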

## Function invocation
### Local test
The following section simulates a nuclio function invocation and emits the function results.

In [64]:
# create a test event and invoke the function locally 
init_context(context)
event = nuclio.Event(body='')
handler(context, event)

Python> 2020-09-15 10:29:34,151 [info] Received GOOG data from yfinance, including 5 candles ending at None
Python> 2020-09-15 10:29:34,156 [info] Updated records from None to 2020-09-14 15:59:00-04:00
Python> 2020-09-15 10:29:34,463 [info] Received MSFT data from yfinance, including 5 candles ending at None
Python> 2020-09-15 10:29:34,469 [info] Updated records from None to 2020-09-14 15:59:00-04:00
Python> 2020-09-15 10:29:34,766 [info] Received AMZN data from yfinance, including 5 candles ending at None
Python> 2020-09-15 10:29:34,776 [info] Updated records from None to 2020-09-14 15:59:00-04:00
Python> 2020-09-15 10:29:35,066 [info] Received AAPL data from yfinance, including 5 candles ending at None
Python> 2020-09-15 10:29:35,070 [info] Updated records from None to 2020-09-14 15:59:00-04:00
Python> 2020-09-15 10:29:35,382 [info] Received INTC data from yfinance, including 5 candles ending at None
Python> 2020-09-15 10:29:35,385 [info] Updated records from None to 2020-09-14 15:59

'done'

## Deploy to cluster

In [1]:
from mlrun import code_to_function

environment_variables = {'STOCK_LIST': ['GOOG', 'MSFT', 'AMZN', 'AAPL', 'INTC'],
                         'V3IO_CONTAINER': 'bigdata',
                         'STOCKS_TSDB_TABLE': 'stocks/stocks_tsdb',
                         'STOCKS_KV_TABLE': 'stocks/stocks_kv',
                         'EXPRESSION_TEMPLATE': "symbol='{symbol}';price={price};volume={volume};last_updated='{last_updated}'"}

fn = code_to_function('read-stocks',
                      kind='nuclio',
                      handler='handler')
fn.export('03-read-stocks.yaml')
fn.set_envs(environment_variables)

> 2020-09-15 16:40:59,819 [info] function spec saved to path: 03-read-stocks.yaml


<mlrun.runtimes.function.RemoteRuntime at 0x7fc2e02fb828>
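
`STOCK_LIST` is given above as a Python list, while `init_context` reads it from the environment as a string and accepts either a list literal or a comma-separated value. The sketch below follows the same two branches in a standalone function and adds whitespace trimming. It assumes the list reaches the function in its `str()` form, which is not verified here, and `parse_stock_list` is an illustrative name.

```python
import ast


def parse_stock_list(raw):
    """Parse a symbol list from a list literal or a comma-separated string."""
    raw = raw.strip()
    if raw.startswith("["):
        # A value such as "['GOOG', 'MSFT']" is evaluated as a Python literal.
        return [str(sym).strip() for sym in ast.literal_eval(raw)]
    # Otherwise treat the value as comma-separated symbols.
    return [sym.strip() for sym in raw.split(",") if sym.strip()]


as_literal = parse_stock_list(str(["GOOG", "MSFT", "AMZN"]))
as_csv = parse_stock_list("GOOG, MSFT,AMZN")
print(as_literal, as_csv)
```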

In [60]:
fn.deploy(project='stocks')

> 2020-09-15 10:16:31,722 [info] deploy started
[nuclio] 2020-09-15 10:16:40,935 (info) Build complete
[nuclio] 2020-09-15 10:16:49,051 (info) Function deploy complete
[nuclio] 2020-09-15 10:16:49,059 done updating stocks-read-stocks, function address: 192.168.224.209:32012


'http://192.168.224.209:32012'