Skip to content

Commit

Permalink
RpcClient invoke with timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
apangin committed Jun 14, 2018
1 parent 327b298 commit 5583b69
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions src/one/nio/rpc/RpcClient.java
Expand Up @@ -40,7 +40,11 @@ public RpcClient(ConnectionString conn) {
}

public Object invoke(Object request) throws Exception {
byte[] buffer = invokeRaw(request);
return invoke(request, readTimeout);
}

public Object invoke(Object request, int timeout) throws Exception {
byte[] buffer = invokeRaw(request, timeout);

for (;;) {
Object response;
Expand All @@ -61,7 +65,7 @@ public Object invoke(Object request) throws Exception {
} else if (response instanceof SerializerNotFoundException) {
long uid = ((SerializerNotFoundException) response).getUid();
provideSerializer(Repository.requestSerializer(uid));
buffer = invokeRaw(request);
buffer = invokeRaw(request, readTimeout);
} else {
throw (Exception) response;
}
Expand All @@ -86,26 +90,26 @@ protected Serializer requestSerializer(long uid) throws Exception {
}

protected Object invokeServiceRequest(Object request) throws Exception {
byte[] rawResponse = invokeRaw(request);
byte[] rawResponse = invokeRaw(request, readTimeout);
Object response = new DeserializeStream(rawResponse).readObject();
if (response instanceof Exception) {
throw (Exception) response;
}
return response;
}

private byte[] invokeRaw(Object request) throws Exception {
private byte[] invokeRaw(Object request, int timeout) throws Exception {
byte[] buffer = serialize(request);

Socket socket = borrowObject();
try {
try {
sendRequest(socket, buffer);
sendRequest(socket, buffer, timeout);
} catch (SocketException e) {
// Stale connection? Retry on a fresh socket
destroyObject(socket);
socket = createObject();
sendRequest(socket, buffer);
sendRequest(socket, buffer, timeout);
}

int responseSize = RpcPacket.getSize(buffer, socket);
Expand All @@ -132,7 +136,8 @@ private byte[] serialize(Object request) throws IOException {
return buffer;
}

private void sendRequest(Socket socket, byte[] buffer) throws IOException {
private void sendRequest(Socket socket, byte[] buffer, int timeout) throws IOException {
if (timeout != 0) socket.setTimeout(timeout);
socket.writeFully(buffer, 0, buffer.length);
socket.readFully(buffer, 0, 4);
}
Expand Down

0 comments on commit 5583b69

Please sign in to comment.