In [18]:
import os, websocket, json, psycopg2
import pandas as pd
from sqlalchemy import create_engine, Column, Integer, String, Float
from pathlib import Path
from pathlib import Path
from sqlalchemy.sql import text
from sqlalchemy.ext.declarative import declarative_base

from dotenv import load_dotenv

In [3]:
# Load variables from a local .env file into the process environment first.
# Without this call the lookups below only see variables that were already
# exported by whatever launched the kernel.
load_dotenv()

# PostgreSQL connection settings read from the environment; each is None when
# the corresponding variable is unset.
URL = os.getenv("POSTGRES_URL")    # used as the host part of the engine URL
USER = os.getenv("POSTGRES_USER")
PWD = os.getenv("POSTGRES_PWD")
DB = os.getenv("POSTGRES_DB")

# SQL script that a later cell reads and executes against the database.
table_init = Path("schema.sql")

In [15]:
# Declarative base class that the ORM models in this notebook inherit from.
Base = declarative_base()

# Use the "postgresql" dialect name: the legacy "postgres" alias is rejected by
# SQLAlchemy 1.4+ (the dialect plugin cannot be loaded), while "postgresql" is
# accepted by every SQLAlchemy version.
eng = create_engine(f"postgresql://{USER}:{PWD}@{URL}/{DB}")

In [16]:
# Read the schema script and echo it so the executed SQL is visible in the
# cell output.
schema_sql = text(table_init.read_text())
print(schema_sql)

# Engine.begin() wraps the statements in a transaction that is committed when
# the block exits normally and rolled back on error. A bare connect()/execute()
# leaves the DDL uncommitted on SQLAlchemy versions without autocommit.
with eng.begin() as con:
    con.execute(schema_sql)

DROP TABLE IF EXISTS transactions;

CREATE TABLE transactions(
	id SERIAL NOT NULL PRIMARY KEY,
	type TEXT NOT NULL, 
	sequence INT NOT NULL, 
	product_id TEXT NOT NULL, 
	price TEXT NOT NULL, 
	open_24h TEXT NOT NULL, 
	volume_24h TEXT NOT NULL, 
	low_24h TEXT NOT NULL, 
	high_24h TEXT NOT NULL, 
	volume_30d TEXT NOT NULL, 
	best_bid TEXT NOT NULL, 
	best_ask TEXT NOT NULL, 
	side TEXT NOT NULL, 
	time TEXT, 
	trade_id INT NOT NULL, 
	last_size TEXT NOT NULL
);


In [19]:
class Transaction(Base):
    """ORM mapping for one row of the ``transactions`` table.

    The attributes mirror the keys of a ticker message: numeric-looking
    values such as ``price`` are kept as strings, while ``sequence`` and
    ``trade_id`` are integers.

    NOTE(review): ticker messages can carry ``sequence`` values larger than
    a 32-bit integer can hold; confirm whether ``Integer`` here (and the
    matching column type in the database schema) should be widened to a
    64-bit type.
    """

    __tablename__ = 'transactions'

    id = Column(Integer, primary_key=True)
    type = Column(String)
    sequence = Column(Integer)
    product_id = Column(String)
    price = Column(String)
    open_24h = Column(String)
    volume_24h = Column(String)
    low_24h = Column(String)
    high_24h = Column(String)
    volume_30d = Column(String)
    best_bid = Column(String)
    best_ask = Column(String)
    side = Column(String)
    time = Column(String)
    trade_id = Column(Integer)
    last_size = Column(String)

    def __repr__(self):
        """Return a debug string listing every non-key column value.

        Every field is rendered with ``%s`` so that an instance whose
        integer columns are still ``None`` can be displayed; ``%d`` would
        raise ``TypeError`` for ``None``. Integer values render exactly as
        they did with ``%d``.
        """
        field_names = (
            "type", "sequence", "product_id", "price", "open_24h",
            "volume_24h", "low_24h", "high_24h", "volume_30d", "best_bid",
            "best_ask", "side", "time", "trade_id", "last_size",
        )
        rendered = ", ".join(
            "%s='%s'" % (name, getattr(self, name)) for name in field_names
        )
        return "<Transaction(%s)>" % rendered


In [20]:
def on_open(ws):
    """Announce the new connection and subscribe to the ticker channel.

    Args:
        ws: The connected socket object; only its ``send`` method is used.

    Sends a single JSON-encoded ``subscribe`` request for the ``ticker``
    channel covering BTC-USD, XRP-USD and ETH-USD.
    """
    print("opened connection")

    # One channel entry: the ticker feed for the three products of interest.
    ticker_channel = {
        "name": "ticker",
        "product_ids": ["BTC-USD", "XRP-USD", "ETH-USD"],
    }
    payload = json.dumps({"type": "subscribe", "channels": [ticker_channel]})
    ws.send(payload)

def on_message(ws, message):
    """Decode one incoming JSON message and print the resulting object.

    Args:
        ws: The socket the message arrived on (unused).
        message: JSON text received from the feed.

    Raises:
        json.JSONDecodeError: If ``message`` is not valid JSON.

    The message is only printed; nothing is written to the database here.
    """
    tick = json.loads(message)
    print(tick)

# Feed endpoint, and the WebSocket client wired to the two callbacks above.
socket = "wss://ws-feed.pro.coinbase.com"
ws = websocket.WebSocketApp(
    socket,
    on_message=on_message,
    on_open=on_open,
)

In [10]:
# Run the WebSocket client loop. As the last expression in the cell, the value
# returned by run_forever() is what the notebook displays once the call ends.
ws.run_forever()

opened connection
{'type': 'subscriptions', 'channels': [{'name': 'ticker', 'product_ids': ['BTC-USD', 'XRP-USD', 'ETH-USD']}]}
{'type': 'ticker', 'sequence': 17261813719, 'product_id': 'BTC-USD', 'price': '13914.5', 'open_24h': '13540', 'volume_24h': '13925.60467460', 'low_24h': '13290', 'high_24h': '14081.45', 'volume_30d': '334316.01675994', 'best_bid': '13914.49', 'best_ask': '13914.50', 'side': 'buy', 'time': '2020-11-04T00:56:01.971291Z', 'trade_id': 107662706, 'last_size': '0.01426468'}
{'type': 'ticker', 'sequence': 1420871054, 'product_id': 'XRP-USD', 'price': '0.2391', 'open_24h': '0.2356', 'volume_24h': '52052806.37207200', 'low_24h': '0.2283', 'high_24h': '0.2442', 'volume_30d': '1096077389.07411400', 'best_bid': '0.2389', 'best_ask': '0.2391', 'side': 'buy', 'time': '2020-11-04T00:56:02.574218Z', 'trade_id': 14332184, 'last_size': '1166.28698'}
{'type': 'ticker', 'sequence': 10988727087, 'product_id': 'ETH-USD', 'price': '385.92', 'open_24h': '384.05', 'volume_24h': '10436

False