Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Add workaround for null value handling in Avro C API
The Avro C API fails to write bytes of size zero. A workaround is to write
a single zero byte for each NULL field of type bytes.

Also added an option to configure the Avro block size in case very large
records are written.
  • Loading branch information
markus456 committed Mar 4, 2017
1 parent 09df0ac commit f2fc9b9
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 9 deletions.
5 changes: 5 additions & 0 deletions Documentation/Routers/Avrorouter.md
Expand Up @@ -142,6 +142,11 @@ data block. The default value is 1 transaction.
Controls the number of row events that are grouped into a single Avro
data block. The default value is 1000 row events.

#### `block_size`

The Avro data block size in bytes. The default is 16 kilobytes. Increase this
value if individual events in the binary logs are very large.

# Files Created by the Avrorouter

The avrorouter creates two files in the location pointed by _avrodir_:
Expand Down
3 changes: 2 additions & 1 deletion server/modules/include/avrorouter.h
Expand Up @@ -261,6 +261,7 @@ typedef struct avro_instance
uint64_t row_count; /*< Row events processed */
uint64_t row_target; /*< Minimum about of row events that will trigger
* a flush of all tables */
uint64_t block_size; /**< Avro datablock size */
struct avro_instance *next;
} AVRO_INSTANCE;

Expand All @@ -278,7 +279,7 @@ extern void avro_client_rotate(AVRO_INSTANCE *router, AVRO_CLIENT *client, uint8
extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd);
extern void avro_close_binlog(int fd);
extern avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router);
extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema);
extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, size_t block_size);
extern void* avro_table_free(AVRO_TABLE *table);
extern void avro_flush_all_tables(AVRO_INSTANCE *router);
extern char* json_new_schema_from_table(TABLE_MAP *map);
Expand Down
5 changes: 5 additions & 0 deletions server/modules/routing/avro/avro.c
Expand Up @@ -332,6 +332,7 @@ createInstance(SERVICE *service, char **options)
inst->trx_count = 0;
inst->row_target = AVRO_DEFAULT_BLOCK_ROW_COUNT;
inst->trx_target = AVRO_DEFAULT_BLOCK_TRX_COUNT;
inst->block_size = 0;
int first_file = 1;
bool err = false;

Expand Down Expand Up @@ -402,6 +403,10 @@ createInstance(SERVICE *service, char **options)
{
first_file = MAX(1, atoi(value));
}
else if (strcmp(options[i], "block_size") == 0)
{
inst->block_size = atoi(value);
}
else
{
MXS_WARNING("[avrorouter] Unknown router option: '%s'", options[i]);
Expand Down
4 changes: 2 additions & 2 deletions server/modules/routing/avro/avro_file.c
Expand Up @@ -105,7 +105,7 @@ void avro_close_binlog(int fd)
* @param filepath Path to the created file
* @param json_schema The schema of the table in JSON format
*/
AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema)
AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, size_t block_size)
{
AVRO_TABLE *table = calloc(1, sizeof(AVRO_TABLE));
if (table)
Expand All @@ -126,7 +126,7 @@ AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema)
}
else
{
rc = avro_file_writer_create(filepath, table->avro_schema, &table->avro_file);
rc = avro_file_writer_create_with_codec(filepath, table->avro_schema, &table->avro_file, "null", block_size);
}

if (rc)
Expand Down
30 changes: 24 additions & 6 deletions server/modules/routing/avro/avro_rbr.c
Expand Up @@ -104,7 +104,7 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr

/** Close the file and open a new one */
hashtable_delete(router->open_tables, table_ident);
AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema);
AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema, router->block_size);

if (avro_table)
{
Expand Down Expand Up @@ -296,7 +296,11 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
int event_type = get_event_type(hdr->event_type);
prepare_record(router, hdr, event_type, &record);
ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
avro_file_writer_append_value(table->avro_file, &record);
if (avro_file_writer_append_value(table->avro_file, &record))
{
MXS_ERROR("Failed to write value at position %ld: %s",
router->current_pos, avro_strerror());
}

/** Update rows events have the before and after images of the
* affected rows so we'll process them as another record with
Expand All @@ -305,7 +309,11 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
{
prepare_record(router, hdr, UPDATE_EVENT_AFTER, &record);
ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
avro_file_writer_append_value(table->avro_file, &record);
if (avro_file_writer_append_value(table->avro_file, &record))
{
MXS_ERROR("Failed to write value at position %ld: %s",
router->current_pos, avro_strerror());
}
}

rows++;
Expand Down Expand Up @@ -501,14 +509,23 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
for (long i = 0; i < map->columns && npresent < ncolumns; i++)
{
ss_dassert(create->columns == map->columns);
avro_value_get_by_name(record, create->column_names[i], &field, NULL);
ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL);
ss_dassert(rc == 0);

if (bit_is_set(columns_present, ncolumns, i))
{
npresent++;
if (bit_is_set(null_bitmap, ncolumns, i))
{
avro_value_set_null(&field);
if (column_is_blob(map->column_types[i]))
{
uint8_t nullvalue = 0;
avro_value_set_bytes(&field, &nullvalue, 1);
}
else
{
avro_value_set_null(&field);
}
}
else if (column_is_fixed_string(map->column_types[i]))
{
Expand Down Expand Up @@ -604,7 +621,8 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
}
else
{
avro_value_set_null(&field);
uint8_t nullvalue = 0;
avro_value_set_bytes(&field, &nullvalue, 1);
}
ss_dassert(ptr < end);
}
Expand Down

0 comments on commit f2fc9b9

Please sign in to comment.