Skip to content
Merged

aa #10

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions frameworks/Java/officefloor/benchmark_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@
"webserver": "vertx",
"os": "Linux",
"database_os": "Linux",
"display_name": "OfficeFloor-undertow",
"display_name": "OfficeFloor-vertx",
"notes": "",
"versus": "vertx-postgres"
},
Expand Down Expand Up @@ -232,4 +232,4 @@
}
}
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,15 @@ public void update(@HttpQueryParameter("queries") String queries, AsynchronousFl
return null;
}
Row row = rows.next();
return new World(row.getInteger(0), ThreadLocalRandom.current().nextInt(1, 10001));

// Ensure change to random number to trigger update
int previousRandomNumber = row.getInteger(1);
int newRandomNumber;
do {
newRandomNumber = ThreadLocalRandom.current().nextInt(1, 10001);
} while (previousRandomNumber == newRandomNumber);

return new World(row.getInteger(0), newRandomNumber);
}));
}
return CompositeFuture.all(futures).flatMap((compositeFuture) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,14 @@ public VertxSqlPoolConfigurer createService(ServiceContext context) throws Throw

@Override
public void configure(VertxSqlPoolConfigurerContext context) throws Exception {

// Ensure adequate number of connections
final int MAX_POOL_SIZE = 512;
System.out.println("Setting max pool size to " + MAX_POOL_SIZE);
context.getPoolOptions().setMaxSize(MAX_POOL_SIZE);

// Configure options
context.getSqlConnectOptions().setCachePreparedStatements(true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import net.officefloor.server.RequestHandler;
import net.officefloor.server.http.HttpResponse;
import net.officefloor.server.http.ServerHttpConnection;
import net.officefloor.server.http.parse.HttpRequestParser;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -120,12 +118,12 @@ public void threadSetup(RequestHandler<HttpRequestParser> requestHandler) {
}

@Override
public void db(HttpResponse response, ServerHttpConnection connection, DatabaseOperationsContext context) {
public void db(DbSendResponse sender) {

// Determine if will overload queries
RateLimitedConnection conn = this.threadLocalRateLimit.get().getAvailableConnection(1);
if (conn == null) {
context.sendError(connection, context.getTransientResourceException());
sender.sendOverloaded();
return; // rate limited
}

Expand All @@ -137,22 +135,21 @@ public void db(HttpResponse response, ServerHttpConnection connection, DatabaseO
Integer number = row.get(1, Integer.class);
return new World(id, number);
}))).publishOn(conn.writeScheduler).subscribe(world -> {
context.dbSend(response, connection, world);
sender.sendDb(world);
}, error -> {
context.sendError(connection, error);
sender.sendError(error);
}, () -> {
conn.processed(1);
});
}

@Override
public void queries(int queryCount, HttpResponse response, ServerHttpConnection connection,
DatabaseOperationsContext context) {
public void queries(int queryCount, QueriesSendResponse sender) {

// Determine if will overload queries
RateLimitedConnection conn = this.threadLocalRateLimit.get().getAvailableConnection(queryCount);
if (conn == null) {
context.sendError(connection, context.getTransientResourceException());
sender.sendOverloaded();
return; // rate limited
}

Expand All @@ -165,21 +162,21 @@ public void queries(int queryCount, HttpResponse response, ServerHttpConnection
Integer number = row.get(1, Integer.class);
return new World(id, number);
}))).collectList().publishOn(conn.writeScheduler).subscribe(worlds -> {
context.queriesSend(response, connection, worlds);
sender.sendQueries(worlds.toArray(World[]::new));
}, error -> {
context.sendError(connection, error);
sender.sendError(error);
}, () -> {
conn.processed(queryCount);
});
}

@Override
public void fortunes(HttpResponse response, ServerHttpConnection connection, DatabaseOperationsContext context) {
public void fortunes(FortunesSendResponse sender) {

// Determine if will overload queries
RateLimitedConnection conn = this.threadLocalRateLimit.get().getAvailableConnection(1);
if (conn == null) {
context.sendError(connection, context.getTransientResourceException());
sender.sendOverloaded();
return; // rate limited
}

Expand All @@ -190,24 +187,23 @@ public void fortunes(HttpResponse response, ServerHttpConnection connection, Dat
String message = row.get(1, String.class);
return new Fortune(id, message);
}))).collectList().publishOn(conn.writeScheduler).subscribe(fortunes -> {
context.fortunesSend(response, connection, fortunes);
sender.sendFortunes(fortunes);
}, error -> {
context.sendError(connection, error);
sender.sendError(error);
}, () -> {
conn.processed(1);
});
}

@Override
public void update(int queryCount, HttpResponse response, ServerHttpConnection connection,
DatabaseOperationsContext context) {
public void update(int queryCount, UpdateSendResponse sender) {

int executeQueryCount = queryCount + 1; // select all and update

// Determine if will overload queries
RateLimitedConnection conn = this.threadLocalRateLimit.get().getAvailableConnection(executeQueryCount);
if (conn == null) {
context.sendError(connection, context.getTransientResourceException());
sender.sendOverloaded();
return; // rate limited
}

Expand All @@ -223,14 +219,21 @@ public void update(int queryCount, HttpResponse response, ServerHttpConnection c
Collections.sort(worlds, (a, b) -> a.id - b.id);
Batch batch = conn.connection.createBatch();
for (World world : worlds) {
world.randomNumber = ThreadLocalRandom.current().nextInt(1, 10001);

// Ensure change to random number to trigger update
int newRandomNumber;
do {
newRandomNumber = ThreadLocalRandom.current().nextInt(1, 10001);
} while (world.randomNumber == newRandomNumber);
world.randomNumber = newRandomNumber;

batch.add("UPDATE WORLD SET RANDOMNUMBER = " + world.randomNumber + " WHERE ID = " + world.id);
}
return Mono.from(batch.execute()).map((result) -> worlds);
}).publishOn(conn.writeScheduler).subscribe(worlds -> {
context.updateSend(response, connection, worlds);
sender.sendUpdate(worlds.toArray(World[]::new));
}, error -> {
context.sendError(connection, error);
sender.sendError(error);
}, () -> {
conn.processed(executeQueryCount);
});
Expand Down
Loading