Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker监听消息需要添加消息类型判断 worker monitoring message needs to add message type judgment #72

Closed
xinlingqudongX opened this issue Feb 19, 2022 · 12 comments

Comments

@xinlingqudongX
Copy link

消息通知
消息体

  • 我使用的是Egg框架,delay.worker可以收到到其他线程发送的消息,你应该加个消息判断
  • I am using the Egg framework, delay.worker can receive messages sent by other threads, you should add a message to judge
@xinlingqudongX xinlingqudongX changed the title delay.worker监听消息需要添加消息类型判断delay.worker monitoring message needs to add message type judgment delay.worker监听消息需要添加消息类型判断\ndelay.worker monitoring message needs to add message type judgment Feb 19, 2022
@xinlingqudongX xinlingqudongX changed the title delay.worker监听消息需要添加消息类型判断\ndelay.worker monitoring message needs to add message type judgment delay.worker监听消息需要添加消息类型判断delay.worker monitoring message needs to add message type judgment Feb 19, 2022
@xinlingqudongX xinlingqudongX changed the title delay.worker监听消息需要添加消息类型判断delay.worker monitoring message needs to add message type judgment worker监听消息需要添加消息类型判断 worker monitoring message needs to add message type judgment Feb 19, 2022
@weyoss
Copy link
Owner

weyoss commented Feb 19, 2022

Sorry, but I am not sure I understand what you mean.

As far as I understood, you are suggesting to add a message type check because the worker can be run by many threads.

If that is true, then a message type check is already done statically. Runtime checks are not required. By convention, all the workers are forked from one place and by one handler. It's the WorkerRunner.

The WorkerRunner has the following requirements:

export class WorkerRunner<
  WorkerParameters extends TWorkerParameters = TWorkerParameters,
> extends EventEmitter {
    constructor(
    redisClient: RedisClient,
    workersDir: string,
    keyLock: string,
    workerParameters: WorkerParameters,
    workerPool?: WorkerPool,
  ) {
  // ...
  }
}

So the parameters we pass to WorkerRunner should be of the WorkerParameters type and are forwarded to all forked threads as a message.

When the consumer creates the WorkerRunner instance it specifies IConsumerWorkerParameters as a parameters type for all the workers that it runs.

this.workerRunner =  new WorkerRunner<IConsumerWorkerParameters>(...)

The DelayWorker subprocess receives the message and expect it to be of IConsumerWorkerParameters type:

process.on('message', (payload: string) => {
  const params: IConsumerWorkerParameters = JSON.parse(payload);
  setConfiguration(params.config);
  RedisClient.getNewInstance((err, client) => {
    if (err) throw err;
    else if (!client) throw new EmptyCallbackReplyError();
    else new DelayWorker(client, params, false).run();
  });
});

The workers are NOT supposed to be run by a RedisSMQ client directly. They are internal parts of the library and by no means you should use them in your application.

Adding to that, redis-smq, starting with v6, no more forks any workers subprocesses. The code fragment you are showing is not used in any place and was kept just for historical purposes.

I fear that you are wrongly using this library and I am surprised by this screenshot:

154787414-af4dbebb-a4eb-4f77-87c8-c2c381fab8d8

How did you get that!?

If you are having an issue please show me a full example of your code so we can discuss it.

@PhantomRay
Copy link

better provide code for re-produce.

@xinlingqudongX
Copy link
Author

Sorry, but I am not sure I understand what you mean.

As far as I understood, you are suggesting to add a message type check because the worker can be run by many threads.

If that is true, then a message type check is already done statically. Runtime checks are not required. By convention, all the workers are forked from one place and by one handler. It's the WorkerRunner.

The WorkerRunner has the following requirements:

export class WorkerRunner<
  WorkerParameters extends TWorkerParameters = TWorkerParameters,
> extends EventEmitter {
    constructor(
    redisClient: RedisClient,
    workersDir: string,
    keyLock: string,
    workerParameters: WorkerParameters,
    workerPool?: WorkerPool,
  ) {
  // ...
  }
}

So the parameters we pass to WorkerRunner should be of the WorkerParameters type and are forwarded to all forked threads as a message.

When the consumer creates the WorkerRunner instance it specifies IConsumerWorkerParameters as a parameters type for all the workers that it runs.

this.workerRunner =  new WorkerRunner<IConsumerWorkerParameters>(...)

The DelayWorker subprocess receives the message and expect it to be of IConsumerWorkerParameters type:

process.on('message', (payload: string) => {
  const params: IConsumerWorkerParameters = JSON.parse(payload);
  setConfiguration(params.config);
  RedisClient.getNewInstance((err, client) => {
    if (err) throw err;
    else if (!client) throw new EmptyCallbackReplyError();
    else new DelayWorker(client, params, false).run();
  });
});

The workers are NOT supposed to be run by a RedisSMQ client directly. They are internal parts of the library and by no means you should use them in your application.

Adding to that, redis-smq, starting with v6, no more forks any workers subprocesses. The code fragment you are showing is not used in any place and was kept just for historical purposes.

I fear that you are wrongly using this library and I am surprised by this screenshot:

154787414-af4dbebb-a4eb-4f77-87c8-c2c381fab8d8

How did you get that!?

If you are having an issue please show me a full example of your code so we can discuss it.

是因为worker收到了其他主线程发送的消息,并不是redis-smq的线程,而是Egg框架的线程发送的,所以需要判断来源
It is because the worker received the message sent by other main threads, not the thread of redis-mq, but the thread of the Egg framework, so it is necessary to judge the source

@weyoss
Copy link
Owner

weyoss commented Feb 19, 2022

Please provide a full example.

@xinlingqudongX
Copy link
Author

Please provide a full example.

通讯架构

  • Egg的通讯主要有master和app worker组成,当我使用redis-smq的时候,redis-smq的worker就收到了Egg中的master发来的消息

  • Egg's communication is mainly composed of master and app worker. When I use redis-smq, the worker of redis-smq receives the message from the master in Egg.
    消息体2

  • 所以我在监听消息的位置添加了类型判断

  • So I added a type judgment where I listened to the message

@xinlingqudongX
Copy link
Author

当使用子进程运行redis-smq时,process监听到的message就会收到主进程发送的消息,进行一个类型安全的判断是必要的,因为会受到其他线程发送的消息污染
When using a subprocess to run redis-smq, the message monitored by the process will receive the message sent by the main process. It is necessary to make a type-safe judgment, because it will be polluted by messages sent by other threads.

@weyoss
Copy link
Owner

weyoss commented Feb 19, 2022

Thank you.

Now I see what you are talking about.

I haven't used the Egg framework before. So RedisSMQ workers may catch messages from the main process sent by Egg framework.

This issue will be fixed ASAP.

@xinlingqudongX
Copy link
Author

OK,我不太了解Egg框架的源码,所以不能提供代码示例给你,抱歉
OK, I don't know much about the source code of the Egg framework, so I can't provide you with a code example, sorry

@weyoss
Copy link
Owner

weyoss commented Feb 19, 2022

当使用子进程运行redis-smq时,process监听到的message就会收到主进程发送的消息,进行一个类型安全的判断是必要的,因为会受到其他线程发送的消息污染
When using a subprocess to run redis-smq, the message monitored by the process will receive the message sent by the main process. It is necessary to make a type-safe judgment, because it will be polluted by messages sent by other threads.

As I said before redis-smq does not fork subprocesses anymore. The code fragment that may catch messages is not used. I will just remove it.

@weyoss
Copy link
Owner

weyoss commented Feb 19, 2022

@xinlingqudongX

This issue has been fixed in v6.2.1

@weyoss
Copy link
Owner

weyoss commented Feb 19, 2022

Closing as resolved.

@weyoss weyoss closed this as completed Feb 19, 2022
@xinlingqudongX
Copy link
Author

多谢
Thanks a lot!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants