Skip to content

Commit

Permalink
async execute (#392)
Browse files Browse the repository at this point in the history
async execute request

---------

Co-authored-by: Johan Mabille <johan.mabille@gmail.com>
  • Loading branch information
DerThorsten and JohanMabille committed Mar 20, 2024
1 parent 07ce01c commit 3989670
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 110 deletions.
16 changes: 8 additions & 8 deletions docs/source/example/src/custom_interpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ namespace nl = nlohmann;
namespace custom
{

nl::json custom_interpreter::execute_request_impl(xrequest_context request_context, // data required by other functions
int execution_counter, // Typically the cell number
const std::string& /*code*/, // Code to execute
bool /*silent*/,
bool /*store_history*/,
nl::json /*user_expressions*/,
bool /*allow_stdin*/)
void custom_interpreter::execute_request_impl(xrequest_context request_context,
send_reply_callback cb,
int execution_counter,
const std::string& code,
execute_request_config config,
nl::json user_expressions)
{
// You can use the C-API of your target language for executing the code,
// e.g. `PyRun_String` for the Python C-API
Expand All @@ -45,7 +44,8 @@ namespace custom
// publish_execution_error(error_name, error_value, error_traceback);
publish_execution_error(request_context, "TypeError", "123", {"!@#$", "*(*"});

return xeus::create_successful_reply();
// Call the callback parameter to send the reply
cb(xeus::create_successful_reply());
}

void custom_interpreter::configure_impl()
Expand Down
13 changes: 6 additions & 7 deletions docs/source/example/src/custom_interpreter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@ namespace custom

void configure_impl() override;

nl::json execute_request_impl(xrequest_context request_context,
int execution_counter,
const std::string& code,
bool silent,
bool store_history,
nl::json user_expressions,
bool allow_stdin) override;
void execute_request_impl(xrequest_context request_context,
send_reply_callback cb,
int execution_counter,
const std::string& code,
execute_request_config config,
nl::json user_expressions) override;

nl::json complete_request_impl(const std::string& code,
int cursor_pos) override;
Expand Down
2 changes: 1 addition & 1 deletion docs/source/kernel_implementation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ course the ``execute_request_impl`` which executes the code whenever the client
.. literalinclude:: ./example/src/custom_interpreter.cpp
:language: cpp
:dedent: 4
:lines: 22-48
:lines: 22-49

The result and arguments of the execution request are described in the execute_request_ documentation.

Expand Down
32 changes: 19 additions & 13 deletions include/xeus/xinterpreter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ namespace xeus
XEUS_API bool register_interpreter(xinterpreter* interpreter);
XEUS_API xinterpreter& get_interpreter();

struct XEUS_API execute_request_config
{
bool silent;
bool store_history;
bool allow_stdin;
};

class XEUS_API xinterpreter
{
public:
Expand All @@ -42,12 +49,12 @@ namespace xeus

void configure();

nl::json execute_request(xrequest_context context,
const std::string& code,
bool silent,
bool store_history,
nl::json user_expressions,
bool allow_stdin);
using send_reply_callback = std::function<void(nl::json)>;
void execute_request(xrequest_context context,
send_reply_callback callback,
const std::string& code,
execute_request_config config,
nl::json user_expressions);

nl::json complete_request(const std::string& code, int cursor_pos);

Expand Down Expand Up @@ -101,13 +108,12 @@ namespace xeus

virtual void configure_impl() = 0;

virtual nl::json execute_request_impl(xrequest_context request_context,
int execution_counter,
const std::string& code,
bool silent,
bool store_history,
nl::json user_expressions,
bool allow_stdin) = 0;
virtual void execute_request_impl(xrequest_context request_context,
send_reply_callback cb,
int execution_counter,
const std::string& code,
execute_request_config config,
nl::json user_expressions) = 0;

virtual nl::json complete_request_impl(const std::string& code,
int cursor_pos) = 0;
Expand Down
35 changes: 21 additions & 14 deletions src/xinterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,34 @@ namespace xeus
configure_impl();
}

nl::json xinterpreter::execute_request(xrequest_context context,
const std::string& code,
bool silent,
bool store_history,
nl::json user_expressions,
bool allow_stdin)
{
if (!silent)
void xinterpreter::execute_request(xrequest_context context,
send_reply_callback callback,
const std::string& code,
execute_request_config config,
nl::json user_expressions)
{
if (!config.silent)
{
++m_execution_count;
publish_execution_input(context, code, m_execution_count);
}
// copy m_execution_count in a local variable to capture it in the lambda
auto execution_count = m_execution_count;

auto callback_impl = [execution_count, callback = std::move(callback)](nl::json reply)
{
reply["execution_count"] = execution_count;
callback(std::move(reply));
};

nl::json reply = execute_request_impl(
execute_request_impl(
std::move(context),
m_execution_count, code, silent,
store_history, user_expressions, allow_stdin
std::move(callback_impl),
m_execution_count,
code,
std::move(config),
user_expressions
);

reply["execution_count"] = m_execution_count;
return reply;
}

nl::json xinterpreter::complete_request(const std::string& code, int cursor_pos)
Expand Down
101 changes: 60 additions & 41 deletions src/xkernel_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,20 @@ namespace xeus
, p_history_manager(history_manager)
, p_debugger(debugger)
{
// Request handlers
m_handler["execute_request"] = &xkernel_core::execute_request;
m_handler["complete_request"] = &xkernel_core::complete_request;
m_handler["inspect_request"] = &xkernel_core::inspect_request;
m_handler["history_request"] = &xkernel_core::history_request;
m_handler["is_complete_request"] = &xkernel_core::is_complete_request;
m_handler["comm_info_request"] = &xkernel_core::comm_info_request;
m_handler["comm_open"] = &xkernel_core::comm_open;
m_handler["comm_close"] = &xkernel_core::comm_close;
m_handler["comm_msg"] = &xkernel_core::comm_msg;
m_handler["kernel_info_request"] = &xkernel_core::kernel_info_request;
m_handler["shutdown_request"] = &xkernel_core::shutdown_request;
m_handler["interrupt_request"] = &xkernel_core::interrupt_request;
m_handler["debug_request"] = &xkernel_core::debug_request;
// Request handlers (all but execute_request are blocking)
m_handler["execute_request"] = handler_type{&xkernel_core::execute_request, /*blocking*/ false};
m_handler["complete_request"] = handler_type{&xkernel_core::complete_request, true};
m_handler["inspect_request"] = handler_type{&xkernel_core::inspect_request, true};
m_handler["history_request"] = handler_type{&xkernel_core::history_request, true};
m_handler["is_complete_request"] = handler_type{&xkernel_core::is_complete_request, true};
m_handler["comm_info_request"] = handler_type{&xkernel_core::comm_info_request, true};
m_handler["comm_open"] = handler_type{&xkernel_core::comm_open, true};
m_handler["comm_close"] = handler_type{&xkernel_core::comm_close, true};
m_handler["comm_msg"] = handler_type{&xkernel_core::comm_msg, true};
m_handler["kernel_info_request"] = handler_type{&xkernel_core::kernel_info_request, true};
m_handler["shutdown_request"] = handler_type{&xkernel_core::shutdown_request, true};
m_handler["interrupt_request"] = handler_type{&xkernel_core::interrupt_request, true};
m_handler["debug_request"] = handler_type{&xkernel_core::debug_request, true};

// Server bindings
p_server->register_shell_listener(std::bind(&xkernel_core::dispatch_shell, this, _1));
Expand Down Expand Up @@ -195,7 +195,7 @@ namespace xeus

std::string msg_type = header.value("msg_type", "");
handler_type handler = get_handler(msg_type);
if (handler == nullptr)
if (handler.fptr == nullptr)
{
std::cerr << "ERROR: received unknown message" << std::endl;
std::cerr << "Message type: " << msg_type << std::endl;
Expand All @@ -204,22 +204,25 @@ namespace xeus
{
try
{
(this->*handler)(std::move(msg), c);
(this->*(handler.fptr))(std::move(msg), c);
}
catch (std::exception& e)
{
std::cerr << "ERROR: received bad message: " << e.what() << std::endl;
std::cerr << "Message type: " << msg_type << std::endl;
}
}

publish_status(header, "idle", c);
// async handlers need to set the idle status themselves
if(handler.blocking)
{
publish_status(header, "idle", c);
}
}

auto xkernel_core::get_handler(const std::string& msg_type) -> handler_type
{
auto iter = m_handler.find(msg_type);
handler_type res = (iter == m_handler.end()) ? nullptr : iter->second;
handler_type res = (iter == m_handler.end()) ? handler_type{nullptr} : iter->second;
return res;
}

Expand All @@ -236,30 +239,46 @@ namespace xeus
bool allow_stdin = content.value("allow_stdin", true);
bool stop_on_error = content.value("stop_on_error", false);

nl::json metadata = get_metadata();
xrequest_context request_context(request.header(), c, request.identities());
nl::json reply = p_interpreter->execute_request(std::move(request_context),
code, silent, store_history, std::move(user_expression), allow_stdin);
int execution_count = reply.value("execution_count", 1);
std::string status = reply.value("status", "error");
send_reply(
request.identities(),
"execute_reply",
request.header(),
std::move(metadata),
std::move(reply),
c);

if (!silent && store_history)
execute_request_config config { silent, store_history, allow_stdin };
int execution_count = 1;
std::string status;
auto reply_callback = [&](nl::json reply)
{
p_history_manager->store_inputs(0, execution_count, code);
}

if (!silent && status == "error" && stop_on_error)
{
constexpr long polling_interval = 50;
p_server->abort_queue(std::bind(&xkernel_core::abort_request, this, _1), polling_interval);
}
execution_count = reply.value("execution_count", 1);
status = reply.value("status", "error");
nl::json metadata = get_metadata();

send_reply(
request.identities(),
"execute_reply",
request.header(),
std::move(metadata),
std::move(reply),
c
);

if (!silent && store_history)
{
p_history_manager->store_inputs(0, execution_count, code);
}
if (!silent && status == "error" && stop_on_error)
{
constexpr long polling_interval = 50;
p_server->abort_queue(std::bind(&xkernel_core::abort_request, this, _1), polling_interval);
}

// idle
publish_status("idle", request_context.header(), request_context.origin());
};

p_interpreter->execute_request(
std::move(request_context),
std::move(reply_callback),
code,
config,
std::move(user_expression)
);
}
catch (std::exception& e)
{
Expand Down
7 changes: 6 additions & 1 deletion src/xkernel_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ namespace xeus

private:

using handler_type = void (xkernel_core::*)(xmessage, channel);
using handler_fptr_type = void (xkernel_core::*)(xmessage, channel);

struct handler_type{
handler_fptr_type fptr = nullptr;
bool blocking = true;
};


void dispatch(xmessage msg, channel c);
Expand Down
15 changes: 7 additions & 8 deletions src/xmock_interpreter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,14 @@ namespace xeus
{
}

nl::json execute_request_impl(xrequest_context /*request_context*/,
int /*execution_counter*/,
const std::string& /*code*/,
bool /*silent*/,
bool /*store_history*/,
nl::json /*user_expressions*/,
bool /*allow_stdin*/) override
void execute_request_impl(xrequest_context /*request_context*/,
send_reply_callback cb,
int /*execution_counter*/,
const std::string& /*code*/,
execute_request_config /*config*/,
nl::json /*user_expressions*/) override
{
return nl::json();
cb(nl::json());
}

nl::json complete_request_impl(const std::string& /*code*/, int /*cursor_pos*/) override
Expand Down
2 changes: 1 addition & 1 deletion src/xrequest_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ namespace xeus
{
return m_id;
}
}
}
17 changes: 8 additions & 9 deletions test/xmock_interpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ namespace xeus
using function_type = std::function<void(xeus::xcomm&&, const xeus::xmessage&)>;
}

nl::json xmock_interpreter::execute_request_impl(xrequest_context request_context,
int execution_counter,
const std::string& code,
bool /* silent */,
bool /* store_history */,
nl::json /* user_expressions */,
bool /* allow_stdin */)
void xmock_interpreter::execute_request_impl(xrequest_context request_context,
send_reply_callback cb,
int execution_counter,
const std::string& code,
execute_request_config /*config*/,
nl::json /*user_expressions*/)
{
if (code.compare("hello, world") == 0)
{
Expand All @@ -62,14 +61,14 @@ namespace xeus
{"start", 0}
});

return xeus::create_successful_reply(payload);
cb(xeus::create_successful_reply(payload));
}

nl::json pub_data;
pub_data["text/plain"] = code;
publish_execution_result(request_context, execution_counter, std::move(pub_data), nl::json::object());

return xeus::create_successful_reply();
cb(xeus::create_successful_reply());
}

nl::json xmock_interpreter::complete_request_impl(const std::string& /* code */,
Expand Down

0 comments on commit 3989670

Please sign in to comment.