From 9846ba25e25078869e917c2588cfd3bed74d84d2 Mon Sep 17 00:00:00 2001 From: lee-emqx Date: Wed, 24 Jan 2024 16:33:32 +0800 Subject: [PATCH 1/4] * MDF [parquet] rewrite parquet_object_free. Signed-off-by: lee-emqx --- src/supplemental/nanolib/parquet/parquet.cc | 56 +++++++++++---------- 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/src/supplemental/nanolib/parquet/parquet.cc b/src/supplemental/nanolib/parquet/parquet.cc index 6cd09c892..8c51619ca 100644 --- a/src/supplemental/nanolib/parquet/parquet.cc +++ b/src/supplemental/nanolib/parquet/parquet.cc @@ -112,7 +112,7 @@ setup_schema() parquet_object * parquet_object_alloc(uint64_t *keys, uint8_t **darray, uint32_t *dsize, - uint32_t size, nng_aio *aio, void *arg) + uint32_t size, nng_aio *aio, void *arg, parquet_write_type type) { parquet_object *elem = new parquet_object; elem->keys = keys; @@ -121,9 +121,8 @@ parquet_object_alloc(uint64_t *keys, uint8_t **darray, uint32_t *dsize, elem->size = size; elem->aio = aio; elem->arg = arg; - elem->result = PARQUET_WRITE_FAILURE; + elem->type = type; elem->file_size = 0; - elem->file_list = NULL; return elem; } @@ -134,14 +133,31 @@ parquet_object_free(parquet_object *elem) FREE_IF_NOT_NULL(elem->keys, elem->size); FREE_IF_NOT_NULL(elem->darray, elem->size); FREE_IF_NOT_NULL(elem->dsize, elem->size); - nng_aio_set_prov_data(elem->aio, elem->arg); - uint32_t *szp = (uint32_t *) malloc(sizeof(uint32_t)); - *szp = elem->size; - nng_aio_set_msg(elem->aio, (nng_msg *) szp); + switch (elem->type) + { + case HOOK_WRITE: + { + nng_aio_set_prov_data(elem->aio, elem->arg); + uint32_t *szp = (uint32_t *) malloc(sizeof(uint32_t)); + *szp = elem->size; + nng_aio_set_msg(elem->aio, (nng_msg *) szp); + break; + + } + case EXCHANGE_WRITE: + { + nng_aio_set_prov_data(elem->aio, elem->file_ranges); + uint32_t *szp = (uint32_t *) malloc(sizeof(uint32_t)); + *szp = elem->file_size; + nng_aio_set_msg(elem->aio, (nng_msg *) szp); + break; + } + default: + break; + } DO_IT_IF_NOT_NULL(nng_aio_finish_sync, elem->aio, 0); - for (int i = 0; i < elem->file_size && elem->file_list; i++) { - FREE_IF_NOT_NULL( - elem->file_list[i], strlen(elem->file_list[i])); + for (int i = 0; i < elem->file_size && elem->file_ranges; i++) { + parquet_file_range_free(elem->file_ranges[i]); } delete elem; } @@ -216,10 +232,10 @@ parquet_write( uint32_t old_index = 0; uint32_t new_index = 0; again: + new_index = compute_new_index(elem, old_index, conf->file_size); uint64_t key_start = elem->keys[old_index]; uint64_t key_end = elem->keys[new_index]; - pthread_mutex_lock(&parquet_queue_mutex); char *filename = get_file_name(conf, key_start, key_end); if (filename == NULL) { @@ -234,11 +250,10 @@ parquet_write( pthread_mutex_unlock(&parquet_queue_mutex); { + parquet_file_range *range = parquet_file_range_alloc(old_index, new_index, filename); - elem->file_list = (char **) realloc( - elem->file_list, sizeof(char *) * (++elem->file_size)); - *(elem->file_list + elem->file_size - 1) = - nng_strdup(filename); + elem->file_ranges = (parquet_file_range **) realloc( + elem->file_ranges, sizeof(parquet_file_range*) * (++elem->file_size)); // Create a ParquetFileWriter instance parquet::WriterProperties::Builder builder; @@ -297,11 +312,6 @@ parquet_write( goto again; } - elem->result = PARQUET_WRITE_SUCCESS; - - if (elem->cb) { - elem->cb(elem); - } parquet_object_free(elem); return 0; } @@ -433,9 +443,3 @@ parquet_find_span(uint64_t start_key, uint64_t end_key, uint32_t *size) (*size) = local_size; return ret; } - -void -parquet_object_set_cb(parquet_object *obj, parquet_cb cb) -{ - obj->cb = cb; -} \ No newline at end of file From 0c298cbed5ef5cf50b276da927c44b36645f1fa1 Mon Sep 17 00:00:00 2001 From: lee-emqx Date: Wed, 24 Jan 2024 16:34:23 +0800 Subject: [PATCH 2/4] * MDF [parquet] add parquet_file_range_alloc/free function. Signed-off-by: lee-emqx --- src/supplemental/nanolib/parquet/parquet.cc | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/supplemental/nanolib/parquet/parquet.cc b/src/supplemental/nanolib/parquet/parquet.cc index 8c51619ca..68f6e6779 100644 --- a/src/supplemental/nanolib/parquet/parquet.cc +++ b/src/supplemental/nanolib/parquet/parquet.cc @@ -110,6 +110,26 @@ setup_schema() GroupNode::Make("schema", Repetition::REQUIRED, fields)); } +parquet_file_range * +parquet_file_range_alloc(uint32_t start_idx, uint32_t end_idx, char *filename) +{ + parquet_file_range *range = new parquet_file_range; + range->start_idx = start_idx; + range->end_idx = end_idx; + range->filename = nng_strdup(filename); + return range; +} + +void +parquet_file_range_free(parquet_file_range *range) +{ + if (range) { + FREE_IF_NOT_NULL( + range->filename, strlen(range->filename)); + delete range; + } +} + parquet_object * parquet_object_alloc(uint64_t *keys, uint8_t **darray, uint32_t *dsize, uint32_t size, nng_aio *aio, void *arg, parquet_write_type type) From 99ff2b56b7f6db9890cc99f406b4f093af3a7f63 Mon Sep 17 00:00:00 2001 From: lee-emqx Date: Wed, 24 Jan 2024 16:35:23 +0800 Subject: [PATCH 3/4] * MDF [parquet] update header file. Signed-off-by: lee-emqx --- include/nng/supplemental/nanolib/parquet.h | 33 ++++++++++++++-------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/include/nng/supplemental/nanolib/parquet.h b/include/nng/supplemental/nanolib/parquet.h index d5f0f2ed0..f358e8673 100644 --- a/include/nng/supplemental/nanolib/parquet.h +++ b/include/nng/supplemental/nanolib/parquet.h @@ -14,21 +14,31 @@ typedef void (*parquet_cb)(parquet_object *arg); #define PARQUET_WRITE_SUCCESS 1 #define PARQUET_WRITE_FAILURE 0 +typedef struct { + uint32_t start_idx; + uint32_t end_idx; + char *filename; +} parquet_file_range; + +typedef enum { + HOOK_WRITE, + EXCHANGE_WRITE +} parquet_write_type; + struct parquet_object { - uint64_t *keys; - uint8_t **darray; - uint32_t *dsize; - uint32_t size; - nng_aio *aio; - void *arg; - int result; - int file_size; - char **file_list; - parquet_cb cb; + uint64_t *keys; + uint8_t **darray; + uint32_t *dsize; + uint32_t size; + nng_aio *aio; + void *arg; + parquet_write_type type; + int file_size; + parquet_file_range **file_ranges; }; parquet_object *parquet_object_alloc(uint64_t *keys, uint8_t **darray, - uint32_t *dsize, uint32_t size, nng_aio *aio, void *arg); + uint32_t *dsize, uint32_t size, nng_aio *aio, void *arg, parquet_write_type type); void parquet_object_free(parquet_object *elem); void parquet_object_set_cb(parquet_object *obj, parquet_cb cb); @@ -39,7 +49,6 @@ const char *parquet_find(uint64_t key); const char **parquet_find_span( uint64_t start_key, uint64_t end_key, uint32_t *size); -nng_msg *parquet_find_msg(char *filename, uint64_t key); #ifdef __cplusplus } From 4172a098225af000c1bebd3783f31748c798aca6 Mon Sep 17 00:00:00 2001 From: lee-emqx Date: Wed, 24 Jan 2024 18:12:10 +0800 Subject: [PATCH 4/4] * FIX [parquet] add some fix. Signed-off-by: lee-emqx --- src/supplemental/nanolib/parquet/parquet.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/supplemental/nanolib/parquet/parquet.cc b/src/supplemental/nanolib/parquet/parquet.cc index 68f6e6779..dbcdf65f4 100644 --- a/src/supplemental/nanolib/parquet/parquet.cc +++ b/src/supplemental/nanolib/parquet/parquet.cc @@ -142,6 +142,7 @@ parquet_object_alloc(uint64_t *keys, uint8_t **darray, uint32_t *dsize, elem->aio = aio; elem->arg = arg; elem->type = type; + elem->file_ranges = NULL; elem->file_size = 0; return elem; } @@ -274,6 +275,7 @@ parquet_write( elem->file_ranges = (parquet_file_range **) realloc( elem->file_ranges, sizeof(parquet_file_range*) * (++elem->file_size)); + *elem->file_ranges = range; // Create a ParquetFileWriter instance parquet::WriterProperties::Builder builder; @@ -306,7 +308,7 @@ parquet_write( parquet::Int64Writer *int64_writer = static_cast( rg_writer->NextColumn()); - for (uint32_t i = old_index; i < new_index; i++) { + for (uint32_t i = old_index; i <= new_index; i++) { int64_t value = elem->keys[i]; int16_t definition_level = 1; int64_writer->WriteBatch( @@ -317,7 +319,7 @@ parquet_write( parquet::ByteArrayWriter *ba_writer = static_cast( rg_writer->NextColumn()); - for (uint32_t i = old_index; i < new_index; i++) { + for (uint32_t i = old_index; i <= new_index; i++) { parquet::ByteArray value; int16_t definition_level = 1; value.ptr = elem->darray[i]; @@ -462,4 +464,4 @@ parquet_find_span(uint64_t start_key, uint64_t end_key, uint32_t *size) pthread_mutex_unlock(&parquet_queue_mutex); (*size) = local_size; return ret; -} +} \ No newline at end of file