Skip to content

Commit

Permalink
sysrepo FEATURE increase notification timestamp precision
Browse files Browse the repository at this point in the history
  • Loading branch information
michalvasko committed May 19, 2021
1 parent dfef551 commit 7ba762f
Show file tree
Hide file tree
Showing 14 changed files with 88 additions and 68 deletions.
2 changes: 1 addition & 1 deletion examples/notif_subscribe_example.c
Expand Up @@ -118,7 +118,7 @@ print_val(const sr_val_t *value)

static void
notif_cb(sr_session_ctx_t *session, const sr_ev_notif_type_t notif_type, uint32_t sub_id, const char *path,
const sr_val_t *values, const size_t values_cnt, time_t timestamp, void *private_data)
const sr_val_t *values, const size_t values_cnt, struct timespec timestamp, void *private_data)
{
size_t i;

Expand Down
16 changes: 12 additions & 4 deletions src/common.c
Expand Up @@ -554,6 +554,7 @@ sr_subscr_notif_sub_del(sr_subscription_ctx_t *subscr, uint32_t sub_id, sr_lock_
struct modsub_notifsub_s *sub;
sr_session_ctx_t *ev_sess = NULL;
sr_lock_mode_t cur_mode = has_subs_lock;
struct timespec cur_time;

assert((has_subs_lock == SR_LOCK_WRITE) || (has_subs_lock == SR_LOCK_READ_UPGR) || (has_subs_lock == SR_LOCK_NONE));

Expand Down Expand Up @@ -594,8 +595,9 @@ sr_subscr_notif_sub_del(sr_subscription_ctx_t *subscr, uint32_t sub_id, sr_lock_

if (ev_sess) {
/* send special last notification */
sr_time_get(&cur_time, 0);
if ((err_info = sr_notif_call_callback(ev_sess, sub->cb, sub->tree_cb, sub->private_data,
SR_EV_NOTIF_TERMINATED, sub->sub_id, NULL, time(NULL)))) {
SR_EV_NOTIF_TERMINATED, sub->sub_id, NULL, cur_time))) {
sr_errinfo_free(&err_info);
}
}
Expand Down Expand Up @@ -1418,7 +1420,7 @@ sr_notif_find_subscriber(sr_conn_ctx_t *conn, const char *mod_name, sr_mod_notif

sr_error_info_t *
sr_notif_call_callback(sr_session_ctx_t *ev_sess, sr_event_notif_cb cb, sr_event_notif_tree_cb tree_cb, void *private_data,
const sr_ev_notif_type_t notif_type, uint32_t sub_id, const struct lyd_node *notif_op, time_t notif_ts)
const sr_ev_notif_type_t notif_type, uint32_t sub_id, const struct lyd_node *notif_op, struct timespec notif_ts)
{
sr_error_info_t *err_info = NULL;
const struct lyd_node *elem;
Expand Down Expand Up @@ -2523,8 +2525,14 @@ sr_time_get(struct timespec *ts, uint32_t add_ms)
return;
}

add_ms += ts->tv_nsec / 1000000;
ts->tv_nsec %= 1000000;
if (!add_ms) {
return;
}

if (ts->tv_nsec) {
add_ms += ts->tv_nsec / 1000000;
ts->tv_nsec %= 1000000;
}
ts->tv_nsec += (add_ms % 1000) * 1000000;
ts->tv_sec += add_ms / 1000;
}
Expand Down
4 changes: 2 additions & 2 deletions src/common.h.in
Expand Up @@ -388,7 +388,7 @@ struct sr_session_ctx_s {
(READ-lock is not used). */
struct sr_sess_notif_buf_node {
char *notif_lyb; /**< Buffered notification to be stored in LYB format. */
time_t notif_ts; /**< Buffered notification timestamp. */
struct timespec notif_ts; /**< Buffered notification timestamp. */
const struct lys_module *notif_mod; /**< Buffered notification modules. */
struct sr_sess_notif_buf_node *next; /**< Next stored notification buffer node. */
} *first; /**< First stored notification buffer node. */
Expand Down Expand Up @@ -730,7 +730,7 @@ sr_error_info_t *sr_notif_find_subscriber(sr_conn_ctx_t *conn, const char *mod_n
*/
sr_error_info_t *sr_notif_call_callback(sr_session_ctx_t *ev_sess, sr_event_notif_cb cb, sr_event_notif_tree_cb tree_cb,
void *private_data, const sr_ev_notif_type_t notif_type, uint32_t sub_id, const struct lyd_node *notif_op,
time_t notif_ts);
struct timespec notif_ts);

/*
* Utility functions
Expand Down
4 changes: 2 additions & 2 deletions src/modinfo.c
Expand Up @@ -2178,7 +2178,7 @@ sr_modinfo_generate_config_change_notif(struct sr_mod_info_s *mod_info, sr_sessi
struct lyd_node *root, *elem, *notif = NULL;
struct ly_set *set;
sr_mod_t *shm_mod;
time_t notif_ts;
struct timespec notif_ts;
sr_mod_notif_sub_t *notif_subs;
uint32_t idx = 0, notif_sub_count;
char *xpath;
Expand Down Expand Up @@ -2214,7 +2214,7 @@ sr_modinfo_generate_config_change_notif(struct sr_mod_info_s *mod_info, sr_sessi
}

/* remember when the notification was generated */
notif_ts = time(NULL);
sr_time_get(&notif_ts, 0);

/* EXT READ LOCK */
if ((err_info = sr_shmext_conn_remap_lock(mod_info->conn, SR_LOCK_READ, 0, __func__))) {
Expand Down
37 changes: 20 additions & 17 deletions src/replay.c
Expand Up @@ -269,7 +269,7 @@ sr_replay_find_file(const char *mod_name, time_t from_ts, time_t to_ts, time_t *
* @return err_info, NULL on success.
*/
static sr_error_info_t *
sr_writev_notif(int fd, const char *notif_lyb, uint32_t notif_lyb_len, time_t notif_ts)
sr_writev_notif(int fd, const char *notif_lyb, uint32_t notif_lyb_len, struct timespec notif_ts)
{
sr_error_info_t *err_info = NULL;
struct iovec iov[3];
Expand Down Expand Up @@ -359,7 +359,7 @@ sr_replay_rename_file(const char *mod_name, time_t old_from_ts, time_t old_to_ts
* @return err_info, NULL on success.
*/
static sr_error_info_t *
sr_notif_write(const struct lys_module *ly_mod, sr_mod_t *shm_mod, char *notif_lyb, time_t notif_ts, sr_cid_t cid)
sr_notif_write(const struct lys_module *ly_mod, sr_mod_t *shm_mod, char *notif_lyb, struct timespec notif_ts, sr_cid_t cid)
{
sr_error_info_t *err_info = NULL;
time_t from_ts, to_ts;
Expand Down Expand Up @@ -398,7 +398,7 @@ sr_notif_write(const struct lys_module *ly_mod, sr_mod_t *shm_mod, char *notif_l
}

/* update notification file name */
if ((err_info = sr_replay_rename_file(ly_mod->name, from_ts, to_ts, notif_ts))) {
if ((err_info = sr_replay_rename_file(ly_mod->name, from_ts, to_ts, notif_ts.tv_sec))) {
goto cleanup_unlock;
}

Expand All @@ -412,7 +412,8 @@ sr_notif_write(const struct lys_module *ly_mod, sr_mod_t *shm_mod, char *notif_l
}

/* creating a new file */
if ((err_info = sr_replay_open_file(ly_mod->name, notif_ts, notif_ts, O_WRONLY | O_APPEND | O_CREAT | O_EXCL, &fd))) {
if ((err_info = sr_replay_open_file(ly_mod->name, notif_ts.tv_sec, notif_ts.tv_sec,
O_WRONLY | O_APPEND | O_CREAT | O_EXCL, &fd))) {
goto cleanup_unlock;
}

Expand Down Expand Up @@ -444,7 +445,8 @@ sr_notif_write(const struct lys_module *ly_mod, sr_mod_t *shm_mod, char *notif_l
* @return err_info, NULL on success.
*/
static sr_error_info_t *
sr_notif_buf_store(struct sr_sess_notif_buf *notif_buf, const struct lys_module *ly_mod, char *notif_lyb, time_t notif_ts)
sr_notif_buf_store(struct sr_sess_notif_buf *notif_buf, const struct lys_module *ly_mod, char *notif_lyb,
struct timespec notif_ts)
{
sr_error_info_t *err_info = NULL;
struct sr_sess_notif_buf_node *node = NULL;
Expand Down Expand Up @@ -499,7 +501,7 @@ sr_notif_buf_store(struct sr_sess_notif_buf *notif_buf, const struct lys_module
}

sr_error_info_t *
sr_replay_store(sr_session_ctx_t *sess, const struct lyd_node *notif, time_t notif_ts)
sr_replay_store(sr_session_ctx_t *sess, const struct lyd_node *notif, struct timespec notif_ts)
{
sr_error_info_t *err_info = NULL;
sr_mod_t *shm_mod;
Expand Down Expand Up @@ -664,9 +666,9 @@ sr_notif_buf_thread(void *arg)
* @return err_info, NULL on success.
*/
static sr_error_info_t *
sr_replay_read_ts(int notif_fd, time_t *notif_ts)
sr_replay_read_ts(int notif_fd, struct timespec *notif_ts)
{
*notif_ts = 0;
memset(notif_ts, 0, sizeof *notif_ts);
return sr_read(notif_fd, notif_ts, sizeof *notif_ts);
}

Expand Down Expand Up @@ -744,7 +746,8 @@ sr_replay_notify(sr_conn_ctx_t *conn, const char *mod_name, uint32_t sub_id, con
{
sr_error_info_t *err_info = NULL;
sr_mod_t *shm_mod;
time_t file_from_ts, file_to_ts, notif_ts;
time_t file_from_ts, file_to_ts;
struct timespec notif_ts;
struct ly_set *set = NULL;
struct lyd_node *notif = NULL, *notif_op;
sr_session_ctx_t *ev_sess = NULL;
Expand Down Expand Up @@ -782,17 +785,17 @@ sr_replay_notify(sr_conn_ctx_t *conn, const char *mod_name, uint32_t sub_id, con
if ((err_info = sr_replay_read_ts(fd, &notif_ts))) {
goto cleanup;
}
if (!notif_ts) {
if (!notif_ts.tv_sec) {
sr_errinfo_new(&err_info, SR_ERR_INTERNAL, "Unexpected notification file EOF.");
goto cleanup;
}
if ((notif_ts < start_time) && (err_info = sr_replay_skip_notif(fd))) {
if ((notif_ts.tv_sec < start_time) && (err_info = sr_replay_skip_notif(fd))) {
goto cleanup;
}
} while (notif_ts < start_time);
} while (notif_ts.tv_sec < start_time);

/* replay notifications until stop_time is reached */
while (notif_ts && (!stop_time || (notif_ts <= stop_time))) {
while (notif_ts.tv_sec && (!stop_time || (notif_ts.tv_sec <= stop_time))) {

/* parse notification */
lyd_free_all(notif);
Expand Down Expand Up @@ -828,7 +831,7 @@ sr_replay_notify(sr_conn_ctx_t *conn, const char *mod_name, uint32_t sub_id, con
}

/* no more notifications should be replayed */
if (stop_time && (notif_ts > stop_time)) {
if (stop_time && (notif_ts.tv_sec > stop_time)) {
break;
}

Expand All @@ -839,9 +842,9 @@ sr_replay_notify(sr_conn_ctx_t *conn, const char *mod_name, uint32_t sub_id, con
}

/* replay last notification if the subscription continues */
notif_ts = time(NULL);
if ((!stop_time || (stop_time >= notif_ts)) && (err_info = sr_notif_call_callback(ev_sess, cb, tree_cb, private_data,
SR_EV_NOTIF_REPLAY_COMPLETE, sub_id, NULL, stop_time ? stop_time : notif_ts))) {
sr_time_get(&notif_ts, 0);
if ((!stop_time || (stop_time >= notif_ts.tv_sec)) && (err_info = sr_notif_call_callback(ev_sess, cb, tree_cb,
private_data, SR_EV_NOTIF_REPLAY_COMPLETE, sub_id, NULL, notif_ts))) {
goto cleanup;
}

Expand Down
2 changes: 1 addition & 1 deletion src/replay.h
Expand Up @@ -53,7 +53,7 @@ sr_error_info_t *sr_replay_find_file(const char *mod_name, time_t from_ts, time_
* @param[in] notif_ts Notification timestamp to store.
* @return err_info, NULL on success.
*/
sr_error_info_t *sr_replay_store(sr_session_ctx_t *sess, const struct lyd_node *notif, time_t notif_ts);
sr_error_info_t *sr_replay_store(sr_session_ctx_t *sess, const struct lyd_node *notif, struct timespec notif_ts);

/**
* @brief Notification buffer thread.
Expand Down
2 changes: 1 addition & 1 deletion src/shm.h
Expand Up @@ -1075,7 +1075,7 @@ sr_error_info_t *sr_shmsub_rpc_notify_abort(sr_conn_ctx_t *conn, sr_rpc_t *shm_r
* @param[in] wait Whether to wait for the callbacks or not.
* @return err_info, NULL on success.
*/
sr_error_info_t *sr_shmsub_notif_notify(sr_conn_ctx_t *conn, const struct lyd_node *notif, time_t notif_ts,
sr_error_info_t *sr_shmsub_notif_notify(sr_conn_ctx_t *conn, const struct lyd_node *notif, struct timespec notif_ts,
const char *orig_name, const void *orig_data, uint32_t timeout_ms, int wait);

/**
Expand Down
34 changes: 17 additions & 17 deletions src/shm_sub.c
Expand Up @@ -514,7 +514,7 @@ sr_shmsub_notify_write_event(sr_sub_shm_t *sub_shm, uint32_t request_id, sr_sub_
static sr_error_info_t *
sr_shmsub_multi_notify_write_event(sr_multi_sub_shm_t *multi_sub_shm, uint32_t request_id, uint32_t priority,
sr_sub_event_t event, const char *orig_name, const void *orig_data, uint32_t subscriber_count,
sr_shm_t *shm_data_sub, time_t notif_ts, const char *data, uint32_t data_len, const char *event_desc)
sr_shm_t *shm_data_sub, struct timespec *notif_ts, const char *data, uint32_t data_len, const char *event_desc)
{
sr_error_info_t *err_info = NULL;
char *shm_data_ptr;
Expand All @@ -539,7 +539,7 @@ sr_shmsub_multi_notify_write_event(sr_multi_sub_shm_t *multi_sub_shm, uint32_t r
/* remap if needed */
if (notif_ts || data_len) {
if ((err_info = sr_shmsub_data_open_remap(NULL, NULL, -1, shm_data_sub, orig_size +
(notif_ts ? sizeof notif_ts : 0) + data_len))) {
(notif_ts ? sizeof *notif_ts : 0) + data_len))) {
return err_info;
}

Expand All @@ -555,8 +555,8 @@ sr_shmsub_multi_notify_write_event(sr_multi_sub_shm_t *multi_sub_shm, uint32_t r
}
if (notif_ts) {
/* write notification timestamp */
memcpy(shm_data_ptr, &notif_ts, sizeof notif_ts);
shm_data_ptr += sizeof notif_ts;
memcpy(shm_data_ptr, notif_ts, sizeof *notif_ts);
shm_data_ptr += sizeof *notif_ts;
}
if (data && data_len) {
/* write any event data */
Expand Down Expand Up @@ -934,7 +934,7 @@ sr_shmsub_change_notify_update(struct sr_mod_info_s *mod_info, const char *orig_
mod->request_id = ++multi_sub_shm->request_id;
}
if ((err_info = sr_shmsub_multi_notify_write_event(multi_sub_shm, mod->request_id, cur_priority,
SR_SUB_EV_UPDATE, orig_name, orig_data, subscriber_count, &shm_data_sub, 0, diff_lyb, diff_lyb_len,
SR_SUB_EV_UPDATE, orig_name, orig_data, subscriber_count, &shm_data_sub, NULL, diff_lyb, diff_lyb_len,
mod->ly_mod->name))) {
goto cleanup_wrunlock;
}
Expand Down Expand Up @@ -1047,7 +1047,7 @@ sr_shmsub_change_notify_clear(struct sr_mod_info_s *mod_info)

/* clear it */
if ((err_info = sr_shmsub_multi_notify_write_event(multi_sub_shm, mod->request_id, multi_sub_shm->priority,
0, NULL, NULL, 0, NULL, 0, NULL, 0, NULL))) {
0, NULL, NULL, 0, NULL, NULL, NULL, 0, NULL))) {
goto cleanup_wrunlock;
}

Expand Down Expand Up @@ -1142,7 +1142,7 @@ sr_shmsub_change_notify_change(struct sr_mod_info_s *mod_info, const char *orig_
mod->request_id = ++multi_sub_shm->request_id;
}
if ((err_info = sr_shmsub_multi_notify_write_event(multi_sub_shm, mod->request_id, cur_priority,
SR_SUB_EV_CHANGE, orig_name, orig_data, subscriber_count, &shm_data_sub, 0, diff_lyb, diff_lyb_len,
SR_SUB_EV_CHANGE, orig_name, orig_data, subscriber_count, &shm_data_sub, NULL, diff_lyb, diff_lyb_len,
mod->ly_mod->name))) {
goto cleanup_wrunlock;
}
Expand Down Expand Up @@ -1258,7 +1258,7 @@ sr_shmsub_change_notify_change_done(struct sr_mod_info_s *mod_info, const char *
mod->request_id = ++multi_sub_shm->request_id;
}
if ((err_info = sr_shmsub_multi_notify_write_event(multi_sub_shm, mod->request_id, cur_priority,
SR_SUB_EV_DONE, orig_name, orig_data, subscriber_count, &shm_data_sub, 0, diff_lyb, diff_lyb_len,
SR_SUB_EV_DONE, orig_name, orig_data, subscriber_count, &shm_data_sub, NULL, diff_lyb, diff_lyb_len,
mod->ly_mod->name))) {
goto cleanup_wrunlock;
}
Expand Down Expand Up @@ -1360,7 +1360,7 @@ sr_shmsub_change_notify_change_abort(struct sr_mod_info_s *mod_info, const char
/* this must be the right subscription SHM */
assert(multi_sub_shm->request_id == mod->request_id);
if ((err_info = sr_shmsub_multi_notify_write_event(multi_sub_shm, mod->request_id, cur_priority, 0,
NULL, NULL, 0, &shm_data_sub, 0, NULL, 0, NULL))) {
NULL, NULL, 0, &shm_data_sub, NULL, NULL, 0, NULL))) {
goto cleanup_wrunlock;
}

Expand Down Expand Up @@ -1410,7 +1410,7 @@ sr_shmsub_change_notify_change_abort(struct sr_mod_info_s *mod_info, const char
do {
/* write "abort" event with the same LYB data trees */
if ((err_info = sr_shmsub_multi_notify_write_event(multi_sub_shm, mod->request_id, cur_priority,
SR_SUB_EV_ABORT, orig_name, orig_data, subscriber_count, &shm_data_sub, 0, diff_lyb, diff_lyb_len,
SR_SUB_EV_ABORT, orig_name, orig_data, subscriber_count, &shm_data_sub, NULL, diff_lyb, diff_lyb_len,
mod->ly_mod->name))) {
goto cleanup_wrunlock;
}
Expand Down Expand Up @@ -1798,7 +1798,7 @@ sr_shmsub_rpc_notify(sr_conn_ctx_t *conn, sr_rpc_t *shm_rpc, const char *op_path
*request_id = ++multi_sub_shm->request_id;
}
if ((err_info = sr_shmsub_multi_notify_write_event(multi_sub_shm, *request_id, cur_priority, SR_SUB_EV_RPC,
orig_name, orig_data, subscriber_count, &shm_data_sub, 0, input_lyb, input_lyb_len, op_path))) {
orig_name, orig_data, subscriber_count, &shm_data_sub, NULL, input_lyb, input_lyb_len, op_path))) {
goto cleanup_wrunlock;
}

Expand Down Expand Up @@ -1901,7 +1901,7 @@ sr_shmsub_rpc_notify_abort(sr_conn_ctx_t *conn, sr_rpc_t *shm_rpc, const char *o
/* clear the SHM */
assert(multi_sub_shm->event == SR_SUB_EV_ERROR);
if ((err_info = sr_shmsub_multi_notify_write_event(multi_sub_shm, request_id, cur_priority, 0, NULL, NULL, 0,
&shm_data_sub, 0, NULL, 0, NULL))) {
&shm_data_sub, NULL, NULL, 0, NULL))) {
goto cleanup_wrunlock;
}

Expand Down Expand Up @@ -1945,7 +1945,7 @@ sr_shmsub_rpc_notify_abort(sr_conn_ctx_t *conn, sr_rpc_t *shm_rpc, const char *o

/* write "abort" event with the same input */
if ((err_info = sr_shmsub_multi_notify_write_event(multi_sub_shm, request_id, cur_priority, SR_SUB_EV_ABORT,
orig_name, orig_data, subscriber_count, &shm_data_sub, 0, input_lyb, input_lyb_len, op_path))) {
orig_name, orig_data, subscriber_count, &shm_data_sub, NULL, input_lyb, input_lyb_len, op_path))) {
goto cleanup_wrunlock;
}

Expand Down Expand Up @@ -1988,7 +1988,7 @@ sr_shmsub_rpc_notify_abort(sr_conn_ctx_t *conn, sr_rpc_t *shm_rpc, const char *o
}

sr_error_info_t *
sr_shmsub_notif_notify(sr_conn_ctx_t *conn, const struct lyd_node *notif, time_t notif_ts, const char *orig_name,
sr_shmsub_notif_notify(sr_conn_ctx_t *conn, const struct lyd_node *notif, struct timespec notif_ts, const char *orig_name,
const void *orig_data, uint32_t timeout_ms, int wait)
{
sr_error_info_t *err_info = NULL, *cb_err_info = NULL;
Expand Down Expand Up @@ -2043,7 +2043,7 @@ sr_shmsub_notif_notify(sr_conn_ctx_t *conn, const struct lyd_node *notif, time_t
/* write the notification */
request_id = multi_sub_shm->request_id + 1;
if ((err_info = sr_shmsub_multi_notify_write_event(multi_sub_shm, request_id, 0, SR_SUB_EV_NOTIF, orig_name,
orig_data, notif_sub_count, &shm_data_sub, notif_ts, notif_lyb, notif_lyb_len, ly_mod->name))) {
orig_data, notif_sub_count, &shm_data_sub, &notif_ts, notif_lyb, notif_lyb_len, ly_mod->name))) {
goto cleanup_ext_sub_unlock;
}

Expand Down Expand Up @@ -3294,7 +3294,7 @@ sr_shmsub_notif_listen_process_module_events(struct modsub_notif_s *notif_subs,
uint32_t i, request_id;
struct lyd_node *notif = NULL, *notif_op;
struct ly_in *in = NULL;
time_t notif_ts;
struct timespec notif_ts;
char *shm_data_ptr;
sr_multi_sub_shm_t *multi_sub_shm;
sr_shm_t shm_data_sub = SR_SHM_INITIALIZER;
Expand Down Expand Up @@ -3326,7 +3326,7 @@ sr_shmsub_notif_listen_process_module_events(struct modsub_notif_s *notif_subs,
}

/* parse timestamp */
notif_ts = *(time_t *)shm_data_ptr;
memcpy(&notif_ts, shm_data_ptr, sizeof notif_ts);
shm_data_ptr += sizeof notif_ts;

/* parse notification */
Expand Down

0 comments on commit 7ba762f

Please sign in to comment.