Skip to content

Commit

Permalink
Re #31 rename ISISKafka... to Kafka...
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthew D Jones committed Mar 24, 2017
1 parent 1eb40b7 commit f3f2033
Show file tree
Hide file tree
Showing 14 changed files with 75 additions and 75 deletions.
22 changes: 11 additions & 11 deletions Framework/LiveData/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,27 +58,27 @@ find_package ( LibRDKafka )
if ( LIBRDKAFKA_FOUND )
set ( SRC_FILES
${SRC_FILES}
src/ISIS/ISISKafkaEventListener.cpp
src/ISIS/ISISKafkaEventStreamDecoder.cpp
src/Kafka/KafkaEventListener.cpp
src/Kafka/KafkaEventStreamDecoder.cpp
src/Kafka/KafkaBroker.cpp
src/Kafka/KafkaTopicSubscriber.cpp
src/Kafka/KafkaTopicSubscriber.cpp
)
set ( INC_FILES
${INC_FILES}
inc/MantidLiveData/ISIS/ISISKafkaEventListener.h
inc/MantidLiveData/ISIS/ISISKafkaEventStreamDecoder.h
inc/MantidLiveData/Kafka/KafkaEventListener.h
inc/MantidLiveData/Kafka/KafkaEventStreamDecoder.h
inc/MantidLiveData/Kafka/IKafkaStreamSubscriber.h
inc/MantidLiveData/Kafka/IKafkaBroker.h
inc/MantidLiveData/Kafka/KafkaBroker.h
inc/MantidLiveData/Kafka/KafkaTopicSubscriber.h
src/ISIS/private/Kafka/Schema/flatbuffers/flatbuffers.h
src/ISIS/private/Kafka/Schema/det_spec_mapping_schema_generated.h
src/ISIS/private/Kafka/Schema/event_schema_generated.h
src/ISIS/private/Kafka/Schema/run_info_schema_generated.h
inc/MantidLiveData/Kafka/KafkaTopicSubscriber.h
src/Kafka/private/Schema/flatbuffers/flatbuffers.h
src/Kafka/private/Schema/det_spec_mapping_schema_generated.h
src/Kafka/private/Schema/event_schema_generated.h
src/Kafka/private/Schema/run_info_schema_generated.h
)
set ( TEST_FILES
${TEST_FILES}
ISISKafkaEventStreamDecoderTest.h
KafkaEventStreamDecoderTest.h
KafkaTopicSubscriberTest.h
)
endif()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace Mantid {
namespace LiveData {
class ISISKafkaEventStreamDecoder;
class KafkaEventStreamDecoder;

/**
Implementation of a live listener to consume messages from the Kafka system
Expand All @@ -36,18 +36,18 @@ class ISISKafkaEventStreamDecoder;
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
class DLLExport ISISKafkaEventListener : public API::LiveListener {
class DLLExport KafkaEventListener : public API::LiveListener {
public:
ISISKafkaEventListener();
KafkaEventListener();
/// Destructor. Should handle termination of any socket connections.
~ISISKafkaEventListener() override = default;
~KafkaEventListener() override = default;

//----------------------------------------------------------------------
// Static properties
//----------------------------------------------------------------------

/// The name of this listener
std::string name() const override { return "ISISKafkaEventListener"; }
std::string name() const override { return "KafkaEventListener"; }
/// Does this listener support requests for (recent) past data
bool supportsHistory() const override { return true; }
/// Does this listener buffer events (true) or histogram data (false)
Expand All @@ -69,7 +69,7 @@ class DLLExport ISISKafkaEventListener : public API::LiveListener {
int runNumber() const override;

private:
std::unique_ptr<ISISKafkaEventStreamDecoder> m_decoder = nullptr;
std::unique_ptr<KafkaEventStreamDecoder> m_decoder = nullptr;
};

} // namespace LiveData
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ namespace LiveData {
File change history is stored at: <https://github.com/mantidproject/mantid>
Code Documentation is available at: <http://doxygen.mantidproject.org>
*/
class DLLExport ISISKafkaEventStreamDecoder {
class DLLExport KafkaEventStreamDecoder {
public:
ISISKafkaEventStreamDecoder(std::shared_ptr<IKafkaBroker> broker,
KafkaEventStreamDecoder(std::shared_ptr<IKafkaBroker> broker,
const std::string &eventTopic,
const std::string &runInfoTopic,
const std::string &spDetTopic);
~ISISKafkaEventStreamDecoder();
ISISKafkaEventStreamDecoder(const ISISKafkaEventStreamDecoder &) = delete;
ISISKafkaEventStreamDecoder &
operator=(const ISISKafkaEventStreamDecoder &) = delete;
~KafkaEventStreamDecoder();
KafkaEventStreamDecoder(const KafkaEventStreamDecoder &) = delete;
KafkaEventStreamDecoder &
operator=(const KafkaEventStreamDecoder &) = delete;

public:
///@name Start/stop
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
#include "MantidLiveData/ISIS/ISISKafkaEventListener.h"
#include "MantidLiveData/Kafka/KafkaEventListener.h"
#include "MantidAPI/LiveListenerFactory.h"
#include "MantidLiveData/ISIS/ISISKafkaEventStreamDecoder.h"
#include "MantidLiveData/Kafka/KafkaEventStreamDecoder.h"
#include "MantidLiveData/Kafka/KafkaBroker.h"
#include "MantidLiveData/Kafka/KafkaTopicSubscriber.h"

namespace {
Mantid::Kernel::Logger g_log("ISISKafkaEventListener");
Mantid::Kernel::Logger g_log("KafkaEventListener");
}

namespace Mantid {
namespace LiveData {

DECLARE_LISTENER(ISISKafkaEventListener)
DECLARE_LISTENER(KafkaEventListener)

ISISKafkaEventListener::ISISKafkaEventListener() {
KafkaEventListener::KafkaEventListener() {
declareProperty("InstrumentName", "");
}

/// @copydoc ILiveListener::connect
bool ISISKafkaEventListener::connect(const Poco::Net::SocketAddress &address) {
bool KafkaEventListener::connect(const Poco::Net::SocketAddress &address) {
auto broker = std::make_shared<KafkaBroker>(address.toString());
try {
std::string instrumentName = getProperty("InstrumentName");
Expand All @@ -27,18 +27,18 @@ bool ISISKafkaEventListener::connect(const Poco::Net::SocketAddress &address) {
runInfoTopic(instrumentName + KafkaTopicSubscriber::RUN_TOPIC_SUFFIX),
spDetInfoTopic(instrumentName +
KafkaTopicSubscriber::DET_SPEC_TOPIC_SUFFIX);
m_decoder = Kernel::make_unique<ISISKafkaEventStreamDecoder>(
m_decoder = Kernel::make_unique<KafkaEventStreamDecoder>(
broker, eventTopic, runInfoTopic, spDetInfoTopic);
} catch (std::exception &exc) {
g_log.error() << "ISISKafkaEventListener::connect - Connection Error: "
g_log.error() << "KafkaEventListener::connect - Connection Error: "
<< exc.what() << "\n";
return false;
}
return true;
}

/// @copydoc ILiveListener::start
void ISISKafkaEventListener::start(Kernel::DateAndTime startTime) {
void KafkaEventListener::start(Kernel::DateAndTime startTime) {
bool startNow = true;
// Workaround for existing LiveListener interface
// startTime of 0 means start from now
Expand All @@ -54,7 +54,7 @@ void ISISKafkaEventListener::start(Kernel::DateAndTime startTime) {
}

/// @copydoc ILiveListener::extractData
boost::shared_ptr<API::Workspace> ISISKafkaEventListener::extractData() {
boost::shared_ptr<API::Workspace> KafkaEventListener::extractData() {
assert(m_decoder);
// The first call to extract is very early in the start live data process
// and we may not be completely ready yet, wait upto a maximum of 5 seconds
Expand All @@ -70,17 +70,17 @@ boost::shared_ptr<API::Workspace> ISISKafkaEventListener::extractData() {
}

/// @copydoc ILiveListener::isConnected
bool ISISKafkaEventListener::isConnected() {
bool KafkaEventListener::isConnected() {
return (m_decoder ? m_decoder->isCapturing() : false);
}

/// @copydoc ILiveListener::runStatus
API::ILiveListener::RunStatus ISISKafkaEventListener::runStatus() {
API::ILiveListener::RunStatus KafkaEventListener::runStatus() {
return m_decoder->hasReachedEndOfRun() ? EndRun : Running;
}

/// @copydoc ILiveListener::runNumber
int ISISKafkaEventListener::runNumber() const {
int KafkaEventListener::runNumber() const {
return (m_decoder ? m_decoder->runNumber() : -1);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "MantidLiveData/ISIS/ISISKafkaEventStreamDecoder.h"
#include "MantidLiveData/Kafka/KafkaEventStreamDecoder.h"
#include "MantidAPI/AlgorithmManager.h"
#include "MantidAPI/Axis.h"
#include "MantidAPI/Run.h"
Expand All @@ -13,8 +13,8 @@
#include "MantidLiveData/Exception.h"

GCC_DIAG_OFF(conversion)
#include "private/Kafka/Schema/det_spec_mapping_schema_generated.h"
#include "private/Kafka/Schema/event_schema_generated.h"
#include "private/Schema/det_spec_mapping_schema_generated.h"
#include "private/Schema/event_schema_generated.h"
GCC_DIAG_ON(conversion)

#include <boost/make_shared.hpp>
Expand All @@ -26,7 +26,7 @@ GCC_DIAG_ON(conversion)

namespace {
/// Logger
Mantid::Kernel::Logger g_log("ISISKafkaEventStreamDecoder");
Mantid::Kernel::Logger g_log("KafkaEventStreamDecoder");

std::string PROTON_CHARGE_PROPERTY = "proton_charge";
std::string RUN_NUMBER_PROPERTY = "run_number";
Expand Down Expand Up @@ -117,7 +117,7 @@ using Kernel::DateAndTime;
* run
* mapping
*/
ISISKafkaEventStreamDecoder::ISISKafkaEventStreamDecoder(
KafkaEventStreamDecoder::KafkaEventStreamDecoder(
std::shared_ptr<IKafkaBroker> broker, const std::string &eventTopic,
const std::string &runInfoTopic, const std::string &spDetTopic)
: m_broker(broker), m_eventTopic(eventTopic), m_runInfoTopic(runInfoTopic),
Expand All @@ -129,13 +129,13 @@ ISISKafkaEventStreamDecoder::ISISKafkaEventStreamDecoder(
* Destructor.
* Stops capturing from the stream
*/
ISISKafkaEventStreamDecoder::~ISISKafkaEventStreamDecoder() { stopCapture(); }
KafkaEventStreamDecoder::~KafkaEventStreamDecoder() { stopCapture(); }

/**
* Start capturing from the stream on a separate thread. This is a non-blocking
* call and will return after the thread has started
*/
void ISISKafkaEventStreamDecoder::startCapture(bool startNow) {
void KafkaEventStreamDecoder::startCapture(bool startNow) {

// If we are not starting now, then we want to start at offsets corresponding
// to the start of the run
Expand All @@ -145,7 +145,7 @@ void ISISKafkaEventStreamDecoder::startCapture(bool startNow) {
runStream->consumeMessage(&rawMsgBuffer);
if (rawMsgBuffer.empty()) {
throw std::runtime_error(
"ISISKafkaEventStreamDecoder::initLocalCaches() - "
"KafkaEventStreamDecoder::initLocalCaches() - "
"Empty message received from run info "
"topic. Unable to continue");
}
Expand All @@ -168,7 +168,7 @@ void ISISKafkaEventStreamDecoder::startCapture(bool startNow) {
* Stop capturing from the stream. This is a blocking call until the capturing
* function has completed
*/
void ISISKafkaEventStreamDecoder::stopCapture() noexcept {
void KafkaEventStreamDecoder::stopCapture() noexcept {
// This will interrupt the "event" loop
m_interrupt = true;
// Wait until the function has completed. The background thread
Expand All @@ -183,7 +183,7 @@ void ISISKafkaEventStreamDecoder::stopCapture() noexcept {
* @return True if data has been accumulated so that extractData()
* can be called, false otherwise
*/
bool ISISKafkaEventStreamDecoder::hasData() const noexcept {
bool KafkaEventStreamDecoder::hasData() const noexcept {
std::lock_guard<std::mutex> lock(m_mutex);
return !m_localEvents.empty();
}
Expand All @@ -192,7 +192,7 @@ bool ISISKafkaEventStreamDecoder::hasData() const noexcept {
* Check if a message has indicated that end of run has been reached
* @return True if end of run has been reached
*/
bool ISISKafkaEventStreamDecoder::hasReachedEndOfRun() noexcept {
bool KafkaEventStreamDecoder::hasReachedEndOfRun() noexcept {
// Notify the decoder that MonitorLiveData knows it has reached end of run
// and after giving it opportunity to interrupt, decoder can continue with
// messages of the next run
Expand All @@ -213,7 +213,7 @@ bool ISISKafkaEventStreamDecoder::hasReachedEndOfRun() noexcept {
* @return A pointer to the data collected since the last call to this
* method
*/
API::Workspace_sptr ISISKafkaEventStreamDecoder::extractData() {
API::Workspace_sptr KafkaEventStreamDecoder::extractData() {
if (m_exception) {
throw * m_exception;
}
Expand All @@ -233,7 +233,7 @@ API::Workspace_sptr ISISKafkaEventStreamDecoder::extractData() {
// Private members
// -----------------------------------------------------------------------------

API::Workspace_sptr ISISKafkaEventStreamDecoder::extractDataImpl() {
API::Workspace_sptr KafkaEventStreamDecoder::extractDataImpl() {
std::lock_guard<std::mutex> lock(m_mutex);
if (m_localEvents.size() == 1) {
auto temp = createBufferWorkspace(m_localEvents.front());
Expand All @@ -258,23 +258,23 @@ API::Workspace_sptr ISISKafkaEventStreamDecoder::extractDataImpl() {
* Implementation designed to be entry point for new thread of execution.
* It catches all thrown exceptions.
*/
void ISISKafkaEventStreamDecoder::captureImpl() noexcept {
void KafkaEventStreamDecoder::captureImpl() noexcept {
m_capturing = true;
try {
captureImplExcept();
} catch (std::exception &exc) {
m_exception = boost::make_shared<std::runtime_error>(exc.what());
} catch (...) {
m_exception = boost::make_shared<std::runtime_error>(
"ISISKafkaEventStreamDecoder: Unknown exception type caught.");
"KafkaEventStreamDecoder: Unknown exception type caught.");
}
m_capturing = false;
}

/**
* Exception-throwing variant of captureImpl(). Do not call this directly
*/
void ISISKafkaEventStreamDecoder::captureImplExcept() {
void KafkaEventStreamDecoder::captureImplExcept() {
g_log.debug("Event capture starting");
initLocalCaches();

Expand Down Expand Up @@ -306,7 +306,7 @@ void ISISKafkaEventStreamDecoder::captureImplExcept() {
std::lock_guard<std::mutex> lock(m_mutex);
if (frameData->period() < 0)
throw std::runtime_error(
"ISISKafkaEventStreamDecoder::captureImplExcept() - "
"KafkaEventStreamDecoder::captureImplExcept() - "
"Negative period number in event message. Producer error, unable "
"to continue");
auto &periodBuffer =
Expand Down Expand Up @@ -365,13 +365,13 @@ void ISISKafkaEventStreamDecoder::captureImplExcept() {
* By the end of this method the local event buffer is ready to accept
* events
*/
void ISISKafkaEventStreamDecoder::initLocalCaches() {
void KafkaEventStreamDecoder::initLocalCaches() {
std::string rawMsgBuffer;

// Load spectra-detector mapping from stream
m_spDetStream->consumeMessage(&rawMsgBuffer);
if (rawMsgBuffer.empty()) {
throw std::runtime_error("ISISKafkaEventStreamDecoder::initLocalCaches() - "
throw std::runtime_error("KafkaEventStreamDecoder::initLocalCaches() - "
"Empty message received from spectrum-detector "
"topic. Unable to continue");
}
Expand All @@ -381,7 +381,7 @@ void ISISKafkaEventStreamDecoder::initLocalCaches() {
auto nudet = spDetMsg->det()->size();
if (nudet != nspec) {
std::ostringstream os;
os << "ISISKafkaEventStreamDecoder::initLocalEventBuffer() - Invalid "
os << "KafkaEventStreamDecoder::initLocalEventBuffer() - Invalid "
"spectra/detector mapping. Expected matched length arrays but "
"found nspec=" << nspec << ", ndet=" << nudet;
throw std::runtime_error(os.str());
Expand All @@ -394,7 +394,7 @@ void ISISKafkaEventStreamDecoder::initLocalCaches() {
// Load run metadata
m_runStream->consumeMessage(&rawMsgBuffer);
if (rawMsgBuffer.empty()) {
throw std::runtime_error("ISISKafkaEventStreamDecoder::initLocalCaches() - "
throw std::runtime_error("KafkaEventStreamDecoder::initLocalCaches() - "
"Empty message received from run info "
"topic. Unable to continue");
}
Expand Down Expand Up @@ -430,7 +430,7 @@ void ISISKafkaEventStreamDecoder::initLocalCaches() {
const size_t nperiods(static_cast<size_t>(runMsg->n_periods()));
if (nperiods == 0) {
throw std::runtime_error(
"ISISKafkaEventStreamDecoder - Message has n_periods==0. This is "
"KafkaEventStreamDecoder - Message has n_periods==0. This is "
"an error by the data producer");
}
std::lock_guard<std::mutex> lock(m_mutex);
Expand All @@ -453,7 +453,7 @@ void ISISKafkaEventStreamDecoder::initLocalCaches() {
* @return A new workspace of the appropriate size
*/
DataObjects::EventWorkspace_sptr
ISISKafkaEventStreamDecoder::createBufferWorkspace(const size_t nspectra,
KafkaEventStreamDecoder::createBufferWorkspace(const size_t nspectra,
const int32_t *spec,
const int32_t *udet,
const uint32_t length) {
Expand Down Expand Up @@ -495,7 +495,7 @@ ISISKafkaEventStreamDecoder::createBufferWorkspace(const size_t nspectra,
* @param parent A pointer to an existing workspace
*/
DataObjects::EventWorkspace_sptr
ISISKafkaEventStreamDecoder::createBufferWorkspace(
KafkaEventStreamDecoder::createBufferWorkspace(
const DataObjects::EventWorkspace_sptr &parent) {
auto buffer = boost::static_pointer_cast<DataObjects::EventWorkspace>(
API::WorkspaceFactory::Instance().create(
Expand All @@ -514,7 +514,7 @@ ISISKafkaEventStreamDecoder::createBufferWorkspace(
* @param name Name of an instrument to load
* @param workspace A pointer to the workspace receiving the instrument
*/
void ISISKafkaEventStreamDecoder::loadInstrument(
void KafkaEventStreamDecoder::loadInstrument(
const std::string &name, DataObjects::EventWorkspace_sptr workspace) {
if (name.empty()) {
g_log.warning("Empty instrument name found");
Expand Down

0 comments on commit f3f2033

Please sign in to comment.