Skip to content

Commit

Permalink
v0.4.4 beta 1
Browse files Browse the repository at this point in the history
 - Using thread pools to do async requests
  • Loading branch information
kbuffington committed Feb 22, 2020
1 parent cf784f3 commit eb9238e
Show file tree
Hide file tree
Showing 7 changed files with 333 additions and 14 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -356,4 +356,10 @@ healthchecksdb
# Backup folder for Package Reference Convert tool in Visual Studio 2017
MigrationBackup/

# VS Code
*.code-workspace

# End of https://www.gitignore.io/api/visualstudio

# Storing PDB just in case
!src/Release/foo_musicbrainz/foo_musicbrainz.pdb
Binary file not shown.
4 changes: 2 additions & 2 deletions src/foo_musicbrainz.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ namespace mb
{
static constexpr const char* component_title = "MusicBrainz Tagger";
static constexpr const char* component_dll_name = "foo_musicbrainz.dll";
static constexpr const char* component_version = "0.4.3.2";
static constexpr const char* component_info = "Copyright (C) 2009-2012 Dremora\nCopyright (C) 2015-2020 marc2003\n\nBuild: " __TIME__ ", " __DATE__;
static constexpr const char* component_version = "0.4.4b1";
static constexpr const char* component_info = "Copyright (C) 2009-2012 Dremora\nCopyright (C) 2015-2020 marc2003\nCopyright (C) 2020 MordredKLB\n\nBuild: " __TIME__ ", " __DATE__;
}
6 changes: 6 additions & 0 deletions src/foo_musicbrainz.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@
<ClInclude Include="..\foobar2000-SDK\includes\json.hpp">
<Filter>Header Files\includes</Filter>
</ClInclude>
<ClInclude Include="thread_pool.h">
<Filter>Header Files</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ResourceCompile Include="foo_musicbrainz.rc">
Expand Down Expand Up @@ -91,5 +94,8 @@
<ClCompile Include="toc.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="thread_pool.cpp">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
</Project>
91 changes: 79 additions & 12 deletions src/request_thread.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,67 @@
#include "stdafx.h"
#include "dialog_tagger.h"
#include "request_thread.h"
#include "thread_pool.h"
#include <atomic>
#include <mutex>

namespace mb
{
std::mutex adding_release;
std::atomic_uint32_t thread_counter = 0;

class get_release_info : public simple_thread_task {
public:
get_release_info(pfc::string mb_releaseid, size_t num_tracks, std::vector<Release>* release_list,
bool* failed, abort_callback& abort)
: m_id(mb_releaseid)
, m_num_tracks(num_tracks)
, m_release_list(release_list)
, m_failed(failed)
, m_abort(abort) {}

void run() override
{
try {
thread_counter++;

query q("release", m_id);
q.add_param("inc", "artists+labels+recordings+release-groups+artist-credits+isrcs");

json j2 = q.lookup(m_abort);
if (!j2.is_object())
{
*m_failed = true;
thread_counter--;
}
else {
Release r = parser(j2, m_num_tracks);
if (r.tracks.size())
{
std::lock_guard<std::mutex> guard(adding_release); // lock_guard is destroyed when exiting block
m_release_list->emplace_back(r);
}
else {
thread_counter--;
}
}

}
catch (std::exception const& e) {
m_failMsg = e.what();
FB2K_console_formatter() << component_title << ": " << m_failMsg;
}
}
private:
pfc::string8 m_failMsg;
const pfc::string m_id;
abort_callback& m_abort;
size_t m_num_tracks;
bool* m_failed;
std::vector<Release> * m_release_list;
};


request_thread::request_thread(types type, std::unique_ptr<query> q, metadb_handle_list_cref handles)
: m_type(type)
, m_query(std::move(q))
Expand Down Expand Up @@ -62,6 +120,8 @@ namespace mb
filter_releases(releases, handle_count, ids);
const size_t count = ids.get_count();

thread_counter = 0;

for (size_t i = 0; i < count; ++i)
{
status.set_progress(i + 1, count);
Expand All @@ -70,21 +130,28 @@ namespace mb
Sleep(1000);
}

query q("release", ids[i]);
q.add_param("inc", "artists+labels+recordings+release-groups+artist-credits+isrcs");

json j2 = q.lookup(abort);
if (!j2.is_object())
{
m_failed = true;
/**
* this was pre-existing logic which would abort when we didn't receive anything back
* from musicbrainz. This would prevent hammering if mb is down, but also (silently!)
* fails even if we had some number of requests that already completed successfully.
* Keeping this for now, but it's subject to change later.
**/
if (m_failed) {
FB2K_console_formatter() << component_title << ": Musicbrainz query failed.";
return;
}

Release r = parser(j2, handle_count);
if (r.tracks.size())
{
m_release_list.emplace_back(r);
}
// async requests
get_release_info* task = new get_release_info(ids[i], handle_count, &m_release_list, &m_failed, abort);
if (!simple_thread_pool::instance().enqueue(task)) delete task;
}
Sleep(10); // wait for last thread to start
while (m_release_list.size() < thread_counter) {
// Is there a better way to wait for all the threads to complete?
Sleep(10);
}
if (m_failed) {
FB2K_console_formatter() << component_title << ": Musicbrainz query failed.";
}
}
break;
Expand Down
185 changes: 185 additions & 0 deletions src/thread_pool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
#include "stdafx.h"
#include "thread_pool.h"

simple_thread_worker::simple_thread_worker() {}

simple_thread_worker::~simple_thread_worker()
{
waitTillDone();
}

void simple_thread_worker::threadProc()
{
pfc::tickcount_t last_tick = pfc::getTickCount();

while (WaitForSingleObject(simple_thread_pool::instance().exiting_, 0) == WAIT_TIMEOUT)
{
if (WaitForSingleObject(simple_thread_pool::instance().have_task_, 1000) == WAIT_OBJECT_0)
{
simple_thread_task* task = simple_thread_pool::instance().acquire_task();

if (task)
{
task->run();
simple_thread_pool::instance().untrack(task);
last_tick = pfc::getTickCount();
continue;
}
}

if (pfc::getTickCount() - last_tick >= 10000)
{
insync(simple_thread_pool::instance().cs_);

if (simple_thread_pool::instance().is_queue_empty())
{
simple_thread_pool::instance().remove_worker_(this);
return;
}
}
}

simple_thread_pool::instance().remove_worker_(this);
}

simple_thread_pool simple_thread_pool::instance_;

simple_thread_pool::simple_thread_pool() : num_workers_(0)
{
empty_worker_ = CreateEvent(nullptr, TRUE, TRUE, nullptr);
exiting_ = CreateEvent(nullptr, TRUE, FALSE, nullptr);
have_task_ = CreateEvent(nullptr, TRUE, FALSE, nullptr);

pfc::dynamic_assert(empty_worker_ != INVALID_HANDLE_VALUE);
pfc::dynamic_assert(exiting_ != INVALID_HANDLE_VALUE);
pfc::dynamic_assert(have_task_ != INVALID_HANDLE_VALUE);
}

simple_thread_pool::~simple_thread_pool()
{
CloseHandle(empty_worker_);
CloseHandle(exiting_);
CloseHandle(have_task_);
}

bool simple_thread_pool::enqueue(simple_thread_task* task)
{
if (WaitForSingleObject(exiting_, 0) == WAIT_OBJECT_0)
return false;

insync(cs_);
int max_count = pfc::getOptimalWorkerThreadCount();
track(task);

if (num_workers_ < max_count)
{
simple_thread_worker* worker = new simple_thread_worker;
add_worker_(worker);
worker->start();
}

return true;
}

bool simple_thread_pool::is_queue_empty()
{
insync(cs_);
return task_list_.get_count() == 0;
}

simple_thread_task* simple_thread_pool::acquire_task()
{
insync(cs_);

t_task_list::iterator iter = task_list_.first();

if (iter.is_valid())
{
task_list_.remove(iter);
}

if (is_queue_empty())
ResetEvent(have_task_);

return iter.is_valid() ? *iter : nullptr;
}

simple_thread_pool& simple_thread_pool::instance()
{
return instance_;
}

void simple_thread_pool::add_worker_(simple_thread_worker* worker)
{
insync(cs_);
InterlockedIncrement(&num_workers_);
ResetEvent(empty_worker_);
}

void simple_thread_pool::exit()
{
core_api::ensure_main_thread();

SetEvent(exiting_);

// Because the tasks may use blocking SendMessage() function, it should be avoid using
// infinite wait here, or both main thread and worker thread will block, and it's also
// important to dispatch windows messages here.
while (WaitForSingleObject(empty_worker_, 0) == WAIT_TIMEOUT)
{
MSG msg;

if (PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE))
{
TranslateMessage(&msg);
DispatchMessage(&msg);
}
}

untrack_all();
}

void simple_thread_pool::remove_worker_(simple_thread_worker* worker)
{
InterlockedDecrement(&num_workers_);
insync(cs_);

if (num_workers_ == 0)
SetEvent(empty_worker_);

fb2k::inMainThread([=] {
delete worker;
});
}

void simple_thread_pool::track(simple_thread_task* task)
{
insync(cs_);
bool empty = is_queue_empty();
task_list_.add_item(task);

if (empty)
SetEvent(have_task_);
}

void simple_thread_pool::untrack(simple_thread_task* task)
{
insync(cs_);
task_list_.remove_item(task);
delete task;

if (is_queue_empty())
ResetEvent(have_task_);
}

void simple_thread_pool::untrack_all()
{
insync(cs_);
for (t_task_list::iterator iter = task_list_.first(); iter.is_valid(); ++iter)
{
task_list_.remove(iter);
delete* iter;
}

ResetEvent(have_task_);
}

0 comments on commit eb9238e

Please sign in to comment.