<h1>Using Binance web socket with Casandra to save the data.</h1>
<h3>ETL to get, proces and save the crypto tickers data.</h3>

In [1]:
import os
from binance.client import Client
from binance.websockets import BinanceSocketManager
from twisted.internet import reactor
import cassandra
from cassandra.cluster import Cluster

<h5>Initializing variables for Binance API</h5>

In [2]:
api_key = os.environ.get('binance_test_api_key')
api_secret = os.environ.get('binance_test_api_secret')
client = Client(api_key, api_secret)
client.API_URL = 'https://testnet.binance.vision/api'

<h5>Get the symbols and use them to create Casandra's talbes.</h5>

In [3]:
symbols = client.get_all_tickers()
symbols_names = []

for symbol in symbols:
    symbols_names.append(str(symbol['symbol']))
    

<h5>Creation and connection to the keyspace and creation of the tables</h5>

In [None]:
cluster = Cluster(["127.0.0.1"])
session = cluster.connect()
session.execute("""CREATE KEYSPACE IF NOT EXISTS binance 
    WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1}""")
session.set_keyspace('binance')

#Itarate over symbols_names to create the corresponding table.
for symbol_name in symbols_names:
    query = """CREATE TABLE IF NOT EXISTS {} (time int, close double, PRIMARY KEY (time))""".format(symbol_name)
    session.execute(query)

<h5>Callback method for the socket. Each ticker will be processed and inserted into the corresponding talbe</h5>

In [None]:
def trade_history(obj):
    for msg in obj:
        if msg['e'] != 'error':
            insert_query = """INSERT INTO {} (time, close) \
                 VALUES (%s, %s)""".format(msg['s'])
            session.execute(insert_query, (msg['E'], msg['c']))

<h5>Start the socket and wait for the callback to be executed.</h5>

In [None]:
bsm = BinanceSocketManager(client)
ticker_socket = bsm.start_ticker_socket(trade_history)
bsm.start()

<h5>Finally stop the socket and shutdown the conection to Casandra</h5>

In [None]:
# stop websocket
bsm.stop_socket(ticker_socket)
# properly terminate WebSocket
reactor.stop()
session.shutdown()
cluster.shutdown()