Skip to content

Commit

Permalink
Merge pull request #218 from lukasmacko/op_data
Browse files Browse the repository at this point in the history
Operational data support in sr_get_item
  • Loading branch information
rastislavs committed Jul 7, 2016
2 parents 088834b + da5c55a commit ffd70d6
Show file tree
Hide file tree
Showing 14 changed files with 571 additions and 49 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ include(CheckStructHasMember)
CHECK_FUNCTION_EXISTS(pthread_rwlockattr_setkind_np HAVE_PTHREAD_RWLOCKATTR_SETKIND_NP)
CHECK_FUNCTION_EXISTS(getpeereid HAVE_GETPEEREID)
CHECK_FUNCTION_EXISTS(getpeerucred HAVE_GETPEERUCRED)
CHECK_FUNCTION_EXISTS(pthread_mutex_timed_lock HAVE_TIMED_LOCK)
CHECK_FUNCTION_EXISTS(pthread_mutex_timedlock HAVE_TIMED_LOCK)
CHECK_INCLUDE_FILES(ucred.h HAVE_UCRED_H)
CHECK_FUNCTION_EXISTS(setfsuid HAVE_SETFSUID)
CHECK_STRUCT_HAS_MEMBER("struct stat" st_mtim "sys/stat.h" HAVE_STAT_ST_MTIM)
Expand Down
9 changes: 9 additions & 0 deletions inc/sysrepo.h
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,15 @@ int sr_session_refresh(sr_session_ctx_t *session);
*/
int sr_session_switch_ds(sr_session_ctx_t *session, sr_datastore_t ds);

/**
* @brief Alter the session options.
*
* @param [in] session
* @param [in] opts
* @return Error code (SR_ERR_OK on success)
*/
int sr_session_set_options(sr_session_ctx_t *session, const sr_sess_options_t opts);

/**
* @brief Retrieves detailed information about the error that has occurred
* during the last operation executed within provided session.
Expand Down
35 changes: 35 additions & 0 deletions src/client_library.c
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,41 @@ sr_session_switch_ds(sr_session_ctx_t* session, sr_datastore_t datastore)
return cl_session_return(session, rc);
}

int
sr_session_set_options(sr_session_ctx_t *session, const sr_sess_options_t opts)
{
Sr__Msg *msg_req = NULL, *msg_resp = NULL;
int rc = SR_ERR_OK;

CHECK_NULL_ARG(session);
cl_session_clear_errors(session);

/* prepare session_set_opts message */
rc = sr_gpb_req_alloc(SR__OPERATION__SESSION_SET_OPTS, session->id, &msg_req);
CHECK_RC_MSG_GOTO(rc, cleanup, "Cannot allocate GPB message.");

msg_req->request->session_set_opts_req->options = opts;

/* send the request and receive the response */
rc = cl_request_process(session, msg_req, &msg_resp, SR__OPERATION__SESSION_SET_OPTS);
CHECK_RC_MSG_GOTO(rc, cleanup, "Error by processing of the request.");

sr__msg__free_unpacked(msg_req, NULL);
sr__msg__free_unpacked(msg_resp, NULL);

return cl_session_return(session, SR_ERR_OK);

cleanup:
if (NULL != msg_req) {
sr__msg__free_unpacked(msg_req, NULL);
}
if (NULL != msg_resp) {
sr__msg__free_unpacked(msg_resp, NULL);
}
return cl_session_return(session, rc);

}

int
sr_list_schemas(sr_session_ctx_t *session, sr_schema_t **schemas, size_t *schema_cnt)
{
Expand Down
20 changes: 20 additions & 0 deletions src/common/sr_protobuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ sr_gpb_operation_name(Sr__Operation operation)
return "session-refresh";
case SR__OPERATION__SESSION_SWITCH_DS:
return "session-switch-ds";
case SR__OPERATION__SESSION_SET_OPTS:
return "session-set-opts";
case SR__OPERATION__LIST_SCHEMAS:
return "list-schemas";
case SR__OPERATION__GET_SCHEMA:
Expand Down Expand Up @@ -136,6 +138,12 @@ sr_gpb_req_alloc(const Sr__Operation operation, const uint32_t session_id, Sr__M
sr__session_switch_ds_req__init((Sr__SessionSwitchDsReq*)sub_msg);
req->session_switch_ds_req = (Sr__SessionSwitchDsReq*)sub_msg;
break;
case SR__OPERATION__SESSION_SET_OPTS:
sub_msg = calloc(1, sizeof(Sr__SessionSetOptsReq));
CHECK_NULL_NOMEM_GOTO(sub_msg, rc, error);
sr__session_set_opts_req__init((Sr__SessionSetOptsReq*)sub_msg);
req->session_set_opts_req = (Sr__SessionSetOptsReq*)sub_msg;
break;
case SR__OPERATION__LIST_SCHEMAS:
sub_msg = calloc(1, sizeof(Sr__ListSchemasReq));
CHECK_NULL_NOMEM_GOTO(sub_msg, rc, error);
Expand Down Expand Up @@ -328,6 +336,12 @@ sr_gpb_resp_alloc(const Sr__Operation operation, const uint32_t session_id, Sr__
sr__session_switch_ds_resp__init((Sr__SessionSwitchDsResp*)sub_msg);
resp->session_switch_ds_resp = (Sr__SessionSwitchDsResp*)sub_msg;
break;
case SR__OPERATION__SESSION_SET_OPTS:
sub_msg = calloc(1, sizeof(Sr__SessionSetOptsResp));
CHECK_NULL_NOMEM_GOTO(sub_msg, rc, error);
sr__session_set_opts_resp__init((Sr__SessionSetOptsResp*)sub_msg);
resp->session_set_opts_resp = (Sr__SessionSetOptsResp*)sub_msg;
break;
case SR__OPERATION__LIST_SCHEMAS:
sub_msg = calloc(1, sizeof(Sr__ListSchemasResp));
CHECK_NULL_NOMEM_GOTO(sub_msg, rc, error);
Expand Down Expand Up @@ -654,6 +668,9 @@ sr_gpb_msg_validate(const Sr__Msg *msg, const Sr__Msg__MsgType type, const Sr__O
case SR__OPERATION__SESSION_SWITCH_DS:
CHECK_NULL_RETURN(msg->request->session_switch_ds_req, SR_ERR_MALFORMED_MSG);
break;
case SR__OPERATION__SESSION_SET_OPTS:
CHECK_NULL_RETURN(msg->request->session_set_opts_req, SR_ERR_MALFORMED_MSG);
break;
case SR__OPERATION__LIST_SCHEMAS:
CHECK_NULL_RETURN(msg->request->list_schemas_req, SR_ERR_MALFORMED_MSG);
break;
Expand Down Expand Up @@ -739,6 +756,9 @@ sr_gpb_msg_validate(const Sr__Msg *msg, const Sr__Msg__MsgType type, const Sr__O
case SR__OPERATION__SESSION_SWITCH_DS:
CHECK_NULL_RETURN(msg->response->session_switch_ds_resp, SR_ERR_MALFORMED_MSG);
break;
case SR__OPERATION__SESSION_SET_OPTS:
CHECK_NULL_RETURN(msg->response->session_set_opts_resp, SR_ERR_MALFORMED_MSG);
break;
case SR__OPERATION__LIST_SCHEMAS:
CHECK_NULL_RETURN(msg->response->list_schemas_resp, SR_ERR_MALFORMED_MSG);
break;
Expand Down
113 changes: 98 additions & 15 deletions src/request_processor.c
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ rp_feature_enable_req_process(const rp_ctx_t *rp_ctx, const rp_session_t *sessio
* @brief Processes a get_item request.
*/
static int
rp_get_item_req_process(rp_ctx_t *rp_ctx, rp_session_t *session, Sr__Msg *msg)
rp_get_item_req_process(rp_ctx_t *rp_ctx, rp_session_t *session, Sr__Msg *msg, bool *skip_msg_cleanup)
{
int rc = SR_ERR_OK;

Expand All @@ -315,10 +315,7 @@ rp_get_item_req_process(rp_ctx_t *rp_ctx, rp_session_t *session, Sr__Msg *msg)

Sr__Msg *resp = NULL;
rc = sr_gpb_resp_alloc(SR__OPERATION__GET_ITEM, session->id, &resp);
if (SR_ERR_OK != rc) {
SR_LOG_ERR_MSG("Memory allocation failed");
return SR_ERR_NOMEM;
}
CHECK_RC_MSG_RETURN(rc, "Gpb response allocation failed");

sr_val_t *value = NULL;
char *xpath = msg->request->get_item_req->xpath;
Expand All @@ -328,16 +325,45 @@ rp_get_item_req_process(rp_ctx_t *rp_ctx, rp_session_t *session, Sr__Msg *msg)
CHECK_RC_MSG_GOTO(rc, cleanup, "Check notif session failed");
}

MUTEX_LOCK_TIMED_CHECK_GOTO(&session->cur_req_mutex, rc, cleanup);
if (RP_REQ_FINISHED == session->state) {
session->state = RP_REQ_NEW;
} else if (RP_REQ_WAITING_FOR_DATA == session->state) {
if (msg == session->req) {
SR_LOG_ERR("Time out waiting for operational data expired before all responses have been received, session id = %u", session->id);
} else {
SR_LOG_ERR("A request was not processed, probably invalid state, session id = %u", session->id);
sr__msg__free_unpacked(session->req, NULL);
session->state = RP_REQ_NEW;
}
}
/* we do not need to keep the pointer to the request */
session->req = NULL;

/* get value from data manager */
rc = rp_dt_get_value_wrapper(rp_ctx, session, xpath, &value);
if (SR_ERR_OK != rc && SR_ERR_NOT_FOUND != rc) {
SR_LOG_ERR("Get item failed for '%s', session id=%"PRIu32".", xpath, session->id);
}

if (RP_REQ_WAITING_FOR_DATA == session->state) {
SR_LOG_DBG_MSG("Request paused, waiting for data");
/* we are waiting for operational data do not free the request */
*skip_msg_cleanup = true;
/* save message */
session->req = msg;
//TODO: setup timeout
sr__msg__free_unpacked(resp, NULL);
pthread_mutex_unlock(&session->cur_req_mutex);
return rc;
}

pthread_mutex_unlock(&session->cur_req_mutex);

/* copy value to gpb */
if (SR_ERR_OK == rc) {
rc = sr_dup_val_t_to_gpb(value, &resp->response->get_item_resp->value);
if (SR_ERR_OK != rc){
if (SR_ERR_OK != rc) {
SR_LOG_ERR("Copying sr_val_t to gpb failed for xpath '%s'", xpath);
}
}
Expand Down Expand Up @@ -796,6 +822,42 @@ rp_switch_datastore_req_process(rp_ctx_t *rp_ctx, rp_session_t *session, Sr__Msg
return rc;
}

static int
rp_session_set_opts(rp_ctx_t *rp_ctx, rp_session_t *session, Sr__Msg *msg)
{
Sr__Msg *resp = NULL;
int rc = SR_ERR_OK;

CHECK_NULL_ARG5(rp_ctx, session, msg, msg->request, msg->request->session_set_opts_req);

SR_LOG_DBG_MSG("Procession session set opts request.");

/* allocate the response */
rc = sr_gpb_resp_alloc(SR__OPERATION__SESSION_SET_OPTS, session->id, &resp);
if (SR_ERR_OK != rc) {
SR_LOG_ERR_MSG("Allocation of session_set_opts response failed.");
return SR_ERR_NOMEM;
}

/* white list options that can be set */
uint32_t mutable_opts = SR_SESS_CONFIG_ONLY;

session->options = msg->request->session_set_opts_req->options & mutable_opts;

/* set response code */
resp->response->result = rc;

rc = rp_resp_fill_errors(resp, session->dm_session);
if (SR_ERR_OK != rc) {
SR_LOG_ERR_MSG("Copying errors to gpb failed");
}

/* send the response */
rc = cm_msg_send(rp_ctx->cm_ctx, resp);

return rc;
}

/**
* @brief Processes a lock request.
*/
Expand Down Expand Up @@ -1193,30 +1255,43 @@ rp_rpc_req_process(const rp_ctx_t *rp_ctx, const rp_session_t *session, Sr__Msg

return rc;
}

/**
* @brief Processes an operational data provider response.
*/
static int
rp_data_provide_resp_process(const rp_ctx_t *rp_ctx, const rp_session_t *session, Sr__Msg *msg)
rp_data_provide_resp_process(rp_ctx_t *rp_ctx, rp_session_t *session, Sr__Msg *msg)
{
sr_val_t *values = NULL;
size_t values_cnt = 0;
int rc = SR_ERR_OK;

CHECK_NULL_ARG5(rp_ctx, session, msg, msg->response, msg->response->data_provide_resp);

/* copy values fom GPB to sysrepo */
rc = sr_values_gpb_to_sr(msg->response->data_provide_resp->values, msg->response->data_provide_resp->n_values,
/* copy values from GPB to sysrepo */
rc = sr_values_gpb_to_sr(msg->response->data_provide_resp->values, msg->response->data_provide_resp->n_values,
&values, &values_cnt);
CHECK_RC_MSG_GOTO(rc, cleanup, "Failed to transform gpb to sr_val_t");

if (SR_ERR_OK == rc) {
// TODO: process the operational data
for (size_t i = 0; i < values_cnt; i++) {
printf("%s = %s\n", values[i].xpath, values[i].data.string_val);
MUTEX_LOCK_TIMED_CHECK_GOTO(&session->cur_req_mutex, rc, cleanup);
for (size_t i = 0; i < values_cnt; i++) {
SR_LOG_DBG("Received value for data provider for xpath %s \n", values[i].xpath);
rc = rp_dt_set_item(rp_ctx->dm_ctx, session->dm_session, values[i].xpath, SR_EDIT_DEFAULT, &values[i]);
if (SR_ERR_OK != rc) {
//TODO: maybe validate if this path corresponds to the operational data
SR_LOG_WRN("Failed to set operational data for xpath %s", values[i].xpath);
}
}
//TODO: generate request asking for nested data
session->dp_req_waiting -= 1;
if (0 == session->dp_req_waiting) {
SR_LOG_DBG("All data from data providers has been received session id = %u, reenque the request", session->id);
//TODO validate data
session->state = RP_REQ_DATA_LOADED;
rp_msg_process(rp_ctx, session, session->req);
}
pthread_mutex_unlock(&session->cur_req_mutex);

cleanup:
sr_free_values(values, values_cnt);

return rc;
Expand Down Expand Up @@ -1367,6 +1442,9 @@ rp_req_dispatch(rp_ctx_t *rp_ctx, rp_session_t *session, Sr__Msg *msg, bool *ski
case SR__OPERATION__SESSION_SWITCH_DS:
rc = rp_switch_datastore_req_process(rp_ctx, session, msg);
break;
case SR__OPERATION__SESSION_SET_OPTS:
rc = rp_session_set_opts(rp_ctx, session, msg);
break;
case SR__OPERATION__LIST_SCHEMAS:
rc = rp_list_schemas_req_process(rp_ctx, session, msg);
break;
Expand All @@ -1380,7 +1458,7 @@ rp_req_dispatch(rp_ctx_t *rp_ctx, rp_session_t *session, Sr__Msg *msg, bool *ski
rc = rp_feature_enable_req_process(rp_ctx, session, msg);
break;
case SR__OPERATION__GET_ITEM:
rc = rp_get_item_req_process(rp_ctx, session, msg);
rc = rp_get_item_req_process(rp_ctx, session, msg, skip_msg_cleanup);
break;
case SR__OPERATION__GET_ITEMS:
rc = rp_get_items_req_process(rp_ctx, session, msg);
Expand Down Expand Up @@ -1594,7 +1672,11 @@ rp_session_cleanup(const rp_ctx_t *rp_ctx, rp_session_t *session)
ly_set_free(session->get_items_ctx.nodes);
free(session->get_items_ctx.xpath);
pthread_mutex_destroy(&session->msg_count_mutex);
pthread_mutex_destroy(&session->cur_req_mutex);
free(session->change_ctx.xpath);
if (NULL != session->req) {
sr__msg__free_unpacked(session->req, NULL);
}
free(session);

return SR_ERR_OK;
Expand Down Expand Up @@ -1854,6 +1936,7 @@ rp_session_start(const rp_ctx_t *rp_ctx, const uint32_t session_id, const ac_ucr
session->datastore = datastore;
session->options = session_options;
session->commit_id = commit_id;
pthread_mutex_init(&session->cur_req_mutex, NULL);

rc = ac_session_init(rp_ctx->ac_ctx, user_credentials, &session->ac_session);
if (SR_ERR_OK != rc) {
Expand Down
Loading

0 comments on commit ffd70d6

Please sign in to comment.