Persistent prefetch thread
cypof authored and shelhamer committed Aug 9, 2015
1 parent 73b3d13 commit ddcdc9d
Showing 10 changed files with 153 additions and 80 deletions.
31 changes: 20 additions & 11 deletions include/caffe/data_layers.hpp

@@ -15,6 +15,7 @@
 #include "caffe/internal_thread.hpp"
 #include "caffe/layer.hpp"
 #include "caffe/proto/caffe.pb.h"
+#include "caffe/util/blocking_queue.hpp"
 #include "caffe/util/db.hpp"

 namespace caffe {
@@ -50,12 +51,17 @@ class BaseDataLayer : public Layer<Dtype> {
   bool output_labels_;
 };

+template <typename Dtype>
+class Batch {
+ public:
+  Blob<Dtype> data_, label_;
+};
+
 template <typename Dtype>
 class BasePrefetchingDataLayer :
     public BaseDataLayer<Dtype>, public InternalThread {
  public:
-  explicit BasePrefetchingDataLayer(const LayerParameter& param)
-      : BaseDataLayer<Dtype>(param) {}
+  explicit BasePrefetchingDataLayer(const LayerParameter& param);
   // LayerSetUp: implements common data layer setup functionality, and calls
   // DataLayerSetUp to do special data layer setup for individual layer types.
   // This method may not be overridden.
@@ -67,14 +73,17 @@ class BasePrefetchingDataLayer :
   virtual void Forward_gpu(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top);

-  virtual void CreatePrefetchThread();
-  virtual void JoinPrefetchThread();
-  // The thread's function
-  virtual void InternalThreadEntry() {}
+  // Prefetches batches (asynchronously if to GPU memory)
+  static const int PREFETCH_COUNT = 3;

  protected:
-  Blob<Dtype> prefetch_data_;
-  Blob<Dtype> prefetch_label_;
+  virtual void InternalThreadEntry();
+  virtual void load_batch(Batch<Dtype>* batch) = 0;
+
+  Batch<Dtype> prefetch_[PREFETCH_COUNT];
+  BlockingQueue<Batch<Dtype>*> prefetch_free_;
+  BlockingQueue<Batch<Dtype>*> prefetch_full_;

   Blob<Dtype> transformed_data_;
 };

@@ -93,7 +102,7 @@ class DataLayer : public BasePrefetchingDataLayer<Dtype> {
   virtual inline int MaxTopBlobs() const { return 2; }

  protected:
-  virtual void InternalThreadEntry();
+  virtual void load_batch(Batch<Dtype>* batch);

   shared_ptr<db::DB> db_;
   shared_ptr<db::Cursor> cursor_;
@@ -235,7 +244,7 @@ class ImageDataLayer : public BasePrefetchingDataLayer<Dtype> {
  protected:
   shared_ptr<Caffe::RNG> prefetch_rng_;
   virtual void ShuffleImages();
-  virtual void InternalThreadEntry();
+  virtual void load_batch(Batch<Dtype>* batch);

   vector<std::pair<std::string, int> > lines_;
   int lines_id_;
@@ -307,7 +316,7 @@ class WindowDataLayer : public BasePrefetchingDataLayer<Dtype> {

  protected:
   virtual unsigned int PrefetchRand();
-  virtual void InternalThreadEntry();
+  virtual void load_batch(Batch<Dtype>* batch);

   shared_ptr<Caffe::RNG> prefetch_rng_;
   vector<std::pair<std::string, vector<int> > > image_database_;
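The new include, caffe/util/blocking_queue.hpp, is added by this commit but its contents did not load on this page. A minimal sketch of what BlockingQueue plausibly provides, assuming boost threading and glog as used elsewhere in Caffe, and matching the pop("...") call that Forward_cpu makes below; the real header may differ:

// Sketch only: a plausible minimal BlockingQueue; the actual
// caffe/util/blocking_queue.hpp added by this commit is not shown here.
#include <queue>
#include <string>

#include <boost/thread.hpp>
#include <glog/logging.h>

template <typename T>
class BlockingQueue {
 public:
  void push(const T& t) {
    boost::mutex::scoped_lock lock(mutex_);
    queue_.push(t);
    lock.unlock();
    condition_.notify_one();
  }

  // Blocks until an element is available. If log_on_wait is non-empty,
  // it is logged periodically while waiting, which is how a message like
  // "Data layer prefetch queue empty" would surface when the consumer
  // outruns the prefetch thread.
  T pop(const std::string& log_on_wait = "") {
    boost::mutex::scoped_lock lock(mutex_);
    while (queue_.empty()) {
      if (!log_on_wait.empty()) {
        LOG_EVERY_N(INFO, 1000) << log_on_wait;
      }
      condition_.wait(lock);
    }
    T t = queue_.front();
    queue_.pop();
    return t;
  }

 private:
  std::queue<T> queue_;
  boost::mutex mutex_;
  boost::condition_variable condition_;
};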
4 changes: 4 additions & 0 deletions include/caffe/syncedmem.hpp

@@ -56,6 +56,10 @@ class SyncedMemory {
   SyncedHead head() { return head_; }
   size_t size() { return size_; }

+#ifndef CPU_ONLY
+  void async_gpu_push(const cudaStream_t& stream);
+#endif
+
  private:
   void to_cpu();
   void to_gpu();
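Only the declaration of async_gpu_push is shown on this page; its definition lives in src/caffe/syncedmem.cpp, one of the changed files that did not load. A sketch of what it plausibly does, assuming an asynchronous host-to-device copy on the caller's stream:

// Sketch only: plausible definition of async_gpu_push (the actual
// syncedmem.cpp diff is not shown here). It pushes CPU data to the GPU
// on the given stream; the caller is expected to synchronize the stream
// before consuming the data, as InternalThreadEntry does below.
#ifndef CPU_ONLY
void SyncedMemory::async_gpu_push(const cudaStream_t& stream) {
  CHECK(head_ == HEAD_AT_CPU);  // only fresh CPU data should be pushed
  if (gpu_ptr_ == NULL) {
    CUDA_CHECK(cudaMalloc(&gpu_ptr_, size_));
  }
  const cudaMemcpyKind put = cudaMemcpyHostToDevice;
  CUDA_CHECK(cudaMemcpyAsync(gpu_ptr_, cpu_ptr_, size_, put, stream));
  head_ = SYNCED;
}
#endif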
5 changes: 1 addition & 4 deletions src/caffe/internal_thread.cpp

@@ -19,10 +19,7 @@ bool InternalThread::must_stop() {
 }

 void InternalThread::StartInternalThread() {
-  // TODO switch to failing once Caffe prefetch thread is persistent.
-  // Threads should not be started and stopped repeatedly.
-  // CHECK(!is_started());
-  StopInternalThread();
+  CHECK(!is_started()) << "Threads should persist and not be restarted.";

   int device = 0;
 #ifndef CPU_ONLY
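With the restart path gone, shutdown relies entirely on thread interruption: StopInternalThread (not shown in this view) interrupts the worker, and the boost::thread_interrupted handler in InternalThreadEntry below lets the loop exit cleanly. A plausible shape, assuming a boost::thread member named thread_:

// Sketch only: a plausible StopInternalThread, assuming the class holds
// a shared_ptr<boost::thread> thread_. The interrupt is delivered at the
// thread's next interruption point (e.g. a blocking queue pop).
void InternalThread::StopInternalThread() {
  if (is_started()) {
    thread_->interrupt();
    try {
      thread_->join();  // wait for InternalThreadEntry to return
    } catch (boost::thread_interrupted&) {
      // The joining thread itself may be interrupted; ignore on shutdown.
    } catch (std::exception& e) {
      LOG(FATAL) << "Thread exception: " << e.what();
    }
  }
}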
88 changes: 63 additions & 25 deletions src/caffe/layers/base_data_layer.cpp

@@ -1,7 +1,9 @@
+#include <boost/thread.hpp>
 #include <string>
 #include <vector>

 #include "caffe/data_layers.hpp"
+#include "caffe/net.hpp"
 #include "caffe/util/io.hpp"

 namespace caffe {
@@ -27,56 +29,92 @@ void BaseDataLayer<Dtype>::LayerSetUp(const vector<Blob<Dtype>*>& bottom,
   DataLayerSetUp(bottom, top);
 }

+template <typename Dtype>
+BasePrefetchingDataLayer<Dtype>::BasePrefetchingDataLayer(
+    const LayerParameter& param)
+    : BaseDataLayer<Dtype>(param),
+      prefetch_free_(), prefetch_full_() {
+  for (int i = 0; i < PREFETCH_COUNT; ++i) {
+    prefetch_free_.push(&prefetch_[i]);
+  }
+}
+
 template <typename Dtype>
 void BasePrefetchingDataLayer<Dtype>::LayerSetUp(
     const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
   BaseDataLayer<Dtype>::LayerSetUp(bottom, top);
-  // Now, start the prefetch thread. Before calling prefetch, we make two
-  // cpu_data calls so that the prefetch thread does not accidentally make
-  // simultaneous cudaMalloc calls when the main thread is running. In some
-  // GPUs this seems to cause failures if we do not so.
-  this->prefetch_data_.mutable_cpu_data();
-  if (this->output_labels_) {
-    this->prefetch_label_.mutable_cpu_data();
+  // Before starting the prefetch thread, we make cpu_data and gpu_data
+  // calls so that the prefetch thread does not accidentally make simultaneous
+  // cudaMalloc calls when the main thread is running. In some GPUs this
+  // seems to cause failures if we do not so.
+  for (int i = 0; i < PREFETCH_COUNT; ++i) {
+    prefetch_[i].data_.mutable_cpu_data();
+    if (this->output_labels_) {
+      prefetch_[i].label_.mutable_cpu_data();
+    }
   }
+#ifndef CPU_ONLY
+  if (Caffe::mode() == Caffe::GPU) {
+    for (int i = 0; i < PREFETCH_COUNT; ++i) {
+      prefetch_[i].data_.mutable_gpu_data();
+      if (this->output_labels_) {
+        prefetch_[i].label_.mutable_gpu_data();
+      }
+    }
+  }
+#endif
   DLOG(INFO) << "Initializing prefetch";
-  this->CreatePrefetchThread();
-  DLOG(INFO) << "Prefetch initialized.";
-}
-
-template <typename Dtype>
-void BasePrefetchingDataLayer<Dtype>::CreatePrefetchThread() {
   this->data_transformer_->InitRand();
   StartInternalThread();
+  DLOG(INFO) << "Prefetch initialized.";
 }

 template <typename Dtype>
-void BasePrefetchingDataLayer<Dtype>::JoinPrefetchThread() {
-  StopInternalThread();
+void BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() {
+#ifndef CPU_ONLY
+  cudaStream_t stream;
+  if (Caffe::mode() == Caffe::GPU) {
+    cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking);
+  }
+#endif
+
+  try {
+    while (!must_stop()) {
+      Batch<Dtype>* batch = prefetch_free_.pop();
+      load_batch(batch);
+#ifndef CPU_ONLY
+      if (Caffe::mode() == Caffe::GPU) {
+        batch->data_.data().get()->async_gpu_push(stream);
+        cudaStreamSynchronize(stream);
+      }
+#endif
+      prefetch_full_.push(batch);
+    }
+  } catch (boost::thread_interrupted&) {
+    // Interrupted exception is expected on shutdown
+  }
 }

 template <typename Dtype>
 void BasePrefetchingDataLayer<Dtype>::Forward_cpu(
     const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
-  // First, join the thread
-  JoinPrefetchThread();
-  DLOG(INFO) << "Thread joined";
+  Batch<Dtype>* batch = prefetch_full_.pop("Data layer prefetch queue empty");
   // Reshape to loaded data.
-  top[0]->ReshapeLike(prefetch_data_);
+  top[0]->Reshape(batch->data_.num(), batch->data_.channels(),
+      batch->data_.height(), batch->data_.width());
   // Copy the data
-  caffe_copy(prefetch_data_.count(), prefetch_data_.cpu_data(),
+  caffe_copy(batch->data_.count(), batch->data_.cpu_data(),
              top[0]->mutable_cpu_data());
+  DLOG(INFO) << "Prefetch copied";
   if (this->output_labels_) {
-    // Reshape to loaded labels.
-    top[1]->ReshapeLike(prefetch_label_);
-    // Copy the labels.
-    caffe_copy(prefetch_label_.count(), prefetch_label_.cpu_data(),
-               top[1]->mutable_cpu_data());
+    caffe_copy(batch->label_.count(), batch->label_.cpu_data(),
+        top[1]->mutable_cpu_data());
   }
-  // Start a new prefetch thread
-  DLOG(INFO) << "CreatePrefetchThread";
-  CreatePrefetchThread();
+
+  prefetch_free_.push(batch);
 }

 #ifdef CPU_ONLY
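Taken together, the constructor, InternalThreadEntry, and Forward_cpu form a bounded producer-consumer cycle over a fixed pool of PREFETCH_COUNT batches, so no allocation happens in steady state. A self-contained toy version of the same flow, using standard-library stand-ins rather than Caffe code:

// Toy model of the new prefetch cycle: a fixed pool of batches circulates
// free -> (load) -> full -> (forward) -> free. Not Caffe code.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

template <typename T>
class ToyBlockingQueue {
 public:
  void push(T t) {
    { std::lock_guard<std::mutex> lock(m_); q_.push(t); }
    cv_.notify_one();
  }
  T pop() {
    std::unique_lock<std::mutex> lock(m_);
    cv_.wait(lock, [this] { return !q_.empty(); });
    T t = q_.front();
    q_.pop();
    return t;
  }
 private:
  std::queue<T> q_;
  std::mutex m_;
  std::condition_variable cv_;
};

struct Batch { int data; };

int main() {
  const int kPrefetchCount = 3;             // mirrors PREFETCH_COUNT
  Batch pool[kPrefetchCount];               // mirrors prefetch_[]
  ToyBlockingQueue<Batch*> free_q, full_q;  // mirror prefetch_free_/_full_
  for (int i = 0; i < kPrefetchCount; ++i) free_q.push(&pool[i]);

  // Producer: like InternalThreadEntry calling load_batch().
  std::thread prefetch([&] {
    for (int i = 0; i < 8; ++i) {
      Batch* b = free_q.pop();  // blocks when the consumer falls behind
      b->data = i;              // "load_batch(b)"
      full_q.push(b);
    }
  });

  // Consumer: like Forward_cpu popping a full batch and recycling it.
  for (int i = 0; i < 8; ++i) {
    Batch* b = full_q.pop();
    std::cout << "forward consumed batch " << b->data << "\n";
    free_q.push(b);
  }
  prefetch.join();
  return 0;
}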
15 changes: 7 additions & 8 deletions src/caffe/layers/base_data_layer.cu

@@ -7,22 +7,21 @@ namespace caffe {
 template <typename Dtype>
 void BasePrefetchingDataLayer<Dtype>::Forward_gpu(
     const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
-  // First, join the thread
-  JoinPrefetchThread();
+  Batch<Dtype>* batch = prefetch_full_.pop("Data layer prefetch queue empty");
   // Reshape to loaded data.
-  top[0]->ReshapeLike(this->prefetch_data_);
+  top[0]->ReshapeLike(batch->data_);
   // Copy the data
-  caffe_copy(prefetch_data_.count(), prefetch_data_.cpu_data(),
+  caffe_copy(batch->data_.count(), batch->data_.gpu_data(),
       top[0]->mutable_gpu_data());
   if (this->output_labels_) {
     // Reshape to loaded labels.
-    top[1]->ReshapeLike(prefetch_label_);
+    top[1]->ReshapeLike(batch->label_);
     // Copy the labels.
-    caffe_copy(prefetch_label_.count(), prefetch_label_.cpu_data(),
+    caffe_copy(batch->label_.count(), batch->label_.gpu_data(),
         top[1]->mutable_gpu_data());
   }
-  // Start a new prefetch thread
-  CreatePrefetchThread();
+
+  prefetch_free_.push(batch);
 }

 INSTANTIATE_LAYER_GPU_FORWARD(BasePrefetchingDataLayer);
26 changes: 14 additions & 12 deletions src/caffe/layers/data_layer.cpp

@@ -17,8 +17,8 @@
 namespace caffe {

 template <typename Dtype>
-DataLayer<Dtype>::~DataLayer<Dtype>() {
-  this->JoinPrefetchThread();
+DataLayer<Dtype>::~DataLayer() {
+  this->StopInternalThread();
 }

 template <typename Dtype>
@@ -54,21 +54,23 @@ void DataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
       << top[0]->width();
   // label
   if (this->output_labels_) {
-    vector<int> label_shape(1, this->layer_param_.data_param().batch_size());
+    vector<int> label_shape(1, batch_size);
     top[1]->Reshape(label_shape);
-    this->prefetch_label_.Reshape(label_shape);
+    for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
+      this->prefetch_[i].label_.Reshape(label_shape);
+    }
   }
 }

-// This function is used to create a thread that prefetches the data.
-template <typename Dtype>
-void DataLayer<Dtype>::InternalThreadEntry() {
+// This function is called on prefetch thread
+template<typename Dtype>
+void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
   CPUTimer batch_timer;
   batch_timer.Start();
   double read_time = 0;
   double trans_time = 0;
   CPUTimer timer;
-  CHECK(this->prefetch_data_.count());
+  CHECK(batch->data_.count());
   CHECK(this->transformed_data_.count());

   // Reshape according to the first datum of each batch
@@ -81,13 +83,13 @@ void DataLayer<Dtype>::InternalThreadEntry() {
   this->transformed_data_.Reshape(top_shape);
   // Reshape prefetch_data according to the batch_size.
   top_shape[0] = batch_size;
-  this->prefetch_data_.Reshape(top_shape);
+  batch->data_.Reshape(top_shape);

-  Dtype* top_data = this->prefetch_data_.mutable_cpu_data();
+  Dtype* top_data = batch->data_.mutable_cpu_data();
   Dtype* top_label = NULL;  // suppress warnings about uninitialized variables

   if (this->output_labels_) {
-    top_label = this->prefetch_label_.mutable_cpu_data();
+    top_label = batch->label_.mutable_cpu_data();
   }
   timer.Start();
   for (int item_id = 0; item_id < batch_size; ++item_id) {
@@ -97,7 +99,7 @@ void DataLayer<Dtype>::InternalThreadEntry() {
   read_time += timer.MicroSeconds();
   timer.Start();
   // Apply data transformations (mirror, scale, crop...)
-  int offset = this->prefetch_data_.offset(item_id);
+  int offset = batch->data_.offset(item_id);
   this->transformed_data_.set_cpu_data(top_data + offset);
   this->data_transformer_->Transform(datum, &(this->transformed_data_));
   // Copy label.
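Under the new contract, a data layer subclass no longer touches threads or queues; it only fills the Batch it is handed, as DataLayer does above. A hypothetical minimal subclass for illustration (the class name and its behavior are invented, not part of this commit):

// Hypothetical example: everything thread- and queue-related is inherited;
// the subclass implements load_batch() only. Blob shape setup is omitted
// for brevity.
template <typename Dtype>
class ConstantDataLayer : public BasePrefetchingDataLayer<Dtype> {
 public:
  explicit ConstantDataLayer(const LayerParameter& param)
      : BasePrefetchingDataLayer<Dtype>(param) {}
  virtual ~ConstantDataLayer() {
    // Mirrors ~DataLayer(): stop the persistent thread exactly once.
    this->StopInternalThread();
  }

 protected:
  // Runs on the prefetch thread with a batch popped from prefetch_free_.
  virtual void load_batch(Batch<Dtype>* batch) {
    caffe_set(batch->data_.count(), Dtype(1),
              batch->data_.mutable_cpu_data());
    if (this->output_labels_) {
      caffe_set(batch->label_.count(), Dtype(0),
                batch->label_.mutable_cpu_data());
    }
  }
};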
(The remaining changed files in this commit did not load and are not shown here.)
