In [1]:
import sys
import os

sys.path.append(os.getcwd().split(sep="notebooks")[0]) # Trocar o noteboooks antes de passar pro .py
#import modules
from modules.db_data import db_feeder as db

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Document, Float
from elasticsearch_dsl.connections import connections

from datetime import datetime
import time

from binance.client import Client

In [2]:
def create_marketdata(host,symbol,interval,start_str,end_str=None):
    """Create an Elasticsearch index with the Binance klines of one symbol.

    The klines are downloaded with an unauthenticated Binance client, the last
    returned kline is discarded, and every remaining kline is saved as one
    document in the index ``marketdata-<symbol lowercased>-<interval>-binance``.
    Element 0 of each kline is used as the document id.

    See dateparser docs for valid start and end string formats
    http://dateparser.readthedocs.io/en/latest/
    If using offset strings for dates add "UTC" to the date string,
    e.g. "now UTC", "11 hours ago UTC".

    :param host: Elasticsearch host server e.g localhost:9200
    :type host: str
    :param symbol: Name of symbol pair e.g BNBBTC
    :type symbol: str
    :param interval: Binance kline interval e.g 1m
    :type interval: str
    :param start_str: Start date string in UTC format
    :type start_str: str
    :param end_str: optional - end date string in UTC format
    :type end_str: str
    :return: None; the klines are written to Elasticsearch and a confirmation
        message is printed
    """
    client = Client("","")
    hist = client.get_historical_klines(symbol,interval,start_str,end_str)
    # The last kline is always discarded (the newest candle may still be open).
    # Slicing instead of ``del hist[-1]`` keeps an empty reply from raising
    # IndexError.
    hist = hist[:-1]

    # Registers the default connection used by Document.init() / save().
    connections.create_connection(hosts=[host])

    class Candle(Document):
        Open = Float()
        High = Float()
        Low = Float()
        Close = Float()
        Volume = Float()
        Close_time = Float()
        Quote_asset_vol = Float()
        N_of_trades = Float()
        Taker_buy_base_asset_vol = Float()
        Taker_buy_quote_asset_vol = Float()

        class Index:
            name = '-'.join(('marketdata', symbol.lower(), interval, 'binance'))

    # init() is a classmethod: it creates the index and its mapping.
    Candle.init()

    for kline in hist:
        Candle(
            meta={'id': kline[0]},
            Open=float(kline[1]),
            High=float(kline[2]),
            Low=float(kline[3]),
            Close=float(kline[4]),
            Volume=float(kline[5]),
            Close_time=kline[6],
            Quote_asset_vol=float(kline[7]),
            N_of_trades=float(kline[8]),
            Taker_buy_base_asset_vol=float(kline[9]),
            Taker_buy_quote_asset_vol=float(kline[10]),
        ).save()

    print('Success! \nIndex created!')

In [3]:
def update_marketdata(host,indexName):
    """Append the klines that closed since the last stored candle to an index.

    The symbol and interval are parsed from ``indexName`` (second and third
    dash-separated parts). The newest stored document (highest ``_id``) supplies
    the ``Close_time`` from which new klines are requested; the last returned
    kline is discarded and the rest are saved into ``indexName``, one document
    per kline with element 0 of the kline as document id.

    :param host: Elasticsearch host server e.g localhost:9200
    :type host: str
    :param indexName: Elasticsearch Index e.g marketdata-btcusdt-1m-binance
    :type indexName: str
    :return: None; new klines, if any, are written to Elasticsearch
    :raises IndexError: if ``indexName`` holds no candle to resume from
    """
    name_parts = indexName.split(sep='-')
    symbol = name_parts[1].upper()
    interval = name_parts[2]
    es = Elasticsearch(hosts=[host])
    # Registers the default connection used by Document.save().
    connections.create_connection(hosts=[host])

    lastCandleStored = es.search(index=indexName, size = 1, sort = "_id:desc")
    hits = lastCandleStored['hits']['hits']
    if not hits:
        # Same exception type as the bare [0] lookup, but with an explanation.
        raise IndexError(
            "index %r contains no candles to resume from" % indexName
        )
    close_time = int(hits[0]['_source']['Close_time'])

    hist_new = db.get_historical_klines_from_ts(symbol,interval,close_time)
    # Drop the unfinished candle. Slicing instead of ``del hist_new[-1]`` keeps
    # an empty reply from raising IndexError.
    hist_new = hist_new[:-1]
    if not hist_new:
        return

    class Candle(Document):
        Open = Float()
        High = Float()
        Low = Float()
        Close = Float()
        Volume = Float()
        Close_time = Float()
        Quote_asset_vol = Float()
        N_of_trades = Float()
        Taker_buy_base_asset_vol = Float()
        Taker_buy_quote_asset_vol = Float()

        class Index:
            # Write to the very index that was queried above. For names of the
            # form marketdata-<symbol>-<interval>-binance this equals the name
            # rebuilt from the parsed parts.
            name = indexName

    for kline in hist_new:
        Candle(
            meta={'id': kline[0]},
            Open=float(kline[1]),
            High=float(kline[2]),
            Low=float(kline[3]),
            Close=float(kline[4]),
            Volume=float(kline[5]),
            Close_time=kline[6],
            Quote_asset_vol=float(kline[7]),
            N_of_trades=float(kline[8]),
            Taker_buy_base_asset_vol=float(kline[9]),
            Taker_buy_quote_asset_vol=float(kline[10]),
        ).save()

In [37]:
def live_update_marketdata(host,indexName,iterations=500):
    """Keep an index up to date by running update_marketdata once per candle.

    Each iteration updates the index, reads back the newest stored candle,
    sleeps until two intervals after that candle's open time, and then polls
    Binance once per second until a candle with at least that open time is
    reported.

    :param host: Elasticsearch host server e.g localhost:9200
    :type host: str
    :param indexName: Elasticsearch Index e.g marketdata-btcusdt-1m-binance
    :type indexName: str
    :param iterations: optional - number of update cycles to run (default 500)
    :type iterations: int
    :return: None
    """
    name_parts = indexName.split(sep='-')
    symbol = name_parts[1].upper()
    interval = name_parts[2]
    interval_ms = db.interval_to_milliseconds(interval)
    es = Elasticsearch(hosts=[host])

    for i in range(iterations):
        update_marketdata(host,indexName)
        # Pause before reading the index back (longer after the first update).
        # NOTE(review): presumably this lets Elasticsearch make the freshly
        # saved documents searchable - confirm.
        time.sleep(4 if i == 0 else 1)

        lastCandle = es.search(index=indexName, size = 1, sort = "_id:desc")
        last_open_ms = int(float(lastCandle['hits']['hits'][0]['_id']))

        # Two intervals after the stored candle's open time; for a 1m index
        # this is the previously hard-coded +120 seconds.
        next_open_ms = last_open_ms + 2 * interval_ms

        # time.sleep() raises ValueError on negative values, which happens
        # whenever the index is already more than two intervals behind.
        wait = next_open_ms / 1000 - time.time()
        if wait > 0:
            time.sleep(wait)

        while True:
            recent = db.get_historical_klines(symbol,interval,'2 minutes ago UTC')
            # ">=" instead of "==": if the expected open time was missed, an
            # equality test would never succeed and the loop would spin forever.
            # An empty reply is retried instead of failing on [-1].
            if recent and float(recent[-1][0]) >= next_open_ms:
                break
            time.sleep(1)
            
        
    

In [38]:
# One-off bootstrap of the index (kept commented out):
#create_marketdata('localhost:9200','BTCUSDT','1m','1 months ago UTC')
# Single catch-up pass (kept commented out):
#update_marketdata('localhost:9200','marketdata-btcusdt-1m-binance')
# Blocking call: repeatedly updates the index as new candles appear.
live_update_marketdata('localhost:9200','marketdata-btcusdt-1m-binance')

ConnectTimeout: HTTPSConnectionPool(host='api.binance.com', port=443): Max retries exceeded with url: /api/v1/ping (Caused by ConnectTimeoutError(<urllib3.connection.VerifiedHTTPSConnection object at 0x7f25a79bf780>, 'Connection to api.binance.com timed out. (connect timeout=10)'))

In [30]:
# Stand-alone client for the inspection cells that follow.
es = Elasticsearch(hosts=['localhost:9200'])
# Newest stored candle (highest _id). `res` is read by the timestamp cell
# further down, so it must be assigned here for a fresh-kernel run to work.
res = es.search(index='marketdata-btcusdt-1m-binance', size = 1, sort = "_id:desc")

In [6]:
# List every index (with its aliases) known to the cluster.
# To start over, delete the market-data index:
#es.indices.delete(index='marketdata-btcusdt-1m-binance', ignore=[400, 404])
es.indices.get_alias(index="*")

{'marketdata-btcusdt-1m-binance': {'aliases': {}}}

In [28]:
# The document _id is a millisecond timestamp; show it as a UTC datetime.
newest_hit = res['hits']['hits'][0]
newest_open_seconds = float(newest_hit['_id'])/1000
print(datetime.utcfromtimestamp(newest_open_seconds))

2019-02-27 22:42:00


In [13]:
# Binance client built with two empty strings
# (presumably API key and secret, i.e. unauthenticated access - confirm).
client = Client("","")

In [29]:
# Experiment: measure how long after a candle opens Binance starts reporting it.
# `fechado` is the open time (ms) of the second-to-last kline returned.
fechado = float(db.get_historical_klines('BTCUSDT','1m','15 minutes ago UTC')[-2][0])
print('Fechado:',fechado/1000)
print('Agora:',time.time())

# Poll once per second until the newest kline opens 120000 ms after `fechado`.
target_open = fechado + 120000
open_ultimo = float(db.get_historical_klines('BTCUSDT','1m','2 minutes ago UTC')[-1][0])
while open_ultimo != target_open:
    time.sleep(1)
    open_ultimo = float(db.get_historical_klines('BTCUSDT','1m','2 minutes ago UTC')[-1][0])

print('Novo Candle!!!')
print('Abertura:',open_ultimo/1000)
print('Agora:',time.time())

Fechado: 1551833040.0
Agora: 1551833108.6248732
Novo Candle!!!
Abertura: 1551833160.0
Agora: 1551833162.5833292


In [21]:
# Current Unix time in seconds, for comparison with the candle timestamps.
time.time()

1551831074.5813134