Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

promise.all 限制并发数量 #48

Open
nmsn opened this issue Oct 9, 2022 · 1 comment
Open

promise.all 限制并发数量 #48

nmsn opened this issue Oct 9, 2022 · 1 comment

Comments

@nmsn
Copy link
Owner

nmsn commented Oct 9, 2022

原文:https://juejin.cn/post/7067885409677606943

现在提供10个id和请求函数(请求返回promise对象),现在要求你设置一个并发数(假设为3),达到并发设置效果。

async function run(){
    for (let i=0;i<idArray.length;i++){
        let promise = request(idArray[i]);
        promise.then((res)=>{
            console.log(`id${res}的请求已经处理完毕,当前并发为${pool.length}`);
            pool.splice(pool.indexOf(promise),1);
        })
        pool.push(promise);
        //这里是重点,当满了就阻塞
        if (pool.length==max){
            await Promise.race(pool);
        }
    }
}
run();

这种写法存在一个问题, Promise.race 会将其他执行过程中的任务舍弃

@nmsn
Copy link
Owner Author

nmsn commented May 7, 2023

class AsyncLimit {
  limit: number;
  count: number;
  queue: any[];

  constructor(n: number) {
    this.limit = n;
    this.count = 0;
    this.queue = [];
  }

  enqueue(fn: () => unknown) {
    // 关键代码: fn, resolve, reject 统一管理
    return new Promise((resolve, reject) => {
      this.queue.push({ fn, resolve, reject });
    });
  }

  dequeue() {
    if (this.count < this.limit && this.queue.length) {
      // 等到 Promise 计数器小于阈值时,则出队执行
      const { fn, resolve, reject } = this.queue.shift();
      this.run(fn).then(resolve).catch(reject);
    }
  }

  // async/await 简化错误处理
  async run(fn: () => unknown) {
    this.count++;
    // 维护一个计数器
    const value = await fn();
    this.count--;
    // 执行完,看看队列有东西没
    this.dequeue();
    return value;
  }

  build(fn: () => unknown) {
    if (this.count < this.limit) {
      // 如果没有到达阈值,直接执行
      return this.run(fn);
    } else {
      // 如果超出阈值,则先扔到队列中,等待有空闲时执行
      return this.enqueue(fn);
    }
  }
}

const asyncPool = function (list: (() => Promise<unknown>)[], concurrency: number) {
  const limit = new AsyncLimit(concurrency);
  return Promise.all(
    list.map(fn => {
      return limit.build(fn);
    }),
  );
};

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant