# The General Problem

Stochastic processes are a pain.

1. Random number generation is stochastic.
2. Concurrency is stochastic.

Concurrent random number generation is double-plus stochastic.

In [None]:
using Random, ProgressMeter

## Independent Replications of a Stochastic Process

In [None]:
"""
    replicate(f::Function, n::Integer)

Return a vector of the values of `n` calls to `f()` - used in simulations where the value of `f` is stochastic.

"""
function replicate(f::Function, n::Integer)
     @showprogress [f() for _ in Base.OneTo(n)]
end

In [None]:
const RNG = MersenneTwister;

In [None]:
onechar() =  randstring(RNG(42), 'a':'z', 1);

join(replicate(onechar, 6))

Many Random nuber functions are already vectorized

In [None]:
randstring(RNG(42), 'a':'z', 6)

So we do we need this?

Because we want have many replications of the vectorized operation.

In [None]:
# 10 samples, each of 180 observations from a normal distribution
myrng = RNG(42);
samps = replicate(() -> randn(myrng, 180), 10);
samps = hcat(samps...);
samps[1:10, :]

In [None]:
Threads.nthreads()

In [None]:
"""
    replicate(f::Function, n::Integer; use_threads=false)

Return a vector of the values of `n` calls to `f()` - used in simulations where the value of `f` is stochastic.

Note that if `f()` is not thread-safe or depends on a non thread-safe RNG,
    then you must set `use_threads=false`. Also note that ordering of replications
    is not guaranteed when `use_threads=true`, although the replications are not
    otherwise affected for thread-safe `f()`.
"""
function replicate(f::Function, n::Integer; use_threads=false)
    if use_threads
        # no macro version yet: https://github.com/timholy/ProgressMeter.jl/issues/143
        p = Progress(n)
        # get the type
        rr = f()
        next!(p)
        # pre-allocate
        results = [rr for _ in Base.OneTo(n)]
        Threads.@threads for idx = 2:n
            results[idx] = f()
            next!(p)
        end
    else
        results = @showprogress [f() for _ in Base.OneTo(n)]
    end
    results
end

In [None]:
myrng = RNG(42);
st_samps = replicate(() -> randn(myrng, 180), 10; use_threads=true);
st_samps = hcat(st_samps...)
st_samps[1:10, :]

In [None]:
all(samps .≈ st_samps)

In [None]:
myrng = RNG(42);
mt_samps = replicate(() -> randn(myrng, 180), 10; use_threads=true);
mt_samps = hcat(mt_samps...)
mt_samps[1:10, :]

In [None]:
# discrete stream model

In [None]:
all(samps .≈ mt_samps)

In [None]:
all(sort(reshape(samps, 1800)) .≈ sort(reshape(mt_samps, 1800)))

In [None]:
sorted_samps = sort(reshape(samps, 1800));
sorted_mt_samps = sort(reshape(mt_samps, 1800));

In [None]:
mismatch =  sorted_samps .≉ sorted_mt_samps;
hcat(sorted_samps[mismatch], sorted_mt_samps[mismatch])

Close, but clearly more than floating point error.

What's going on here? 

Random number generation is **not** atomic and random generators must be specially constructed to be reentrant. So if you multiple threads are accessing the RNG at the same time, they can run into issues with inconsistent state, thus leading to a different stream. (And this stream may not meet the guarantees normally given by the RNG in question!)

Add graphics here!

On a related note, a reproducible stream from a given (pseudo)random number generator may not be reproducible in quite the way you think. While for a given seed, the stream of bits produced from the MersenneTwister should be the same, the way those bits are translated, normalized, etc. to meet the distributional requirements of various methods such as `rand`, `randn`, etc. is not standardized across implementations.

In particular, `randn` is only guaranteed to give the same stream for a given seed for a given *minor* release of Julia. For example, `randn(MersenneTwister(42), 100)` will not give the same results on Julia 1.0 and Julia 1.6!

# Using locks to avoid issues with the RNG

In [None]:
function locked_replicate(f::Function, n::Integer; use_threads=false)
    if use_threads
        rnglock = ReentrantLock()
        # no macro version yet: https://github.com/timholy/ProgressMeter.jl/issues/143
        p = Progress(n)
        # get the type
        rr = f()
        next!(p)
        # pre-allocate
        results = [rr for _ in Base.OneTo(n)]
        Threads.@threads for idx = 2:n
            lock(rnglock)
            results[idx] = f()
            unlock(rnglock)
            next!(p)
            sleep(abs(first(results[idx]))) # create some competition
        end
    else
        results = @showprogress [f() for _ in Base.OneTo(n)]
    end
    results
end

In [None]:
myrng = RNG(42);
locked_samps = locked_replicate(() -> randn(myrng, 180), 10; use_threads=true);
locked_samps = hcat(locked_samps...)
locked_samps[1:10, :]

In [None]:
all(samps .≈ locked_samps)

In [None]:
all(sort(reshape(samps, 1800)) .≈ sort(reshape(locked_samps, 1800)))

Locking around the entire stochastic function is too much: it effectively reduces to serial execution without ordering guarantees. Instead, locking should be focused around random number generation. 

In [None]:
function parametricbootstrap(rng::AbstractRNG, n::Integer, morig::LinearMixedModel{T};
    β::AbstractVector=coef(morig),σ=morig.σ, θ::AbstractVector=morig.θ, use_threads::Bool=false) where {T}
  
    #####
    ##### pre allocation and book keeping
    #####
    
    β, σ, θ = convert(Vector{T}, β), T(σ), convert(Vector{T}, θ)
    βsc, θsc, p, k, m = similar(β), similar(θ), length(β), length(θ), deepcopy(morig)
    β_names = (Symbol.(fixefnames(morig))..., )
    rank = length(β_names)

    # we need arrays of these for in-place operations to work across threads
    m_threads = [m]
    βsc_threads = [βsc]
    θsc_threads = [θsc]

    if use_threads
        Threads.resize_nthreads!(m_threads)
        Threads.resize_nthreads!(βsc_threads)
        Threads.resize_nthreads!(θsc_threads)
    end

    ##### set up a lock, which will be available via closure
    rnglock = ReentrantLock()
    
    samp = replicate(n, use_threads=use_threads) do
        mod = m_threads[Threads.threadid()]
        ##### these are local because we're using these same names  in the outer scope
        local βsc = βsc_threads[Threads.threadid()]
        local θsc = θsc_threads[Threads.threadid()]
    
        #####
        ##### lock only around the stochastic step
        #####
            
        lock(rnglock)
        # there are deterministic steps in here, but they're fast
        # and finer grain locking would have meant much more complexity 
        # and book keeping and making an additional function lock- and thread-aware
        mod = simulate!(rng, mod, β = β, σ = σ, θ = θ)
        unlock(rnglock)
        
        #####
        ##### don't lock around the deterministic manipulation of the stochastic result
        #####
        
        refit!(mod)
        
        #####
        ##### pack all the interesting bits into a Tuple
        #####
        
        (
         objective = mod.objective,
         σ = mod.σ,
         β = NamedTuple{β_names}(fixef!(βsc, mod)),
         se = SVector{p,T}(stderror!(βsc, mod)),
         θ = SVector{k,T}(getθ!(θsc, mod)),
        )
    end
    
    #####
    ##### assemble the big struct
    #####
    
    MixedModelBootstrap(
        samp,
        deepcopy(morig.λ),
        getfield.(morig.reterms, :inds),
        copy(morig.optsum.lowerbd),
        NamedTuple{Symbol.(fnames(morig))}(map(t -> (t.cnames...,), morig.reterms)),
    )
end

In [None]:
using MixedModels

In [None]:
form =  @formula(rt_trunc ~ 1 + spkr * prec * load + (1 + spkr + prec + load | subj) +  (1 + spkr + prec + load | item))
mod = fit(MixedModel, form, MixedModels.dataset(:kb07))

In [None]:
MixedModels.parametricbootstrap(RNG(42), 1, mod);

In [None]:
@time st_bsam = MixedModels.parametricbootstrap(RNG(42), 100, mod; use_threads=false);

In [None]:
@time mt_bsam = MixedModels.parametricbootstrap(RNG(42), 100, mod; use_threads=true);

In [None]:
# from our tests
using Test, Tables

@testset "same results no matter how many threads" begin
    @test sort(mt_bsam.σ) == sort(st_bsam.σ)
    @test sort(mt_bsam.θ) == sort(st_bsam.θ)
    @test sort(columntable(mt_bsam.β).β) == sort(columntable(st_bsam.β).β)
    @test sum(issingular(mt_bsam)) == sum(issingular(st_bsam))
end

# Lessons and Tips


## Sharing a RNG between threads is tricky.
- RNGs are often not re-entrant
- Even if we assume that the individual values are produced atomically the sequence of threads is not guaranteed to be the same across runs, resulting in threads seeing different subsequences from the RNG stream ('striping'). 

## Locks are one good solution 
- Locking just the random number generation itself and not computations building upon it.
- *All* random number generation for a given stochastic function should be performed at once, if some of the results aren't used until much later. This avoids striping.
- This approach can also be extended to `@distributed` multi-processing, but shared objects are somewhat trickier.
- This approach yields the same results regardless of the number of threads.
- But it relies on the assumption that the RNG is the small part of total computation.
- Be very careful when sharing objects between threads that do operations in place!

## Alternatives

### Distinct RNGs for each thread
- You can use different RNGs for each thread, but they should be distinct!
- For example, each thread's RNG coudl be seeded using pulls from the first thread's RNG.
- Advantage: easy to implement, no locking overhead
- Disadvantage: results are dependent on both the original seed and the number of threads.


### 'Fast-forwarding' the current RNG 
- `Future.randjump(r::MersenneTwister, steps::Integer)`
  > Create an initialized MersenneTwister object, whose state is moved forward (without generating numbers) from r by steps steps. One such step corresponds to the generation of two Float64 numbers. For each different value of steps, a large polynomial has to be generated internally. One is already pre-computed for steps=big(10)^20.'
- Advantage: easy to do 
- Advantage: same results for arbitrary number of threads *if* you precompute the offsets
- Disadvantage: potentially a lot of bookkeeping ahead of time
- Disadvantage: still involves computing a large polynomial
- Disadvantage: not available for arbitrary RNGs

This work was supported by the Center for Interdisciplinary Research, Bielefeld (ZiF),  Cooperation Group "Statistical models for psychological and linguistic data".