Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server如何等待一个非本框架的异步事件完成?-- Good Question #328

Closed
ghost opened this issue Apr 10, 2021 · 13 comments
Closed

Comments

@ghost
Copy link

ghost commented Apr 10, 2021

背景
有N(几百更多)个客户端,类似地分别请求文件资源 1.txt 2.txt ... N.txt,请求之前这些文件都不存在,workflow收到 url 后,会调用启动一个python脚本,脚本里生成文件的函数,调用后会在while循环里检测是否生成成功,成功则继续:

新增代码

#include <sys/stat.h>
/* 检查某个文件是否存在的函数 */
/* https://stackoverflow.com/questions/12774207/fastest-way-to-check-if-a-file-exist-using-standard-c-c11-c */
inline bool exists_test3 (const std::string& name) {
  struct stat buffer;   
  return (stat (name.c_str(), &buffer) == 0); 
}

/*** 在process里增加的逻辑,在这里加时因为fd=open()在这个函数里,要确保有文件了才能调用open() ***/

void process(WFHttpTask *server_task, const char *root)
{
	HttpRequest *req = server_task->get_req();
	HttpResponse *resp = server_task->get_resp();
	const char *uri = req->get_request_uri();
	const char *p = uri;

	printf("Request-URI: %s\n", uri);
	while (*p && *p != '?')
		p++;

	std::string abs_path(uri, p - uri);
	abs_path = root + abs_path;
	if (abs_path.back() == '/')
		abs_path += "index.html";
        /******************************************新增代码******************************************/
	/* 检查文件是否存在 */
	std::string uri_str(uri);
	if (uri_str.find(".txt") != uri_str.npos) {  /*如果匹配到txt后缀则调用生成*/

                // 调用一个python脚本生成 abs_path
		std::string cmd = "python ~/generate.py \""+uri_str+"\""+" para"+" &";
		// printf("cmd: %s\n", cmd.c_str());
		system(cmd.c_str());

               
               // 循环检测,超过6秒没生成,就返回响应
		int cnt = 0;
		while (! exists_test3(abs_path)) {
			// printf("文件不存在: %s\n", abs_path.c_str());
			// sleep(1); 
			usleep(100000); // 间隔0.1秒检测文件是否已经生成
			cnt++; 
			if (cnt == 60) break;
		}
	}
	 /******************************************新增代码******************************************/

	resp->add_header_pair("Server", "Sogou C++ Workflow Server");
	resp->add_header_pair("Access-Control-Allow-Origin", "*");

	int fd = open(abs_path.c_str(), O_RDONLY); // 运行到这里,说明文件已经生成了,可以继续了
	if (fd >= 0)

        ......
}

问题
在process里增加的逻辑,在这里加时因为fd=open()在这个函数里,要确保有文件了才能调用open()。

表面上看代码流程逻辑上好像没问题,因为每个请求都是在独立的线程里,阻塞完了结束各自的请求。在 process 里加个死循环,好像不影响其他其他 url 请求的进来,是不是意味着加这里没事?

不过实际运行时,文件有时5秒能收到,有时9秒能收到,好像比实际文件生成的时间要慢,不清楚是那边网络问题,还是请求数多了之后,这种写法的不合理导致http server逻辑变慢了

@ghost ghost mentioned this issue Apr 10, 2021
@ghost ghost changed the title 如何在合理位置添加文件异步IO任务 如何在合理位置添加耗时的 文件异步IO任务 Apr 10, 2021
@Barenboim
Copy link
Contributor

Barenboim commented Apr 10, 2021

我想了一下,可以让python写完文件也发一个http请求通知吧?那么,我们原来的process这么写:

void process(WFHttpTask *server_task, const char *root)
{
    // get abs_path
    ...
    // create a named counter with name 'abs_path'
    WFCounterTask *counter = WFTaskFactory::create_counter_task(abs_path, 1, [abs_path](WFCounterTask *counter) {
         SeriesWork *series = series_of(counter);
         WFHttpTask *server_task = (WFHttpTask *)series->get_context();
        /* OK, everthing is ready. create your pread task and push back to series ....*/
    });
    // Notify python program to create file with abs_path.
    ...
    // save server_task in series' context.
    series_of(server_task)->set_context(server_task);
    // add counter to series.
    series_of(server_task)->push_back(counter);
}
// 增加一个处理完成通知的服务。request uri是全路径。
void process_file_ready(WFHttpTask *server_task)
{
    WFTaskFactory::count_by_name(server_task->get_req()->get_request_uri());
}

int main(int argc, char *argv[])
{
    ...
    WFHttpServer file_server(std::bind(process, root));
    WFHttpServer msg_server(process_file_ready);
    file_server.start(8000);
    msg_server.start(8001);
    getchar();
    file_server.stop();
    msg_server.stop();
    return 0;
}

原理是创建一个命名counter堵住series,等收到文件写入完成的http通知,打开counter,原处理流程继续。唯一比较微妙的地方是程序的退出,必须先完全关闭file_server再关闭msg_server,否则file_server会因为收不到通知的原因,无法结束。

@ghost
Copy link
Author

ghost commented Apr 11, 2021

要仔细理解下,对series不熟

@Barenboim
Copy link
Contributor

Barenboim commented Apr 11, 2021

可以看一下counter相关文档,原理就是用一个目标值为1的counter堵住series,文件生成完毕时打开。
https://github.com/sogou/workflow/blob/master/docs/about-counter.md

@Barenboim Barenboim changed the title 如何在合理位置添加耗时的 文件异步IO任务 Server如何等待一个非本框架的异步事件完成?-- Good Question Apr 11, 2021
@ghost
Copy link
Author

ghost commented Apr 11, 2021

可以看一下counter相关文档,原理就是用一个目标值为1的counter堵住series,文件生成完毕时打开。
https://github.com/sogou/workflow/blob/master/docs/about-counter.md

看了前面几个例程,大概理解了workflow用到的概念,这里用计数器比较简单比较好理解,文档里其他复杂的用法没接触过这方面的复杂业务需求所以理解比较难,如果有实际需求可能就理解快了。
编译时下面这个截图里的 = 去掉好像可以编译,需要加 = 吗:
image

感觉这样写分工逻辑清楚了很多,原来逻辑都是交叉的,把很多逻辑判断都加到workflow了,现在可以都放到异步事件里了。

有个疑问 int fd = open(abs_path.c_str(), O_RDONLY); 读取资源文件也是一个异步任务,是不是也写成等读完了再发个信号效率更高?
文件生成那里其实也是在python文件里循环检测文件(这样一来也相当于占用了一个线程,是不是和堵塞sever_task占用线程的效率差不多了?),然后发送到msg_server:这个流程和直接在process里循环检测文件堵塞server_task,最终效率上是否有区别?

@Barenboim
Copy link
Contributor

Barenboim commented Apr 11, 2021

counter的callback里产生一个pread任务,继续push_back到series里啊。这个不就是http_file_server的逻辑吗?不过windows版暂不支持pread任务。
python那边我不知道是什么逻辑,如果不用python,你也可以启动一个workflow的pwrite任务来实现不占任何线程的文件写。
你先看一看http_file_server这个实例,感觉你好像没有看过。

@ghost
Copy link
Author

ghost commented Apr 11, 2021

counter的callback里产生一个pread任务,继续push_back到series里啊。这个不就是http_file_server的逻辑吗?不过windows版暂不支持pread任务。
python那边我不知道是什么逻辑,如果不用python,你也可以启动一个workflow的pwrite任务来实现不占任何线程的文件写。
你先看一看http_file_server这个实例,感觉你好像没有看过。

看过了,之前没注意到这个,基础不太熟悉。

异步事件是N路的视频转码进程,客户端播放器会每隔几秒请求新的 m3u8 播放列表。
发现播放器打开首帧视频画面的用时不是很稳定,开始还3秒多就打开了,同样一路,过了一会变成6秒多打开了,不清楚是不是hls.js播放器的问题,也可能时路数变多之后的问题,因为每个m3u8请求都会通过 system() 调用异步的脚本,可能调用太频繁导致系统占用。

std::string abs_path_para_random = abs_path + ":" + para_random; // 拼接了一个随机数,因为可能会重复

代码修改如下:

void process(WFHttpTask *server_task, const char *root)
{
	HttpRequest *req = server_task->get_req();
	HttpResponse *resp = server_task->get_resp();
	// HttpResponse *resp = server_task->get_resp();
	const char *uri = req->get_request_uri();
	std::string uri_str(uri);
	const char *p = uri;

	printf("Request-URI: %s\n", uri);
	while (*p && *p != '?')
		p++;

    // get abs_path
	std::string abs_path(uri, p - uri);
	abs_path = root + abs_path;
	// printf("abs_path: %s\n", abs_path.c_str());
	// if (abs_path.back() == '/') abs_path += "index.html";

	// printf("strRand(32): %s\n", strRand(32).c_str());
	if (uri_str.find(".m3u8") != uri_str.npos) {
		// 判断是否已经存在,已经存在则不需要堵塞series
		if ( ! exists_test3(abs_path) ) {
			std::string para_random = strRand(32);
			std::string abs_path_para_random = abs_path + ":" + para_random;
			// 创建一个命名counter堵住series,因为计数器不占用线程,用while循环堵的话会占用server_task
			WFCounterTask *counter = WFTaskFactory::create_counter_task(abs_path_para_random, 1, [abs_path_para_random](WFCounterTask *counter) {
				std::string abs_path = abs_path_para_random.substr(0, abs_path_para_random.find(':'));
                
				SeriesWork *series = series_of(counter); // counter本身也是一个task
				WFHttpTask *server_task = (WFHttpTask *)series->get_context(); // counter 在 server_task 的 series 中

				printf("OK, everything is ready. Create your pread task and push back to series ...\n");
				printf("abs_path: %s\n", abs_path.c_str());

				HttpResponse *resp = server_task->get_resp();
				resp->add_header_pair("Server", "Sogou C++ Workflow Server");
				resp->add_header_pair("Access-Control-Allow-Origin", "*"); // 服务器实现跨域
				int fd = open(abs_path.c_str(), O_RDONLY);
				if (fd >= 0) {
					size_t size = lseek(fd, 0, SEEK_END);
					void *buf = malloc(size); /* As an example, assert(buf != NULL); */
					WFFileIOTask *pread_task;

					pread_task = WFTaskFactory::create_pread_task(fd, buf, size, 0, pread_callback);
					/* To implement a more complicated server, please use series' context
					* instead of tasks' user_data to pass/store internal data. */
					pread_task->user_data = resp;	/* pass resp pointer to pread task. */
					server_task->user_data = buf;	/* to free() in callback() */
					server_task->set_callback([](WFHttpTask *t){ free(t->user_data); });
					series_of(server_task)->push_back(pread_task);
				} else {
					resp->set_status_code("404");
					resp->append_output_body("<html>404 Not Found.</html>");
				}
			});

			// 通知 python 开始生成资源文件
			std::string cmd = "python playing.py "+" "+para_random+" "+"&"; 
			printf("cmd: %s\n", cmd.c_str());
			system(cmd.c_str());

			// 保存 server_task 到他自己所在 SeriesWork *series 的上下文中,相当于把当前请求信息临时存档
			series_of(server_task)->set_context(server_task);
			// 把命名计数器任务也加到 server_task 所在的 SeriesWork *series
			series_of(server_task)->push_back(counter);
			return;
		}
		// 通知 python 开始生成资源文件
		std::string cmd = "python playing.py "+" "+"para"+" "+"&"; 
		// printf("cmd: %s\n", cmd.c_str());
		system(cmd.c_str());
	}

	// while循环检查文件是否生成完成
	/*
	if (uri_str.find(".m3u8") != uri_str.npos) {
		std::string cmd = "python playing.py &";
		// printf("cmd: %s\n", cmd.c_str());
		system(cmd.c_str());
		int cnt = 0;
		while (! exists_test3(abs_path)) {
			// printf("文件不存在: %s\n", abs_path.c_str());
			// sleep(1); 
			usleep(100000);
			cnt++; 
			if (cnt == 60) {
				// std::string cmd = "python playing.py &";
				// system(cmd.c_str());
				break;
			}
		}
	}
	*/

	resp->add_header_pair("Server", "Sogou C++ Workflow Server");
	resp->add_header_pair("Access-Control-Allow-Origin", "*"); // 服务器实现跨域
	int fd = open(abs_path.c_str(), O_RDONLY);
	if (fd >= 0) {
		size_t size = lseek(fd, 0, SEEK_END);
		void *buf = malloc(size); /* As an example, assert(buf != NULL); */
		WFFileIOTask *pread_task;

		pread_task = WFTaskFactory::create_pread_task(fd, buf, size, 0, pread_callback);
		/* To implement a more complicated server, please use series' context
		* instead of tasks' user_data to pass/store internal data. */
		pread_task->user_data = resp;	/* pass resp pointer to pread task. */
		server_task->user_data = buf;	/* to free() in callback() */
		server_task->set_callback([](WFHttpTask *t){ free(t->user_data); });
		series_of(server_task)->push_back(pread_task);
	} else {
		resp->set_status_code("404");
		resp->append_output_body("<html>404 Not Found.</html>");
	}
}

// 增加一个处理文件生成完成通知的服务,request uri是资源路径
void process_file_ready(WFHttpTask *server_task)
{
	const char *uri = server_task->get_req()->get_request_uri();
	printf("MSG-URI: %s\n", uri);
	/* 等收到文件写入完成的http通知,打开counter,原请求响应处理流程继续 */
	WFTaskFactory::count_by_name(uri);
}


static WFFacilities::WaitGroup wait_group(1);

void sig_handler(int signo)
{
	wait_group.done();
}

int main(int argc, char *argv[])
{
	if (argc != 2 && argc != 3 && argc != 5)
	{
		fprintf(stderr, "%s <port> [root path] [cert file] [key file]\n",
				argv[0]);
		exit(1);
	}

	signal(SIGINT, sig_handler);

	unsigned short port = atoi(argv[1]);
	const char *root = (argc >= 3 ? argv[2] : "."); // 资源路径
	auto&& proc = std::bind(process, std::placeholders::_1, root);
	WFHttpServer file_server(proc); // WFHttpServer file_server(std::bind(process, root));
	int ret;
	if (argc == 5)
		ret = file_server.start(port, argv[3], argv[4]); /* https server */
	else
		ret = file_server.start(port);

	WFHttpServer msg_server(process_file_ready); // 可以让python写完文件也发一个http请求通知
	msg_server.start(8002); // http://127.0.0.1:8002uri

	if (ret == 0) {
		wait_group.wait();
		file_server.stop();
	} else {
		perror("start server");
		exit(1);
	}

	// getchar();
	// 必须先完全关闭file_server再关闭msg_server,否则file_server会因为收不到通知的原因,无法结束
    // file_server.stop();
    // msg_server.stop();

	return 0;
}

异步事件发送消息时通过python的requests.reques发的:

while not os.path.exists(m3u8_file):
   ...
  time.sleep(0.2)
response = requests.request("GET", 'http://0.0.0.0:8002'+live_cameraId_m3u8+':'+para_random, headers={}, data = {}, files = {})

@Barenboim
Copy link
Contributor

这代码应该可以跑通了吧?但我还是觉得最好吧文件生成的过程也放到C++层,这样才能实现最优化的服务器性能。

@ghost
Copy link
Author

ghost commented Apr 12, 2021

这代码应该可以跑通了吧?但我还是觉得最好吧文件生成的过程也放到C++层,这样才能实现最优化的服务器性能。

谢谢,代码跑通了,是的,发现最好是用cpp,主要是ffmpeg内部不会,不然集成起来会更好些,python是各种网上搜到的函数库往上堆,写逻辑倒挺方便。

workflow 可以完全取代nginx吗?感觉workfow逻辑很优雅直观,就是任务队列,启动很方便,
nginx就是有些业务插件,比如nginx-rtmp推流的

@Barenboim
Copy link
Contributor

覆盖ngnix的功能是不可能的。nginx主要还是做反向代理这些应用吧,我们主要面向后端开发。
只说网络这块ngnix应该可以覆盖我们的功能。但写后端业务的话我们比nginx简单太多,而且性能更好:)

@Barenboim
Copy link
Contributor

Barenboim commented Apr 13, 2021

你的代码里msg_server.stop()为什么没有调用呢?应该关闭所有server的:

file_server.stop();
msg_server.stop();

但在你这个应用里,不可以这么写:

file_server.shutdown();
msg_server.shutdown();
file_server.wait_finish();
msg_server.wait_finish();

一般来讲,关闭多个server用第二种写法会更快。但是这里只能用连续两个stop()。实际执行逻辑是:

/*
Identical to:
file_server.stop();
msg_server.stop(); 
*/
file_server.shutdown();
file_server.wait_finish();
msg_server.shutdown();
msg_server.wait_finish();

@Barenboim
Copy link
Contributor

Barenboim commented Apr 13, 2021

@eatcosmos 对了对了,我感觉你这个应用可以用我们的pyworkflow啊,能调用python的库,底层还是C++。python里需要调用计算可以启动一个gotask(多出一个计算线程,不占用process),用法和C++的完全一样,也有file_server的例子。https://github.com/sogou/pyworkflow

@ghost
Copy link
Author

ghost commented Apr 14, 2021

@eatcosmos 对了对了,我感觉你这个应用可以用我们的pyworkflow啊,能调用python的库,底层还是C++。python里需要调用计算可以启动一个gotask(多出一个计算线程,不占用process),用法和C++的完全一样,也有file_server的例子。https://github.com/sogou/pyworkflow

好的,我后面有机会试试,能不能把workflow打包成python 包pip直接安装的,想以后用在机器学习部署里,有的平台只能运行pip?

@Barenboim
Copy link
Contributor

@eatcosmos 对了对了,我感觉你这个应用可以用我们的pyworkflow啊,能调用python的库,底层还是C++。python里需要调用计算可以启动一个gotask(多出一个计算线程,不占用process),用法和C++的完全一样,也有file_server的例子。https://github.com/sogou/pyworkflow

好的,我后面有机会试试,能不能把workflow打包成python 包pip直接安装的,想以后用在机器学习部署里,有的平台只能运行pip?

可以pip安装。
$ pip3 install pywf

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant