In [6]:
import pandas as pd
from pymongo import MongoClient
from pprint import pprint
from dotenv import load_dotenv
from binance.client import Client
from binance import AsyncClient
import glob
import os
from pathlib import Path
import asyncio

In [7]:
# Load variables from a .env file (python-dotenv) into the process environment
load_dotenv()

# MongoDB connection string; None when the variable is not defined
MDB_CONNECTION_STRING = os.environ.get('MDB_CONNECTION_STRING')

# Binance API credentials; each is None when its variable is not defined
BINANCE_API_KEY = os.environ.get('BINANCE_API_KEY')
BINANCE_API_SECRET = os.environ.get('BINANCE_API_SECRET')

In [8]:
# Function to connect to the Mongo DB
def get_database():
    """Return a handle to the ``project-02`` MongoDB database.

    A ``MongoClient`` is built from the module-level ``MDB_CONNECTION_STRING``
    and its ``project-02`` database is returned.

    Returns:
        The ``project-02`` database object of the new client.

    Raises:
        RuntimeError: if ``MDB_CONNECTION_STRING`` is unset or empty.
        Exception: any error raised while building the client is printed and
            then re-raised, so callers never receive ``None`` in place of a
            database handle.
    """
    # Fail early: without a connection string MongoClient would fall back to
    # its default host instead of the intended cluster.
    if not MDB_CONNECTION_STRING:
        raise RuntimeError(
            "MDB_CONNECTION_STRING is not set; define it in the environment or the .env file"
        )
    try:
        client = MongoClient(MDB_CONNECTION_STRING)
    except Exception as e:
        # Keep the original diagnostic print, but do not swallow the failure:
        # returning None here only produced a confusing AttributeError later.
        print(e)
        raise
    return client["project-02"]

In [9]:
# Get the database handle from get_database(); the later cells read and write
# through this module-level `db`.
db = get_database()

In [10]:
# Sanity-check the connection: run the serverStatus command and print the
# server version it reports
serverStatusResult = db.command("serverStatus")
print(serverStatusResult["version"])

ServerSelectionTimeoutError: cluster0-shard-00-01.efpum.mongodb.net:27017: timed out,cluster0-shard-00-02.efpum.mongodb.net:27017: timed out,cluster0-shard-00-00.efpum.mongodb.net:27017: timed out, Timeout: 30s, Topology Description: <TopologyDescription id: 6244cc3f96e4e0685a2652ec, topology_type: ReplicaSetNoPrimary, servers: [<ServerDescription ('cluster0-shard-00-00.efpum.mongodb.net', 27017) server_type: Unknown, rtt: None, error=NetworkTimeout('cluster0-shard-00-00.efpum.mongodb.net:27017: timed out')>, <ServerDescription ('cluster0-shard-00-01.efpum.mongodb.net', 27017) server_type: Unknown, rtt: None, error=NetworkTimeout('cluster0-shard-00-01.efpum.mongodb.net:27017: timed out')>, <ServerDescription ('cluster0-shard-00-02.efpum.mongodb.net', 27017) server_type: Unknown, rtt: None, error=NetworkTimeout('cluster0-shard-00-02.efpum.mongodb.net:27017: timed out')>]>

In [None]:
# Query the "coinpairs" collection for the documents whose exchange is "binance"
db_coinpairs = db["coinpairs"].find({"exchange": "binance"})

# Load the documents yielded by the cursor into a DataFrame
binance_coinpairs_df = pd.DataFrame(db_coinpairs)

# Keep the "pair" column as a plain Python list and show it
coinpair_list = list(binance_coinpairs_df["pair"])
print(coinpair_list)

In [None]:
# Query every document of the "binance_timeframes" collection
db_binance_timeframes = db["binance_timeframes"].find()

# Load the documents yielded by the cursor into a DataFrame
binance_timeframes_df = pd.DataFrame(db_binance_timeframes)

# Keep the "timeframe" column as a plain Python list and show it
binance_timeframes_list = list(binance_timeframes_df["timeframe"])
print(binance_timeframes_list)

In [None]:
# Get latest inserted kline date for each Coin Pair
def get_pair_data(binance_timeframes_list, coinpair_list, database=None):
    """Build one download instruction per (timeframe, pair) combination.

    For every combination the target collection is named ``<pair>_<timeframe>``.
    If that collection exists and holds at least one document, the newest
    ``open_time`` becomes the start timestamp and the entry is marked
    ``"is_update"``. Otherwise (collection missing or empty) the start
    timestamp is ``"3 months ago UTC"`` and the entry is marked ``"is_new"``.

    Args:
        binance_timeframes_list: iterable of timeframe strings.
        coinpair_list: iterable of coin-pair strings; it is iterated once per
            timeframe.
        database: optional database object to query instead of the
            module-level ``db``. It must provide ``list_collection_names()``
            and item access returning a collection with ``find_one(sort=...)``.

    Returns:
        A list of ``[timeframe, pair, from_timestamp, collection_name, mode]``
        lists, ordered by timeframe and then by pair.
    """
    if database is None:
        database = db

    from_timestamp = "3 months ago UTC"
    # Listing collections is a server round-trip; do it once, not once per pair.
    existing_collections = set(database.list_collection_names())

    complete_pair_tf = []
    for timeframe in binance_timeframes_list:
        for pair in coinpair_list:
            collection_name = pair + "_" + timeframe

            # Newest stored kline, or None when the collection is missing/empty.
            latest = None
            if collection_name in existing_collections:
                latest = database[collection_name].find_one(sort=[("open_time", -1)])

            if latest is not None:
                # Data already stored: resume from the newest stored open_time.
                resume_from = latest["open_time"].strftime("%m/%d/%Y, %H:%M:%S")
                complete_pair_tf.append([timeframe, pair, resume_from, collection_name, "is_update"])
            else:
                # Nothing stored yet: fetch the default history window.
                complete_pair_tf.append([timeframe, pair, from_timestamp, collection_name, "is_new"])
    return complete_pair_tf

In [None]:
# Function to get the kline data from Binance
async def get_coinpair_kline(pair, timeframe, from_timestamp):
    """Download historical klines for one pair and timeframe from Binance.

    Args:
        pair: coin-pair symbol passed to ``get_historical_klines``.
        timeframe: one of ``"1m"``, ``"5m"``, ``"30m"``, ``"1h"``, ``"1d"``.
        from_timestamp: start of the requested range, passed through unchanged
            to ``get_historical_klines``.

    Returns:
        Whatever ``client.get_historical_klines`` returns for the request, or
        ``None`` when ``timeframe`` is not one of the supported values.
    """
    intervals = {
        "1m": Client.KLINE_INTERVAL_1MINUTE,
        "5m": Client.KLINE_INTERVAL_5MINUTE,
        "30m": Client.KLINE_INTERVAL_30MINUTE,
        "1h": Client.KLINE_INTERVAL_1HOUR,
        "1d": Client.KLINE_INTERVAL_1DAY,
    }
    interval = intervals.get(timeframe)
    if interval is None:
        # Unsupported timeframe: bail out before opening a client so there is
        # no connection left to close.
        return None

    client = await AsyncClient.create(BINANCE_API_KEY, BINANCE_API_SECRET)
    try:
        return await client.get_historical_klines(pair, interval, from_timestamp)
    finally:
        # Always release the connection, even when the download raises.
        await client.close_connection()

In [None]:
async def get_binance_data(complete_pair_tf):
    """Download klines for every work-list entry and insert them into MongoDB.

    Args:
        complete_pair_tf: iterable of entries shaped like
            ``[timeframe, pair, from_timestamp, collection_name, mode]``, where
            ``mode`` is ``"is_update"`` for collections that already hold data.

    For each entry the klines are fetched with ``get_coinpair_kline``. In
    update mode the first returned kline is dropped because it repeats the
    newest stored one. Remaining rows are converted (timestamps to
    ``datetime64[ms]``, price/volume columns to float, the ``ignore`` column
    removed) and inserted into the collection named by the entry.

    Entries for which no kline data is returned (``None``) are skipped with a
    message instead of raising.
    """
    # NOTE(review): "numer_trades" looks like a typo of "number_trades", but it
    # is the field name written to the database, so it is kept unchanged.
    headers = ["open_time", "open", "high", "low", "close", "volume", "close_time", "quote_asset_volume", "numer_trades", "taker_base_volume", "taker_quote_volume", "ignore"]
    float_columns = ["open", "high", "low", "close", "volume", "quote_asset_volume", "taker_base_volume", "taker_quote_volume"]

    for query_pair in complete_pair_tf:
        timeframe, pair, from_timestamp, target_collection, mode = query_pair[:5]
        collection_name = pair + "_" + timeframe

        kline_list = await get_coinpair_kline(pair, timeframe, from_timestamp)
        if kline_list is None:
            # get_coinpair_kline returns None for timeframes it does not support.
            print(f"Skipping {collection_name}: no kline data returned")
            continue

        # If it is updating the db then drop the first item as it repeats the
        # newest stored kline; slicing is safe even when nothing was returned.
        if mode == "is_update":
            kline_list = kline_list[1:]
            print(f"Updating {len(kline_list)} items in {collection_name}..")
        else:
            print(f"Adding {len(kline_list)} items in {collection_name}..")

        if len(kline_list) > 0:
            kline_df = pd.DataFrame(kline_list, columns=headers)
            # open_time / close_time are converted from integer milliseconds.
            kline_df['open_time'] = kline_df['open_time'].values.astype(dtype='datetime64[ms]')
            kline_df['close_time'] = kline_df['close_time'].values.astype(dtype='datetime64[ms]')
            kline_df[float_columns] = kline_df[float_columns].astype(float)
            kline_df = kline_df.drop(columns=['ignore'])
            kline_dict = kline_df.to_dict("records")
            db[target_collection].insert_many(kline_dict)
        print("Done :)")


In [None]:
# Build the work list used to update the DB: one entry per (timeframe, pair)
# holding the start timestamp, the target collection name and whether the
# collection is updated ("is_update") or filled for the first time ("is_new").
complete_pair_tf = get_pair_data(binance_timeframes_list, coinpair_list)
print(complete_pair_tf)

In [None]:
# Update DB with the latest Binance data.
# NOTE: the bare top-level `await` relies on the notebook (IPython) running the
# cell inside an event loop; in a plain Python script this call would need to
# be wrapped, e.g. with asyncio.run(...).
await get_binance_data(complete_pair_tf)