Skip to content

Commit

Permalink
Merge pull request #3919 from geektoni/rxcpp_lite
Browse files Browse the repository at this point in the history
Replaced rx.hpp headers with rx-lite.hpp.
  • Loading branch information
vigsterkr committed Jul 11, 2017
2 parents aef58d8 + 4ad2814 commit 9be1025
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 53 deletions.
24 changes: 14 additions & 10 deletions src/shogun/base/SGObject.cpp
Expand Up @@ -28,6 +28,9 @@
#include <stdlib.h>
#include <stdio.h>

#include <rxcpp/operators/rx-filter.hpp>
#include <rxcpp/rx-lite.hpp>

#ifdef HAVE_CXX11
#include <unordered_map>
#else
Expand Down Expand Up @@ -150,10 +153,7 @@ namespace shogun

using namespace shogun;

CSGObject::CSGObject()
: self(), m_subject_params(),
m_observable_params(m_subject_params.get_observable()),
m_subscriber_params(m_subject_params.get_subscriber())
CSGObject::CSGObject() : self()
{
init();
set_global_objects();
Expand All @@ -163,10 +163,7 @@ CSGObject::CSGObject()
}

CSGObject::CSGObject(const CSGObject& orig)
: self(), io(orig.io), parallel(orig.parallel), version(orig.version),
m_subject_params(orig.m_subject_params),
m_observable_params(orig.m_observable_params),
m_subscriber_params(orig.m_subscriber_params)
: self(), io(orig.io), parallel(orig.parallel), version(orig.version)
{
init();
set_global_objects();
Expand All @@ -184,6 +181,9 @@ CSGObject::~CSGObject()
delete m_model_selection_parameters;
delete m_gradient_parameters;
delete m_refcount;
delete m_subject_params;
delete m_observable_params;
delete m_subscriber_params;
}

int32_t CSGObject::ref()
Expand Down Expand Up @@ -506,6 +506,10 @@ void CSGObject::init()
m_save_pre_called = false;
m_save_post_called = false;
m_hash = 0;

m_subject_params = new SGSubject();
m_observable_params = new SGObservable(m_subject_params->get_observable());
m_subscriber_params = new SGSubscriber(m_subject_params->get_subscriber());
}

void CSGObject::print_modsel_params()
Expand Down Expand Up @@ -821,7 +825,7 @@ void CSGObject::subscribe_to_parameters(ParameterObserverInterface* obs)
// parameters selected by the observable.
auto subscription =
m_observable_params
.filter([obs](ParameterObserverInterface::ObservedValue v) {
->filter([obs](ParameterObserverInterface::ObservedValue v) {
return obs->filter(v.second.first);
})
.subscribe(sub);
Expand All @@ -831,5 +835,5 @@ void CSGObject::observe_scalar(
const int64_t step, const std::string& name, const Any& value)
{
auto tmp = std::make_pair(step, std::make_pair(name, value));
m_subscriber_params.on_next(tmp);
m_subscriber_params->on_next(tmp);
}
26 changes: 17 additions & 9 deletions src/shogun/base/SGObject.h
Expand Up @@ -18,13 +18,13 @@
#include <shogun/io/SGIO.h>
#include <shogun/lib/DataType.h>
#include <shogun/lib/ParameterObserverInterface.h>
#include <shogun/lib/RxCppHeader.h>
#include <shogun/lib/ShogunException.h>
#include <shogun/lib/any.h>
#include <shogun/lib/common.h>
#include <shogun/lib/config.h>
#include <shogun/lib/tag.h>

#include <rxcpp/rx.hpp>
#include <utility>

/** \namespace shogun
Expand Down Expand Up @@ -123,6 +123,18 @@ enum EGradientAvailability
class CSGObject
{
public:
typedef rxcpp::subjects::subject<ParameterObserverInterface::ObservedValue>
SGSubject;
typedef rxcpp::observable<
ParameterObserverInterface::ObservedValue,
rxcpp::dynamic_observable<ParameterObserverInterface::ObservedValue>>
SGObservable;
typedef rxcpp::subscriber<
ParameterObserverInterface::ObservedValue,
rxcpp::observer<ParameterObserverInterface::ObservedValue, void, void,
void, void>>
SGSubscriber;

/** default constructor */
CSGObject();

Expand Down Expand Up @@ -403,8 +415,7 @@ class CSGObject
* Get parameters observable
* @return RxCpp observable
*/
rxcpp::observable<ParameterObserverInterface::ObservedValue>
get_parameters_observable()
SGObservable* get_parameters_observable()
{
return m_observable_params;
};
Expand Down Expand Up @@ -604,16 +615,13 @@ class CSGObject
RefCount* m_refcount;

/** Subject used to create the params observer */
rxcpp::subjects::subject<ParameterObserverInterface::ObservedValue>
m_subject_params;
SGSubject* m_subject_params;

/** Parameter Observable */
rxcpp::observable<ParameterObserverInterface::ObservedValue>
m_observable_params;
SGObservable* m_observable_params;

/** Subscriber used to call onNext, onComplete etc.*/
rxcpp::subscriber<ParameterObserverInterface::ObservedValue>
m_subscriber_params;
SGSubscriber* m_subscriber_params;
};
}
#endif // __SGOBJECT_H__
6 changes: 6 additions & 0 deletions src/shogun/base/init.cpp
Expand Up @@ -35,6 +35,8 @@ shogun::CMap<void*, shogun::MemoryBlock>* sg_mallocs=NULL;
#include <google/protobuf/stubs/common.h>
#endif

#include <rxcpp/rx-lite.hpp>

namespace shogun
{
Parallel* sg_parallel=NULL;
Expand Down Expand Up @@ -122,6 +124,10 @@ namespace shogun
SG_UNREF(sg_parallel);
SG_UNREF(sg_io);

delete CSignal::m_subscriber;
delete CSignal::m_observable;
delete CSignal::m_subject;

#ifdef HAVE_PROTOBUF
::google::protobuf::ShutdownProtobufLibrary();
#endif
Expand Down
1 change: 0 additions & 1 deletion src/shogun/lib/ParameterObserverInterface.h
Expand Up @@ -39,7 +39,6 @@
#include <utility>
#include <vector>

#include <rxcpp/rx-observable.hpp>
#include <shogun/lib/any.h>

namespace shogun
Expand Down
27 changes: 27 additions & 0 deletions src/shogun/lib/RxCppHeader.h
@@ -0,0 +1,27 @@
#ifndef SHOGUN_RXCPPHEADER_H
#define SHOGUN_RXCPPHEADER_H

/**
* Rx namespace
*/
namespace rxcpp
{
template <class, class, class, class, class>
class observer;

namespace subjects
{
template <class>
class subject;
}

template <class>
class dynamic_observable;
template <class, class>
class observable;
template <class, class>
class subscriber;
class subscription;
}

#endif // SHOGUN_RXCPPHEADER_H
29 changes: 21 additions & 8 deletions src/shogun/lib/Signal.cpp
Expand Up @@ -12,18 +12,20 @@
#include <csignal>
#include <stdlib.h>

#include <rxcpp/rx.hpp>
#include <rxcpp/rx-lite.hpp>
#include <shogun/io/SGIO.h>
#include <shogun/lib/Signal.h>

using namespace shogun;
using namespace rxcpp;

bool CSignal::m_active = false;
rxcpp::subjects::subject<int> CSignal::m_subject =
rxcpp::subjects::subject<int>();
rxcpp::observable<int> CSignal::m_observable = m_subject.get_observable();
rxcpp::subscriber<int> CSignal::m_subscriber = m_subject.get_subscriber();
CSignal::SGSubjectS* CSignal::m_subject = new rxcpp::subjects::subject<int>();

CSignal::SGObservableS* CSignal::m_observable =
new CSignal::SGObservableS(CSignal::m_subject->get_observable());
CSignal::SGSubscriberS* CSignal::m_subscriber =
new CSignal::SGSubscriberS(CSignal::m_subject->get_subscriber());

CSignal::CSignal()
{
Expand Down Expand Up @@ -53,18 +55,18 @@ void CSignal::handler(int signal)
{
case 'I':
SG_SPRINT("[ShogunSignalHandler] Killing the application...\n");
m_subscriber.on_completed();
m_subscriber->on_completed();
exit(0);
break;
case 'C':
SG_SPRINT(
"[ShogunSignalHandler] Terminating"
" prematurely current algorithm...\n");
m_subscriber.on_next(SG_BLOCK_COMP);
m_subscriber->on_next(SG_BLOCK_COMP);
break;
case 'P':
SG_SPRINT("[ShogunSignalHandler] Pausing current computation...")
m_subscriber.on_next(SG_PAUSE_COMP);
m_subscriber->on_next(SG_PAUSE_COMP);
break;
default:
SG_SPRINT("[ShogunSignalHandler] Continuing...\n")
Expand All @@ -76,3 +78,14 @@ void CSignal::handler(int signal)
SG_SPRINT("[ShogunSignalHandler] Unknown signal %d received\n", signal)
}
}

void CSignal::reset_handler()
{
delete m_subject;
delete m_observable;
delete m_subscriber;

m_subject = new rxcpp::subjects::subject<int>();
m_observable = new CSignal::SGObservableS(m_subject->get_observable());
m_subscriber = new CSignal::SGSubscriberS(m_subject->get_subscriber());
}
28 changes: 15 additions & 13 deletions src/shogun/lib/Signal.h
Expand Up @@ -12,7 +12,6 @@
#ifndef __SIGNAL__H_
#define __SIGNAL__H_

#include <rxcpp/rx-includes.hpp>
#include <shogun/base/SGObject.h>

namespace shogun
Expand All @@ -35,9 +34,16 @@ namespace shogun
* option bewteen: immediately exit the running method and fall back to
* the command line, prematurely stop the current algoritmh and do nothing.
*/
class CSignal : public CSGObject
class CSignal : CSGObject
{
public:
typedef rxcpp::subjects::subject<int> SGSubjectS;
typedef rxcpp::observable<int, rxcpp::dynamic_observable<int>>
SGObservableS;
typedef rxcpp::subscriber<int,
rxcpp::observer<int, void, void, void, void>>
SGSubscriberS;

CSignal();
virtual ~CSignal();

Expand All @@ -52,7 +58,7 @@ namespace shogun
* Get observable
* @return RxCpp observable
*/
rxcpp::observable<int> get_observable()
SGObservableS* get_observable()
{
return m_observable;
};
Expand All @@ -63,7 +69,7 @@ namespace shogun
* Get subscriber
* @return RxCpp subscriber
*/
rxcpp::subscriber<int> get_subscriber()
SGSubscriberS* get_subscriber()
{
return m_subscriber;
};
Expand All @@ -78,12 +84,7 @@ namespace shogun
/**
* Reset handler in case of multiple instantiation
*/
static void reset_handler()
{
m_subject = rxcpp::subjects::subject<int>();
m_observable = m_subject.get_observable();
m_subscriber = m_subject.get_subscriber();
}
static void reset_handler();

/** @return object name */
virtual const char* get_name() const { return "Signal"; }
Expand All @@ -92,10 +93,11 @@ namespace shogun
/** Active signal */
static bool m_active;

public:
/** Observable */
static rxcpp::subjects::subject<int> m_subject;
static rxcpp::observable<int> m_observable;
static rxcpp::subscriber<int> m_subscriber;
static SGSubjectS* m_subject;
static SGObservableS* m_observable;
static SGSubscriberS* m_subscriber;
};
}
#endif // __SIGNAL__H_
2 changes: 1 addition & 1 deletion src/shogun/lib/external/shogun_libsvm.cpp
Expand Up @@ -384,7 +384,7 @@ rxcpp::subscription Solver::connect_to_signal_handler()
this->on_next();
},
[this]() { this->on_complete(); });
return get_global_signal()->get_observable().subscribe(subscriber);
return get_global_signal()->get_observable()->subscribe(subscriber);
}

void Solver::reset_computation_variables()
Expand Down
5 changes: 2 additions & 3 deletions src/shogun/machine/Machine.cpp
Expand Up @@ -9,12 +9,11 @@
* Copyright (C) 1999-2009 Fraunhofer Institute FIRST and Max-Planck-Society
*/

#include <rxcpp/rx-lite.hpp>
#include <shogun/base/init.h>
#include <shogun/lib/Signal.h>
#include <shogun/machine/Machine.h>

#include <rxcpp/rx.hpp>

using namespace shogun;

CMachine::CMachine()
Expand Down Expand Up @@ -289,5 +288,5 @@ rxcpp::subscription CMachine::connect_to_signal_handler()
this->on_next();
},
[this]() { this->on_complete(); });
return get_global_signal()->get_observable().subscribe(subscriber);
return get_global_signal()->get_observable()->subscribe(subscriber);
}
1 change: 0 additions & 1 deletion src/shogun/machine/Machine.h
Expand Up @@ -25,7 +25,6 @@

#include <condition_variable>
#include <mutex>
#include <rxcpp/rx.hpp>

namespace shogun
{
Expand Down

0 comments on commit 9be1025

Please sign in to comment.