Skip to content

Commit

Permalink
Experimental support for HDFS (#1243)
Browse files Browse the repository at this point in the history
* Read and write datasets from hdfs.
* Only enabled when cmake is run with -DUSE_HDFS:BOOL=TRUE
* Introduces VirtualFile(Reader|Writer) to abstract VFS differences
  • Loading branch information
ebernhardson authored and guolinke committed Feb 27, 2018
1 parent 7501faa commit 7e186a5
Show file tree
Hide file tree
Showing 18 changed files with 418 additions and 168 deletions.
15 changes: 15 additions & 0 deletions CMakeLists.txt
Expand Up @@ -12,6 +12,7 @@ OPTION(USE_MPI "MPI based parallel learning" OFF)
OPTION(USE_OPENMP "Enable OpenMP" ON)
OPTION(USE_GPU "Enable GPU-acclerated training (EXPERIMENTAL)" OFF)
OPTION(USE_SWIG "Enable SWIG to generate Java API" OFF)
OPTION(USE_HDFS "Enable HDFS support (EXPERIMENTAL)" OFF)

if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS "4.8.2")
Expand Down Expand Up @@ -86,6 +87,15 @@ if(USE_GPU)
ADD_DEFINITIONS(-DUSE_GPU)
endif(USE_GPU)

if(USE_HDFS)
find_package(JNI REQUIRED)
find_path(HDFS_INCLUDE_DIR hdfs.h REQUIRED)
find_library(HDFS_LIB NAMES hdfs REQUIRED)
include_directories(${HDFS_INCLUDE_DIR})
ADD_DEFINITIONS(-DUSE_HDFS)
SET(HDFS_CXX_LIBRARIES ${HDFS_LIB} ${JAVA_JVM_LIBRARY})
endif(USE_HDFS)

if(UNIX OR MINGW OR CYGWIN)
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -pthread -O3 -Wextra -Wall -Wno-ignored-attributes -Wno-unknown-pragmas")
endif()
Expand Down Expand Up @@ -173,6 +183,11 @@ if(USE_GPU)
TARGET_LINK_LIBRARIES(_lightgbm ${OpenCL_LIBRARY} ${Boost_LIBRARIES})
endif(USE_GPU)

if(USE_HDFS)
TARGET_LINK_LIBRARIES(lightgbm ${HDFS_CXX_LIBRARIES})
TARGET_LINK_LIBRARIES(_lightgbm ${HDFS_CXX_LIBRARIES})
endif(USE_HDFS)

if(WIN32 AND (MINGW OR CYGWIN))
TARGET_LINK_LIBRARIES(lightgbm Ws2_32)
TARGET_LINK_LIBRARIES(_lightgbm Ws2_32)
Expand Down
4 changes: 2 additions & 2 deletions include/LightGBM/bin.h
Expand Up @@ -98,7 +98,7 @@ class BinMapper {
* \brief Save binary data to file
* \param writer Writer the binary data is written to
*/
void SaveBinaryToFile(FILE* file) const;
void SaveBinaryToFile(const VirtualFileWriter* writer) const;
/*!
* \brief Mapping bin into feature value
* \param bin
Expand Down Expand Up @@ -308,7 +308,7 @@ class Bin {
* \brief Save binary data to file
* \param writer Writer the binary data is written to
*/
virtual void SaveBinaryToFile(FILE* file) const = 0;
virtual void SaveBinaryToFile(const VirtualFileWriter* writer) const = 0;

/*!
* \brief Load from memory
Expand Down
2 changes: 1 addition & 1 deletion include/LightGBM/dataset.h
Expand Up @@ -99,7 +99,7 @@ class Metadata {
* \brief Save binary data to file
* \param writer Writer the binary data is written to
*/
void SaveBinaryToFile(FILE* file) const;
void SaveBinaryToFile(const VirtualFileWriter* writer) const;

/*!
* \brief Get sizes in byte of this object
Expand Down
10 changes: 5 additions & 5 deletions include/LightGBM/feature_group.h
Expand Up @@ -191,13 +191,13 @@ class FeatureGroup {
* \brief Save binary data to file
* \param writer Writer the binary data is written to
*/
void SaveBinaryToFile(FILE* file) const {
fwrite(&is_sparse_, sizeof(is_sparse_), 1, file);
fwrite(&num_feature_, sizeof(num_feature_), 1, file);
void SaveBinaryToFile(const VirtualFileWriter* writer) const {
writer->Write(&is_sparse_, sizeof(is_sparse_));
writer->Write(&num_feature_, sizeof(num_feature_));
for (int i = 0; i < num_feature_; ++i) {
bin_mappers_[i]->SaveBinaryToFile(file);
bin_mappers_[i]->SaveBinaryToFile(writer);
}
bin_data_->SaveBinaryToFile(file);
bin_data_->SaveBinaryToFile(writer);
}
/*!
* \brief Get sizes in byte of this object
Expand Down
70 changes: 70 additions & 0 deletions include/LightGBM/utils/file_io.h
@@ -0,0 +1,70 @@
#ifndef LIGHTGBM_UTILS_FILE_IO_H_
#define LIGHTGBM_UTILS_FILE_IO_H_

#include <cstddef>
#include <memory>
#include <string>

namespace LightGBM{

/*!
 * \brief Abstract interface for appending raw byte buffers to a file.
 *        Concrete writers are obtained through Make().
 */
struct VirtualFileWriter {
  /*! \brief Virtual destructor: writers are owned and destroyed through base pointers */
  virtual ~VirtualFileWriter() = default;

  /*!
   * \brief Prepare the writer for use; call before the first Write()
   * \return True when the file is available for writes
   */
  virtual bool Init() = 0;

  /*!
   * \brief Append a buffer to the file
   * \param data Start of the buffer to write from
   * \param bytes Number of bytes to take from the buffer
   * \return Number of bytes written
   */
  virtual size_t Write(const void* data, size_t bytes) const = 0;

  /*!
   * \brief Factory: build the writer implementation appropriate for a filename
   * \param filename Filename of the data
   * \return Owning pointer to the writer instance
   */
  static std::unique_ptr<VirtualFileWriter> Make(const std::string& filename);

  /*!
   * \brief Test whether a file exists
   * \param filename Filename of the data
   * \return True when the file exists
   */
  static bool Exists(const std::string& filename);
};

/*!
 * \brief Abstract interface for reading a file into byte buffers.
 *        Concrete readers are obtained through Make().
 */
struct VirtualFileReader {
  /*! \brief Virtual destructor: readers are owned and destroyed through base pointers */
  virtual ~VirtualFileReader() = default;

  /*!
   * \brief Initialize the reader; call before the first Read()
   * \return True when the file is available for read
   */
  virtual bool Init() = 0;

  /*!
   * \brief Read data into buffer
   * \param buffer Buffer to read data into; must hold at least `bytes` bytes
   * \param bytes Maximum number of bytes to read
   * \return Number of bytes actually read
   */
  virtual size_t Read(void* buffer, size_t bytes) const = 0;

  /*!
   * \brief Create appropriate reader for filename
   * \param filename Filename of the data
   * \return Owning pointer to the reader instance
   */
  static std::unique_ptr<VirtualFileReader> Make(const std::string& filename);
};

} // namespace LightGBM

#endif // LIGHTGBM_UTILS_FILE_IO_H_
22 changes: 8 additions & 14 deletions include/LightGBM/utils/pipeline_reader.h
Expand Up @@ -9,6 +9,7 @@
#include <thread>
#include <memory>
#include <algorithm>
#include "file_io.h"

namespace LightGBM{

Expand All @@ -23,14 +24,8 @@ class PipelineReader {
* \process_fun Process function
*/
static size_t Read(const char* filename, int skip_bytes, const std::function<size_t (const char*, size_t)>& process_fun) {
FILE* file;

#ifdef _MSC_VER
fopen_s(&file, filename, "rb");
#else
file = fopen(filename, "rb");
#endif
if (file == NULL) {
auto reader = VirtualFileReader::Make(filename);
if (!reader->Init()) {
return 0;
}
size_t cnt = 0;
Expand All @@ -42,16 +37,17 @@ class PipelineReader {
size_t read_cnt = 0;
if (skip_bytes > 0) {
// skip first k bytes
read_cnt = fread(buffer_process.data(), 1, skip_bytes, file);
read_cnt = reader->Read(buffer_process.data(), skip_bytes);
}
// read first block
read_cnt = fread(buffer_process.data(), 1, buffer_size, file);
read_cnt = reader->Read(buffer_process.data(), buffer_size);

size_t last_read_cnt = 0;
while (read_cnt > 0) {
// start read thread
std::thread read_worker = std::thread(
[file, &buffer_read, buffer_size, &last_read_cnt] {
last_read_cnt = fread(buffer_read.data(), 1, buffer_size, file);
[&reader, &buffer_read, buffer_size, &last_read_cnt] {
last_read_cnt = reader->Read(buffer_read.data(), buffer_size);
}
);
// start process
Expand All @@ -62,8 +58,6 @@ class PipelineReader {
std::swap(buffer_process, buffer_read);
read_cnt = last_read_cnt;
}
// close file
fclose(file);
return cnt;
}

Expand Down
44 changes: 15 additions & 29 deletions include/LightGBM/utils/text_reader.h
Expand Up @@ -28,36 +28,29 @@ class TextReader {
TextReader(const char* filename, bool is_skip_first_line):
filename_(filename), is_skip_first_line_(is_skip_first_line){
if (is_skip_first_line_) {
FILE* file;
#ifdef _MSC_VER
fopen_s(&file, filename, "r");
#else
file = fopen(filename, "r");
#endif
if (file == NULL) {
auto reader = VirtualFileReader::Make(filename);
if (!reader->Init()) {
Log::Fatal("Could not open %s", filename);
}
std::stringstream str_buf;
int read_c = -1;
read_c = fgetc(file);
while (read_c != EOF) {
char tmp_ch = static_cast<char>(read_c);
if (tmp_ch == '\n' || tmp_ch == '\r') {
char read_c;
size_t nread = reader->Read(&read_c, 1);
while (nread == 1) {
if (read_c == '\n' || read_c == '\r') {
break;
}
str_buf << tmp_ch;
str_buf << read_c;
++skip_bytes_;
read_c = fgetc(file);
nread = reader->Read(&read_c, 1);
}
if (static_cast<char>(read_c) == '\r') {
read_c = fgetc(file);
if (read_c == '\r') {
reader->Read(&read_c, 1);
++skip_bytes_;
}
if (static_cast<char>(read_c) == '\n') {
read_c = fgetc(file);
if (read_c == '\n') {
reader->Read(&read_c, 1);
++skip_bytes_;
}
fclose(file);
first_line_ = str_buf.str();
Log::Debug("Skipped header \"%s\" in file %s", first_line_.c_str(), filename_);
}
Expand Down Expand Up @@ -151,25 +144,18 @@ class TextReader {
std::vector<char> ReadContent(size_t* out_len) {
std::vector<char> ret;
*out_len = 0;
FILE* file;
#ifdef _MSC_VER
fopen_s(&file, filename_, "rb");
#else
file = fopen(filename_, "rb");
#endif
if (file == NULL) {
auto reader = VirtualFileReader::Make(filename_);
if (!reader->Init()) {
return ret;
}
const size_t buffer_size = 16 * 1024 * 1024;
auto buffer_read = std::vector<char>(buffer_size);
size_t read_cnt = 0;
do {
read_cnt = fread(buffer_read.data(), 1, buffer_size, file);
read_cnt = reader->Read(buffer_read.data(), buffer_size);
ret.insert(ret.end(), buffer_read.begin(), buffer_read.begin() + read_cnt);
*out_len += read_cnt;
} while (read_cnt > 0);
// close file
fclose(file);
return ret;
}

Expand Down
17 changes: 5 additions & 12 deletions src/application/predictor.hpp
Expand Up @@ -128,15 +128,8 @@ class Predictor {
* \param result_filename Filename of output result
*/
void Predict(const char* data_filename, const char* result_filename, bool has_header) {
FILE* result_file;

#ifdef _MSC_VER
fopen_s(&result_file, result_filename, "w");
#else
result_file = fopen(result_filename, "w");
#endif

if (result_file == NULL) {
auto writer = VirtualFileWriter::Make(result_filename);
if (!writer->Init()) {
Log::Fatal("Prediction results file %s cannot be found.", result_filename);
}
auto parser = std::unique_ptr<Parser>(Parser::CreateParser(data_filename, has_header, boosting_->MaxFeatureIdx() + 1, boosting_->LabelIdx()));
Expand Down Expand Up @@ -189,7 +182,7 @@ class Predictor {
};

std::function<void(data_size_t, const std::vector<std::string>&)> process_fun =
[this, &parser_fun, &result_file]
[this, &parser_fun, &writer]
(data_size_t, const std::vector<std::string>& lines) {
std::vector<std::pair<int, double>> oneline_features;
std::vector<std::string> result_to_write(lines.size());
Expand All @@ -209,11 +202,11 @@ class Predictor {
}
OMP_THROW_EX();
for (data_size_t i = 0; i < static_cast<data_size_t>(result_to_write.size()); ++i) {
fprintf(result_file, "%s\n", result_to_write[i].c_str());
writer->Write(result_to_write[i].c_str(), result_to_write[i].size());
writer->Write("\n", 1);
}
};
predict_data_reader.ReadAllAndProcessParallel(process_fun);
fclose(result_file);
}

private:
Expand Down
23 changes: 12 additions & 11 deletions src/io/bin.cpp
@@ -1,4 +1,5 @@
#include <LightGBM/utils/common.h>
#include <LightGBM/utils/file_io.h>
#include <LightGBM/bin.h>

#include "dense_bin.hpp"
Expand Down Expand Up @@ -455,19 +456,19 @@ namespace LightGBM {
}
}

void BinMapper::SaveBinaryToFile(FILE* file) const {
fwrite(&num_bin_, sizeof(num_bin_), 1, file);
fwrite(&missing_type_, sizeof(missing_type_), 1, file);
fwrite(&is_trival_, sizeof(is_trival_), 1, file);
fwrite(&sparse_rate_, sizeof(sparse_rate_), 1, file);
fwrite(&bin_type_, sizeof(bin_type_), 1, file);
fwrite(&min_val_, sizeof(min_val_), 1, file);
fwrite(&max_val_, sizeof(max_val_), 1, file);
fwrite(&default_bin_, sizeof(default_bin_), 1, file);
void BinMapper::SaveBinaryToFile(const VirtualFileWriter* writer) const {
writer->Write(&num_bin_, sizeof(num_bin_));
writer->Write(&missing_type_, sizeof(missing_type_));
writer->Write(&is_trival_, sizeof(is_trival_));
writer->Write(&sparse_rate_, sizeof(sparse_rate_));
writer->Write(&bin_type_, sizeof(bin_type_));
writer->Write(&min_val_, sizeof(min_val_));
writer->Write(&max_val_, sizeof(max_val_));
writer->Write(&default_bin_, sizeof(default_bin_));
if (bin_type_ == BinType::NumericalBin) {
fwrite(bin_upper_bound_.data(), sizeof(double), num_bin_, file);
writer->Write(bin_upper_bound_.data(), sizeof(double) * num_bin_);
} else {
fwrite(bin_2_categorical_.data(), sizeof(int), num_bin_, file);
writer->Write(bin_2_categorical_.data(), sizeof(int) * num_bin_);
}
}

Expand Down

0 comments on commit 7e186a5

Please sign in to comment.