-
Notifications
You must be signed in to change notification settings - Fork 0
/
StreamTaskQueue.cpp
74 lines (62 loc) · 1.63 KB
/
StreamTaskQueue.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include "stdafx.h"
#include "StreamHandler.h"
#include "StreamTaskQueue.h"
#include "StreamingManager.h"
namespace BlinkAnalysis
{
StreamTaskQueue::StreamTaskQueue() {
_syncObj = gcnew Object();
_tasks = gcnew Generic::Queue<QTask^>();
_runningTaskCount = 0;
_process = false;
}
void StreamTaskQueue::ProcessTaskQueue(StreamTaskQueue^ queue)
{
if (queue->_runningTaskCount != 0) return;
if (queue->_tasks->Count > 0 && queue->_runningTaskCount == 0)
{
Monitor::Enter(queue->_syncObj);
QueueUserWorkItem(queue->_tasks->Dequeue());
Monitor::Exit(queue->_syncObj);
}
}
void StreamTaskQueue::QueueUserWorkItem(QTask^ task)
{
if (!task->queue->_process) return;
task->queue->_runningTaskCount++;
ThreadPool::QueueUserWorkItem(gcnew WaitCallback(StreamTaskQueue::completionTaskAsync), task);
}
void StreamTaskQueue::OnTaskCompleted(StreamTaskQueue^ queue)
{
Monitor::Enter(queue->_syncObj);
queue->_runningTaskCount--;
if (queue->_runningTaskCount == 0)
{
ProcessTaskQueue(queue);
}
Monitor::Exit(queue->_syncObj);
}
void StreamTaskQueue::completionTaskAsync(Object^ task)
{
if (!StreamingManager::getInstance()->isStreaming()) return;
StreamHandler::addFrameAsync(((QTask^)task)->frame);
OnTaskCompleted(((QTask^)task)->queue);
}
void StreamTaskQueue::Queue(Object^ frame)
{
Monitor::Enter(_syncObj);
QTask^ task = gcnew QTask();
task->queue = this;
task->frame = frame;
_tasks->Enqueue(task);
Monitor::Exit(_syncObj);
ProcessTaskQueue(this);
}
int StreamTaskQueue::Count()
{
Monitor::Enter(_syncObj);
int ret = _tasks->Count;
Monitor::Exit(_syncObj);
return ret;
}
}