Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use BlockingQueue in JobMgr #2885

Merged
merged 4 commits into from Jul 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Expand Up @@ -33,7 +33,7 @@ Please mark all changes in change log and use the issue from GitHub
- \#2689 Construct Knowhere Index Without Data
- \#2798 hnsw-sq8 support
- \#2802 Add new index: IVFSQ8NR
- \#2834 add C++ sdk support 4 hnsw_sq8nr
- \#2834 Add C++ sdk support 4 hnsw_sq8nr

## Improvement
- \#2543 Remove secondary_path related code
Expand All @@ -43,6 +43,7 @@ Please mark all changes in change log and use the issue from GitHub
- \#2675 Print out system memory size when report invalid cpu cache size
- \#2841 Replace IndexType/EngineType/MetricType
- \#2858 Unify index name in db
- \#2884 Using BlockingQueue in JobMgr

## Task

Expand Down
87 changes: 18 additions & 69 deletions core/src/scheduler/JobMgr.cpp
Expand Up @@ -9,21 +9,19 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#include "scheduler/JobMgr.h"

#include "src/db/Utils.h"
#include "src/segment/SegmentReader.h"

#include <limits>
#include <string>
#include <utility>

#include "SchedInst.h"
#include "TaskCreator.h"
#include "db/Utils.h"
#include "scheduler/Algorithm.h"
#include "scheduler/CPUBuilder.h"
#include "scheduler/JobMgr.h"
#include "scheduler/SchedInst.h"
#include "scheduler/TaskCreator.h"
#include "scheduler/selector/Optimizer.h"
#include "scheduler/task/Task.h"
#include "scheduler/tasklabel/SpecResLabel.h"
#include "selector/Optimizer.h"
#include "task/Task.h"

namespace milvus {
namespace scheduler {
Expand All @@ -33,99 +31,50 @@ JobMgr::JobMgr(ResourceMgrPtr res_mgr) : res_mgr_(std::move(res_mgr)) {

void
JobMgr::Start() {
if (not running_) {
running_ = true;
worker_thread_ = std::thread(&JobMgr::worker_function, this);
if (worker_thread_ == nullptr) {
worker_thread_ = std::make_shared<std::thread>(&JobMgr::worker_function, this);
}
}

void
JobMgr::Stop() {
if (running_) {
if (worker_thread_ != nullptr) {
this->Put(nullptr);
worker_thread_.join();
running_ = false;
worker_thread_->join();
worker_thread_ = nullptr;
}
}

json
JobMgr::Dump() const {
json ret{
{"running", running_},
{"event_queue_length", queue_.size()},
{"running", (worker_thread_ != nullptr ? true : false)},
{"event_queue_length", queue_.Size()},
};
return ret;
}

void
JobMgr::Put(const JobPtr& job) {
{
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(job);
}
cv_.notify_one();
queue_.Put(job);
}

void
JobMgr::worker_function() {
SetThreadName("jobmgr_thread");
while (running_) {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return !queue_.empty(); });
auto job = queue_.front();
queue_.pop();
lock.unlock();
while (true) {
auto job = queue_.Take();
if (job == nullptr) {
break;
}

auto tasks = build_task(job);

// TODO(zhiru): if the job is search by ids, pass any task where the ids don't exist
auto search_job = std::dynamic_pointer_cast<SearchJob>(job);
if (search_job != nullptr) {
search_job->GetResultIds().resize(search_job->nq(), -1);
search_job->GetResultDistances().resize(search_job->nq(), std::numeric_limits<float>::max());

if (search_job->vectors().float_data_.empty() && search_job->vectors().binary_data_.empty() &&
!search_job->vectors().id_array_.empty()) {
for (auto task = tasks.begin(); task != tasks.end();) {
auto search_task = std::static_pointer_cast<XSearchTask>(*task);
auto location = search_task->GetLocation();

// Load bloom filter
std::string segment_dir;
engine::utils::GetParentPath(location, segment_dir);
segment::SegmentReader segment_reader(segment_dir);
segment::IdBloomFilterPtr id_bloom_filter_ptr;
segment_reader.LoadBloomFilter(id_bloom_filter_ptr);

// Check if the id is present.
bool pass = true;
for (auto& id : search_job->vectors().id_array_) {
if (id_bloom_filter_ptr->Check(id)) {
pass = false;
break;
}
}

if (pass) {
// std::cout << search_task->GetIndexId() << std::endl;
search_job->SearchDone(search_task->GetIndexId());
task = tasks.erase(task);
} else {
task++;
}
}
}
}

// for (auto &task : tasks) {
// if ...
// search_job->SearchDone(task->id);
// tasks.erase(task);
// }

auto tasks = build_task(job);
for (auto& task : tasks) {
OptimizerInst::GetInstance()->Run(task);
}
Expand Down
26 changes: 7 additions & 19 deletions core/src/scheduler/JobMgr.h
Expand Up @@ -10,21 +10,15 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once

#include <condition_variable>
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include "ResourceMgr.h"
#include "interface/interfaces.h"
#include "job/Job.h"
#include "task/Task.h"
#include "scheduler/ResourceMgr.h"
#include "scheduler/interface/interfaces.h"
#include "scheduler/job/Job.h"
#include "scheduler/task/Task.h"
#include "utils/BlockingQueue.h"

namespace milvus {
namespace scheduler {
Expand Down Expand Up @@ -58,14 +52,8 @@ class JobMgr : public interface::dumpable {
calculate_path(const ResourceMgrPtr& res_mgr, const TaskPtr& task);

private:
bool running_ = false;
std::queue<JobPtr> queue_;

std::thread worker_thread_;

std::mutex mutex_;
std::condition_variable cv_;

BlockingQueue<JobPtr> queue_;
std::shared_ptr<std::thread> worker_thread_ = nullptr;
ResourceMgrPtr res_mgr_ = nullptr;
};

Expand Down
4 changes: 2 additions & 2 deletions core/src/utils/BlockingQueue.h
Expand Up @@ -68,13 +68,13 @@ class BlockingQueue {
}

size_t
Size() {
Size() const {
std::lock_guard<std::mutex> lock(mtx);
return queue_.size();
}

bool
Empty() {
Empty() const {
std::unique_lock<std::mutex> lock(mtx);
return queue_.empty();
}
Expand Down