Skip to content

Commit

Permalink
Parse the RunStatus packet for run start and run stop info
Browse files Browse the repository at this point in the history
Refs #5857
  • Loading branch information
rgmiller committed Sep 12, 2012
1 parent fa7cbde commit 963cfe4
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ namespace Mantid
// is microseconds!)
// pulseTime is the start of the pulse relative to Jan 1, 1990.
// Both values are designed to be passed straight into the TofEvent constructor.


ILiveListener::RunStatus m_status;
DataObjects::EventWorkspace_sptr m_buffer; ///< Used to buffer events between calls to extractData()

bool m_workspaceInitialized;
std::string m_wsName;
detid2index_map * m_indexMap; // maps pixel id's to workspace indexes
Expand All @@ -109,7 +111,8 @@ namespace Mantid
bool m_isConnected;

Poco::Thread m_thread;
Poco::FastMutex m_mutex;
Poco::FastMutex m_mutex; // protects m_buffer & m_status
bool m_pauseNetRead;
bool m_stopThread; // background thread checks this periodically. If true, the
// thread exits

Expand Down
99 changes: 87 additions & 12 deletions Code/Mantid/Framework/DataHandling/src/SNSLiveEventDataListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ namespace DataHandling
/// Constructor
SNSLiveEventDataListener::SNSLiveEventDataListener()
: ILiveListener(), ADARA::Parser(),
m_status(NoRun),
m_workspaceInitialized( false), m_socket(),
m_isConnected( false), m_stopThread( false)
m_isConnected( false), m_pauseNetRead(false), m_stopThread( false)
// ADARA::Parser() will accept values for buffer size and max packet size, but the
// defaults will work fine
{
Expand Down Expand Up @@ -164,7 +165,15 @@ namespace DataHandling

while (m_stopThread == false) // loop until the foreground thread tells us to stop
{
// Read the packets, accumulate them in m_buffer...

while (m_pauseNetRead)
{
// foreground thread doesn't want us to process any more packets until
// it's ready. See comments in rxPacket( const ADARA::RunStatusPkt &pkt)
Poco::Thread::sleep( 100); // 100 milliseconds
}

// Read the packets, accumulate events in m_buffer...
read( m_socket);

// Check the heartbeat
Expand Down Expand Up @@ -317,15 +326,68 @@ namespace DataHandling

bool SNSLiveEventDataListener::rxPacket( const ADARA::RunStatusPkt &pkt)
{
// runStart() is in the EPICS epoch - ie Jan 1, 1990. Convert to Unix epoch
time_t runStartTime = pkt.runStart() + ADARA::EPICS_EPOCH_OFFSET;
// Exactly what we have to do depends on the status field
if (pkt.status() == ADARA::RunStatus::NEW_RUN)
{
// Starting a new run: update m_status and add the run_start property

if (runStatus() != NoRun)
{
// Previous status should have been NoRun. Spit out a warning if it's not.
g_log.warning() << "Unexpected start of run. Run status should have been "
<< NoRun << " (NoRun), but was " << m_status << std::endl;
}

m_status = BeginRun;

// Add the run_start property
if ( m_buffer->mutableRun().hasProperty("run_start") )
{
m_buffer->mutableRun().removeProperty( "run_start"); // remove to old run_start value
}

// runStart() is in the EPICS epoch - ie Jan 1, 1990. Convert to Unix epoch
time_t runStartTime = pkt.runStart() + ADARA::EPICS_EPOCH_OFFSET;

// Add the run_start property
char timeString[64]; // largest the string should end up is 20 (plus a null terminator)
strftime( timeString, 64, "%FT%H:%M:%SZ", gmtime( &runStartTime));
// addProperty() wants the time as an ISO 8601 string

m_buffer->mutableRun().addProperty("run_start", std::string( timeString) );

// Add the run_start property
char timeString[64]; // largest the string should end up is 20 (plus a null terminator)
strftime( timeString, 64, "%FT%H:%M:%SZ", gmtime( &runStartTime));
// addProperty() wants the time as an ISO 8601 string
m_buffer->mutableRun().addProperty("run_start", std::string( timeString) );
}
else if (pkt.status() == ADARA::RunStatus::END_RUN)
{
// Run has ended: update m_status and swap buffers so that the current buffer
// only has values from the run that just ended

if (runStatus() != Running)
{
// Previous status should have been NoRun. Spit out a warning if it's not.
g_log.warning() << "Unexpected end of run. Run status should have been " << Running
<< " (Running), but was " << m_status << std::endl;
}

// Set the flag to make us stop reading from the network.
// Stopping network reads solves a number of problems:
// 1) We don't need to manage a second buffer in order to keep the events
// in the just ended run separate from the events in the next run.
// 2) We don't have to deal with the case where the next run has already
// started but extractData() hasn't been called to fetch the last events
// from the previous run.
// 3) We don't have to worry about the case where more than one run has
// started and finished between calls to extractData() (ie: 5 second runs
// and a 10 second update interval. Yes, that's an operator error, but I still
// don't want to worry about it.)
//
// Because of this, however, if extractData() isn't called at least once per run,
// the network packets may start to back up and SMS may eventually disconnect us.
// This flag will be cleared down in extractData().
m_pauseNetRead = true;
}

// Note: all other possibilities for pkt.status() can be ignored
return true;
}

Expand Down Expand Up @@ -594,18 +656,31 @@ namespace DataHandling
temp->mutableRun().clearTimeSeriesLogs();

// Get an exclusive lock
m_mutex.lock();
m_mutex.lock();
std::swap(m_buffer, temp);
m_mutex.unlock();


// update the run status if necessary
// (see the comments in ILiveListener.h)
if (m_status == BeginRun)
{
m_status = Running;
}
else if (m_status == EndRun)
{
m_status = NoRun;
}

m_pauseNetRead = false; // make sure the network reads start back up

return temp;
}


ILiveListener::RunStatus SNSLiveEventDataListener::runStatus()
{
// TODO: Implement this!!
return ILiveListener::NoRun;
return m_status;
}


Expand Down

0 comments on commit 963cfe4

Please sign in to comment.