In [1]:
using Base.Threads

In [2]:
Threads.nthreads()

1

In [8]:
pwd()

"/Users/FredYu/Documents/GitHub/BulkLMM.jl/analysis/BXD"

In [9]:
cd("..")

In [10]:
include("../test/BXDdata_for_test.jl");

In [11]:
include("../src/parallel_helpers.jl");

In [12]:
include("../test/testHelper.jl");

In [13]:
"""
    scan_perms_threads(y, g, K; nperms = 1024, rndseed = 0, reml = false, original = true)

Multi-threaded permutation scan of a single trait `y` against every marker (column) of `g`.

The data are rotated with `rotateData`, the variance components are estimated once
from the intercept-only (null) model with `fitlmm`, and the rotated data are rescaled
by the square roots of the weights from `makeweights`. The rescaled null residuals are
then permuted with `shuffleVector`, and the alternative residual sum of squares of
every permuted trait is computed marker by marker inside a `Threads.@threads` loop.

# Arguments
- `y`: trait matrix; must have exactly one column.
- `g`: marker matrix of size `(n, p)`.
- `K`: matrix passed to `rotateData` together with `y` and `[intercept g]`.

# Keywords
- `nperms`: number of permutations requested from `shuffleVector`; must be non-negative.
- `rndseed`: seed of the `MersenneTwister` used for the permutations.
- `reml`: forwarded to `fitlmm`.
- `original`: forwarded to `shuffleVector`; when `true` the result has one extra row
  (per the inline note below, the first column of the permuted matrix is then the
  un-permuted trait).

# Returns
A `Matrix{Float64}` with `nperms + 1` rows when `original` is `true` (`nperms` rows
otherwise) and `p` columns, holding `(-n/2) * (log10(rss1) - log10(rss0))`.

# Throws
- `ErrorException` if `y` has more than one column.
- `ArgumentError` if `nperms` is negative.
"""
function scan_perms_threads(y::Array{Float64,2}, g::Array{Float64,2}, K::Array{Float64,2};
              nperms::Int64 = 1024, rndseed::Int64 = 0, 
              reml::Bool = false, original::Bool = true)

    # Permutation testing is only implemented for a single (univariate) trait.
    if size(y, 2) != 1
        error("Can only handle one trait.")
    end
    nperms >= 0 || throw(ArgumentError("nperms must be non-negative, got $nperms"))

    # n - the sample size, p - the number of markers
    (n, p) = size(g)

    # Design matrix: an intercept column followed by all markers.
    intercept = ones(n, 1)

    # Rotate the data so that the errors are uncorrelated.
    (y0, X0, lambda0) = rotateData(y, [intercept g], K)

    # Estimate the variance components once, from the null (intercept-only) model,
    # and reuse them for every marker. `X0[:, 1:1]` is a fresh n-by-1 matrix copy.
    vc = fitlmm(y0, X0[:, 1:1], lambda0; reml = reml)
    r0 = y0 - X0[:, 1]*vc.b

    # Weights proportional to the variances.
    sqrtw = sqrt.(makeweights(vc.h2, lambda0))

    # Rescale by the weights (in place).
    rowMultiply!(r0, sqrtw)
    rowMultiply!(X0, sqrtw)

    # After re-weighting, taking residuals on the intercept column is the same as
    # doing weighted least squares on the rotated markers. The intercept column must
    # be re-read here because `rowMultiply!` changed `X0` in place.
    X00 = resid(X0[:, 2:end], X0[:, 1:1])

    # Random permutations of the null residuals.
    rng = MersenneTwister(rndseed)
    r0perm = shuffleVector(rng, r0[:, 1], nperms; original = original)

    # Null RSS: under the null the mean is zero, so it is the plain sum of squares of
    # the first column. It is a single scalar, since a permutation does not change
    # the sum of squares.
    rss0 = sum(abs2, view(r0perm, :, 1))

    # One row per permuted trait (plus the original one if requested), one column per marker.
    nrows = original ? nperms + 1 : nperms
    rss1 = Array{Float64, 2}(undef, nrows, p)

    # Each iteration writes only to its own column of `rss1`, so iterations are independent.
    Threads.@threads for i in 1:p
        rss1[:, i] = rss(r0perm, view(X00, :, i))
    end

    # Convert the alternative RSS values to LOD scores in place, avoiding two
    # full-size temporaries.
    log_rss0 = log10(rss0)
    rss1 .= (-n / 2) .* (log10.(rss1) .- log_rss0)

    return rss1

end

scan_perms_threads (generic function with 1 method)

In [14]:
BLAS.get_num_threads()

4

In [15]:
BLAS.set_num_threads(2)

In [16]:
BLAS.get_num_threads()

2

In [17]:
Threads.nthreads()

1

In [18]:
@benchmark scan_perms_threads(pheno_y, geno, kinship; nperms = 1024, rndseed = 0, reml = false, original = true)

LoadError: TaskFailedException

    nested task error: InterruptException:
    Stacktrace:
      [1] Array
        @ ./boot.jl:461 [inlined]
      [2] Array
        @ ./boot.jl:469 [inlined]
      [3] similar
        @ ./array.jl:378 [inlined]
      [4] similar
        @ ./abstractarray.jl:803 [inlined]
      [5] reducedim_initarray(A::Matrix{Float64}, region::Int64, init::Float64, #unused#::Type{Float64})
        @ Base ./reducedim.jl:91
      [6] reducedim_initarray
        @ ./reducedim.jl:92 [inlined]
      [7] reducedim_init
        @ ./reducedim.jl:219 [inlined]
      [8] _mapreduce_dim
        @ ./reducedim.jl:371 [inlined]
      [9] #mapreduce#765
        @ ./reducedim.jl:357 [inlined]
     [10] #reduce#767
        @ ./reducedim.jl:406 [inlined]
     [11] rss(y::Matrix{Float64}, X::SubArray{Float64, 1, Matrix{Float64}, Tuple{Base.Slice{Base.OneTo{Int64}}, Int64}, true}; method::String)
        @ Main ~/Documents/GitHub/BulkLMM.jl/src/wls.jl:149
     [12] rss
        @ ~/Documents/GitHub/BulkLMM.jl/src/wls.jl:146 [inlined]
     [13] macro expansion
        @ ./In[13]:61 [inlined]
     [14] (::var"#49#threadsfor_fun#42"{var"#49#threadsfor_fun#41#43"{Matrix{Float64}, Matrix{Float64}, UnitRange{Int64}}})(tid::Int64; onethread::Bool)
        @ Main ./threadingconstructs.jl:84
     [15] #49#threadsfor_fun
        @ ./threadingconstructs.jl:51 [inlined]
     [16] (::Base.Threads.var"#1#2"{var"#49#threadsfor_fun#42"{var"#49#threadsfor_fun#41#43"{Matrix{Float64}, Matrix{Float64}, UnitRange{Int64}}}, Int64})()
        @ Base.Threads ./threadingconstructs.jl:30

In [19]:
@benchmark scan_perms(pheno_y, geno, kinship; nperms = 1024, rndseed = 0, reml = false, original = true)

LoadError: InterruptException:

In [20]:
"""
    threads12_by_blocks(r0perm, X00, nblocks, ntasks = 16)

Split the `p` columns of `X00` into blocks, evaluate `calcLODs_block` on every block
concurrently through `tmap`, and horizontally concatenate the per-block results.

# Arguments
- `r0perm`: matrix forwarded unchanged to `calcLODs_block` for every block.
- `X00`: matrix whose columns are partitioned into blocks.
- `nblocks`: requested number of blocks; must be at least 1. The block size handed to
  `createBlocks2` is `cld(p, nblocks)`.
- `ntasks`: task count passed to `tmap`; must be at least 1. Defaults to `16`, the
  value that used to be hard-coded.

# Returns
The `hcat` of the values returned by `calcLODs_block`, in block order.

# Throws
- `ArgumentError` if `nblocks` or `ntasks` is smaller than 1.
"""
function threads12_by_blocks(r0perm::Array{Float64, 2}, X00::Array{Float64, 2}, nblocks::Int64,
                             ntasks::Int64 = 16)

    # Fail early with a clear message; a zero block count used to surface as an
    # InexactError from the block-size computation.
    nblocks >= 1 || throw(ArgumentError("nblocks must be at least 1, got $nblocks"))
    ntasks >= 1 || throw(ArgumentError("ntasks must be at least 1, got $ntasks"))

    p = size(X00, 2)

    # Integer ceiling division: the smallest block size for which `nblocks` blocks cover all columns.
    block_size = cld(p, nblocks)
    blocks = createBlocks2(p, block_size)

    # One call per block, spread over `ntasks` tasks.
    LODs_blocks = tmap(block -> calcLODs_block(r0perm, X00, block), ntasks, blocks)

    return reduce(hcat, LODs_blocks)

end

threads12_by_blocks (generic function with 1 method)

In [21]:
"""
    scan_perms_threads12(y, g, K; reml = false, nperms = 1024, rndseed = 0,
                         original = true, option = "by blocks", nblocks = 1,
                         ncopies = 1, nprocs = 0)

Run the permutation scan through one of two work-distribution strategies.

The inputs are transformed with `transform_rotation`, `transform_reweight` and
`transform_permute`, then dispatched according to `option`:

- `"by blocks"`: `threads12_by_blocks(r0perm, X00, nblocks)`.
- `"by nperms"`: `distribute_by_nperms(r0, X00, nperms, ncopies, original)`.

# Keywords
- `reml`: forwarded to `transform_reweight`.
- `nperms`, `rndseed`, `original`: forwarded to `transform_permute`; `nperms` and
  `original` are also forwarded to `distribute_by_nperms`.
- `option`: distribution strategy, `"by blocks"` or `"by nperms"`.
- `nblocks`: number of blocks for the `"by blocks"` strategy.
- `ncopies`: forwarded to `distribute_by_nperms` for the `"by nperms"` strategy.
- `nprocs`: accepted for interface compatibility; not used by this function.

# Returns
Whatever the selected strategy function returns.

# Throws
- `ErrorException("Option unsupported.")` if `option` is not one of the two strategies.
  The check runs before any data transformation, so an invalid option fails immediately.
"""
function scan_perms_threads12(y::Array{Float64,2}, g::Array{Float64,2}, K::Array{Float64,2};
                                reml::Bool = false,
                                nperms::Int64 = 1024, rndseed::Int64 = 0, original::Bool = true,
                                # (options for blocks, nperms distribution methods...)
                                option::String = "by blocks", nblocks::Int64 = 1, ncopies::Int64 = 1, 
                                nprocs::Int64 = 0)

    # Validate the strategy first so a typo does not cost a full rotation/reweighting pass.
    # `error` already throws; wrapping it in `throw` was redundant.
    if option != "by blocks" && option != "by nperms"
        error("Option unsupported.")
    end

    (y0, X0, lambda0) = transform_rotation(y, g, K) # rotation of data
    (r0, X00) = transform_reweight(y0, X0, lambda0; reml = reml) # reweighting and taking residuals
    # NOTE(review): r0perm is only consumed by the "by blocks" strategy; it is still
    # computed unconditionally here to keep the original call sequence.
    r0perm = transform_permute(r0; nperms = nperms, rndseed = rndseed, original = original)

    if option == "by blocks"
        return threads12_by_blocks(r0perm, X00, nblocks)
    else
        return distribute_by_nperms(r0, X00, nperms, ncopies, original)
    end

end

scan_perms_threads12 (generic function with 1 method)

In [22]:
using ThreadTools

In [23]:
b = @benchmark scan_perms_threads12(pheno_y, geno, kinship; reml = false, nperms = 1024, rndseed = 0, original = true, option = "by blocks", nblocks = 120)

LoadError: UndefVarError: createBlocks2 not defined

In [24]:
b.times

LoadError: UndefVarError: b not defined

In [25]:
median(b.times)

LoadError: UndefVarError: b not defined

In [26]:
runtimes_tmap = Array{Float64, 1}(undef, 10);

In [43]:
# Repeat the tmap-based benchmark 10 times and store, for each repetition, the mean
# of the recorded sample times in runtimes_tmap (presumably nanoseconds, given the
# later division by 1e9 — confirm).
# NOTE(review): the loop that fills runtimes_tloops stores median(...) rather than
# mean(...); confirm the two summaries are meant to be compared directly.
for t in 1:10
    
    b = @benchmark scan_perms_threads12(pheno_y, geno, kinship; reml = false, nperms = 1024, rndseed = 0, original = true, option = "by blocks", nblocks = 120)
    runtimes_tmap[t] = mean(b.times)
    
end

In [56]:
runtimes_tmap

10-element Vector{Float64}:
 5.591043052222222e8
 5.598697161111112e8
 5.516050436e8
 5.899172702222222e8
 5.774236751111112e8
 5.826831521111112e8
 5.147052465e8
 5.524582602e8
 5.436127193e8
 5.334789712e8

In [57]:
mean(runtimes_tmap)/1e9

0.5564858359577778

In [60]:
runtimes_tloops = Array{Float64, 1}(undef, 10);

In [61]:
# Repeat the @threads-loop benchmark 10 times and store, for each repetition, the
# median of the recorded sample times in runtimes_tloops (presumably nanoseconds,
# given the later division by 1e9 — confirm).
# NOTE(review): the loop that fills runtimes_tmap stores mean(...) rather than
# median(...); confirm the two summaries are meant to be compared directly.
for t in 1:10
    
    b_tloops = @benchmark scan_perms_threads(pheno_y, geno, kinship; reml = false, nperms = 1024, rndseed = 0, original = true)
    runtimes_tloops[t] = median(b_tloops.times)
    
end

In [62]:
mean(runtimes_tloops)/1e9

0.63325012635

In [63]:
nthreads()

16

In [64]:
BLAS.get_num_threads()

2

In [66]:
@time tmap_LODs = scan_perms_threads12(pheno_y, geno, kinship; reml = false, nperms = 1024, rndseed = 0, original = true, option = "by blocks", nblocks = 150);

  0.708999 seconds (77.70 k allocations: 13.675 GiB, 31.67% gc time)


In [29]:
BLAS.set_num_threads(4)

In [30]:
@time tloops_LODs = scan_perms_threads(pheno_y, geno, kinship; nperms = 0, rndseed = 0, reml = false, original = true);

  0.021720 seconds (72.62 k allocations: 39.878 MiB)


In [68]:
sumSqDiff(tmap_LODs, tloops_LODs)

0.0