From ec22a5dff736571c9210f561945d189148228e37 Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Fri, 8 Nov 2024 12:17:46 +0100 Subject: [PATCH] Use batch APIs to create arrays and hashes Basically a port of https://github.com/ruby/json/pull/678 Rather than allocating the container and pushing elements one by one, we accumulate them on a stack and then use the faster batch APIs to directly create the final container. The original action stack remains, as we need to keep track of what we're currently parsing and how big it is. But for the recursive cases, we no longer need to create a child stack. --- ext/msgpack/extconf.rb | 1 + ext/msgpack/unpacker.c | 199 ++++++++++++++++++++++++----------- ext/msgpack/unpacker.h | 29 +++-- ext/msgpack/unpacker_class.c | 8 +- 4 files changed, 164 insertions(+), 73 deletions(-) diff --git a/ext/msgpack/extconf.rb b/ext/msgpack/extconf.rb index 800896ea..96214ff7 100644 --- a/ext/msgpack/extconf.rb +++ b/ext/msgpack/extconf.rb @@ -3,6 +3,7 @@ have_func("rb_enc_interned_str", "ruby.h") # Ruby 3.0+ have_func("rb_hash_new_capa", "ruby.h") # Ruby 3.2+ have_func("rb_proc_call_with_block", "ruby.h") # CRuby (TruffleRuby doesn't have it) +have_func("rb_hash_bulk_insert", "ruby.h") # Ruby 2.6+ and missing on TruffleRuby 24.0 append_cflags([ "-fvisibility=hidden", diff --git a/ext/msgpack/unpacker.c b/ext/msgpack/unpacker.c index 32423e9c..92e6c59a 100644 --- a/ext/msgpack/unpacker.c +++ b/ext/msgpack/unpacker.c @@ -49,7 +49,6 @@ static VALUE protected_proc_call(VALUE proc, int argc, VALUE *argv, int *raised) static int RAW_TYPE_STRING = 256; static int RAW_TYPE_BINARY = 257; -static int16_t INITIAL_BUFFER_CAPACITY_MAX = SHRT_MAX; static msgpack_rmem_t s_stack_rmem; @@ -60,9 +59,106 @@ static inline VALUE rb_hash_new_capa(long capa) } #endif -static inline int16_t initial_buffer_size(long size) +#ifndef HAVE_RB_HASH_BULK_INSERT +void rb_hash_bulk_insert(long count, const VALUE *pairs, VALUE hash) { - return (size > 
INITIAL_BUFFER_CAPACITY_MAX) ? INITIAL_BUFFER_CAPACITY_MAX : size; + long index = 0; + while (index < count) { + VALUE name = pairs[index++]; + VALUE value = pairs[index++]; + rb_hash_aset(hash, name, value); + } + RB_GC_GUARD(hash); +} +#endif + +/* rvalue_stack functions */ + +static inline void _msgpack_unpacker_rvalue_stack_free(msgpack_rvalue_stack_t* value_stack) { + switch (value_stack->type) { + case STACK_TYPE_UNALLOCATED: + break; + case STACK_TYPE_RMEM: + if (!msgpack_rmem_free(&s_stack_rmem, value_stack->data)) { + rb_bug("Failed to free an rmem pointer, memory leak?"); + } + break; + case STACK_TYPE_HEAP: + xfree(value_stack->data); + break; + } + memset(value_stack, 0, sizeof(msgpack_rvalue_stack_t)); +} + +static void _msgpack_unpacker_rvalue_stack_grow(msgpack_rvalue_stack_t* value_stack) { + switch (value_stack->type) { + case STACK_TYPE_UNALLOCATED: { + value_stack->data = msgpack_rmem_alloc(&s_stack_rmem); + value_stack->capacity = MSGPACK_RMEM_PAGE_SIZE / sizeof(VALUE); + value_stack->type = STACK_TYPE_RMEM; + break; + } + case STACK_TYPE_RMEM: { + size_t new_capacity = value_stack->capacity * 2; + VALUE *new_ptr = ALLOC_N(VALUE, new_capacity); + MEMCPY(new_ptr, value_stack->data, VALUE, value_stack->depth); + if (!msgpack_rmem_free(&s_stack_rmem, value_stack->data)) { + rb_bug("Failed to free an rmem pointer, memory leak?"); + } + value_stack->type = STACK_TYPE_HEAP; + value_stack->data = new_ptr; + value_stack->capacity = new_capacity; + break; + } + case STACK_TYPE_HEAP: { + size_t new_capacity = value_stack->capacity * 2; + REALLOC_N(value_stack->data, VALUE, new_capacity); + value_stack->capacity = new_capacity; + break; + } + } +} + +static inline void _msgpack_unpacker_rvalue_stack_push(msgpack_unpacker_t* uk, VALUE value) { + size_t free_slots = uk->value_stack.capacity - uk->value_stack.depth; + + if (RB_UNLIKELY(free_slots == 0)) { + _msgpack_unpacker_rvalue_stack_grow(&uk->value_stack); + free_slots = uk->value_stack.capacity - 
uk->value_stack.depth; + } + + RB_OBJ_WRITE(uk->self, &uk->value_stack.data[uk->value_stack.depth++], value); +} + +static inline VALUE _msgpack_unpacker_rvalue_stack_create_array(msgpack_unpacker_t* uk, size_t length) { + VALUE *data = &uk->value_stack.data[uk->value_stack.depth - length]; + + VALUE array = rb_ary_new_from_values(length, data); + + RB_OBJ_WRITE(uk->self, data, array); + uk->value_stack.depth -= (length - 1); + + return RB_GC_GUARD(array); +} + +static inline VALUE _msgpack_unpacker_rvalue_stack_create_hash(msgpack_unpacker_t* uk, size_t items_count) { + size_t length = items_count / 2; + VALUE *data = &uk->value_stack.data[uk->value_stack.depth - items_count]; + + VALUE hash = rb_hash_new_capa(length); + rb_hash_bulk_insert(items_count, data, hash); + + RB_OBJ_WRITE(uk->self, data, hash); + uk->value_stack.depth -= (items_count - 1); + + return RB_GC_GUARD(hash); +} + +void msgpack_unpacker_mark_rvalue_stack(msgpack_rvalue_stack_t* value_stack) +{ + if (value_stack->data) { + rb_gc_mark_locations(value_stack->data, value_stack->data + value_stack->depth); + } } void msgpack_unpacker_static_init(void) @@ -108,33 +204,16 @@ static inline void _msgpack_unpacker_free_stack(msgpack_unpacker_stack_t* stack) void _msgpack_unpacker_destroy(msgpack_unpacker_t* uk) { - msgpack_unpacker_stack_t *stack; - while ((stack = uk->stack)) { - uk->stack = stack->parent; - _msgpack_unpacker_free_stack(stack); - } - + _msgpack_unpacker_free_stack(uk->stack); + _msgpack_unpacker_rvalue_stack_free(&uk->value_stack); msgpack_buffer_destroy(UNPACKER_BUFFER_(uk)); } -void msgpack_unpacker_mark_stack(msgpack_unpacker_stack_t* stack) -{ - while (stack) { - msgpack_unpacker_stack_entry_t* s = stack->data; - msgpack_unpacker_stack_entry_t* send = stack->data + stack->depth; - for(; s < send; s++) { - rb_gc_mark(s->object); - rb_gc_mark(s->key); - } - stack = stack->parent; - } -} - void msgpack_unpacker_mark(msgpack_unpacker_t* uk) { rb_gc_mark(uk->last_object); 
rb_gc_mark(uk->reading_raw); - msgpack_unpacker_mark_stack(uk->stack); + msgpack_unpacker_mark_rvalue_stack(&uk->value_stack); /* See MessagePack_Buffer_wrap */ /* msgpack_buffer_mark(UNPACKER_BUFFER_(uk)); */ rb_gc_mark(uk->buffer_ref); @@ -149,6 +228,7 @@ void _msgpack_unpacker_reset(msgpack_unpacker_t* uk) /*memset(uk->stack, 0, sizeof(msgpack_unpacker_t) * uk->stack->depth);*/ uk->stack->depth = 0; + uk->value_stack.depth = 0; uk->last_object = Qnil; uk->reading_raw = Qnil; uk->reading_raw_remaining = 0; @@ -186,6 +266,7 @@ static inline int object_complete(msgpack_unpacker_t* uk, VALUE object) } uk->last_object = object; + _msgpack_unpacker_rvalue_stack_push(uk, object); reset_head_byte(uk); return PRIMITIVE_OBJECT_COMPLETE; } @@ -193,6 +274,7 @@ static inline int object_complete(msgpack_unpacker_t* uk, VALUE object) static inline int object_complete_symbol(msgpack_unpacker_t* uk, VALUE object) { uk->last_object = object; + _msgpack_unpacker_rvalue_stack_push(uk, object); reset_head_byte(uk); return PRIMITIVE_OBJECT_COMPLETE; } @@ -212,12 +294,14 @@ static inline int object_complete_ext(msgpack_unpacker_t* uk, int ext_type, VALU if(proc != Qnil) { VALUE obj; VALUE arg = (str == Qnil ? 
rb_str_buf_new(0) : str); + int raised; obj = protected_proc_call(proc, 1, &arg, &raised); if (raised) { uk->last_object = rb_errinfo(); return PRIMITIVE_RECURSIVE_RAISED; } + return object_complete(uk, obj); } @@ -235,7 +319,7 @@ static inline msgpack_unpacker_stack_entry_t* _msgpack_unpacker_stack_entry_top( return &uk->stack->data[uk->stack->depth-1]; } -static inline int _msgpack_unpacker_stack_push(msgpack_unpacker_t* uk, enum stack_type_t type, size_t count, VALUE object) +static inline int _msgpack_unpacker_stack_push(msgpack_unpacker_t* uk, enum stack_type_t type, size_t count) { reset_head_byte(uk); @@ -245,22 +329,20 @@ static inline int _msgpack_unpacker_stack_push(msgpack_unpacker_t* uk, enum stac msgpack_unpacker_stack_entry_t* next = &uk->stack->data[uk->stack->depth]; next->count = count; + next->size = count; next->type = type; - next->object = object; - next->key = Qnil; - uk->stack->depth++; return PRIMITIVE_CONTAINER_START; } -static inline VALUE msgpack_unpacker_stack_pop(msgpack_unpacker_t* uk) +static inline size_t msgpack_unpacker_stack_pop(msgpack_unpacker_t* uk) { return --uk->stack->depth; } static inline bool msgpack_unpacker_stack_is_empty(msgpack_unpacker_t* uk) { - return uk->stack->depth == 0; + return uk->stack->depth == 0 || _msgpack_unpacker_stack_entry_top(uk)->type == STACK_TYPE_RECURSIVE; } #ifdef USE_CASE_RANGE @@ -290,9 +372,7 @@ static inline bool is_reading_map_key(msgpack_unpacker_t* uk) { if(uk->stack->depth > 0) { msgpack_unpacker_stack_entry_t* top = _msgpack_unpacker_stack_entry_top(uk); - if(top->type == STACK_TYPE_MAP_KEY) { - return true; - } + return top->type == STACK_TYPE_MAP && (top->count % 2 == 0); } return false; } @@ -343,17 +423,17 @@ static inline int read_raw_body_begin(msgpack_unpacker_t* uk, int raw_type) reset_head_byte(uk); uk->reading_raw_remaining = 0; - msgpack_unpacker_stack_t* child_stack = _msgpack_unpacker_new_stack(); - child_stack->parent = uk->stack; - uk->stack = child_stack; + 
_msgpack_unpacker_stack_push(uk, STACK_TYPE_RECURSIVE, 1); + size_t value_stack_depth = uk->value_stack.depth; int raised; obj = protected_proc_call(proc, 1, &uk->self, &raised); - uk->stack = child_stack->parent; - _msgpack_unpacker_free_stack(child_stack); + uk->value_stack.depth = value_stack_depth; + msgpack_unpacker_stack_pop(uk); if (raised) { uk->last_object = rb_errinfo(); + _msgpack_unpacker_rvalue_stack_push(uk, Qnil); return PRIMITIVE_RECURSIVE_RAISED; } @@ -418,14 +498,14 @@ static int read_primitive(msgpack_unpacker_t* uk) if(count == 0) { return object_complete(uk, rb_ary_new()); } - return _msgpack_unpacker_stack_push(uk, STACK_TYPE_ARRAY, count, rb_ary_new2(initial_buffer_size(count))); + return _msgpack_unpacker_stack_push(uk, STACK_TYPE_ARRAY, count); SWITCH_RANGE(b, 0x80, 0x8f) // FixMap int count = b & 0x0f; if(count == 0) { return object_complete(uk, rb_hash_new()); } - return _msgpack_unpacker_stack_push(uk, STACK_TYPE_MAP_KEY, count*2, rb_hash_new_capa(initial_buffer_size(count))); + return _msgpack_unpacker_stack_push(uk, STACK_TYPE_MAP, count*2); SWITCH_RANGE(b, 0xc0, 0xdf) // Variable switch(b) { @@ -648,7 +728,7 @@ static int read_primitive(msgpack_unpacker_t* uk) if(count == 0) { return object_complete(uk, rb_ary_new()); } - return _msgpack_unpacker_stack_push(uk, STACK_TYPE_ARRAY, count, rb_ary_new2(initial_buffer_size(count))); + return _msgpack_unpacker_stack_push(uk, STACK_TYPE_ARRAY, count); } case 0xdd: // array 32 @@ -658,7 +738,7 @@ static int read_primitive(msgpack_unpacker_t* uk) if(count == 0) { return object_complete(uk, rb_ary_new()); } - return _msgpack_unpacker_stack_push(uk, STACK_TYPE_ARRAY, count, rb_ary_new2(initial_buffer_size(count))); + return _msgpack_unpacker_stack_push(uk, STACK_TYPE_ARRAY, count); } case 0xde: // map 16 @@ -668,7 +748,7 @@ static int read_primitive(msgpack_unpacker_t* uk) if(count == 0) { return object_complete(uk, rb_hash_new()); } - return _msgpack_unpacker_stack_push(uk, STACK_TYPE_MAP_KEY, 
count*2, rb_hash_new_capa(initial_buffer_size(count))); + return _msgpack_unpacker_stack_push(uk, STACK_TYPE_MAP, count*2); } case 0xdf: // map 32 @@ -678,7 +758,7 @@ static int read_primitive(msgpack_unpacker_t* uk) if(count == 0) { return object_complete(uk, rb_hash_new()); } - return _msgpack_unpacker_stack_push(uk, STACK_TYPE_MAP_KEY, count*2, rb_hash_new_capa(initial_buffer_size(count))); + return _msgpack_unpacker_stack_push(uk, STACK_TYPE_MAP, count*2); } default: @@ -766,28 +846,23 @@ int msgpack_unpacker_read(msgpack_unpacker_t* uk, size_t target_stack_depth) container_completed: { msgpack_unpacker_stack_entry_t* top = _msgpack_unpacker_stack_entry_top(uk); - switch(top->type) { - case STACK_TYPE_ARRAY: - rb_ary_push(top->object, uk->last_object); - break; - case STACK_TYPE_MAP_KEY: - top->key = uk->last_object; - top->type = STACK_TYPE_MAP_VALUE; - break; - case STACK_TYPE_MAP_VALUE: - if(uk->symbolize_keys && rb_type(top->key) == T_STRING) { - /* here uses rb_str_intern instead of rb_intern so that Ruby VM can GC unused symbols */ - rb_hash_aset(top->object, rb_str_intern(top->key), uk->last_object); - } else { - rb_hash_aset(top->object, top->key, uk->last_object); - } - top->type = STACK_TYPE_MAP_KEY; - break; - } size_t count = --top->count; if(count == 0) { - object_complete(uk, top->object); + switch(top->type) { + case STACK_TYPE_ARRAY: + _msgpack_unpacker_rvalue_stack_create_array(uk, top->size); + break; + case STACK_TYPE_MAP: + _msgpack_unpacker_rvalue_stack_create_hash(uk, top->size); + break; + case STACK_TYPE_RECURSIVE: + object_complete(uk, _msgpack_unpacker_rvalue_stack_pop(uk)); + return PRIMITIVE_OBJECT_COMPLETE; + break; + } + + object_complete(uk, _msgpack_unpacker_rvalue_stack_pop(uk)); if(msgpack_unpacker_stack_pop(uk) <= target_stack_depth) { return PRIMITIVE_OBJECT_COMPLETE; } diff --git a/ext/msgpack/unpacker.h b/ext/msgpack/unpacker.h index 6925b108..29d1dbf1 100644 --- a/ext/msgpack/unpacker.h +++ b/ext/msgpack/unpacker.h @@ 
-26,29 +26,42 @@ struct msgpack_unpacker_t; typedef struct msgpack_unpacker_t msgpack_unpacker_t; typedef struct msgpack_unpacker_stack_t msgpack_unpacker_stack_t; +typedef struct msgpack_rvalue_stack_t msgpack_rvalue_stack_t; + +enum rvalue_stack_type_t { + STACK_TYPE_UNALLOCATED = 0, + STACK_TYPE_RMEM = 1, + STACK_TYPE_HEAP = 2, +}; + +struct msgpack_rvalue_stack_t { + enum rvalue_stack_type_t type; + size_t depth; + size_t capacity; + VALUE *data; +}; enum stack_type_t { STACK_TYPE_ARRAY, - STACK_TYPE_MAP_KEY, - STACK_TYPE_MAP_VALUE, + STACK_TYPE_MAP, + STACK_TYPE_RECURSIVE, }; typedef struct { size_t count; + size_t size; enum stack_type_t type; - VALUE object; - VALUE key; } msgpack_unpacker_stack_entry_t; struct msgpack_unpacker_stack_t { size_t depth; size_t capacity; msgpack_unpacker_stack_entry_t *data; - msgpack_unpacker_stack_t *parent; }; struct msgpack_unpacker_t { msgpack_buffer_t buffer; + msgpack_rvalue_stack_t value_stack; msgpack_unpacker_stack_t *stack; unsigned int head_byte; @@ -125,9 +138,13 @@ int msgpack_unpacker_read(msgpack_unpacker_t* uk, size_t target_stack_depth); int msgpack_unpacker_skip(msgpack_unpacker_t* uk, size_t target_stack_depth); +static inline VALUE _msgpack_unpacker_rvalue_stack_pop(msgpack_unpacker_t* uk) { + return uk->value_stack.data[--uk->value_stack.depth]; +} + static inline VALUE msgpack_unpacker_get_last_object(msgpack_unpacker_t* uk) { - return uk->last_object; + return _msgpack_unpacker_rvalue_stack_pop(uk); } diff --git a/ext/msgpack/unpacker_class.c b/ext/msgpack/unpacker_class.c index 06dd3dfa..3e5e3c28 100644 --- a/ext/msgpack/unpacker_class.c +++ b/ext/msgpack/unpacker_class.c @@ -65,11 +65,8 @@ static size_t Unpacker_memsize(const void *ptr) total_size += sizeof(msgpack_unpacker_ext_registry_t) / (uk->ext_registry->borrow_count + 1); } - msgpack_unpacker_stack_t *stack = uk->stack; - while (stack) { - total_size += (stack->depth + 1) * sizeof(msgpack_unpacker_stack_t); - stack = stack->parent; - } + 
total_size += (uk->stack->depth + 1) * sizeof(msgpack_unpacker_stack_t); + total_size += (uk->value_stack.depth * sizeof(VALUE)); return total_size + msgpack_buffer_memsize(&uk->buffer); } @@ -163,6 +160,7 @@ static VALUE Unpacker_allow_unknown_ext_p(VALUE self) NORETURN(static void raise_unpacker_error(msgpack_unpacker_t *uk, int r)) { uk->stack->depth = 0; + switch(r) { case PRIMITIVE_EOF: rb_raise(rb_eEOFError, "end of buffer reached");