Skip to content

Commit

Permalink
[PrematureStopping] Replace observables with rxcpp::subject::subject.
Browse files Browse the repository at this point in the history
Other:
* Add algorithm pausing option and its unit tests.
* Fixed also a bug related to selecting an option from the command line.
* Add reset_handler() to reset the handler's objects;
* Guarded Signal getter methods to prevent SWIG translation;
  • Loading branch information
geektoni committed Jun 22, 2017
1 parent 5a8e82b commit 56be92d
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 48 deletions.
45 changes: 18 additions & 27 deletions src/shogun/lib/Signal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,10 @@ using namespace shogun;
using namespace rxcpp;

bool CSignal::m_active = false;

rxcpp::connectable_observable<int> CSignal::m_sigint_observable =
rxcpp::observable<>::create<int>([](rxcpp::subscriber<int> s) {
s.on_completed();
}).publish();

rxcpp::connectable_observable<int> CSignal::m_sigurg_observable =
rxcpp::observable<>::create<int>([](rxcpp::subscriber<int> s) {
s.on_next(1);
}).publish();
rxcpp::subjects::subject<int> CSignal::m_subject =
rxcpp::subjects::subject<int>();
rxcpp::observable<int> CSignal::m_observable = m_subject.get_observable();
rxcpp::subscriber<int> CSignal::m_subscriber = m_subject.get_subscriber();

CSignal::CSignal()
{
Expand All @@ -38,16 +32,6 @@ CSignal::~CSignal()
{
}

rxcpp::connectable_observable<int> CSignal::get_SIGINT_observable()
{
return m_sigint_observable;
}

rxcpp::connectable_observable<int> CSignal::get_SIGURG_observable()
{
return m_sigurg_observable;
}

void CSignal::handler(int signal)
{
/* If the handler is not enabled, then return */
Expand All @@ -57,22 +41,29 @@ void CSignal::handler(int signal)
if (signal == SIGINT)
{
SG_SPRINT(
"\n[ShogunSignalHandler] Immediately return to prompt / "
"Prematurely finish "
"computations / Do nothing (I/P/D)? ")
char answer = fgetc(stdin);
"\n[ShogunSignalHandler] "
"Immediately return to prompt / "
"Prematurely finish computations / "
"Pause current computation / "
"Do nothing (I/C/P/D)? ")
char answer = getchar();
getchar();
switch (answer)
{
case 'I':
SG_SPRINT("[ShogunSignalHandler] Killing the application...\n");
m_sigint_observable.connect();
m_subscriber.on_completed();
exit(0);
break;
case 'P':
case 'C':
SG_SPRINT(
"[ShogunSignalHandler] Terminating"
" prematurely current algorithm...\n");
m_sigurg_observable.connect();
m_subscriber.on_next(SG_BLOCK_COMP);
break;
case 'P':
SG_SPRINT("[ShogunSignalHandler] Pausing current computation...")
m_subscriber.on_next(SG_PAUSE_COMP);
break;
default:
SG_SPRINT("[ShogunSignalHandler] Continuing...\n")
Expand Down
57 changes: 42 additions & 15 deletions src/shogun/lib/Signal.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@

namespace shogun
{
/**
* Possible Shogun signal types.
*/
enum sg_signals_types
{
SG_BLOCK_COMP,
SG_PAUSE_COMP
};

/** @brief Class Signal implements signal handling to e.g. allow CTRL+C to
* cancel a long running process.
*
Expand All @@ -37,17 +46,27 @@ namespace shogun
*/
static void handler(int signal);

/**
* Get SIGINT observable
* @return observable
*/
rxcpp::connectable_observable<int> get_SIGINT_observable();
#ifndef SWIG // SWIG should skip this part
/**
* Get observable
* @return RxCpp observable
*/
rxcpp::observable<int> get_observable()
{
return m_observable;
};
#endif

/**
* Get SIGURG observable
* @ return observable
*/
rxcpp::connectable_observable<int> get_SIGURG_observable();
#ifndef SWIG // SWIG should skip this part
/**
* Get subscriber
* @return RxCpp subscriber
*/
rxcpp::subscriber<int> get_subscriber()
{
return m_subscriber;
};
#endif

/** Cancel computations
*
Expand All @@ -64,6 +83,15 @@ namespace shogun
{
m_active = true;
}
/**
* Reset handler in case of multiple instantiation
*/
static void reset_handler()
{
m_subject = rxcpp::subjects::subject<int>();
m_observable = m_subject.get_observable();
m_subscriber = m_subject.get_subscriber();
}

/** @return object name */
virtual const char* get_name() const { return "Signal"; }
Expand All @@ -72,11 +100,10 @@ namespace shogun
/** Active signal */
static bool m_active;

/** SIGINT Observable */
static rxcpp::connectable_observable<int> m_sigint_observable;

/** SIGURG Observable */
static rxcpp::connectable_observable<int> m_sigurg_observable;
/** Observable */
static rxcpp::subjects::subject<int> m_subject;
static rxcpp::observable<int> m_observable;
static rxcpp::subscriber<int> m_subscriber;
};
}
#endif // __SIGNAL__H_
38 changes: 32 additions & 6 deletions tests/unit/lib/Signal_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
using namespace shogun;
using namespace rxcpp;

TEST(Signal, SIGINT_test)
TEST(Signal, return_to_prompt_test)
{
CSignal tmp;
tmp.enable_handler();
Expand All @@ -16,14 +16,15 @@ TEST(Signal, SIGINT_test)
auto sub = rxcpp::make_subscriber<int>(
[&on_next_v](int v) { on_next_v = 1; }, [&]() { on_complete_v = 1; });

tmp.get_SIGINT_observable().subscribe(sub);
tmp.get_SIGINT_observable().connect();
tmp.get_observable().subscribe(sub);
tmp.get_subscriber().on_completed();

EXPECT_TRUE(on_complete_v == 1);
EXPECT_TRUE(on_next_v == 0);
CSignal::reset_handler();
}

TEST(Signal, SIGURG_test)
TEST(Signal, prematurely_stop_computation_test)
{

CSignal tmp;
Expand All @@ -33,9 +34,34 @@ TEST(Signal, SIGURG_test)
auto sub = rxcpp::make_subscriber<int>(
[&](int v) { on_next_v++; }, [&]() { on_complete_v++; });

tmp.get_SIGURG_observable().subscribe(sub);
tmp.get_SIGURG_observable().connect();
tmp.get_observable().subscribe(sub);
tmp.get_subscriber().on_next(SG_BLOCK_COMP);

EXPECT_TRUE(on_next_v == 1);
EXPECT_TRUE(on_complete_v == 0);
CSignal::reset_handler();
}

TEST(Signal, pause_computation_test)
{

CSignal tmp;
tmp.enable_handler();
int on_next_v = 0;
int on_complete_v = 0;
auto sub = rxcpp::make_subscriber<int>(
[&](int v) {
if (v == SG_PAUSE_COMP)
on_next_v += 2;
else
on_next_v++;
},
[&]() { on_complete_v++; });

tmp.get_observable().subscribe(sub);
tmp.get_subscriber().on_next(SG_PAUSE_COMP);

EXPECT_TRUE(on_next_v == 2);
EXPECT_TRUE(on_complete_v == 0);
CSignal::reset_handler();
}

0 comments on commit 56be92d

Please sign in to comment.