In [None]:
using Distributed
using StatsPlots

In [None]:
# Launch 5 additional worker processes; the event readers are later spawned
# on the processes returned by `workers()`.
addprocs(5)

In [None]:
@everywhere using LCIO

In [None]:
# readEvents(fnames, ZMassList, done)
#
# Reader loop, defined on every process.
#
# Repeatedly takes a file name from the channel `fnames`, opens the file with
# LCIO and, for every event, puts the invariant mass of the pair formed by
# entries 10 and 11 of the "MCParticle" collection into `ZMassList`.
# NOTE(review): entries 10 and 11 are presumably the two muons of the sample
# (going by the variable names) -- confirm against the generator record.
#
# The loop ends when `take!(fnames)` fails, which is what happens once the
# channel has been closed and drained. A failure while processing a file is
# reported with `@warn` and the reader moves on to the next file instead of
# silently terminating.
#
# Before returning, the number of values that were put into `ZMassList` is put
# into `done` exactly once, so the consumer knows how many values to expect
# from this reader.
@everywhere function readEvents(fnames, ZMassList, done)
    total_nEvents_processed = 0
    while true
        local fname
        try
            fname = take!(fnames)
        catch
            # no more file names can be obtained: the work queue is finished
            break
        end
        try
            LCIO.open(fname) do reader
                for event in reader
                    mcpList = getCollection(event, "MCParticle")
                    mu1 = mcpList[10]
                    mu2 = mcpList[11]
                    p = getMomentum(mu1) .+ getMomentum(mu2)
                    E = getEnergy(mu1) + getEnergy(mu2)
                    put!(ZMassList, sqrt(E^2 - sum(abs2, p)))
                    # count only after the value has actually been queued, so
                    # the number reported through `done` always matches the
                    # number of values in `ZMassList`
                    total_nEvents_processed += 1
                end
            end
        catch err
            # events queued before the failure stay counted; the rest of this
            # file is skipped
            @warn "readEvents: error while processing file, skipping the rest of it" fname exception = (err, catch_backtrace())
        end
    end
    put!(done, total_nEvents_processed)
end

In [None]:
# work queue of input file names (capacity: 400 entries)
fnames = RemoteChannel() do
    Channel{String}(400)
end

# buffer for the computed masses (capacity: 200000 values)
ZMassList = RemoteChannel() do
    Channel{Float64}(200000)
end

# each reader reports its number of processed events here when it has finished
done = RemoteChannel() do
    Channel{Int}(400)
end

In [None]:
# start one reader task on each worker process and keep the resulting futures
processors = map(workers()) do w
    @spawnat w readEvents(fnames, ZMassList, done)
end

In [None]:
# Full paths of the entries in the generator-level directory that match the
# sample pattern.
# NOTE(review): most `.` in the pattern are unescaped (they match any
# character) and the pattern is not anchored -- confirm this is intended.
fileList = filter(
    readdir("/nfs/dust/ilc/user/jstrube/StrangeHiggs/data/GeneratorLevel", join=true)
) do path
    occursin(r"E250_SetA.Pmumuh2ss.Gwhizard-2_84.eL0.8\.pR0.3\..*.slcio", path)
end

In [None]:
# Queue every input file for the readers, then close the queue: once it is
# drained, the readers' `take!` fails and their loop ends.
foreach(f -> put!(fnames, f), fileList)
close(fnames)

In [None]:
# Collect the results from the readers.
#
# Every reader puts exactly one event count into `done`, and only after all of
# its values have been put into `ZMassList`. The value buffer is drained while
# we wait for those signals: if it were only drained afterwards, readers would
# block in `put!` as soon as the buffer is full, never reach their `done`
# signal, and the whole pipeline would dead-lock.
nDone = 0
nEvents = 0
values = Float64[]
while nDone != nworkers()
    if isready(done)
        nEvents += take!(done)
        nDone += 1
    elseif isready(ZMassList)
        push!(values, take!(ZMassList))
    else
        # nothing to collect right now; give the communication tasks time to run
        sleep(0.01)
    end
end
# all readers have finished, so every value still missing is already buffered
while length(values) < nEvents
    push!(values, take!(ZMassList))
end
# no producer is left: the event queue can be closed
close(ZMassList)
println("Done")

In [None]:
# Plot the distribution of the collected values.
histogram(values)