In [2]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
import time,os
import pandas as pd
from selenium.common.exceptions import ElementClickInterceptedException, NoSuchElementException
from selenium.webdriver import ActionChains
from selenium.webdriver.common.keys import Keys  
import uuid 
def get_last_file(dir):
    """Return the path of the entry in ``dir`` whose name sorts last.

    Names are compared as plain strings (lexicographic order), so this
    picks the "latest" file only when file names sort chronologically.

    Args:
        dir: Directory to list.

    Returns:
        ``os.path.join(dir, name)`` for the last name in sorted order, or
        ``None`` when the directory has no entries.
    """
    names = os.listdir(dir)
    if not names:
        return None
    # The maximum of the names is the last element of the sorted listing.
    return os.path.join(dir, max(names))
class SnowTrader:
    """Operate a stock-trading web page through a Selenium WebDriver.

    Every method locates page elements by the CSS class names and the
    Chinese labels/placeholders hard-coded below, so the driver must already
    be showing that page. All lookups go through ``self.driver`` (the driver
    passed to the constructor), never through a module-level global.

    Args:
        driver: Selenium WebDriver used for all lookups and clicks.
        group: Link text of the tab to activate on construction.
        min_amount: Lot size; buy quantities are rounded down to a multiple
            of it.
        commission_fee: Commission rate; typed into the order form
            multiplied by 1000.
        transaction_fee: Tax rate; typed into the order form multiplied by
            1000.
    """

    def __init__(self, driver, group='small', min_amount=100,
                 commission_fee=0.0001, transaction_fee=0.0001):
        self.driver = driver
        self.min_amount = min_amount
        self.commission_fee = commission_fee
        self.transaction_fee = transaction_fee
        self.group = group
        self.set_group()

    def set_group(self):
        """Click the tab whose link text equals ``self.group``.

        A matching tab that already has class ``active`` is left alone.
        """
        for controls in self.driver.find_elements(By.CLASS_NAME, "moni__tabs__controls"):
            link = controls.find_element(By.TAG_NAME, "a")
            if link.text != self.group:
                continue
            if link.get_attribute("class") == "active":
                continue
            link.click()
            time.sleep(2)  # give the page time to switch tabs
            break

    def _labelled_number(self, label):
        """Return the number shown in the element right after the ``<span>`` with text ``label``."""
        span = self.driver.find_element("xpath", f"//span[text()='{label}']")
        value = span.find_element("xpath", "following-sibling::*[1]")
        return float(value.text.replace(',', ''))

    def get_assets(self):
        """Read total assets and cash from the page.

        Returns:
            dict with float values under ``'total_asset'`` and ``'cash'``
            (thousands separators removed).
        """
        assets = {}
        assets['total_asset'] = self._labelled_number('总资产')
        assets['cash'] = self._labelled_number('现金')
        return assets

    def get_positions(self):
        """Read the position table.

        Returns:
            list of dicts with keys ``'symbol'``, ``'symbol_name'``,
            ``'market_value'`` (float) and ``'amount'`` (float). The first
            table row is skipped, as are rows whose symbol text is empty.
        """
        positions = []
        table = self.driver.find_element(By.CLASS_NAME, "moni__position__list_position")
        rows = table.find_elements(By.TAG_NAME, "tr")
        for row in rows[1:]:  # first row is skipped (presumably the header)
            cells = row.find_elements(By.TAG_NAME, "td")
            symbol_info = cells[0].find_elements(By.TAG_NAME, "div")
            symbol = symbol_info[1].text
            if len(symbol) == 0:
                continue
            positions.append({
                'symbol': symbol,
                'symbol_name': symbol_info[0].text,
                'market_value': float(cells[3].text.replace(',', '')),
                'amount': float(cells[4].text.replace(',', '')),
            })
        return positions

    def _open_order_dialog(self, button_text, symbol):
        """Click the ``<span>`` labelled ``button_text`` and search for ``symbol`` in the dialog."""
        self.driver.find_element("xpath", f"//span[text()='{button_text}']").click()
        time.sleep(1)
        symbol_input = self.driver.find_element(
            "xpath", "//input[@placeholder='搜索股票名称/代码/拼音']")
        symbol_input.send_keys(symbol)
        time.sleep(3)  # wait for the search suggestions before confirming
        symbol_input.send_keys(Keys.ENTER)
        time.sleep(1)

    @staticmethod
    def _quantity_text(amount):
        """Format a quantity for typing; whole-number floats lose the trailing ``.0``."""
        if isinstance(amount, float) and amount.is_integer():
            amount = int(amount)
        return str(amount)

    def _fill_order_form(self, amount):
        """Type the quantity, commission rate and tax rate into the open order form."""
        driver = self.driver
        amount_input = driver.find_element("xpath", "//input[@placeholder='输入数量']")
        amount_input.send_keys(self._quantity_text(amount))
        commission_fee_input = driver.find_element("xpath", "//input[@placeholder='输入佣金率']")
        # Select the existing value so the typed rate replaces it.
        commission_fee_input.send_keys(Keys.CONTROL + "a")
        commission_fee_input.send_keys(str(self.commission_fee*1000))
        transaction_fee_input = driver.find_element("xpath", "//input[@placeholder='输入税率']")
        transaction_fee_input.send_keys(Keys.CONTROL + "a")
        transaction_fee_input.send_keys(str(self.transaction_fee*1000))

    def _click_in_order_dialog(self, button_class):
        """Click the last element with ``button_class`` inside the last order dialog."""
        order = self.driver.find_elements(By.CLASS_NAME, "modal__order__performance")[-1]
        order.find_elements(By.CLASS_NAME, button_class)[-1].click()

    def buy(self, symbol, cash):
        """Place a buy order for ``symbol`` worth at most ``cash``.

        The quantity is ``cash / price`` rounded down to a multiple of
        ``self.min_amount``, where the price is the value pre-filled in the
        dialog's price input.

        Returns:
            1 when the order form was submitted; -1 when ``cash`` is below
            1000; -3 when no price input was found; -2 when the computed
            quantity is below 100 (the dialog is cancelled in that case).
        """
        if cash < 1000:
            print(f"buy {symbol} failed. cash is {cash}")
            return -1
        self._open_order_dialog('买入', symbol)

        # Read the price from the dialog.
        price_input = self.driver.find_elements("xpath", "//input[@placeholder='输入价格']")
        if not price_input:
            print(f"buy {symbol} failed. price_input is empty")
            return -3
        price = float(price_input[-1].get_attribute("value"))
        amount = int(cash/price/self.min_amount)*self.min_amount
        if amount < 100:
            self._click_in_order_dialog("modal__confirm__cancle")
            time.sleep(1)
            print(f"buy {symbol} failed. amount is {amount}")
            return -2

        self._fill_order_form(amount)
        # Find and click the confirm button (class modal__confirm__submit).
        time.sleep(1)
        self._click_in_order_dialog("modal__confirm__submit")
        time.sleep(1)
        return 1

    def sell(self, symbol, amount):
        """Place a sell order for ``amount`` shares of ``symbol``.

        ``amount`` may be an int or a float; a whole-number float such as
        ``100.0`` is typed as ``100``. Returns ``None``.
        """
        self._open_order_dialog('卖出', symbol)
        self._fill_order_form(amount)
        time.sleep(1)
        self._click_in_order_dialog("modal__confirm__submit")
        time.sleep(1)
class Result:
    """Outcome of a trading action: a code, a message and an optional payload."""

    def __init__(self, code, msg, obj=None):
        # code: status or order identifier; msg: text description (must be a
        # str for __str__ to work); obj: optional attached object.
        self.code, self.msg, self.obj = code, msg, obj

    def __str__(self):
        """Render as ``code:<code> msg:<msg>``."""
        pieces = ['code:', str(self.code), ' msg:', self.msg]
        return ''.join(pieces)

class Trader_top_low_n_snow:
    """Rebalance positions towards the ``n`` symbols ranked best by a factor.

    The target list is loaded from the CSV file in ``data_dir`` whose name
    sorts last. Rows with a non-positive factor are dropped, the rest are
    ordered by the factor (descending for ``topOrlow='top'``, ascending
    otherwise) and the first ``n`` are kept.

    Args:
        trader: Object providing ``get_assets()``, ``get_positions()``,
            ``buy(symbol, cash)`` and ``sell(symbol, amount)``.
        data_dir: Directory holding the CSV files.
        topOrlow: ``'top'`` to prefer the largest factor values; any other
            value prefers the smallest.
        n: Number of symbols to hold.
        factor_col: Name of the factor column in the CSV.
        symbol_col: Name of the symbol column in the CSV (read as str).
        symbol_type: Layout of the symbols in the CSV, used to strip the
            market code: ``'XXXXXX'`` (bare code, nothing stripped),
            ``'MM.XXXXXX'``, ``'XXXXXX.MM'``, ``'MMXXXXXX'`` or
            ``'XXXXXXMM'``. Unrecognised layouts are left untouched.
        strategy: Prefix used when building order ids.

    Raises:
        ValueError: If ``data_dir`` contains no file.
    """

    def __init__(self, trader=None, data_dir='E:\\data\\trader_tmp\\', topOrlow='top', n=15,
                 factor_col='factor', symbol_col='symbol', symbol_type='XXXXXX', strategy='small'):
        self.dir = data_dir
        self.trader = trader
        self.topOrlow = topOrlow
        self.n = n
        self.data = None
        self.strategy = strategy
        self.factor_col = factor_col
        self.symbol_col = symbol_col
        self.symbol_type = symbol_type
        self.set_trade_data()

    def buy_per_cash(self):
        """Return the cash budget for one symbol: ``min(cash, total_asset / n)``."""
        asset = self.trader.get_assets()
        per_symbol = asset['total_asset'] / self.n
        return min(asset['cash'], per_symbol)

    @staticmethod
    def _market_code_slice(symbol_type):
        """Return the slice that removes the market code for ``symbol_type``, or ``None``."""
        if symbol_type == 'XXXXXX':
            return None
        if symbol_type[:3] == 'MM.':
            return slice(3, None)
        if symbol_type[-3:] == '.MM':
            return slice(None, -3)
        if symbol_type[:2] == 'MM':
            # Two-character prefix without a dot: strip exactly two characters.
            return slice(2, None)
        if symbol_type[-2:] == 'MM':
            return slice(None, -2)
        return None

    def set_trade_data(self):
        """Load the newest CSV and store the selected target rows in ``self.data``."""
        csv_file = get_last_file(self.dir)
        if csv_file is None:
            raise ValueError(f"no data file found in {self.dir!r}")
        data = pd.read_csv(csv_file, dtype={self.symbol_col: str}, parse_dates=['date'])
        data = data[data[self.factor_col] > 0]
        data = data.sort_values(by=self.factor_col, ascending=self.topOrlow != 'top')
        # Copy so the column assignment below never writes into a view.
        data = data.head(self.n).copy()
        code_slice = self._market_code_slice(self.symbol_type)
        if code_slice is not None:
            data[self.symbol_col] = data[self.symbol_col].str[code_slice]
        self.data = data

    def sell(self):
        """Sell every held position whose symbol is not in the target list.

        Position symbols are compared after dropping their first two
        characters.

        Returns:
            The id generated for the last sell issued, or ``''`` if nothing
            was sold.
        """
        targets = set(self.data[self.symbol_col].values)
        orderid = ''
        for p in self.trader.get_positions():
            if p['symbol'][2:] in targets:
                continue
            count = p['amount']
            if count > 0:
                orderid = self.strategy + '_s_' + str(uuid.uuid1())
                self.trader.sell(p['symbol'], count)
                print(Result(0, msg=f"sell success.{orderid} {p['symbol']},{count}"))
        return orderid

    def buy(self, symbol, cash):
        """Buy ``symbol`` up to ``cash``, less the market value already held."""
        holded = 0
        for p in self.trader.get_positions():
            if p['symbol'][2:] == symbol:
                holded = p['market_value']
                break
        cash = cash - holded
        orderid = self.strategy + '_b_' + str(uuid.uuid1())
        r = self.trader.buy(symbol, cash)
        if r > 0:
            print(Result(orderid, msg=f"buy success. {symbol},{cash}"))

    def trade(self):
        """Run one rebalance: sell non-target positions, then buy each target symbol."""
        self.sell()
        time.sleep(1)
        for symbol in self.data[self.symbol_col].values:
            # The budget is recomputed per symbol because each buy changes the cash.
            self.buy(symbol, self.buy_per_cash())

In [3]:
# Configure Chrome with the experimental "debuggerAddress" option set to
# 127.0.0.1:9223 (presumably to attach to an already-running Chrome that was
# started with remote debugging on that port — confirm).
chrome_options = Options()
chrome_options.add_experimental_option("debuggerAddress", "127.0.0.1:9223")
# Module-level WebDriver instance.
driver = webdriver.Chrome( options=chrome_options)


In [None]:
# NOTE(review): `trader` is not defined in any visible cell; presumably
# `trader = SnowTrader(driver)` — confirm.
trader.get_assets()

{'total_asset': '200000.00', 'cash': '200000.00'}

In [None]:
# Assuming `trader` is a SnowTrader: cash=100 is below the 1000 minimum checked
# at the top of SnowTrader.buy, so this call prints a failure and returns -1.
trader.buy('000001', 100)

In [None]:
# List current holdings as dicts (symbol, symbol_name, market_value, amount).
trader.get_positions()

[{'symbol': 'SZ000001',
  'symbol_name': '平安银行',
  'market_value': 1162.0,
  'amount': 100.0},
 {'symbol': 'SZ002512',
  'symbol_name': '达华智能',
  'market_value': 603.0,
  'amount': 100.0}]

In [None]:
# Sell 100 shares of 002512; the symbol is passed without the two-letter
# prefix that get_positions reports (e.g. 'SZ002512').
trader.sell('002512',100)