# Live-Data using Mantid

Demonstrates consuming data from Kafka with Mantid.

Use in conjunction with a source of schema id `ev42` data, such as the [NeXus Streamer](https://github.com/ess-dmsc/nexus-streamer).

The functions of "ProcessingScript" and "PostProcessingScript" are shown in the following diagram (from Mantid `LoadLiveData` algorithm documentation:

![LoadLiveData](LoadLiveData_flow.png)

In [1]:
%%writefile process.py

rebin_params = '0,10000,100000'
mtd[output] = Rebin(input, Params=rebin_params, PreserveEvents=False)

In [2]:
%%writefile postprocess.py

# We don't actually need to do anything to the data that ends up in the output Mantid workspace
mtd[output] = input

# We can do something with scipp here, but the notebook is not in scope: we can't update plots in the notebook...

In [3]:
from mantid.simpleapi import *
import mantid.api as api
from time import sleep

original_facility = ConfigService.getFacility().name()
ConfigService.setFacility("TEST_LIVE")
print(f"Temporarily setting facility to {ConfigService.getFacility().name()} from {original_facility}")

output_ws_name = "test"
try:
    print("Starting to consume data from Kafka")
    StartLiveData(Instrument="ISIS_Kafka_Event", OutputWorkspace=output_ws_name, AccumulationWorkspace="accum",
                  FromNow=False, FromStartOfRun=True, FromTime=False, Listener="KafkaEventListener",
                  Address="localhost:9092", UpdateEvery=5.,
                  AccumulationMethod="Add", PreserveEvents=True,
                  RunTransitionBehavior="Stop", ProcessingScriptFilename="process.py",
                  PostProcessingScriptFilename="postprocess.py")

    try:
        sleep(5)
        monitorLiveDataHandle = api.AlgorithmManagerImpl.Instance().runningInstancesOf("MonitorLiveData")[0]
        while monitorLiveDataHandle.isRunning():
            sleep(1)
    except IndexError:
        # maybe already finished?
        pass

except Exception as e:
    print(e)
finally:
    print("Completed consumption of data from Kafka")
    ConfigService.setFacility(original_facility)
    print(f"Set facility back to original setting of {ConfigService.getFacility().name()}")

ws_out = mtd[output_ws_name]

print(f"{ws_out.name()} is an {ws_out.id()}")
print(f"number of histograms = {ws_out.getNumberHistograms()}")
print(f"number of bins = {ws_out.blocksize()}")
print(f"run number = {ws_out.getRunNumber()}")

Temporarily setting facility to TEST_LIVE from ISIS
Starting to consume data from Kafka
Completed consumption of data from Kafka
Set facility back to original setting of ISIS
test is an Workspace2D
number of histograms = 4
number of bins = 10
run number = 1
