Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

我有一个需求,希望能动态创建多个task,又希望这些task能被顺序的执行来避免多线程竞争,该如何做。 #301

Closed
lordoffox opened this issue Mar 23, 2021 · 28 comments

Comments

@lordoffox
Copy link

如题。

@holmes1412
Copy link
Contributor

holmes1412 commented Mar 23, 2021

感谢小伙伴的使用~这是一个比较经典的场景,一种解决办法是创建一个series来保证多个task的顺序执行:

SeriesWork *series = Workflow::create_series_work(first_task, series_callback);
series->start();

只要这个series指针可以被共享,就可以被多方往里放任务。但需要注意的是,series如果执行完、没有任务了就会结束,所以可以使用WFCounterTask作为内存开关,series内没有任务的时候放一个

WFCounterTask *counter = WFTaskFactory::create_counter_task(1, nullptr);

在task的回调里判断一下,当前series里是否还有后续任务,如果没有,就放入一个counter。counter可以用带名字的counter来全局技术,避免自己保存counter task指针的麻烦~

void mytask_callback(MyTask *task)
{
    if (/* series里没有其他任务了*/ )
    {
        WFCounterTask *counter = WFTaskFactory::create_counter_task("COUNTER_A", 1, nullptr);
        series->push_back(counter);
    }
    ...
}

然后每个想往series里放的task,放入的时候都配合打开一下开关;

series->push_back(my_task);
WFTaskFactory::count_by_name("COUNTER_A");

就可以做到task本身被顺序执行,又能长期使用同一个series的做法了~麻烦看看是否符合当前的需求,如果还有其他用法,欢迎多多交流~

@Barenboim
Copy link
Contributor

Barenboim commented Mar 23, 2021

这个需求挺常见的,其实你需要的就是一个不会自动结束的series,你可以向这个series里不断的增加任务,这样子这些任务就可以顺序的被执行。方法是每次push_back任务时,除了push_back当前任务,还需要再push_back一个目标值为1的counter任务。接下来,上一个counter打开,让当前任务可以被拉起。counter相当于一个塞子,用于堵住series,让series不会自动结束。
其实我们series的push和pop操作都是加锁的,也就是为了用户可以实现这个功能。
@holmes1412 你的示例代码在callback里push_back(counter)应该是不对的。我一会写个block series的示例。

@MaybeShewill-CV
Copy link
Contributor

@Barenboim 在callback里面push计数器有什么不对的地方吗

@Barenboim
Copy link
Contributor

Barenboim commented Mar 23, 2021

大概写了一下代码,没有测,方法没问题:

#include <string>
#include <mutex>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"

class BlockSeries
{
public:
	BlockSeries(SubTask *first, const std::string& name) :
		counter_name(name)
	{
		WFCounterTask *counter;
		this->series = Workflow::create_series_work(first, nullptr);
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(counter);
	}

	~BlockSeries()
	{
		// 假设用户已经调用过start()
		WFTaskFactory::count_by_name(this->counter_name);
	}

        void start()
        {
                this->series->start();
        }

	void push_back(SubTask *task)
	{
		WFCounterTask *counter;
		this->mutex.lock();
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(task);
		this->series->push_back(counter);
		this->mutex.unlock();
                // 这个counter_by_name打开的是上一个push_back添加的counter。
		WFTaskFactory::count_by_name(this->counter_name);
	}

private:
	SeriesWork *series;
	std::string counter_name;
	std::mutex mutex;
};

其中name是一个全局唯一的名字,用来给counter起名的。series没有callback,如果需要的话可以自己改一下代码。

@holmes1412
Copy link
Contributor

@Barenboim @MaybeShewill-CV 我是在执行的task的callback里检查series的状态并push_back(counter)哈

@Barenboim
Copy link
Contributor

@Barenboim @MaybeShewill-CV 我是在执行的task的callback里检查series的状态并push_back(counter)哈

这个在并发访问的时候就有问题了。

@holmes1412
Copy link
Contributor

holmes1412 commented Mar 23, 2021

@Barenboim @MaybeShewill-CV 我是在执行的task的callback里检查series的状态并push_back(counter)哈

这个在并发访问的时候就有问题了。

嗯嗯,了解,要保证原子性的话,交给series做才行~

@lordoffox
Copy link
Author

非常感谢回答。

@wangyongxiao
Copy link

如果在一段时间内没有新的task,我希望回收这个serise的相关资源,比如没有你说的那个“塞子”,让这个serise结束掉,又该如何处理呢?

@Barenboim
Copy link
Contributor

想让series结束,就count一下这个塞子就可以了。
counter的定义就是当count的次数达到目标值,就callback。因此可以当成塞子使用。
你说没有“塞子”是什么意思?

@wangyongxiao
Copy link

主要是想实现这样一个功能:
可以动态创建task,如果一段时间内没有新的任务到来就结束掉这个serise,我在想应该是有一个定时器来判断这个时间,时间到了就结束掉serise。
我说的“没有塞子”,是想用定时器来替代counter来实现这个功能,刚接触workflow不久,还没有想到如何来做这个功能

或者说我这个想法有没有实现的必要

@Barenboim
Copy link
Contributor

Barenboim commented Aug 27, 2021

我想了想,这个挺好做的。首先,你个每个塞子都起一个独立的名字,并且每次创建塞子,也同时启动一个定时打开塞子的定时器。上面push_back操作大概写成这样:

class BlockSeries
{
public:
	void push_back(SubTask *task)
	{
		WFCounterTask *counter;
		this->mutex.lock();
                int counter_id = ++this->counter_id;
		counter = WFTaskFactory::create_counter_task(std::to_string(counter_id), 1, nullptr);
		this->series->push_back(task);
		this->series->push_back(counter);
                // 启动一个定时器,10秒之后打开塞子。
                WFTimerTask *timer = create_timer_task(10, 0, [counter_id](WFTimerTask *) {
                        WFTaskFactory::count_by_name(std::to_string(counter_id));
                });
                timer->start(); 
		this->mutex.unlock();
                // 这个counter_by_name打开的是上一个push_back添加的counter。
		WFTaskFactory::count_by_name(std::to_string(id - 1));
	}
private:
        int counter_id = 0;
}

大概的逻辑就是用一个定时器,到时间通过name来打开塞子。所以每个塞子的name在这里是唯一的。如果塞子已经被下一个任务打开,定时器的count_by_name操作并不会产生任何作用。
Timer所占资源非常小,可在随便创建。不用担心之前的timer没有被取消的问题。

@wangyongxiao
Copy link

说得很明朗了,谢谢指教,具体实施的时候我觉得还应该有一些工作要做,比如检查BlockSeries里的series是否已经被释放了,因为没有task后,最后一个counter塞子会被定时器拔掉,导致serise被释放,这时BlockSeries里的series就是一个野指针了。
不过思路给出来了,我按这个思路弄完备就完事儿了

@Barenboim
Copy link
Contributor

现在这个功能也可以用WFResourcePool来实现。
https://github.com/sogou/workflow/blob/master/docs/about-conditional.md

@zxyAcmen
Copy link

zxyAcmen commented Jun 6, 2024

大概写了一下代码,没有测,方法没问题:

#include <string>
#include <mutex>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"

class BlockSeries
{
public:
	BlockSeries(SubTask *first, const std::string& name) :
		counter_name(name)
	{
		WFCounterTask *counter;
		this->series = Workflow::create_series_work(first, nullptr);
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(counter);
	}

	~BlockSeries()
	{
		// 假设用户已经调用过start()
		WFTaskFactory::count_by_name(this->counter_name);
	}

        void start()
        {
                this->series->start();
        }

	void push_back(SubTask *task)
	{
		WFCounterTask *counter;
		this->mutex.lock();
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(task);
		this->series->push_back(counter);
		this->mutex.unlock();
                // 这个counter_by_name打开的是上一个push_back添加的counter。
		WFTaskFactory::count_by_name(this->counter_name);
	}

private:
	SeriesWork *series;
	std::string counter_name;
	std::mutex mutex;
};

其中name是一个全局唯一的名字,用来给counter起名的。series没有callback,如果需要的话可以自己改一下代码。

太好了,正好现在需要这个,大概理解了下,

第一步:这个start第一次调用就是为了开始运行没有任务不自动退出,固定的名字和固定的值1
counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
第二步: 这一段是添加新任务,且也是为了保证没有任务的时候不退出
this->series->push_back(task);
this->series->push_back(counter);
第三步:这个其实是为了把第一步的阻塞塞子去掉,好让第二步的task任务得到执行
WFTaskFactory::count_by_name(this->counter_name);

可以这么理解吗,大佬

@Barenboim
Copy link
Contributor

你自己试一下吧。我倒不觉得你真的需要这么复杂的东西。

大概写了一下代码,没有测,方法没问题:

#include <string>
#include <mutex>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"

class BlockSeries
{
public:
	BlockSeries(SubTask *first, const std::string& name) :
		counter_name(name)
	{
		WFCounterTask *counter;
		this->series = Workflow::create_series_work(first, nullptr);
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(counter);
	}

	~BlockSeries()
	{
		// 假设用户已经调用过start()
		WFTaskFactory::count_by_name(this->counter_name);
	}

        void start()
        {
                this->series->start();
        }

	void push_back(SubTask *task)
	{
		WFCounterTask *counter;
		this->mutex.lock();
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(task);
		this->series->push_back(counter);
		this->mutex.unlock();
                // 这个counter_by_name打开的是上一个push_back添加的counter。
		WFTaskFactory::count_by_name(this->counter_name);
	}

private:
	SeriesWork *series;
	std::string counter_name;
	std::mutex mutex;
};

其中name是一个全局唯一的名字,用来给counter起名的。series没有callback,如果需要的话可以自己改一下代码。

太好了,正好现在需要这个,大概理解了下,

第一步:这个start第一次调用就是为了开始运行没有任务不自动退出,固定的名字和固定的值1 counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr); 第二步: 这一段是添加新任务,且也是为了保证没有任务的时候不退出 this->series->push_back(task); this->series->push_back(counter); 第三步:这个其实是为了把第一步的阻塞塞子去掉,好让第二步的task任务得到执行 WFTaskFactory::count_by_name(this->counter_name);

可以这么理解吗,大佬

@zxyAcmen
Copy link

zxyAcmen commented Jun 6, 2024

你自己试一下吧。我倒不觉得你真的需要这么复杂的东西。

大概写了一下代码,没有测,方法没问题:

#include <string>
#include <mutex>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"

class BlockSeries
{
public:
	BlockSeries(SubTask *first, const std::string& name) :
		counter_name(name)
	{
		WFCounterTask *counter;
		this->series = Workflow::create_series_work(first, nullptr);
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(counter);
	}

	~BlockSeries()
	{
		// 假设用户已经调用过start()
		WFTaskFactory::count_by_name(this->counter_name);
	}

        void start()
        {
                this->series->start();
        }

	void push_back(SubTask *task)
	{
		WFCounterTask *counter;
		this->mutex.lock();
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(task);
		this->series->push_back(counter);
		this->mutex.unlock();
                // 这个counter_by_name打开的是上一个push_back添加的counter。
		WFTaskFactory::count_by_name(this->counter_name);
	}

private:
	SeriesWork *series;
	std::string counter_name;
	std::mutex mutex;
};

其中name是一个全局唯一的名字,用来给counter起名的。series没有callback,如果需要的话可以自己改一下代码。

太好了,正好现在需要这个,大概理解了下,
第一步:这个start第一次调用就是为了开始运行没有任务不自动退出,固定的名字和固定的值1 counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr); 第二步: 这一段是添加新任务,且也是为了保证没有任务的时候不退出 this->series->push_back(task); this->series->push_back(counter); 第三步:这个其实是为了把第一步的阻塞塞子去掉,好让第二步的task任务得到执行 WFTaskFactory::count_by_name(this->counter_name);
可以这么理解吗,大佬

我感觉你在看不起我(哈哈,开玩笑哈),现在就是需要这个,现在业务有个状态需要维护,然后要根据各种处理结果来维护,这个正合适的,本来用同步编程,我取数据放到内存,调用hredis 操作的话这个就简单了,这不向大佬们学习使用异步嘛。

@Barenboim
Copy link
Contributor

你自己试一下吧。我倒不觉得你真的需要这么复杂的东西。

大概写了一下代码,没有测,方法没问题:

#include <string>
#include <mutex>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"

class BlockSeries
{
public:
	BlockSeries(SubTask *first, const std::string& name) :
		counter_name(name)
	{
		WFCounterTask *counter;
		this->series = Workflow::create_series_work(first, nullptr);
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(counter);
	}

	~BlockSeries()
	{
		// 假设用户已经调用过start()
		WFTaskFactory::count_by_name(this->counter_name);
	}

        void start()
        {
                this->series->start();
        }

	void push_back(SubTask *task)
	{
		WFCounterTask *counter;
		this->mutex.lock();
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(task);
		this->series->push_back(counter);
		this->mutex.unlock();
                // 这个counter_by_name打开的是上一个push_back添加的counter。
		WFTaskFactory::count_by_name(this->counter_name);
	}

private:
	SeriesWork *series;
	std::string counter_name;
	std::mutex mutex;
};

其中name是一个全局唯一的名字,用来给counter起名的。series没有callback,如果需要的话可以自己改一下代码。

太好了,正好现在需要这个,大概理解了下,
第一步:这个start第一次调用就是为了开始运行没有任务不自动退出,固定的名字和固定的值1 counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr); 第二步: 这一段是添加新任务,且也是为了保证没有任务的时候不退出 this->series->push_back(task); this->series->push_back(counter); 第三步:这个其实是为了把第一步的阻塞塞子去掉,好让第二步的task任务得到执行 WFTaskFactory::count_by_name(this->counter_name);
可以这么理解吗,大佬

我感觉你在看不起我(哈哈,开玩笑哈),现在就是需要这个,现在业务有个状态需要维护,然后要根据各种处理结果来维护,这个正合适的,本来用同步编程,我取数据放到内存,调用hredis 操作的话这个就简单了,这不向大佬们学习使用异步嘛。

没有看不起,只是希望你多做一些实验,有可能问题没有你想的那么复杂。

从你刚才贴的一些代码,我觉得你还是没有看我们的文档和示例。

@zxyAcmen
Copy link

zxyAcmen commented Jun 6, 2024

恩恩,我先把你这个blackseries先用上吧,我感觉很适合我的这个的

@zxyAcmen
Copy link

zxyAcmen commented Jun 6, 2024

你自己试一下吧。我倒不觉得你真的需要这么复杂的东西。

大概写了一下代码,没有测,方法没问题:

#include <string>
#include <mutex>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"

class BlockSeries
{
public:
	BlockSeries(SubTask *first, const std::string& name) :
		counter_name(name)
	{
		WFCounterTask *counter;
		this->series = Workflow::create_series_work(first, nullptr);
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(counter);
	}

	~BlockSeries()
	{
		// 假设用户已经调用过start()
		WFTaskFactory::count_by_name(this->counter_name);
	}

        void start()
        {
                this->series->start();
        }

	void push_back(SubTask *task)
	{
		WFCounterTask *counter;
		this->mutex.lock();
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(task);
		this->series->push_back(counter);
		this->mutex.unlock();
                // 这个counter_by_name打开的是上一个push_back添加的counter。
		WFTaskFactory::count_by_name(this->counter_name);
	}

private:
	SeriesWork *series;
	std::string counter_name;
	std::mutex mutex;
};

其中name是一个全局唯一的名字,用来给counter起名的。series没有callback,如果需要的话可以自己改一下代码。

太好了,正好现在需要这个,大概理解了下,
第一步:这个start第一次调用就是为了开始运行没有任务不自动退出,固定的名字和固定的值1 counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr); 第二步: 这一段是添加新任务,且也是为了保证没有任务的时候不退出 this->series->push_back(task); this->series->push_back(counter); 第三步:这个其实是为了把第一步的阻塞塞子去掉,好让第二步的task任务得到执行 WFTaskFactory::count_by_name(this->counter_name);
可以这么理解吗,大佬

我感觉你在看不起我(哈哈,开玩笑哈),现在就是需要这个,现在业务有个状态需要维护,然后要根据各种处理结果来维护,这个正合适的,本来用同步编程,我取数据放到内存,调用hredis 操作的话这个就简单了,这不向大佬们学习使用异步嘛。

没有看不起,只是希望你多做一些实验,有可能问题没有你想的那么复杂。

从你刚才贴的一些代码,我觉得你还是没有看我们的文档和示例。

验证了下,确实不是我要的那个场景,哈哈,但是就是说,假如我有10个任务,我想感知到第10任务执行完了后,再重新执行一个任务,这种要如何做适合,都是异步的有点不知道咋搞,又不想用成员容器来保存数据记录状态这种,就是通过任务来感知,因为我不到10个任务执行完,我直接开始第11个任务的话,有可能拿到我还处理完的数据又去处理了。

@Barenboim
Copy link
Contributor

你自己试一下吧。我倒不觉得你真的需要这么复杂的东西。

大概写了一下代码,没有测,方法没问题:

#include <string>
#include <mutex>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"

class BlockSeries
{
public:
	BlockSeries(SubTask *first, const std::string& name) :
		counter_name(name)
	{
		WFCounterTask *counter;
		this->series = Workflow::create_series_work(first, nullptr);
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(counter);
	}

	~BlockSeries()
	{
		// 假设用户已经调用过start()
		WFTaskFactory::count_by_name(this->counter_name);
	}

        void start()
        {
                this->series->start();
        }

	void push_back(SubTask *task)
	{
		WFCounterTask *counter;
		this->mutex.lock();
		counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr);
		this->series->push_back(task);
		this->series->push_back(counter);
		this->mutex.unlock();
                // 这个counter_by_name打开的是上一个push_back添加的counter。
		WFTaskFactory::count_by_name(this->counter_name);
	}

private:
	SeriesWork *series;
	std::string counter_name;
	std::mutex mutex;
};

其中name是一个全局唯一的名字,用来给counter起名的。series没有callback,如果需要的话可以自己改一下代码。

太好了,正好现在需要这个,大概理解了下,
第一步:这个start第一次调用就是为了开始运行没有任务不自动退出,固定的名字和固定的值1 counter = WFTaskFactory::create_counter_task(this->counter_name, 1, nullptr); 第二步: 这一段是添加新任务,且也是为了保证没有任务的时候不退出 this->series->push_back(task); this->series->push_back(counter); 第三步:这个其实是为了把第一步的阻塞塞子去掉,好让第二步的task任务得到执行 WFTaskFactory::count_by_name(this->counter_name);
可以这么理解吗,大佬

我感觉你在看不起我(哈哈,开玩笑哈),现在就是需要这个,现在业务有个状态需要维护,然后要根据各种处理结果来维护,这个正合适的,本来用同步编程,我取数据放到内存,调用hredis 操作的话这个就简单了,这不向大佬们学习使用异步嘛。

没有看不起,只是希望你多做一些实验,有可能问题没有你想的那么复杂。
从你刚才贴的一些代码,我觉得你还是没有看我们的文档和示例。

验证了下,确实不是我要的那个场景,哈哈,但是就是说,假如我有10个任务,我想感知到第10任务执行完了后,再重新执行一个任务,这种要如何做适合,都是异步的有点不知道咋搞,又不想用成员容器来保存数据记录状态这种,就是通过任务来感知,因为我不到10个任务执行完,我直接开始第11个任务的话,有可能拿到我还处理完的数据又去处理了。

那不就是在第10个任务的callback里,push_back第11个任务吗?

@zxyAcmen
Copy link

zxyAcmen commented Jun 6, 2024

10个相同的任务,比如redis del指令,我不知道哪个是第10啊,你的意思在创建第10个的时候set_callback?

@Barenboim
Copy link
Contributor

Barenboim commented Jun 6, 2024 via email

@zxyAcmen
Copy link

zxyAcmen commented Jun 6, 2024

放到一个series应该是串行的,但我也不知道哪个是第十个啊,任务不是异步执行的嘛,难道我加个计数?

@zxyAcmen
Copy link

zxyAcmen commented Jun 6, 2024

我知道了,有个那个user_data可以用对吧

@Barenboim
Copy link
Contributor

Barenboim commented Jun 6, 2024 via email

@zxyAcmen
Copy link

zxyAcmen commented Jun 6, 2024

我真不想回你的问题了。。。 我教不会你。你自己看文档吧。

---原始邮件--- 发件人: @.> 发送时间: 2024年6月6日(周四) 晚上7:46 收件人: @.>; 抄送: @.@.>; 主题: Re: [sogou/workflow] 我有一个需求,希望能动态创建多个task,又希望这些task能被顺序的执行来避免多线程竞争,该如何做。 (#301) 我知道了,有个那个user_data可以用对吧 — Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you were mentioned.Message ID: @.***>

别阿大哥,能者多劳嘛,那个user_data可以用了,因为我一次10个数据处理,会建10个任务,我在建最后一个任务绑定一下user_data,待任务执行完回到call_back的时候判断一下就行了

@Barenboim
Copy link
Contributor

我真不想回你的问题了。。。 我教不会你。你自己看文档吧。

---原始邮件--- 发件人: @.> 发送时间: 2024年6月6日(周四) 晚上7:46 收件人: _@**._>; 抄送: _@.@._>; 主题: Re: [sogou/workflow] 我有一个需求,希望能动态创建多个task,又希望这些task能被顺序的执行来避免多线程竞争,该如何做。 (#301) 我知道了,有个那个user_data可以用对吧 — Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you were mentioned.Message ID: _@_.*>

别阿大哥,能者多劳嘛,那个user_data可以用了,因为我一次10个数据处理,会建10个任务,我在建最后一个任务绑定一下user_data,待任务执行完回到call_back的时候判断一下就行了

不要再提问了。或者你去找个同步框架用吧。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants