Skip to content

Commit

Permalink
Merge pull request InMobi#6 from AnnapurnaVemuri/msgstatsmismatch
Browse files Browse the repository at this point in the history
<fstype>_wrote_num_messgaes and received good mismatch
  • Loading branch information
satish-mittal committed Feb 19, 2013
2 parents 6fa11c0 + beedd5a commit a13eeca
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 29 deletions.
71 changes: 42 additions & 29 deletions src/store.cpp
Expand Up @@ -201,6 +201,7 @@ FileStoreBase::FileStoreBase(StoreQueue* storeq,
writeStats(false),
rotateOnReopen(false),
currentSize(0),
eventSize(0),
lastRollTime(0),
eventsWritten(0) {
}
Expand Down Expand Up @@ -527,7 +528,7 @@ void FileStoreBase::printStats() {
if (!writeStats) {
return;
}

string filename(filePath);
filename += "/scribe_stats";

Expand All @@ -540,28 +541,10 @@ void FileStoreBase::printStats() {
categoryHandled.c_str(), filename.c_str(), fsType.c_str());
// This isn't enough of a problem to change our status
return;
}

time_t rawtime = time(NULL);
struct tm timeinfo;
localtime_r(&rawtime, &timeinfo);

ostringstream msg;
msg << timeinfo.tm_year + 1900 << '-'
<< setw(2) << setfill('0') << timeinfo.tm_mon + 1 << '-'
<< setw(2) << setfill('0') << timeinfo.tm_mday << '-'
<< setw(2) << setfill('0') << timeinfo.tm_hour << ':'
<< setw(2) << setfill('0') << timeinfo.tm_min;

msg << " wrote <" << currentSize << "> bytes in <" << eventsWritten
<< "> events to file <" << currentFilename << ">" << endl;

string log_str = msg.str();
LOG_OPER("[%s]", log_str.c_str());
/*stats_file->write(msg.str());*/
// update the stats
g_Handler->incCounter(categoryHandled, fsType + "_wrote_num_messages", eventsWritten);
g_Handler->incCounter(categoryHandled, fsType + "_wrote_bytes", currentSize);
}

//scribe_stats should be created and message written if/when hdfs-append works
//stats_file->write(msg.str());
stats_file->close();
}

Expand Down Expand Up @@ -698,7 +681,7 @@ bool FileStore::openInternal(bool incrementFilename, struct tm* current_time) {
if (writeMeta) {
writeFile->write(meta_logfile_prefix + file);
}
writeFile->close();
closeWriteFile();
}

writeFile = FileInterface::createFileInterface(fsType, file, isBufferFile);
Expand Down Expand Up @@ -743,13 +726,12 @@ bool FileStore::openInternal(bool incrementFilename, struct tm* current_time) {
writeFile->createSymlink(symtarget, symlinkName);
}
// else it confuses the filename code on reads

LOG_OPER("[%s] Opened file <%s> for writing", categoryHandled.c_str(),
file.c_str());

currentSize = writeFile->fileSize();
currentFilename = file;
eventsWritten = 0;
setStatus("");
}

Expand All @@ -768,12 +750,43 @@ bool FileStore::isOpen() {
return writeFile && writeFile->isOpen();
}

void FileStore::close() {
void FileStore::closeWriteFile() {
if (writeFile) {
writeFile->close();
if (writeStats && !(writeFile->isOpen()) ) {
time_t rawtime = time(NULL);
struct tm timeinfo;
localtime_r(&rawtime, &timeinfo);

ostringstream msg;
msg << timeinfo.tm_year + 1900 << '-'
<< setw(2) << setfill('0') << timeinfo.tm_mon + 1 << '-'
<< setw(2) << setfill('0') << timeinfo.tm_mday << '-'
<< setw(2) << setfill('0') << timeinfo.tm_hour << ':'
<< setw(2) << setfill('0') << timeinfo.tm_min;

msg << " wrote <" << eventSize << "> bytes in <" << eventsWritten
<< "> events to file <" << currentFilename << ">" << endl;

string log_str = msg.str();
LOG_OPER("[%s]", log_str.c_str());

// the message that is logged here should be moved to printStats() to be
// appended to scribe_stats when hdfs append works.
}
// update stats
g_Handler->incCounter(categoryHandled, fsType + "_wrote_num_messages", eventsWritten);
g_Handler->incCounter(categoryHandled, fsType + "_wrote_bytes", eventSize);

eventsWritten = 0;
eventSize = 0;
}
}

void FileStore::close() {
closeWriteFile();
}

void FileStore::flush() {
if (writeFile) {
writeFile->flush();
Expand Down Expand Up @@ -901,6 +914,8 @@ bool FileStore::writeMessages(boost::shared_ptr<logentry_vector_t> messages,

num_written += num_buffered;
currentSize += current_size_buffered;
eventSize += current_size_buffered;
eventsWritten += num_buffered;
num_buffered = 0;
current_size_buffered = 0;
write_buffer = "";
Expand All @@ -918,8 +933,6 @@ bool FileStore::writeMessages(boost::shared_ptr<logentry_vector_t> messages,
success = false;
}

eventsWritten += num_written;

if (!success) {
close();

Expand Down
2 changes: 2 additions & 0 deletions src/store.h
Expand Up @@ -178,6 +178,7 @@ class FileStoreBase : public Store {
unsigned long eventsWritten; // This is how many events this process has
// written to the currently open file. It is NOT
// necessarily the number of lines in the file
unsigned long eventSize; // size corresponding to eventsWritten

private:
// disallow copy, assignment, and empty construction
Expand All @@ -200,6 +201,7 @@ class FileStore : public FileStoreBase {
bool handleMessages(boost::shared_ptr<logentry_vector_t> messages);
bool isOpen();
void configure(pStoreConf configuration, pStoreConf parent);
void closeWriteFile();
void close();
void flush();

Expand Down

0 comments on commit a13eeca

Please sign in to comment.