Harden against connection and server failures #50

Merged
merged 30 commits into from

2 participants

@eric
Owner

I created a test harness using a zk-server cluster that sent alternating SIGSTOP and SIGCONT signals every 30 seconds to each zookeeper server in sequence to simulate random network failures.
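
A minimal sketch of what one round of such a harness could look like, assuming the server PIDs are already known. `torture_round`, `signaler`, and `sleeper` are names made up for illustration and are not taken from zoomonkey.rb; only `Process.kill` and `sleep` are real Ruby calls.

```ruby
# Hypothetical helper (not from zoomonkey.rb): freeze and thaw each server
# process in turn. The signal sender and the sleeper are injectable so the
# loop can be dry-run without touching real processes.
def torture_round(pids, pause_seconds,
                  signaler: Process.method(:kill),
                  sleeper: Kernel.method(:sleep))
  pids.each do |pid|
    signaler.call('STOP', pid)   # SIGSTOP: process stays alive but stops responding
    sleeper.call(pause_seconds)
    signaler.call('CONT', pid)   # SIGCONT: process resumes where it left off
    sleeper.call(pause_seconds)
  end
end

# Dry run: record the signals instead of sending them.
sent = []
torture_round([101, 102], 0,
              signaler: ->(sig, pid) { sent << [sig, pid] },
              sleeper: ->(_seconds) {})
sent.each { |sig, pid| puts "#{sig} -> #{pid}" }
```

Because a stopped process keeps its sockets open, the client sees a server that goes silent rather than one that refuses connections, which is the failure mode this PR is hardening against.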

The changes from this included:

  • Make sure zookeeper_process() is called even if rb_thread_select() times out
  • Fix a bug in the Apache ZooKeeper library (reported as ZOOKEEPER-1756)
  • Upgrade to the ZooKeeper 3.4.5 client library
  • Only submit commands when the connection state is connected
  • Prevent new commands from being submitted if the connection has expired
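
The last two items can be pictured with a toy client. This is only a sketch under assumptions: `ToyClient`, its states, and its methods are hypothetical and are not the gem's CZookeeper API.

```ruby
# Toy model (hypothetical names, not the gem's CZookeeper class).
class ToyClient
  class NotConnected < StandardError; end

  attr_accessor :state
  attr_reader :sent

  def initialize
    @state = :connecting
    @pending = []
    @sent = []
  end

  # Refuse new work once the session can never recover.
  def submit(command)
    raise NotConnected, 'session expired' if state == :expired
    @pending << command
  end

  # One pass of the event loop: flush the queue only while connected.
  def tick
    return unless state == :connected
    @sent.concat(@pending)
    @pending.clear
  end
end
```

Commands queued while the client is reconnecting simply wait for the next pass that finds the connection up, and a caller on an expired session gets an immediate error instead of waiting on a reply that cannot arrive.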
@slyphon
Owner

just noticed this line should have the condition reversed:

        until @reg.in_flight.empty? or @_closed or is_unrecoverable
         # stuff
        end

is_unrecoverable will barf with a RuntimeError if @_closed is true because the state of @_closed is flipped by the C layer when the zk handle has been cleaned up.
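
Why the order matters can be shown with a small stand-in. This is a sketch, not the gem's code: `FakeHandle` and both `done_*` methods are hypothetical, and the raise imitates what the C-backed `is_unrecoverable` is described as doing once the handle is gone.

```ruby
# Stand-in for the C-backed client (hypothetical, for illustration only).
class FakeHandle
  def initialize(closed)
    @_closed = closed
  end

  # Imitates the C layer: raises once the handle has been cleaned up.
  def is_unrecoverable
    raise RuntimeError, 'handle already closed' if @_closed
    false
  end

  # Asks the C layer first, so it raises when the handle is closed.
  def done_old_order?
    is_unrecoverable or @_closed
  end

  # Checks the Ruby-side flag first; `or` short-circuits and the
  # C layer is never consulted once @_closed is true.
  def done_new_order?
    @_closed or is_unrecoverable
  end
end
```

With a closed handle, `done_new_order?` returns true without touching the C layer, while `done_old_order?` raises.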

@slyphon
Owner

should remove the old zkc-3.3.5 tarball

We should also try to upgrade the java driver, to keep them on the same version (probably make that a separate commit before the next release).

@slyphon slyphon commented on the diff
ext/zkrb.c
@@ -526,7 +527,7 @@ static VALUE method_get(VALUE self, VALUE reqid, VALUE path, VALUE async, VALUE
char * data = NULL;
if (IS_SYNC(call_type)) {
data = malloc(MAX_ZNODE_SIZE); /* ugh */
- memset(data, 0, sizeof(data));
+ memset(data, 0, MAX_ZNODE_SIZE);
@slyphon Owner
slyphon added a note

You know more about this than I do, is sizeof(data) incorrect?

I understand that MAX_ZNODE_SIZE is a defined constant, but, shouldn't these two be equivalent?

(i'm not objecting to the change, I just want to know if I screwed up somehow)

@eric Owner
eric added a note

It is incorrect.

Because data is defined as:

   char * data = NULL;

sizeof(data) will return the size of a pointer (4 on a 32-bit build, 8 on most 64-bit builds), not the size of the allocated buffer.

If you had this:

   char data[MAX_ZNODE_SIZE];

The result of sizeof(data) would have been MAX_ZNODE_SIZE.

@slyphon slyphon commented on the diff
ext/zkrb.c
@@ -788,10 +789,10 @@ static VALUE method_zkrb_iterate_event_loop(VALUE self) {
fd_set rfds, wfds, efds;
FD_ZERO(&rfds); FD_ZERO(&wfds); FD_ZERO(&efds);
- int fd=0, interest=0, events=0, rc=0, maxfd=0;
+ int fd = 0, interest = 0, events = 0, rc = 0, maxfd = 0, irc = 0, prc = 0;
@slyphon Owner
slyphon added a note

ahhh, breathing room.

Rakefile
@@ -94,6 +94,7 @@ end
task :build do
cd 'ext' do
+ ENV['ZK_DEV'] = 'true'
@slyphon Owner
slyphon added a note

yeah?

@eric Owner
eric added a note

If you're building via the rake task, why not leave around the source to zkc?

@slyphon
Owner

It looks like the build is failing because the GIL-release-wrapping-codegen script can't find zookeeper.h, not sure why though. It's doing a recursive glob in the ext dir here.
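
One way to check what a recursive glob actually sees at that point in the build is sketched below. `find_header` is a hypothetical helper and the pattern is an assumption; it is not copied from generate_gvl_code.rb.

```ruby
require 'tmpdir'
require 'fileutils'

# Hypothetical helper: list every file called `name` at or below `root`.
def find_header(root, name = 'zookeeper.h')
  Dir.glob(File.join(root, '**', name)).sort
end

# Demo against a throwaway directory that mimics an unpacked tarball.
Dir.mktmpdir do |dir|
  nested = File.join(dir, 'zkc-3.4.5', 'c', 'include')
  FileUtils.mkdir_p(nested)
  FileUtils.touch(File.join(nested, 'zookeeper.h'))
  puts find_header(dir)
end
```

An empty result means the header is not under that directory at the moment the script runs, whatever the underlying cause turns out to be.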

@eric
Owner

The build failure is a travis-ci bug; you can see the passing builds here.

I have sent them an email asking about the situation.

I've addressed your comments, removed the old 3.3.5 tar and updated java to 3.4.5.

The travis build with all of this should be at: https://travis-ci.org/zk-ruby/zookeeper/builds/11128784

@eric
Owner

Looks like I'm going to have to upgrade zk-server to be able to upgrade the java jar...

https://travis-ci.org/zk-ruby/zookeeper/jobs/11128789

@eric
Owner

I've given up on including the java client upgrade as part of this change. There are too many dependencies to sort out for me to take on as part of this.

@eric
Owner

If there aren't any more comments, this should be good to merge.

@eric eric merged commit ebdfa78 into from
@eric eric deleted the branch
Commits on Sep 7, 2013
  1. @eric

    Initial torture tests for the zookeeper client

    eric authored
    Creates an unstable zookeeper server environment to test how the client
    reacts.
  2. @eric

    Fix duplicates script

    eric authored
Commits on Sep 8, 2013
  1. @eric
  2. @eric

    Fix bug in apache zookeeper library

    eric authored
    Timeouts are not properly handled when there is an outstanding request.
  3. @eric

    Provide connected_host method on the C client

    eric authored
    Provides the server the client is connected to
  4. @eric
  5. @eric
  6. @eric
  7. @eric
  8. @eric
  9. @eric
  10. @eric

    Disable debugging by default

    eric authored
  11. @eric
  12. @eric

    Remove extra patch

    eric authored
  13. @eric

    Fix condition when closing

    eric authored
  14. @eric

    Remove old 3.3.5 zkc tar

    eric authored
  15. @eric

    Upgrade java zookeeper gem

    eric authored
  16. @eric

    Trying to get java to work

    eric authored
  17. @eric

    Merge branch 'master' into zoomonkey

    eric authored
    Conflicts:
    	lib/zookeeper/version.rb
  18. @eric

    updating zk-server

    eric authored
  19. @eric
  20. @eric
Commits on Sep 9, 2013
  1. @eric

    Fix dependency order

    eric authored
  2. @eric

    Fix Makefile reference

    eric authored
  3. @eric

    Improving build scripts

    eric authored
  4. @eric

    Updating wrappers

    eric authored
  5. @eric
  6. @eric

    Update generated wrappers

    eric authored
  7. @eric

    Added CHANGELOG entry

    eric authored
  8. @eric
16 CHANGELOG
@@ -1,3 +1,19 @@
+
+v1.4.6
+
+ * Fix two issues with dealing with unresponsive zookeeper servers
+ that would prevent the client from properly recovering the session
+ (including one in the Apache ZooKeeper C library — see ZOOKEEPER-1756
+ for more details)
+ * Reduce the chances of seeing Zookeeper::Exceptions::NotConnected
+ exceptions by only submitting new commands if we are connected
+ * Prevent commands from being queued to be sent to the server if the
+ session has expired which will solve most cases involving the
+ Zookeeper::Exceptions::ContinuationTimeoutError exception
+ * Upgrade the Apache ZooKeeper C library to 3.4.5 (client is backward
+ compatible with 3.3 servers). The Java library has not been updated.
+ * Cleanup complaints from compiler for uninitialized variable access
+
v1.4.5
* Allow passing :session_id and :session_password options #42 (thanks to avalanche123)
4 Gemfile
@@ -1,4 +1,4 @@
-source :rubygems
+source 'https://rubygems.org'
gemspec
@@ -8,7 +8,7 @@ group :test do
gem "rspec" , "~> 2.11"
gem 'eventmachine', '1.0.0'
gem 'evented-spec', '~> 0.9.0'
- gem 'zk-server', '~> 1.0'
+ gem 'zk-server', '~> 1.0', :git => 'https://github.com/zk-ruby/zk-server.git'
end
# ffs, :platform appears to be COMLETELY BROKEN so we just DO THAT HERE
8 Rakefile
@@ -90,6 +90,14 @@ namespace :build do
Rake::Task['build'].invoke
end
+
+ task :clobber do
+ cd 'ext' do
+ sh 'rake clobber'
+ end
+
+ Rake::Task['build'].invoke
+ end
end
task :build do
42 ext/Rakefile
@@ -1,10 +1,6 @@
require 'rbconfig'
-LIB_ZK_SO = 'lib/libzookeeper_mt_gem.la'
-
-TARBALL = FileList['zkc-*.tar.gz'].first
-
-raise "Where is the zkc tarball!?" unless TARBALL
+ZKRB_WRAPPER = %w[zkrb_wrapper.c zkrb_wrapper.h]
namespace :zkrb do
task :clean do
@@ -17,49 +13,29 @@ namespace :zkrb do
end
task :clobber => :clean do
- rm_rf %w[Makefile c lib bin include ._c]
+ rm_rf %w[Makefile c lib bin include ._c] + ZKRB_WRAPPER
end
end
-task :clean => 'zkrb:clean'
-task :clobber => 'zkrb:clobber'
-
-GENERATE_GVL_CODE_RB = 'generate_gvl_code.rb'
-
-# file 'c' do
-# if tarball = Dir['zkc-*.tar.gz'].first
-# sh "tar -zxf #{tarball}"
-# else
-# raise "couldn't find the tarball! wtf?!"
-# end
-# end
-
-file 'c' => TARBALL do
- sh "tar -zxf #{TARBALL}"
- sh "patch -p0 < patch-zookeeper"
-end
-
-file GENERATE_GVL_CODE_RB => 'c'
+task :clean => 'zkrb:clean'
+task :clobber => 'zkrb:clobber'
+task :wrappers => ZKRB_WRAPPER
+task :default => :build
-file 'zkrb_wrapper.c' => GENERATE_GVL_CODE_RB do
+file 'zkrb_wrapper.c' => 'generate_gvl_code.rb' do
sh "ruby generate_gvl_code.rb code"
end
-file 'zkrb_wrapper.h' => GENERATE_GVL_CODE_RB do
+file 'zkrb_wrapper.h' => 'generate_gvl_code.rb' do
sh "ruby generate_gvl_code.rb headers"
end
-ZKRB_WRAPPER = %w[zkrb_wrapper.c zkrb_wrapper.h]
-
-task :wrappers => ZKRB_WRAPPER
-
file 'Makefile' do
sh "ruby extconf.rb"
end
-task :build => [ZKRB_WRAPPER, 'Makefile'].flatten do
+task :build => [ 'Makefile', :wrappers ] do
sh 'make'
end
-task :default => :build
27 ext/c_zookeeper.rb
@@ -17,7 +17,7 @@ class CZookeeper
include Exceptions
include Logger
- DEFAULT_SESSION_TIMEOUT_MSEC = 10000
+ DEFAULT_RECEIVE_TIMEOUT_MSEC = 10000
class GotNilEventException < StandardError; end
@@ -65,7 +65,7 @@ def initialize(host, event_queue, opts={})
# the actual C data is stashed in this ivar. never *ever* touch this
@_data = nil
- @_session_timeout_msec = DEFAULT_SESSION_TIMEOUT_MSEC
+ @_receive_timeout_msec = opts[:receive_timeout_msec] || DEFAULT_RECEIVE_TIMEOUT_MSEC
@mutex = Monitor.new
@@ -120,6 +120,14 @@ def associating?
state == ZOO_ASSOCIATING_STATE
end
+ def unhealthy?
+ @_closed || @_shutting_down || is_unrecoverable
+ end
+
+ def healthy?
+ !unhealthy?
+ end
+
def close
return if closed?
@@ -183,11 +191,11 @@ def wait_until_connected(timeout=10)
while true
if timeout
now = Time.now
- break if (@state == ZOO_CONNECTED_STATE) || @_shutting_down || @_closed || (now > time_to_stop)
+ break if (@state == ZOO_CONNECTED_STATE) || unhealthy? || (now > time_to_stop)
delay = time_to_stop.to_f - now.to_f
@state_cond.wait(delay)
else
- break if (@state == ZOO_CONNECTED_STATE) || @_shutting_down || @_closed
+ break if (@state == ZOO_CONNECTED_STATE) || unhealthy?
@state_cond.wait
end
end
@@ -201,7 +209,7 @@ def wait_until_connected(timeout=10)
# blocks the caller until result has returned
def submit_and_block(meth, *args)
@mutex.synchronize do
- raise Exceptions::NotConnected if @_shutting_down
+ raise Exceptions::NotConnected if unhealthy?
end
cnt = Continuation.new(meth, *args)
@@ -257,8 +265,11 @@ def event_thread_body
event_thread_await_running
# this is the main loop
- until (@_shutting_down or @_closed or is_unrecoverable)
- submit_pending_calls if @reg.anything_to_do?
+ while healthy?
+ if @reg.anything_to_do? && connected?
+ submit_pending_calls
+ end
+
zkrb_iterate_event_loop
iterate_event_delivery
end
@@ -269,7 +280,7 @@ def event_thread_body
if @_shutting_down and not (@_closed or is_unrecoverable)
logger.debug { "we're in shutting down state, there are #{@reg.in_flight.length} in_flight completions" }
- until @reg.in_flight.empty? or is_unrecoverable or @_closed
+ until @reg.in_flight.empty? or @_closed or is_unrecoverable
zkrb_iterate_event_loop
iterate_event_delivery
logger.debug { "there are #{@reg.in_flight} in_flight completions left" }
38 ext/event_lib.c
@@ -21,7 +21,7 @@ slyphon@gmail.com
*/
#include "ruby.h"
-#include "c-client-src/zookeeper.h"
+#include "zookeeper/zookeeper.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
@@ -321,7 +321,7 @@ VALUE zkrb_event_to_ruby(zkrb_event_t *event) {
switch (event->type) {
case ZKRB_DATA: {
- zkrb_debug("zkrb_event_to_ruby ZKRB_DATA\n");
+ zkrb_debug("zkrb_event_to_ruby ZKRB_DATA");
struct zkrb_data_completion *data_ctx = event->completion.data_completion;
if (ZKRBDebugging) zkrb_print_stat(data_ctx->stat);
rb_hash_aset(hash, GET_SYM("data"), data_ctx->data ? rb_str_new(data_ctx->data, data_ctx->data_len) : Qnil);
@@ -329,39 +329,39 @@ VALUE zkrb_event_to_ruby(zkrb_event_t *event) {
break;
}
case ZKRB_STAT: {
- zkrb_debug("zkrb_event_to_ruby ZKRB_STAT\n");
+ zkrb_debug("zkrb_event_to_ruby ZKRB_STAT");
struct zkrb_stat_completion *stat_ctx = event->completion.stat_completion;
rb_hash_aset(hash, GET_SYM("stat"), stat_ctx->stat ? zkrb_stat_to_rarray(stat_ctx->stat) : Qnil);
break;
}
case ZKRB_STRING: {
- zkrb_debug("zkrb_event_to_ruby ZKRB_STRING\n");
+ zkrb_debug("zkrb_event_to_ruby ZKRB_STRING");
struct zkrb_string_completion *string_ctx = event->completion.string_completion;
rb_hash_aset(hash, GET_SYM("string"), string_ctx->value ? rb_str_new2(string_ctx->value) : Qnil);
break;
}
case ZKRB_STRINGS: {
- zkrb_debug("zkrb_event_to_ruby ZKRB_STRINGS\n");
+ zkrb_debug("zkrb_event_to_ruby ZKRB_STRINGS");
struct zkrb_strings_completion *strings_ctx = event->completion.strings_completion;
rb_hash_aset(hash, GET_SYM("strings"), strings_ctx->values ? zkrb_string_vector_to_ruby(strings_ctx->values) : Qnil);
break;
}
case ZKRB_STRINGS_STAT: {
- zkrb_debug("zkrb_event_to_ruby ZKRB_STRINGS_STAT\n");
+ zkrb_debug("zkrb_event_to_ruby ZKRB_STRINGS_STAT");
struct zkrb_strings_stat_completion *strings_stat_ctx = event->completion.strings_stat_completion;
rb_hash_aset(hash, GET_SYM("strings"), strings_stat_ctx->values ? zkrb_string_vector_to_ruby(strings_stat_ctx->values) : Qnil);
rb_hash_aset(hash, GET_SYM("stat"), strings_stat_ctx->stat ? zkrb_stat_to_rarray(strings_stat_ctx->stat) : Qnil);
break;
}
case ZKRB_ACL: {
- zkrb_debug("zkrb_event_to_ruby ZKRB_ACL\n");
+ zkrb_debug("zkrb_event_to_ruby ZKRB_ACL");
struct zkrb_acl_completion *acl_ctx = event->completion.acl_completion;
rb_hash_aset(hash, GET_SYM("acl"), acl_ctx->acl ? zkrb_acl_vector_to_ruby(acl_ctx->acl) : Qnil);
rb_hash_aset(hash, GET_SYM("stat"), acl_ctx->stat ? zkrb_stat_to_rarray(acl_ctx->stat) : Qnil);
break;
}
case ZKRB_WATCHER: {
- zkrb_debug("zkrb_event_to_ruby ZKRB_WATCHER\n");
+ zkrb_debug("zkrb_event_to_ruby ZKRB_WATCHER");
struct zkrb_watcher_completion *watcher_ctx = event->completion.watcher_completion;
rb_hash_aset(hash, GET_SYM("type"), INT2FIX(watcher_ctx->type));
rb_hash_aset(hash, GET_SYM("state"), INT2FIX(watcher_ctx->state));
@@ -377,8 +377,8 @@ VALUE zkrb_event_to_ruby(zkrb_event_t *event) {
}
void zkrb_print_stat(const struct Stat *s) {
- fprintf(stderr, "stat {\n");
if (s != NULL) {
+ fprintf(stderr, "stat {\n");
fprintf(stderr, "\t czxid: %"PRId64"\n", s->czxid); // PRId64 defined in inttypes.h
fprintf(stderr, "\t mzxid: %"PRId64"\n", s->mzxid);
fprintf(stderr, "\t ctime: %"PRId64"\n", s->ctime);
@@ -390,10 +390,10 @@ void zkrb_print_stat(const struct Stat *s) {
fprintf(stderr, "\t dataLength: %d\n", s->dataLength);
fprintf(stderr, "\t numChildren: %d\n", s->numChildren);
fprintf(stderr, "\t pzxid: %"PRId64"\n", s->pzxid);
+ fprintf(stderr, "}\n");
} else {
- fprintf(stderr, "\tNULL\n");
+ fprintf(stderr, "stat { NULL }\n");
}
- fprintf(stderr, "}\n");
}
zkrb_calling_context *zkrb_calling_context_alloc(int64_t req_id, zkrb_queue_t *queue) {
@@ -436,7 +436,7 @@ void zkrb_state_callback(
zhandle_t *zh, int type, int state, const char *path, void *calling_ctx) {
zkrb_debug("ZOOKEEPER_C_STATE WATCHER "
- "type = %d, state = %d, path = %p, value = %s\n",
+ "type = %d, state = %d, path = %p, value = %s",
type, state, (void *) path, path ? path : "NULL");
/* save callback context */
@@ -466,7 +466,7 @@ void zkrb_data_callback(
int rc, const char *value, int value_len, const struct Stat *stat, const void *calling_ctx) {
zkrb_debug("ZOOKEEPER_C_DATA WATCHER "
- "rc = %d (%s), value = %s, len = %d\n",
+ "rc = %d (%s), value = %s, len = %d",
rc, zerror(rc), value ? value : "NULL", value_len);
/* copy data completion */
@@ -494,7 +494,7 @@ void zkrb_data_callback(
void zkrb_stat_callback(
int rc, const struct Stat *stat, const void *calling_ctx) {
zkrb_debug("ZOOKEEPER_C_STAT WATCHER "
- "rc = %d (%s)\n", rc, zerror(rc));
+ "rc = %d (%s)", rc, zerror(rc));
struct zkrb_stat_completion *sc = zk_malloc(sizeof(struct zkrb_stat_completion));
sc->stat = NULL;
@@ -512,7 +512,7 @@ void zkrb_string_callback(
int rc, const char *string, const void *calling_ctx) {
zkrb_debug("ZOOKEEPER_C_STRING WATCHER "
- "rc = %d (%s)\n", rc, zerror(rc));
+ "rc = %d (%s)", rc, zerror(rc));
struct zkrb_string_completion *sc = zk_malloc(sizeof(struct zkrb_string_completion));
sc->value = NULL;
@@ -530,7 +530,7 @@ void zkrb_string_callback(
void zkrb_strings_callback(
int rc, const struct String_vector *strings, const void *calling_ctx) {
zkrb_debug("ZOOKEEPER_C_STRINGS WATCHER "
- "rc = %d (%s), calling_ctx = %p\n", rc, zerror(rc), calling_ctx);
+ "rc = %d (%s), calling_ctx = %p", rc, zerror(rc), calling_ctx);
/* copy string vector */
struct zkrb_strings_completion *sc = zk_malloc(sizeof(struct zkrb_strings_completion));
@@ -547,7 +547,7 @@ void zkrb_strings_callback(
void zkrb_strings_stat_callback(
int rc, const struct String_vector *strings, const struct Stat *stat, const void *calling_ctx) {
zkrb_debug("ZOOKEEPER_C_STRINGS_STAT WATCHER "
- "rc = %d (%s), calling_ctx = %p\n", rc, zerror(rc), calling_ctx);
+ "rc = %d (%s), calling_ctx = %p", rc, zerror(rc), calling_ctx);
struct zkrb_strings_stat_completion *sc = zk_malloc(sizeof(struct zkrb_strings_stat_completion));
sc->stat = NULL;
@@ -565,7 +565,7 @@ void zkrb_strings_stat_callback(
void zkrb_void_callback(int rc, const void *calling_ctx) {
zkrb_debug("ZOOKEEPER_C_VOID WATCHER "
- "rc = %d (%s)\n", rc, zerror(rc));
+ "rc = %d (%s)", rc, zerror(rc));
ZKH_SETUP_EVENT(queue, event);
event->rc = rc;
@@ -577,7 +577,7 @@ void zkrb_void_callback(int rc, const void *calling_ctx) {
void zkrb_acl_callback(
int rc, struct ACL_vector *acls, struct Stat *stat, const void *calling_ctx) {
- zkrb_debug("ZOOKEEPER_C_ACL WATCHER rc = %d (%s)\n", rc, zerror(rc));
+ zkrb_debug("ZOOKEEPER_C_ACL WATCHER rc = %d (%s)", rc, zerror(rc));
struct zkrb_acl_completion *ac = zk_malloc(sizeof(struct zkrb_acl_completion));
ac->acl = NULL;
2  ext/event_lib.h
@@ -2,7 +2,7 @@
#define ZKRB_EVENT_LIB_H
#include "ruby.h"
-#include "c-client-src/zookeeper.h"
+#include "zookeeper/zookeeper.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
13 ext/extconf.rb
@@ -3,10 +3,11 @@
require 'fileutils'
HERE = File.expand_path(File.dirname(__FILE__))
-BUNDLE = Dir.glob("zkc-*.tar.gz").first
-ZKPATCH = "patch-zookeeper"
+BUNDLE = Dir.glob("zkc-*.tar.gz").sort.last
+ZKC_VERSION = BUNDLE[/(zkc-.*?)\.tar.gz$/, 1]
+PATCHES = Dir.glob("patches/#{ZKC_VERSION}*.patch")
-BUNDLE_PATH = File.join(HERE, 'c')
+BUNDLE_PATH = File.join(HERE, ZKC_VERSION, 'c')
$EXTRA_CONF = ''
@@ -58,8 +59,10 @@ def safe_sh(cmd)
puts "Building zkc."
unless File.exists?('c')
- puts(cmd = "tar xzf #{BUNDLE} 2>&1 && patch -p0 < #{ZKPATCH} 2>&1")
- raise "'#{cmd}' failed" unless system(cmd)
+ safe_sh "tar xzf #{BUNDLE} 2>&1"
+ PATCHES.each do |patch|
+ safe_sh "patch -p0 < #{patch} 2>&1"
+ end
end
# clean up stupid apple rsrc fork bullshit
10 ext/generate_gvl_code.rb 100644 → 100755
@@ -202,11 +202,9 @@ def self.from_zookeeper_h(text)
typed_args = argstr.split(',').map(&:strip)
- # gah, fix up zoo_aset_acl which has a void_completion_t with no name assigned
- if zoo_fn_name == 'zoo_aset_acl'
- if idx = typed_args.index('void_completion_t')
- typed_args[idx] = 'void_completion_t completion'
- end
+ # gah, fix up functions which have a void_completion_t with no name assigned
+ if idx = typed_args.index('void_completion_t')
+ typed_args[idx] = 'void_completion_t completion'
end
struct = CallStruct.new(zoo_fn_name, typed_args)
@@ -233,7 +231,7 @@ def render_header_file(code)
#endif
#include "ruby.h"
-#include "c-client-src/zookeeper.h"
+#include "zookeeper/zookeeper.h"
#include "zkrb_wrapper_compat.h"
#include "dbg.h"
0  ext/patch-zookeeper → ext/patches/zkc-3.3.5-network.patch
File renamed without changes
41 ext/patches/zkc-3.4.5-logging.patch
@@ -0,0 +1,41 @@
+diff -ur zkc-3.4.5-orig/c/src/zookeeper.c zkc-3.4.5/c/src/zookeeper.c
+--- zkc-3.4.5-orig/c/src/zookeeper.c 2012-09-30 10:53:32.000000000 -0700
++++ zkc-3.4.5/c/src/zookeeper.c 2013-09-07 21:25:24.000000000 -0700
+@@ -1650,14 +1650,16 @@
+ // a PING
+ if (zh->state==ZOO_CONNECTED_STATE) {
+ send_to = zh->recv_timeout/3 - idle_send;
+- if (send_to <= 0 && zh->sent_requests.head==0) {
+-// LOG_DEBUG(("Sending PING to %s (exceeded idle by %dms)",
+-// format_current_endpoint_info(zh),-send_to));
+- int rc=send_ping(zh);
+- if (rc < 0){
+- LOG_ERROR(("failed to send PING request (zk retcode=%d)",rc));
+- return api_epilog(zh,rc);
+- }
++ if (send_to <= 0) {
++ if (zh->sent_requests.head==0) {
++ LOG_DEBUG(("Sending PING to %s (exceeded idle by %dms)",
++ format_current_endpoint_info(zh),-send_to));
++ int rc=send_ping(zh);
++ if (rc < 0){
++ LOG_ERROR(("failed to send PING request (zk retcode=%d)",rc));
++ return api_epilog(zh,rc);
++ }
++ }
+ send_to = zh->recv_timeout/3;
+ }
+ }
+@@ -1669,6 +1671,12 @@
+ zh->next_deadline.tv_sec += zh->next_deadline.tv_usec / 1000000;
+ zh->next_deadline.tv_usec = zh->next_deadline.tv_usec % 1000000;
+ }
++
++ if (tv->tv_sec == 0 && tv->tv_usec == 0) {
++ LOG_DEBUG(("Returning a 0.0 timeval: state=%d idle_recv=%d idle_send=%d recv_to=%d send_to=%d send_requests=%s",
++ zh->state, idle_recv, idle_send, recv_to, send_to, zh->sent_requests.head==0 ? "false" : "true"));
++ }
++
+ *interest = ZOOKEEPER_READ;
+ /* we are interested in a write if we are connected and have something
+ * to send, or we are waiting for a connect to finish. */
BIN  ext/zkc-3.3.5.tar.gz
Binary file not shown
BIN  ext/zkc-3.4.5.tar.gz
Binary file not shown
85 ext/zkrb.c
@@ -73,7 +73,7 @@
#include "ruby/io.h"
#endif
-#include "c-client-src/zookeeper.h"
+#include "zookeeper/zookeeper.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
@@ -82,6 +82,7 @@
#include <pthread.h>
#include <inttypes.h>
#include <time.h>
+#include <arpa/inet.h>
#include "common.h"
#include "event_lib.h"
@@ -242,7 +243,7 @@ static void print_zkrb_instance_data(zkrb_instance_data_t* ptr) {
fprintf(stderr, "}\n");
}
-#define session_timeout_msec(self) rb_iv_get(self, "@_session_timeout_msec")
+#define receive_timeout_msec(self) rb_iv_get(self, "@_receive_timeout_msec")
inline static void zkrb_debug_clientid_t(const clientid_t *cid) {
int pass_len = sizeof(cid->passwd);
@@ -268,13 +269,13 @@ static VALUE method_zkrb_init(int argc, VALUE* argv, VALUE self) {
Check_Type(hostPort, T_STRING);
// Look up :zkc_log_level
- VALUE log_level = rb_hash_aref(options, ID2SYM(rb_intern("zkc_log_level")));
- if (NIL_P(log_level)) {
- zoo_set_debug_level(0); // no log messages
- } else {
- Check_Type(log_level, T_FIXNUM);
- zoo_set_debug_level(FIX2INT(log_level));
- }
+ // VALUE log_level = rb_hash_aref(options, ID2SYM(rb_intern("zkc_log_level")));
+ // if (NIL_P(log_level)) {
+ // zoo_set_debug_level(0); // no log messages
+ // } else {
+ // Check_Type(log_level, T_FIXNUM);
+ // zoo_set_debug_level(FIX2INT(log_level));
+ // }
volatile VALUE data;
zkrb_instance_data_t *zk_local_ctx;
@@ -306,7 +307,7 @@ static VALUE method_zkrb_init(int argc, VALUE* argv, VALUE self) {
zookeeper_init(
RSTRING_PTR(hostPort), // const char *host
zkrb_state_callback, // watcher_fn
- session_timeout_msec(self), // recv_timeout
+ receive_timeout_msec(self), // recv_timeout
&zk_local_ctx->myid, // cilentid_t
ctx, // void *context
0); // flags
@@ -333,7 +334,7 @@ static VALUE method_get_children(VALUE self, VALUE reqid, VALUE path, VALUE asyn
struct String_vector strings;
struct Stat stat;
- int rc;
+ int rc = 0;
switch (call_type) {
#ifdef THREADED
@@ -378,7 +379,7 @@ static VALUE method_exists(VALUE self, VALUE reqid, VALUE path, VALUE async, VAL
VALUE output = Qnil;
struct Stat stat;
- int rc;
+ int rc = 0;
switch (call_type) {
#ifdef THREADED
@@ -454,7 +455,7 @@ static VALUE method_create(VALUE self, VALUE reqid, VALUE path, VALUE data, VALU
int invalid_call_type=0;
- int rc;
+ int rc = 0;
switch (call_type) {
#ifdef THREADED
@@ -526,7 +527,7 @@ static VALUE method_get(VALUE self, VALUE reqid, VALUE path, VALUE async, VALUE
char * data = NULL;
if (IS_SYNC(call_type)) {
data = malloc(MAX_ZNODE_SIZE); /* ugh */
- memset(data, 0, sizeof(data));
+ memset(data, 0, MAX_ZNODE_SIZE);
}
int rc, invalid_call_type=0;
@@ -788,10 +789,10 @@ static VALUE method_zkrb_iterate_event_loop(VALUE self) {
fd_set rfds, wfds, efds;
FD_ZERO(&rfds); FD_ZERO(&wfds); FD_ZERO(&efds);
- int fd=0, interest=0, events=0, rc=0, maxfd=0;
+ int fd = 0, interest = 0, events = 0, rc = 0, maxfd = 0, irc = 0, prc = 0;
struct timeval tv;
- zookeeper_interest(zk->zh, &fd, &interest, &tv);
+ irc = zookeeper_interest(zk->zh, &fd, &interest, &tv);
if (fd != -1) {
if (interest & ZOOKEEPER_READ) {
@@ -834,17 +835,24 @@ static VALUE method_zkrb_iterate_event_loop(VALUE self) {
rb_raise(rb_eRuntimeError, "read from pipe failed: %s", clean_errno());
}
}
-
- rc = zookeeper_process(zk->zh, events);
}
else if (rc == 0) {
- zkrb_debug("timed out waiting for descriptor to be ready");
+ // zkrb_debug("timed out waiting for descriptor to be ready. interest=%d fd=%d pipe_r_fd=%d maxfd=%d irc=%d timeout=%f",
+ // interest, fd, pipe_r_fd, maxfd, irc, tv.tv_sec + (tv.tv_usec/ 1000.0 / 1000.0));
}
else {
- log_err("select returned: %d", rc);
+ log_err("select returned an error: rc=%d interest=%d fd=%d pipe_r_fd=%d maxfd=%d irc=%d timeout=%f",
+ rc, interest, fd, pipe_r_fd, maxfd, irc, tv.tv_sec + (tv.tv_usec/ 1000.0 / 1000.0));
}
- return INT2FIX(rc);
+ prc = zookeeper_process(zk->zh, events);
+
+ if (rc == 0) {
+ zkrb_debug("timed out waiting for descriptor to be ready. prc=%d interest=%d fd=%d pipe_r_fd=%d maxfd=%d irc=%d timeout=%f",
+ prc, interest, fd, pipe_r_fd, maxfd, irc, tv.tv_sec + (tv.tv_usec/ 1000.0 / 1000.0));
+ }
+
+ return INT2FIX(prc);
}
static VALUE method_has_events(VALUE self) {
@@ -928,6 +936,38 @@ static VALUE method_zerror(VALUE self, VALUE errc) {
return rb_str_new2(zerror(FIX2INT(errc)));
}
+static VALUE method_connected_host(VALUE self) {
+ FETCH_DATA_PTR(self, zk);
+
+ struct sockaddr addr;
+ socklen_t addr_len = sizeof(addr);
+
+ if (zookeeper_get_connected_host(zk->zh, &addr, &addr_len) != NULL) {
+ char buf[255];
+ char addrstr[128];
+ void *inaddr;
+ int port;
+
+#if defined(AF_INET6)
+ if(addr.sa_family==AF_INET6){
+ inaddr = &((struct sockaddr_in6 *) &addr)->sin6_addr;
+ port = ((struct sockaddr_in6 *) &addr)->sin6_port;
+ } else {
+#endif
+ inaddr = &((struct sockaddr_in *) &addr)->sin_addr;
+ port = ((struct sockaddr_in *) &addr)->sin_port;
+#if defined(AF_INET6)
+ }
+#endif
+
+ inet_ntop(addr.sa_family, inaddr, addrstr, sizeof(addrstr)-1);
+ snprintf(buf, sizeof(buf), "%s:%d", addrstr, ntohs(port));
+ return rb_str_new2(buf);
+ }
+
+ return Qnil;
+}
+
static void zkrb_define_methods(void) {
#define DEFINE_METHOD(M, ARGS) { \
rb_define_method(CZookeeper, #M, method_ ## M, ARGS); }
@@ -963,6 +1003,7 @@ static void zkrb_define_methods(void) {
DEFINE_METHOD(sync, 2);
DEFINE_METHOD(zkrb_iterate_event_loop, 0);
DEFINE_METHOD(zkrb_get_next_event_st, 0);
+ DEFINE_METHOD(connected_host, 0);
// methods for the ruby-side event manager
DEFINE_METHOD(zkrb_get_next_event, 1);
@@ -995,7 +1036,9 @@ static VALUE zkrb_client_id_method_initialize(VALUE self) {
void Init_zookeeper_c() {
+ // Don't debug by default
ZKRBDebugging = 0;
+ zoo_set_debug_level(0);
mZookeeper = rb_define_module("Zookeeper");
mZookeeperExceptions = rb_define_module_under(mZookeeper, "Exceptions");
44 ext/zkrb_wrapper.c
@@ -381,6 +381,30 @@ int zkrb_call_zoo_aset_acl(zhandle_t *zh, const char *path, int version, struct
}
+static VALUE zkrb_gvl_zoo_amulti(void *data) {
+ zkrb_zoo_amulti_args_t *a = (zkrb_zoo_amulti_args_t *)data;
+ a->rc = zoo_amulti(a->zh, a->count, a->ops, a->results, a->completion, a->data);
+ return Qnil;
+}
+
+// wrapper that calls zoo_amulti via zkrb_gvl_zoo_amulti inside rb_thread_blocking_region
+int zkrb_call_zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *results, void_completion_t completion, const void *data) {
+ zkrb_zoo_amulti_args_t args = {
+ .rc = ZKRB_FAIL,
+ .zh = zh,
+ .count = count,
+ .ops = ops,
+ .results = results,
+ .completion = completion,
+ .data = data
+ };
+
+ zkrb_thread_blocking_region(zkrb_gvl_zoo_amulti, (void *)&args);
+
+ return args.rc;
+}
+
+
static VALUE zkrb_gvl_zoo_add_auth(void *data) {
zkrb_zoo_add_auth_args_t *a = (zkrb_zoo_add_auth_args_t *)data;
a->rc = zoo_add_auth(a->zh, a->scheme, a->cert, a->certLen, a->completion, a->data);
@@ -729,3 +753,23 @@ int zkrb_call_zoo_set_acl(zhandle_t *zh, const char *path, int version, const st
}
+static VALUE zkrb_gvl_zoo_multi(void *data) {
+ zkrb_zoo_multi_args_t *a = (zkrb_zoo_multi_args_t *)data;
+ a->rc = zoo_multi(a->zh, a->count, a->ops, a->results);
+ return Qnil;
+}
+
+// wrapper that calls zoo_multi via zkrb_gvl_zoo_multi inside rb_thread_blocking_region
+int zkrb_call_zoo_multi(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *results) {
+ zkrb_zoo_multi_args_t args = {
+ .rc = ZKRB_FAIL,
+ .zh = zh,
+ .count = count,
+ .ops = ops,
+ .results = results
+ };
+
+ zkrb_thread_blocking_region(zkrb_gvl_zoo_multi, (void *)&args);
+
+ return args.rc;
+}
22 ext/zkrb_wrapper.h
@@ -7,7 +7,7 @@
#endif
#include "ruby.h"
-#include "c-client-src/zookeeper.h"
+#include "zookeeper/zookeeper.h"
#include "zkrb_wrapper_compat.h"
#include "dbg.h"
@@ -159,6 +159,16 @@ typedef struct {
typedef struct {
zhandle_t *zh;
+ int count;
+ const zoo_op_t *ops;
+ zoo_op_result_t *results;
+ void_completion_t completion;
+ const void *data;
+ int rc;
+} zkrb_zoo_amulti_args_t;
+
+typedef struct {
+ zhandle_t *zh;
const char* scheme;
const char* cert;
int certLen;
@@ -295,6 +305,14 @@ typedef struct {
int rc;
} zkrb_zoo_set_acl_args_t;
+typedef struct {
+ zhandle_t *zh;
+ int count;
+ const zoo_op_t *ops;
+ zoo_op_result_t *results;
+ int rc;
+} zkrb_zoo_multi_args_t;
+
int zkrb_call_zoo_recv_timeout(zhandle_t *zh);
int zkrb_call_zoo_state(zhandle_t *zh);
int zkrb_call_zoo_acreate(zhandle_t *zh, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, string_completion_t completion, const void *data);
@@ -311,6 +329,7 @@ int zkrb_call_zoo_awget_children2(zhandle_t *zh, const char *path, watcher_fn wa
int zkrb_call_zoo_async(zhandle_t *zh, const char *path, string_completion_t completion, const void *data);
int zkrb_call_zoo_aget_acl(zhandle_t *zh, const char *path, acl_completion_t completion, const void *data);
int zkrb_call_zoo_aset_acl(zhandle_t *zh, const char *path, int version, struct ACL_vector *acl, void_completion_t completion, const void *data);
+int zkrb_call_zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *results, void_completion_t completion, const void *data);
int zkrb_call_zoo_add_auth(zhandle_t *zh, const char* scheme, const char* cert, int certLen, void_completion_t completion, const void *data);
int zkrb_call_zoo_create(zhandle_t *zh, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, char *path_buffer, int path_buffer_len);
int zkrb_call_zoo_delete(zhandle_t *zh, const char *path, int version);
@@ -326,5 +345,6 @@ int zkrb_call_zoo_get_children2(zhandle_t *zh, const char *path, int watch, stru
int zkrb_call_zoo_wget_children2(zhandle_t *zh, const char *path, watcher_fn watcher, void* watcherCtx, struct String_vector *strings, struct Stat *stat);
int zkrb_call_zoo_get_acl(zhandle_t *zh, const char *path, struct ACL_vector *acl, struct Stat *stat);
int zkrb_call_zoo_set_acl(zhandle_t *zh, const char *path, int version, const struct ACL_vector *acl);
+int zkrb_call_zoo_multi(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *results);
#endif /* ZKRB_WRAPPER_H */
6 ext/zookeeper_base.rb
@@ -27,7 +27,8 @@ class ClientShutdownException < StandardError; end
ZOO_LOG_LEVEL_DEBUG = 4
def_delegators :czk, :get_children, :exists, :delete, :get, :set,
- :set_acl, :get_acl, :client_id, :sync, :add_auth, :wait_until_connected
+ :set_acl, :get_acl, :client_id, :sync, :add_auth, :wait_until_connected,
+ :connected_host
def self.threadsafe_inquisitor(*syms)
syms.each do |sym|
@@ -40,7 +41,8 @@ def #{sym}
end
end
- threadsafe_inquisitor :connected?, :connecting?, :associating?, :running?
+ threadsafe_inquisitor :connected?, :connecting?, :associating?, :running?,
+ :shutting_down?
attr_reader :event_queue
6 java/java_base.rb
@@ -1,9 +1,9 @@
require 'java'
require 'thread'
-require 'rubygems'
-gem 'slyphon-log4j', '= 1.2.15'
-gem 'slyphon-zookeeper_jar', '= 3.3.5'
+# require 'rubygems'
+# gem 'slyphon-log4j', '= 1.2.15'
+# gem 'zk-ruby-zookeeper_jar', "= #{Zookeeper::DRIVER_VERSION}"
require 'log4j'
require 'zookeeper_jar'
2  lib/zookeeper/version.rb
@@ -1,4 +1,4 @@
module Zookeeper
VERSION = '1.4.5'
- DRIVER_VERSION = '3.3.5'
+ DRIVER_VERSION = '3.4.5'
end
3  zoomonkey/duplicates
@@ -0,0 +1,3 @@
+#!/bin/sh
+
+awk '{ if (prev == $0) count++; else { if (count > 0) { print " --- Line repeated " count " times --- " } print; prev = $0; count = 0 } } END { if (count > 0) print " --- Line repeated " count " times --- " }' "$@"
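The awk one-liner above collapses runs of identical consecutive log lines into a single line followed by a repeat count. For readers less familiar with awk, the same idea can be sketched in Ruby. The method name `collapse_repeats` is an assumption made for illustration and is not part of this pull request; the sketch reports a run that reaches the end of the input as well.

```ruby
# Hypothetical helper (not part of this PR): collapse consecutive duplicate
# lines into one line plus a " --- Line repeated N times --- " marker.
def collapse_repeats(lines)
  out = []
  prev = nil
  count = 0 # number of extra copies seen after the first occurrence

  lines.each do |line|
    if line == prev
      count += 1
    else
      out << " --- Line repeated #{count} times --- " if count > 0
      out << line
      prev = line
      count = 0
    end
  end

  # Flush a run that ends at the last line of input.
  out << " --- Line repeated #{count} times --- " if count > 0
  out
end

puts collapse_repeats(%w[connected connected connected expired])
```

Comparing each line only with its immediate predecessor keeps memory use constant, which suits the long, repetitive logs a harness like this produces.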
194 zoomonkey/zoomonkey.rb
@@ -0,0 +1,194 @@
+require 'zookeeper'
+require 'zk-server'
+require 'fileutils'
+require 'tmpdir'
+
+SLEEP_TIME = 30
+STDOUT.sync = true
+
+if ENV['DEBUG']
+ def zookeeper_logger(from)
+ l = Logger.new(STDOUT)
+ l.formatter = proc do |sev, time, c, msg|
+ "t=#{time.to_i} from=#{from} level=#{sev.downcase} message=#{msg.inspect}\n"
+ end
+ l
+ end
+
+ Zookeeper.logger = zookeeper_logger('zookeeper')
+ Zookeeper.set_debug_level(Zookeeper::ZOO_LOG_LEVEL_DEBUG)
+end
+
+class Worker
+ def initialize(body = nil, &block)
+ raise ArgumentError, "Cannot include both body and block" if body && block
+ @body = body || block
+ end
+
+ def body
+ @body || method(:call)
+ end
+
+ def start
+ @thread = Thread.new do
+ Thread.current.abort_on_exception = true
+ body.call
+ end
+ end
+
+ def stop
+ if @thread
+ @thread.kill
+ @thread = nil
+ end
+ end
+
+ def join
+ if @thread
+ @thread.join
+ end
+ end
+end
+
+
+
+base_dir = Dir.mktmpdir('zk-server-cluster')
+num_cluster = 3
+cluster = ZK::Server::Cluster.new(num_cluster, :base_dir => base_dir)
+
+class Reader < Worker
+ attr_reader :client
+
+ def initialize(zookeeper_hosts)
+ @zookeeper_hosts = zookeeper_hosts
+ @log_from = :reader
+ end
+
+ def call
+ @client = Zookeeper.new(@zookeeper_hosts, 10, method(:watcher))
+ client.wait_until_connected
+
+ client.create(:path => "/test", :data => '') rescue client.set(:path => "/test", :data => '')
+
+ while true
+ error = nil
+ t = Benchmark.realtime do
+ begin
+ client.get(:path => "/test")
+ rescue => e
+ error = e
+ end
+ end
+
+ msg = "host=#{client.connected_host || 'nil'} session_id=#{client.session_id} state=#{client.state_by_value(client.state)} time=#{"%0.4f" % t}"
+ if error
+ msg << " error=#{error.class} error_message=#{error.to_s.inspect}"
+ msg << " closed=#{client.closed?} running=#{client.running?} shutting_down=#{client.shutting_down?}"
+ end
+
+ log msg
+
+ sleep 1
+ end
+ end
+
+ def log(message)
+ puts "t=#{Time.now.to_i} from=#{@log_from} #{message}\n"
+ end
+
+ def watcher(event)
+ if event[:state] == Zookeeper::ZOO_EXPIRED_SESSION_STATE
+ if client
+ log "action=reconnecting state=#{client.state_by_value(event[:state])} session_id=#{client.session_id}"
+ client.reopen
+ end
+ end
+
+ end
+end
+
+class Writer < Worker
+ def initialize(zookeeper_hosts)
+ @zookeeper_hosts = zookeeper_hosts
+ @log_from = :writer
+ end
+
+ def call
+ client = Zookeeper.new(@zookeeper_hosts)
+ client.wait_until_connected
+
+ while true
+ error = nil
+ t = Benchmark.realtime do
+ begin
+ client.create(:path => "/test", :data => '') rescue client.set(:path => "/test", :data => '')
+ rescue => e
+ error = e
+ end
+ end
+
+ msg = "host=#{client.connected_host || 'nil'} session_id=#{client.session_id} state=#{client.state_by_value(client.state)} time=#{"%0.4f" % t}"
+ msg << " error=#{error.class} error_message=#{error.to_s.inspect}" if error
+ log msg
+
+ sleep 1
+ end
+ end
+
+ def log(message)
+ puts "t=#{Time.now.to_i} from=#{@log_from} #{message}\n"
+ end
+end
+
+class ZooMonkey < Worker
+ attr_reader :cluster
+
+ def initialize(cluster)
+ @cluster = cluster
+ @log_from = :server
+ end
+
+ def call
+ while true
+ sleep SLEEP_TIME
+
+ cluster.processes.each do |server|
+ host = "127.0.0.1:#{server.client_port}"
+ log "host=#{host} pid=#{server.pid} action=pausing"
+ server.kill "STOP"
+ sleep SLEEP_TIME
+
+ log "host=#{host} pid=#{server.pid} action=resuming"
+ server.kill "CONT"
+ sleep SLEEP_TIME
+ end
+ end
+ end
+
+ def log(message)
+ puts "t=#{Time.now.to_i} from=#{@log_from} #{message}\n"
+ end
+end
+
+begin
+ cluster.run
+
+ zookeeper_hosts = cluster.processes.map { |p| "127.0.0.1:#{p.client_port}" }
+ zookeeper_spec = (zookeeper_hosts * 2).join(',')
+
+ reader = Reader.new(zookeeper_spec)
+ reader.start
+
+ # writer = Writer.new(zookeeper_spec)
+ # writer.start
+
+ monkey = ZooMonkey.new(cluster)
+ monkey.start
+
+ reader.join
+ # writer.join
+ monkey.join
+ensure
+ cluster.clobber!
+ FileUtils.remove_entry(base_dir)
+end
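The `ZooMonkey` worker above simulates a network failure by freezing each server with SIGSTOP and later releasing it with SIGCONT. The core of that technique can be tried in isolation with a minimal sketch like the one below. The helper name `pause_and_resume` and the throwaway child process are assumptions made for illustration; the real harness calls `server.kill` on processes managed by zk-server.

```ruby
require "rbconfig"

# Hypothetical helper (not part of zoomonkey.rb): freeze a process with
# SIGSTOP, hold it for pause_seconds, then let it continue with SIGCONT.
def pause_and_resume(pid, pause_seconds)
  events = []
  Process.kill("STOP", pid)
  events << :paused
  begin
    sleep pause_seconds
  ensure
    # Always resume, so an interrupted sleep does not leave the process frozen.
    Process.kill("CONT", pid)
  end
  events << :resumed
  events
end

# Demo against a throwaway child process standing in for a ZooKeeper server.
child = Process.spawn(RbConfig.ruby, "-e", "sleep 30")
begin
  p pause_and_resume(child, 0.2)
ensure
  # SIGKILL is delivered even if the child is still stopped.
  Process.kill("KILL", child)
  Process.wait(child)
end
```

A stopped process keeps its sockets open but answers nothing, so from the client's point of view it resembles a server behind a dropped network link rather than one that has crashed.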