## Monitor agrakal daily data (FUTURES) - Python

### Overview

This sample takes an inventory of the reference futures for agrakal and exports their daily bars.

### Services used
This sample uses *gRPC requests* to retrieve daily data from the dedicated hosted service. The endpoints queried in this script are:
* StaticData: to get the instrument identifiers from the input contract code
* Bars: to get market data for each instrument

### Modules required
1. Systemathics packages:
    * *systemathics.apis*
2. Open source packages
    * *googleapis-common-protos*
    * *protobuf*
    * *grpcio*
    * *pandas*

***

# Run agrakal futures daily data sample

### Step 1: Install packages

In [None]:
pip install googleapis-common-protos protobuf grpcio pandas

In [None]:
pip install systemathics.apis --pre

In [None]:
import os
import grpc
import pandas as pd
from datetime import datetime
from datetime import timedelta 
import google.type.date_pb2 as date
import google.type.timeofday_pb2 as timeofday
import google.type.dayofweek_pb2 as dayofweek
import google.protobuf.duration_pb2 as duration
import google.protobuf as pb

import systemathics.apis.type.shared.v1.identifier_pb2 as identifier
import systemathics.apis.type.shared.v1.constraints_pb2 as constraints
import systemathics.apis.type.shared.v1.date_interval_pb2 as dateinterval
import systemathics.apis.type.shared.v1.time_interval_pb2 as timeinterval
import systemathics.apis.services.static_data.v1.static_data_pb2 as static_data
import systemathics.apis.services.static_data.v1.static_data_pb2_grpc as static_data_service
import systemathics.apis.services.tick_analytics.v1.tick_bars_pb2 as tick_bars
import systemathics.apis.services.tick_analytics.v1.tick_bars_pb2_grpc as tick_bars_service
import systemathics.apis.helpers.token_helpers as token_helpers
import systemathics.apis.helpers.channel_helpers as channel_helpers

### Step 2: Retrieve authentication token
The following code snippet sends an authentication request and prints the token to the console output. The token is required to process the upcoming *gRPC queries*.

In [None]:
token = token_helpers.get_token()
display(token)

### Step 3: Specify parameters
To query the *BarsService*, we need to specify the **instrument identifier** and some **time constraints**:

#### 3.1 Instrument selection

In [None]:
# set instrument identifiers: ticker + exchange
tickerexchange_array = [['FESX', 'XEUR'], # Euro STOXX 50 Future
                        ['FDAX', 'XEUR'], # DAX Future
                        ['DX', 'IFUS'], # US Dollar Index Future
                        ['ZB', 'XCBT'] # US Treasury Bond Future
                       ]
length = len(tickerexchange_array)

#### 3.2 Time constraints and bars parameters selection
Specify the time constraints such as bar duration, first date, etc.

In [None]:
start = datetime(year=2021,month=1,day=1)
end = datetime.today()
start_time = timeofday.TimeOfDay(hours = 10, minutes = 0, seconds = 0)
end_time = timeofday.TimeOfDay(hours = 17, minutes = 0, seconds = 0)
offset = duration.Duration(seconds = start_time.hours*3600)
sampling = 3600*(end_time.hours - start_time.hours)
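
As a quick sanity check of the values above, the sketch below recomputes the offset and the sampling in plain Python, without the protobuf types. It assumes the same 10:00 to 17:00 session; the variable names are illustrative and are not part of the API. With a sampling equal to the session length and an offset equal to the session open, each trading day presumably yields a single bar.

```python
from datetime import timedelta

# Assumed session bounds, mirroring start_time and end_time above
session_open_hour = 10
session_close_hour = 17

# Offset: seconds elapsed from midnight to the session open
offset_seconds = session_open_hour * 3600

# Sampling: one bar spanning the whole session, in seconds
sampling_seconds = 3600 * (session_close_hour - session_open_hour)

print(timedelta(seconds=offset_seconds))    # 10:00:00
print(timedelta(seconds=sampling_seconds))  # 7:00:00
```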

### Step 4: Define necessary methods
In this part we define the methods used later in the full workflow.

#### Step 4.1: Future selection methods

In [None]:
# Define a method to request all futures from a given contract code
# input: contract
# output: static data request
def get_static_data_request(contract):
    # generate request and add filter values
    data_request = static_data.StaticDataRequest( asset_type = static_data.AssetType.ASSET_TYPE_FUTURE)
    data_request.future_contract.value = contract
    data_request.count.value = 1000
    return data_request

In [None]:
# Define a method to request static data for the given future and return all futures which are not expired yet 
# input: contract, start date (for filtering)
# output: list of filtered futures as returned by the API
def get_sorted_futures(contract, start_date):
    data_request = get_static_data_request(contract)
    try:
        # open a gRPC channel
        with channel_helpers.get_grpc_channel() as channel:  

            rpc_service = static_data_service.StaticDataServiceStub(channel)
            metadata = [('authorization', token)]

            # Process the request
            response = rpc_service.StaticData(request=data_request, metadata=metadata)
            
        # Sort futures by maturity (year, then month)
        sorted_futures = sorted(response.futures, key=lambda x: (x.maturity.year, x.maturity.month))

        # Filter according to date
        selected_futures=[]
        for future in sorted_futures:
            maturity = datetime(future.maturity.year, future.maturity.month,future.maturity.day)
            if maturity >= start_date:
                selected_futures.append(future)
        return selected_futures
    except grpc.RpcError as e:
        display(e.code().name)
        display(e.details())
        # return an empty list so that callers can still call len() or iterate
        return []

In [None]:
# define a method returning the front future for a given date
# input: list of futures + current date
# output: front future
def get_front(futures, time):
    for future in futures:
        maturity = datetime(future.maturity.year, future.maturity.month,future.maturity.day)
        #print("{0} vs {1}".format(maturity, time))
        if maturity > time:
            return future
    # default
    return None

In [None]:
# define a method returning the back future (second unexpired contract) for a given date
# input: list of futures + current date
# output: back future
def get_back(futures, time):
    skipped_front = False
    for future in futures:
        maturity = datetime(future.maturity.year, future.maturity.month,future.maturity.day)
        #print("{0} vs {1}".format(maturity, time))
        if maturity > time:
            if skipped_front:
                return future
            else:
                skipped_front = True
    # default
    return None
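
The two selection helpers above can be tried offline. The sketch below uses hypothetical stand-ins for the API objects (the `Future` and `Maturity` tuples and the tickers are made up) and assumes the chain is already sorted by maturity, as `get_sorted_futures` returns it. The front future is the first contract maturing strictly after the given date, and the back future is the one after it.

```python
from collections import namedtuple
from datetime import datetime

# Hypothetical stand-ins for the API objects: only the fields used here
Maturity = namedtuple("Maturity", ["year", "month", "day"])
Future = namedtuple("Future", ["ticker", "maturity"])

def unexpired(futures, time):
    """Return futures maturing strictly after `time`, in input order."""
    return [f for f in futures
            if datetime(f.maturity.year, f.maturity.month, f.maturity.day) > time]

# Sample chain, already sorted by maturity (tickers are made up)
chain = [
    Future("XXH21", Maturity(2021, 3, 19)),
    Future("XXM21", Maturity(2021, 6, 18)),
    Future("XXU21", Maturity(2021, 9, 17)),
]

alive = unexpired(chain, datetime(2021, 4, 1))
front = alive[0] if len(alive) > 0 else None
back = alive[1] if len(alive) > 1 else None
print(front.ticker, back.ticker)  # XXM21 XXU21
```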

#### Step 4.2: Bars request processing methods

In [None]:
# define a method building the roll schedule: for each successive front future, its identifier and the start/end dates of the bars request
def generate_bars_parameters(start_time, end_time, selected_futures):
    # Get all futures from given start date to end date
    current_time = start_time
    identifiers, starts, ends = [],[],[]
    while current_time < end_time:
        front = get_front(selected_futures, current_time)
        if front is None:
            # no unexpired future left: stop instead of failing on front.maturity
            break
        maturity = datetime(front.maturity.year, front.maturity.month,front.maturity.day)

        # add data
        identifiers.append(front.identifier)
        starts.append(current_time)

        # prepare to skip to next future if needed
        current_time = maturity + timedelta(days=1)
        ends.append(front.maturity)

        
        
    # store in a dataframe and visualize start/end dates
    d = {'Identifier': identifiers, 'Start': starts, 'End': ends}
    df = pd.DataFrame(data=d)
    return df
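
The roll logic of `generate_bars_parameters` can be illustrated without the API. This is a minimal sketch under stated assumptions: maturities are plain `datetime` values sorted in ascending order, and `roll_schedule` is a hypothetical name used only here. Each segment starts where the previous contract expired (plus one day) and ends at the maturity of the current front contract.

```python
from datetime import datetime, timedelta

def roll_schedule(maturities, start, end):
    """Build (segment_start, segment_end) pairs chaining front contracts.

    `maturities` is a sorted list of datetime expiry dates.
    """
    segments = []
    current = start
    while current < end:
        # front contract: first maturity strictly after the current date
        front = next((m for m in maturities if m > current), None)
        if front is None:
            break  # no unexpired contract left
        segments.append((current, front))
        # the next segment starts the day after this contract expires
        current = front + timedelta(days=1)
    return segments

maturities = [datetime(2021, 3, 19), datetime(2021, 6, 18), datetime(2021, 9, 17)]
schedule = roll_schedule(maturities, datetime(2021, 1, 1), datetime(2021, 8, 1))
for seg_start, seg_end in schedule:
    print(seg_start.date(), "->", seg_end.date())
```

Note that the last segment ends at the contract maturity, which may lie after the requested end date; the notebook method behaves the same way.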

In [None]:
# Define a method that creates a request to the intraday bars endpoint with the given inputs
def get_bars_request(identifier, start, end, sampling, start_time, end_time, offset):
    
    # --> Sampling: bar duration in seconds, wrapped in a Duration when the request is built below
    
    # --> Constraints : specify the start/close date, a time interval, and filtered days.

    # Set the start/close date
    start_date = date.Date(year = start.year, month =start.month, day = start.day)
    end_date = date.Date(year = end.year, month =end.month, day = end.day)
    date_interval = dateinterval.DateInterval(start_date = start_date, end_date= end_date)

    # Set days to exclude
    excluded_days = [ dayofweek.SATURDAY, dayofweek.SUNDAY ]
    
    time_interval = timeinterval.TimeInterval(
        start_time = start_time, 
        end_time = end_time)

    # Constraints: combine the date interval, the intraday time interval and the excluded days
    constraint = constraints.Constraints(
        date_intervals = [date_interval],
        time_intervals = [time_interval],
        excluded_days = excluded_days,
        excluded_dates = [])
    
    request = tick_bars.TickBarsRequest(identifier = identifier,
                                    field = tick_bars.BAR_PRICE_TRADE,
                                    sampling = duration.Duration(seconds = sampling),
                                    constraints = constraint,
                                    offset = offset)
    return request
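
To get a feel for what the date constraints select, the sketch below emulates the weekday exclusion locally with the standard library. It is not the service's implementation: `trading_days` is a hypothetical helper, and treating the end date as inclusive is an assumption.

```python
from datetime import date, timedelta

def trading_days(start, end, excluded_weekdays=(5, 6)):
    """List dates in [start, end] whose weekday is not excluded.

    Weekdays follow date.weekday(): Monday is 0, Saturday 5, Sunday 6.
    """
    days = []
    current = start
    while current <= end:
        if current.weekday() not in excluded_weekdays:
            days.append(current)
        current += timedelta(days=1)
    return days

# 2021-01-01 is a Friday, so the first seven days of January 2021 keep 5 dates
kept = trading_days(date(2021, 1, 1), date(2021, 1, 7))
print(kept)
```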
    

In [None]:
# define a method returning intraday bars for given identifier (other parameters are locked from the notebook inputs)
def get_intraday_bars(identifier, start_date, end_date):
    my_bars_request = get_bars_request(identifier, start_date, end_date, sampling, start_time, end_time, offset)

    # open a gRPC channel
    with channel_helpers.get_grpc_channel() as channel:  

        # instantiate the tick bars service
        bars_service = tick_bars_service.TickBarsServiceStub(channel)

        # process the tick bars request
        bars = []
        metadata = [('authorization', token)]
        for bar in bars_service.TickBars(request=my_bars_request, metadata=metadata):
            bars.append(bar)
    return bars

#### Step 4.3: Bars export methods

In [None]:
# Export the given ticker from given dictionary in a csv
def export_bars(bars_dict, ticker):
    dates=[datetime.fromtimestamp(b.time_stamp.seconds) for b in bars_dict[ticker]]
    opens = [b.open for b in bars_dict[ticker]]
    highs = [b.high for b in bars_dict[ticker]]
    lows = [b.low for b in bars_dict[ticker]]
    closes = [b.close for b in bars_dict[ticker]]
    volumes = [ts.volume for ts in bars_dict[ticker]]
    counts = [ts.count for ts in bars_dict[ticker]]
    vwaps = [ts.vwap for ts in bars_dict[ticker]]

    # create a pandas dataframe with: dates, bars and ticks count used for each bar
    d = {'Date': dates, 'Open': opens, 'High': highs, 'Low' : lows,'Close': closes, 'Volume': volumes, 'Count': counts, 'Vwap': vwaps}
    df_export = pd.DataFrame(data=d)

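    filename = 'Export/Future/{0}_bars.csv'.format(ticker)
    # create the export folder if it does not exist yet
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    df_export.to_csv(filename, index=False)
    print("Exporting {0} bars to {1} ...".format(ticker,filename))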
    

In [None]:
# Export the given dictionary in a csv (all futures are joined)
def export_joined_bars(bars_dict):
    # Extract contract name
    # tickers are the contract code followed by 3 characters for month and year (e.g. FESXH18)
    tmp = list(bars_dict.keys())[0]
    contract = tmp[:-3]
    
    # process
    dates, opens, highs,lows,closes, volumes,counts, vwaps,tickers = [],[],[],[],[],[],[],[],[]
    for key in bars_dict.keys():
        dates=  dates + [datetime.fromtimestamp(b.time_stamp.seconds) for b in bars_dict[key]]
        opens = opens + [b.open for b in bars_dict[key]]
        highs =  highs + [b.high for b in bars_dict[key]]
        lows =lows+ [b.low for b in bars_dict[key]]
        closes = closes+[b.close for b in bars_dict[key]]
        volumes =volumes+ [ts.volume for ts in bars_dict[key]]
        counts = counts+[ts.count for ts in bars_dict[key]]
        vwaps =vwaps+ [ts.vwap for ts in bars_dict[key]]
        tickers =tickers+ [key]*len(bars_dict[key])
    
    # create a pandas dataframe with: dates, bars and ticks count used for each bar
    d = {'Date': dates, 'Open': opens, 'High': highs, 'Low' : lows,'Close': closes, 'Volume': volumes, 'Count': counts, 'Vwap': vwaps, 'Ticker': tickers}
    df_export = pd.DataFrame(data=d)

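    filename = 'Export/Future/Joined_bars/{0}_bars.csv'.format(contract)
    # create the export folder if it does not exist yet
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    df_export.to_csv(filename, index=False)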
    print("Exporting joined {0} bars to {1} ...".format(contract,filename))
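
The joined export boils down to flattening a dictionary of bars into rows tagged with their ticker. The sketch below shows that idea with the standard `csv` module instead of pandas, writing into a temporary folder. The `Bar` tuple, the tickers and the values are made up for illustration, and only a few of the exported columns are kept.

```python
import csv
import os
import tempfile
from collections import namedtuple

# Hypothetical bar record holding only a few of the exported fields
Bar = namedtuple("Bar", ["date", "close", "volume"])

def write_joined(bars_by_ticker, path):
    """Write all bars to one CSV, adding a Ticker column to each row."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(["Date", "Close", "Volume", "Ticker"])
        for ticker, bars in bars_by_ticker.items():
            for bar in bars:
                writer.writerow([bar.date, bar.close, bar.volume, ticker])
    return path

sample = {
    "XXH21": [Bar("2021-03-18", 101.5, 1200)],
    "XXM21": [Bar("2021-03-22", 102.0, 900), Bar("2021-03-23", 101.0, 950)],
}
target = os.path.join(tempfile.mkdtemp(), "Joined_bars", "XX_bars.csv")
write_joined(sample, target)
with open(target, newline="") as handle:
    rows = list(csv.reader(handle))
print(len(rows))  # 4: one header plus three bars
```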

#### Step 4.4: Complete workflow methods

In [None]:
# This method runs the full process to export bars
def run_full_process(contract,start,end):
    # generate future list, SORTED, + FILTERED (expired ones are removed)
    my_futures = get_sorted_futures(contract, start)
    print("Found {0} futures for contract code {1} from {2} to {3}".format(len(my_futures),contract, start,end))
    
    
    # Generate bars parameters: get front
    bars_df = generate_bars_parameters(start, end, my_futures)
    # display explicitly: a bare expression inside a function is not rendered by the notebook
    display(bars_df.head(3))
    
    # --> iterate all futures and request for bars, then store in a dictionary
    bars_dict = {}
    for i in range(len(bars_df.index)):
        identifier = bars_df['Identifier'][i]
        first = bars_df['Start'][i]
        last = bars_df['End'][i]
        bars = get_intraday_bars(identifier,first , last)
        bars_dict[identifier.ticker] = bars
        print('Retrieved {0} bar entries for {1} between {2:%Y/%m/%d} and {3}'.format(len(bars), identifier.ticker, first, last))
    
    
    # --> Export individual bars (for each asset = each key in the dictionary)
    for key in bars_dict.keys():
        export_bars(bars_dict, key)
        
    # --> Export joined bars in a single file
    export_joined_bars(bars_dict)

### Step 5: Process each future separately
In this part we'll use the previously defined methods to create a continuous bar series for the given assets

#### Step 5.1: Test for WBS: One shot run

In [None]:
contract = 'WBS'
run_full_process(contract,start,end)

#### Step 5.2: Test for DX: step by step
This example reproduces the 'one-shot' process for continuous bars creation and gives details about the different steps

In [None]:
# Instrument selection
contract = 'DX'

# Query static data and filter expired futures
my_futures = get_sorted_futures(contract, start) # get futures

#for f in my_futures:
#    print("{0}-{1}".format(f.maturity.year,f.maturity.month))

In [None]:
# Generate bars parameters for following bars requests
bars_df = generate_bars_parameters(start,end,  my_futures) # generate bars_start/end time + identifiers
bars_df.head(15)

In [None]:
# Request bars for each instrument according to the date constraints generated above
# results are stored in a dictionary indexed by instrument ticker (e.g. FESXH18)
bars_dict = {}

# iterate all futures
for i in range(len(bars_df.index)):
    identifier = bars_df['Identifier'][i]
    first = bars_df['Start'][i]
    last = bars_df['End'][i]
    bars = get_intraday_bars(identifier,first , last)
    bars_dict[identifier.ticker] = bars


In [None]:
# Export bars for each instrument
for key in bars_dict.keys():
    export_bars(bars_dict, key)

In [None]:
# Export continuous bars joined together
export_joined_bars(bars_dict)

#### Step 5.3: Agrakal use-case loop workflow for every asset

In [None]:
for pair in tickerexchange_array:
    run_full_process(pair[0],start,end)