Skip to content

Commit

Permalink
Pass autobahn with pub/sub
Browse files Browse the repository at this point in the history
  • Loading branch information
uNetworkingAB committed Oct 8, 2019
1 parent d8b2d3c commit f58a9e1
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 26 deletions.
14 changes: 12 additions & 2 deletions examples/BroadcastingEchoServer.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "App.h"

struct us_listen_socket_t *listen_socket;

int main() {
/* ws->getUserData returns one of these */
struct PerSocketData {
Expand All @@ -10,16 +12,23 @@ int main() {
uWS::App().ws<PerSocketData>("/*", {
/* Settings */
.compression = uWS::SHARED_COMPRESSOR,
.maxPayloadLength = 16 * 1024,
.maxPayloadLength = 16 * 1024 * 1024,
.idleTimeout = 10,
/* Handlers */
.open = [](auto *ws, auto *req) {
/* Let's make every connection subscribe to the "broadcast" topic */
ws->subscribe("broadcast");
},
.message = [](auto *ws, std::string_view message, uWS::OpCode opCode) {
/* Exit gracefully if we get a closedown message (ASAN debug) */
if (message == "closedown") {
/* Bye bye */
us_listen_socket_close(0, listen_socket);
ws->close();
}

/* Simply broadcast every single message we get */
ws->publish("broadcast", message/*, opCode*/);
ws->publish("broadcast", message, opCode);
},
.drain = [](auto *ws) {
/* Check getBufferedAmount here */
Expand All @@ -34,6 +43,7 @@ int main() {
/* We automatically unsubscribe from any topic here */
}
}).listen(9001, [](auto *token) {
listen_socket = token;
if (token) {
std::cout << "Listening on port " << 9001 << std::endl;
}
Expand Down
62 changes: 47 additions & 15 deletions src/TopicTreeDraft.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct Topic {
Topic *terminatingWildcardChild = nullptr;

/* What we published */
std::map<int, std::string> messages;
std::map<unsigned int, std::string> messages;

std::set<Subscriber *> subs;
};
Expand All @@ -70,7 +70,7 @@ struct TopicTree {
Topic *root = new Topic;

/* Global messageId for deduplication of overlapping topics and ordering between topics */
int messageId = 0;
unsigned int messageId = 0;

/* The triggered topics */
Topic *triggeredTopics[64];
Expand All @@ -89,7 +89,29 @@ struct TopicTree {
parent->wildcardChild = nullptr;
}
}
/* Erase us from our parents set (wildcards also live here) */
parent->children.erase(std::string_view(topic->name, topic->length));

/* If this node is triggered, make sure to remove it from the triggered list */
if (topic->triggered) {
Topic *tmp[64];
int length = 0;
for (int i = 0; i < numTriggeredTopics; i++) {
if (triggeredTopics[i] != topic) {
tmp[length++] = triggeredTopics[i];
}
}

for (int i = 0; i < length; i++) {
triggeredTopics[i] = tmp[i];
}
numTriggeredTopics = length;
}

/* Free various memory for the node */
delete [] topic->name;
delete topic;

if (parent != root) {
trimTree(parent);
}
Expand Down Expand Up @@ -155,6 +177,10 @@ struct TopicTree {
this->cb = cb;
}

~TopicTree() {
delete root;
}

void subscribe(std::string_view topic, Subscriber *subscriber) {
/* Start iterating from the root */
Topic *iterator = root;
Expand Down Expand Up @@ -222,29 +248,29 @@ struct TopicTree {
topic->subs.erase(subscriber);
trimTree(topic);
}
subscriber->subscriptions.clear();
}
}

/* Drain the tree by emitting what to send with every Subscriber */
/* Better name would be commit() and making it public so that one can commit and shutdown, etc */
void drain() {

/* Do nothing if nothing to send */
if (!numTriggeredTopics) {
return;
}

/* Fast path for one topic (can also be used with heuristics) */
if (numTriggeredTopics == -555555) {
/* Disabled */
/*std::string res;
for (auto &p : triggeredTopics[0]->messages) {
res.append(p.second);
/* bug fix: update min, as the one tracked via subscribe gets invalid as you unsubscribe */
min = (Subscriber *)UINTPTR_MAX;
for (int i = 0; i < numTriggeredTopics; i++) {
if ((triggeredTopics[i]->subs.size()) && (min > *triggeredTopics[i]->subs.begin())) {
min = *triggeredTopics[i]->subs.begin();
}
}

for (Subscriber *s : triggeredTopics[0]->subs) {
cb(s, res);
}*/
} else {
/* Check if we really have any sockets still */
if (min != (Subscriber *)UINTPTR_MAX) {

/* Up to 64 triggered Topics per batch */
std::map<uint64_t, std::string> intersectionCache;
Expand All @@ -263,7 +289,7 @@ struct TopicTree {
Subscriber *nextMin = (Subscriber *)UINTPTR_MAX;

/* The message sets relevant for this intersection */
std::map<int, std::string> *perSubscriberIntersectingTopicMessages[64];
std::map<unsigned int, std::string> *perSubscriberIntersectingTopicMessages[64];
int numPerSubscriberIntersectingTopicMessages = 0;

uint64_t intersection = 0;
Expand Down Expand Up @@ -297,7 +323,7 @@ struct TopicTree {
if (intersectionCache[intersection].length() == 0) {

/* Build the union in order without duplicates */
std::map<int, std::string> complete;
std::map<unsigned int, std::string> complete;
for (int i = 0; i < numPerSubscriberIntersectingTopicMessages; i++) {
complete.insert(perSubscriberIntersectingTopicMessages[i]->begin(), perSubscriberIntersectingTopicMessages[i]->end());
}
Expand Down Expand Up @@ -337,7 +363,13 @@ struct TopicTree {
for (int i = 0; i < indentation; i++) {
std::cout << " ";
}
std::cout << std::string_view(p.second->name, p.second->length) << " = " << p.second->messages.size() << " publishes, " << p.second->subs.size() << " subscribers" << std::endl;
std::cout << std::string_view(p.second->name, p.second->length) << " = " << p.second->messages.size() << " publishes, " << p.second->subs.size() << " subscribers {";

for (auto &p : p.second->subs) {
std::cout << p << " referring to socket: " << p->user << ", ";
}
std::cout << "}" << std::endl;

print(p.second, indentation + 1);
}
}
Expand Down
9 changes: 6 additions & 3 deletions src/WebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ struct WebSocket : AsyncSocket<SSL> {

/* Make sure to unsubscribe from any pub/sub node at exit */
webSocketContextData->topicTree.unsubscribeAll(webSocketData->subscriber);
delete webSocketData->subscriber;
webSocketData->subscriber = nullptr;
}

/* Subscribe to a topic according to MQTT rules and syntax */
Expand All @@ -147,16 +149,17 @@ struct WebSocket : AsyncSocket<SSL> {
}

/* Publish a message to a topic according to MQTT rules and syntax */
void publish(std::string_view topic, std::string_view message) {
void publish(std::string_view topic, std::string_view message, OpCode opCode = OpCode::TEXT, bool compress = false) {
WebSocketContextData<SSL> *webSocketContextData = (WebSocketContextData<SSL> *) us_socket_context_ext(SSL,
(us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this)
);

/* We frame the message right here and only pass raw bytes to the pub/subber */
char dst[1024];
size_t dst_length = protocol::formatMessage<true>(dst, message.data(), message.length(), OpCode::TEXT, message.length(), false);
char *dst = (char *) malloc(protocol::messageFrameSize(message.size()));
size_t dst_length = protocol::formatMessage<true>(dst, message.data(), message.length(), opCode, message.length(), false);

webSocketContextData->topicTree.publish(topic, std::string_view(dst, dst_length));
free(dst);
}
};

Expand Down
3 changes: 2 additions & 1 deletion src/WebSocketContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ struct WebSocketContext {

/* Handle socket disconnections */
us_socket_context_on_close(SSL, getSocketContext(), [](auto *s) {

/* For whatever reason, if we already have emitted close event, do not emit it again */
WebSocketData *webSocketData = (WebSocketData *) (us_socket_ext(SSL, s));
if (!webSocketData->isShuttingDown) {
Expand All @@ -238,6 +237,8 @@ struct WebSocketContext {

/* Make sure to unsubscribe from any pub/sub node at exit */
webSocketContextData->topicTree.unsubscribeAll(webSocketData->subscriber);
delete webSocketData->subscriber;
webSocketData->subscriber = nullptr;
}

/* Destruct in-placed data struct */
Expand Down
12 changes: 7 additions & 5 deletions src/WebSocketContextData.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,24 @@ struct WebSocketContextData {
size_t maxPayloadLength = 0;
int idleTimeout = 0;

/* There needs to be a maxBackpressure which will force close everything over that limit */
size_t maxBackpressure = 16 * 1024;

/* Each websocket context has a topic tree for pub/sub */
TopicTree topicTree;

WebSocketContextData() : topicTree([](Subscriber *s, std::string_view data) -> int {
//std::cout << "Skickar data: " << data << " på sub: " << s << std::endl;


/* We rely on writing to regular asyncSockets */
auto *asyncSocket = (AsyncSocket<SSL> *) s->user;

asyncSocket->write(data.data(), data.length());

/* Reserved, unused */
return 0;
}) {

/* bug: This should probably happen in both post and pre, esp for libuv */
Loop::get()->addPostHandler([this](Loop *loop) {

/* Commit pub/sub batches every loop iteration */
topicTree.drain();
});
}
Expand Down

0 comments on commit f58a9e1

Please sign in to comment.