Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/feature/9543_accumulate_monitor_…
Browse files Browse the repository at this point in the history
…data_in_loadlivedata'
  • Loading branch information
AndreiSavici committed Jun 13, 2014
2 parents 9109d2c + 194ea5a commit 9db7466
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 59 deletions.
Expand Up @@ -42,8 +42,12 @@ namespace DataHandling
*/
void ExtractMonitorWorkspace::init()
{
declareProperty(new WorkspaceProperty<>("InputWorkspace","",Direction::Input), "An input workspace.");
declareProperty(new WorkspaceProperty<>("MonitorWorkspace","",Direction::Output), "An output workspace.");
declareProperty(new WorkspaceProperty<>("InputWorkspace","",Direction::Input), "A data workspace that holds a monitor workspace within.");
declareProperty(new WorkspaceProperty<>("MonitorWorkspace","",Direction::Output),
"The workspace containing only monitor data relating to the main data in the InputWorkspace.");
declareProperty("ClearFromInputWorkspace", true, "Whether to hold onto the monitor workspace within "
"the input workspace. The default is not to, but if you are running this algorithm in the post-processing "
"step of a live data run then you will need this to be false.");
}

/** Execute the algorithm.
Expand All @@ -59,8 +63,9 @@ namespace DataHandling
}

setProperty("MonitorWorkspace", monitorWS);
// Now clear off the pointer on the input workspace
inputWS->setMonitorWorkspace(MatrixWorkspace_sptr());
// Now clear off the pointer on the input workspace, if desired
const bool clearPointer = getProperty("ClearFromInputWorkspace");
if ( clearPointer ) inputWS->setMonitorWorkspace(MatrixWorkspace_sptr());
}

} // namespace DataHandling
Expand Down
Expand Up @@ -25,6 +25,7 @@ class ExtractMonitorWorkspaceTest : public CxxTest::TestSuite
ExtractMonitorWorkspace alg;
TS_ASSERT_THROWS_NOTHING( alg.initialize() )
TS_ASSERT( alg.isInitialized() )
TS_ASSERT( alg.getProperty("ClearFromInputWorkspace") )
}

void test_fails_if_no_monitor_workspace()
Expand All @@ -48,7 +49,8 @@ class ExtractMonitorWorkspaceTest : public CxxTest::TestSuite
TS_ASSERT_THROWS_NOTHING( alg.initialize() )
TS_ASSERT_THROWS_NOTHING( alg.setProperty("InputWorkspace", inws) );
TS_ASSERT_THROWS_NOTHING( alg.setPropertyValue("MonitorWorkspace", outWSName) );
TS_ASSERT_THROWS_NOTHING( alg.execute(); );
TS_ASSERT_THROWS_NOTHING( alg.setProperty("ClearFromInputWorkspace", false) );
TS_ASSERT_THROWS_NOTHING( alg.execute() );
TS_ASSERT( alg.isExecuted() );

MatrixWorkspace_sptr ws;
Expand All @@ -57,6 +59,12 @@ class ExtractMonitorWorkspaceTest : public CxxTest::TestSuite
if (!ws) return;

TS_ASSERT_EQUALS( ws, monws );
TS_ASSERT_EQUALS( inws->monitorWorkspace(), monws );

// Now run it clearing off the monitor from the input workspace
TS_ASSERT_THROWS_NOTHING( alg.setProperty("ClearFromInputWorkspace", true) );
TS_ASSERT_THROWS_NOTHING( alg.setProperty("InputWorkspace", inws) );
TS_ASSERT( alg.execute() );
TSM_ASSERT( "The monitor workspace should have been wiped off the input workspace",
! inws->monitorWorkspace() );

Expand Down
Expand Up @@ -71,8 +71,6 @@ namespace Mantid
void loadChunk();
/// Shared pointer to the correct file loader instance - it needs to be kept alive.
API::Algorithm_sptr m_loader;
/// Flag indicating whether we're using LoadPreNexus (true) or LoadEventPreNexus (false)
bool m_preNexus;
};

} // namespace LiveData
Expand Down
Expand Up @@ -135,10 +135,7 @@ namespace Mantid
std::vector<std::string> m_monitorLogs; // Names of any monitor logs (these must be manually removed
// during the call to extractData())

uint64_t m_rtdlPulseId; // We get this from the RTDL packe

Poco::Net::StreamSocket m_socket;
//int m_sockfd; // socket file descriptor
bool m_isConnected;

Poco::Thread m_thread;
Expand Down
10 changes: 6 additions & 4 deletions Code/Mantid/Framework/LiveData/src/ADARA/ADARAPackets.cpp
Expand Up @@ -256,14 +256,16 @@ void BankedEventPkt::firstEventInBank() const
/* ------------------------------------------------------------------------ */

BeamMonitorPkt::BeamMonitorPkt(const uint8_t *data, uint32_t len) :
Packet(data, len), m_fields((const uint32_t *)payload()), m_sectionStartIndex(0)
Packet(data, len), m_fields((const uint32_t *)payload()),
m_sectionStartIndex(0), m_eventNum(0)
{
if (m_payload_len < (4 * sizeof(uint32_t)))
throw invalid_packet("BeamMonitor packet is too short");
if (m_payload_len < (4 * sizeof(uint32_t)))
throw invalid_packet("BeamMonitor packet is too short");
}

BeamMonitorPkt::BeamMonitorPkt(const BeamMonitorPkt &pkt) :
Packet(pkt), m_fields((const uint32_t *)payload()), m_sectionStartIndex(0)
Packet(pkt), m_fields((const uint32_t *)payload()),
m_sectionStartIndex(0), m_eventNum(0)
{}

#define EVENT_COUNT_MASK 0x003FFFFF // lower 22 bits
Expand Down
104 changes: 65 additions & 39 deletions Code/Mantid/Framework/LiveData/src/LoadLiveData.cpp
Expand Up @@ -254,25 +254,25 @@ namespace LiveData

if ( gws )
{
WorkspaceGroup_sptr accum_gws = boost::dynamic_pointer_cast<WorkspaceGroup>(m_accumWS);
if ( !accum_gws )
{
throw std::runtime_error("Two workspace groups are expected.");
}
if ( accum_gws->getNumberOfEntries() != gws->getNumberOfEntries() )
{
throw std::runtime_error("Accumulation and chunk workspace groups are expected to have the same size.");
}
// binary operations cannot handle groups passed by pointers, so add members one by one
for(size_t i = 0; i < static_cast<size_t>(gws->getNumberOfEntries()); ++i)
{
addMatrixWSChunk( algoName, accum_gws->getItem(i), gws->getItem(i) );
}
WorkspaceGroup_sptr accum_gws = boost::dynamic_pointer_cast<WorkspaceGroup>(m_accumWS);
if ( !accum_gws )
{
throw std::runtime_error("Two workspace groups are expected.");
}
if ( accum_gws->getNumberOfEntries() != gws->getNumberOfEntries() )
{
throw std::runtime_error("Accumulation and chunk workspace groups are expected to have the same size.");
}
// binary operations cannot handle groups passed by pointers, so add members one by one
for(size_t i = 0; i < static_cast<size_t>(gws->getNumberOfEntries()); ++i)
{
addMatrixWSChunk( algoName, accum_gws->getItem(i), gws->getItem(i) );
}
}
else
{
// just add the chunk
addMatrixWSChunk( algoName, m_accumWS, chunkWS );
// just add the chunk
addMatrixWSChunk( algoName, m_accumWS, chunkWS );
}
}

Expand All @@ -286,29 +286,40 @@ namespace LiveData
*/
void LoadLiveData::addMatrixWSChunk(const std::string& algoName, Workspace_sptr accumWS, Workspace_sptr chunkWS)
{
IAlgorithm_sptr alg = this->createChildAlgorithm(algoName);
alg->setProperty("LHSWorkspace", accumWS);
alg->setProperty("RHSWorkspace", chunkWS);
alg->setProperty("OutputWorkspace", accumWS);
alg->execute();
if (!alg->isExecuted())
{
throw std::runtime_error("Error when calling " + alg->name() + " to add the chunk of live data. See log.");
}
else
{
// Is this really necessary?

// Get the output as the generic Workspace type
Property * prop = alg->getProperty("OutputWorkspace");
IWorkspaceProperty * wsProp = dynamic_cast<IWorkspaceProperty*>(prop);
if (!wsProp)
throw std::runtime_error("The " + alg->name() + " Algorithm's OutputWorkspace property is not a WorkspaceProperty!");
Workspace_sptr temp = wsProp->getWorkspace();
accumWS = temp;
// And sort the events, if any
doSortEvents(accumWS);
}
// Handle the addition of the internal monitor workspace, if present
auto accumMW = boost::dynamic_pointer_cast<MatrixWorkspace>(accumWS);
auto chunkMW = boost::dynamic_pointer_cast<MatrixWorkspace>(chunkWS);
if ( accumMW && chunkMW )
{
auto accumMon = accumMW->monitorWorkspace();
auto chunkMon = chunkMW->monitorWorkspace();

if ( accumMon && chunkMon ) accumMon += chunkMon;
}

// Now do the main workspace
IAlgorithm_sptr alg = this->createChildAlgorithm(algoName);
alg->setProperty("LHSWorkspace", accumWS);
alg->setProperty("RHSWorkspace", chunkWS);
alg->setProperty("OutputWorkspace", accumWS);
alg->execute();
if (!alg->isExecuted())
{
throw std::runtime_error("Error when calling " + alg->name() + " to add the chunk of live data. See log.");
}
else
{
// Get the output as the generic Workspace type
// This step is necessary for when we are operating on MD workspaces (PlusMD)
Property * prop = alg->getProperty("OutputWorkspace");
IWorkspaceProperty * wsProp = dynamic_cast<IWorkspaceProperty*>(prop);
if (!wsProp)
throw std::runtime_error("The " + alg->name() + " Algorithm's OutputWorkspace property is not a WorkspaceProperty!");
Workspace_sptr temp = wsProp->getWorkspace();
accumWS = temp;
// And sort the events, if any
doSortEvents(accumWS);
}
}


Expand Down Expand Up @@ -490,6 +501,20 @@ namespace LiveData
EventWorkspace_sptr processedEvent = boost::dynamic_pointer_cast<EventWorkspace>(processed);
if (!PreserveEvents && processedEvent)
{
// Convert the monitor workspace, if there is one and it's necessary
MatrixWorkspace_sptr monitorWS = processedEvent->monitorWorkspace();
auto monitorEventWS = boost::dynamic_pointer_cast<EventWorkspace>(monitorWS);
if ( monitorEventWS )
{
auto monAlg = this->createChildAlgorithm("ConvertToMatrixWorkspace");
monAlg->setProperty("InputWorkspace", monitorEventWS);
monAlg->executeAsChildAlg();
if (!monAlg->isExecuted())
g_log.error("Failed to convert monitors from events to histogram form.");
monitorWS = monAlg->getProperty("OutputWorkspace");
}

// Now do the main workspace
Algorithm_sptr alg = this->createChildAlgorithm("ConvertToMatrixWorkspace");
alg->setProperty("InputWorkspace", processedEvent);
std::string outputName = "__anonymous_livedata_convert_" + this->getPropertyValue("OutputWorkspace");
Expand All @@ -499,6 +524,7 @@ namespace LiveData
throw std::runtime_error("Error when calling ConvertToMatrixWorkspace (since PreserveEvents=False). See log.");
// Replace the "processed" workspace with the converted one.
MatrixWorkspace_sptr temp = alg->getProperty("OutputWorkspace");
if ( monitorWS ) temp->setMonitorWorkspace( monitorWS ); // Set back the monitor workspace
processed = temp;
}

Expand Down
30 changes: 25 additions & 5 deletions Code/Mantid/Framework/LiveData/src/MonitorLiveData.cpp
Expand Up @@ -57,12 +57,16 @@ namespace LiveData
}

//----------------------------------------------------------------------------------------------
/** Clone a workspace, if there is enough memory available */
/** Clone a workspace, if there is enough memory available.
It needs to be a clone rather than a rename so that an open instrument window continues
to track the live workspace.
*/
void MonitorLiveData::doClone(const std::string & originalName, const std::string & newName)
{
if (AnalysisDataService::Instance().doesExist(originalName))
auto & ads = AnalysisDataService::Instance();
if ( ads.doesExist(originalName) )
{
Workspace_sptr original = AnalysisDataService::Instance().retrieveWS<Workspace>(originalName);
Workspace_sptr original = ads.retrieveWS<Workspace>(originalName);
if (original)
{
size_t bytesUsed = original->getMemorySize();
Expand All @@ -71,11 +75,29 @@ namespace LiveData
if (size_t(3)*bytesUsed < bytesAvail)
{
WriteLock _lock(*original);

// Clone the monitor workspace, if there is one
auto originalMatrix = boost::dynamic_pointer_cast<MatrixWorkspace>(original);
MatrixWorkspace_sptr monitorWS, newMonitorWS;
if ( originalMatrix && ( monitorWS = originalMatrix->monitorWorkspace() ) )
{
auto monitorsCloner = createChildAlgorithm("CloneWorkspace", 0, 0, false);
monitorsCloner->setProperty("InputWorkspace", monitorWS);
monitorsCloner->executeAsChildAlg();
Workspace_sptr outputWS = monitorsCloner->getProperty("OutputWorkspace");
newMonitorWS = boost::dynamic_pointer_cast<MatrixWorkspace>(outputWS);
}

Algorithm_sptr cloner = createChildAlgorithm("CloneWorkspace", 0, 0, false);
cloner->setPropertyValue("InputWorkspace", originalName);
cloner->setPropertyValue("OutputWorkspace", newName);
cloner->setAlwaysStoreInADS(true); // We must force the ADS to be updated
cloner->executeAsChildAlg();

if ( newMonitorWS ) // If there was a monitor workspace, set it back on the result
{
ads.retrieveWS<MatrixWorkspace>(newName)->setMonitorWorkspace(newMonitorWS);
}
}
else
{
Expand Down Expand Up @@ -212,8 +234,6 @@ namespace LiveData


doClone(OutputWorkspace, OutputWorkspace + postFix);
if (!AccumulationWorkspace.empty())
doClone(AccumulationWorkspace, AccumulationWorkspace + postFix);
}

runNumber = 0;
Expand Down
8 changes: 8 additions & 0 deletions Code/Mantid/Framework/LiveData/test/LoadLiveDataTest.h
Expand Up @@ -149,6 +149,9 @@ class LoadLiveDataTest : public CxxTest::TestSuite
TSM_ASSERT( "Workspace being added stayed the same pointer", ws1 == ws2 );
TSM_ASSERT( "Events are sorted", ws2->getEventList(0).isSortedByTof());
TS_ASSERT_EQUALS(AnalysisDataService::Instance().size(), 1);

// Test monitor workspace is present
TS_ASSERT( ws2->monitorWorkspace() );
}

//--------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -205,6 +208,9 @@ class LoadLiveDataTest : public CxxTest::TestSuite

TSM_ASSERT( "Workspace being added stayed the same pointer", ws1 == ws2 );
TS_ASSERT_EQUALS(AnalysisDataService::Instance().size(), 1);

TS_ASSERT( ws1->monitorWorkspace() );
TS_ASSERT_EQUALS( ws1->monitorWorkspace(), ws2->monitorWorkspace() );
}


Expand All @@ -220,6 +226,7 @@ class LoadLiveDataTest : public CxxTest::TestSuite
TS_ASSERT_EQUALS(ws->blocksize(), 20);
TS_ASSERT_DELTA(ws->dataX(0)[0], 40e3, 1e-4);
TS_ASSERT_EQUALS(AnalysisDataService::Instance().size(), 1);
TS_ASSERT( ws->monitorWorkspace() );
}

//--------------------------------------------------------------------------------------------
Expand All @@ -233,6 +240,7 @@ class LoadLiveDataTest : public CxxTest::TestSuite
TS_ASSERT_EQUALS(ws->blocksize(), 20);
TS_ASSERT_DELTA(ws->dataX(0)[0], 40e3, 1e-4);
TS_ASSERT_EQUALS(AnalysisDataService::Instance().size(), 1);
TS_ASSERT( ws->monitorWorkspace() );
}

//--------------------------------------------------------------------------------------------
Expand Down
8 changes: 7 additions & 1 deletion Code/Mantid/Framework/LiveData/test/MonitorLiveDataTest.h
Expand Up @@ -209,12 +209,18 @@ class MonitorLiveDataTest : public CxxTest::TestSuite
alg1->cancel();

// The first workspace got cloned to a new name (the suffix is set in the TestDataListener)
EventWorkspace_sptr ws1 = AnalysisDataService::Instance().retrieveWS<EventWorkspace>("fake2_999");
EventWorkspace_const_sptr ws1 = AnalysisDataService::Instance().retrieveWS<EventWorkspace>("fake2_999");
TS_ASSERT_EQUALS( ws1->getNumberEvents(), 4*200);
// Make sure the monitor workspace is present and correct
TS_ASSERT( ws1->monitorWorkspace() );
TS_ASSERT_EQUALS( ws1->monitorWorkspace()->readY(0)[0], 4 );

// And this is the current run
EventWorkspace_sptr ws2 = AnalysisDataService::Instance().retrieveWS<EventWorkspace>("fake2");
TS_ASSERT_EQUALS( ws2->getNumberEvents(), 200);
// Make sure the monitor workspace is present and correct
TS_ASSERT( ws2->monitorWorkspace() );
TS_ASSERT_EQUALS( ws2->monitorWorkspace()->readY(0)[0], 1 );

Kernel::Timer timer;
while ( alg1->isRunning() && timer.elapsed_no_reset() < 0.5 ) {}
Expand Down
8 changes: 8 additions & 0 deletions Code/Mantid/Framework/LiveData/test/TestDataListener.cpp
Expand Up @@ -92,6 +92,12 @@ namespace LiveData
m_buffer->setInstrument(inst);
// Set a run number
m_buffer->mutableRun().addProperty("run_number", std::string("999"));
// Add a monitor workspace
auto monitorWS = WorkspaceFactory::Instance().create("EventWorkspace",1,2,1);
WorkspaceFactory::Instance().initializeFromParent(m_buffer,monitorWS,true);
monitorWS->dataX(0)[0] = 40000;
monitorWS->dataX(0)[1] = 60000;
m_buffer->setMonitorWorkspace(monitorWS);
}

boost::shared_ptr<Workspace> TestDataListener::extractData()
Expand All @@ -107,6 +113,8 @@ namespace LiveData
el1.addEventQuickly(TofEvent(m_rand->nextValue()));
el2.addEventQuickly(TofEvent(m_rand->nextValue()));
}
auto mon_buffer = boost::dynamic_pointer_cast<EventWorkspace>(m_buffer->monitorWorkspace());
mon_buffer->getEventList(0).addEventQuickly(TofEvent(m_rand->nextValue()));

// Copy the workspace pointer to a temporary variable
EventWorkspace_sptr extracted = m_buffer;
Expand Down

0 comments on commit 9db7466

Please sign in to comment.