Skip to content

Commit

Permalink
Replaced rx.hpp header with rx-lite.hpp.
Browse files Browse the repository at this point in the history
Move also rx headers into implementation files.
  • Loading branch information
geektoni committed Jul 11, 2017
1 parent aef58d8 commit e3dde55
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 51 deletions.
23 changes: 14 additions & 9 deletions src/shogun/base/SGObject.cpp
Expand Up @@ -28,6 +28,9 @@
#include <stdlib.h>
#include <stdio.h>

#include <rxcpp/rx-lite.hpp>
#include <rxcpp/operators/rx-filter.hpp>

#ifdef HAVE_CXX11
#include <unordered_map>
#else
Expand Down Expand Up @@ -151,9 +154,7 @@ namespace shogun
using namespace shogun;

CSGObject::CSGObject()
: self(), m_subject_params(),
m_observable_params(m_subject_params.get_observable()),
m_subscriber_params(m_subject_params.get_subscriber())
: self()
{
init();
set_global_objects();
Expand All @@ -163,10 +164,7 @@ CSGObject::CSGObject()
}

CSGObject::CSGObject(const CSGObject& orig)
: self(), io(orig.io), parallel(orig.parallel), version(orig.version),
m_subject_params(orig.m_subject_params),
m_observable_params(orig.m_observable_params),
m_subscriber_params(orig.m_subscriber_params)
: self(), io(orig.io), parallel(orig.parallel), version(orig.version)
{
init();
set_global_objects();
Expand All @@ -184,6 +182,9 @@ CSGObject::~CSGObject()
delete m_model_selection_parameters;
delete m_gradient_parameters;
delete m_refcount;
delete m_subject_params;
delete m_observable_params;
delete m_subscriber_params;
}

int32_t CSGObject::ref()
Expand Down Expand Up @@ -506,6 +507,10 @@ void CSGObject::init()
m_save_pre_called = false;
m_save_post_called = false;
m_hash = 0;

m_subject_params = new SGSubject();
*m_observable_params = m_subject_params->get_observable();
*m_subscriber_params = m_subject_params->get_subscriber();
}

void CSGObject::print_modsel_params()
Expand Down Expand Up @@ -821,7 +826,7 @@ void CSGObject::subscribe_to_parameters(ParameterObserverInterface* obs)
// parameters selected by the observable.
auto subscription =
m_observable_params
.filter([obs](ParameterObserverInterface::ObservedValue v) {
->filter([obs](ParameterObserverInterface::ObservedValue v) {
return obs->filter(v.second.first);
})
.subscribe(sub);
Expand All @@ -831,5 +836,5 @@ void CSGObject::observe_scalar(
const int64_t step, const std::string& name, const Any& value)
{
auto tmp = std::make_pair(step, std::make_pair(name, value));
m_subscriber_params.on_next(tmp);
m_subscriber_params->on_next(tmp);
}
18 changes: 10 additions & 8 deletions src/shogun/base/SGObject.h
Expand Up @@ -23,8 +23,8 @@
#include <shogun/lib/common.h>
#include <shogun/lib/config.h>
#include <shogun/lib/tag.h>
#include <shogun/lib/RxCppHeader.h>

#include <rxcpp/rx.hpp>
#include <utility>

/** \namespace shogun
Expand Down Expand Up @@ -123,6 +123,11 @@ enum EGradientAvailability
class CSGObject
{
public:

typedef rxcpp::subjects::subject<ParameterObserverInterface::ObservedValue> SGSubject;
typedef rxcpp::observable<ParameterObserverInterface::ObservedValue, rxcpp::dynamic_observable<ParameterObserverInterface::ObservedValue>> SGObservable;
typedef rxcpp::subscriber<ParameterObserverInterface::ObservedValue, rxcpp::observer<ParameterObserverInterface::ObservedValue, void, void, void, void>> SGSubscriber;

/** default constructor */
CSGObject();

Expand Down Expand Up @@ -403,7 +408,7 @@ class CSGObject
* Get parameters observable
* @return RxCpp observable
*/
rxcpp::observable<ParameterObserverInterface::ObservedValue>
SGObservable *
get_parameters_observable()
{
return m_observable_params;
Expand Down Expand Up @@ -604,16 +609,13 @@ class CSGObject
RefCount* m_refcount;

/** Subject used to create the params observer */
rxcpp::subjects::subject<ParameterObserverInterface::ObservedValue>
m_subject_params;
SGSubject * m_subject_params;

/** Parameter Observable */
rxcpp::observable<ParameterObserverInterface::ObservedValue>
m_observable_params;
SGObservable * m_observable_params;

/** Subscriber used to call onNext, onComplete etc.*/
rxcpp::subscriber<ParameterObserverInterface::ObservedValue>
m_subscriber_params;
SGSubscriber * m_subscriber_params;
};
}
#endif // __SGOBJECT_H__
1 change: 0 additions & 1 deletion src/shogun/lib/ParameterObserverInterface.h
Expand Up @@ -39,7 +39,6 @@
#include <utility>
#include <vector>

#include <rxcpp/rx-observable.hpp>
#include <shogun/lib/any.h>

namespace shogun
Expand Down
23 changes: 23 additions & 0 deletions src/shogun/lib/RxCppHeader.h
@@ -0,0 +1,23 @@
#ifndef SHOGUN_RXCPPHEADER_H
#define SHOGUN_RXCPPHEADER_H


/**
* Rx namespace
*/
namespace rxcpp
{
template<class, class, class, class, class> class observer;

namespace subjects
{
template <class> class subject;
}

template <class> class dynamic_observable;
template <class, class> class observable;
template <class, class> class subscriber;
class subscription;
}

#endif // SHOGUN_RXCPPHEADER_H
35 changes: 27 additions & 8 deletions src/shogun/lib/Signal.cpp
Expand Up @@ -12,25 +12,37 @@
#include <csignal>
#include <stdlib.h>

#include <rxcpp/rx.hpp>
#include <shogun/io/SGIO.h>
#include <shogun/lib/Signal.h>
#include <rxcpp/rx-lite.hpp>

using namespace shogun;
using namespace rxcpp;

bool CSignal::m_active = false;
rxcpp::subjects::subject<int> CSignal::m_subject =
rxcpp::subjects::subject<int>();
rxcpp::observable<int> CSignal::m_observable = m_subject.get_observable();
rxcpp::subscriber<int> CSignal::m_subscriber = m_subject.get_subscriber();
CSignal::SGSubjectS * CSignal::m_subject =
new rxcpp::subjects::subject<int>();

CSignal::SGObservableS * CSignal::m_observable;
CSignal::SGSubscriberS * CSignal::m_subscriber;

CSignal::CSignal()
{
// Should prevent memory leak
if (m_observable != NULL || m_subscriber != NULL)
{
delete m_observable;
delete m_subscriber;
}
*(CSignal::m_observable) = m_subject->get_observable();
*(CSignal::m_subscriber) = m_subject->get_subscriber();
}

CSignal::~CSignal()
{
delete m_subject;
delete m_observable;
delete m_subscriber;
}

void CSignal::handler(int signal)
Expand All @@ -53,18 +65,18 @@ void CSignal::handler(int signal)
{
case 'I':
SG_SPRINT("[ShogunSignalHandler] Killing the application...\n");
m_subscriber.on_completed();
m_subscriber->on_completed();
exit(0);
break;
case 'C':
SG_SPRINT(
"[ShogunSignalHandler] Terminating"
" prematurely current algorithm...\n");
m_subscriber.on_next(SG_BLOCK_COMP);
m_subscriber->on_next(SG_BLOCK_COMP);
break;
case 'P':
SG_SPRINT("[ShogunSignalHandler] Pausing current computation...")
m_subscriber.on_next(SG_PAUSE_COMP);
m_subscriber->on_next(SG_PAUSE_COMP);
break;
default:
SG_SPRINT("[ShogunSignalHandler] Continuing...\n")
Expand All @@ -76,3 +88,10 @@ void CSignal::handler(int signal)
SG_SPRINT("[ShogunSignalHandler] Unknown signal %d received\n", signal)
}
}

void CSignal::reset_handler()
{
m_subject = new rxcpp::subjects::subject<int>();
*m_observable = m_subject->get_observable();
*m_subscriber = m_subject->get_subscriber();
}
25 changes: 12 additions & 13 deletions src/shogun/lib/Signal.h
Expand Up @@ -12,7 +12,6 @@
#ifndef __SIGNAL__H_
#define __SIGNAL__H_

#include <rxcpp/rx-includes.hpp>
#include <shogun/base/SGObject.h>

namespace shogun
Expand All @@ -35,9 +34,14 @@ namespace shogun
* option bewteen: immediately exit the running method and fall back to
* the command line, prematurely stop the current algoritmh and do nothing.
*/
class CSignal : public CSGObject
class CSignal : CSGObject
{
public:

typedef rxcpp::subjects::subject<int> SGSubjectS;
typedef rxcpp::observable<int, rxcpp::dynamic_observable<int>> SGObservableS;
typedef rxcpp::subscriber<int, rxcpp::observer<int, void, void, void, void>> SGSubscriberS;

CSignal();
virtual ~CSignal();

Expand All @@ -52,7 +56,7 @@ namespace shogun
* Get observable
* @return RxCpp observable
*/
rxcpp::observable<int> get_observable()
SGObservableS * get_observable()
{
return m_observable;
};
Expand All @@ -63,7 +67,7 @@ namespace shogun
* Get subscriber
* @return RxCpp subscriber
*/
rxcpp::subscriber<int> get_subscriber()
SGSubscriberS * get_subscriber()
{
return m_subscriber;
};
Expand All @@ -78,12 +82,7 @@ namespace shogun
/**
* Reset handler in case of multiple instantiation
*/
static void reset_handler()
{
m_subject = rxcpp::subjects::subject<int>();
m_observable = m_subject.get_observable();
m_subscriber = m_subject.get_subscriber();
}
static void reset_handler();

/** @return object name */
virtual const char* get_name() const { return "Signal"; }
Expand All @@ -93,9 +92,9 @@ namespace shogun
static bool m_active;

/** Observable */
static rxcpp::subjects::subject<int> m_subject;
static rxcpp::observable<int> m_observable;
static rxcpp::subscriber<int> m_subscriber;
static SGSubjectS * m_subject;
static SGObservableS * m_observable;
static SGSubscriberS * m_subscriber;
};
}
#endif // __SIGNAL__H_
2 changes: 1 addition & 1 deletion src/shogun/lib/external/shogun_libsvm.cpp
Expand Up @@ -384,7 +384,7 @@ rxcpp::subscription Solver::connect_to_signal_handler()
this->on_next();
},
[this]() { this->on_complete(); });
return get_global_signal()->get_observable().subscribe(subscriber);
return get_global_signal()->get_observable()->subscribe(subscriber);
}

void Solver::reset_computation_variables()
Expand Down
5 changes: 2 additions & 3 deletions src/shogun/machine/Machine.cpp
Expand Up @@ -12,8 +12,7 @@
#include <shogun/base/init.h>
#include <shogun/lib/Signal.h>
#include <shogun/machine/Machine.h>

#include <rxcpp/rx.hpp>
#include <rxcpp/rx-lite.hpp>

using namespace shogun;

Expand Down Expand Up @@ -289,5 +288,5 @@ rxcpp::subscription CMachine::connect_to_signal_handler()
this->on_next();
},
[this]() { this->on_complete(); });
return get_global_signal()->get_observable().subscribe(subscriber);
return get_global_signal()->get_observable()->subscribe(subscriber);
}
1 change: 0 additions & 1 deletion src/shogun/machine/Machine.h
Expand Up @@ -25,7 +25,6 @@

#include <condition_variable>
#include <mutex>
#include <rxcpp/rx.hpp>

namespace shogun
{
Expand Down
14 changes: 7 additions & 7 deletions tests/unit/lib/Signal_unittest.cc
Expand Up @@ -33,9 +33,9 @@
*
*/
#include <gtest/gtest.h>
#include <rxcpp/rx.hpp>
#include <shogun/lib/Signal.h>

#include <rxcpp/rx-lite.hpp>
#include <csignal>

using namespace shogun;
Expand All @@ -50,8 +50,8 @@ TEST(Signal, return_to_prompt_test)
auto sub = rxcpp::make_subscriber<int>(
[&on_next_v](int v) { on_next_v = 1; }, [&]() { on_complete_v = 1; });

tmp.get_observable().subscribe(sub);
tmp.get_subscriber().on_completed();
tmp.get_observable()->subscribe(sub);
tmp.get_subscriber()->on_completed();

EXPECT_TRUE(on_complete_v == 1);
EXPECT_TRUE(on_next_v == 0);
Expand All @@ -68,8 +68,8 @@ TEST(Signal, prematurely_stop_computation_test)
auto sub = rxcpp::make_subscriber<int>(
[&](int v) { on_next_v++; }, [&]() { on_complete_v++; });

tmp.get_observable().subscribe(sub);
tmp.get_subscriber().on_next(SG_BLOCK_COMP);
tmp.get_observable()->subscribe(sub);
tmp.get_subscriber()->on_next(SG_BLOCK_COMP);

EXPECT_TRUE(on_next_v == 1);
EXPECT_TRUE(on_complete_v == 0);
Expand All @@ -92,8 +92,8 @@ TEST(Signal, pause_computation_test)
},
[&]() { on_complete_v++; });

tmp.get_observable().subscribe(sub);
tmp.get_subscriber().on_next(SG_PAUSE_COMP);
tmp.get_observable()->subscribe(sub);
tmp.get_subscriber()->on_next(SG_PAUSE_COMP);

EXPECT_TRUE(on_next_v == 2);
EXPECT_TRUE(on_complete_v == 0);
Expand Down

0 comments on commit e3dde55

Please sign in to comment.