diff --git a/src/store.cpp b/src/store.cpp index efa48a62..16eebfa1 100644 --- a/src/store.cpp +++ b/src/store.cpp @@ -201,6 +201,7 @@ FileStoreBase::FileStoreBase(StoreQueue* storeq, writeStats(false), rotateOnReopen(false), currentSize(0), + eventSize(0), lastRollTime(0), eventsWritten(0) { } @@ -527,7 +528,7 @@ void FileStoreBase::printStats() { if (!writeStats) { return; } - + string filename(filePath); filename += "/scribe_stats"; @@ -540,28 +541,10 @@ void FileStoreBase::printStats() { categoryHandled.c_str(), filename.c_str(), fsType.c_str()); // This isn't enough of a problem to change our status return; - } - - time_t rawtime = time(NULL); - struct tm timeinfo; - localtime_r(&rawtime, &timeinfo); - - ostringstream msg; - msg << timeinfo.tm_year + 1900 << '-' - << setw(2) << setfill('0') << timeinfo.tm_mon + 1 << '-' - << setw(2) << setfill('0') << timeinfo.tm_mday << '-' - << setw(2) << setfill('0') << timeinfo.tm_hour << ':' - << setw(2) << setfill('0') << timeinfo.tm_min; - - msg << " wrote <" << currentSize << "> bytes in <" << eventsWritten - << "> events to file <" << currentFilename << ">" << endl; - - string log_str = msg.str(); - LOG_OPER("[%s]", log_str.c_str()); - /*stats_file->write(msg.str());*/ - // update the stats - g_Handler->incCounter(categoryHandled, fsType + "_wrote_num_messages", eventsWritten); - g_Handler->incCounter(categoryHandled, fsType + "_wrote_bytes", currentSize); + } + + //scribe_stats should be created and message written if/when hdfs-append works + //stats_file->write(msg.str()); stats_file->close(); } @@ -698,7 +681,7 @@ bool FileStore::openInternal(bool incrementFilename, struct tm* current_time) { if (writeMeta) { writeFile->write(meta_logfile_prefix + file); } - writeFile->close(); + closeWriteFile(); } writeFile = FileInterface::createFileInterface(fsType, file, isBufferFile); @@ -743,13 +726,12 @@ bool FileStore::openInternal(bool incrementFilename, struct tm* current_time) { writeFile->createSymlink(symtarget, symlinkName); } // else it confuses the filename code on reads - + LOG_OPER("[%s] Opened file <%s> for writing", categoryHandled.c_str(), file.c_str()); currentSize = writeFile->fileSize(); currentFilename = file; - eventsWritten = 0; setStatus(""); } @@ -768,12 +750,43 @@ bool FileStore::isOpen() { return writeFile && writeFile->isOpen(); } -void FileStore::close() { +void FileStore::closeWriteFile() { if (writeFile) { writeFile->close(); + if (writeStats && !(writeFile->isOpen()) ) { + time_t rawtime = time(NULL); + struct tm timeinfo; + localtime_r(&rawtime, &timeinfo); + + ostringstream msg; + msg << timeinfo.tm_year + 1900 << '-' + << setw(2) << setfill('0') << timeinfo.tm_mon + 1 << '-' + << setw(2) << setfill('0') << timeinfo.tm_mday << '-' + << setw(2) << setfill('0') << timeinfo.tm_hour << ':' + << setw(2) << setfill('0') << timeinfo.tm_min; + + msg << " wrote <" << eventSize << "> bytes in <" << eventsWritten + << "> events to file <" << currentFilename << ">" << endl; + + string log_str = msg.str(); + LOG_OPER("[%s]", log_str.c_str()); + + // the message that is logged here should be moved to printStats() to be + // appended to scribe_stats when hdfs append works. + } + // update stats + g_Handler->incCounter(categoryHandled, fsType + "_wrote_num_messages", eventsWritten); + g_Handler->incCounter(categoryHandled, fsType + "_wrote_bytes", eventSize); + + eventsWritten = 0; + eventSize = 0; } } +void FileStore::close() { + closeWriteFile(); +} + void FileStore::flush() { if (writeFile) { writeFile->flush(); @@ -901,6 +914,8 @@ bool FileStore::writeMessages(boost::shared_ptr messages, num_written += num_buffered; currentSize += current_size_buffered; + eventSize += current_size_buffered; + eventsWritten += num_buffered; num_buffered = 0; current_size_buffered = 0; write_buffer = ""; @@ -918,8 +933,6 @@ bool FileStore::writeMessages(boost::shared_ptr messages, success = false; } - eventsWritten += num_written; - if (!success) { close(); diff --git a/src/store.h b/src/store.h index c64a0408..7c2aef24 100644 --- a/src/store.h +++ b/src/store.h @@ -178,6 +178,7 @@ class FileStoreBase : public Store { unsigned long eventsWritten; // This is how many events this process has // written to the currently open file. It is NOT // necessarily the number of lines in the file + unsigned long eventSize; // size corresponding to eventsWritten private: // disallow copy, assignment, and empty construction @@ -200,6 +201,7 @@ class FileStore : public FileStoreBase { bool handleMessages(boost::shared_ptr messages); bool isOpen(); void configure(pStoreConf configuration, pStoreConf parent); + void closeWriteFile(); void close(); void flush();