Skip to content

Commit

Permalink
fixes #37 Make transports pluggable
Browse files Browse the repository at this point in the history
We automatically register inproc, TCP, and IPC.  We can add more now
by just calling nni_tran_register().  (There is no unregister support.)

This requires transports to have access to the AIO framework (so that needs
to be something we consider), and a few nni_sock calls to get socket options.

Going forward we should version the ops vectors, and move to pushing down
transport options from the framework via setopt calls -- there is no reason
really that transports need to know all these.
  • Loading branch information
gdamore committed Aug 9, 2017
1 parent fec1e51 commit 5f0398d
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 51 deletions.
93 changes: 61 additions & 32 deletions src/core/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "core/nng_impl.h"

#include <stdio.h>
#include <string.h>

// For now the list of transports is hard-wired. Adding new transports
Expand All @@ -18,34 +19,60 @@ extern nni_tran nni_inproc_tran;
extern nni_tran nni_tcp_tran;
extern nni_tran nni_ipc_tran;

static nni_tran *transports[] = {
// clang-format off
&nni_inproc_tran,
&nni_tcp_tran,
&nni_ipc_tran,
NULL
// clang-format on
};
typedef struct nni_transport {
nni_tran t_tran;
char t_prefix[16]; // e.g. "tcp://" or "tls+tcp://"
nni_list_node t_node;
} nni_transport;

static nni_list nni_tran_list;
static nni_mtx nni_tran_lk;

int
nni_tran_register(const nni_tran *tran)
{
nni_transport *t;
int rv;

nni_mtx_lock(&nni_tran_lk);
// Check to see if the transport is already registered...
NNI_LIST_FOREACH (&nni_tran_list, t) {
if (strcmp(tran->tran_scheme, t->t_tran.tran_scheme) == 0) {
nni_mtx_unlock(&nni_tran_lk);
return (NNG_ESTATE);
}
}
if ((t = NNI_ALLOC_STRUCT(t)) == NULL) {
return (NNG_ENOMEM);
}

t->t_tran = *tran;
(void) snprintf(
t->t_prefix, sizeof(t->t_prefix), "%s://", tran->tran_scheme);
if ((rv = t->t_tran.tran_init()) != 0) {
nni_mtx_unlock(&nni_tran_lk);
NNI_FREE_STRUCT(t);
return (rv);
}
nni_list_append(&nni_tran_list, t);
nni_mtx_unlock(&nni_tran_lk);
return (0);
}

nni_tran *
nni_tran_find(const char *addr)
{
// address is of the form "<scheme>://blah..."
const char *end;
int len;
int i;
nni_tran * tran;
nni_transport *t;

if ((end = strstr(addr, "://")) == NULL) {
return (NULL);
}
len = (int) (end - addr);
for (i = 0; (tran = transports[i]) != NULL; i++) {
if ((strncmp(addr, tran->tran_scheme, len) == 0) &&
(tran->tran_scheme[len] == '\0')) {
return (tran);
nni_mtx_lock(&nni_tran_lk);
NNI_LIST_FOREACH (&nni_tran_list, t) {
if (strncmp(addr, t->t_prefix, strlen(t->t_prefix)) == 0) {
nni_mtx_unlock(&nni_tran_lk);
return (&t->t_tran);
}
}
nni_mtx_unlock(&nni_tran_lk);
return (NULL);
}

Expand All @@ -54,14 +81,15 @@ nni_tran_find(const char *addr)
int
nni_tran_sys_init(void)
{
nni_tran *tran;
int rv;

for (int i = 0; (tran = transports[i]) != NULL; i++) {
int rv;
if ((rv = tran->tran_init()) != 0) {
nni_tran_sys_fini();
return (rv);
}
NNI_LIST_INIT(&nni_tran_list, nni_transport, t_node);
if (((rv = nni_mtx_init(&nni_tran_lk)) != 0) ||
((rv = nni_tran_register(&nni_inproc_tran)) != 0) ||
((rv = nni_tran_register(&nni_ipc_tran)) != 0) ||
((rv = nni_tran_register(&nni_tcp_tran)) != 0)) {
nni_tran_sys_fini();
return (rv);
}
return (0);
}
Expand All @@ -71,11 +99,12 @@ nni_tran_sys_init(void)
void
nni_tran_sys_fini(void)
{
nni_tran *tran;
nni_transport *t;

for (int i = 0; (tran = transports[i]) != NULL; i++) {
if (tran->tran_fini != NULL) {
tran->tran_fini();
}
while ((t = nni_list_first(&nni_tran_list)) != NULL) {
nni_list_remove(&nni_tran_list, t);
t->t_tran.tran_fini();
NNI_FREE_STRUCT(t);
}
nni_mtx_fini(&nni_tran_lk);
}
1 change: 1 addition & 0 deletions src/core/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,6 @@ struct nni_tran_pipe {
extern nni_tran *nni_tran_find(const char *);
extern int nni_tran_sys_init(void);
extern void nni_tran_sys_fini(void);
extern int nni_tran_register(const nni_tran *);

#endif // CORE_TRANSPORT_H
44 changes: 25 additions & 19 deletions tests/trantest.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
Expand All @@ -8,8 +9,8 @@
//

#include "convey.h"
#include "nng.h"
#include "core/nng_impl.h"
#include "nng.h"
#include <string.h>

// Transport common tests. By making a common test framework for transports,
Expand All @@ -18,20 +19,21 @@
// for comms.

typedef struct {
char addr[NNG_MAXADDRLEN+1];
char addr[NNG_MAXADDRLEN + 1];
nng_socket reqsock;
nng_socket repsock;
nni_tran *tran;
nni_tran * tran;
} trantest;

void
trantest_init(trantest *tt, const char *addr)
{
snprintf(tt->addr, sizeof (tt->addr), "%s", addr);
tt->tran = nni_tran_find(addr);
So(tt->tran != NULL);
(void) snprintf(tt->addr, sizeof(tt->addr), "%s", addr);
So(nng_open(&tt->reqsock, NNG_PROTO_REQ) == 0);
So(nng_open(&tt->repsock, NNG_PROTO_REP) == 0);

tt->tran = nni_tran_find(addr);
So(tt->tran != NULL);
}

void
Expand All @@ -57,9 +59,11 @@ trantest_conn_refused(trantest *tt)
Convey("Connection refused works", {
nng_endpoint ep = 0;

So(nng_dial(tt->reqsock, tt->addr, &ep, NNG_FLAG_SYNCH) == NNG_ECONNREFUSED);
So(nng_dial(tt->reqsock, tt->addr, &ep, NNG_FLAG_SYNCH) ==
NNG_ECONNREFUSED);
So(ep == 0);
So(nng_dial(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) == NNG_ECONNREFUSED);
So(nng_dial(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) ==
NNG_ECONNREFUSED);
So(ep == 0);
})
}
Expand All @@ -69,21 +73,24 @@ trantest_duplicate_listen(trantest *tt)
{
Convey("Duplicate listen rejected", {
nng_endpoint ep;
So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) == 0);
So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) ==
0);
So(ep != 0);
ep = 0;
So(nng_listen(tt->reqsock, tt->addr, &ep, NNG_FLAG_SYNCH) == NNG_EADDRINUSE);
So(nng_listen(tt->reqsock, tt->addr, &ep, NNG_FLAG_SYNCH) ==
NNG_EADDRINUSE);
So(ep == 0);
})
}

void
trantest_listen_accept(trantest *tt)
{
Convey("Listen and accept" ,{
Convey("Listen and accept", {
nng_endpoint ep;
ep = 0;
So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) == 0);
So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) ==
0);
So(ep != 0);

ep = 0;
Expand All @@ -97,12 +104,13 @@ trantest_send_recv(trantest *tt)
{
Convey("Send and recv", {
nng_endpoint ep = 0;
nng_msg *send;
nng_msg *recv;
size_t len;
nng_msg * send;
nng_msg * recv;
size_t len;

ep = 0;
So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) == 0);
So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) ==
0);
So(ep != 0);
ep = 0;
So(nng_dial(tt->reqsock, tt->addr, &ep, NNG_FLAG_SYNCH) == 0);
Expand Down Expand Up @@ -140,9 +148,7 @@ trantest_test_all(const char *addr)
Convey("Given transport", {
trantest_init(&tt, addr);

Reset({
trantest_fini(&tt);
})
Reset({ trantest_fini(&tt); });

trantest_scheme(&tt);
trantest_conn_refused(&tt);
Expand Down

0 comments on commit 5f0398d

Please sign in to comment.