Skip to content

Commit

Permalink
Merge branch 'tor-github/pr/1040'
Browse files Browse the repository at this point in the history
  • Loading branch information
dgoulet-tor committed Jun 11, 2019
2 parents 8e112ce + 5f5f6bb commit f7e8b3b
Show file tree
Hide file tree
Showing 22 changed files with 427 additions and 345 deletions.
3 changes: 3 additions & 0 deletions changes/ticket29976
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
o Code simplification and refactoring:
- Rework bootstrap tracking to use the new publish-subscribe
subsystem. Closes ticket 29976.
2 changes: 2 additions & 0 deletions src/app/main/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,8 @@ pubsub_connect(void)
/* XXXX For each pubsub channel, its delivery strategy should be set at
* this XXXX point, using tor_mainloop_set_delivery_strategy().
*/
tor_mainloop_set_delivery_strategy("orconn", DELIV_IMMEDIATE);
tor_mainloop_set_delivery_strategy("ocirc", DELIV_IMMEDIATE);
}
}

Expand Down
11 changes: 5 additions & 6 deletions src/core/or/circuitbuild.c
Original file line number Diff line number Diff line change
Expand Up @@ -522,14 +522,13 @@ origin_circuit_get_guard_state(origin_circuit_t *circ)
static void
circuit_chan_publish(const origin_circuit_t *circ, const channel_t *chan)
{
ocirc_event_msg_t msg;
ocirc_chan_msg_t *msg = tor_malloc(sizeof(*msg));

msg.type = OCIRC_MSGTYPE_CHAN;
msg.u.chan.gid = circ->global_identifier;
msg.u.chan.chan = chan->global_identifier;
msg.u.chan.onehop = circ->build_state->onehop_tunnel;
msg->gid = circ->global_identifier;
msg->chan = chan->global_identifier;
msg->onehop = circ->build_state->onehop_tunnel;

ocirc_event_publish(&msg);
ocirc_chan_publish(msg);
}

/** Start establishing the first hop of our circuit. Figure out what
Expand Down
33 changes: 16 additions & 17 deletions src/core/or/circuitlist.c
Original file line number Diff line number Diff line change
Expand Up @@ -496,44 +496,42 @@ int
circuit_event_status(origin_circuit_t *circ, circuit_status_event_t tp,
int reason_code)
{
ocirc_event_msg_t msg;
ocirc_cevent_msg_t *msg = tor_malloc(sizeof(*msg));

tor_assert(circ);

msg.type = OCIRC_MSGTYPE_CEVENT;
msg.u.cevent.gid = circ->global_identifier;
msg.u.cevent.evtype = tp;
msg.u.cevent.reason = reason_code;
msg.u.cevent.onehop = circ->build_state->onehop_tunnel;
msg->gid = circ->global_identifier;
msg->evtype = tp;
msg->reason = reason_code;
msg->onehop = circ->build_state->onehop_tunnel;

ocirc_event_publish(&msg);
ocirc_cevent_publish(msg);
return control_event_circuit_status(circ, tp, reason_code);
}

/**
* Helper function to publish a state change message
*
* circuit_set_state() calls this to notify subscribers about a change
* of the state of an origin circuit.
* of the state of an origin circuit. @a circ must be an origin
* circuit.
**/
static void
circuit_state_publish(const circuit_t *circ)
{
ocirc_event_msg_t msg;
ocirc_state_msg_t *msg = tor_malloc(sizeof(*msg));
const origin_circuit_t *ocirc;

if (!CIRCUIT_IS_ORIGIN(circ))
return;
tor_assert(CIRCUIT_IS_ORIGIN(circ));
ocirc = CONST_TO_ORIGIN_CIRCUIT(circ);
/* Only inbound OR circuits can be in this state, not origin circuits. */
tor_assert(circ->state != CIRCUIT_STATE_ONIONSKIN_PENDING);

msg.type = OCIRC_MSGTYPE_STATE;
msg.u.state.gid = ocirc->global_identifier;
msg.u.state.state = circ->state;
msg.u.state.onehop = ocirc->build_state->onehop_tunnel;
msg->gid = ocirc->global_identifier;
msg->state = circ->state;
msg->onehop = ocirc->build_state->onehop_tunnel;

ocirc_event_publish(&msg);
ocirc_state_publish(msg);
}

/** Change the state of <b>circ</b> to <b>state</b>, adding it to or removing
Expand Down Expand Up @@ -565,7 +563,8 @@ circuit_set_state(circuit_t *circ, uint8_t state)
if (state == CIRCUIT_STATE_GUARD_WAIT || state == CIRCUIT_STATE_OPEN)
tor_assert(!circ->n_chan_create_cell);
circ->state = state;
circuit_state_publish(circ);
if (CIRCUIT_IS_ORIGIN(circ))
circuit_state_publish(circ);
}

/** Append to <b>out</b> all circuits in state CHAN_WAIT waiting for
Expand Down
28 changes: 13 additions & 15 deletions src/core/or/connection_or.c
Original file line number Diff line number Diff line change
Expand Up @@ -414,13 +414,12 @@ void
connection_or_event_status(or_connection_t *conn, or_conn_status_event_t tp,
int reason)
{
orconn_event_msg_t msg;
orconn_status_msg_t *msg = tor_malloc(sizeof(*msg));

msg.type = ORCONN_MSGTYPE_STATUS;
msg.u.status.gid = conn->base_.global_identifier;
msg.u.status.status = tp;
msg.u.status.reason = reason;
orconn_event_publish(&msg);
msg->gid = conn->base_.global_identifier;
msg->status = tp;
msg->reason = reason;
orconn_status_publish(msg);
control_event_or_conn_status(conn, tp, reason);
}

Expand All @@ -433,26 +432,25 @@ connection_or_event_status(or_connection_t *conn, or_conn_status_event_t tp,
static void
connection_or_state_publish(const or_connection_t *conn, uint8_t state)
{
orconn_event_msg_t msg;
orconn_state_msg_t *msg = tor_malloc(sizeof(*msg));

msg.type = ORCONN_MSGTYPE_STATE;
msg.u.state.gid = conn->base_.global_identifier;
msg->gid = conn->base_.global_identifier;
if (conn->is_pt) {
/* Do extra decoding because conn->proxy_type indicates the proxy
* protocol that tor uses to talk with the transport plugin,
* instead of PROXY_PLUGGABLE. */
tor_assert_nonfatal(conn->proxy_type != PROXY_NONE);
msg.u.state.proxy_type = PROXY_PLUGGABLE;
msg->proxy_type = PROXY_PLUGGABLE;
} else {
msg.u.state.proxy_type = conn->proxy_type;
msg->proxy_type = conn->proxy_type;
}
msg.u.state.state = state;
msg->state = state;
if (conn->chan) {
msg.u.state.chan = TLS_CHAN_TO_BASE(conn->chan)->global_identifier;
msg->chan = TLS_CHAN_TO_BASE(conn->chan)->global_identifier;
} else {
msg.u.state.chan = 0;
msg->chan = 0;
}
orconn_event_publish(&msg);
orconn_state_publish(msg);
}

/** Call this to change or_connection_t states, so the owning channel_tls_t can
Expand Down
112 changes: 78 additions & 34 deletions src/core/or/ocirc_event.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,59 +26,103 @@
#include "core/or/origin_circuit_st.h"
#include "lib/subsys/subsys.h"

/** List of subscribers */
static smartlist_t *ocirc_event_rcvrs;
DECLARE_PUBLISH(ocirc_state);
DECLARE_PUBLISH(ocirc_chan);
DECLARE_PUBLISH(ocirc_cevent);

static void
ocirc_event_free(msg_aux_data_t u)
{
tor_free_(u.ptr);
}

static char *
ocirc_state_fmt(msg_aux_data_t u)
{
ocirc_state_msg_t *msg = (ocirc_state_msg_t *)u.ptr;
char *s = NULL;

tor_asprintf(&s, "<gid=%"PRIu32" state=%d onehop=%d>",
msg->gid, msg->state, msg->onehop);
return s;
}

static char *
ocirc_chan_fmt(msg_aux_data_t u)
{
ocirc_chan_msg_t *msg = (ocirc_chan_msg_t *)u.ptr;
char *s = NULL;

tor_asprintf(&s, "<gid=%"PRIu32" chan=%"PRIu64" onehop=%d>",
msg->gid, msg->chan, msg->onehop);
return s;
}

static char *
ocirc_cevent_fmt(msg_aux_data_t u)
{
ocirc_cevent_msg_t *msg = (ocirc_cevent_msg_t *)u.ptr;
char *s = NULL;

tor_asprintf(&s, "<gid=%"PRIu32" evtype=%d reason=%d onehop=%d>",
msg->gid, msg->evtype, msg->reason, msg->onehop);
return s;
}

static dispatch_typefns_t ocirc_state_fns = {
.free_fn = ocirc_event_free,
.fmt_fn = ocirc_state_fmt,
};

static dispatch_typefns_t ocirc_chan_fns = {
.free_fn = ocirc_event_free,
.fmt_fn = ocirc_chan_fmt,
};

static dispatch_typefns_t ocirc_cevent_fns = {
.free_fn = ocirc_event_free,
.fmt_fn = ocirc_cevent_fmt,
};

/** Initialize subscriber list */
static int
ocirc_event_init(void)
ocirc_add_pubsub(struct pubsub_connector_t *connector)
{
ocirc_event_rcvrs = smartlist_new();
if (DISPATCH_REGISTER_TYPE(connector, ocirc_state, &ocirc_state_fns))
return -1;
if (DISPATCH_REGISTER_TYPE(connector, ocirc_chan, &ocirc_chan_fns))
return -1;
if (DISPATCH_REGISTER_TYPE(connector, ocirc_cevent, &ocirc_cevent_fns))
return -1;
if (DISPATCH_ADD_PUB(connector, ocirc, ocirc_state))
return -1;
if (DISPATCH_ADD_PUB(connector, ocirc, ocirc_chan))
return -1;
if (DISPATCH_ADD_PUB(connector, ocirc, ocirc_cevent))
return -1;
return 0;
}

/** Free subscriber list */
static void
ocirc_event_fini(void)
void
ocirc_state_publish(ocirc_state_msg_t *msg)
{
smartlist_free(ocirc_event_rcvrs);
PUBLISH(ocirc_state, msg);
}

/**
* Subscribe to messages about origin circuit events
*
* Register a callback function to receive messages about origin
* circuits. The publisher calls this function synchronously.
**/
void
ocirc_event_subscribe(ocirc_event_rcvr_t fn)
ocirc_chan_publish(ocirc_chan_msg_t *msg)
{
tor_assert(fn);
/* Don't duplicate subscriptions. */
if (smartlist_contains(ocirc_event_rcvrs, fn))
return;

smartlist_add(ocirc_event_rcvrs, fn);
PUBLISH(ocirc_chan, msg);
}

/**
* Publish a message about OR connection events
*
* This calls the subscriber receiver function synchronously.
**/
void
ocirc_event_publish(const ocirc_event_msg_t *msg)
ocirc_cevent_publish(ocirc_cevent_msg_t *msg)
{
SMARTLIST_FOREACH_BEGIN(ocirc_event_rcvrs, ocirc_event_rcvr_t, fn) {
tor_assert(fn);
(*fn)(msg);
} SMARTLIST_FOREACH_END(fn);
PUBLISH(ocirc_cevent, msg);
}

const subsys_fns_t sys_ocirc_event = {
.name = "ocirc_event",
.supported = true,
.level = -32,
.initialize = ocirc_event_init,
.shutdown = ocirc_event_fini,
.add_pubsub = ocirc_add_pubsub,
};
35 changes: 9 additions & 26 deletions src/core/or/ocirc_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <stdbool.h>

#include "lib/cc/torint.h"
#include "lib/pubsub/pubsub.h"

/** Used to indicate the type of a circuit event passed to the controller.
* The various types are defined in control-spec.txt */
Expand All @@ -30,6 +31,8 @@ typedef struct ocirc_state_msg_t {
bool onehop; /**< one-hop circuit? */
} ocirc_state_msg_t;

DECLARE_MESSAGE(ocirc_state, ocirc_state, ocirc_state_msg_t *);

/**
* Message when a channel gets associated to a circuit.
*
Expand All @@ -44,6 +47,8 @@ typedef struct ocirc_chan_msg_t {
bool onehop; /**< one-hop circuit? */
} ocirc_chan_msg_t;

DECLARE_MESSAGE(ocirc_chan, ocirc_chan, ocirc_chan_msg_t *);

/**
* Message for origin circuit status event
*
Expand All @@ -56,34 +61,12 @@ typedef struct ocirc_cevent_msg_t {
bool onehop; /**< one-hop circuit? */
} ocirc_cevent_msg_t;

/** Discriminant values for origin circuit event message */
typedef enum ocirc_msgtype_t {
OCIRC_MSGTYPE_STATE,
OCIRC_MSGTYPE_CHAN,
OCIRC_MSGTYPE_CEVENT,
} ocirc_msgtype_t;

/** Discriminated union for the actual message */
typedef struct ocirc_event_msg_t {
int type;
union {
ocirc_state_msg_t state;
ocirc_chan_msg_t chan;
ocirc_cevent_msg_t cevent;
} u;
} ocirc_event_msg_t;

/**
* Receiver function pointer for origin circuit subscribers
*
* This function gets called synchronously by the publisher.
**/
typedef void (*ocirc_event_rcvr_t)(const ocirc_event_msg_t *);

void ocirc_event_subscribe(ocirc_event_rcvr_t fn);
DECLARE_MESSAGE(ocirc_cevent, ocirc_cevent, ocirc_cevent_msg_t *);

#ifdef OCIRC_EVENT_PRIVATE
void ocirc_event_publish(const ocirc_event_msg_t *msg);
void ocirc_state_publish(ocirc_state_msg_t *msg);
void ocirc_chan_publish(ocirc_chan_msg_t *msg);
void ocirc_cevent_publish(ocirc_cevent_msg_t *msg);
#endif

#endif /* !defined(TOR_OCIRC_EVENT_H) */
Loading

0 comments on commit f7e8b3b

Please sign in to comment.