Skip to content
This repository has been archived by the owner on Jan 25, 2023. It is now read-only.

Commit

Permalink
WorkQueue: use signed type
Browse files Browse the repository at this point in the history
Related: #2940
  • Loading branch information
1vanK committed Jun 7, 2022
1 parent 7fde25a commit 385b262
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 66 deletions.
3 changes: 3 additions & 0 deletions Source/Urho3D/AngelScript/Generated_GlobalVariables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,9 @@ void ASRegisterGeneratedGlobalVariables(asIScriptEngine* engine)
// const StringHash VSP_ZONE | File: ../GraphicsAPI/GraphicsDefs.h
engine->RegisterGlobalProperty("const StringHash VSP_ZONE", (void*)&VSP_ZONE);

// constexpr i32 WI_MAX_PRIORITY | File: ../Core/WorkQueue.h
engine->RegisterGlobalProperty("const int WI_MAX_PRIORITY", (void*)&WI_MAX_PRIORITY);

#ifdef URHO3D_NETWORK
// static const unsigned CONTROLS_CONTENT_ID | File: ../Network/Protocol.h
engine->RegisterGlobalProperty("const uint CONTROLS_CONTENT_ID", (void*)&CONTROLS_CONTENT_ID);
Expand Down
24 changes: 12 additions & 12 deletions Source/Urho3D/AngelScript/Generated_Members.h
Original file line number Diff line number Diff line change
Expand Up @@ -8763,7 +8763,7 @@ template <class T> void RegisterMembers_WorkItem(asIScriptEngine* engine, const
{
RegisterMembers_RefCounted<T>(engine, className);

// void(* WorkItem::workFunction_) (const WorkItem* , unsigned)
// void(* WorkItem::workFunction_) (const WorkItem* , i32)
// Not registered because pointer
// void* WorkItem::start_
// Not registered because pointer
Expand All @@ -8774,8 +8774,8 @@ template <class T> void RegisterMembers_WorkItem(asIScriptEngine* engine, const
// std::atomic<bool> WorkItem::completed_
// Error: type "std::atomic<bool>" can not automatically bind

// unsigned WorkItem::priority_
engine->RegisterObjectProperty(className, "uint priority", offsetof(T, priority_));
// i32 WorkItem::priority_
engine->RegisterObjectProperty(className, "int priority", offsetof(T, priority_));

// bool WorkItem::sendEvent_
engine->RegisterObjectProperty(className, "bool sendEvent", offsetof(T, sendEvent_));
Expand Down Expand Up @@ -12739,26 +12739,26 @@ template <class T> void RegisterMembers_WorkQueue(asIScriptEngine* engine, const
// Error: type "SharedPtr<WorkItem>" can not automatically bind
// bool WorkQueue::RemoveWorkItem(SharedPtr<WorkItem> item)
// Error: type "SharedPtr<WorkItem>" can not automatically bind
// unsigned WorkQueue::RemoveWorkItems(const Vector<SharedPtr<WorkItem>>& items)
// i32 WorkQueue::RemoveWorkItems(const Vector<SharedPtr<WorkItem>>& items)
// Error: type "const Vector<SharedPtr<WorkItem>>&" can not automatically bind

// void WorkQueue::Complete(unsigned priority)
engine->RegisterObjectMethod(className, "void Complete(uint)", AS_METHODPR(T, Complete, (unsigned), void), AS_CALL_THISCALL);
// void WorkQueue::Complete(i32 priority)
engine->RegisterObjectMethod(className, "void Complete(int)", AS_METHODPR(T, Complete, (i32), void), AS_CALL_THISCALL);

// void WorkQueue::CreateThreads(unsigned numThreads)
engine->RegisterObjectMethod(className, "void CreateThreads(uint)", AS_METHODPR(T, CreateThreads, (unsigned), void), AS_CALL_THISCALL);
// void WorkQueue::CreateThreads(i32 numThreads)
engine->RegisterObjectMethod(className, "void CreateThreads(int)", AS_METHODPR(T, CreateThreads, (i32), void), AS_CALL_THISCALL);

// int WorkQueue::GetNonThreadedWorkMs() const
engine->RegisterObjectMethod(className, "int GetNonThreadedWorkMs() const", AS_METHODPR(T, GetNonThreadedWorkMs, () const, int), AS_CALL_THISCALL);

// unsigned WorkQueue::GetNumThreads() const
engine->RegisterObjectMethod(className, "uint GetNumThreads() const", AS_METHODPR(T, GetNumThreads, () const, unsigned), AS_CALL_THISCALL);
// i32 WorkQueue::GetNumThreads() const
engine->RegisterObjectMethod(className, "int GetNumThreads() const", AS_METHODPR(T, GetNumThreads, () const, i32), AS_CALL_THISCALL);

// int WorkQueue::GetTolerance() const
engine->RegisterObjectMethod(className, "int GetTolerance() const", AS_METHODPR(T, GetTolerance, () const, int), AS_CALL_THISCALL);

// bool WorkQueue::IsCompleted(unsigned priority) const
engine->RegisterObjectMethod(className, "bool IsCompleted(uint) const", AS_METHODPR(T, IsCompleted, (unsigned) const, bool), AS_CALL_THISCALL);
// bool WorkQueue::IsCompleted(i32 priority) const
engine->RegisterObjectMethod(className, "bool IsCompleted(int) const", AS_METHODPR(T, IsCompleted, (i32) const, bool), AS_CALL_THISCALL);

// bool WorkQueue::IsCompleting() const
engine->RegisterObjectMethod(className, "bool IsCompleting() const", AS_METHODPR(T, IsCompleting, () const, bool), AS_CALL_THISCALL);
Expand Down
39 changes: 24 additions & 15 deletions Source/Urho3D/Core/WorkQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ class WorkerThread : public Thread, public RefCounted
{
public:
/// Construct.
WorkerThread(WorkQueue* owner, unsigned index) :
WorkerThread(WorkQueue* owner, i32 index) :
owner_(owner),
index_(index)
{
assert(index >= 0);
}

/// Process work items until stopped.
Expand All @@ -37,13 +38,13 @@ class WorkerThread : public Thread, public RefCounted
}

/// Return thread index.
unsigned GetIndex() const { return index_; }
i32 GetIndex() const { return index_; }

private:
/// Work queue.
WorkQueue* owner_;
/// Thread index.
unsigned index_;
i32 index_;
};

WorkQueue::WorkQueue(Context* context) :
Expand All @@ -69,9 +70,11 @@ WorkQueue::~WorkQueue()
thread->Stop();
}

void WorkQueue::CreateThreads(unsigned numThreads)
void WorkQueue::CreateThreads(i32 numThreads)
{
#ifdef URHO3D_THREADING
assert(numThreads >= 0);

// Other subsystems may initialize themselves according to the number of threads.
// Therefore allow creating the threads only once, after which the amount is fixed
if (!threads_.Empty())
Expand All @@ -80,7 +83,7 @@ void WorkQueue::CreateThreads(unsigned numThreads)
// Start threads in paused mode
Pause();

for (unsigned i = 0; i < numThreads; ++i)
for (i32 i = 0; i < numThreads; ++i)
{
SharedPtr<WorkerThread> thread(new WorkerThread(this, i + 1));
thread->Run();
Expand Down Expand Up @@ -180,10 +183,10 @@ bool WorkQueue::RemoveWorkItem(SharedPtr<WorkItem> item)
return false;
}

unsigned WorkQueue::RemoveWorkItems(const Vector<SharedPtr<WorkItem>>& items)
i32 WorkQueue::RemoveWorkItems(const Vector<SharedPtr<WorkItem>>& items)
{
MutexLock lock(queueMutex_);
unsigned removed = 0;
i32 removed = 0;

for (Vector<SharedPtr<WorkItem>>::ConstIterator i = items.Begin(); i != items.End(); ++i)
{
Expand Down Expand Up @@ -227,8 +230,9 @@ void WorkQueue::Resume()
}


void WorkQueue::Complete(unsigned priority)
void WorkQueue::Complete(i32 priority)
{
assert(priority >= 0);
completing_ = true;

if (threads_.Size())
Expand Down Expand Up @@ -279,8 +283,9 @@ void WorkQueue::Complete(unsigned priority)
completing_ = false;
}

bool WorkQueue::IsCompleted(unsigned priority) const
bool WorkQueue::IsCompleted(i32 priority) const
{
assert(priority >= 0);
for (List<SharedPtr<WorkItem>>::ConstIterator i = workItems_.Begin(); i != workItems_.End(); ++i)
{
if ((*i)->priority_ >= priority && !(*i)->completed_)
Expand All @@ -290,8 +295,10 @@ bool WorkQueue::IsCompleted(unsigned priority) const
return true;
}

void WorkQueue::ProcessItems(unsigned threadIndex)
void WorkQueue::ProcessItems(i32 threadIndex)
{
assert(threadIndex >= 0);

bool wasActive = false;

for (;;)
Expand Down Expand Up @@ -325,8 +332,10 @@ void WorkQueue::ProcessItems(unsigned threadIndex)
}
}

void WorkQueue::PurgeCompleted(unsigned priority)
void WorkQueue::PurgeCompleted(i32 priority)
{
assert(priority >= 0);

// Purge completed work items and send completion events. Do not signal items lower than priority threshold,
// as those may be user submitted and lead to eg. scene manipulation that could happen in the middle of the
// render update, which is not allowed
Expand All @@ -353,11 +362,11 @@ void WorkQueue::PurgeCompleted(unsigned priority)

void WorkQueue::PurgePool()
{
unsigned currentSize = poolItems_.Size();
int difference = lastSize_ - currentSize;
i32 currentSize = poolItems_.Size();
i32 difference = lastSize_ - currentSize;

// Difference tolerance, should be fairly significant to reduce the pool size.
for (unsigned i = 0; poolItems_.Size() > 0 && difference > tolerance_ && i < (unsigned)difference; i++)
for (i32 i = 0; poolItems_.Size() > 0 && difference > tolerance_ && i < difference; i++)
poolItems_.PopFront();

lastSize_ = currentSize;
Expand All @@ -376,7 +385,7 @@ void WorkQueue::ReturnToPool(SharedPtr<WorkItem>& item)
item->end_ = nullptr;
item->aux_ = nullptr;
item->workFunction_ = nullptr;
item->priority_ = M_MAX_UNSIGNED;
item->priority_ = WI_MAX_PRIORITY;
item->sendEvent_ = false;
item->completed_ = false;

Expand Down
22 changes: 12 additions & 10 deletions Source/Urho3D/Core/WorkQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ URHO3D_EVENT(E_WORKITEMCOMPLETED, WorkItemCompleted)
URHO3D_PARAM(P_ITEM, Item); // WorkItem ptr
}

inline constexpr i32 WI_MAX_PRIORITY = M_MAX_INT;

class WorkerThread;

/// Work queue item.
Expand All @@ -28,15 +30,15 @@ struct WorkItem : public RefCounted

public:
/// Work function. Called with the work item and thread index (0 = main thread) as parameters.
void (* workFunction_)(const WorkItem*, unsigned){};
void (* workFunction_)(const WorkItem*, i32){};
/// Data start pointer.
void* start_{};
/// Data end pointer.
void* end_{};
/// Auxiliary data pointer.
void* aux_{};
/// Priority. Higher value = will be completed first.
unsigned priority_{};
i32 priority_{};
/// Whether to send event on completion.
bool sendEvent_{};
/// Completed flag.
Expand All @@ -60,21 +62,21 @@ class URHO3D_API WorkQueue : public Object
~WorkQueue() override;

/// Create worker threads. Can only be called once.
void CreateThreads(unsigned numThreads);
void CreateThreads(i32 numThreads);
/// Get pointer to an usable WorkItem from the item pool. Allocate one if no more free items.
SharedPtr<WorkItem> GetFreeItem();
/// Add a work item and resume worker threads.
void AddWorkItem(const SharedPtr<WorkItem>& item);
/// Remove a work item before it has started executing. Return true if successfully removed.
bool RemoveWorkItem(SharedPtr<WorkItem> item);
/// Remove a number of work items before they have started executing. Return the number of items successfully removed.
unsigned RemoveWorkItems(const Vector<SharedPtr<WorkItem>>& items);
i32 RemoveWorkItems(const Vector<SharedPtr<WorkItem>>& items);
/// Pause worker threads.
void Pause();
/// Resume worker threads.
void Resume();
/// Finish all queued work which has at least the specified priority. Main thread will also execute priority work. Pause worker threads if no more work remains.
void Complete(unsigned priority);
void Complete(i32 priority);

/// Set the pool telerance before it starts deleting pool items.
void SetTolerance(int tolerance) { tolerance_ = tolerance; }
Expand All @@ -83,10 +85,10 @@ class URHO3D_API WorkQueue : public Object
void SetNonThreadedWorkMs(int ms) { maxNonThreadedWorkMs_ = Max(ms, 1); }

/// Return number of worker threads.
unsigned GetNumThreads() const { return threads_.Size(); }
i32 GetNumThreads() const { return threads_.Size(); }

/// Return whether all work with at least the specified priority is finished.
bool IsCompleted(unsigned priority) const;
bool IsCompleted(i32 priority) const;
/// Return whether the queue is currently completing work in the main thread.
bool IsCompleting() const { return completing_; }

Expand All @@ -98,9 +100,9 @@ class URHO3D_API WorkQueue : public Object

private:
/// Process work items until shut down. Called by the worker threads.
void ProcessItems(unsigned threadIndex);
void ProcessItems(i32 threadIndex);
/// Purge completed work items which have at least the specified priority, and send completion events as necessary.
void PurgeCompleted(unsigned priority);
void PurgeCompleted(i32 priority);
/// Purge the pool to reduce allocation where its unneeded.
void PurgePool();
/// Return a work item to the pool.
Expand Down Expand Up @@ -129,7 +131,7 @@ class URHO3D_API WorkQueue : public Object
/// Tolerance for the shared pool before it begins to deallocate.
int tolerance_;
/// Last size of the shared pool.
unsigned lastSize_;
i32 lastSize_;
/// Maximum milliseconds per frame to spend on low-priority work, when there are no worker threads.
int maxNonThreadedWorkMs_;
};
Expand Down
2 changes: 1 addition & 1 deletion Source/Urho3D/Graphics/Drawable.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class URHO3D_API Drawable : public Component

friend class Octant;
friend class Octree;
friend void UpdateDrawablesWork(const WorkItem* item, unsigned threadIndex);
friend void UpdateDrawablesWork(const WorkItem* item, i32 threadIndex);

public:
/// Construct.
Expand Down
6 changes: 3 additions & 3 deletions Source/Urho3D/Graphics/OcclusionBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ enum ClipMask : unsigned
};
URHO3D_FLAGSET(ClipMask, ClipMaskFlags);

void DrawOcclusionBatchWork(const WorkItem* item, unsigned threadIndex)
void DrawOcclusionBatchWork(const WorkItem* item, i32 threadIndex)
{
auto* buffer = reinterpret_cast<OcclusionBuffer*>(item->aux_);
OcclusionBatch& batch = *reinterpret_cast<OcclusionBatch*>(item->start_);
Expand Down Expand Up @@ -196,14 +196,14 @@ void OcclusionBuffer::DrawTriangles()
for (Vector<OcclusionBatch>::Iterator i = batches_.Begin(); i != batches_.End(); ++i)
{
SharedPtr<WorkItem> item = queue->GetFreeItem();
item->priority_ = M_MAX_UNSIGNED;
item->priority_ = WI_MAX_PRIORITY;
item->workFunction_ = DrawOcclusionBatchWork;
item->aux_ = this;
item->start_ = &(*i);
queue->AddWorkItem(item);
}

queue->Complete(M_MAX_UNSIGNED);
queue->Complete(WI_MAX_PRIORITY);

MergeBuffers();
depthHierarchyDirty_ = true;
Expand Down
6 changes: 3 additions & 3 deletions Source/Urho3D/Graphics/Octree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ static const int DEFAULT_OCTREE_LEVELS = 8;

extern const char* SUBSYSTEM_CATEGORY;

void UpdateDrawablesWork(const WorkItem* item, unsigned threadIndex)
void UpdateDrawablesWork(const WorkItem* item, i32 threadIndex)
{
const FrameInfo& frame = *(reinterpret_cast<FrameInfo*>(item->aux_));
auto** start = reinterpret_cast<Drawable**>(item->start_);
Expand Down Expand Up @@ -369,7 +369,7 @@ void Octree::Update(const FrameInfo& frame)
for (int i = 0; i < numWorkItems; ++i)
{
SharedPtr<WorkItem> item = queue->GetFreeItem();
item->priority_ = M_MAX_UNSIGNED;
item->priority_ = WI_MAX_PRIORITY;
item->workFunction_ = UpdateDrawablesWork;
item->aux_ = const_cast<FrameInfo*>(&frame);

Expand All @@ -384,7 +384,7 @@ void Octree::Update(const FrameInfo& frame)
start = end;
}

queue->Complete(M_MAX_UNSIGNED);
queue->Complete(WI_MAX_PRIORITY);
scene->EndThreadedUpdate();
}

Expand Down

0 comments on commit 385b262

Please sign in to comment.