Skip to content

Commit

Permalink
Merge pull request #68 from jcfr/support-event-loop-integration-appro…
Browse files Browse the repository at this point in the history
…ach1

Support event loop integration approach1
  • Loading branch information
JohanMabille committed Jun 29, 2018
2 parents 654005d + c7eac6b commit a304f3d
Show file tree
Hide file tree
Showing 12 changed files with 91 additions and 55 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Expand Up @@ -66,6 +66,7 @@ set(XEUS_HEADERS
${XEUS_INCLUDE_DIR}/xeus/xkernel_configuration.hpp
${XEUS_INCLUDE_DIR}/xeus/xmessage.hpp
${XEUS_INCLUDE_DIR}/xeus/xserver.hpp
${XEUS_INCLUDE_DIR}/xeus/xserver_zmq.hpp
)

set(XEUS_SOURCES
Expand All @@ -87,8 +88,7 @@ set(XEUS_SOURCES
${XEUS_SOURCE_DIR}/xpublisher.cpp
${XEUS_SOURCE_DIR}/xpublisher.hpp
${XEUS_SOURCE_DIR}/xserver.cpp
${XEUS_SOURCE_DIR}/xserver_impl.cpp
${XEUS_SOURCE_DIR}/xserver_impl.hpp
${XEUS_SOURCE_DIR}/xserver_zmq.cpp
${XEUS_SOURCE_DIR}/xstring_utils.hpp
)

Expand Down
9 changes: 9 additions & 0 deletions include/xeus/xkernel.hpp
Expand Up @@ -18,6 +18,7 @@

namespace xeus
{
class xkernel_core;

XEUS_API
std::string get_user_name();
Expand All @@ -27,6 +28,7 @@ namespace xeus
public:

using interpreter_ptr = std::unique_ptr<xinterpreter>;
using kernel_core_ptr = std::unique_ptr<xkernel_core>;
using server_ptr = std::unique_ptr<xserver>;
using server_builder = server_ptr (*)(zmq::context_t& context,
const xconfiguration& config);
Expand All @@ -36,14 +38,21 @@ namespace xeus
interpreter_ptr interpreter,
server_builder builder = make_xserver);

~xkernel();

void start();

private:

xconfiguration m_config;
std::string m_kernel_id;
std::string m_session_id;
std::string m_user_name;
interpreter_ptr p_interpreter;
server_builder m_builder;
server_ptr p_server;
zmq::context_t m_context;
kernel_core_ptr p_core;
};
}

Expand Down
21 changes: 13 additions & 8 deletions src/xserver_impl.hpp → include/xeus/xserver_zmq.hpp
Expand Up @@ -9,25 +9,29 @@
#ifndef XSERVER_IMPL_HPP
#define XSERVER_IMPL_HPP

#include "xeus/xeus.hpp"
#include "xeus/xserver.hpp"
#include "xeus/xkernel_configuration.hpp"
#include "xpublisher.hpp"
#include "xheartbeat.hpp"

namespace xeus
{
class xpublisher;
class xheartbeat;

class xserver_impl : public xserver
class XEUS_API xserver_zmq : public xserver
{

public:

xserver_impl(zmq::context_t& context,
using publisher_ptr = std::unique_ptr<xpublisher>;
using heartbeat_ptr = std::unique_ptr<xheartbeat>;

xserver_zmq(zmq::context_t& context,
const xconfiguration& config);

virtual ~xserver_impl() = default;
virtual ~xserver_zmq();

private:
protected:

void send_shell_impl(zmq::multipart_t& message) override;
void send_control_impl(zmq::multipart_t& message) override;
Expand All @@ -38,6 +42,7 @@ namespace xeus
void abort_queue_impl(const listener& l, long polling_interval) override;
void stop_impl() override;

void poll(long timeout);
void stop_channels();

zmq::socket_t m_shell;
Expand All @@ -46,8 +51,8 @@ namespace xeus
zmq::socket_t m_publisher_pub;
zmq::socket_t m_controller_pub;

xpublisher m_publisher;
xheartbeat m_heartbeat;
publisher_ptr p_publisher;
heartbeat_ptr p_heartbeat;

bool m_request_stop;
};
Expand Down
2 changes: 2 additions & 0 deletions src/xheartbeat.cpp
Expand Up @@ -27,6 +27,8 @@ namespace xeus
m_controller.setsockopt(ZMQ_SUBSCRIBE, "", 0);
}

xheartbeat::~xheartbeat(){}

void xheartbeat::run()
{
zmq::pollitem_t items[] = {
Expand Down
2 changes: 2 additions & 0 deletions src/xheartbeat.hpp
Expand Up @@ -24,6 +24,8 @@ namespace xeus
const std::string& ip,
const std::string& port);

~xheartbeat();

void run();

private:
Expand Down
19 changes: 10 additions & 9 deletions src/xkernel.cpp
Expand Up @@ -66,25 +66,26 @@ namespace xeus
{
}

xkernel::~xkernel(){}

void xkernel::start()
{
std::string kernel_id = new_xguid();
std::string session_id = new_xguid();
m_kernel_id = new_xguid();
m_session_id = new_xguid();

using authentication_ptr = xkernel_core::authentication_ptr;
authentication_ptr auth = make_xauthentication(m_config.m_signature_scheme, m_config.m_key);

zmq::multipart_t start_msg;
build_start_msg(auth, kernel_id, m_user_name, session_id, start_msg);
build_start_msg(auth, m_kernel_id, m_user_name, m_session_id, start_msg);

zmq::context_t context;
server_ptr server = m_builder(context, m_config);
p_server = m_builder(m_context, m_config);

xkernel_core core(kernel_id, m_user_name, session_id,
std::move(auth), server.get(), p_interpreter.get());
p_core = kernel_core_ptr(new xkernel_core(m_kernel_id, m_user_name, m_session_id,
std::move(auth), p_server.get(), p_interpreter.get()));

p_interpreter->configure();
server->start(start_msg);
p_interpreter->configure();
p_server->start(start_msg);
}

}
2 changes: 2 additions & 0 deletions src/xkernel_core.cpp
Expand Up @@ -59,6 +59,8 @@ namespace xeus
p_interpreter->register_comm_manager(&m_comm_manager);
}

xkernel_core::~xkernel_core(){}

void xkernel_core::dispatch_shell(zmq::multipart_t& wire_msg)
{
dispatch(wire_msg, channel::SHELL);
Expand Down
2 changes: 2 additions & 0 deletions src/xkernel_core.hpp
Expand Up @@ -35,6 +35,8 @@ namespace xeus
server_ptr server,
interpreter_ptr p_interpreter);

~xkernel_core();

void dispatch_shell(zmq::multipart_t& wire_msg);
void dispatch_control(zmq::multipart_t& wire_msg);
void dispatch_stdin(zmq::multipart_t& wire_msg);
Expand Down
2 changes: 2 additions & 0 deletions src/xpublisher.cpp
Expand Up @@ -29,6 +29,8 @@ namespace xeus
m_controller.setsockopt(ZMQ_SUBSCRIBE, "", 0);
}

xpublisher::~xpublisher(){}

void xpublisher::run()
{
zmq::pollitem_t items[] = {
Expand Down
2 changes: 2 additions & 0 deletions src/xpublisher.hpp
Expand Up @@ -23,6 +23,8 @@ namespace xeus
const std::string& ip,
const std::string& port);

~xpublisher();

void run();

private:
Expand Down
4 changes: 2 additions & 2 deletions src/xserver.cpp
Expand Up @@ -7,8 +7,8 @@
****************************************************************************/

#include "xeus/xserver.hpp"
#include "xeus/xserver_zmq.hpp"
#include "xeus/make_unique.hpp"
#include "xserver_impl.hpp"

namespace xeus
{
Expand Down Expand Up @@ -80,6 +80,6 @@ namespace xeus
std::unique_ptr<xserver> make_xserver(zmq::context_t& context,
const xconfiguration& config)
{
return ::xeus::make_unique<xserver_impl>(context, config);
return ::xeus::make_unique<xserver_zmq>(context, config);
}
}
77 changes: 43 additions & 34 deletions src/xserver_impl.cpp → src/xserver_zmq.cpp
Expand Up @@ -6,11 +6,13 @@
* The full license is in the file LICENSE, distributed with this software. *
****************************************************************************/

#include "xserver_impl.hpp"
#include "xeus/xserver_zmq.hpp"
#include <thread>
#include <chrono>
#include "zmq_addon.hpp"
#include "xmiddleware.hpp"
#include "xpublisher.hpp"
#include "xheartbeat.hpp"

namespace xeus
{
Expand All @@ -22,15 +24,15 @@ namespace xeus
socket.bind(end_point);
}

xserver_impl::xserver_impl(zmq::context_t& context,
xserver_zmq::xserver_zmq(zmq::context_t& context,
const xconfiguration& c)
: m_shell(context, zmq::socket_type::router),
m_controller(context, zmq::socket_type::router),
m_stdin(context, zmq::socket_type::router),
m_publisher_pub(context, zmq::socket_type::pub),
m_controller_pub(context, zmq::socket_type::pub),
m_publisher(context, c.m_transport, c.m_ip, c.m_iopub_port),
m_heartbeat(context, c.m_transport, c.m_ip, c.m_hb_port),
p_publisher(new xpublisher(context, c.m_transport, c.m_ip, c.m_iopub_port)),
p_heartbeat(new xheartbeat(context, c.m_transport, c.m_ip, c.m_hb_port)),
m_request_stop(false)
{
init_socket(m_shell, get_end_point(c.m_transport, c.m_ip, c.m_shell_port));
Expand All @@ -40,71 +42,78 @@ namespace xeus
init_socket(m_controller_pub, get_controller_end_point());
}

void xserver_impl::send_shell_impl(zmq::multipart_t& message)
xserver_zmq::~xserver_zmq(){}

void xserver_zmq::send_shell_impl(zmq::multipart_t& message)
{
message.send(m_shell);
}

void xserver_impl::send_control_impl(zmq::multipart_t& message)
void xserver_zmq::send_control_impl(zmq::multipart_t& message)
{
message.send(m_controller);
}

void xserver_impl::send_stdin_impl(zmq::multipart_t& message)
void xserver_zmq::send_stdin_impl(zmq::multipart_t& message)
{
message.send(m_stdin);
zmq::multipart_t wire_msg;
wire_msg.recv(m_stdin);
xserver::notify_stdin_listener(wire_msg);
}

void xserver_impl::publish_impl(zmq::multipart_t& message)
void xserver_zmq::publish_impl(zmq::multipart_t& message)
{
message.send(m_publisher_pub);
}

void xserver_impl::start_impl(zmq::multipart_t& message)
void xserver_zmq::start_impl(zmq::multipart_t& message)
{
std::thread iopub_thread(&xpublisher::run, &m_publisher);
std::thread iopub_thread(&xpublisher::run, p_publisher.get());
iopub_thread.detach();

std::thread hb_thread(&xheartbeat::run, &m_heartbeat);
std::thread hb_thread(&xheartbeat::run, p_heartbeat.get());
hb_thread.detach();

m_request_stop = false;

zmq::pollitem_t items[] = {
{ m_controller, 0, ZMQ_POLLIN, 0 },
{ m_shell, 0, ZMQ_POLLIN, 0 }
};

publish(message);

while (!m_request_stop)
{
zmq::poll(&items[0], 2, -1);

if (items[0].revents & ZMQ_POLLIN)
{
zmq::multipart_t wire_msg;
wire_msg.recv(m_controller);
xserver::notify_control_listener(wire_msg);
}

if (!m_request_stop && (items[1].revents & ZMQ_POLLIN))
{
zmq::multipart_t wire_msg;
wire_msg.recv(m_shell);
xserver::notify_shell_listener(wire_msg);
}
poll(-1);
}

stop_channels();

std::this_thread::sleep_for(std::chrono::milliseconds(50));
}

void xserver_impl::abort_queue_impl(const listener& l, long polling_interval)
void xserver_zmq::poll(long timeout)
{
zmq::pollitem_t items[] = {
{ m_controller, 0, ZMQ_POLLIN, 0 },
{ m_shell, 0, ZMQ_POLLIN, 0 }
};

zmq::poll(&items[0], 2, std::chrono::milliseconds(timeout));

if (items[0].revents & ZMQ_POLLIN)
{
zmq::multipart_t wire_msg;
wire_msg.recv(m_controller);
xserver::notify_control_listener(wire_msg);
}

if (!m_request_stop && (items[1].revents & ZMQ_POLLIN))
{
zmq::multipart_t wire_msg;
wire_msg.recv(m_shell);
xserver::notify_shell_listener(wire_msg);
}
}

void xserver_zmq::abort_queue_impl(const listener& l, long polling_interval)
{
while (true)
{
Expand All @@ -120,12 +129,12 @@ namespace xeus
}
}

void xserver_impl::stop_impl()
void xserver_zmq::stop_impl()
{
m_request_stop = true;
}

void xserver_impl::stop_channels()
void xserver_zmq::stop_channels()
{
zmq::message_t stop_msg("stop", 4);
m_controller_pub.send(stop_msg);
Expand Down

0 comments on commit a304f3d

Please sign in to comment.