In [None]:
import time
import hmac
import hashlib
import json
import requests
import logging
from threading import Thread
from socketio import Client

# Configure the root logger once at import time: DEBUG and above are emitted,
# each record prefixed with a timestamp and its level name.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')


class Pi42API:
    """Small Pi42 client: streams the ETHUSDT mark price and places signed orders.

    Constructing an instance immediately opens a Socket.IO connection on a
    daemon thread; the most recent valid ETHUSDT mark price is kept in
    ``live_eth_price`` (``None`` until the first valid update arrives).
    """

    # Seconds to wait on the REST endpoint before a request is abandoned.
    REQUEST_TIMEOUT = 10
    # Symbol whose mark price is streamed into ``live_eth_price``.
    PRICE_SYMBOL = "ETHUSDT"

    def __init__(self, api_secret: str, api_key: str):
        """Store credentials and endpoints, then start the price stream.

        Args:
            api_secret: Secret used as the HMAC key when signing requests.
            api_key: Key sent in the ``api-key`` request header.
        """
        self.api_secret = api_secret
        self.api_key = api_key
        self.base_url = "https://fapi.pi42.com/"
        self.live_eth_price = None  # Latest valid ETHUSDT mark price, if any
        self.web_socket_url = "https://fawss.pi42.com/"
        self.socket = Client()
        self._start_websocket_connection()
        # Never write the raw key to the log; a masked prefix is enough to
        # tell which credential is in use.
        logging.debug(f"Initialized Pi42API with API Key: {self._mask(self.api_key)}")

    @staticmethod
    def _mask(secret):
        """Return a log-safe rendering of a credential."""
        if not secret:
            return "<empty>"
        text = str(secret)
        return f"{text[:4]}***" if len(text) > 8 else "***"

    def _handle_mark_price_update(self, data):
        """Record the price carried by a ``markPriceUpdate`` event.

        Updates for other symbols, payloads that are not dicts, and payloads
        whose ``p`` field is missing, non-numeric, non-finite or not positive
        are ignored so that ``live_eth_price`` only ever holds a usable price.
        """
        logging.debug(f"Received markPriceUpdate event: {data}")

        if not isinstance(data, dict):
            logging.debug("Ignoring markPriceUpdate payload that is not a dict.")
            return

        if data.get("s") != self.PRICE_SYMBOL:
            logging.debug(f"Received data for symbol: {data.get('s')}. Ignoring.")
            return

        try:
            price = float(data["p"])
        except (KeyError, TypeError, ValueError):
            logging.warning(f"Ignoring ETHUSDT update without a usable price: {data}")
            return

        # The chained comparison is False for NaN, so NaN is rejected as well.
        if not (0 < price < float("inf")):
            logging.warning(f"Ignoring ETHUSDT update with invalid price: {price}")
            return

        self.live_eth_price = price
        logging.info(f"Live ETHUSDT Price Updated: {self.live_eth_price}")

    def _start_websocket_connection(self):
        """Register the Socket.IO handlers and connect on a daemon thread."""

        @self.socket.on("connect")
        def on_connect():
            # Runs on every (re)connection, so the subscription is renewed
            # whenever the connection is re-established.
            logging.info("Connected to WebSocket server")
            topics = ['ethusdt@markPrice']
            self.socket.emit('subscribe', {"params": topics})
            logging.debug(f"Subscribed to topics: {topics}")

        @self.socket.on("markPriceUpdate")
        def on_mark_price_update(data):
            self._handle_mark_price_update(data)

        @self.socket.on("disconnect")
        def on_disconnect():
            # Do not call connect() again from here: the Socket.IO client
            # performs its own reconnection attempts, and a second manual
            # connect on the same client would compete with them.
            logging.warning("Disconnected from WebSocket server")
            logging.info("Waiting for the Socket.IO client to reconnect...")

        def run_socket():
            try:
                self.socket.connect(self.web_socket_url)
                self.socket.wait()
            except Exception:
                # Thread boundary: log instead of letting the thread die with
                # an unhandled traceback.
                logging.exception("WebSocket connection failed")

        # Run WebSocket in a separate thread to prevent blocking
        socket_thread = Thread(target=run_socket, daemon=True)
        socket_thread.start()

    def generate_signature(self, data_to_sign):
        """Return the hex HMAC-SHA256 of ``data_to_sign`` keyed by the API secret.

        Args:
            data_to_sign: Text to sign; it is UTF-8 encoded before hashing.
        """
        signature = hmac.new(
            self.api_secret.encode('utf-8'),
            data_to_sign.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
        # The signature itself is deliberately kept out of the log.
        logging.debug(f"Generated signature for data: {data_to_sign}")
        return signature

    def place_order(self, symbol, side, order_type, quantity, price, stop_loss_price=None, take_profit_price=None):
        """Place a new order with optional stop loss and take profit.

        Args:
            symbol: Trading symbol; upper-cased before sending.
            side: Order side, sent as-is (e.g. ``"BUY"`` / ``"SELL"``).
            order_type: Order type, sent as-is (e.g. ``"LIMIT"``).
            quantity: Order quantity.
            price: Order price.
            stop_loss_price: Optional stop-loss price; omitted when ``None``.
            take_profit_price: Optional take-profit price; omitted when ``None``.

        Returns:
            The decoded JSON response on success, or ``None`` when the request
            fails, times out, returns an HTTP error, or returns a non-JSON body.
        """
        logging.info("Placing order...")
        timestamp = str(int(time.time() * 1000))

        params = {
            'timestamp': timestamp,
            'placeType': 'ORDER_FORM',
            'quantity': quantity,
            'side': side,
            'symbol': symbol.upper(),
            'type': order_type,
            'price': price,
            'reduceOnly': False,
            'marginAsset': 'INR',
            'deviceType': 'WEB',
            'userCategory': 'EXTERNAL',
        }

        # Optionally add stop loss and take profit prices if provided
        if stop_loss_price is not None:
            params['stopLossPrice'] = stop_loss_price
        if take_profit_price is not None:
            params['takeProfitPrice'] = take_profit_price

        # The signature is computed over the compact JSON form of the params.
        data_to_sign = json.dumps(params, separators=(',', ':'))
        signature = self.generate_signature(data_to_sign)

        headers = {
            'api-key': self.api_key,
            'signature': signature,
        }

        logging.debug(f"Order params: {params}")
        logging.debug(f"Order headers: api-key={self._mask(self.api_key)}, signature=<redacted>")

        try:
            response = requests.post(
                f"{self.base_url}v1/order/place-order",
                json=params,
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,  # never block forever on the network
            )
            response.raise_for_status()
            response_data = response.json()
        except requests.RequestException as e:
            # ``Response`` is falsy for error statuses, so compare against None.
            body = e.response.text if e.response is not None else ""
            logging.error(f"Request failed while placing order: {e} {body}".rstrip())
            return None
        except ValueError as e:
            logging.error(f"Order endpoint returned a non-JSON response: {e}")
            return None

        logging.info(f"Order placed successfully: {response_data}")
        return response_data

    def execute_trade(self, symbol, quantity, price_buffer=0.0001 , stop_loss_buffer=0.00025, wait_timeout=30):
        """Place a LIMIT buy and a LIMIT sell around the live ETHUSDT price.

        Both orders are always attempted, each with its own stop-loss and
        take-profit derived from ``stop_loss_buffer``.

        Args:
            symbol: Symbol to trade. Only ETHUSDT is accepted, because the
                live price used for pricing comes from the ETHUSDT stream.
            quantity: Quantity for each of the two orders.
            price_buffer: Fractional offset of the limit prices from the live
                price (buy below, sell above).
            stop_loss_buffer: Fractional offset of stop-loss / take-profit
                from each order's limit price.
            wait_timeout: Seconds to wait for the first live price.

        Returns:
            None. Outcomes are reported through the log.
        """
        logging.info("Starting trade execution...")

        # Pricing another symbol from the ETH feed would submit orders at a
        # meaningless price, so refuse instead.
        if str(symbol).upper() != self.PRICE_SYMBOL:
            logging.error(
                f"Live price is only available for {self.PRICE_SYMBOL}; "
                f"cannot price orders for {symbol}. Aborting trade."
            )
            return

        # Wait for live price from WebSocket
        deadline = time.time() + wait_timeout
        while self.live_eth_price is None:
            if time.time() > deadline:
                logging.error("Live ETH price is not available within the timeout period. Aborting trade.")
                return
            time.sleep(0.5)

        # Snapshot the price once so both orders use the same reference.
        current_price = self.live_eth_price
        logging.info(f"Using live ETH price: {current_price}")

        buy_price = round(current_price * (1 - price_buffer), 2)
        sell_price = round(current_price * (1 + price_buffer), 2)
        logging.info(f"Calculated buy price: {buy_price}, sell price: {sell_price}")

        # Buy order (long position): stop below entry, target above.
        buy_order = self.place_order(
            symbol, "BUY", "LIMIT", quantity, buy_price,
            stop_loss_price=round(buy_price * (1 - stop_loss_buffer), 2),
            take_profit_price=round(buy_price * (1 + stop_loss_buffer), 2),
        )
        if buy_order:
            logging.info("Buy order placed successfully.")

        # Sell order (short position): stop above entry, target below.
        sell_order = self.place_order(
            symbol, "SELL", "LIMIT", quantity, sell_price,
            stop_loss_price=round(sell_price * (1 + stop_loss_buffer), 2),
            take_profit_price=round(sell_price * (1 - stop_loss_buffer), 2),
        )
        if sell_order:
            logging.info("Sell order placed successfully.")

        # Report failure only when neither order went through.
        if not buy_order and not sell_order:
            logging.error("Both buy and sell orders failed. Aborting trade.")


In [None]:
def test():
    """Smoke run: build a client with blank credentials and trade 0.01 ETHUSDT."""
    # Credentials are intentionally left blank here; fill them in before a real run.
    Pi42API(api_secret='', api_key='').execute_trade('ETHUSDT', 0.01)


In [None]:
# Script entry point: run the smoke trade only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    test()
