In [7]:
# coding: utf-8
from concurrent.futures import ThreadPoolExecutor
import time


def spider(page):
    """Pretend to crawl ``page``: block for ``page`` seconds, announce it, and return ``page``."""
    pause = page  # seconds to block, standing in for network latency
    time.sleep(pause)
    print(f"crawl task{page} finished")
    return pause

# Demo: submit three tasks that sleep 1, 2 and 5 seconds, then poll them with done().
with ThreadPoolExecutor(max_workers=5) as t:  # thread pool with at most 5 worker threads
    task1 = t.submit(spider, 1)
    task2 = t.submit(spider, 2)  # submit() schedules the call on the pool and returns a Future
    task3 = t.submit(spider, 5)

    # Polled right after submission, so none of the tasks has finished yet.
    print(f"task1: {task1.done()}")  # done() reports whether the call finished (or was cancelled)
    print(f"task2: {task2.done()}")
    print(f"task3: {task3.done()}")

    # After 2.5 s the 1 s and 2 s tasks are done; the 5 s task is still running.
    time.sleep(2.5)
    print(f"task1: {task1.done()}")
    print(f"task2: {task2.done()}")
    print(f"task3: {task3.done()}")
    print(task1.result())  # result() returns the call's return value, blocking until it is ready
    # Leaving the with-block shuts the pool down and waits for task3 to finish.

task1: False
task2: False
task3: False
crawl task1 finished
crawl task2 finished
task1: True
task2: True
task3: False
1
crawl task5 finished


In [10]:
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
import time

def spider(page):
    """Simulated crawler: sleep ``page`` seconds, print a completion line, hand ``page`` back."""
    time.sleep(page)
    finished_page = page
    print(f"crawl task{page} finished")
    return finished_page

# Demo of concurrent.futures.wait(): four tasks that sleep 1-4 seconds.
# NOTE(review): ALL_COMPLETED is imported above but not used explicitly here.
with ThreadPoolExecutor(max_workers=5) as t: 
    all_task = [t.submit(spider, page) for page in range(1, 5)]
    # Block until any one future finishes (the 1 s task).
    wait(all_task, return_when=FIRST_COMPLETED)
    print('finished')
    # With a timeout, wait() returns once the timeout elapses even if futures are still
    # pending; the result is a named 2-tuple of sets (done, not_done). Counted from this
    # call (~1 s after submission), the 4 s task is still running when it returns.
    print(wait(all_task, timeout=2.5))



crawl task1 finished
finished
crawl task2 finished
crawl task3 finished
DoneAndNotDoneFutures(done={<Future at 0x208b664e8b0 state=finished returned int>, <Future at 0x208b67349d0 state=finished returned int>, <Future at 0x208b6668d30 state=finished returned int>}, not_done={<Future at 0x208b679e0a0 state=running>})
crawl task4 finished


In [4]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def spider(page):
    """Stand-in crawl job: wait ``page`` seconds, log completion to stdout, return ``page``."""
    nap = time.sleep
    nap(page)
    print(f"crawl task{page} finished")
    return page

def main():
    """Submit simulated crawls for pages 1-4 and print each result in completion order."""
    with ThreadPoolExecutor(max_workers=5) as pool:
        # The list keeps submission order; as_completed yields futures as they finish.
        pending = [pool.submit(spider, page) for page in range(1, 5)]

        for done_future in as_completed(pending):
            data = done_future.result()
            print(f"main: {data}")
        print(1)

# Run the as_completed demo defined above.
main()

crawl task1 finished
main: 1
crawl task2 finished
main: 2
crawl task3 finished
main: 3
crawl task4 finished
main: 4
1


In [10]:
import time
from concurrent.futures import ThreadPoolExecutor

def spider(page):
    """Block for ``page`` seconds to mimic a slow fetch, then return ``page`` unchanged."""
    delay_seconds = page
    time.sleep(delay_seconds)
    return delay_seconds

start = time.time()  # NOTE(review): recorded but never reported; print elapsed time or drop it
# The with-block shuts the pool down (joining its worker threads) once iteration ends;
# previously the executor was never shut down.
with ThreadPoolExecutor(max_workers=4) as executor:
    # map() yields results in submission order, not completion order. The timeout is
    # measured from the map() call, so every task (longest sleeps 8 s) must finish within 9 s.
    for i, result in enumerate(executor.map(spider, [2, 4, 1, 8], timeout=9), start=1):
        print("task{}:{}".format(i, result))


task1:2
task2:4
task3:1
task4:8


In [71]:
import akshare as ak
import logging
import talib as tl
import concurrent.futures
import datetime


def fetch(stock):
    """
    Fetch historical quotes for one stock over a date range via akshare.

    Args:
        stock: tuple ``((stock_code, stock_name), period, start_date, end_date, adjust)``.
            The code, period, dates and adjust flag are forwarded to
            ``ak.stock_zh_a_hist``; the name is used only in messages.

    Returns:
        The DataFrame returned by ``ak.stock_zh_a_hist`` with an added ``p_change``
        column (TA-Lib ``ROC`` over 1 period of the ``收盘`` column, presumably the close
        price). Returns None when the request raises or yields no rows.
    """
    code, name = stock[0][0], stock[0][1]

    # stock_zh_a_hist returns one stock's historical quotes, see
    # https://www.akshare.xyz/data/stock/stock.html#id20
    try:
        data = ak.stock_zh_a_hist(symbol=code, period=stock[1], start_date=stock[2],
                                  end_date=stock[3], adjust=stock[4])
    except Exception as exc:
        print('%s(%r) fetch data generated an exception: %s' % (name, code, exc))
        # Bail out here: `data` was never assigned, so falling through would raise
        # UnboundLocalError instead of skipping the stock.
        return None

    if data is None or data.empty:
        # `stock` is a tuple, so it must be formatted rather than concatenated onto a str.
        logging.debug("股票：%s(%s) 没有数据，略过...", name, code)
        return None

    data['p_change'] = tl.ROC(data['收盘'], 1)

    return data

def run(stocks, period="daily", start_date="20200201", end_date=None, adjust="qfq", max_workers=12):
    """
    Concurrently fetch historical quotes for several A-share stocks.

    Args:
        stocks: list of ``(code, name)`` tuples.
        period: forwarded to ``fetch`` (default ``"daily"``).
        start_date: ``YYYYMMDD`` start date string.
        end_date: ``YYYYMMDD`` end date string; None (default) means today's local date,
            resolved at call time.
        adjust: price-adjustment flag forwarded to ``fetch`` (default ``"qfq"``).
        max_workers: size of the thread pool (default 12).

    Returns:
        dict mapping each ``(code, name)`` tuple to its DataFrame, with the ``成交量``
        column cast to double. Stocks whose fetch returned None or raised are omitted;
        exceptions are printed rather than propagated.
    """
    if end_date is None:
        # Computed per call: a default in the signature would be frozen at definition
        # time and go stale in a long-running kernel.
        end_date = datetime.datetime.now().strftime('%Y%m%d')

    stocks_data = {}

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the (code, name) tuple it was submitted for.
        future_to_stock = {
            executor.submit(fetch, (stock, period, start_date, end_date, adjust)): stock
            for stock in stocks
        }
        # as_completed yields futures as they finish, so results are handled in
        # completion order rather than input order.
        for future in concurrent.futures.as_completed(future_to_stock):
            stock = future_to_stock[future]
            try:
                data = future.result()
                if data is not None:
                    stocks_data[stock] = data.astype({'成交量': 'double'})
            except Exception as exc:
                print('%s(%r) generated an exception: %s' % (stock[1], stock[0], exc))

    return stocks_data

# Example: fetch quotes for two stocks between 2020-02-01 and 2020-02-28.
# The names ('名称1', '名称2') are placeholders; only the codes are sent to akshare.
stocks =  [('600519', '名称1'),('600172', '名称2')]
res = run(stocks,start_date="20200201",end_date="20200228")
# print(res)  # uncomment to inspect the {(code, name): DataFrame} result

名称1('600519') generated an exception: tuple index out of range
名称2('600172') generated an exception: tuple index out of range
{}


In [45]:
# Display today's local date formatted as YYYYMMDD.
import datetime
datetime.date.today().strftime('%Y%m%d')




'20230327'