Skip to content

Commit 4977e00

Browse files
committed
Reduce contention on monitoring.
Keep statistics per-thread, and sum them only when we need to display them.
1 parent 387a603 commit 4977e00

14 files changed

+426
-319
lines changed

src/BUILD

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ cc_library(
2020
"apib_lines.cc",
2121
"apib_message.cc",
2222
"apib_rand.cc",
23-
"apib_reporting.cc",
2423
"apib_time.cc",
2524
"apib_url.cc",
2625
"apib_util.cc",
@@ -62,6 +61,7 @@ cc_library(
6261
"apib_io_socket.cc",
6362
"apib_iothread.cc",
6463
"apib_oauth.cc",
64+
"apib_reporting.cc",
6565
],
6666
hdrs = [
6767
"apib_commandqueue.h",
@@ -104,9 +104,6 @@ cc_binary(
104104
srcs = [
105105
"apib_main.cc",
106106
],
107-
copts = [
108-
"-std=gnu++14",
109-
],
110107
deps = [
111108
":io",
112109
"//third_party:base64",

src/apib_cpu_proc.cc

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,16 +171,21 @@ static int getTicks(CPUUsage* cpu) {
171171
if (!tok.empty()) {
172172
int64_t c;
173173
if (!absl::SimpleAtoi(tok, &c)) {
174-
return 0;
174+
continue;
175175
}
176-
if ((i == 3) || (i == 4) || (i == 7)) {
177-
/* The fourth and fifth columns are "idle" and "iowait".
176+
switch (i) {
177+
case 3:
178+
case 4:
179+
case 7:
180+
/* The fourth and fifth columns are "idle" and "iowait".
178181
We consider both to be idle CPU.
179182
The eigth is "steal", which is time lost to virtualization
180183
as a client -- that's idle too in our estimation */
181-
idleCount += c;
182-
} else {
183-
nonIdleCount += c;
184+
idleCount += c;
185+
break;
186+
default:
187+
nonIdleCount += c;
188+
break;
184189
}
185190
i++;
186191
}

src/apib_io_basic.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ int ConnectionState::Connect() {
6565

6666
size_t addrLen;
6767
const struct sockaddr* addr = url_->address(t_->threadIndex(), &addrLen);
68+
if (addr == nullptr) {
69+
io_Verbose(this, "No addresses to look up\n");
70+
return -2;
71+
}
6872

6973
if (t_->verbose) {
7074
char hostName[512];

src/apib_iothread.cc

Lines changed: 53 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ limitations under the License.
1818

1919
#include <cassert>
2020
#include <functional>
21+
#include <iostream>
2122
#include <thread>
2223

2324
#include "ev.h"
@@ -108,11 +109,15 @@ void ConnectionState::writeRequest() {
108109
void ConnectionState::ConnectAndSend() {
109110
startTime_ = GetTime();
110111
if (needsOpen_) {
111-
int err = Connect();
112-
mandatoryAssert(err == 0);
113-
// Should only fail if we can't create a new socket --
114-
// errors actually connecting will be handled during write.
115-
RecordConnectionOpen();
112+
const int err = Connect();
113+
if (err == 0) {
114+
RecordConnectionOpen();
115+
} else {
116+
std::cerr << "Error opening TCP connection: " << err << std::endl;
117+
RecordSocketError();
118+
sendAfterDelay(kConnectFailureDelay);
119+
return;
120+
}
116121
}
117122
writeRequest();
118123
SendWrite();
@@ -127,22 +132,26 @@ void ConnectionState::thinkingDone(struct ev_loop* loop, ev_timer* t,
127132

128133
void ConnectionState::addThinkTime() {
129134
const double thinkTime = (double)(t_->thinkTime) / 1000.0;
130-
io_Verbose(this, "Thinking for %.4lf seconds\n", thinkTime);
131-
ev_timer_init(&thinkTimer_, thinkingDone, thinkTime, 0);
135+
sendAfterDelay(thinkTime);
136+
}
137+
138+
void ConnectionState::sendAfterDelay(double seconds) {
139+
io_Verbose(this, "Thinking for %.4lf seconds\n", seconds);
140+
ev_timer_init(&thinkTimer_, thinkingDone, seconds, 0);
132141
thinkTimer_.data = this;
133142
ev_timer_start(t_->loop(), &thinkTimer_);
134143
}
135144

136-
void ConnectionState::recycle(int closeConn) {
145+
void ConnectionState::recycle(bool closeConn) {
137146
if (closeConn || t_->noKeepAlive || !t_->shouldKeepRunning()) {
138-
needsOpen_ = 1;
147+
needsOpen_ = true;
139148
// Close is async, especially for TLS. So we will
140149
// reconnect later...
141150
Close();
142151
return;
143152
}
144153

145-
needsOpen_ = 0;
154+
needsOpen_ = false;
146155
if (t_->thinkTime > 0) {
147156
addThinkTime();
148157
} else {
@@ -152,7 +161,7 @@ void ConnectionState::recycle(int closeConn) {
152161

153162
int ConnectionState::StartConnect() {
154163
url_ = URLInfo::GetNext(t_->rand());
155-
needsOpen_ = 1;
164+
needsOpen_ = true;
156165
ConnectAndSend();
157166
return 0;
158167
}
@@ -174,7 +183,7 @@ void ConnectionState::WriteDone(int err) {
174183
if (err != 0) {
175184
RecordSocketError();
176185
io_Verbose(this, "Error on write: %i\n", err);
177-
recycle(1);
186+
recycle(true);
178187
} else {
179188
io_Verbose(this, "Write complete. Starting to read\n");
180189
// Prepare to read.
@@ -191,32 +200,45 @@ void ConnectionState::ReadDone(int err) {
191200
if (err != 0) {
192201
io_Verbose(this, "Error on read: %i\n", err);
193202
RecordSocketError();
194-
recycle(1);
203+
recycle(true);
195204
return;
196205
}
197206

198-
RecordResult(parser_.status_code);
199-
t_->recordLatency(GetTime() - startTime_);
207+
t_->recordResult(parser_.status_code, GetTime() - startTime_);
200208
if (!http_should_keep_alive(&(parser_))) {
201209
io_Verbose(this, "Server does not want keep-alive\n");
202-
recycle(1);
210+
recycle(true);
203211
} else {
204212
const URLInfo* oldUrl = url_;
205213
url_ = URLInfo::GetNext(t_->rand());
206214
if (!URLInfo::IsSameServer(*oldUrl, *url_, t_->index)) {
207215
io_Verbose(this, "Switching to a different server\n");
208216
writeDirty_ = true;
209-
recycle(1);
217+
recycle(true);
210218
} else {
211219
// URLs are static throughout the run, so we can just compare pointer here
212220
if (url_ != oldUrl) {
213221
writeDirty_ = true;
214222
}
215-
recycle(0);
223+
recycle(false);
216224
}
217225
}
218226
}
219227

228+
void IOThread::recordResult(int statusCode, int_fast64_t latency) {
229+
Counters* c = getCounters();
230+
if ((statusCode >= 200) && (statusCode < 300)) {
231+
c->successfulRequests++;
232+
} else {
233+
c->failedRequests++;
234+
}
235+
c->latencies.push_back(latency);
236+
}
237+
238+
void IOThread::recordRead(size_t c) { getCounters()->bytesRead += c; }
239+
240+
void IOThread::recordWrite(size_t c) { getCounters()->bytesWritten += c; }
241+
220242
void IOThread::setNumConnections(size_t newVal) {
221243
iothread_Verbose(this, "Current connections = %zu. New connections = %zu\n",
222244
connections_.size(), newVal);
@@ -277,10 +299,6 @@ void IOThread::processCommands(struct ev_loop* loop, ev_async* a, int revents) {
277299

278300
void IOThread::threadLoop() {
279301
int ret = 0;
280-
readCount_ = 0;
281-
writeCount_ = 0;
282-
readBytes_ = 0;
283-
writeBytes_ = 0;
284302

285303
iothread_Verbose(this, "Starting new event loop %i for %i connection\n",
286304
index, numConnections);
@@ -311,8 +329,6 @@ void IOThread::threadLoop() {
311329
}
312330
ret = ev_run(loop_, 0);
313331
iothread_Verbose(this, "ev_run finished: %i\n", ret);
314-
RecordByteCounts(writeBytes_, readBytes_);
315-
RecordLatencies(latencies_);
316332

317333
finish:
318334
iothread_Verbose(this, "Cleaning up event loop %i\n", index);
@@ -322,13 +338,26 @@ void IOThread::threadLoop() {
322338
ev_loop_destroy(loop_);
323339
}
324340

341+
IOThread::IOThread() {
342+
Counters* c = new Counters();
343+
counterPtr_.store(reinterpret_cast<uintptr_t>(c));
344+
}
345+
325346
IOThread::~IOThread() {
326347
if (sslCtx != nullptr) {
327348
SSL_CTX_free(sslCtx);
328349
}
329350
if (thread_ != nullptr) {
330351
delete thread_;
331352
}
353+
Counters* c = reinterpret_cast<Counters*>(counterPtr_.load());
354+
delete c;
355+
}
356+
357+
Counters* IOThread::exchangeCounters() {
358+
Counters* newCounters = new Counters();
359+
return reinterpret_cast<Counters*>(
360+
counterPtr_.exchange(reinterpret_cast<uintptr_t>(newCounters)));
332361
}
333362

334363
void IOThread::initializeParser() {
@@ -383,14 +412,4 @@ void IOThread::SetNumConnections(int newConnections) {
383412
ev_async_send(loop_, &async_);
384413
}
385414

386-
void IOThread::recordRead(size_t c) {
387-
readCount_++;
388-
readBytes_ += c;
389-
}
390-
391-
void IOThread::recordWrite(size_t c) {
392-
writeCount_++;
393-
writeBytes_ += c;
394-
}
395-
396415
} // namespace apib

src/apib_iothread.h

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ limitations under the License.
1919

2020
#include <openssl/ssl.h>
2121

22+
#include <atomic>
23+
#include <memory>
2224
#include <sstream>
2325
#include <string>
2426
#include <thread>
@@ -38,6 +40,7 @@ namespace apib {
3840
// already set
3941

4042
class ConnectionState;
43+
class Counters;
4144

4245
// This structure represents a single thread that runs a benchmark
4346
// across multiple connections.
@@ -67,6 +70,7 @@ class IOThread {
6770
static constexpr int kConnectionSet = (1 << 4);
6871
static constexpr int kUserAgentSet = (1 << 5);
6972

73+
IOThread();
7074
~IOThread();
7175

7276
// Start the thread. It's up to the caller to initialize everything
@@ -99,29 +103,32 @@ class IOThread {
99103

100104
void recordRead(size_t c);
101105
void recordWrite(size_t c);
102-
void recordLatency(int64_t l) { latencies_.push_back(l); }
106+
void recordResult(int statusCode, int64_t latency);
107+
108+
// Swap the current set of performance counters and start new ones.
109+
// The caller must free the result.
110+
Counters* exchangeCounters();
103111

104112
private:
105113
void threadLoop();
106114
static void initializeParser();
107115
static void processCommands(struct ev_loop* loop, ev_async* a, int revents);
108116
static void hardShutdown(struct ev_loop* loop, ev_timer* timer, int revents);
109117
void setNumConnections(size_t newVal);
118+
Counters* getCounters() {
119+
return reinterpret_cast<Counters*>(counterPtr_.load());
120+
}
110121

111122
static http_parser_settings parserSettings_;
112123

113-
long readCount_;
114-
long writeCount_;
115-
long long readBytes_;
116-
long long writeBytes_;
117124
std::vector<ConnectionState*> connections_;
118-
std::thread* thread_;
125+
std::thread* thread_ = nullptr;
119126
RandomGenerator rand_;
120-
struct ev_loop* loop_;
127+
struct ev_loop* loop_ = nullptr;
121128
ev_async async_;
122129
CommandQueue commands_;
123130
ev_timer shutdownTimer_;
124-
std::vector<int64_t> latencies_;
131+
std::atomic_uintptr_t counterPtr_;
125132
};
126133

127134
typedef enum {
@@ -166,11 +173,13 @@ class ConnectionState {
166173
static int httpComplete(http_parser* p);
167174

168175
private:
169-
static const int readBufSize = 512;
170-
static const int writeBufSize = 1024;
176+
static constexpr int readBufSize = 512;
177+
static constexpr int writeBufSize = 1024;
178+
static constexpr double kConnectFailureDelay = 0.25;
171179

172180
void addThinkTime();
173-
void recycle(int closeConn);
181+
void sendAfterDelay(double seconds);
182+
void recycle(bool closeConn);
174183
void writeRequest();
175184
void printSslError(const std::string& msg, int err) const;
176185

@@ -207,6 +216,9 @@ class ConnectionState {
207216
long long startTime_ = 0LL;
208217
};
209218

219+
// A typedef used to clean up some messy interfaces
220+
typedef std::vector<std::unique_ptr<IOThread>> ThreadList;
221+
210222
// Debugging macro
211223
#define io_Verbose(c, ...) \
212224
if ((c)->t_->verbose) { \

0 commit comments

Comments
 (0)