Skip to content

Commit

Permalink
sensing: implement sensor post data processing
Browse files Browse the repository at this point in the history
1) enhance sensor data process, check time, sensitivity condition
2) send data to its client
3) create dispatch thread to process sensor data from runtime module

Signed-off-by: Guangfu Hu <guangfu.hu@intel.com>
  • Loading branch information
ghu0510 committed Jun 28, 2023
1 parent 012f221 commit 7649e24
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 16 deletions.
38 changes: 38 additions & 0 deletions subsys/sensing/Kconfig
Expand Up @@ -5,6 +5,7 @@ config SENSING
bool "Sensing Subsystem"
default y
depends on DT_HAS_ZEPHYR_SENSING_ENABLED
select RING_BUFFER
help
Enable Sensing Subsystem.

Expand All @@ -24,6 +25,25 @@ config SENSING_MAX_SENSITIVITY_COUNT
So, maximum sensitivity count is needed for sensors
Typical values are 6

config SENSING_RING_BUF_SIZE
int "ring buf size to store sensor data"
depends on SENSING
default 16384
help
This is the ring buffer to store sensor data that
will be posted to application
Typical values are 16 * 1024

config SENSING_MAX_SENSOR_DATA_SIZE
int "maximum sensor data size"
depends on SENSING
default 64
help
This is the maximum sensor data size sensing subsystem could support
when sensor is posting data to client, sensing subsystem would
get no more than SENSING_MAX_SENSOR_DATA_SIZE sensor count from ring buffer
Typical values are 64

config SENSING_RUNTIME_THREAD_STACK_SIZE
int "stack size for sensing subsystem runtime thread"
depends on SENSING
Expand All @@ -32,6 +52,14 @@ config SENSING_RUNTIME_THREAD_STACK_SIZE
This is the stack size for sensing subsystem runtime thread
Typical values are 4096

config SENSING_DISPATCH_THREAD_STACK_SIZE
int "stack size for sensor dispatch thread"
depends on SENSING
default 1024
help
This is the stack size for sensor dispatch thread
Typical values are 1024

config SENSING_RUNTIME_THREAD_PRIORITY
int "priority for sensing subsystem runtime thread"
depends on SENSING
Expand All @@ -47,6 +75,16 @@ config SENSING_RUNTIME_THREAD_PRIORITY
data, then ring buf will always be put into data until overflow.
Typical values are 9

config SENSING_DISPATCH_THREAD_PRIORITY
int "priority for sensor dispatch thread"
depends on SENSING
default 8
help
This is the thread priority for sensing subsystem dispatch thread
Ring buffer data should be fetched ASAP, so Dispatch
thread priority should be higher than runtime thread
Typical values are 8

source "subsys/sensing/sensor/phy_3d_sensor/Kconfig"
source "subsys/sensing/sensor/hinge_angle/Kconfig"

Expand Down
117 changes: 108 additions & 9 deletions subsys/sensing/runtime.c
Expand Up @@ -12,6 +12,86 @@
LOG_MODULE_DECLARE(sensing, CONFIG_SENSING_LOG_LEVEL);


static int fetch_data_and_dispatch(struct sensing_context *ctx)
{
struct sensing_connection *conn = NULL;
uint8_t buf[CONFIG_SENSING_MAX_SENSOR_DATA_SIZE];
uint32_t wanted_size = sizeof(sensing_sensor_handle_t);
uint32_t ret_size, rd_size = 0;
uint16_t sample_size = 0;
int ret = 0;

while ((ret_size = ring_buf_get(&ctx->sensor_ring_buf, buf + rd_size, wanted_size)) > 0) {
rd_size += ret_size;

if (rd_size == sizeof(sensing_sensor_handle_t)) {
/* read sensing_sensor_handle_t handle first */
conn = (struct sensing_connection *)(*(int32_t *)buf);
if (!conn || !conn->source) {
LOG_ERR("fetch data and dispatch, connection or reporter is NULL");
ret = -EINVAL;
break;
}
sample_size = conn->source->sample_size;

__ASSERT(sample_size + sizeof(sensing_sensor_handle_t)
<= CONFIG_SENSING_MAX_SENSOR_DATA_SIZE,
"invalid sample size:%d", sample_size);
/* get sample_size from connection, then read sensor data from ring buf */
wanted_size = sample_size;
} else if (rd_size == sizeof(sensing_sensor_handle_t) + wanted_size) {
/* read next sample header */
wanted_size = sizeof(sensing_sensor_handle_t);
rd_size = 0;
if (!conn->data_evt_cb) {
LOG_WRN("sensor:%s event callback not registered",
conn->source->dev->name);
continue;
}

conn->data_evt_cb(conn, (const void*)buf);

Check failure on line 52 in subsys/sensing/runtime.c

View workflow job for this annotation

GitHub Actions / Run compliance checks on patch series (PR)

POINTER_LOCATION

subsys/sensing/runtime.c:52 "(foo*)" should be "(foo *)"
} else {
LOG_ERR("fetch data and dispatch, invalid ret_size:%d, rd_size:%d",
ret_size, rd_size);
ret = -EINVAL;
}
}

if (ret_size == 0 && wanted_size != sizeof(sensing_sensor_handle_t)) {
LOG_ERR("fetch data and dispatch, ret_size:%d, wanted_size:%d not expected:%d",
ret_size, wanted_size, sizeof(sensing_sensor_handle_t));
ret = -EINVAL;
__ASSERT(wanted_size, "wanted_size:%d", wanted_size);
}

return ret;
}


static void add_data_to_sensor_ring_buf(struct sensing_context *ctx,
struct sensing_sensor *sensor,
sensing_sensor_handle_t handle)
{
uint8_t data[CONFIG_SENSING_MAX_SENSOR_DATA_SIZE];
uint32_t size;

if (ring_buf_space_get(&ctx->sensor_ring_buf) < sizeof(void *) + sensor->sample_size) {
LOG_WRN("ring buffer will overflow, ignore the coming data");
return;
}
__ASSERT(sizeof(struct sensing_connection *) + sensor->sample_size
<= CONFIG_SENSING_MAX_SENSOR_DATA_SIZE,
"sample_size:%d is too large, should enlarge SENSING_MAX_SENSOR_DATA_SIZE:%d",
sensor->sample_size, CONFIG_SENSING_MAX_SENSOR_DATA_SIZE);

memcpy(data, &handle, sizeof(sensing_sensor_handle_t));
memcpy(data + sizeof(handle), sensor->data_buf, sensor->sample_size);
size = ring_buf_put(&ctx->sensor_ring_buf, data, sizeof(handle) + sensor->sample_size);
__ASSERT(size == sizeof(handle) + sensor->sample_size,
"sample size:%d put to ring buf is not expected: %d",
size, sizeof(handle) + sensor->sample_size);
}

/* check whether sensor need to poll data or not, if polling data is needed, update execute time
* when time arriving
*/
Expand Down Expand Up @@ -43,7 +123,7 @@ static bool sensor_need_poll(struct sensing_sensor *sensor, uint64_t cur_us)
sensor->next_exec_time += sensor->interval;
}

LOG_INF("sensor:%s, need_poll:%u, cur:%llu, next_exec_time:%llu, mode:%d",
LOG_DBG("sensor:%s need poll:%d, cur:%llu, next_exec_time:%llu, mode:%d",
sensor->dev->name, poll, cur_us, sensor->next_exec_time, sensor->mode);

return poll;
Expand Down Expand Up @@ -85,7 +165,7 @@ static int virtual_sensor_process_data(struct sensing_sensor *sensor)
if (!conn->new_data_arrive) {
continue;
}
LOG_INF("virtual sensor proc data, index:%d, sensor:%s, sample_size:%d",
LOG_DBG("virtual sensor proc data, index:%d, sensor:%s, sample_size:%d",
i, sensor->dev->name, sensor->sample_size);

ret |= sensor_api->process(sensor->dev, conn, conn->data, sensor->sample_size);
Expand All @@ -110,7 +190,7 @@ static int process_streaming_data(struct sensing_sensor *sensor, uint64_t cur_us
*/
next_time = (*sample_time == 0 ? cur_us : MIN(cur_us, *sample_time + sensor->interval));

LOG_INF("proc stream data, sensor:%s, cur:%lld, sample:%lld, ri:%d(us), next:%lld",
LOG_DBG("proc stream data, sensor:%s, cur:%lld, sample_time:%lld, ri:%d(us), next:%lld",
sensor->dev->name, cur_us, *sample_time, sensor->interval, next_time);

sensor_api = sensor->dev->api;
Expand Down Expand Up @@ -157,13 +237,13 @@ static bool sensor_test_consume_time(struct sensing_sensor *sensor,
{
uint64_t time = ((struct sensing_sensor_value_header *)sensor->data_buf)->base_timestamp;

LOG_INF("sensor:%s next_consume_time:%lld sample_time:%lld, cur_time:%lld",
LOG_DBG("sensor:%s next_consume_time:%lld sample_time:%lld, cur_time:%lld",
sensor->dev->name, conn->next_consume_time, time, cur_time);

if (conn->next_consume_time <= time)
return true;

LOG_INF("sensor:%s data not ready, next_consume_time:%lld sample_time:%lld, cur_time:%lld",
LOG_DBG("sensor:%s data not ready, next_consume_time:%lld sample_time:%lld, cur_time:%lld",
sensor->dev->name, conn->next_consume_time, time, cur_time);

return false;
Expand Down Expand Up @@ -211,6 +291,7 @@ static int sensor_sensitivity_test(struct sensing_sensor *sensor,
ret |= sensor_api->sensitivity_test(sensor->dev, i, sensor->sensitivity[i],
last_sample, sensor->sample_size, cur_sample, sensor->sample_size);
}
LOG_INF("sensor:%s sensitivity test, ret:%d", sensor->dev->name, ret);

return ret;
}
Expand Down Expand Up @@ -254,7 +335,8 @@ static int send_data_to_clients(struct sensing_context *ctx,

for_each_client_conn(sensor, conn) {
client = conn->sink;
LOG_INF("sensor:%s send data to client", conn->source->dev->name);
LOG_DBG("sensor:%s send data to client:%p", conn->source->dev->name, conn);

if (!is_client_request_data(conn)) {
continue;
}
Expand Down Expand Up @@ -294,13 +376,14 @@ static int send_data_to_clients(struct sensing_context *ctx,
client->next_exec_time = EXEC_TIME_INIT;
}
} else {
// add_data_to_sensor_ring_buf(ctx, sensor, conn);
add_data_to_sensor_ring_buf(ctx, sensor, conn);
ctx->data_to_ring_buf = true;
}
}

/* notify dispatch thread to dispatch data to application */
if (ctx->data_to_ring_buf) {
k_sem_give(&ctx->dispatch_sem);
ctx->data_to_ring_buf = false;
}

Expand Down Expand Up @@ -334,9 +417,10 @@ static int calc_sleep_time(struct sensing_context *ctx, uint64_t cur_us)

next_poll_time = calc_next_poll_time(ctx);
if (next_poll_time == EXEC_TIME_OFF) {
/* no sampling request. sleep forever */
/* no sampling requested, sleep forever */
sleep_time = UINT32_MAX;
} else if (next_poll_time <= cur_us) {
/* next polling time is no more than current time, no sleep at all */
sleep_time = 0;
} else {
sleep_time = (uint32_t)((next_poll_time - cur_us) / USEC_PER_MSEC);
Expand All @@ -355,7 +439,7 @@ int loop_sensors(struct sensing_context *ctx)
int i = 0, ret = 0;

cur_us = get_us();
LOG_INF("loop sensors, cur_us:%lld(us)", cur_us);
LOG_DBG("loop sensors, cur_us:%lld(us)", cur_us);

for_each_sensor(ctx, i, sensor) {
if (!sensor_need_exec(sensor, cur_us)) {
Expand All @@ -373,3 +457,18 @@ int loop_sensors(struct sensing_context *ctx)

return calc_sleep_time(ctx, cur_us);
}


void sensing_dispatch_thread(void *p1, void *p2, void *p3)
{
struct sensing_context *ctx = p1;

LOG_INF("sensing dispatch thread start...");

do {
k_sem_take(&ctx->dispatch_sem, K_FOREVER);

fetch_data_and_dispatch(ctx);
} while (1);
}

34 changes: 27 additions & 7 deletions subsys/sensing/sensor_mgmt.c
Expand Up @@ -25,6 +25,7 @@ DT_FOREACH_CHILD_STATUS_OKAY(DT_DRV_INST(0), SENSING_SENSOR_INFO_DEFINE)
DT_FOREACH_CHILD_STATUS_OKAY(DT_DRV_INST(0), SENSING_SENSOR_DEFINE)

K_THREAD_STACK_DEFINE(runtime_stack, CONFIG_SENSING_RUNTIME_THREAD_STACK_SIZE);
K_THREAD_STACK_DEFINE(dispatch_stack, CONFIG_SENSING_DISPATCH_THREAD_STACK_SIZE);


struct sensing_sensor *sensors[SENSING_SENSOR_NUM];
Expand All @@ -44,7 +45,7 @@ static uint32_t arbitrate_interval(struct sensing_sensor *sensor)

/* search from all clients, arbitrate the interval */
for_each_client_conn(sensor, conn) {
LOG_DBG("arbitrate interval, sensor:%s for each conn:%p, interval:%d(us)",
LOG_INF("arbitrate interval, sensor:%s for each conn:%p, interval:%d(us)",
sensor->dev->name, conn, conn->interval);
if (!is_client_request_data(conn)) {
continue;
Expand Down Expand Up @@ -215,14 +216,16 @@ static void sensing_runtime_thread(void *p1, void *p2, void *p3)
do {
sleep_time = loop_sensors(ctx);

LOG_INF("sensing runtime thread, sleep_time:%d(ms)", sleep_time);

ret = k_sem_take(&ctx->event_sem, calc_timeout(sleep_time));
if (!ret) {
if (atomic_test_and_clear_bit(&ctx->event_flag, EVENT_CONFIG_READY)) {
LOG_INF("runtime thread triggered by event_config ready");
LOG_INF("runtime thread triggered by EVENT_CONFIG_READY");
sensor_later_config(ctx);
}
if (atomic_test_and_clear_bit(&ctx->event_flag, EVENT_DATA_READY)) {
LOG_INF("runtime thread, event_data ready");
LOG_INF("runtime thread triggered by EVENT_DATA_READY");
}
}
} while (1);
Expand Down Expand Up @@ -296,8 +299,8 @@ static int init_sensor(struct sensing_sensor *sensor, int conns_num)

init_connection(conn, reporter, sensor);

LOG_DBG("init sensor, reporter:%s, client:%s, connection:%d",
reporter->dev->name, sensor->dev->name, i);
LOG_INF("init sensor, reporter:%s, client:%s, connection:%d(%p)",
reporter->dev->name, sensor->dev->name, i, conn);

tmp_conns[i] = conn;
}
Expand Down Expand Up @@ -427,8 +430,7 @@ static int sensing_init(void)
}

k_sem_init(&ctx->event_sem, 0, 1);

ctx->sensing_initialized = true;
k_sem_init(&ctx->dispatch_sem, 0, 1);

/* sensing subsystem runtime thread: sensor scheduling and sensor data processing */
ctx->runtime_id = k_thread_create(&ctx->runtime_thread, runtime_stack,
Expand All @@ -440,6 +442,20 @@ static int sensing_init(void)
return -EAGAIN;
}

/* sensor dispatch thread: get sensor data from senss and dispatch data */
ctx->dispatch_id = k_thread_create(&ctx->dispatch_thread, dispatch_stack,
CONFIG_SENSING_DISPATCH_THREAD_STACK_SIZE,
(k_thread_entry_t) sensing_dispatch_thread, ctx, NULL, NULL,
CONFIG_SENSING_DISPATCH_THREAD_PRIORITY, 0, K_NO_WAIT);
if (!ctx->dispatch_id) {
LOG_ERR("create dispatch thread error");
return -EAGAIN;
}

ring_buf_init(&ctx->sensor_ring_buf, sizeof(ctx->buf), ctx->buf);

ctx->sensing_initialized = true;

return ret;
}

Expand Down Expand Up @@ -517,6 +533,10 @@ int set_interval(struct sensing_connection *conn, uint32_t interval)
}

conn->interval = interval;
conn->next_consume_time = EXEC_TIME_INIT;

LOG_INF("set interval, sensor:%s, conn:%p, interval:%d",
conn->source->dev->name, conn, interval);

save_config_and_notify(conn->source);

Expand Down
7 changes: 7 additions & 0 deletions subsys/sensing/sensor_mgmt.h
Expand Up @@ -11,6 +11,7 @@
#include <zephyr/sensing/sensing_sensor.h>
#include <zephyr/kernel.h>
#include <zephyr/sys/slist.h>
#include <zephyr/sys/ring_buffer.h>
#include <string.h>

#ifdef __cplusplus
Expand Down Expand Up @@ -141,10 +142,15 @@ struct sensing_context {
int sensor_num;
struct sensing_sensor **sensors;
struct k_thread runtime_thread;
struct k_thread dispatch_thread;
k_tid_t runtime_id;
k_tid_t dispatch_id;
struct k_sem event_sem;
struct k_sem dispatch_sem;
atomic_t event_flag;
bool data_to_ring_buf;
struct ring_buf sensor_ring_buf;
uint8_t buf[CONFIG_SENSING_RING_BUF_SIZE];
};

int open_sensor(struct sensing_sensor *sensor, struct sensing_connection **conn);
Expand All @@ -156,6 +162,7 @@ int get_interval(struct sensing_connection *con, uint32_t *sensitivity);
int set_sensitivity(struct sensing_connection *conn, int8_t index, uint32_t interval);
int get_sensitivity(struct sensing_connection *con, int8_t index, uint32_t *sensitivity);
int loop_sensors(struct sensing_context *ctx);
void sensing_dispatch_thread(void *p1, void *p2, void *p3);
struct sensing_context *get_sensing_ctx(void);


Expand Down

0 comments on commit 7649e24

Please sign in to comment.