Skip to content

Commit

Permalink
[Linux] Run glib event loop on dedicated glib Matter context (#25960)
Browse files Browse the repository at this point in the history
* Run Glib thread on a dedicated context

* Run WiFiIPChangeListener on dedicated context

* Do not use g_io_add_watch when adding IO watch

* Check if D-Bus object proxies are created correctly

All D-Bus proxy objects created with GLib *_new_for_bus and
*_new_for_bus_sync functions have to be created with thread default main
context set. Otherwise, proxy object internal signals will be scheduled
on GLib global main context. We do not want that, because we do not know
whether Matter SDK will be used in GLib-based application or not, so we
can not run global main even loop on our own - all Matter-related GLib
signals shell be run on a dedicated Matter GLib context.

* Initialize BlueZ D-Bus object proxy on Matter GLib event loop

* Initialize WPA D-Bus object proxy on Matter GLib event loop

* Fix typo: devcie -> device

* Initialize OTBR D-Bus object proxy on Matter GLib event loop

* Fix glib source watch removal in BLE module

* Enable glib event loop in Thread-only configuration

* Run OTBR async calls on Matter glib context

* Document GDBusCreateObjectManagerContext usage
  • Loading branch information
arkq committed May 9, 2023
1 parent 900cebf commit 0ada46a
Show file tree
Hide file tree
Showing 13 changed files with 230 additions and 106 deletions.
6 changes: 3 additions & 3 deletions src/platform/Linux/CHIPDevicePlatformConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
#define CHIP_DEVICE_CONFIG_ENABLE_CHIPOBLE 0
#endif

// Start GLib main event loop if BLE or WiFi is enabled. This is needed to handle
// D-Bus communication with BlueZ or wpa_supplicant.
#if CHIP_DEVICE_CONFIG_ENABLE_CHIPOBLE || CHIP_DEVICE_CONFIG_ENABLE_WIFI
// Start GLib main event loop if BLE, Thread or WiFi is enabled. This is needed
// to handle D-Bus communication with BlueZ or wpa_supplicant.
#if CHIP_DEVICE_CONFIG_ENABLE_CHIPOBLE || CHIP_DEVICE_CONFIG_ENABLE_THREAD || CHIP_DEVICE_CONFIG_ENABLE_WIFI
#define CHIP_DEVICE_CONFIG_WITH_GLIB_MAIN_LOOP 1
#else
#define CHIP_DEVICE_CONFIG_WITH_GLIB_MAIN_LOOP 0
Expand Down
60 changes: 51 additions & 9 deletions src/platform/Linux/ConnectivityManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,10 @@ void ConnectivityManagerImpl::_OnWpaPropertiesChanged(WpaFiW1Wpa_supplicant1Inte

void ConnectivityManagerImpl::_OnWpaInterfaceProxyReady(GObject * source_object, GAsyncResult * res, gpointer user_data)
{
// When creating D-Bus proxy object, the thread default context must be initialized. Otherwise,
// all D-Bus signals will be delivered to the GLib global default main context.
VerifyOrDie(g_main_context_get_thread_default() != nullptr);

GError * err = nullptr;

std::lock_guard<std::mutex> lock(mWpaSupplicantMutex);
Expand Down Expand Up @@ -530,6 +534,10 @@ void ConnectivityManagerImpl::_OnWpaInterfaceProxyReady(GObject * source_object,

void ConnectivityManagerImpl::_OnWpaBssProxyReady(GObject * source_object, GAsyncResult * res, gpointer user_data)
{
// When creating D-Bus proxy object, the thread default context must be initialized. Otherwise,
// all D-Bus signals will be delivered to the GLib global default main context.
VerifyOrDie(g_main_context_get_thread_default() != nullptr);

GError * err = nullptr;

std::lock_guard<std::mutex> lock(mWpaSupplicantMutex);
Expand Down Expand Up @@ -559,6 +567,10 @@ void ConnectivityManagerImpl::_OnWpaBssProxyReady(GObject * source_object, GAsyn

void ConnectivityManagerImpl::_OnWpaInterfaceReady(GObject * source_object, GAsyncResult * res, gpointer user_data)
{
// When creating D-Bus proxy object, the thread default context must be initialized. Otherwise,
// all D-Bus signals will be delivered to the GLib global default main context.
VerifyOrDie(g_main_context_get_thread_default() != nullptr);

GError * err = nullptr;

std::lock_guard<std::mutex> lock(mWpaSupplicantMutex);
Expand Down Expand Up @@ -634,6 +646,10 @@ void ConnectivityManagerImpl::_OnWpaInterfaceReady(GObject * source_object, GAsy
void ConnectivityManagerImpl::_OnWpaInterfaceAdded(WpaFiW1Wpa_supplicant1 * proxy, const gchar * path, GVariant * properties,
gpointer user_data)
{
// When creating D-Bus proxy object, the thread default context must be initialized. Otherwise,
// all D-Bus signals will be delivered to the GLib global default main context.
VerifyOrDie(g_main_context_get_thread_default() != nullptr);

std::lock_guard<std::mutex> lock(mWpaSupplicantMutex);

if (mWpaSupplicant.interfacePath)
Expand Down Expand Up @@ -696,6 +712,10 @@ void ConnectivityManagerImpl::_OnWpaInterfaceRemoved(WpaFiW1Wpa_supplicant1 * pr

void ConnectivityManagerImpl::_OnWpaProxyReady(GObject * source_object, GAsyncResult * res, gpointer user_data)
{
// When creating D-Bus proxy object, the thread default context must be initialized. Otherwise,
// all D-Bus signals will be delivered to the GLib global default main context.
VerifyOrDie(g_main_context_get_thread_default() != nullptr);

GError * err = nullptr;

std::lock_guard<std::mutex> lock(mWpaSupplicantMutex);
Expand Down Expand Up @@ -725,15 +745,8 @@ void ConnectivityManagerImpl::_OnWpaProxyReady(GObject * source_object, GAsyncRe

void ConnectivityManagerImpl::StartWiFiManagement()
{
std::lock_guard<std::mutex> lock(mWpaSupplicantMutex);

mConnectivityFlag.ClearAll();
mWpaSupplicant = GDBusWpaSupplicant{};

ChipLogProgress(DeviceLayer, "wpa_supplicant: Start WiFi management");

wpa_fi_w1_wpa_supplicant1_proxy_new_for_bus(G_BUS_TYPE_SYSTEM, G_DBUS_PROXY_FLAGS_NONE, kWpaSupplicantServiceName,
kWpaSupplicantObjectPath, nullptr, _OnWpaProxyReady, nullptr);
CHIP_ERROR err = PlatformMgrImpl().GLibMatterContextInvokeSync(_StartWiFiManagement, this);
VerifyOrReturn(err == CHIP_NO_ERROR, ChipLogError(DeviceLayer, "Failed to start WiFi management"));
}

bool ConnectivityManagerImpl::IsWiFiManagementStarted()
Expand Down Expand Up @@ -1279,6 +1292,11 @@ int32_t ConnectivityManagerImpl::GetDisconnectReason()

CHIP_ERROR ConnectivityManagerImpl::GetConfiguredNetwork(NetworkCommissioning::Network & network)
{
// This function can be called without g_main_context_get_thread_default() being set.
// The network proxy object is created in a synchronous manner, so the D-Bus call will
// be completed before this function returns. Also, no external callbacks are registered
// with the proxy object.

std::lock_guard<std::mutex> lock(mWpaSupplicantMutex);
std::unique_ptr<GError, GErrorDeleter> err;

Expand Down Expand Up @@ -1463,6 +1481,11 @@ std::pair<WiFiBand, uint16_t> GetBandAndChannelFromFrequency(uint32_t freq)

bool ConnectivityManagerImpl::_GetBssInfo(const gchar * bssPath, NetworkCommissioning::WiFiScanResponse & result)
{
// This function can be called without g_main_context_get_thread_default() being set.
// The BSS proxy object is created in a synchronous manner, so the D-Bus call will be
// completed before this function returns. Also, no external callbacks are registered
// with the proxy object.

std::unique_ptr<GError, GErrorDeleter> err;
std::unique_ptr<WpaFiW1Wpa_supplicant1BSS, GObjectDeleter> bss(
wpa_fi_w1_wpa_supplicant1_bss_proxy_new_for_bus_sync(G_BUS_TYPE_SYSTEM, G_DBUS_PROXY_FLAGS_NONE, kWpaSupplicantServiceName,
Expand Down Expand Up @@ -1676,6 +1699,25 @@ void ConnectivityManagerImpl::_OnWpaInterfaceScanDone(GObject * source_object, G
g_strfreev(oldBsss);
}

CHIP_ERROR ConnectivityManagerImpl::_StartWiFiManagement(ConnectivityManagerImpl * self)
{
// When creating D-Bus proxy object, the thread default context must be initialized. Otherwise,
// all D-Bus signals will be delivered to the GLib global default main context.
VerifyOrDie(g_main_context_get_thread_default() != nullptr);

std::lock_guard<std::mutex> lock(self->mWpaSupplicantMutex);

self->mConnectivityFlag.ClearAll();
self->mWpaSupplicant = GDBusWpaSupplicant{};

ChipLogProgress(DeviceLayer, "wpa_supplicant: Start WiFi management");

wpa_fi_w1_wpa_supplicant1_proxy_new_for_bus(G_BUS_TYPE_SYSTEM, G_DBUS_PROXY_FLAGS_NONE, kWpaSupplicantServiceName,
kWpaSupplicantObjectPath, nullptr, self->_OnWpaProxyReady, nullptr);

return CHIP_NO_ERROR;
}

#endif // CHIP_DEVICE_CONFIG_ENABLE_WPA

} // namespace DeviceLayer
Expand Down
2 changes: 2 additions & 0 deletions src/platform/Linux/ConnectivityManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ class ConnectivityManagerImpl final : public ConnectivityManager,

static bool _GetBssInfo(const gchar * bssPath, NetworkCommissioning::WiFiScanResponse & result);

static CHIP_ERROR _StartWiFiManagement(ConnectivityManagerImpl * self);

static bool mAssociationStarted;
static BitFlags<ConnectivityFlags> mConnectivityFlag;
static GDBusWpaSupplicant mWpaSupplicant CHIP_GUARDED_BY(mWpaSupplicantMutex);
Expand Down
25 changes: 18 additions & 7 deletions src/platform/Linux/PlatformManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,14 @@ PlatformManagerImpl PlatformManagerImpl::sInstance;
namespace {

#if CHIP_DEVICE_CONFIG_WITH_GLIB_MAIN_LOOP
void * GLibMainLoopThread(void * loop)
void * GLibMainLoopThread(void * userData)
{
g_main_loop_run(static_cast<GMainLoop *>(loop));
GMainLoop * loop = static_cast<GMainLoop *>(userData);
GMainContext * context = g_main_loop_get_context(loop);

g_main_context_push_thread_default(context);
g_main_loop_run(loop);

return nullptr;
}
#endif
Expand Down Expand Up @@ -172,11 +177,15 @@ CHIP_ERROR RunWiFiIPChangeListener()
return CHIP_ERROR_INTERNAL;
}

GIOChannel * ch = g_io_channel_unix_new(sock);
g_io_add_watch_full(ch, G_PRIORITY_DEFAULT, G_IO_IN, WiFiIPChangeListener, nullptr, nullptr);

GIOChannel * ch = g_io_channel_unix_new(sock);
GSource * watchSource = g_io_create_watch(ch, G_IO_IN);
g_source_set_callback(watchSource, G_SOURCE_FUNC(WiFiIPChangeListener), nullptr, nullptr);
g_io_channel_set_close_on_unref(ch, TRUE);
g_io_channel_set_encoding(ch, nullptr, nullptr);

PlatformMgrImpl().GLibMatterContextAttachSource(watchSource);

g_source_unref(watchSource);
g_io_channel_unref(ch);

return CHIP_NO_ERROR;
Expand All @@ -190,8 +199,10 @@ CHIP_ERROR PlatformManagerImpl::_InitChipStack()
{
#if CHIP_DEVICE_CONFIG_WITH_GLIB_MAIN_LOOP

mGLibMainLoop = g_main_loop_new(nullptr, FALSE);
auto * context = g_main_context_new();
mGLibMainLoop = g_main_loop_new(context, FALSE);
mGLibMainLoopThread = g_thread_new("gmain-matter", GLibMainLoopThread, mGLibMainLoop);
g_main_context_unref(context);

{
// Wait for the GLib main loop to start. It is required that the context used
Expand All @@ -212,7 +223,7 @@ CHIP_ERROR PlatformManagerImpl::_InitChipStack()
return G_SOURCE_REMOVE;
},
&invokeData, nullptr);
g_source_attach(idleSource, g_main_loop_get_context(mGLibMainLoop));
GLibMatterContextAttachSource(idleSource);
g_source_unref(idleSource);

invokeData.mDoneCond.wait(lock, [&invokeData]() { return invokeData.mDone; });
Expand Down
10 changes: 8 additions & 2 deletions src/platform/Linux/PlatformManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ class PlatformManagerImpl final : public PlatformManager, public Internal::Gener
return _GLibMatterContextInvokeSync((CHIP_ERROR(*)(void *)) func, (void *) userData);
}

unsigned int GLibMatterContextAttachSource(GSource * source)
{
VerifyOrDie(mGLibMainLoop != nullptr);
return g_source_attach(source, g_main_loop_get_context(mGLibMainLoop));
}

#endif

System::Clock::Timestamp GetStartTime() { return mStartTime; }
Expand Down Expand Up @@ -121,8 +127,8 @@ class PlatformManagerImpl final : public PlatformManager, public Internal::Gener
// event loop thread before the call to g_source_attach().
std::mutex mGLibMainLoopCallbackIndirectionMutex;

GMainLoop * mGLibMainLoop;
GThread * mGLibMainLoopThread;
GMainLoop * mGLibMainLoop = nullptr;
GThread * mGLibMainLoopThread = nullptr;

#endif // CHIP_DEVICE_CONFIG_WITH_GLIB_MAIN_LOOP
};
Expand Down
97 changes: 72 additions & 25 deletions src/platform/Linux/ThreadStackManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,66 @@ constexpr char ThreadStackManagerImpl::kOpenthreadDeviceRoleLeader[];

constexpr char ThreadStackManagerImpl::kPropertyDeviceRole[];

namespace {

struct SetActiveDatasetContext
{
OpenthreadIoOpenthreadBorderRouter * proxy;
ByteSpan netInfo;
};

CHIP_ERROR GLibMatterContextSetActiveDataset(SetActiveDatasetContext * context)
{
// When creating D-Bus proxy object, the thread default context must be initialized. Otherwise,
// all D-Bus signals will be delivered to the GLib global default main context.
VerifyOrDie(g_main_context_get_thread_default() != nullptr);

std::unique_ptr<GBytes, GBytesDeleter> bytes(g_bytes_new(context->netInfo.data(), context->netInfo.size()));
if (!bytes)
return CHIP_ERROR_NO_MEMORY;
std::unique_ptr<GVariant, GVariantDeleter> value(g_variant_new_from_bytes(G_VARIANT_TYPE_BYTESTRING, bytes.release(), true));
if (!value)
return CHIP_ERROR_NO_MEMORY;
openthread_io_openthread_border_router_set_active_dataset_tlvs(context->proxy, value.release());
return CHIP_NO_ERROR;
}

} // namespace

ThreadStackManagerImpl::ThreadStackManagerImpl() : mAttached(false) {}

CHIP_ERROR ThreadStackManagerImpl::_InitThreadStack()
CHIP_ERROR ThreadStackManagerImpl::GLibMatterContextInitThreadStack(ThreadStackManagerImpl * self)
{
// When creating D-Bus proxy object, the thread default context must be initialized. Otherwise,
// all D-Bus signals will be delivered to the GLib global default main context.
VerifyOrDie(g_main_context_get_thread_default() != nullptr);

std::unique_ptr<GError, GErrorDeleter> err;
mProxy.reset(openthread_io_openthread_border_router_proxy_new_for_bus_sync(G_BUS_TYPE_SYSTEM, G_DBUS_PROXY_FLAGS_NONE,
kDBusOpenThreadService, kDBusOpenThreadObjectPath,
nullptr, &MakeUniquePointerReceiver(err).Get()));
if (!mProxy)
{
ChipLogError(DeviceLayer, "openthread: failed to create openthread dbus proxy %s", err ? err->message : "unknown error");
return CHIP_ERROR_INTERNAL;
}
self->mProxy.reset(openthread_io_openthread_border_router_proxy_new_for_bus_sync(
G_BUS_TYPE_SYSTEM, G_DBUS_PROXY_FLAGS_NONE, kDBusOpenThreadService, kDBusOpenThreadObjectPath, nullptr,
&MakeUniquePointerReceiver(err).Get()));
VerifyOrReturnError(
self->mProxy != nullptr, CHIP_ERROR_INTERNAL,
ChipLogError(DeviceLayer, "openthread: failed to create openthread dbus proxy %s", err ? err->message : "unknown error"));

g_signal_connect(self->mProxy.get(), "g-properties-changed", G_CALLBACK(OnDbusPropertiesChanged), self);

g_signal_connect(mProxy.get(), "g-properties-changed", G_CALLBACK(OnDbusPropertiesChanged), this);
return CHIP_NO_ERROR;
}

CHIP_ERROR ThreadStackManagerImpl::_InitThreadStack()
{
CHIP_ERROR err;

err = PlatformMgrImpl().GLibMatterContextInvokeSync(GLibMatterContextInitThreadStack, this);
VerifyOrReturnError(err == CHIP_NO_ERROR, err, ChipLogError(DeviceLayer, "openthread: failed to init dbus proxy"));

// If get property is called inside dbus thread (we are going to make it so), XXX_get_XXX can be used instead of XXX_dup_XXX
// which is a little bit faster and the returned object doesn't need to be freed. Same for all following get properties.
std::unique_ptr<gchar, GFree> role(openthread_io_openthread_border_router_dup_device_role(mProxy.get()));
if (role)
{
ThreadDevcieRoleChangedHandler(role.get());
ThreadDeviceRoleChangedHandler(role.get());
}

return CHIP_NO_ERROR;
Expand Down Expand Up @@ -102,13 +140,13 @@ void ThreadStackManagerImpl::OnDbusPropertiesChanged(OpenthreadIoOpenthreadBorde
if (value_str == nullptr)
continue;
ChipLogProgress(DeviceLayer, "Thread role changed to: %s", StringOrNullMarker(value_str));
me->ThreadDevcieRoleChangedHandler(value_str);
me->ThreadDeviceRoleChangedHandler(value_str);
}
}
}
}

void ThreadStackManagerImpl::ThreadDevcieRoleChangedHandler(const gchar * role)
void ThreadStackManagerImpl::ThreadDeviceRoleChangedHandler(const gchar * role)
{
bool attached = strcmp(role, kOpenthreadDeviceRoleDetached) != 0 && strcmp(role, kOpenthreadDeviceRoleDisabled) != 0;

Expand Down Expand Up @@ -217,16 +255,9 @@ CHIP_ERROR ThreadStackManagerImpl::_SetThreadProvision(ByteSpan netInfo)
VerifyOrReturnError(mProxy, CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnError(Thread::OperationalDataset::IsValid(netInfo), CHIP_ERROR_INVALID_ARGUMENT);

{
std::unique_ptr<GBytes, GBytesDeleter> bytes(g_bytes_new(netInfo.data(), netInfo.size()));
if (!bytes)
return CHIP_ERROR_NO_MEMORY;
std::unique_ptr<GVariant, GVariantDeleter> value(
g_variant_new_from_bytes(G_VARIANT_TYPE_BYTESTRING, bytes.release(), true));
if (!value)
return CHIP_ERROR_NO_MEMORY;
openthread_io_openthread_border_router_set_active_dataset_tlvs(mProxy.get(), value.release());
}
SetActiveDatasetContext context = { mProxy.get(), netInfo };
CHIP_ERROR err = PlatformMgrImpl().GLibMatterContextInvokeSync(GLibMatterContextSetActiveDataset, &context);
VerifyOrReturnError(err == CHIP_NO_ERROR, err, ChipLogError(DeviceLayer, "openthread: failed to set active dataset"));

// post an event alerting other subsystems about change in provisioning state
ChipDeviceEvent event;
Expand Down Expand Up @@ -345,12 +376,20 @@ bool ThreadStackManagerImpl::_IsThreadAttached() const
return mAttached;
}

CHIP_ERROR ThreadStackManagerImpl::GLibMatterContextCallAttach(ThreadStackManagerImpl * self)
{
VerifyOrDie(g_main_context_get_thread_default() != nullptr);
openthread_io_openthread_border_router_call_attach(self->mProxy.get(), nullptr, _OnThreadBrAttachFinished, self);
return CHIP_NO_ERROR;
}

CHIP_ERROR ThreadStackManagerImpl::_SetThreadEnabled(bool val)
{
VerifyOrReturnError(mProxy, CHIP_ERROR_INCORRECT_STATE);
if (val)
{
openthread_io_openthread_border_router_call_attach(mProxy.get(), nullptr, _OnThreadBrAttachFinished, this);
CHIP_ERROR err = PlatformMgrImpl().GLibMatterContextInvokeSync(GLibMatterContextCallAttach, this);
VerifyOrReturnError(err == CHIP_NO_ERROR, err, ChipLogError(DeviceLayer, "openthread: failed to attach"));
}
else
{
Expand Down Expand Up @@ -572,12 +611,20 @@ void ThreadStackManagerImpl::_SetRouterPromotion(bool val)
// Set Router Promotion is not supported on linux
}

CHIP_ERROR ThreadStackManagerImpl::GLibMatterContextCallScan(ThreadStackManagerImpl * self)
{
VerifyOrDie(g_main_context_get_thread_default() != nullptr);
openthread_io_openthread_border_router_call_scan(self->mProxy.get(), nullptr, _OnNetworkScanFinished, self);
return CHIP_NO_ERROR;
}

CHIP_ERROR ThreadStackManagerImpl::_StartThreadScan(ThreadDriver::ScanCallback * callback)
{
// There is another ongoing scan request, reject the new one.
VerifyOrReturnError(mpScanCallback == nullptr, CHIP_ERROR_INCORRECT_STATE);
mpScanCallback = callback;
openthread_io_openthread_border_router_call_scan(mProxy.get(), nullptr, _OnNetworkScanFinished, this);
CHIP_ERROR err = PlatformMgrImpl().GLibMatterContextInvokeSync(GLibMatterContextCallScan, this);
VerifyOrReturnError(err == CHIP_NO_ERROR, err, ChipLogError(DeviceLayer, "openthread: failed to start scan"));
return CHIP_NO_ERROR;
}

Expand Down
5 changes: 4 additions & 1 deletion src/platform/Linux/ThreadStackManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,12 @@ class ThreadStackManagerImpl : public ThreadStackManager

std::unique_ptr<OpenthreadIoOpenthreadBorderRouter, GObjectDeleter> mProxy;

static CHIP_ERROR GLibMatterContextInitThreadStack(ThreadStackManagerImpl * self);
static CHIP_ERROR GLibMatterContextCallAttach(ThreadStackManagerImpl * self);
static CHIP_ERROR GLibMatterContextCallScan(ThreadStackManagerImpl * self);
static void OnDbusPropertiesChanged(OpenthreadIoOpenthreadBorderRouter * proxy, GVariant * changed_properties,
const gchar * const * invalidated_properties, gpointer user_data);
void ThreadDevcieRoleChangedHandler(const gchar * role);
void ThreadDeviceRoleChangedHandler(const gchar * role);

Thread::OperationalDataset mDataset = {};

Expand Down
Loading

0 comments on commit 0ada46a

Please sign in to comment.