27
27
#include " logging/logFileOutput.hpp"
28
28
#include " logging/logFileStreamOutput.hpp"
29
29
#include " logging/logHandle.hpp"
30
+ #include " memory/resourceArea.hpp"
30
31
#include " runtime/atomic.hpp"
31
32
#include " runtime/os.inline.hpp"
32
33
@@ -42,28 +43,49 @@ class AsyncLogWriter::AsyncLogLocker : public StackObj {
42
43
}
43
44
};
44
45
45
- void AsyncLogWriter::enqueue_locked (const AsyncLogMessage& msg) {
46
- if (_buffer.size () >= _buffer_max_size) {
46
+ // LogDecorator::None applies to 'constant initialization' because of its constexpr constructor.
47
+ const LogDecorations& AsyncLogWriter::None = LogDecorations(LogLevel::Warning, LogTagSetMapping<LogTag::__NO_TAG>::tagset(),
48
+ LogDecorators::None);
49
+
50
+ bool AsyncLogWriter::Buffer::push_back (LogFileStreamOutput* output, const LogDecorations& decorations, const char * msg) {
51
+ const size_t sz = Message::calc_size (strlen (msg));
52
+ const bool is_token = output == nullptr ;
53
+ // Always leave headroom for the flush token. Pushing a token must succeed.
54
+ const size_t headroom = (!is_token) ? Message::calc_size (0 ) : 0 ;
55
+
56
+ if (_pos + sz <= (_capacity - headroom)) {
57
+ new (_buf + _pos) Message (output, decorations, msg);
58
+ _pos += sz;
59
+ return true ;
60
+ }
61
+
62
+ return false ;
63
+ }
64
+
65
+ void AsyncLogWriter::Buffer::push_flush_token () {
66
+ bool result = push_back (nullptr , AsyncLogWriter::None, " " );
67
+ assert (result, " fail to enqueue the flush token." );
68
+ }
69
+
70
+ void AsyncLogWriter::enqueue_locked (LogFileStreamOutput* output, const LogDecorations& decorations, const char * msg) {
71
+ // To save space and streamline execution, we just ignore null message.
72
+ // client should use "" instead.
73
+ assert (msg != nullptr , " enqueuing a null message!" );
74
+
75
+ if (!_buffer->push_back (output, decorations, msg)) {
47
76
bool p_created;
48
- uint32_t * counter = _stats.put_if_absent (msg. output () , 0 , &p_created);
77
+ uint32_t * counter = _stats.put_if_absent (output, 0 , &p_created);
49
78
*counter = *counter + 1 ;
50
- // drop the enqueueing message.
51
- os::free (msg.message ());
52
79
return ;
53
80
}
54
81
55
- _buffer.push_back (msg);
56
82
_data_available = true ;
57
83
_lock.notify ();
58
84
}
59
85
60
86
void AsyncLogWriter::enqueue (LogFileStreamOutput& output, const LogDecorations& decorations, const char * msg) {
61
- AsyncLogMessage m (&output, decorations, os::strdup (msg));
62
-
63
- { // critical area
64
- AsyncLogLocker locker;
65
- enqueue_locked (m);
66
- }
87
+ AsyncLogLocker locker;
88
+ enqueue_locked (&output, decorations, msg);
67
89
}
68
90
69
91
// LogMessageBuffer consists of a multiple-part/multiple-line message.
@@ -72,84 +94,79 @@ void AsyncLogWriter::enqueue(LogFileStreamOutput& output, LogMessageBuffer::Iter
72
94
AsyncLogLocker locker;
73
95
74
96
for (; !msg_iterator.is_at_end (); msg_iterator++) {
75
- AsyncLogMessage m (&output, msg_iterator.decorations (), os::strdup (msg_iterator.message ()));
76
- enqueue_locked (m);
97
+ enqueue_locked (&output, msg_iterator.decorations (), msg_iterator.message ());
77
98
}
78
99
}
79
100
80
101
AsyncLogWriter::AsyncLogWriter ()
81
102
: _flush_sem(0 ), _lock(), _data_available(false ),
82
103
_initialized(false ),
83
104
_stats() {
105
+
106
+ size_t size = AsyncLogBufferSize / 2 ;
107
+ _buffer = new Buffer (size);
108
+ _buffer_staging = new Buffer (size);
109
+ log_info (logging)(" AsyncLogBuffer estimates memory use: " SIZE_FORMAT " bytes" , size * 2 );
84
110
if (os::create_thread (this , os::asynclog_thread)) {
85
111
_initialized = true ;
86
112
} else {
87
113
log_warning (logging, thread)(" AsyncLogging failed to create thread. Falling back to synchronous logging." );
88
114
}
89
-
90
- log_info (logging)(" The maximum entries of AsyncLogBuffer: " SIZE_FORMAT " , estimated memory use: " SIZE_FORMAT " bytes" ,
91
- _buffer_max_size, AsyncLogBufferSize);
92
115
}
93
116
94
- class AsyncLogMapIterator {
95
- AsyncLogBuffer& _logs;
96
-
97
- public:
98
- AsyncLogMapIterator (AsyncLogBuffer& logs) :_logs(logs) {}
99
- bool do_entry (LogFileStreamOutput* output, uint32_t & counter) {
100
- using none = LogTagSetMapping<LogTag::__NO_TAG>;
101
-
102
- if (counter > 0 ) {
103
- LogDecorations decorations (LogLevel::Warning, none::tagset (), LogDecorators::All);
104
- stringStream ss;
105
- ss.print (UINT32_FORMAT_W (6 ) " messages dropped due to async logging" , counter);
106
- AsyncLogMessage msg (output, decorations, ss.as_string (true /* c_heap*/ ));
107
- _logs.push_back (msg);
108
- counter = 0 ;
109
- }
110
-
111
- return true ;
112
- }
113
- };
114
-
115
117
void AsyncLogWriter::write () {
116
- // Use kind of copy-and-swap idiom here.
117
- // Empty 'logs' swaps the content with _buffer.
118
- // Along with logs destruction, all processed messages are deleted.
119
- //
120
- // The operation 'pop_all()' is done in O(1). All I/O jobs are then performed without
121
- // lock protection. This guarantees I/O jobs don't block logsites.
122
- AsyncLogBuffer logs;
118
+ ResourceMark rm;
119
+ // Similar to AsyncLogMap but on resource_area
120
+ ResourceHashtable<LogFileStreamOutput*, uint32_t ,
121
+ 17 /* table_size*/ , ResourceObj::RESOURCE_AREA,
122
+ mtLogging> snapshot;
123
123
124
- { // critical region
124
+ // lock protection. This guarantees I/O jobs don't block logsites.
125
+ {
125
126
AsyncLogLocker locker;
126
127
127
- _buffer.pop_all (&logs);
128
- // append meta-messages of dropped counters
129
- AsyncLogMapIterator dropped_counters_iter (logs);
130
- _stats.iterate(&dropped_counters_iter);
128
+ _buffer_staging->reset ();
129
+ swap (_buffer, _buffer_staging);
130
+
131
+ // move counters to snapshot and reset them.
132
+ _stats.iterate([&] (LogFileStreamOutput* output, uint32_t & counter) {
133
+ if (counter > 0 ) {
134
+ bool created = snapshot.put (output, counter);
135
+ assert (created == true , " sanity check" );
136
+ counter = 0 ;
137
+ }
138
+ return true ;
139
+ });
131
140
_data_available = false ;
132
141
}
133
142
134
- LinkedListIterator<AsyncLogMessage> it (logs.head ());
135
-
136
143
int req = 0 ;
137
- while (!it.is_empty ()) {
138
- AsyncLogMessage* e = it.next ();
139
- char * msg = e->message ();
140
-
141
- if (msg != nullptr ) {
142
- e->output ()->write_blocking (e->decorations (), msg);
143
- os::free (msg);
144
- } else if (e->output () == nullptr ) {
144
+ auto it = _buffer_staging->iterator ();
145
+ while (it.hasNext ()) {
146
+ const Message* e = it.next ();
147
+
148
+ if (!e->is_token ()){
149
+ e->output ()->write_blocking (e->decorations (), e->message ());
150
+ } else {
145
151
// This is a flush token. Record that we found it and then
146
152
// signal the flushing thread after the loop.
147
153
req++;
148
154
}
149
155
}
150
156
157
+ LogDecorations decorations (LogLevel::Warning, LogTagSetMapping<LogTag::__NO_TAG>::tagset (),
158
+ LogDecorators::All);
159
+ snapshot.iterate([&](LogFileStreamOutput* output, uint32_t & counter) {
160
+ if (counter > 0 ) {
161
+ stringStream ss;
162
+ ss.print (UINT32_FORMAT_W (6 ) " messages dropped due to async logging" , counter);
163
+ output->write_blocking (decorations, ss.as_string (false ));
164
+ }
165
+ return true ;
166
+ });
167
+
151
168
if (req > 0 ) {
152
- assert (req == 1 , " AsyncLogWriter::flush() is NOT MT-safe!" );
169
+ assert (req == 1 , " Only one token is allowed in queue. AsyncLogWriter::flush() is NOT MT-safe!" );
153
170
_flush_sem.signal (req);
154
171
}
155
172
}
@@ -186,6 +203,8 @@ void AsyncLogWriter::initialize() {
186
203
}
187
204
os::start_thread (self);
188
205
log_debug (logging, thread)(" Async logging thread started." );
206
+ } else {
207
+ delete self;
189
208
}
190
209
}
191
210
@@ -200,17 +219,33 @@ AsyncLogWriter* AsyncLogWriter::instance() {
200
219
void AsyncLogWriter::flush () {
201
220
if (_instance != nullptr ) {
202
221
{
203
- using none = LogTagSetMapping<LogTag::__NO_TAG>;
204
222
AsyncLogLocker locker;
205
- LogDecorations d (LogLevel::Off, none::tagset (), LogDecorators::None);
206
- AsyncLogMessage token (nullptr , d, nullptr );
207
-
208
223
// Push directly in-case we are at logical max capacity, as this must not get dropped.
209
- _instance->_buffer . push_back (token );
224
+ _instance->_buffer -> push_flush_token ( );
210
225
_instance->_data_available = true ;
211
226
_instance->_lock .notify ();
212
227
}
213
228
214
229
_instance->_flush_sem .wait ();
215
230
}
216
231
}
232
+
233
+ AsyncLogWriter::BufferUpdater::BufferUpdater (size_t newsize) {
234
+ AsyncLogLocker locker;
235
+ auto p = AsyncLogWriter::_instance;
236
+
237
+ _buf1 = p->_buffer ;
238
+ _buf2 = p->_buffer_staging ;
239
+ p->_buffer = new Buffer (newsize);
240
+ p->_buffer_staging = new Buffer (newsize);
241
+ }
242
+
243
+ AsyncLogWriter::BufferUpdater::~BufferUpdater () {
244
+ AsyncLogLocker locker;
245
+ auto p = AsyncLogWriter::_instance;
246
+
247
+ delete p->_buffer ;
248
+ delete p->_buffer_staging ;
249
+ p->_buffer = _buf1;
250
+ p->_buffer_staging = _buf2;
251
+ }
0 commit comments