Skip to content

Commit

Permalink
Fix bug in parallel bin finding.
Browse files Browse the repository at this point in the history
  • Loading branch information
guolinke committed Jul 23, 2017
1 parent 1c21b1d commit 402474f
Showing 1 changed file with 29 additions and 30 deletions.
59 changes: 29 additions & 30 deletions src/io/dataset_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -734,11 +734,39 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines,
// with multiple machines, bin finding is distributed:
// each machine finds bins for a different subset of the features

int max_bin = 0;
int total_num_feature = static_cast<int>(sample_values.size());
for (int i = 0; i < total_num_feature; ++i) {
max_bin = std::max(max_bin, bin_mappers[i]->num_bin());
}
std::pair<int, int> local_sync_info(max_bin, total_num_feature);
std::pair<int, int> global_sync_info(max_bin, total_num_feature);
// sync global max_bin and total_num_feature
Network::Allreduce(reinterpret_cast<char*>(&local_sync_info),
sizeof(local_sync_info), sizeof(global_sync_info),
reinterpret_cast<char*>(&global_sync_info),
[] (const char* src, char* dst, int len) {
int used_size = 0;
const int type_size = sizeof(std::pair<int, int>);
const std::pair<int, int> *p1;
std::pair<int, int> *p2;
while (used_size < len) {
p1 = reinterpret_cast<const std::pair<int, int> *>(src);
p2 = reinterpret_cast<std::pair<int, int> *>(dst);
p2->first = std::max(p1->first, p2->first);
// ignore the rare features
p2->second = std::min(p1->second, p2->second);
src += type_size;
dst += type_size;
used_size += type_size;
}
});
max_bin = global_sync_info.first;
total_num_feature = global_sync_info.second;
// start and len will store the process feature indices for different machines
// machine i will find bins for features in [ start[i], start[i] + len[i] )
std::vector<int> start(num_machines);
std::vector<int> len(num_machines);
int total_num_feature = static_cast<int>(sample_values.size());
int step = (total_num_feature + num_machines - 1) / num_machines;
if (step < 1) { step = 1; }

Expand All @@ -765,35 +793,6 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines,
OMP_LOOP_EX_END();
}
OMP_THROW_EX();
// get max_bin
int local_max_bin = 0;
for (int i = 0; i < len[rank]; ++i) {
if (ignore_features_.count(start[rank] + i) > 0) {
continue;
}
local_max_bin = std::max(local_max_bin, bin_mappers[i]->num_bin());
}
int max_bin = local_max_bin;
// sync global max_bin
Network::Allreduce(reinterpret_cast<char*>(&local_max_bin),
sizeof(local_max_bin), sizeof(local_max_bin),
reinterpret_cast<char*>(&max_bin),
[](const char* src, char* dst, int len) {
int used_size = 0;
const int type_size = sizeof(int);
const int *p1;
int *p2;
while (used_size < len) {
p1 = reinterpret_cast<const int *>(src);
p2 = reinterpret_cast<int *>(dst);
if (*p1 > *p2) {
std::memcpy(dst, src, type_size);
}
src += type_size;
dst += type_size;
used_size += type_size;
}
});
// get size of bin mapper with max_bin size
int type_size = BinMapper::SizeForSpecificBin(max_bin);
// since bin mapper sizes may differ between features, we expand every bin mapper to type_size
Expand Down

0 comments on commit 402474f

Please sign in to comment.