reduce the buffer when using high dimensional data in distributed mode. (#2485)

* reduce the buffer when using high dimensional data in distributed mode.

* Update dataset_loader.cpp

* refix

* typo

* fix number of bin accumulation.

* avoid overflow

* fix warning

* efficient solution.

* Update dataset.h

* fix bin count output

* fix warning

* bug in dist number of feature check

* fix possible edge case

* Update dataset.cpp

* possible bug fix

* fix
guolinke committed Oct 15, 2019
1 parent 4848776 commit 40e56ca
Showing 6 changed files with 103 additions and 93 deletions.
1 change: 1 addition & 0 deletions include/LightGBM/dataset.h
@@ -294,6 +294,7 @@ class Dataset {
     const std::vector<std::vector<double>>& forced_bins,
     int** sample_non_zero_indices,
     const int* num_per_col,
+    int num_sample_col,
     size_t total_sample_cnt,
     const Config& io_config);

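Note: the new num_sample_col parameter tells Construct how many columns the sample arrays (sample_non_zero_indices, num_per_col) actually cover. In distributed mode a machine may hold sample data for fewer columns than num_total_features, and any feature index at or beyond num_sample_col is treated as filtered, as the dataset.cpp changes below show.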
13 changes: 13 additions & 0 deletions include/LightGBM/network.h
@@ -261,6 +261,19 @@ class Network {
     return global;
   }
 
+  template<class T>
+  static std::vector<T> GlobalArray(T local) {
+    std::vector<T> global(num_machines_, 0);
+    int type_size = sizeof(T);
+    std::vector<comm_size_t> block_start(num_machines_);
+    std::vector<comm_size_t> block_len(num_machines_, type_size);
+    for (int i = 1; i < num_machines_; ++i) {
+      block_start[i] = block_start[i - 1] + block_len[i - 1];
+    }
+    Allgather(reinterpret_cast<char*>(&local), block_start.data(), block_len.data(), reinterpret_cast<char*>(global.data()), type_size*num_machines_);
+    return global;
+  }
+
 private:
  static void AllgatherBruck(char* input, const comm_size_t* block_start, const comm_size_t* block_len, char* output, comm_size_t all_size);
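For context, a minimal usage sketch of the new GlobalArray helper (not part of this commit; the caller, variable names, and value are made up, and an initialized distributed Network is assumed):

// Gather one scalar per machine; per_machine[k] ends up holding
// machine k's value on every machine, indexed by rank.
int local_num_col = 1000;  // hypothetical per-machine column count
std::vector<int> per_machine = Network::GlobalArray(local_num_col);
// Only sizeof(int) bytes travel per machine, keeping the exchange
// buffer small even when the data is high dimensional.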
4 changes: 3 additions & 1 deletion src/io/bin.cpp
@@ -19,7 +19,9 @@
 
 namespace LightGBM {
 
-BinMapper::BinMapper() {
+BinMapper::BinMapper(): num_bin_(1), is_trivial_(true), bin_type_(BinType::NumericalBin) {
+  bin_upper_bound_.clear();
+  bin_upper_bound_.push_back(std::numeric_limits<double>::infinity());
 }
 
 // deep copy function for BinMapper
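With this change, a default-constructed BinMapper is a well-defined trivial mapper rather than an uninitialized one. A sketch of the invariant it establishes (behavior inferred from the diff; accessor names from BinMapper's public interface):

BinMapper m;  // default-constructed, as of this commit
// m.num_bin() == 1 and m.is_trivial() == true: a single numerical bin
// with upper bound +infinity, so every value maps into bin 0. A feature
// without sample data can thus carry a valid, trivial mapper.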
28 changes: 20 additions & 8 deletions src/io/dataset.cpp
@@ -70,6 +70,7 @@ std::vector<std::vector<int>> FindGroups(const std::vector<std::unique_ptr<BinMapper>>& bin_mappers,
     const std::vector<int>& find_order,
     int** sample_indices,
     const int* num_per_col,
+    int num_sample_col,
     size_t total_sample_cnt,
     data_size_t max_error_cnt,
     data_size_t filter_cnt,
@@ -85,7 +86,8 @@ std::vector<std::vector<int>> FindGroups(const std::vector<std::unique_ptr<BinMapper>>& bin_mappers,
   std::vector<int> group_num_bin;
 
   for (auto fidx : find_order) {
-    const size_t cur_non_zero_cnt = num_per_col[fidx];
+    bool is_filtered_feature = fidx >= num_sample_col;
+    const size_t cur_non_zero_cnt = is_filtered_feature ? 0: num_per_col[fidx];
     bool need_new_group = true;
     std::vector<int> available_groups;
     for (int gid = 0; gid < static_cast<int>(features_in_group.size()); ++gid) {
@@ -107,7 +109,7 @@ std::vector<std::vector<int>> FindGroups(const std::vector<std::unique_ptr<BinMapper>>& bin_mappers,
     }
     for (auto gid : search_groups) {
       const int rest_max_cnt = max_error_cnt - group_conflict_cnt[gid];
-      int cnt = GetConfilctCount(conflict_marks[gid], sample_indices[fidx], num_per_col[fidx], rest_max_cnt);
+      const int cnt = is_filtered_feature ? 0 : GetConfilctCount(conflict_marks[gid], sample_indices[fidx], num_per_col[fidx], rest_max_cnt);
       if (cnt >= 0 && cnt <= rest_max_cnt) {
         data_size_t rest_non_zero_data = static_cast<data_size_t>(
           static_cast<double>(cur_non_zero_cnt - cnt) * num_data / total_sample_cnt);
@@ -116,7 +118,9 @@ std::vector<std::vector<int>> FindGroups(const std::vector<std::unique_ptr<BinMapper>>& bin_mappers,
         features_in_group[gid].push_back(fidx);
         group_conflict_cnt[gid] += cnt;
         group_non_zero_cnt[gid] += cur_non_zero_cnt - cnt;
-        MarkUsed(&conflict_marks[gid], sample_indices[fidx], num_per_col[fidx]);
+        if (!is_filtered_feature) {
+          MarkUsed(&conflict_marks[gid], sample_indices[fidx], num_per_col[fidx]);
+        }
         if (is_use_gpu) {
           group_num_bin[gid] += bin_mappers[fidx]->num_bin() + (bin_mappers[fidx]->GetDefaultBin() == 0 ? -1 : 0);
         }
@@ -128,7 +132,9 @@ std::vector<std::vector<int>> FindGroups(const std::vector<std::unique_ptr<BinMapper>>& bin_mappers,
     features_in_group.back().push_back(fidx);
     group_conflict_cnt.push_back(0);
     conflict_marks.emplace_back(total_sample_cnt, false);
-    MarkUsed(&(conflict_marks.back()), sample_indices[fidx], num_per_col[fidx]);
+    if (!is_filtered_feature) {
+      MarkUsed(&(conflict_marks.back()), sample_indices[fidx], num_per_col[fidx]);
+    }
     group_non_zero_cnt.emplace_back(cur_non_zero_cnt);
     if (is_use_gpu) {
       group_num_bin.push_back(1 + bin_mappers[fidx]->num_bin() + (bin_mappers[fidx]->GetDefaultBin() == 0 ? -1 : 0));
@@ -141,6 +147,7 @@ std::vector<std::vector<int>> FindGroups(const std::vector<std::unique_ptr<BinMapper>>& bin_mappers,
 std::vector<std::vector<int>> FastFeatureBundling(const std::vector<std::unique_ptr<BinMapper>>& bin_mappers,
     int** sample_indices,
     const int* num_per_col,
+    int num_sample_col,
     size_t total_sample_cnt,
     const std::vector<int>& used_features,
     double max_conflict_rate,
@@ -156,7 +163,11 @@ std::vector<std::vector<int>> FastFeatureBundling(const std::vector<std::unique_ptr<BinMapper>>& bin_mappers,
   feature_non_zero_cnt.reserve(used_features.size());
   // put dense feature first
   for (auto fidx : used_features) {
-    feature_non_zero_cnt.emplace_back(num_per_col[fidx]);
+    if (fidx < num_sample_col) {
+      feature_non_zero_cnt.emplace_back(num_per_col[fidx]);
+    } else {
+      feature_non_zero_cnt.emplace_back(0);
+    }
   }
   // sort by non zero cnt
   std::vector<int> sorted_idx;
@@ -175,8 +186,8 @@ std::vector<std::vector<int>> FastFeatureBundling(const std::vector<std::unique_ptr<BinMapper>>& bin_mappers,
   for (auto sidx : sorted_idx) {
     feature_order_by_cnt.push_back(used_features[sidx]);
   }
-  auto features_in_group = FindGroups(bin_mappers, used_features, sample_indices, num_per_col, total_sample_cnt, max_error_cnt, filter_cnt, num_data, is_use_gpu);
-  auto group2 = FindGroups(bin_mappers, feature_order_by_cnt, sample_indices, num_per_col, total_sample_cnt, max_error_cnt, filter_cnt, num_data, is_use_gpu);
+  auto features_in_group = FindGroups(bin_mappers, used_features, sample_indices, num_per_col, num_sample_col, total_sample_cnt, max_error_cnt, filter_cnt, num_data, is_use_gpu);
+  auto group2 = FindGroups(bin_mappers, feature_order_by_cnt, sample_indices, num_per_col, num_sample_col, total_sample_cnt, max_error_cnt, filter_cnt, num_data, is_use_gpu);
   if (features_in_group.size() > group2.size()) {
     features_in_group = group2;
   }
@@ -219,6 +230,7 @@ void Dataset::Construct(
     const std::vector<std::vector<double>>& forced_bins,
     int** sample_non_zero_indices,
     const int* num_per_col,
+    int num_sample_col,
     size_t total_sample_cnt,
     const Config& io_config) {
   num_total_features_ = num_total_features;
@@ -238,7 +250,7 @@ void Dataset::Construct(
 
   if (io_config.enable_bundle && !used_features.empty()) {
     features_in_group = FastFeatureBundling(*bin_mappers,
-        sample_non_zero_indices, num_per_col, total_sample_cnt,
+        sample_non_zero_indices, num_per_col, num_sample_col, total_sample_cnt,
         used_features, io_config.max_conflict_rate,
         num_data_, io_config.min_data_in_leaf,
         sparse_threshold_, io_config.is_enable_sparse, io_config.device_type == std::string("gpu"));
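The common thread in this file is the is_filtered_feature guard: a feature index at or beyond num_sample_col has no sampled data on this machine, so it counts as zero non-zero samples, conflicts with nothing, and never touches the conflict bitmaps. Condensed from the hunks above (illustrative, not a verbatim excerpt):

const bool is_filtered_feature = fidx >= num_sample_col;
const size_t cur_non_zero_cnt = is_filtered_feature ? 0 : num_per_col[fidx];
const int cnt = is_filtered_feature
  ? 0  // no sampled rows, hence no conflicts with any existing group
  : GetConfilctCount(conflict_marks[gid], sample_indices[fidx],
                     num_per_col[fidx], rest_max_cnt);
if (!is_filtered_feature) {
  MarkUsed(&conflict_marks[gid], sample_indices[fidx], num_per_col[fidx]);
}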
