ZOOKEEPER-1105: wait for server response in C client zookeeper_close
**Thanks to Lincoln Lee for the original fix!**

In the current implementation, the server always logs a WARN
("EndOfStreamException: Unable to read additional data from client")
whenever we close a ZooKeeper handle from the C client. This also happens
at the end of every run of the command-line C client.

The reason is that we currently don't wait for the server's response
after initiating the close of the client connection, so we terminate
the socket on the client side too early.
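
The idea behind the fix is to wait, with a timeout, for the socket to become readable before tearing it down. Below is a minimal sketch of that idea, assuming a POSIX system and using a local `socketpair` in place of a real ZooKeeper connection. The names `wait_until_readable` and `demo_close_handshake` are hypothetical and exist only for illustration; they are not part of the C client.

```c
#include <assert.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper, not part of the ZooKeeper C client.
 * Blocks until poll() reports an event on `sock` (data, EOF or hang-up)
 * or until `timeout_ms` elapses.
 * Returns 1 if an event was reported, 0 on timeout, -1 if poll() failed. */
int wait_until_readable(int sock, int timeout_ms)
{
    struct pollfd pfd;
    int ret;

    assert(sock >= 0);
    pfd.fd = sock;
    pfd.events = POLLIN;
    pfd.revents = 0;

    ret = poll(&pfd, 1, timeout_ms);
    if (ret < 0) {
        return -1;
    }
    return ret > 0 ? 1 : 0;
}

/* Illustrative only: a socketpair stands in for the client/server link.
 * Returns 0 if the first wait timed out (no reply yet) and the second
 * wait succeeded (peer closed its end), 1 otherwise, -1 on setup errors. */
int demo_close_handshake(void)
{
    int sv[2];
    char buf[16];
    int before, after;

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) {
        return -1;
    }

    /* The "client" (sv[0]) sends its close request. */
    if (write(sv[0], "close", 5) != 5) {
        close(sv[0]);
        close(sv[1]);
        return -1;
    }

    /* The "server" has not reacted yet, so this wait times out. */
    before = wait_until_readable(sv[0], 50);

    /* The "server" (sv[1]) consumes the request and closes its end. */
    if (read(sv[1], buf, sizeof(buf)) != 5) {
        close(sv[0]);
        close(sv[1]);
        return -1;
    }
    close(sv[1]);

    /* Now the client is notified promptly and can close its socket
     * without cutting the server off mid-read. */
    after = wait_until_readable(sv[0], 1000);

    close(sv[0]);
    return (before == 0 && after == 1) ? 0 : 1;
}
```

In the patch itself, the analogous wait runs after `adaptor_send_queue` succeeds, with a 1500 ms timeout, and uses `select` instead of `poll` on Windows.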

I tested the patch on both Linux and Windows, and with both the
NIO and Netty server-side socket implementations.

Author: Mate Szalay-Beko <szalay.beko.mate@gmail.com>

Reviewers: Andor Molnar <andor@apache.org>, Norbert Kalmar <nkalmar@apache.org>

Closes apache#1176 from symat/ZOOKEEPER-1105
symat authored and nkalmar committed Jan 28, 2020
1 parent e4758ba commit 57be7ae
Showing 1 changed file with 57 additions and 13 deletions.
70 changes: 57 additions & 13 deletions zookeeper-client/zookeeper-client-c/src/zookeeper.c
@@ -3644,6 +3644,49 @@ static int add_multi_completion(zhandle_t *zh, int xid, void_completion_t dc,
     return add_completion(zh, xid, COMPLETION_MULTI, dc, data, 0,0, clist);
 }

+/**
+ * After sending the close request, we are waiting for a given millisecs for
+ * getting the answer and/or for the socket to be closed by the server.
+ *
+ * This function should not be called while we still want to process
+ * any response from the server. It must be called after adaptor_finish called,
+ * in order not to mess with the I/O receiver thread in multi-threaded mode.
+ */
+int wait_for_session_to_be_closed(zhandle_t *zh, int timeout_ms)
+{
+    int ret = 0;
+#ifndef WIN32
+    struct pollfd fd_s[1];
+#else
+    fd_set rfds;
+    struct timeval waittime = {timeout_ms / 1000, (timeout_ms % 1000) * 1000};
+#endif
+
+    if (zh == NULL) {
+        return ZBADARGUMENTS;
+    }
+
+#ifndef WIN32
+    fd_s[0].fd = zh->fd->sock;
+    fd_s[0].events = POLLIN;
+    ret = poll(fd_s, 1, timeout_ms);
+#else
+    FD_ZERO(&rfds);
+    FD_SET(zh->fd->sock , &rfds);
+    ret = select(zh->fd->sock + 1, &rfds, NULL, NULL, &waittime);
+#endif
+
+    if (ret == 0){
+        LOG_WARN(LOGCALLBACK(zh), "Timed out (%dms) during waiting for server's reply after sending a close request, sessionId=%#llx\n",
+            timeout_ms, zh->client_id.client_id);
+    } else if (ret < 0) {
+        LOG_WARN(LOGCALLBACK(zh), "System error (%d) happened while waiting for server's reply, sessionId=%#llx\n",
+            ret, zh->client_id.client_id);
+    }
+
+    return ZOK;
+}
+
 int zookeeper_close(zhandle_t *zh)
 {
     int rc=ZOK;
@@ -3669,32 +3712,33 @@ int zookeeper_close(zhandle_t *zh)
     }
     /* No need to decrement the counter since we're just going to
      * destroy the handle later. */
-    if (is_connected(zh)){
+    if (is_connected(zh)) {
         struct oarchive *oa;
         struct RequestHeader h = {get_xid(), ZOO_CLOSE_OP};
         LOG_INFO(LOGCALLBACK(zh), "Closing zookeeper sessionId=%#llx to %s\n",
-                zh->client_id.client_id,zoo_get_current_server(zh));
+                 zh->client_id.client_id, zoo_get_current_server(zh));
         oa = create_buffer_oarchive();
         rc = serialize_RequestHeader(oa, "header", &h);
-        rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
-                get_buffer_len(oa));
+        rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa), get_buffer_len(oa));
         /* We queued the buffer, so don't free it */
         close_buffer_oarchive(&oa, 0);
         if (rc < 0) {
+            LOG_DEBUG(LOGCALLBACK(zh), "Error during closing zookeeper session, sessionId=%#llx to %s (error: %d)\n",
+                      zh->client_id.client_id, zoo_get_current_server(zh), rc);
             rc = ZMARSHALLINGERROR;
-            goto finish;
-        }
+        } else {
+            /* make sure the close request is sent; we set timeout to an arbitrary
+             * (but reasonable) number of milliseconds since we want the call to block*/
+            rc = adaptor_send_queue(zh, 3000);

-        /* make sure the close request is sent; we set timeout to an arbitrary
-         * (but reasonable) number of milliseconds since we want the call to block*/
-        rc=adaptor_send_queue(zh, 3000);
-    }else{
-        LOG_INFO(LOGCALLBACK(zh), "Freeing zookeeper resources for sessionId=%#llx\n",
-                zh->client_id.client_id);
+            /* give some time to the server to process the session close request properly */
+            rc = rc < 0 ? rc : wait_for_session_to_be_closed(zh, 1500);
+        }
+    } else {
         rc = ZOK;
     }

-finish:
+    LOG_INFO(LOGCALLBACK(zh), "Freeing zookeeper resources for sessionId=%#llx\n", zh->client_id.client_id);
     destroy(zh);
     adaptor_destroy(zh);
     free(zh->fd);