In [3]:
import os
import time
import math
import json
import logging
from datetime import datetime

from binance.spot import Spot
from binance.error import ClientError

import smtplib
from email.message import EmailMessage
import mimetypes

# Class definition for Strategy

In [None]:
class Strategy():
    """Dollar-cost-averaging (DCA) helper built on the Binance Spot client.

    The instance loads a JSON configuration file, configures logging to a
    timestamped file plus the console, and opens a Binance client.

    Configuration keys read by this class:
        api_key, api_secret: credentials passed to ``Spot``.
        quote_token, base_token: concatenated as ``quote_token + base_token``
            to form the trading symbol. ``base_token`` is also the asset whose
            free balance and flexible position are looked up.
        dca_amount: amount of ``base_token`` used to size each order.
        offset: value subtracted from the last price to get the discounted price.
        max_price: no order is placed when the target price exceeds this.
        pending_order_file: JSON file tracking orders created by ``dca``.
        log_dir: directory that receives the log files.
        email: dict with sender, receiver, subject, smtp_server, smtp_port
            and app_password (used by ``send_email``).

    Attributes:
        config: the parsed configuration dict.
        logger: logger used for all messages.
        is_connected: True when the initial ``account()`` call succeeded.
    """

    def __init__(self, config_file_name: str):
        """Load the configuration, set up logging and connect to Binance.

        Args:
            config_file_name: path of the JSON configuration file.
        """
        self.config = self.load_config(config_file_name)
        self.logger, self._log_path = self.setup_logging()
        self.is_connected, self._bn_client = self.connect_to_server()

    def _log_client_error(self, error, as_warning: bool = False) -> None:
        """Log the status, code and message carried by a Binance ``ClientError``.

        Args:
            error: the caught exception; must expose ``status_code``,
                ``error_code`` and ``error_message``.
            as_warning: log at WARNING level with the "Found warning" prefix
                instead of ERROR level with the "Found error" prefix.
        """
        if as_warning:
            self.logger.warning(
                "Found warning. status: {}, error code: {}, error message: {}".format(
                    error.status_code, error.error_code, error.error_message
                )
            )
        else:
            self.logger.error(
                "Found error. status: {}, error code: {}, error message: {}".format(
                    error.status_code, error.error_code, error.error_message
                )
            )

    @staticmethod
    def _decimals_from_step(step):
        """Convert a step/tick size such as ``'0.00001000'`` to a decimal count.

        Returns:
            The number of decimals, or None when the size is not positive
            (``math.log10`` is undefined there).
        """
        size = float(step)
        if size <= 0:
            return None
        return abs(round(math.log10(size)))

    def cancel_staled_orders(self) -> None:
        """Cancel still-open orders recorded in the pending-order file.

        Every recorded order is queried. Orders with status ``NEW`` or
        ``PARTIALLY_FILLED`` are cancelled; orders in any other status are
        only logged. Entries whose check or cancellation raised a
        ``ClientError`` are written back to the file so they can be retried;
        all other entries are dropped. Does nothing when the file is absent.
        """
        pending_file = self.config['pending_order_file']
        if not os.path.isfile(pending_file):
            return

        with open(pending_file, "r") as f:
            orders = json.load(f)

        self.logger.info("=== Cancelling the staled orders ===")
        still_pending = []
        for pending in orders:
            try:
                # A separate name keeps the file entry intact, so the except
                # branch always re-queues the original entry.
                current = self._bn_client.get_order(symbol=pending["symbol"], orderId=pending["orderId"])
                if current["status"] in ("NEW", "PARTIALLY_FILLED"):
                    self._bn_client.cancel_order(symbol=current["symbol"], orderId=current["orderId"])
                    self.logger.info("The stale order id {} for {} is cancelled".format(current['orderId'], current['symbol']))
                else:
                    self.logger.info("Order {} already {}".format(current['orderId'], current['status']))
            except ClientError as error:
                self.logger.error("Unable to check the order id.")
                self._log_client_error(error)
                still_pending.append(pending)  # keep the orders

        # Clear or keep only the orders that are not yet processed.
        with open(pending_file, "w") as f:
            json.dump(still_pending, f, indent=2)

    def check_available_balance(self) -> dict:
        """Read the free balance of ``base_token`` from the account.

        Returns:
            ``{'success': True, 'available_balance': float}`` on success, or
            ``{'success': False, 'available_balance': None}`` when the request
            fails or the account lists no entry for ``base_token``.
        """
        try:
            account_info = self._bn_client.account()
            self.logger.debug(account_info)
        except ClientError as error:
            self.logger.error("Unable to check available balance.")
            self._log_client_error(error)
            return {'success': False, 'available_balance': None}

        base_token = self.config['base_token']
        free = next(
            (item['free'] for item in account_info['balances'] if item['asset'] == base_token),
            None,
        )
        if free is None:
            # Previously this case raised an uncaught IndexError.
            self.logger.warning("No balance entry found for {}.".format(base_token))
            return {'success': False, 'available_balance': None}

        return {'success': True, 'available_balance': float(free)}

    def connect_to_server(self) -> tuple:
        """Create the Binance client and verify it with an ``account()`` call.

        Returns:
            ``(is_connected, client)``. The client is returned even when the
            check failed with a ``ClientError``.
        """
        self.logger.info("== Connecting to Binance server... ==")
        bn_client = Spot(self.config['api_key'], self.config['api_secret'])

        try:
            bn_client.account()
            self.logger.info("The server is connected.")
            is_connected = True

        except ClientError as error:
            self.logger.error("The server is NOT connected!!!")
            self._log_client_error(error)
            is_connected = False

        return is_connected, bn_client

    def dca(self) -> dict:
        """Place one GTC limit buy order for the configured symbol.

        The target price is the lower of (last price - ``offset``) and the
        midpoint between the 24h open price and the last price. No order is
        placed when the target price is not positive or exceeds ``max_price``.
        A created order is appended to the pending-order file.

        Returns:
            ``{'success': True, 'response': <order response>}`` when the order
            was created, otherwise ``{'success': False, 'response': {}}``.
        """
        self.logger.info("== Making DCA order... ==")

        try:
            self.logger.info("Getting the price...")
            ticker = self._bn_client.ticker_24hr(self.config['quote_token'] + self.config['base_token'])
            self.logger.debug(ticker)

        except ClientError as error:
            self.logger.error("Unable to get the price.")
            self._log_client_error(error)
            return {'success': False, 'response': {}}

        symbol = ticker['symbol']
        current_price = float(ticker['lastPrice'])
        # Midpoint between the 24h open price and the last price.
        avg_price = current_price + (float(ticker['openPrice']) - current_price) / 2
        discounted_price = current_price - self.config['offset']
        target_price = min(discounted_price, avg_price)

        if target_price <= 0:
            # avg_price is positive for positive prices, so this means
            # last price <= offset. Stop before dividing by discounted_price.
            self.logger.error(f"Target price {target_price} is not positive. Skip placing order.")
            return {'success': False, 'response': {}}

        if target_price > self.config['max_price']:
            self.logger.warning(f"Target price {target_price} is higher than max allowed {self.config['max_price']}. Skip placing order.")
            return {'success': False, 'response': {}}

        symbol_precision_response = self.get_symbol_precision(symbol)
        if not symbol_precision_response['success']:
            self.logger.error("The order has failed.")
            return {'success': False, 'response': {}}

        # Round the quantity DOWN to the allowed number of decimals.
        # NOTE(review): the quantity is sized with the discounted price, not
        # the target price, so the order can cost less than dca_amount when
        # the midpoint is the lower of the two - confirm this is intended.
        scale = 10 ** symbol_precision_response['quantity_precision']
        buy_quantity = math.floor(self.config['dca_amount'] / discounted_price * scale) / scale
        buy_price = round(target_price, symbol_precision_response['price_precision'])

        params = {
            'symbol': symbol,
            'side': 'BUY',
            'type': 'LIMIT',
            'timeInForce': 'GTC',
            'quantity': buy_quantity,
            'price': buy_price
        }

        try:
            response = self._bn_client.new_order(**params)
            self.logger.info(response)
            self.logger.info("Order successful! The price is {}.".format(buy_price))
            self.save_order_to_file(response['symbol'], response['orderId'])

            return {'success': True, 'response': response}

        except ClientError as error:
            self.logger.error("The order has failed.")
            self._log_client_error(error)
            return {'success': False, 'response': {}}

    def send_email(self, order_response: dict) -> None:
        """Email the outcome of a run with the current log file attached.

        Args:
            order_response: dict with a ``'success'`` key. When it is truthy,
                ``order_response['response']`` must contain ``symbol``,
                ``price`` and ``orderId``.

        Any exception raised while building or sending the message is logged
        and not re-raised.
        """
        try:
            email_config = self.config['email']

            msg = EmailMessage()
            msg['From'] = email_config['sender']
            msg['To'] = email_config['receiver']
            date = datetime.now().strftime("%Y%m%d")

            # Decide the subject and the content
            if order_response['success']:
                msg['Subject'] = "[{}] {} - Success {}".format(email_config['subject'], self.config['quote_token'] + self.config['base_token'], date)
                msg.set_content("Congratuations! The order is created.\nOrder info:\nSymbol: {}\tPrice: {}\tOrder id: {}".format(
                    order_response['response']['symbol'], order_response['response']['price'], order_response['response']['orderId']))
            else:
                msg['Subject'] = "[{}] {} - Fail {}".format(email_config['subject'], self.config['quote_token'] + self.config['base_token'], date)
                msg.set_content("Order is not created, please check the log file.")

            # Attach the log file. guess_type() returns None for unknown
            # extensions, so fall back to a generic binary type.
            guessed_type, _ = mimetypes.guess_type(self._log_path)
            maintype, subtype = (guessed_type or 'application/octet-stream').split('/', 1)
            with open(self._log_path, 'rb') as f:
                file_data = f.read()
            msg.add_attachment(file_data, maintype=maintype, subtype=subtype, filename=os.path.basename(self._log_path))

            # Send the email
            with smtplib.SMTP_SSL(email_config['smtp_server'], email_config['smtp_port']) as server:
                server.login(email_config['sender'], email_config['app_password'])
                server.send_message(msg)

            self.logger.info("Email sent successfully.")

        except Exception as error:
            self.logger.error(f"Failed to send email: {error}")

    def get_flexible_product_position(self) -> dict:
        """Find the flexible-product position whose asset is ``base_token``.

        Only the first page of results (up to 100 rows) is inspected.

        Returns:
            ``{'success': True, 'position': <row>}`` when a matching row is
            found, otherwise ``{'success': False, 'position': None}``.
        """
        try:
            response = self._bn_client.get_flexible_product_position(
                current=1, size=100, recvWindow=5000
            )
            self.logger.debug(response)
            position = next(
                (item for item in response['rows'] if item['asset'] == self.config['base_token']),
                None,
            )

        except ClientError as error:
            self.logger.error("Unable to get position name.")
            self._log_client_error(error)
            return {'success': False, 'position': None}

        except Exception as error:
            # Still best-effort, but no longer a bare except: KeyboardInterrupt
            # and SystemExit propagate, and the real cause is logged.
            self.logger.warning("There is no position of the reddem token or it is unsupported.")
            self.logger.warning("Unexpected error while reading the position: {!r}".format(error))
            return {'success': False, 'position': None}

        if position is None:
            self.logger.warning("There is no position of the reddem token or it is unsupported.")
            return {'success': False, 'position': None}

        return {'success': True, 'position': position}

    def get_symbol_precision(self, symbol: str) -> dict:
        """Derive quantity and price decimal counts from the exchange filters.

        The quantity precision comes from the ``LOT_SIZE`` filter's
        ``stepSize`` and the price precision from the ``PRICE_FILTER``
        filter's ``tickSize``.

        Args:
            symbol: trading symbol, e.g. ``"BTCUSDT"``.

        Returns:
            ``{'success': True, 'quantity_precision': int, 'price_precision': int}``
            or, when the request fails or either filter is missing or has a
            non-positive size,
            ``{'success': False, 'quantity_precision': None, 'price_precision': None}``.
        """
        self.logger.info("== Getting precision information... ==")
        try:
            info = self._bn_client.exchange_info(symbol=symbol)
            self.logger.info("Got the information from the server.")

        except ClientError as error:
            self.logger.error("Unable to get the information from the server.")
            self._log_client_error(error)
            return {'success': False, 'quantity_precision': None, 'price_precision': None}

        # Start as None so a missing filter is detected instead of raising
        # UnboundLocalError when the result dict is built.
        quantity_precision = None
        price_precision = None
        for f in info['symbols'][0]['filters']:
            if f['filterType'] == 'LOT_SIZE':
                quantity_precision = self._decimals_from_step(f['stepSize'])
            elif f['filterType'] == 'PRICE_FILTER':
                price_precision = self._decimals_from_step(f['tickSize'])

        if quantity_precision is None or price_precision is None:
            self.logger.error("LOT_SIZE or PRICE_FILTER is missing or unusable for {}.".format(symbol))
            return {'success': False, 'quantity_precision': None, 'price_precision': None}

        return {'success': True, 'quantity_precision': quantity_precision, 'price_precision': price_precision}

    def redeem_flexible_product(self) -> dict:
        """Redeem from the flexible product what the spot balance lacks for one order.

        The redeemed amount is ``dca_amount`` minus the free ``base_token``
        balance, or the full ``dca_amount`` when the balance cannot be read.
        Nothing is redeemed when no flexible position is found or when the
        free balance already covers ``dca_amount``.

        Returns:
            ``{'success': True, 'response': <redeem response>}`` when a
            redemption was performed, otherwise
            ``{'success': False, 'response': {}}``.
        """
        position_response = self.get_flexible_product_position()
        available_balance_response = self.check_available_balance()
        self.logger.info("== Start redeeming... ==")

        if not position_response['success']:
            # Without a product id the redeem call cannot be made.
            self.logger.error("Unable to get the flexible position.")
            return {'success': False, 'response': {}}
        product_id = position_response['position']['productId']

        if available_balance_response['success']:
            redeem_amount = self.config['dca_amount'] - available_balance_response['available_balance']
            if redeem_amount <= 0:
                # Stop here: a zero or negative amount must not reach the API.
                self.logger.info("The amount in the spot account is enough, no need to redeem.")
                return {'success': False, 'response': {}}
            self.logger.info("Actually redeem amount: {}.".format(redeem_amount))
        else:
            redeem_amount = self.config['dca_amount']
            self.logger.info("Unable to get available balance, actually redeem amount: {}.".format(redeem_amount))

        try:
            response = self._bn_client.redeem_flexible_product(
                product_id, amount=redeem_amount, recvWindow=5000
            )
            self.logger.debug(response)
            self.logger.info("Redeem successful! {} {} is redeemed to spot account.".format(redeem_amount, self.config['base_token']))

        except ClientError as error:
            self._log_client_error(error, as_warning=True)
            return {'success': False, 'response': {}}

        return {'success': True, 'response': response}

    def save_order_to_file(self, symbol: str, order_id: int) -> None:
        """Append ``{"symbol", "orderId"}`` to the pending-order file.

        The file is created when it does not exist yet.

        Args:
            symbol: trading symbol of the created order.
            order_id: order id of the created order.
        """
        data = []

        if os.path.isfile(self.config['pending_order_file']):
            with open(self.config['pending_order_file'], "r") as f:
                data = json.load(f)
        else:
            self.logger.info("Pending order file is not exist.")
            self.logger.info("Creating...")

        data.append({"symbol": symbol, "orderId": order_id})
        with open(self.config['pending_order_file'], "w") as f:
            json.dump(data, f, indent=2)

    def load_config(self, path: str = "config.json") -> dict:
        """Read and return the JSON configuration stored at ``path``."""
        with open(path, 'r') as file:
            return json.load(file)

    def setup_logging(self) -> tuple:
        """Reset root logging to a fresh timestamped log file plus the console.

        Existing root handlers are removed and closed first, so re-creating a
        ``Strategy`` in the same kernel neither duplicates console output nor
        leaves earlier log files open.

        Returns:
            ``(logger, log_path)``: the module logger and the path of the new
            log file inside ``config['log_dir']``.
        """
        # for jupyter
        root_logger = logging.getLogger()
        for handler in list(root_logger.handlers):
            root_logger.removeHandler(handler)
            handler.close()

        os.makedirs(self.config['log_dir'], exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        log_filename = "log_{}.txt".format(timestamp)
        log_path = os.path.join(self.config['log_dir'], log_filename)

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s [%(levelname)s] %(message)s',
            handlers=[
                logging.FileHandler(log_path, mode='w', encoding='utf-8'),
                logging.StreamHandler()
            ]
        )

        return logging.getLogger(__name__), log_path


# Main flow definition

In [None]:
def dca_strategy():
    """Run one complete DCA cycle.

    Steps: build a Strategy from ``../config.json`` (relative to the current
    working directory); when connected, optionally cancel stale orders, redeem
    from the flexible product, wait 10 seconds after a successful redemption
    and place the order; finally, optionally email the outcome.
    """
    # Resolve the config location from the working directory (for jupyter notebook).
    config_path = os.path.join(os.getcwd(), "../config.json")

    strategy = Strategy(config_path)
    log = strategy.logger
    log.info("=== START TASK ===")

    # Result reported when the server could not be reached.
    order_response = {'success': False}

    if strategy.is_connected:
        # Clear the stale orders first, if enabled.
        if strategy.config['enable_revoke_stale_order']:
            strategy.cancel_staled_orders()

        redeemed = strategy.redeem_flexible_product()['success']
        if redeemed:
            log.info("Waiting 10 seconds before placing order...")
            time.sleep(10)

        order_response = strategy.dca()

    log.info("=== END TASK ===")

    if strategy.config['enable_email']:
        strategy.send_email(order_response)

# Main flow

In [9]:
# Run the full DCA flow once. The cells defining Strategy and dca_strategy
# must have been executed in this kernel first, otherwise this raises NameError.
dca_strategy()

/home/opc/binance/trading_strategies/config.json


NameError: name 'Strategy' is not defined

## Manual tests (run against the live Binance API)

In [21]:
# Manual check: build a Strategy from ../config.json and display whether the
# initial account() call against Binance succeeded.
client = Strategy("../config.json")

client.is_connected

2025-05-17 20:46:19,785 [INFO] == Connecting to Binance server... ==
2025-05-17 20:46:19,812 [INFO] The server is connected.


True

In [22]:
# Manual check: look up the flexible-product position for the configured
# base_token and display the result dict.
client = Strategy("../config.json")

response = client.get_flexible_product_position()
response

2025-05-17 20:46:26,546 [INFO] == Connecting to Binance server... ==
2025-05-17 20:46:26,569 [INFO] The server is connected.


{'success': True,
 'position': {'totalAmount': '82.03745674',
  'tierAnnualPercentageRate': {'0-200USDT': '0.05000000'},
  'latestAnnualPercentageRate': '0.02286619',
  'asset': 'USDT',
  'canRedeem': True,
  'collateralAmount': '0',
  'productId': 'USDT001',
  'yesterdayRealTimeRewards': '0.01835841',
  'cumulativeBonusRewards': '3.10986122',
  'cumulativeRealTimeRewards': '4.72846487',
  'cumulativeTotalRewards': '18.94657668',
  'autoSubscribe': True}}

In [23]:
# Manual check: read the free balance of the configured base_token and
# display the result dict.
client = Strategy("../config.json")

response = client.check_available_balance()
response

2025-05-17 20:47:04,274 [INFO] == Connecting to Binance server... ==
2025-05-17 20:47:04,304 [INFO] The server is connected.


{'success': True, 'available_balance': 50.0}

In [24]:
# Manual check of redeem_flexible_product(). This calls the live redeem
# endpoint, so it can move real funds.
client = Strategy("../config.json")

response = client.redeem_flexible_product()
response

2025-05-17 20:47:32,143 [INFO] == Connecting to Binance server... ==
2025-05-17 20:47:32,193 [INFO] The server is connected.
2025-05-17 20:47:32,328 [INFO] == Start redeeming... ==
2025-05-17 20:47:32,328 [INFO] The amount in the spot account is enough, no need to redeem.


{'success': False, 'response': {}}

In [26]:
# Manual check: derive the quantity and price decimal counts for BTCUSDT
# from the exchange filters and display the result dict.
client = Strategy("../config.json")
symbol = "BTCUSDT"

response = client.get_symbol_precision(symbol)
response

2025-05-17 20:48:45,528 [INFO] == Connecting to Binance server... ==
2025-05-17 20:48:45,650 [INFO] The server is connected.
2025-05-17 20:48:45,652 [INFO] == Getting precision information... ==
2025-05-17 20:48:45,664 [INFO] Got the information from the server.


{'success': True, 'quantity_precision': 5, 'price_precision': 2}

In [27]:
# Manual check of dca(). When the price conditions are met this submits a
# real LIMIT BUY order through the live client.
client = Strategy("../config.json")

response = client.dca()
response

2025-05-17 20:49:08,539 [INFO] == Connecting to Binance server... ==
2025-05-17 20:49:08,566 [INFO] The server is connected.
2025-05-17 20:49:08,567 [INFO] == Making DCA order... ==
2025-05-17 20:49:08,568 [INFO] Getting the price...


{'success': False, 'response': {}}

In [28]:
# Manual check of cancel_staled_orders(): cancels open orders listed in the
# pending-order file. The method returns None, so only log lines are shown.
client = Strategy("../config.json")

client.cancel_staled_orders()

2025-05-17 20:49:32,585 [INFO] == Connecting to Binance server... ==
2025-05-17 20:49:32,615 [INFO] The server is connected.
