Permalink
Browse files

Cleanup

  • Loading branch information...
1 parent af4bf93 commit db015ca0b1f848e8df47bec15f6dcf2888057266 @ewalshe ewalshe committed Nov 15, 2011
@@ -28,9 +28,6 @@
static void transition_to_initialized(amqp_connection_t *connection);
static void default_state_initialization(amqp_connection_t *connection, const char *new_state_name);
-static void transition_to_failed(amqp_connection_t *connection);
-static void transition_to_sending_header_response(amqp_connection_t *connection);
-static void transition_to_negotiated(amqp_connection_t *connection);
void amqp_connection_amqp_initialize(amqp_connection_t *connection)
{
@@ -42,7 +39,6 @@ static void cleanup_resources(amqp_connection_t *connection)
}
void amqp_connection_amqp_cleanup(amqp_connection_t *connection)
{
-// trace_cleanup();
cleanup_resources(connection);
}
@@ -51,81 +47,29 @@ int amqp_connection_amqp_is_state(const amqp_connection_t *connection, const cha
return connection->state.amqp.name != 0 ? (strcmp(connection->state.amqp.name, state_name) == 0) : false;
}
-static void amqp_done_callback(amqp_connection_t *connection)
+//static
+void amqp_done_callback(amqp_connection_t *connection)
{
connection->state.amqp.done(connection);
}
-static void amqp_version_accepted(amqp_connection_t *connection)
+static void start_while_initialized(amqp_connection_t *connection)
{
- transition_to_negotiated(connection);
- connection->specification_version.supported.amqp = connection->specification_version.required.amqp;
- connection->state.connection.done(connection);
+ not_implemented(todo);
}
-static void amqp_version_rejected(amqp_connection_t *connection, uint32_t version)
+static void wait_while_initialized(amqp_connection_t *connection)
{
- transition_to_failed(connection);
- connection->specification_version.supported.amqp = version;
- connection->state.connection.fail(connection);
-}
-static void amqp_connect_while_initialized(amqp_connection_t *connection)
-{
- connection->state.negotiator.reset(connection);
- connection->state.negotiator.start(connection, connection->specification_version.required.amqp, amqp_version_accepted, amqp_version_rejected);
-}
-
-static void tunnel_accept_while_initialized(amqp_connection_t *connection, uint32_t requested_version)
-{
- uint32_t supported_version;
-
- assert(amqp_version_protocol_id(requested_version) == AMQP_PROTOCOL_ID);
-
- supported_version = amqp_negotiator_choose_amqp_protocol_version(connection, requested_version);
-
- if (requested_version != supported_version)
- {
- transition_to_failed(connection);
- amqp_connection_trace(connection, "requested amqp version: %08x, supported version: %08x", requested_version, supported_version);
- call_action(connection->state.connection.mode.server.reject, connection->context, connection, supported_version);
- return;
- }
-
- transition_to_sending_header_response(connection);
- call_action(connection->state.negotiator.send, connection->context, connection, supported_version, amqp_done_callback);
+ not_implemented(todo);
}
static void transition_to_initialized(amqp_connection_t *connection)
{
default_state_initialization(connection, "Initialized");
- connection->state.amqp.connect = amqp_connect_while_initialized;
- connection->state.amqp.tunnel.accept = tunnel_accept_while_initialized;
- trace_transition("Created");
-}
-static void transition_to_failed(amqp_connection_t *connection)
-{
- save_old_state();
- default_state_initialization(connection, "Failed");
- trace_transition(old_state_name);
+ connection->state.amqp.start = start_while_initialized;
+ connection->state.amqp.wait = wait_while_initialized;
+ trace_transition("Initialized");
}
-static void done_while_sending_header_response(amqp_connection_t *connection)
-{
- transition_to_negotiated(connection);
- call_action(connection->state.connection.done, connection->context, connection);
-}
-static void transition_to_sending_header_response(amqp_connection_t *connection)
-{
- save_old_state();
- default_state_initialization(connection, "SendingHeader");
- connection->state.amqp.done = done_while_sending_header_response;
- trace_transition(old_state_name);
-}
-static void transition_to_negotiated(amqp_connection_t *connection)
-{
- save_old_state();
- default_state_initialization(connection, "Negotiated");
- trace_transition(old_state_name);
-}
/**********************************************
Default states
@@ -138,26 +82,26 @@ static void illegal_state(amqp_connection_t *connection, const char *event)
amqp_fatal_program_error("Connection amqp state error");
}
-static void default_connect(amqp_connection_t *connection)
+static void default_start(amqp_connection_t *connection)
{
illegal_state(connection, "Start");
}
-static void default_done(amqp_connection_t *connection)
+static void default_wait(amqp_connection_t *connection)
{
- illegal_state(connection, "Done");
+ illegal_state(connection, "Wait");
}
-static void default_tunnel_establish(amqp_connection_t *connection, uint32_t version)
+static void default_done(amqp_connection_t *connection)
{
- illegal_state(connection, "TunnelEstablish");
+ illegal_state(connection, "Done");
}
static void default_state_initialization(amqp_connection_t *connection, const char *new_state_name)
{
- connection->state.amqp.connect = default_connect;
+ connection->state.amqp.start = default_start;
+ connection->state.amqp.wait = default_wait;
connection->state.amqp.done = default_done;
- connection->state.amqp.tunnel.accept = default_tunnel_establish;
connection->state.amqp.name = new_state_name;
}
@@ -0,0 +1,163 @@
+/*
+ Copyright 2011 StormMQ Limited
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+#include "Context/Context.h"
+#include "Transport/Connection/Connection.h"
+#include "Transport/Connection/ConnectionTrace.h"
+
+#ifdef LIBAMQP_TRACE_CONNECT_STATE
+#define save_old_state() const char* old_state_name = connection->state.amqp_tunnel.name
+#define trace_transition(old_state_name) amqp_connection_trace_transition(connection, AMQP_TRACE_CONNECTION_AMQP, old_state_name, connection->state.amqp_tunnel.name)
+#else
+#define save_old_state()
+#define trace_transition(old_state_name)
+#endif
+
+static void transition_to_initialized(amqp_connection_t *connection);
+static void default_state_initialization(amqp_connection_t *connection, const char *new_state_name);
+static void transition_to_failed(amqp_connection_t *connection);
+static void transition_to_sending_header_response(amqp_connection_t *connection);
+static void transition_to_negotiated(amqp_connection_t *connection);
+
+void amqp_connection_amqp_tunnel_initialize(amqp_connection_t *connection)
+{
+ transition_to_initialized(connection);
+}
+
+static void cleanup_resources(amqp_connection_t *connection)
+{
+}
+void amqp_connection_amqp_tunnel_cleanup(amqp_connection_t *connection)
+{
+// trace_cleanup();
+ cleanup_resources(connection);
+}
+
+int amqp_connection_amqp_tunnel_is_state(const amqp_connection_t *connection, const char *state_name)
+{
+ return connection->state.amqp_tunnel.name != 0 ? (strcmp(connection->state.amqp_tunnel.name, state_name) == 0) : false;
+}
+
+static void amqp_done_callback(amqp_connection_t *connection)
+{
+ connection->state.amqp_tunnel.done(connection);
+}
+
+static void amqp_version_accepted(amqp_connection_t *connection)
+{
+ transition_to_negotiated(connection);
+ connection->specification_version.supported.amqp = connection->specification_version.required.amqp;
+ connection->state.connection.done(connection);
+}
+static void amqp_version_rejected(amqp_connection_t *connection, uint32_t version)
+{
+ transition_to_failed(connection);
+ connection->specification_version.supported.amqp = version;
+ connection->state.connection.fail(connection);
+}
+static void amqp_connect_while_initialized(amqp_connection_t *connection)
+{
+ connection->state.negotiator.reset(connection);
+ connection->state.negotiator.start(connection, connection->specification_version.required.amqp, amqp_version_accepted, amqp_version_rejected);
+}
+
+static void tunnel_accept_while_initialized(amqp_connection_t *connection, uint32_t requested_version)
+{
+ uint32_t supported_version;
+
+ assert(amqp_version_protocol_id(requested_version) == AMQP_PROTOCOL_ID);
+
+ supported_version = amqp_negotiator_choose_amqp_protocol_version(connection, requested_version);
+
+ if (requested_version != supported_version)
+ {
+ transition_to_failed(connection);
+ amqp_connection_trace(connection, "requested amqp version: %08x, supported version: %08x", requested_version, supported_version);
+ call_action(connection->state.connection.mode.server.reject, connection->context, connection, supported_version);
+ return;
+ }
+
+ transition_to_sending_header_response(connection);
+ call_action(connection->state.negotiator.send, connection->context, connection, supported_version, amqp_done_callback);
+}
+static void transition_to_initialized(amqp_connection_t *connection)
+{
+ default_state_initialization(connection, "Initialized");
+ connection->state.amqp_tunnel.connect = amqp_connect_while_initialized;
+ connection->state.amqp_tunnel.tunnel.accept = tunnel_accept_while_initialized;
+ trace_transition("Created");
+}
+static void transition_to_failed(amqp_connection_t *connection)
+{
+ save_old_state();
+ default_state_initialization(connection, "Failed");
+ trace_transition(old_state_name);
+}
+
+static void done_while_sending_header_response(amqp_connection_t *connection)
+{
+ transition_to_negotiated(connection);
+ call_action(connection->state.connection.done, connection->context, connection);
+}
+static void transition_to_sending_header_response(amqp_connection_t *connection)
+{
+ save_old_state();
+ default_state_initialization(connection, "SendingHeader");
+ connection->state.amqp_tunnel.done = done_while_sending_header_response;
+ trace_transition(old_state_name);
+}
+
+static void transition_to_negotiated(amqp_connection_t *connection)
+{
+ save_old_state();
+ default_state_initialization(connection, "Negotiated");
+ trace_transition(old_state_name);
+}
+
+/**********************************************
+ Default states
+ *********************************************/
+static void illegal_state(amqp_connection_t *connection, const char *event)
+{
+ amqp_connection_error(connection, AMQP_ERROR_ILLEGAL_STATE,
+ "Connection amqp tunnel does not support \"%s\" while \"%s\" and connection is \"%s\".",
+ event, connection->state.amqp_tunnel.name, connection->state.connection.name);
+ amqp_fatal_program_error("Connection amqp tunnel state error");
+}
+
+static void default_connect(amqp_connection_t *connection)
+{
+ illegal_state(connection, "Start");
+}
+
+static void default_done(amqp_connection_t *connection)
+{
+ illegal_state(connection, "Done");
+}
+
+static void default_tunnel_establish(amqp_connection_t *connection, uint32_t version)
+{
+ illegal_state(connection, "TunnelEstablish");
+}
+
+static void default_state_initialization(amqp_connection_t *connection, const char *new_state_name)
+{
+ connection->state.amqp_tunnel.connect = default_connect;
+ connection->state.amqp_tunnel.done = default_done;
+
+ connection->state.amqp_tunnel.tunnel.accept = default_tunnel_establish;
+ connection->state.amqp_tunnel.name = new_state_name;
+}
@@ -0,0 +1,43 @@
+/*
+ Copyright 2011 StormMQ Limited
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+#ifndef LIBAMQP_TRANSPORT_CONNECTION_CONNECTION_AMQP_TUNNEL_H
+#define LIBAMQP_TRANSPORT_CONNECTION_CONNECTION_AMQP_TUNNEL_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "libamqp_common.h"
+
+#ifndef LIBAMQP_AMQP_CONTEXT_TYPE_T
+#define LIBAMQP_AMQP_CONTEXT_TYPE_T
+typedef struct amqp_context_t amqp_context_t;
+#endif
+
+#ifndef LIBAMQP_AMQP_CONNECTION_TYPE_T
+#define LIBAMQP_AMQP_CONNECTION_TYPE_T
+typedef struct amqp_connection_t amqp_connection_t;
+#endif
+
+extern void amqp_connection_amqp_tunnel_initialize(amqp_connection_t *connection);
+extern void amqp_connection_amqp_tunnel_cleanup(amqp_connection_t *connection);
+extern int amqp_connection_amqp_tunnel_is_state(const amqp_connection_t *connection, const char *state_name);
+
+#ifdef __cplusplus
+}
+#endif
+#endif
@@ -398,11 +398,35 @@ static void default_sasl_mechanisms(amqp_connection_t *connection, amqp_frame_t
illegal_state(connection, "SaslMechanisms");
}
+static void default_sasl_init(amqp_connection_t *connection, amqp_frame_t *frame)
+{
+ illegal_state(connection, "SaslInit");
+}
+
+static void default_sasl_challenge(amqp_connection_t *connection, amqp_frame_t *frame)
+{
+ illegal_state(connection, "SaslChallenge");
+}
+
+static void default_sasl_challenge_response(amqp_connection_t *connection, amqp_frame_t *frame)
+{
+ illegal_state(connection, "SaslChallengeResponse");
+}
+
+static void default_sasl_outcome(amqp_connection_t *connection, amqp_frame_t *frame)
+{
+ illegal_state(connection, "SaslOutcome");
+}
+
static void default_state_initialization(amqp_connection_t *connection, const char *new_state_name)
{
connection->state.sasl.connect = default_connect;
connection->state.sasl.done = default_done;
connection->state.sasl.tunnel.accept = default_tunnel_establish;
connection->state.sasl.messages.mechanisms = default_sasl_mechanisms;
+ connection->state.sasl.messages.init = default_sasl_init;
+ connection->state.sasl.messages.challenge = default_sasl_challenge;
+ connection->state.sasl.messages.response = default_sasl_challenge_response;
+ connection->state.sasl.messages.outcome = default_sasl_outcome;
connection->state.sasl.name = new_state_name;
}
Oops, something went wrong.

0 comments on commit db015ca

Please sign in to comment.