Skip to content

Commit

Permalink
Merge branch 'develop/v0.8.0' of github.com:kaaproject/kaa into devel…
Browse files Browse the repository at this point in the history
…op/v0.8.0
  • Loading branch information
ikulikov committed Jan 26, 2016
2 parents 5b9418a + c872d1e commit 1da2f36
Show file tree
Hide file tree
Showing 19 changed files with 1,162 additions and 177 deletions.
187 changes: 132 additions & 55 deletions client/client-multi/client-c/src/kaa/kaa_logging.c
Expand Up @@ -19,6 +19,7 @@
#include <string.h>
#include <inttypes.h>
#include <sys/types.h>
#include <assert.h>
#include "platform/stdio.h"
#include "platform/sock.h"
#include "platform/time.h"
Expand All @@ -34,45 +35,40 @@
#include "utilities/kaa_log.h"
#include "avro_src/avro/io.h"



#define KAA_LOGGING_RECEIVE_UPDATES_FLAG 0x01
#define KAA_MAX_PADDING_LENGTH (KAA_ALIGNMENT - 1)



extern kaa_transport_channel_interface_t *kaa_channel_manager_get_transport_channel(kaa_channel_manager_t *self
, kaa_service_t service_type);

extern bool ext_log_upload_strategy_is_timeout_strategy(void *strategy);



typedef enum {
LOGGING_RESULT_SUCCESS = 0x00,
LOGGING_RESULT_FAILURE = 0x01
} logging_sync_result_t;

typedef struct {
uint16_t log_bucket_id;
kaa_time_t timeout;
kaa_time_t timeout; /**< Bucket timeout. */
uint16_t log_bucket_id; /**< ID of bucket present in storage. */
uint16_t log_count; /**< Current logs count. */
} timeout_info_t;

struct kaa_log_collector {
uint16_t log_bucket_id;
void *log_storage_context;
void *log_upload_strategy_context;
kaa_status_t *status;
kaa_channel_manager_t *channel_manager;
kaa_logger_t *logger;
kaa_list_t *timeouts;
bool is_sync_ignored;
kaa_log_bucket_constraints_t bucket_size;
void *log_storage_context;
void *log_upload_strategy_context;
kaa_status_t *status;
kaa_channel_manager_t *channel_manager;
kaa_logger_t *logger;
kaa_list_t *timeouts;
kaa_log_delivery_listener_t log_delivery_listeners;
bool is_sync_ignored;
uint32_t log_last_id; /**< Last log record ID */
uint16_t log_bucket_id;
};



static const kaa_service_t logging_sync_services[1] = {KAA_SERVICE_LOGGING};

static const kaa_service_t logging_sync_services[] = {KAA_SERVICE_LOGGING};

kaa_error_t kaa_logging_need_logging_resync(kaa_log_collector_t *self, bool *result)
{
Expand All @@ -85,7 +81,7 @@ kaa_error_t kaa_logging_need_logging_resync(kaa_log_collector_t *self, bool *res
return KAA_ERR_NONE;
}

static kaa_error_t remember_request(kaa_log_collector_t *self, uint16_t bucket_id)
static kaa_error_t remember_request(kaa_log_collector_t *self, uint16_t bucket_id, uint16_t count)
{
KAA_RETURN_IF_NIL(self, KAA_ERR_BADPARAM);

Expand All @@ -94,6 +90,7 @@ static kaa_error_t remember_request(kaa_log_collector_t *self, uint16_t bucket_i

info->log_bucket_id = bucket_id;
info->timeout = KAA_TIME() + (kaa_time_t)ext_log_upload_strategy_get_timeout(self->log_upload_strategy_context);
info->log_count = count;

kaa_list_node_t *it = kaa_list_push_back(self->timeouts, info);
if (!it) {
Expand All @@ -104,24 +101,41 @@ static kaa_error_t remember_request(kaa_log_collector_t *self, uint16_t bucket_i
return KAA_ERR_NONE;
}



static bool find_by_bucket_id(void *data, void *context)
{
KAA_RETURN_IF_NIL2(data, context, false);
return (((timeout_info_t *)data)->log_bucket_id == *((uint16_t *)context));
}
uint16_t bucket_id = *(uint16_t *) context;
timeout_info_t *timeout_info = data;

if (timeout_info->log_bucket_id == bucket_id) {
return true;
}

return false;
}


static kaa_error_t remove_request(kaa_log_collector_t *self, uint16_t bucket_id)
/* Returns amount of logs in bucket */
static size_t remove_request(kaa_log_collector_t *self, uint16_t bucket_id)
{
KAA_RETURN_IF_NIL(self, KAA_ERR_BADPARAM);
kaa_list_remove_first(self->timeouts, &find_by_bucket_id, &bucket_id, NULL);
return KAA_ERR_NONE;
}
kaa_list_node_t *node;
timeout_info_t *info;
size_t logs_sent = 0;

node = kaa_list_find_next(kaa_list_begin(self->timeouts), find_by_bucket_id, &bucket_id);

if (node) {
info = kaa_list_get_data(node);

if (info) {
logs_sent = info->log_count;
}

kaa_list_remove_at(self->timeouts, node, NULL);
}

return logs_sent;
}

static bool is_timeout(kaa_log_collector_t *self)
{
Expand All @@ -146,6 +160,15 @@ static bool is_timeout(kaa_log_collector_t *self)
while (it) {
timeout_info_t *info = (timeout_info_t *)kaa_list_get_data(it);
ext_log_storage_unmark_by_bucket_id(self->log_storage_context, info->log_bucket_id);
if (self->log_delivery_listeners.on_timeout) {
kaa_log_bucket_info_t log_bucket_info = {
.bucket_id = info->log_bucket_id,
.log_count = info->log_count,
};

self->log_delivery_listeners.on_timeout(self->log_delivery_listeners.ctx,
&log_bucket_info);
}
it = kaa_list_next(it);
}

Expand All @@ -172,8 +195,6 @@ static bool is_upload_allowed(kaa_log_collector_t *self)
return true;
}



void kaa_log_collector_destroy(kaa_log_collector_t *self)
{
KAA_RETURN_IF_NIL(self, );
Expand All @@ -194,13 +215,18 @@ kaa_error_t kaa_log_collector_create(kaa_log_collector_t **log_collector_p
kaa_log_collector_t * collector = (kaa_log_collector_t *) KAA_MALLOC(sizeof(kaa_log_collector_t));
KAA_RETURN_IF_NIL(collector, KAA_ERR_NOMEM);

collector->log_bucket_id = 0;
collector->log_storage_context = NULL;
collector->log_upload_strategy_context = NULL;
collector->status = status;
collector->channel_manager = channel_manager;
collector->logger = logger;
collector->is_sync_ignored = false;
collector->log_bucket_id = 0;
collector->log_last_id = 0;
collector->log_storage_context = NULL;
collector->log_upload_strategy_context = NULL;
collector->status = status;
collector->channel_manager = channel_manager;
collector->logger = logger;
collector->is_sync_ignored = false;

/* Must be overriden in _init() */
collector->bucket_size.max_bucket_log_count = 0;
collector->bucket_size.max_bucket_size = 0;

collector->timeouts = kaa_list_create();
if (!collector->timeouts) {
Expand All @@ -213,8 +239,7 @@ kaa_error_t kaa_log_collector_create(kaa_log_collector_t **log_collector_p
}



kaa_error_t kaa_logging_init(kaa_log_collector_t *self, void *log_storage_context, void *log_upload_strategy_context)
kaa_error_t kaa_logging_init(kaa_log_collector_t *self, void *log_storage_context, void *log_upload_strategy_context, const kaa_log_bucket_constraints_t *bucket_sizes)
{
KAA_RETURN_IF_NIL3(self, log_storage_context, log_upload_strategy_context, KAA_ERR_BADPARAM);

Expand All @@ -223,6 +248,8 @@ kaa_error_t kaa_logging_init(kaa_log_collector_t *self, void *log_storage_contex

self->log_storage_context = log_storage_context;
self->log_upload_strategy_context = log_upload_strategy_context;
self->log_delivery_listeners = KAA_LOG_EMPTY_LISTENERS;
self->bucket_size = *bucket_sizes;

KAA_LOG_DEBUG(self->logger, KAA_ERR_NONE, "Initialized log collector with log storage {%p}, log upload strategy {%p}"
, log_storage_context, log_upload_strategy_context);
Expand All @@ -231,6 +258,29 @@ kaa_error_t kaa_logging_init(kaa_log_collector_t *self, void *log_storage_contex
}


kaa_error_t kaa_logging_set_strategy(kaa_log_collector_t *self, void *log_upload_strategy_context)
{
KAA_RETURN_IF_NIL2(self, log_upload_strategy_context, KAA_ERR_BADPARAM);

if (self->log_upload_strategy_context)
ext_log_upload_strategy_destroy(self->log_upload_strategy_context);

self->log_upload_strategy_context = log_upload_strategy_context;

return KAA_ERR_NONE;
}

kaa_error_t kaa_logging_set_storage(kaa_log_collector_t *self, void *log_storage_context)
{
KAA_RETURN_IF_NIL2(self, log_storage_context, KAA_ERR_BADPARAM);

if (self->log_storage_context)
ext_log_storage_destroy(self->log_storage_context);

self->log_storage_context = log_storage_context;

return KAA_ERR_NONE;
}

static void do_sync(kaa_log_collector_t *self)
{
Expand Down Expand Up @@ -263,15 +313,16 @@ static void update_storage(kaa_log_collector_t *self)



kaa_error_t kaa_logging_add_record(kaa_log_collector_t *self, kaa_user_log_record_t *entry)
kaa_error_t kaa_logging_add_record(kaa_log_collector_t *self, kaa_user_log_record_t *entry, kaa_log_record_info_t *log_info)
{
KAA_RETURN_IF_NIL2(self, entry, KAA_ERR_BADPARAM);
KAA_RETURN_IF_NIL(self->log_storage_context, KAA_ERR_NOT_INITIALIZED);

kaa_log_record_t record = { NULL, entry->get_size(entry) };
if (!record.size) {
KAA_LOG_ERROR(self->logger, KAA_ERR_BADDATA, "Failed to add log record: serialized record size is null."
"Maybe log record schema is empty");
KAA_LOG_ERROR(self->logger, KAA_ERR_BADDATA,
"Failed to add log record: serialized record size is null. "
"Maybe log record schema is empty");
return KAA_ERR_BADDATA;
}

Expand All @@ -297,14 +348,25 @@ kaa_error_t kaa_logging_add_record(kaa_log_collector_t *self, kaa_user_log_recor
ext_log_storage_deallocate_log_record_buffer(self->log_storage_context, &record);
return error;
}

KAA_LOG_TRACE(self->logger, KAA_ERR_NONE, "Added log record, size %zu", record.size);
if (!is_timeout(self))
update_storage(self);

if (log_info) {
log_info->bucket_id = self->log_bucket_id + 1;
log_info->log_id = self->log_last_id++;
}

return KAA_ERR_NONE;
}


kaa_error_t kaa_logging_set_listeners(kaa_log_collector_t *self, const kaa_log_delivery_listener_t *listeners)
{
KAA_RETURN_IF_NIL2(self, listeners, KAA_ERR_BADPARAM);
self->log_delivery_listeners = *listeners;
return KAA_ERR_NONE;
}

kaa_error_t kaa_logging_request_get_size(kaa_log_collector_t *self, size_t *expected_size)
{
Expand All @@ -328,7 +390,7 @@ kaa_error_t kaa_logging_request_get_size(kaa_log_collector_t *self, size_t *expe
*expected_size += sizeof(uint32_t); // request id + log records count

size_t actual_size = records_count * sizeof(uint32_t) + records_count * KAA_MAX_PADDING_LENGTH + total_size;
size_t bucket_size = ext_log_upload_strategy_get_bucket_size(self->log_upload_strategy_context);
size_t bucket_size = self->bucket_size.max_bucket_size;

*expected_size += ((actual_size > bucket_size) ? bucket_size : actual_size);
}
Expand Down Expand Up @@ -360,16 +422,17 @@ kaa_error_t kaa_logging_request_serialize(kaa_log_collector_t *self, kaa_platfor
return KAA_ERR_WRITE_FAILED;
}

if (!self->log_bucket_id)
self->log_bucket_id = self->status->log_bucket_id;
++self->log_bucket_id;

*((uint16_t *) tmp_writer.current) = KAA_HTONS(self->log_bucket_id);
tmp_writer.current += sizeof(uint16_t);
char *records_count_p = tmp_writer.current; // Pointer to the records count. Will be filled in later.
tmp_writer.current += sizeof(uint16_t);

size_t bucket_size = ext_log_upload_strategy_get_bucket_size(self->log_upload_strategy_context);
/* Bucket size constraints */

size_t bucket_size = self->bucket_size.max_bucket_size;
size_t max_log_count = self->bucket_size.max_bucket_log_count;
size_t actual_size = (tmp_writer.end - tmp_writer.current);

bucket_size = (actual_size > bucket_size ? bucket_size : actual_size);
Expand All @@ -378,7 +441,7 @@ kaa_error_t kaa_logging_request_serialize(kaa_log_collector_t *self, kaa_platfor

uint16_t records_count = 0;

while (!error && bucket_size > sizeof(uint32_t)) {
while (!error && bucket_size > sizeof(uint32_t) && records_count < max_log_count) {
size_t record_len = 0;
error = ext_log_storage_write_next_record(self->log_storage_context
, tmp_writer.current + sizeof(uint32_t)
Expand Down Expand Up @@ -414,7 +477,7 @@ kaa_error_t kaa_logging_request_serialize(kaa_log_collector_t *self, kaa_platfor
*((uint16_t *) records_count_p) = KAA_HTONS(records_count);
*writer = tmp_writer;

error = remember_request(self, self->log_bucket_id);
error = remember_request(self, self->log_bucket_id, records_count);
if (error) {
KAA_LOG_WARN(self->logger, error, "Failed to remember request time stamp");
}
Expand Down Expand Up @@ -455,17 +518,31 @@ kaa_error_t kaa_logging_handle_server_sync(kaa_log_collector_t *self
error_code = kaa_platform_message_read(reader, &delivery_error_code, sizeof(int8_t));
KAA_RETURN_IF_ERR(error_code);


if (delivery_result == LOGGING_RESULT_SUCCESS) {
KAA_LOG_INFO(self->logger, KAA_ERR_NONE, "Log bucket uploaded successfully, id '%u'", bucket_id);
} else {
KAA_LOG_WARN(self->logger, KAA_ERR_WRITE_FAILED, "Failed to upload log bucket, id '%u' (delivery error code '%u')", bucket_id, delivery_error_code);
}

remove_request(self, bucket_id);
size_t uploaded_count = remove_request(self, bucket_id);

kaa_log_bucket_info_t log_bucket_info = {
.log_count = uploaded_count,
.bucket_id = bucket_id,
};

if (delivery_result == LOGGING_RESULT_SUCCESS) {
if (self->log_delivery_listeners.on_success) {
self->log_delivery_listeners.on_success(self->log_delivery_listeners.ctx,
&log_bucket_info);
}
ext_log_storage_remove_by_bucket_id(self->log_storage_context, bucket_id);
} else {
if (self->log_delivery_listeners.on_failed) {
self->log_delivery_listeners.on_failed(self->log_delivery_listeners.ctx,
&log_bucket_info);
}
ext_log_storage_unmark_by_bucket_id(self->log_storage_context, bucket_id);
ext_log_upload_strategy_on_failure(self->log_upload_strategy_context
, (logging_delivery_error_code_t)delivery_error_code);
Expand All @@ -481,10 +558,10 @@ kaa_error_t kaa_logging_handle_server_sync(kaa_log_collector_t *self

extern void ext_log_upload_timeout(kaa_log_collector_t *self)
{
if (!is_timeout(self))
update_storage(self);
else if (ext_log_upload_strategy_is_timeout_strategy(self->log_upload_strategy_context))
if (!is_timeout(self)
|| ext_log_upload_strategy_is_timeout_strategy(self->log_upload_strategy_context)) {
update_storage(self);
}
}

#endif
Expand Down

0 comments on commit 1da2f36

Please sign in to comment.