Skip to content

Commit

Permalink
Re #8555. Test received period number to be in valid range.
Browse files Browse the repository at this point in the history
  • Loading branch information
mantid-roman committed Dec 9, 2013
1 parent 65d843e commit 8308628
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ namespace Mantid
// Initialize the event buffer
void initEventBuffer(const TCPStreamEventDataSetup& setup);
// Save received event data in the buffer workspace
void saveEvents(const std::vector<TCPStreamEventNeutron> &data, const Kernel::DateAndTime &pulseTime, const size_t period);
void saveEvents(const std::vector<TCPStreamEventNeutron> &data, const Kernel::DateAndTime &pulseTime, size_t period);
// Set the spectra-detector map
void loadSpectraMap();
// Load the instrument
Expand All @@ -137,6 +137,31 @@ namespace Mantid
// Get an integer array ising the IDC interface
void getIntArray(const std::string& par, std::vector<int>& arr, const size_t dim);

// receive a header and check if it's valid
template <typename T>
void Receive(T buffer, const std::string& head, const std::string &msg)
{
long timeout = 0;
while( m_socket.available() < static_cast<int>(sizeof(buffer)) )
{
Poco::Thread::sleep(RECV_WAIT);
timeout += RECV_WAIT;
if ( timeout > RECV_TIMEOUT * 1000 ) throw std::runtime_error("Operation of receiving " + head + " timed out.");
}
m_socket.receiveBytes(&buffer, sizeof(buffer));
if ( !buffer.isValid() )
{
throw std::runtime_error(msg);
}
}

// receive data that cannot be processed
template <typename T>
void CollectJunk(T head)
{
m_socket.receiveBytes(junk_buffer, head.length - static_cast<uint32_t>(sizeof(head)));
}

/// The socket communicating with the DAE
Poco::Net::StreamSocket m_socket;
/// Keep connection status
Expand Down Expand Up @@ -166,6 +191,9 @@ namespace Mantid
/// number of spectra
int m_numberOfSpectra;

/// buffer to collect data that cannot be processed
char* junk_buffer[1000];

/// reference to the logger class
static Kernel::Logger& g_log;

Expand Down
50 changes: 13 additions & 37 deletions Code/Mantid/Framework/LiveData/src/ISISLiveEventDataListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
const long RECV_TIMEOUT = 30;
// Sleep time in case we need to wait for the data to become available (in milliseconds)
const long RECV_WAIT = 100;
const char* PROTON_CHARGE_PROPERTY = "proton_charge";
const char* RUN_NUMBER_PROPERTY = "run_number";

namespace Mantid
{
Expand All @@ -25,34 +27,6 @@ namespace LiveData

DECLARE_LISTENER(ISISLiveEventDataListener)

// receive a header and check if it's valid
#define RECEIVE(buffer,msg) \
{\
long timeout = 0;\
while( m_socket.available() < static_cast<int>(sizeof(buffer)) )\
{\
Poco::Thread::sleep(RECV_WAIT);\
timeout += RECV_WAIT;\
if ( timeout > RECV_TIMEOUT * 1000 ) throw std::runtime_error("Receive operation timed out.");\
}\
m_socket.receiveBytes(&buffer, sizeof(buffer));\
if ( !buffer.isValid() )\
{\
throw std::runtime_error(msg);\
}\
}

namespace {
// buffer to collect data that cannot be processed
static char* junk_buffer[10000];
}

// receive data that cannot be processed
#define COLLECT_JUNK(head) m_socket.receiveBytes(junk_buffer, head.length - static_cast<uint32_t>(sizeof(head)));

#define PROTON_CHARGE_PROPERTY "proton_charge"
#define RUN_NUMBER_PROPERTY "run_number"

// Get a reference to the logger
Kernel::Logger& ISISLiveEventDataListener::g_log = Kernel::Logger::get("ISISLiveEventDataListener");

Expand Down Expand Up @@ -140,11 +114,8 @@ bool ISISLiveEventDataListener::connect(const Poco::Net::SocketAddress &address)
m_numberOfPeriods = getInt("NPER");
m_numberOfSpectra = getInt("NSP1");

std::cerr << "number of periods " << m_numberOfPeriods << std::endl;
std::cerr << "number of spectra " << m_numberOfSpectra << std::endl;

TCPStreamEventDataSetup setup;
RECEIVE(setup,"Wrong version");
Receive(setup, "Setup", "Wrong version");
m_startTime.set_from_time_t(setup.head_setup.start_time);

// initialize the buffer workspace
Expand Down Expand Up @@ -235,17 +206,17 @@ void ISISLiveEventDataListener::run()
while (m_stopThread == false)
{
// get the header with the type of the packet
RECEIVE(events.head,"Corrupt stream - you should reconnect.");
Receive(events.head, "Events header","Corrupt stream - you should reconnect.");
if ( !(events.head.type == TCPStreamEventHeader::Neutron) )
{
// don't know what to do with it - stop
throw std::runtime_error("Unknown packet type.");
}
COLLECT_JUNK( events.head );
CollectJunk( events.head );

// get the header with the sream size
RECEIVE(events.head_n,"Corrupt stream - you should reconnect.");
COLLECT_JUNK( events.head_n );
Receive(events.head_n, "Neutrons header","Corrupt stream - you should reconnect.");
CollectJunk( events.head_n );

// absolute pulse (frame) time
Mantid::Kernel::DateAndTime pulseTime = m_startTime + static_cast<double>( events.head_n.frame_time_zero );
Expand Down Expand Up @@ -365,10 +336,15 @@ void ISISLiveEventDataListener::initEventBuffer(const TCPStreamEventDataSetup &s
* Save received event data in the buffer workspace.
* @param data :: A vector with events.
*/
void ISISLiveEventDataListener::saveEvents(const std::vector<TCPStreamEventNeutron> &data, const Kernel::DateAndTime &pulseTime, const size_t period)
void ISISLiveEventDataListener::saveEvents(const std::vector<TCPStreamEventNeutron> &data, const Kernel::DateAndTime &pulseTime, size_t period)
{
Poco::ScopedLock<Poco::FastMutex> scopedLock(m_mutex);

if ( period >= m_numberOfPeriods )
{
period = 0;
}

for(auto it = data.begin(); it != data.end(); ++it)
{
Mantid::DataObjects::TofEvent event( it->time_of_flight, pulseTime );
Expand Down

0 comments on commit 8308628

Please sign in to comment.