In [7]:
import sys, os, struct
import csv
import inspect
import math
import time
import calendar 
from datetime import datetime

testdir = os.getcwd() + '/' + os.path.abspath('')
if testdir != "":
    testdir = testdir + '/'
sys.path.append(testdir + "..")
sys.path.append(os.path.dirname(os.path.abspath('')))

import dxapi
# Timebase URL specification, pattern is "dxtick://<host>:<port>"
timebase = 'dxtick://timebase:8011'

try:
    # Create timebase connection
    db = dxapi.TickDb.createFromUrl(timebase)

    # Open in write mode
    db.open(False)
    
    # QQL create stream DDL
    barsQQL = """
CREATE DURABLE STREAM "OKEXPerpHist" 'OKEXPerpHist' (
    CLASS "deltix.timebase.api.messages.MarketMessage" 'Market Message' (
        STATIC "originalTimestamp" TIMESTAMP = NULL,
        STATIC "currencyCode" 'Currency Code' INTEGER = 999,
        STATIC "sequenceNumber" '' INTEGER = NULL,
        STATIC "sourceId" '' VARCHAR = NULL
    ) NOT INSTANTIABLE;
    CLASS "deltix.timebase.api.messages.BarMessage" 'Bar Message' UNDER "deltix.timebase.api.messages.MarketMessage" (
        "exchangeId" 'Exchange Code' VARCHAR,
        "close" 'Close' FLOAT DECIMAL64,
        "open" 'Open' FLOAT DECIMAL64 RELATIVE TO "close",
        "high" 'High' FLOAT DECIMAL64 RELATIVE TO "close",
        "low" 'Low' FLOAT DECIMAL64 RELATIVE TO "close",
        "volume" 'Volume' FLOAT DECIMAL64
    );
)
OPTIONS (FIXEDTYPE; PERIODICITY = '1I'; HIGHAVAILABILITY = TRUE)
COMMENT 'OKEXPerpHist'
    """
    
    barsQQL_decimal = """
CREATE DURABLE STREAM "OKEXPerpHist" (
    CLASS "deltix.timebase.api.messages.MarketMessage" 'Market Message' (
        STATIC "originalTimestamp" TIMESTAMP = NULL,
        "currencyCode" 'Currency Code' INTEGER SIGNED (16),
        STATIC "sequenceNumber" '' INTEGER = NULL,
        STATIC "sourceId" '' VARCHAR MULTILINE = NULL
    );
    CLASS "deltix.timebase.api.messages.BarMessage" 'Bar Message' UNDER "deltix.timebase.api.messages.MarketMessage" (
        STATIC "exchangeId" 'Exchange Code' VARCHAR MULTILINE = NULL,
        "close" 'Close' FLOAT DECIMAL,
        "open" 'Open' FLOAT DECIMAL,
        "high" 'High' FLOAT DECIMAL,
        "low" 'Low' FLOAT DECIMAL,
        "volume" 'Volume' FLOAT DECIMAL
    );
)
OPTIONS (POLYMORPHIC; PERIODICITY = '1I'; HIGHAVAILABILITY = FALSE)
COMMENT 'OKEXPerpHist'
"""
    # execute QQL and check result
    cursor = db.executeQuery(barsQQL_decimal)
    try:
        if (cursor.next()):
            message = cursor.getMessage()
            print('Query result: ' + message.messageText)
    finally:
        if (cursor != None):
            cursor.close()
    
finally:  # database connection should be closed anyway
    if (db.isOpen()):
        db.close()
    print("Connection " + timebase + " closed.")

Query result: Stream created
Connection dxtick://timebase:8011 closed.


In [5]:
import sys, os, struct
import csv
import inspect
import math
import time
import calendar
from datetime import datetime

testdir = os.getcwd() + '/' + os.path.abspath('')
if testdir != "":
    testdir = testdir + '/'
sys.path.append(testdir + "..")
sys.path.append(os.path.dirname(os.path.abspath('')))

import dxapi

# Timebase URL specification, pattern is "dxtick://<host>:<port>"
timebase = 'dxtick://timebase:8011'

try:
    # Create timebase connection
    db = dxapi.TickDb.createFromUrl(timebase)
    
    # Open in read-write mode
    db.open(False)
    
    print('Connected to ' + timebase)

    # Define name of the stream    
    streamKey = 'OKEX Historical'
    
    # Get stream from the timebase
    stream = db.getStream(streamKey)
    
    # Create a Message Loader for the selected stream and provide loading options
    loader = stream.createLoader(dxapi.LoadingOptions())
    
    # Create Bar message
    barMessage = dxapi.InstrumentMessage()
    
    # Define message type name according to the Timebase schema type name
    # For the polymorphic streams, each message should have defined typeName to distinct messages on Timebase Server level.
    barMessage.typeName = 'deltix.timebase.api.messages.BarMessage'
    
    print('Start loading to ' + streamKey)   
    
    # get current time in UTC
    now = datetime.utcnow() - datetime(1970, 1, 1)
    
    # Define message timestamp as Epoch time in nanoseconds 
    ns = now.total_seconds() * 1e9 + now.microseconds * 1000;
    
    barMessage.instrumentType = 'EQUITY'
    
    barMessage.symbol = 'AAPL'   
    barMessage.timestamp = ns;    
    barMessage.exchangeId = 'NYSE'
    barMessage.open = 263.75
    barMessage.high = 278.09
    barMessage.low = 263.00
    barMessage.close = 266.17
    barMessage.volume = 71_690_000
    
    # Send message
    loader.send(barMessage)     
    
    barMessage.symbol = 'GOOG'   
    barMessage.timestamp = ns;    
    barMessage.exchangeId = 'NYSE'
    barMessage.open = 1205.3
    barMessage.high = 1254.76
    barMessage.low = 1200.00
    barMessage.close = 1215.56
    barMessage.volume = 33_700_000
        
    # Send message
    loader.send(barMessage)
    
    # close Message Loader
    loader.close()
    loader = None
    
finally:
    # database connection should be closed anyway
    if db.isOpen():
        db.close()
        print("Connection " + timebase + " closed.")

Connected to dxtick://timebase:8011
Connection dxtick://timebase:8011 closed.


AttributeError: 'NoneType' object has no attribute 'createLoader'

In [None]:
# Timebase URL specification, pattern is "dxtick://<host>:<port>"
timebase = 'dxtick://localhost:8011'

try:
    # Create timebase connection
    db = dxapi.TickDb.createFromUrl(timebase)
    
    # Open in read-only mode
    db.open(True)
    
    print('Connected to ' + timebase)

    # Define name of the stream
    # streamKey = 'daily.stream'
    streamKey = 'bars1min'

    # Get stream from the timebase
    stream = db.getStream(streamKey)

    # List of message types to subscribe (if None, all stream types will be used)
    types = ['deltix.timebase.api.messages.BarMessage']

    # List of entities to subscribe (if None, all stream entities will be used)
    entities = [ 'GOOG', 'AAPL' ]

    # Define subscription start time
    time = datetime(2010, 1, 1, 0, 0)
    
    # Start time is Epoch time in milliseconds
    startTime = calendar.timegm(time.timetuple()) * 1000
    
    # Create cursor using defined message types and entities
    cursor = stream.select(startTime, dxapi.SelectionOptions(), types, entities)
    try:
        while cursor.next():
            message = cursor.getMessage()

            # Message time is Epoch time in nanoseconds
            time = message.timestamp/1e9
            messageTime = datetime.utcfromtimestamp(time)
            
            if message.typeName == 'deltix.timebase.api.messages.BarMessage':
                print("Bar (" + str(messageTime) + ") close price: " + str(message.close))
    finally:
        # cursor should be closed anyway
        cursor.close()
        cursor = None
        
finally:
    # database connection should be closed anyway
    if (db.isOpen()):
        db.close()
        print("Connection " + timebase + " closed.")