# Predicting with Exponential Moving Averages

I've had some success predicting swings by eyeballing exponential moving averages (EMAs). Now I want to use that idea to backtest against months of PancakeSwap's BNB prediction game.

In [52]:
import os
import sys
import web3
import time
import json
import tqdm
import random
import pandas as pd
import numbers
import requests
import datetime
import bsc_analysis

## OHLC Data

Implement a class that obtains OHLC data from whatever source, can save/load as JSON, and can update on demand.

In [54]:
class OHLCData(dict):
    '''
    A class that obtains OHLC data from whatever source, can save/load as JSON, and can update on demand.
    It calculates a list of statistical indicators. Supported indicator types are:
        ema       : exponential moving average - alpha is controlled by number of periods in window
        crossover : abs=# periods since the last time val-a went up over val-b, sign=current comparison
    '''
    start_date   = '2022-02-10' #'2021-07-25'
    storage_file = 'bnb_busd_ohlc.json'
    data         = None
    targ_address = '0xbb4CdB9CBd36B01bD1cBaEBF2De08d9173bc095c'
    busd_address = '0xe9e7cea3dedca5984780bafc599bd69add087d56'
    
    def __init__(self, address= None, stats= None, start_date= None, today= False):
        self.today = today and 1 or 0
        if address is not None:
            self.targ_address = address
        if start_date is not None:
            self.start_date = start_date
        self.data = {}
        self.load()
        if stats is not None:
            self.data['stats'] = stats
            self.data['names'] = list(stats.keys())
            self.data['names'].sort()
        else:
            if 'stats' not in self.data:
                self.data['stats'] = {}
            if 'names' not in self.data:
                self.data['names'] = []
        self.retrieve()
        return
    
    def __len__(self):
        return self.data and len(self.data) or 0
    
    def __contains__(self, key):
        if key in self.data:
            return True
        try:
            day, minute = self.split_date(key)
            if day in self.data and len(self.data[day]) > minute:
                return True
        except:
            pass
        return False
    
    def __getitem__(self, key):
        if key in self.data:
            return self.data[key]
        try:
            day, minute = self.split_date(key)
            if day in self.data and len(self.data[day]) > minute:
                return (self.data[day][minute], self.stat_data[day][minute])
        except Exception as err:
            raise IndexError(f'Unable to access [{key}] - {err}')
        return
    
    def split_date(self, isodate):
        daytime  = ddatetime.datetime.fromisoformat(isodate)
        day      = daytime.date().isoformat()
        daystart = datetime.datetime.fromisoformat(day)
        minute   = int((daytime - daystart) / datetime.timedelta(minutes=1))
        return (day, minute)
    
    def keys(self):
        if self.data is not None:
            return self.data.keys()
        return
    
    def items(self):
        if self.data is not None:
            return self.data.items()
        return
    
    def save(self):
        'Save OHLC data and stats to a JSON file.'
        if not self.data:
            return
        try:
            with open(self.storage_file, 'w') as outfile:
                json.dump(self.data, outfile)
                print(f'Saved {len(self.data) - 2} days of OHLC to storage file')
        except Exception as err:
            print(f'Unable to save storage file: {err}')
        return
    
    def load(self):
        'Load OHLC data and stats from a JSON file.'
        try:
            with open(self.storage_file, 'r') as infile:
                self.data = json.load(infile)
                print(f'Loaded {len(self.data) - 2} days of OHLC from storage file')
        except Exception as err:
            print(f'Unable to load storage file: {err}')
        #for key in self.data:
        #    for period in range(len(self.data[key])):
        #        if type(self.data[key][period]) == list:
        #            (oo,hh,ll,cc,now,tt) = self.data[key][period]
        #            self.data[key][period] = {'open': oo, 'high': hh, 'low': ll, 'close': cc, 'time': now, 'trades': tt}
        return

    def retrieve(self):
        'Retrieve any missing data, and calculate stats over all data.'
        
        # Make sure we start with a dict, at least
        if self.data is None:
            self.data = {}
            
        # Figure out what dates we will loop over
        date     = datetime.date.fromisoformat(self.start_date)
        day      = datetime.timedelta(days= 1)
        now      = datetime.datetime.now()
        today    = now.date()
        n_days   = self.today + int((today - date) / day)
        n_pulled = 0
        n_saved  = 0
        
        self.reset_stats()
        for ii in tqdm.tqdm(range(n_days)):
            # Pull each day worth of OHLC from the server
            isodate = date.isoformat()
            if isodate not in self.data or today == date:
                self.data[isodate] = pd.DataFrame(
                    bsc_analysis.get_ohlc(self.targ_address, isodate, 'minute', 1, 24*60, self.busd_address),
                    columns = ['open', 'high', 'low', 'close', 'trades', 'time']
                )
                n_pulled += 1
                if n_pulled > 20:
                    self.save()
                    n_saved += n_pulled
                    n_pulled = 0
            
            # Now calculate stats on the data
            #self.calc_stats(isodate)
            date += day
        
        # Save the result to a JSON file
        self.save()
        return
    
    def reset_stats(self):
        'Reset any internal state of the statistics.'
        
        for name in self.data['names']:
            self.data['stats'][name]['last value'] = 0
        return
    
    def clear_stat_data(self, isodate):
        'Clear out existing stats for a given date'
        
        keep_fields = ['open', 'high', 'low', 'close', 'trades', 'time']
        for period in range(len(self.data[isodate])):
            self.data[isodate][period] = {
                key: value
                for (key, value) in self.data[isodate][period].items()
                if key in keep_fields
            }
        return
    
    def calc_stats(self, isodate):
        'Calculate statistics for all periods on this date.'
        
        # First clear out any old stats data
        self.clear_stat_data(isodate)
        
        # Now calculate each statistic in turn over the full day
        for name in self.data['names']:
            stat = self.data['stats'][name]
            val  = stat['last value']
            
            for period in range(len(self.data[isodate])):
                if stat['type'] == 'ema':
                    update = self.data[isodate][period][stat['key']]
                    if update is not None:
                        val = stat['alpha'] * update + (1 - stat['alpha']) * val
                elif stat['type'] == 'crossover':
                    # Update the # periods since last +ve crossing
                    if self.data[isodate][period][stat['key a']] is None or self.data[isodate][period][stat['key b']] is None:
                        # If there is no new data, assume existing trends continue
                        if val < 0:
                            val -= 1
                        else:
                            val += 1
                    elif val < 0:
                        # If it is currently below and just went above, that's a change
                        if self.data[isodate][period][stat['key a']] > self.data[isodate][period][stat['key b']]:
                            val = 0
                        else:
                            # Increment abs(val) but keep sign the same
                            val -= 1
                    else:
                        # If it just went under, flip the sign and increment value
                        if self.data[isodate][period][stat['key a']] < self.data[isodate][period][stat['key b']]:
                            val = -abs(val)-1
                        else:
                            # Increment abs(val) but keep sign the same
                            val += 1
                # Store the current value in for this period
                self.data[isodate][period][name] = val
            # Store the value at the end of the period, ready for the next one
            stat['last value'] = val
        return


In [46]:
ema_stat_list = [
    ('high',  10), ('high',  20), ('high',  60), ('high', 120),
    ('low',   10), ('low',   20), ('low',   60), ('low',  120),
]

my_ema_stats = {
    f'ema-{key}-{n_periods:03d}': {
        'type' : 'ema',
        'alpha': 2. / (n_periods + 1),
        'key'  : key,
    } for (key, n_periods) in ema_stat_list
}

cross_stats_list = (
    [('high', name) for name in my_ema_stats if name.startswith('ema-high-')] +
    [(name, 'high') for name in my_ema_stats if name.startswith('ema-high-')] +
    [('low',  name) for name in my_ema_stats if name.startswith('ema-low-' )] +
    [(name, 'low' ) for name in my_ema_stats if name.startswith('ema-low-' )]
)

for name in my_ema_stats:
    for other in my_ema_stats:
        if name != other:
            cross_stats_list.append((name, other))

my_cross_stats = {
    f'xover[{key_a}][{key_b}]': {
        'type': 'crossover',
        'key a': key_a,
        'key b': key_b,
    } for (key_a, key_b) in cross_stats_list
}

my_stats = my_ema_stats.copy()
my_stats.update(my_cross_stats)

In [55]:
ohlc = OHLCData()
#stats=my_stats)

Unable to load storage file: [Errno 2] No such file or directory: 'bnb_busd_ohlc.json'


100%|██████████| 5/5 [00:19<00:00,  3.90s/it]

Unable to save storage file: Object of type DataFrame is not JSON serializable





In [42]:
print(ohlc['2021-12-10'][10])

{'open': '571.083523181853', 'high': 575.8047101891843, 'low': 571.0810666621276, 'close': '572.6236052793055', 'time': '2021-12-10 00:10:00', 'trades': 130, 'ema-high-010': 574.2566513450462, 'ema-high-020': 574.9213025233776, 'ema-high-060': 576.8074117591422, 'ema-high-120': 577.4596543436649, 'ema-low-010': 570.4707166597956, 'ema-low-020': 571.2260228971031, 'ema-low-060': 573.2164924524596, 'ema-low-120': 573.8886030258667}


## PancakeSwap Prediction Data

Implement a class that obtains data about past Prediction games on PancakeSwap, can save/load as JSON, and can update on demand.


In [4]:
class PredictionData(dict):
    storage_file      = 'pcs_prediction.json'
    pks_contract_addr = web3.Web3.toChecksumAddress('0x18b2a687610328590bc8f2e5fedde3b582a49cda')
    bsc_data_url      = 'https://bsc-dataseed.binance.org/'
    bsc_api_url       = 'https://api.bscscan.com/api'
    connection        = None
    pks_contract      = None
    data              = None
    n_rounds          = 0
    
    def __init__(self, filename= None):
        self.data = {}
        if filename is not None:
            self.storage_file = filename
        self.load()
        self.connect()
        self.load_contract()
        self.update()
        return
    
    def __len__(self):
        return self.n_rounds
    
    def __getitem__(self, key):
        if isinstance(key, numbers.Number):
            return self.data[str(key)]
        return self.data[key]
    
    def save(self):
        if not self.data:
            return
        try:
            with open(self.storage_file, 'w') as outfile:
                json.dump(self.data, outfile)
                print(f'Saved {len(self.data)} rounds to storage file')
        except Exception as err:
            print(f'Unable to save storage file: {err}')
        return
    
    def load(self):
        try:
            with open(self.storage_file, 'r') as infile:
                self.data = json.load(infile)
                print(f'Loaded {len(self.data)} rounds from storage file')
        except Exception as err:
            print(f'Unable to load storage file: {err}')
        return
    
    def keys(self):
        if self.data is not None:
            return self.data.keys()
        return
    
    def items(self):
        if self.data is not None:
            return self.data.items()
        return
    
    def connect(self):
        connection = web3.Web3(web3.Web3.HTTPProvider(self.bsc_data_url))
        if not connection.isConnected():
            self.connection = None
            return False
        self.connection = connection
        print('Connected')
        return True
    
    def load_contract(self):
        abi_url           = f'{self.bsc_api_url}?module=contract&action=getabi&address={self.pks_contract_addr}'
        rr                = requests.get(url = abi_url)
        self.abi          = json.loads(rr.json()['result'])
        self.pks_contract = self.connection.eth.contract(address= self.pks_contract_addr, abi= self.abi)
        self.func_inputs  = {func['name']: func[ 'inputs'] for func in self.abi if 'name' in func and  'inputs' in func}
        self.func_outputs = {func['name']: func['outputs'] for func in self.abi if 'name' in func and 'outputs' in func}
        print('PKS Prediction Contract Loaded')
        return
    
    def get_round(self, index):
        func = self.pks_contract.functions.rounds(index) 
        rlist = func.call()
        return {spec['name']: value for value, spec in zip(rlist, self.func_outputs['rounds'])}
    
    def update(self):
        if self.data is None:
            self.data = {}
        self.n_rounds = self.pks_contract.functions.currentEpoch().call()
        n_pulled = 0
        n_saved  = 0
        for ii in tqdm.tqdm(range(self.n_rounds)):
            iis = str(ii)
            if iis not in self.data:
                self.data[iis] = self.get_round(ii)
                n_pulled += 1
                if n_pulled > 5000:
                    self.save()
                    n_saved += n_pulled
                    n_pulled = 0
        self.save()
        return

## Plotting OHLC and EMAs

## Prediction Strategies

Implement various prediction strategies to be backtested against the data.

In [25]:
def strategy_biggest(round_info, ohlc, ohlc_stats):
    if round_info['up payout'] > round_info['down payout']:
        return 'up'
    elif round_info['up payout'] < round_info['down payout']:
        return 'down'
    return None

def strategy_smallest(round_info, ohlc, ohlc_stats):
    if round_info['up payout'] > round_info['down payout']:
        return 'down'
    elif round_info['up payout'] < round_info['down payout']:
        return 'up'
    return None

def strategy_bull(round_info, ohlc, ohlc_stats):
    return 'up'

def strategy_bear(round_info, ohlc, ohlc_stats):
    return 'down'

def strategy_ema_1(round_info, ohlc, ohlc_stats, verbose= False):
    start = datetime.datetime.fromtimestamp(round_info['startTimestamp'])
    lock  = datetime.datetime.fromtimestamp(round_info['lockTimestamp'])
    close = datetime.datetime.fromtimestamp(round_info['closeTimestamp'])
    # Pull EMA data at 1 minute prior to the lock time
    pull  = lock - datetime.timedelta(minutes= 1)
    
    if verbose:
        print(f'Round {round_info["epoch"]}: [{start}] - [{close}], locked at [{lock}], pull OHLC from [{pull}]')
    isopull = pull.isoformat()
    ohlc_slice = ohlc[isopull]
    if ohlc_slice is None or None in ohlc_slice:
        return None
        
    day, minute = ohlc.split_date(isopull)
    stat_slice = ohlc_stats[day][minute]
    oo, hh, ll, cc, mm, tt = ohlc_slice
    if verbose:
        print(f'OHLC: {oo} {hh} {ll} {cc} {tt}')
        pprint.pprint(stat_slice)
    highs = stat_slice['high']
    lows  = stat_slice['low' ]
    
    if hh > highs[0] > highs[1] > highs[2] and ll > lows[0] > lows[1] > lows[2] and round_info['up payout'] > min(2.5, round_info['down payout']):
        return 'up'
    if hh < highs[0] < highs[1] < highs[2] and ll < lows[0] < lows[1] < lows[2] and round_info['down payout'] > min(2.5, round_info['up payout']):
        return 'down'
    return None

basic_setup = {
    'starting bnb': 1,
    'betting bnb' : 0.001,
    'winnings tax': 0.03,
    'betting fee' : 0.0006,
}

bold_setup = {
    'starting bnb': 4,
    'betting bnb' : 0.001,
    'winnings tax': 0.03,
    'betting fee' : 0.0006,
}

free_setup = {
    'starting bnb': 1,
    'betting bnb' : 0.001,
    'winnings tax': 0.03,
    'betting fee' : 0,
}

basic_strategies = {
    'Always pick biggest payout' : strategy_biggest,
    'Always pick smallest payout': strategy_smallest,
    'Always pick up'             : strategy_bull,
    'Always pick down'           : strategy_bear,
    'Only pick obvious runs'     : strategy_ema_1,
}

In [23]:
def reference_strategy_generator(rate=0.7):
    def gen(round_info, ohlc, ohlc_stats):
        win  = (round_info['closePrice'] > round_info['lockPrice']) and 'up' or 'down'
        lose = (round_info['closePrice'] > round_info['lockPrice']) and 'down' or 'up'
        coin = random.random()
        if coin < rate:
            return win
        return lose
    return gen

ref_strategies = {f'{pp:2d}% success rate': reference_strategy_generator(pp / 100.) for pp in range(80,101)}

## Pulling it Together

Loads/update the OHLC data, calculate the statistics on the OHLCs, load/update the prediction data, and then backtest a list of prediction strategies, presenting the results in tabular form.

In [5]:
ohlc = OHLCData()
ohlc_stats = compute_all_stats(ohlc)
pred = PredictionData()

Loaded 203 days of OHLC from storage file


100%|██████████| 203/203 [00:02<00:00, 93.80it/s]


Saved 203 days of OHLC to storage file
Computed statistics for 203 days.
Loaded 44332 rounds from storage file
Connected
PKS Prediction Contract Loaded


100%|██████████| 44499/44499 [00:23<00:00, 1863.34it/s]  


Saved 44499 rounds to storage file


In [31]:

def evaluate_strategies(setup, strategies):
    for strategy in strategies:
        evaluator = strategies[strategy]
        n_played  = 0
        n_won     = 0
        n_lost    = 0
        n_zero    = 0
        n_skip    = 0
        n_iter    = 0
        balance   = setup['starting bnb']
        for rr in range(len(pred)):
            n_iter += 1
            if pred[rr]['totalAmount'] == 0 or pred[rr]['bullAmount'] == 0 or pred[rr]['bearAmount'] == 0:
                n_zero += 1
                continue

            # Calculate on win, payout on win, including our contrbution to the pool
            pred[rr]['up payout'  ] = (pred[rr]['totalAmount'] + setup['betting bnb']) / (pred[rr]['bullAmount'] + setup['betting bnb'])
            pred[rr]['down payout'] = (pred[rr]['totalAmount'] + setup['betting bnb']) / (pred[rr]['bearAmount'] + setup['betting bnb'])            
            bet = evaluator(pred[rr], ohlc, ohlc_stats)
            if bet:
                n_played   += 1
                balance    -= setup['betting bnb'] + setup['betting fee']
                went_up     = pred[rr]['closePrice'] > pred[rr]['lockPrice']
                if bet == 'up' and went_up:
                    n_won   += 1
                    balance += setup['betting bnb'] * pred[rr]['up payout'  ] * (1 - setup['winnings tax'])
                elif bet == 'down' and not went_up:
                    n_won   += 1
                    balance += setup['betting bnb'] * pred[rr]['down payout'] * (1 - setup['winnings tax'])
                else:
                    n_lost  += 1
                if balance < setup['betting bnb'] + setup['betting fee']:
                    break
            else:
                n_skip += 1
        pnl = 100 * (balance / setup['starting bnb'] - 1)
        print(f'{strategy:30}: played={n_played:5} won={n_won:6} lost={n_lost:6} skip={n_skip:6} balance={balance:9.5f} pnl%={pnl:9.5f} {pnl < -99. and "BANKRUPT" or pnl < 0 and "LOSS" or "WIN"}')


In [32]:
print('FREE SETUP, BASIC STRATEGIES')
evaluate_strategies(free_setup, basic_strategies)
print('-'*80)
print('BASIC SETUP, BASIC STRATEGIES')
evaluate_strategies(basic_setup, basic_strategies)
print('-'*80)
print('BOLD SETUP, BASIC STRATEGIES')
evaluate_strategies(bold_setup, basic_strategies)
print('-'*80)
print('FREE SETUP, REFERENCE STRATEGIES')
evaluate_strategies(free_setup, ref_strategies)
print('-'*80)
print('BASIC SETUP, REFERENCE STRATEGIES')
evaluate_strategies(basic_setup, ref_strategies)
print('-'*80)
print('BOLD SETUP, REFERENCE STRATEGIES')
evaluate_strategies(bold_setup, ref_strategies)

FREE SETUP, BASIC STRATEGIES
Always pick biggest payout    : played=44413 won= 20206 lost= 24207 skip=     0 balance=  2.64087 pnl%=164.08685 WIN
Always pick smallest payout   : played=13301 won=  7121 lost=  6180 skip=     0 balance=  0.00093 pnl%=-99.90667 BANKRUPT
Always pick up                : played=37667 won= 18940 lost= 18727 skip=     0 balance=  0.00004 pnl%=-99.99555 BANKRUPT
Always pick down              : played=44413 won= 22052 lost= 22361 skip=     0 balance=  0.77084 pnl%=-22.91619 LOSS
Only pick obvious runs        : played=10194 won=  4679 lost=  5515 skip= 34219 balance=  1.51285 pnl%= 51.28482 WIN
--------------------------------------------------------------------------------
BASIC SETUP, BASIC STRATEGIES
Always pick biggest payout    : played= 2069 won=   974 lost=  1095 skip=     0 balance=  0.00083 pnl%=-99.91704 BANKRUPT
Always pick smallest payout   : played= 1393 won=   733 lost=   660 skip=     0 balance=  0.00060 pnl%=-99.94022 BANKRUPT
Always pick up      

In [8]:

round = pred[1]
print(round)
start = datetime.datetime.fromtimestamp(round['startTimestamp'])
lock  = datetime.datetime.fromtimestamp(round['lockTimestamp'])
close = datetime.datetime.fromtimestamp(round['closeTimestamp'])
# Pull EMA data at 1 minute prior to the lock time
pull  = lock - datetime.timedelta(minutes= 1)
print(f'Round 1: [{start}] - [{close}], locked at [{lock}], pull OHLC from [{pull}]')
isopull = pull.isoformat()
day, minute = ohlc.split_date(isopull)
ohlc_slice = ohlc[day][minute]
stat_slice = ohlc_stats[day][minute]
oo, hh, ll, cc, mm, tt = ohlc_slice
print(f'OHLC: {oo} {hh} {ll} {cc} {tt}')
import pprint
pprint.pprint(stat_slice)


{'epoch': 1, 'startTimestamp': 1629927751, 'lockTimestamp': 1629928051, 'closeTimestamp': 1629928363, 'lockPrice': 50081000000, 'closePrice': 50027000000, 'lockOracleId': 18446744073709807007, 'closeOracleId': 18446744073709807020, 'totalAmount': 0, 'bullAmount': 0, 'bearAmount': 0, 'rewardBaseCalAmount': 0, 'rewardAmount': 0, 'oracleCalled': True}
Round 1: [2021-08-25 21:42:31] - [2021-08-25 21:52:43], locked at [2021-08-25 21:47:31], pull OHLC from [2021-08-25 21:46:31]
OHLC: 501.8173858558518 501.82442900432113 499.05808257684504 501.8155318030885 59
{'close': [500.74134508024906, 500.3082482267044, 499.8756749198357],
 'high': [501.75846844981214, 501.46631052025447, 501.23460866931515],
 'low': [499.00896667345665, 498.6668572503427, 498.3120671347388],
 'open': [501.2327696982138, 500.6387814583877, 500.0094165110097],
 'trades': [64.5117185430545, 67.13551413549771, 70.86439568583069]}


Conclusion: We need over 80% success rate to make anything at PCS prediction. I'm nowhere near that level, and if I were, there are easier ways to make money.

## Auto Trading

Okay, that kind of prediction is a bust, for now. Back to some more realistic goals:

* First, revisit the OHLC to incorporate the stats into the same object
* Next, a function that takes an input slice of OHLC+stats, and a list of conditions, and tells you whether the conditions have been met.
* Then a limit-trading bot that stores a list of conditional transactions and monitors those conditions. When the specified conditions are met, it executes the transaction, making up to a certain number of tries but continuing to monitor those conditions and aborting if they go out of range. It should pay extra for the gas (always, or if the transaction is above a certain threshold?). It should keep a list of completed transactions, too, and offer some interface to allow the list to change.
* When I'm done with that bot, make a function that consults an ML model to decide whether or not to trade, and an evaluation function that backtests a given model over the last several months. Build such a model and train it.
