Skip to content

Commit

Permalink
Merge pull request #1346 from spark/fix/tcpserver-threading
Browse files Browse the repository at this point in the history
[Photon/P1] Fixes thread priority issues
  • Loading branch information
technobly committed Jul 10, 2017
2 parents 82e4e31 + 77c207f commit 744a7b9
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 44 deletions.
Binary file modified hal/src/photon/lib/Supplicant_BESL.a
Binary file not shown.
56 changes: 18 additions & 38 deletions hal/src/photon/socket_hal.cpp
Expand Up @@ -32,6 +32,8 @@
#include <vector>
#include "lwip/api.h"
#include "network_interface.h"
#include "spark_wiring_thread.h"
#include "spark_wiring_vector.h"

wiced_result_t wiced_last_error( wiced_tcp_socket_t* socket);

Expand Down Expand Up @@ -208,13 +210,12 @@ class tcp_server_client_t {
struct tcp_server_t : public wiced_tcp_server_t
{
tcp_server_t() {
wiced_rtos_init_semaphore(&accept_lock);
wiced_rtos_set_semaphore(&accept_lock);
os_mutex_create(&accept_lock);
memset(clients, 0, sizeof(clients));
}

~tcp_server_t() {
wiced_rtos_deinit_semaphore(&accept_lock);
os_mutex_destroy(accept_lock);
}

/**
Expand All @@ -240,14 +241,14 @@ struct tcp_server_t : public wiced_tcp_server_t
wiced_result_t accept(wiced_tcp_socket_t* socket) {
wiced_result_t result;
if ((result=wiced_tcp_accept(socket))==WICED_SUCCESS) {
wiced_rtos_get_semaphore(&accept_lock, WICED_WAIT_FOREVER);
os_mutex_lock(accept_lock);

int idx = index(socket);
if (idx>=0) {
clients[idx] = new tcp_server_client_t(this, socket);
to_accept.insert(to_accept.end(), idx);
to_accept.append(idx);
}
wiced_rtos_set_semaphore(&accept_lock);
os_mutex_unlock(accept_lock);
}
return result;
}
Expand All @@ -257,13 +258,13 @@ struct tcp_server_t : public wiced_tcp_server_t
* @return The next client, or NULL
*/
tcp_server_client_t* next_accept() {
wiced_rtos_get_semaphore(&accept_lock, WICED_WAIT_FOREVER);
os_mutex_lock(accept_lock);
int index = -1;
if (to_accept.size()) {
index = *to_accept.begin();
to_accept.erase(to_accept.begin());
to_accept.removeAt(0);
}
wiced_rtos_set_semaphore(&accept_lock);
os_mutex_unlock(accept_lock);
return index>=0 ? clients[index] : NULL;
}

Expand All @@ -273,12 +274,12 @@ struct tcp_server_t : public wiced_tcp_server_t
* @return
*/
wiced_result_t notify_disconnected(wiced_tcp_socket_t* socket) {
wiced_rtos_get_semaphore(&accept_lock, WICED_WAIT_FOREVER);
os_mutex_lock(accept_lock);
int idx = index(socket);
tcp_server_client_t* client = clients[idx];
if (client)
client->notify_disconnected();
wiced_rtos_set_semaphore(&accept_lock);
os_mutex_unlock(accept_lock);
return WICED_SUCCESS;
}

Expand Down Expand Up @@ -316,8 +317,8 @@ struct tcp_server_t : public wiced_tcp_server_t
// for each server instance, maintain an associated tcp_server_client_t instance
tcp_server_client_t* clients[WICED_MAXIMUM_NUMBER_OF_SERVER_SOCKETS];

wiced_semaphore_t accept_lock;
std::vector<int> to_accept;
os_mutex_t accept_lock;
spark::Vector<int> to_accept;
};

void tcp_server_client_t::close() {
Expand Down Expand Up @@ -443,24 +444,6 @@ class socket_t
bool isOpen() { return !closed && is_inner_open(); }
};

/**
* Lock/unlock a semaphore via RAII
*/
class SemaphoreLock
{
wiced_semaphore_t& sem;

public:

SemaphoreLock(wiced_semaphore_t& sem_) : sem(sem_) {
wiced_rtos_get_semaphore(&sem, NEVER_TIMEOUT);
}

~SemaphoreLock() {
wiced_rtos_set_semaphore(&sem);
}
};

/**
* Maintains a singly linked list of sockets. Access to the list is not
* made thread-safe - callers should use SocketListLock to correctly serialize
Expand All @@ -469,19 +452,16 @@ class SemaphoreLock
struct SocketList
{
socket_t* items;
wiced_semaphore_t semaphore;
Mutex mutex;

public:

SocketList() : items(NULL)
{
wiced_rtos_init_semaphore(&semaphore);
wiced_rtos_set_semaphore(&semaphore);
}

~SocketList()
{
wiced_rtos_deinit_semaphore(&semaphore);
}

/**
Expand Down Expand Up @@ -550,11 +530,11 @@ struct SocketList
friend class SocketListLock;
};

struct SocketListLock : SemaphoreLock
struct SocketListLock : std::lock_guard<Mutex>
{
SocketListLock(SocketList& list) :
SemaphoreLock(list.semaphore)
{}
std::lock_guard<Mutex>(list.mutex)
{};
};

class ServerSocketList : public SocketList
Expand Down
4 changes: 3 additions & 1 deletion system/src/active_object.cpp
Expand Up @@ -41,7 +41,9 @@ void ActiveObjectBase::start_thread()

void ActiveObjectBase::run()
{
std::lock_guard<std::mutex> lck (_start);
/* XXX: We shouldn't constantly hold a mutex. This breaks priority inhertiance mechanisms in FreeRTOS. */
/* It's not even used anywhere */
// std::lock_guard<std::mutex> lck (_start);
started = true;

uint32_t last_background_run = 0;
Expand Down
2 changes: 1 addition & 1 deletion user/tests/app/tcp_server/README.md
@@ -1,4 +1,4 @@
Flash server application to the device:
Flash server application to the device (add USE_THREADING=y option to enable threading):
$ cd ~/firmware/modules
$ make -s all program-dfu PLATFORM=photon TEST=app/tcp_server

Expand Down
7 changes: 6 additions & 1 deletion user/tests/app/tcp_server/server.cpp
Expand Up @@ -2,6 +2,10 @@

SYSTEM_MODE(MANUAL)

#if USE_THREADING == 1
SYSTEM_THREAD(ENABLED)
#endif

// Assertion macro to use in Errorable classes
#define CHECK(_expr) \
do { \
Expand Down Expand Up @@ -284,7 +288,8 @@ class NetworkManager: public Errorable {
}
addr_ = IPAddress();
WiFi.connect();
if (!WiFi.ready()) {
if (!waitFor(WiFi.ready, 10000)) {
WiFi.disconnect();
setError("Unable to connect to network");
return false;
}
Expand Down
7 changes: 7 additions & 0 deletions user/tests/app/tcp_server/test.mk
@@ -0,0 +1,7 @@
ifeq ("${USE_THREADING}","y")
USE_THREADING_VALUE=1
else
USE_THREADING_VALUE=0
endif

CFLAGS += -DUSE_THREADING=${USE_THREADING_VALUE}
13 changes: 10 additions & 3 deletions wiring/src/spark_wiring_tcpserver.cpp
Expand Up @@ -26,9 +26,12 @@
#include "spark_wiring_tcpclient.h"
#include "spark_wiring_tcpserver.h"
#include "spark_wiring_network.h"
#include "spark_wiring_thread.h"

using namespace spark;

static TCPClient* s_invalid_client = NULL;

class TCPServerClient : public TCPClient
{

Expand All @@ -48,7 +51,11 @@ class TCPServerClient : public TCPClient

TCPServer::TCPServer(uint16_t port, network_interface_t nif) : _port(port), _nif(nif), _sock(socket_handle_invalid()), _client(socket_handle_invalid())
{

SINGLE_THREADED_BLOCK() {
if (!s_invalid_client) {
s_invalid_client = new TCPClient(socket_handle_invalid());
}
}
}

bool TCPServer::begin()
Expand Down Expand Up @@ -90,15 +97,15 @@ TCPClient TCPServer::available()
if((!Network.from(_nif).ready()) || (_sock == SOCKET_INVALID))
{
_sock = SOCKET_INVALID;
_client = TCPClient(SOCKET_INVALID);
_client = *s_invalid_client;
return _client;
}

int sock = socket_accept(_sock);

if (!socket_handle_valid(sock))
{
_client = TCPClient(SOCKET_INVALID);
_client = *s_invalid_client;
}
else
{
Expand Down

0 comments on commit 744a7b9

Please sign in to comment.