Skip to content

Commit

Permalink
add Aio::prepareFlush().
Browse files Browse the repository at this point in the history
  • Loading branch information
starpos committed Dec 12, 2012
1 parent 72adcb5 commit 03f66e2
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 84 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
.PHONY: all clean

CXX = g++
CFLAGS = -Wall -std=c++11 -pthread
CFLAGS = -Wall -Wextra -std=c++11 -pthread
ifeq ($(DEBUG),1)
CFLAGS += -g
else
Expand Down
59 changes: 30 additions & 29 deletions iores.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class Options
bool isShowEachResponse_;
bool isShowVersion_;
bool isShowHelp_;

size_t period_;
size_t count_;
size_t nthreads_;
Expand Down Expand Up @@ -78,7 +78,7 @@ class Options

::printf("iores version %s\n", IORETH_VERSION);
}

void showHelp() {

::printf("usage: %s [option(s)] [file or device]\n"
Expand Down Expand Up @@ -118,7 +118,7 @@ class Options
void parse(int argc, char* argv[]) {

programName_ = argv[0];

while (1) {
int c = ::getopt(argc, argv, "s:b:p:c:t:q:wmrvh");

Expand Down Expand Up @@ -198,7 +198,7 @@ class IoResponseBench
XorShift128 rand_;

std::mutex& mutex_; //shared among threads.

public:
/**
* @param dev block device.
Expand Down Expand Up @@ -232,7 +232,7 @@ class IoResponseBench
throw std::runtime_error("posix_memalign failed");
}
buf_ = static_cast<char*>(bufV_);

for (size_t i = 0; i < blockSize_; i++) {
buf_[i] = static_cast<char>(rand_.get(256));
}
Expand Down Expand Up @@ -264,32 +264,33 @@ class IoResponseBench
}
putStat();
}

private:
/**
* @return response time.
*/
IoLog execBlockIO() {

double begin, end;
size_t blockId = rand_.get(accessRange_);
size_t oft = blockId * blockSize_;
begin = getTime();
bool isWrite = false;

switch(dev_.getMode()) {
case READ_MODE: isWrite = false; break;
case WRITE_MODE: isWrite = true; break;
case MIX_MODE: isWrite = (rand_.get(2) == 0); break;
}

IoType type = isWrite ? IOTYPE_WRITE : IOTYPE_READ;

if (isWrite) {
dev_.write(oft, blockSize_, buf_);
} else {
dev_.read(oft, blockSize_, buf_);
}
end = getTime();
return IoLog(threadId_, isWrite, blockId, begin, end - begin);
return IoLog(threadId_, type, blockId, begin, end - begin);
}

void putStat() const {
Expand All @@ -314,7 +315,7 @@ void do_work(int threadId, const Options& opt,
const bool isDirect = true;

BlockDevice bd(opt.getArgs()[0], opt.getMode(), isDirect);

IoResponseBench bench(threadId, bd, opt.getBlockSize(), opt.getAccessRange(),
rtQ, stat, opt.isShowEachResponse(), mutex);
if (opt.getPeriod() > 0) {
Expand Down Expand Up @@ -362,11 +363,11 @@ void execThreadExperiment(const Options& opt)

std::vector<std::queue<IoLog> > logQs;
std::vector<PerformanceStatistics> stats;

std::vector<std::future<void> > workers;
double begin, end;
std::mutex mutex;

begin = getTime();
worker_start(workers, nthreads, opt, logQs, stats, mutex);
worker_join(workers);
Expand Down Expand Up @@ -394,13 +395,13 @@ class AioResponseBench
const size_t accessRange_;
const bool isShowEachResponse_;
const Mode mode_;

BlockBuffer bb_;
Rand<size_t, std::uniform_int_distribution<size_t> > rand_;
std::queue<IoLog> logQ_;
PerformanceStatistics stat_;
Aio aio_;


public:
AioResponseBench(const BlockDevice& dev, size_t blockSize, size_t queueSize,
Expand All @@ -420,8 +421,8 @@ class AioResponseBench
assert(blockSize_ % 512 == 0);
assert(queueSize_ > 0);
assert(accessRange_ > 0);
}
}

void execNtimes(size_t nTimes) {

size_t pending = 0;
Expand All @@ -441,7 +442,7 @@ class AioResponseBench
waitAnIo();
pending--;

prepareIo(bb_.next());
prepareIo(bb_.next());
pending++;
c++;
aio_.submit();
Expand Down Expand Up @@ -473,7 +474,7 @@ class AioResponseBench
end = waitAnIo();
pending--;

prepareIo(bb_.next());
prepareIo(bb_.next());
pending++;
aio_.submit();
}
Expand All @@ -486,12 +487,12 @@ class AioResponseBench

PerformanceStatistics& getStat() { return stat_; }
std::queue<IoLog>& getIoLogQueue() { return logQ_; }

private:
bool decideIsWrite() {

bool isWrite = false;

switch(mode_) {
case READ_MODE:
isWrite = false;
Expand All @@ -507,11 +508,11 @@ class AioResponseBench
}
return isWrite;
}

void prepareIo(char *buf) {

size_t blockId = rand_.get(accessRange_);

if (decideIsWrite()) {
aio_.prepareWrite(blockId * blockSize_, blockSize_, buf);
} else {
Expand All @@ -529,10 +530,10 @@ class AioResponseBench
}
return ptr->endTime;
}

IoLog toIoLog(AioData *ptr) {

return IoLog(0, ptr->isWrite, ptr->oft / ptr->size,
return IoLog(0, ptr->type, ptr->oft / ptr->size,
ptr->beginTime, ptr->endTime - ptr->beginTime);
}
};
Expand All @@ -542,14 +543,14 @@ void execAioExperiment(const Options& opt)
assert(opt.getNthreads() == 0);
const size_t queueSize = opt.getQueueSize();
assert(queueSize > 0);

const bool isDirect = true;
BlockDevice bd(opt.getArgs()[0], opt.getMode(), isDirect);

AioResponseBench bench(bd, opt.getBlockSize(), opt.getQueueSize(),
opt.getAccessRange(),
opt.isShowEachResponse());

double begin, end;
begin = getTime();
if (opt.getPeriod() > 0) {
Expand Down Expand Up @@ -587,6 +588,6 @@ int main(int argc, char* argv[])
} catch (...) {
::printf("caught another error.\n");
}

return 0;
}
Loading

0 comments on commit 03f66e2

Please sign in to comment.