In [None]:
import time

import boto3

write_client = boto3.client("timestream-write", region_name="us-east-1")
db_arn = write_client.create_database(DatabaseName="securities_master")
time.sleep(10)

tbl_arn = write_client.create_table(
    DatabaseName="securities_master",
    TableName="crypto",
    RetentionProperties={
        "MemoryStoreRetentionPeriodInHours": 1,
        "MagneticStoreRetentionPeriodInDays": 10,
    },
)

In [None]:
from botocore.config import Config
import logging
import requests
import time
from typing import Any, Dict, List

def send_records(write_cliient, logger, records, db_name, tbl_name):
    logger.info("sending %d records to TimeStream...", len(records))
    result = write_client .write_records(
        DatabaseName=db_name, TableName=tbl_name, Records=records, CommonAttributes={},
    )
    status = result['ResponseMetadata']['HTTPStatusCode']
    if status == 200:
        logger.info("Successfully ingested %d records.", len(records))
    else:
        logger.error(
            "Tried to ingest %d records.  Something went wrong.  Status %s",
            len(records),
            status,
        )




In [None]:
def ingest_records_into_timestream(records: List[Dict[str, Any]], db_name: str, tbl_name: str):
    logger = logging.getLogger(__name__)
    session = boto3.Session(region_name="us-east-1")
    write_client = session.client(
        "timestream-write", 
        config=Config(
            read_timeout=20, max_pool_connections=5000, retries={"max_attempts": 10}
        )
    )
    send_records(write_client, logger, records, db_name, tbl_name)

In [None]:
symbol = 'BTC/USD'
stream_name = 'kraken-blotter'
from exchange_feeds.kraken import KrakenOrderBook
count = 0
async def stream():
    count = 0
    records = []
    async with KrakenOrderBook(symbol, stream_name) as book:
        await book.send()
        while count <= 20:
            record = await book.receive()
            if not record:
                continue
            records.append(record)
            count += 1
    return records



In [None]:
records = await stream()

In [None]:
records = [x for x in records if x is not None]

In [None]:
def get_data_from_api(
    api_url: str = "https://min-api.cryptocompare.com/data/pricemulti?fsyms=BTC,ETH,REP,DASH&tsyms=USD",
) -> List[Dict[str, Any]]:
    """
    Get data from API and transform it to a Timestream-friendly format
    :param api_url: Cryptocompare API URL
    :return: records to be ingested
    """
    logger = logging.getLogger(__name__)
    r = requests.get(api_url)
    data = r.json()
    
    btc = data["BTC"]["USD"]
    return btc
    eth = data["ETH"]["USD"]
    rep = data["REP"]["USD"]
    dash = data["DASH"]["USD"]
    now = str(int(round(time.time() * 1000)))
    logger.info("Current timestamp for Timestream: %s. Current prices: %s", now, data)
    current_prices = [
        {
            "Time": now,
            "TimeUnit": "MILLISECONDS",
            "Dimensions": [{"Name": "crypto", "Value": "BTC"}],
            "MeasureName": "Price",
            "MeasureValue": str(btc),
            "MeasureValueType": "DOUBLE",
        },
        {
            "Time": now,
            "TimeUnit": "MILLISECONDS",
            "Dimensions": [{"Name": "crypto", "Value": "ETH"}],
            "MeasureName": "Price",
            "MeasureValue": str(eth),
            "MeasureValueType": "DOUBLE",
        },
        {
            "Time": now,
            "TimeUnit": "MILLISECONDS",
            "Dimensions": [{"Name": "crypto", "Value": "DASH"}],
            "MeasureName": "Price",
            "MeasureValue": str(dash),
            "MeasureValueType": "DOUBLE",
        },
        {
            "Time": now,
            "TimeUnit": "MILLISECONDS",
            "Dimensions": [{"Name": "crypto", "Value": "REP"}],
            "MeasureName": "Price",
            "MeasureValue": str(rep),
            "MeasureValueType": "DOUBLE",
        },
    ]
    return current_prices

In [None]:
record = {
        "Time": str(int(round(time.time() * 1000))),
        "TimeUnit": "MILLISECONDS",
        "Dimensions": [{"Name": "symbol", "Value": "BTC"}, {"Name": 'bid', "Value": str(['33', '44'])}],
        "MeasureName": "prices and quantities",
        "MeasureValues": 'price',
        "MeasureValueType": "VARCHAR",
    }

In [None]:
ingest_records_into_timestream(record, 'securities_master', 'crypto')

In [None]:
compress = str(records[0]['asks'])

In [None]:
compress

In [None]:
import awswrangler as wr
import pandas as pd
from datetime import datetime

wr.timestream.create_database("sampleDB")
wr.timestream.create_table("sampleDB", "sampleTable", memory_retention_hours=1, magnetic_retention_days=1);

In [None]:
import pandas as pd
from datetime import datetime
df = pd.DataFrame(
    {
        "time": [datetime.now()],
        "symbol": ["btc"],
        "bids": str(records[0]['bids']),
        "asks": str(records[0]['asks']),
        "measure": "prices and quantities"
    }
)


rejected_records = wr.timestream.write(
    df=df,
    database="securities_master",
    table="sampleTable",
    time_col="time",
    dimensions_cols=["symbol", "bids", "asks"],
    measure_col="measure"
)

print(f"Number of rejected records: {len(rejected_records)}")

In [None]:
records

In [None]:
cache = []
def build_records_from_stream(record: List[List[Dict[str, Any]]]):
        data = {"time": datetime.now(),
        "symbol": record[0]['symbol'], 
        "bids": str(record[0]['bids']),
        "asks": str(record[0]['asks']),
        "measure": "prices and quantities"
        }
        cache.append(data)
        



In [None]:
for record in records:
    build_records_from_stream(record)


    

In [None]:
df = pd.DataFrame(cache)

In [None]:
df = pd.DataFrame(
    {
        "time": [datetime.now()],
        "symbol": ["btc"],
        "bids": str(records[0]['bids']),
        "asks": str(records[0]['asks']),
        "measure": "prices and quantities"
    }
)

In [None]:
rejected_records = wr.timestream.write(
    df=df,
    database="sampleDB",
    table="sampleTable",
    time_col="time",
    dimensions_cols=["symbol", "bids", "asks"],
    measure_col="measure"
)

print(f"Number of rejected records: {len(rejected_records)}")

In [None]:
async def fun():
    return {'test': 2}

In [None]:
from timestreamtoolz import handle_record, CacheHandler

In [None]:
from exchange_feeds.kraken import KrakenOrderBook

cache = CacheHandler()

async def fun():
    async with KrakenOrderBook('BTC/USD', 'kraken-orderbook') as book:
        count = 0
        await book.send()
        while True:
            records = await book.receive()
            if not records:
                continue
            return records
            


In [None]:
records = await fun()

In [None]:
records

In [None]:
async def fun2(records):
    for record in records:
        await handle_record(record=record, stream_name='kraken-orderbook', cache=cache, handle_lob=True, save=True, symbol='BTCUSD')
    

In [None]:
await fun2(records)

In [None]:
datetime.datetime.now()

In [None]:
import pandas as pd
df = pd.DataFrame(cache.values())

In [None]:
from timestreamtoolz import _push_records_to_timestream

_push_records_to_timestream(cache)

In [None]:
df['time'][0]

In [None]:
import datetime
rec = datetime.datetime.fromtimestamp(1649959027.682306)


In [None]:
rec

In [None]:
count = 0
for record in records:
    count += 1
    print(record)
    if count == 1:
        break

In [None]:
book.update_bids(record)
book.update_asks(record)

In [None]:
from exchange_feeds.constants import L2_ENABLED_STREAMS

In [None]:
new_record = book.to_frame() if 'kraken-orderbook' in L2_ENABLED_STREAMS else {}

In [None]:
bid = book.best_bid()
ask = book.best_ask()

In [None]:
new_record["symbol"] = 'BTCUSD'
new_record["best_bid"] = bid.price
new_record["bid_size"] = bid.size
new_record["best_ask"] = ask.price
new_record["ask_size"] = ask.size
new_record["trxn_time"] = new_record.get(
    "trxn_time", max(bid.offset, ask.offset)
)

if 'kraken-orderbook' in L2_ENABLED_STREAMS:
    new_record["best_bid_timestamp"] = bid.offset
    new_record["best_ask_timestamp"] = ask.offset

In [None]:
L2_ENABLED_STREAMS

In [None]:
c = CacheHandler()
c.add(new_record, True)

In [None]:
new_record

In [1]:
from redistoolz import read_feed_from_redis_once, connect_to_redis

feed = await read_feed_from_redis_once('kraken-orderbook')

connected to Redis!
reading from kraken-orderbook started


In [2]:
import aioredis
r = await aioredis.from_url("redis://localhost", decode_responses=True)

In [None]:
check = await r.xrange('kraken-orderbook')

In [None]:
r = await connect_to_redis("writeonly")

In [None]:
r.xdel

In [None]:
import boto3

boto3.Session(profile_name='default')

In [None]:
import aioredis

redis = await aioredis.from_url("redis://localhost")


In [None]:
await redis.exists("test")

In [None]:
async with redis.pipeline() as Pipe:
    await Pipe.xadd("test", {'hey': 3})
    await Pipe.execute()

In [None]:
r = await aioredis.from_url("redis://localhost", port=6379)

In [None]:
await r.xlen('kraken-orderbook')

In [None]:
res = await r.xread({'kraken-orderbook': b"$"}, block=0)

In [11]:
from redistoolz import connect_to_redis, RawFeed
r = await connect_to_redis()
result = await r.xrange('kraken-orderbook')

connected to Redis!


In [None]:
count = 0
check = []
while count < 3
    res = await r.xread({'kraken-orderbook': b'$'}, block=0)
    check.append(res)
raw = RawFeed('kraken-orderbook', records=check)

In [None]:
result[1]

In [6]:
res = await r.xread({'kraken-orderbook': b'$'}, block=0, count=10)

In [9]:
res[0][1][0]

('1650299085489-0',
 {'asks': '[[1650299075.931969, 39148.6, 9.86364601], [1650299075.931969, 39148.7, 1.02323], [1650299075.931969, 39155.9, 0.57993846], [1650299075.931969, 39156.0, 0.57552583], [1650299075.931969, 39160.0, 0.60727851], [1650299075.931969, 39164.0, 0.59584468], [1650299075.931969, 39167.9, 0.47851047], [1650299075.931969, 39171.6, 0.30648388], [1650299075.931969, 39172.7, 0.12763991], [1650299075.931969, 39175.9, 0.00023876]]',
  'bids': '[[1650299075.931969, 39148.5, 5.31635584], [1650299075.931969, 39148.4, 0.10354211], [1650299075.931969, 39147.7, 0.19603266], [1650299075.931969, 39147.6, 3.83084875], [1650299075.931969, 39147.5, 0.05534614], [1650299075.931969, 39147.4, 0.20826965], [1650299075.931969, 39147.3, 5.10365374], [1650299075.931969, 39147.1, 0.11992914], [1650299075.931969, 39146.7, 7.01886828], [1650299075.931969, 39146.6, 0.10034254]]',
  'trxn_time': '1650299075.931969',
  'symbol': 'BTCUSD',
  'best_bid': '39146.6',
  'bid_size': '0.10034254',
  'b

In [12]:
raw = RawFeed('kraken-orderbook', records=res[0][1])

In [None]:
list(raw.get_ids())

In [None]:
from redistoolz import push_feed_to_postgres

await push_feed_to_postgres(raw)

In [None]:
import asyncpg

In [14]:
from phobosdb import DBReader

async with DBReader() as db:
   records =  await db.async_fetch("select * from kraken_orderbook limit 10;")

Connected to Postgres


In [15]:
async with DBReader() as db:
    table_name = raw.stream_name.replace("-", "_")
    column_names = raw.column_names
    await db.async_push(raw.records, table_name, column_names)

Connected to Postgres
Connected to Postgres
error:  syntax error at or near "%"


In [31]:
query = f"""INSERT INTO {table_name} ({column_names}) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);"""

In [32]:
query

"INSERT INTO kraken_orderbook (['asks', 'bids', 'trxn_time', 'symbol', 'best_bid', 'bid_size', 'best_ask', 'ask_size', 'best_bid_timestamp', 'best_ask_timestamp']) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);"

In [33]:
db = DBReader()
conn = await db.async_connect()

Connected to Postgres


In [43]:
await conn.copy_records_to_table('test_phobos', records=raw.records, columns=column_names, )

UndefinedTableError: relation "test_phobos" does not exist

In [42]:
raw.records[0]

('1650299085489-0',
 {'asks': '[[1650299075.931969, 39148.6, 9.86364601], [1650299075.931969, 39148.7, 1.02323], [1650299075.931969, 39155.9, 0.57993846], [1650299075.931969, 39156.0, 0.57552583], [1650299075.931969, 39160.0, 0.60727851], [1650299075.931969, 39164.0, 0.59584468], [1650299075.931969, 39167.9, 0.47851047], [1650299075.931969, 39171.6, 0.30648388], [1650299075.931969, 39172.7, 0.12763991], [1650299075.931969, 39175.9, 0.00023876]]',
  'bids': '[[1650299075.931969, 39148.5, 5.31635584], [1650299075.931969, 39148.4, 0.10354211], [1650299075.931969, 39147.7, 0.19603266], [1650299075.931969, 39147.6, 3.83084875], [1650299075.931969, 39147.5, 0.05534614], [1650299075.931969, 39147.4, 0.20826965], [1650299075.931969, 39147.3, 5.10365374], [1650299075.931969, 39147.1, 0.11992914], [1650299075.931969, 39146.7, 7.01886828], [1650299075.931969, 39146.6, 0.10034254]]',
  'trxn_time': '1650299075.931969',
  'symbol': 'BTCUSD',
  'best_bid': '39146.6',
  'bid_size': '0.10034254',
  'b