Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Omwappi 1545 x dial server patches 2401 sprint #106

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion server/gdial-app.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,19 @@ GDialApp *gdial_app_new(const gchar *app_name) {
return app;
};

int gdial_get_wait_for_rtremote_state_response();

GDialAppError gdial_app_start(GDialApp *app, const gchar *payload, const gchar *query, const gchar *additional_data_url, gpointer state_cb_data) {
g_return_val_if_fail (GDIAL_IS_APP (app), GDIAL_APP_ERROR_BAD_REQUEST);

GDialAppPrivate *priv = gdial_app_get_instance_private(app);
priv->state_cb_data = state_cb_data;
GDialAppError app_err = gdial_plat_application_start(app->name, payload, query, additional_data_url, &app->instance_id);
if (app_err == GDIAL_APP_ERROR_NONE || (strcmp("system", app->name) != 0 && app->instance_id != GDIAL_APP_INSTANCE_NONE)) {
gdial_plat_application_state_async(app->name, app->instance_id, app);
// don't need to ask for state asynchronously if we're configured to refresh the state at each query anyway
if (gdial_get_wait_for_rtremote_state_response() < 1) {
gdial_plat_application_state_async(app->name, app->instance_id, app);
}
app_err = gdial_plat_application_state(app->name, app->instance_id, &app->state);
g_warn_if_fail(app->state == GDIAL_APP_STATE_RUNNING);
}
Expand Down
65 changes: 47 additions & 18 deletions server/gdial-rest.c
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,40 @@ static void gdial_rest_server_handle_DELETE(SoupMessage *msg, GHashTable *query,
g_object_unref(app);
}

int gdial_get_wait_for_rtremote_state_response() {
static int xdial_is_waiting_rtremote_state_response = -1;
if (xdial_is_waiting_rtremote_state_response == -1) {
xdial_is_waiting_rtremote_state_response = getenv("XDIAL_WAIT_FOR_RTREMOTE_STATE_RESPONSE_MS") && atoi(getenv("XDIAL_WAIT_FOR_RTREMOTE_STATE_RESPONSE_MS")) > 0;
}
return xdial_is_waiting_rtremote_state_response;
}

// returns TRUE if new application instance has been created; false otherwise
static gboolean refresh_app_state(const gchar *app_name) {
const int xdial_is_waiting_rtremote_state_response = gdial_get_wait_for_rtremote_state_response();
// trying to refresh the app state only makes sense if xdial is configured to wait for rt state responses
if (xdial_is_waiting_rtremote_state_response > 0) {
GDialApp *app = gdial_app_find_instance_by_name(app_name);
if (!app) {
printf("%s:%d app instance for '%s' not found, checking remote state\n", __FUNCTION__, __LINE__, app_name);
// try to fetch the remote state, maybe the app was started externally
GDialAppState state;
if (GDIAL_APP_ERROR_NONE == gdial_plat_application_state(app_name, 0, &state)) {
printf("%s:%d app instance for: '%s' remote state returned: %d; creating a new instance\n", __FUNCTION__, __LINE__, app_name, state);
// create app instance
app = gdial_app_new(app_name);
app->state = state;
return TRUE;
} else {
printf("%s:%d app instance for: '%s' no remote state returned\n", __FUNCTION__, __LINE__, app_name);
}
} else {
gdial_app_state(app);
}
}
return FALSE;
}

static void gdial_rest_server_handle_POST(GDialRestServer *gdial_rest_server, SoupMessage* msg, GHashTable *query, const gchar *app_name) {
GDialAppRegistry *app_registry = gdial_rest_server_find_app_registry(gdial_rest_server, app_name);
gdial_rest_server_http_return_if_fail(app_registry, msg, SOUP_STATUS_NOT_FOUND);
Expand All @@ -394,26 +428,18 @@ static void gdial_rest_server_handle_POST(GDialRestServer *gdial_rest_server, So
gdial_rest_server_http_return_if_fail(listening_port != 0, msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);

g_printerr("Starting the app with payload %.*s\n", (int)msg->request_body->length, msg->request_body->data);
refresh_app_state(app_registry->name);
GDialApp *app = gdial_app_find_instance_by_name(app_registry->name);
gboolean new_app_instance = FALSE;
gboolean first_instance_created = FALSE;
GDialAppState current_state = GDIAL_APP_STATE_STOPPED;

if (app != NULL && app_registry->is_singleton) {
/*
* Reuse app instance as is, but do not update refcnt
* per DIAL 2.1 recommendation, push relaunch decision to application platform,
*/
g_printerr("POST request received for running app [%s]\r\n", app->name);
new_app_instance = TRUE;
first_instance_created = FALSE;
current_state = GDIAL_APP_GET_STATE(app);
}
else {
if (!app) {
app = gdial_app_new(app_registry->name);
new_app_instance = TRUE;
first_instance_created = TRUE;
refresh_app_state(app);
}
// new_app_instance is always effectively TRUE, due to some accumulated changes to gdial_rest_server_handle_POST logic
const gboolean new_app_instance = TRUE;
GDialAppState current_state = GDIAL_APP_GET_STATE(app);
// first_instance_created value determines if we're going to get 201(created) or 200(ok) response
// we want to return 201 in case the app state is 'Not running or hidden' and 200 in case the app is 'starting' or 'running' (from dial 2.2.1 spec)
gboolean first_instance_created = current_state == GDIAL_APP_STATE_STOPPED || current_state == GDIAL_APP_STATE_HIDE;

GDialAppError start_error = GDIAL_APP_ERROR_NONE;

Expand Down Expand Up @@ -589,11 +615,13 @@ static void gdial_rest_server_handle_POST_dial_data(GDialRestServer *gdial_rest_
/*
* Cache dial_data so as to use on future queries.
*/
refresh_app_state(app_name);
GDialApp *app = gdial_app_find_instance_by_name(app_name);
if(app == NULL)
{
g_print("gdial_rest_server_handle_POST_dial_data creating app instance \n");
app = gdial_app_new(app_name);
refresh_app_state(app_name);
}
gdial_rest_server_http_return_if_fail(app, msg, SOUP_STATUS_NOT_FOUND);
/*
Expand Down Expand Up @@ -868,6 +896,7 @@ static void gdial_rest_http_server_apps_callback(SoupServer *server,
gdial_rest_server_handle_OPTIONS(msg, "DELETE, OPTIONS");
}
else if (msg->method == SOUP_METHOD_DELETE) {
refresh_app_state(app_name);
GDialApp *app = gdial_app_find_instance_by_name(app_name);
GDialApp *app_by_instance = gdial_rest_server_check_instance(app, instance);
if (app_by_instance) {
Expand All @@ -894,7 +923,7 @@ static void gdial_rest_http_server_apps_callback(SoupServer *server,
gdial_rest_server_handle_OPTIONS(msg, "POST, OPTIONS");
}
else if (msg->method == SOUP_METHOD_POST) {

refresh_app_state(app_name);
GDialApp *app = gdial_app_find_instance_by_name(app_name);
GDialApp *app_by_instance = gdial_rest_server_check_instance(app, instance);
if (app_by_instance) {
Expand Down
38 changes: 38 additions & 0 deletions server/plat/rtcache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ void rtAppStatusCache :: setAppCacheId(const char *app_name,std::string id)

rtError rtAppStatusCache::UpdateAppStatusCache(rtValue app_status)
{
const auto now = std::chrono::steady_clock::now();
printf("RTCACHE : %s\n",__FUNCTION__);

rtError err;
Expand All @@ -76,9 +77,34 @@ rtError rtAppStatusCache::UpdateAppStatusCache(rtValue app_status)
}

err = ObjectCache->insert(id,temp);
notifyStateChanged(App_name);
if (err == RT_OK) {
err = ObjectCache->markUnevictable(id, true);
last_updated[id] = now;
}
return err;
}

rtAppStatusCache::StateChangedCallbackHandle rtAppStatusCache::registerStateChangedCallback(StateChangedCallback callback) {
std::unique_lock<std::mutex> lock(state_changed_listeners_mutex);
const auto handle = ++next_handle;
state_changed_listeners[handle] = callback;
return handle;
}

void rtAppStatusCache::unregisterStateChangedCallback(rtAppStatusCache::StateChangedCallbackHandle callbackId) {
std::unique_lock<std::mutex> lock(state_changed_listeners_mutex);
state_changed_listeners.erase(callbackId);
}

void rtAppStatusCache::notifyStateChanged(std::string& id) {
std::unique_lock<std::mutex> lock(state_changed_listeners_mutex);
for (auto& it : state_changed_listeners) {
it.second(id);
}
}


std::string rtAppStatusCache::SearchAppStatusInCache(const char *app_name)
{
printf("RTCACHE : %s\n",__FUNCTION__);
Expand Down Expand Up @@ -110,3 +136,15 @@ bool rtAppStatusCache::doIdExist(std::string id)
printf("True\n");
return true;
}

std::chrono::milliseconds rtAppStatusCache::getUpdateAge(const char *app_name)
{
const auto now = std::chrono::steady_clock::now();
std::string id = getAppCacheId(app_name);
auto it = last_updated.find(id);
if (it != last_updated.end()) {
return std::chrono::duration_cast<std::chrono::milliseconds>(now - it->second);
} else {
return std::chrono::milliseconds::max();
}
}
20 changes: 20 additions & 0 deletions server/plat/rtcache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,19 @@
#include <stdbool.h>
#include <string>

#include <functional>
#include <mutex>
#include <map>

using namespace std;

class rtAppStatusCache : public rtObject
{
public:

using StateChangedCallbackHandle = size_t;
using StateChangedCallback = std::function<void(const std::string&)>;

rtAppStatusCache(rtRemoteEnvironment* env) {ObjectCache = new rtRemoteObjectCache(env);};
~rtAppStatusCache() {delete(ObjectCache); };
std::string getAppCacheId(const char *app_name);
Expand All @@ -42,10 +50,22 @@ class rtAppStatusCache : public rtObject
std::string SearchAppStatusInCache(const char *app_name);
bool doIdExist(std::string id);

StateChangedCallbackHandle registerStateChangedCallback(StateChangedCallback callback);
void unregisterStateChangedCallback(StateChangedCallbackHandle callbackId);

std::chrono::milliseconds getUpdateAge(const char *app_name);

private:

void notifyStateChanged(std::string& id);

rtRemoteObjectCache* ObjectCache;
static std::string Netflix_AppCacheId;
static std::string Youtube_AppCacheId;
StateChangedCallbackHandle next_handle = 0;
std::map<StateChangedCallbackHandle, StateChangedCallback> state_changed_listeners;
std::mutex state_changed_listeners_mutex;
std::map<std::string, std::chrono::steady_clock::time_point> last_updated;
};

#endif
45 changes: 45 additions & 0 deletions server/plat/rtdial.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <map>
#include <unistd.h>
#include <pthread.h>
#include <atomic>
#include <glib.h>

#include "Module.h"
Expand All @@ -40,6 +41,7 @@
#include "rtcache.hpp"
#include "rtdial.hpp"
#include "gdial_app_registry.h"
#include <rtRemoteEnvironment.h>

rtRemoteEnvironment* env;
static GSource *remoteSource = nullptr;
Expand Down Expand Up @@ -450,6 +452,8 @@ void rtdial_term() {

#define DIAL_MAX_ADDITIONALURL (1024)

static bool await_application_state_update(const char *app_name);

map<string,string> parse_query(const char* query_string) {
if (!query_string) return {};
char *unescaped = g_uri_unescape_string(query_string, nullptr);
Expand Down Expand Up @@ -661,6 +665,10 @@ int gdial_os_application_state(const char *app_name, int instance_id, GDialAppSt
if (RTCAST_ERROR_RT(ret) != RT_OK) {
printf("RTDIAL: DialObj.getApplicationState failed!!! Error: %s\n",rtStrError(RTCAST_ERROR_RT(ret)));
return GDIAL_APP_ERROR_INTERNAL;
} else {
if (await_application_state_update(app_name)) {
State = AppCache->SearchAppStatusInCache(app_name);
}
}
}

Expand Down Expand Up @@ -696,3 +704,40 @@ int gdial_os_application_state(const char *app_name, int instance_id, GDialAppSt

return GDIAL_APP_ERROR_NONE;
}

static bool await_application_state_update(const char *app_name) {
using namespace std::chrono;
static int xdial_wait_for_rtremote_state_response_ms = -1;
if (xdial_wait_for_rtremote_state_response_ms == -1) {
const char* waitstr = getenv("XDIAL_WAIT_FOR_RTREMOTE_STATE_RESPONSE_MS");
xdial_wait_for_rtremote_state_response_ms = waitstr ? atoi(waitstr) : 0;
}
static auto xdial_max_state_value_age = milliseconds::max();
if (xdial_max_state_value_age == milliseconds::max()) {
const char* str = getenv("XDIAL_MAX_STATE_VALUE_AGE_MS");
xdial_max_state_value_age = milliseconds(str ? atoi(str) : 0);
}
// do not poll for the state update if currently held value is younger than XDIAL_MAX_STATE_VALUE_AGE_MS
if (xdial_max_state_value_age > milliseconds(0) && AppCache->getUpdateAge(app_name) < xdial_max_state_value_age) {
return false;
}
std::atomic_bool updated {false};
if (xdial_wait_for_rtremote_state_response_ms > 0) {
// the cached status could be wrong; rtremote state update request has already been launched
// so lets give it some time & report the updated value, if possible
auto handlerid = AppCache->registerStateChangedCallback([&](const std::string& application){
if (application == app_name) {
updated = true;
}
});
const auto timemax = steady_clock::now() + milliseconds(xdial_wait_for_rtremote_state_response_ms);
while (steady_clock::now() < timemax && !updated) {
auto time_left = duration_cast<milliseconds>(timemax - steady_clock::now());
if (time_left.count() > 0) {
rtEnvironmentGetGlobal()->processSingleWorkItem(time_left, true, nullptr);
}
}
AppCache->unregisterStateChangedCallback(handlerid);
}
return updated;
}