In [109]:
using Statistics, OrderedCollections, Dates, Parameters

In [110]:
nthreads()

4

In [111]:
include("aggreg.jl");

In [112]:
# using EnergySystemModeling: SeriesInstance,
#     ClustInstance,
#     AggregInstance,
#     DistUpdate,
#     load_series_instance,
#     load_clust_instance,
#     aggreg1D,
#     cdad,
#     search_min_dist,
#     compute_dist,
#     update_marker,
#     replace_lines,
#     update_clust!,
#     update_k!

In [113]:
# Define a new copy method to copy _ClustInstance and _SeriesInstance
Base.copy(x::T) where T = T([getfield(x, k) for k ∈ fieldnames(T)]...);

In [114]:
function test_serial(series::VecOrMat{Float64},
    block_size::Int,
    stopping_k::Int,
    current_k::Int,
    dm::Symbol,
    rep_value::Symbol,
    lseries::Int,
    nseries::Int,
    series_dc::VecOrMat{Float64},
    ord_dc::VecOrMat{Int},
    k_cent::VecOrMat{Float64},
    weights::Vector{Int},
    series_clust::Vector{Int},
    nclusters::Int,
    search_range::UnitRange,
    dc_mode::Bool,
    _SeriesInstance::SeriesInstance,
    _ClustInstance::ClustInstance,
    _DistUpdate::Dict{Vector{Bool},DistUpdate},
    _SeriesUpdate::Dict{String,SeriesInstance},
    _ClustUpdate::Dict{String,ClustInstance})

    # @info "Clustering the next step"
    k = lseries
    @time while k >= stopping_k + block_size - 1
        
        # Vector with distances (to be updated as it goes)
        dist = Vector{Float64}(undef,length(search_range))

        # Sets
        N = 1:nseries
        K = copy(search_range)

        # Compute the distance for each aggregation (i.e., changing the merging_clust)
        # TODO: implement parallelisation such as '@async Threads.@threads @inbounds for k in K'
        @inbounds for k_search in K
            # Merging to be tested (neighbouring hypothesis)
            # TODO: implement non-neighbouring hypothesis
            merging_clust = k_search:k_search+block_size-1
            # Create a temporary marker to merge the clusters tested
            marker_temp = [sc in merging_clust for sc in series_clust]

            # Separation needed for duration curves analysis
            if dc_mode
                # Create a temp marker for the elements in between clustered [min,max] order
                marker_temp_dc = minimum(ord_dc[marker_temp,:]):maximum(ord_dc[marker_temp,:])
                # Using duration curves chunks (in between the min and max values of marker_temp)
                series_comp = sort(series, dims=1,rev=true)[marker_temp_dc,:]
                # Forming the centroids
                k_cent_comp = copy(series)
                (k_cent_comp[marker_temp,:],) = aggreg1D(series[marker_temp,:], rep_value)
                # Ordering chunk with clustering instance in a decrescent order
                k_cent_comp = sort(k_cent_comp, dims=1, rev=true)[marker_temp_dc,:]
            else
                # Part of series compared
                series_comp = series[marker_temp,:]
                # Centroids of the temporarily formed cluster (TODO: implement another method for aggreg1D receiving the clusters with respective weights)
                (k_cent_comp,) = aggreg1D(series_comp, rep_value)
            end
            
            # Distance computation
            dist[k_search] = compute_dist(N, dm, series_comp, k_cent_comp)
        end

        # Find whenever the min_dist occurs first (i.e., using findmin()[2])
        ## TODO: implement the multiple merges (e.g., using findall())
        min_dist = findmin(dist)[2] |> Int        
        merging_clust = min_dist:min_dist+block_size-1

        # Create a flag to the positions in the series that will be aggregated
        marker = update_marker(_SeriesInstance, _ClustInstance, min_dist)    

        # Update _DistUpdate dictionary with the minimal distance found and the new marker
        _DistUpdate = merge(+, _DistUpdate, Dict(marker => DistUpdate(min_dist,merging_clust)))

        # Update clusters and series_clust
        update_clust!(_ClustInstance, _SeriesInstance, min_dist)

        # Update clustering values
        series_clust = _ClustInstance.series_clust
        weights = _ClustInstance.weights

        nclusters = _ClustInstance.nclusters
        k_cent = _ClustInstance.k_cent
        search_range =_ClustInstance.search_range
        
        # Update number of clusters k
        new_current_k = _ClustInstance.nclusters
        update_k!(_SeriesInstance, new_current_k)
        k = new_current_k

        # Store series_clust and k_cent
        _ClustUpdate = merge(+,_ClustUpdate,Dict("$k" => copy(_ClustInstance)))
        _SeriesUpdate = merge(+,_SeriesUpdate,Dict("$k" => copy(_SeriesInstance)))

    end
end

test_serial (generic function with 1 method)

In [115]:
function test_parallel(series::VecOrMat{Float64},
    block_size::Int,
    stopping_k::Int,
    current_k::Int,
    dm::Symbol,
    rep_value::Symbol,
    lseries::Int,
    nseries::Int,
    series_dc::VecOrMat{Float64},
    ord_dc::VecOrMat{Int},
    k_cent::VecOrMat{Float64},
    weights::Vector{Int},
    series_clust::Vector{Int},
    nclusters::Int,
    search_range::UnitRange,
    dc_mode::Bool,
    _SeriesInstance::SeriesInstance,
    _ClustInstance::ClustInstance,
    _DistUpdate::Dict{Vector{Bool},DistUpdate},
    _SeriesUpdate::Dict{String,SeriesInstance},
    _ClustUpdate::Dict{String,ClustInstance})

    # @info "Clustering the next step"
    k = lseries
    @time while k >= stopping_k + block_size - 1
        
        # Vector with distances (to be updated as it goes)
        dist = Vector{Float64}(undef,length(search_range))

        # Sets
        N = 1:nseries
        K = copy(search_range)

        # Compute the distance for each aggregation (i.e., changing the merging_clust)
        # TODO: implement parallelisation such as '@async Threads.@threads @inbounds for k in K'
        Threads.@threads for k_search in K
            # Merging to be tested (neighbouring hypothesis)
            # TODO: implement non-neighbouring hypothesis
            merging_clust = k_search:k_search+block_size-1
            # Create a temporary marker to merge the clusters tested
            marker_temp = [sc in merging_clust for sc in series_clust]

            # Separation needed for duration curves analysis
            if dc_mode
                # Create a temp marker for the elements in between clustered [min,max] order
                marker_temp_dc = minimum(ord_dc[marker_temp,:]):maximum(ord_dc[marker_temp,:])
                # Using duration curves chunks (in between the min and max values of marker_temp)
                series_comp = sort(series, dims=1,rev=true)[marker_temp_dc,:]
                # Forming the centroids
                k_cent_comp = copy(series)
                (k_cent_comp[marker_temp,:],) = aggreg1D(series[marker_temp,:], rep_value)
                # Ordering chunk with clustering instance in a decrescent order
                k_cent_comp = sort(k_cent_comp, dims=1, rev=true)[marker_temp_dc,:]
            else
                # Part of series compared
                series_comp = series[marker_temp,:]
                # Centroids of the temporarily formed cluster (TODO: implement another method for aggreg1D receiving the clusters with respective weights)
                (k_cent_comp,) = aggreg1D(series_comp, rep_value)
            end
            
            # Distance computation
            dist[k_search] = compute_dist(N, dm, series_comp, k_cent_comp)
        end

        # Find whenever the min_dist occurs first (i.e., using findmin()[2])
        ## TODO: implement the multiple merges (e.g., using findall())
        min_dist = findmin(dist)[2] |> Int        
        merging_clust = min_dist:min_dist+block_size-1

        # Create a flag to the positions in the series that will be aggregated
        marker = update_marker(_SeriesInstance, _ClustInstance, min_dist)    

        # Update _DistUpdate dictionary with the minimal distance found and the new marker
        _DistUpdate = merge(+, _DistUpdate, Dict(marker => DistUpdate(min_dist,merging_clust)))

        # Update clusters and series_clust
        update_clust!(_ClustInstance, _SeriesInstance, min_dist)

        # Update clustering values
        series_clust = _ClustInstance.series_clust
        weights = _ClustInstance.weights

        nclusters = _ClustInstance.nclusters
        k_cent = _ClustInstance.k_cent
        search_range =_ClustInstance.search_range
        
        # Update number of clusters k
        new_current_k = _ClustInstance.nclusters
        update_k!(_SeriesInstance, new_current_k)
        k = new_current_k

        # Store series_clust and k_cent
        _ClustUpdate = merge(+,_ClustUpdate,Dict("$k" => copy(_ClustInstance)))
        _SeriesUpdate = merge(+,_SeriesUpdate,Dict("$k" => copy(_SeriesInstance)))

    end
end

LoadError: LoadError: ArgumentError: @threads requires a `for` loop expression
in expression starting at In[115]:36

In [116]:
function execute_inst(lseries::Int,nseries::Int,dc_mode::Bool; parallel=false)
    
    # Declare series
    series = rand(lseries,nseries);
    
    # Forming SeriesInstance
    block_size = 2
    stopping_k = 1
    current_k = lseries
    rep_value = :mean
    series_dc = sort(series, dims=1, rev=true)
    ord_dc = reduce(hcat,sortperm.(collect(eachslice(series,dims=2)),rev=true))
    
    # Forming ClustInstance
    k_cent = copy(series)
    weights = ones(lseries) |> Vector{Int64}
    series_clust = collect(1:lseries) |> Vector{Int64}
    nclusters = lseries
    search_range = 1:size(series,1)-block_size+1
    dm = :ward

    # Declaring instances
    _SeriesInstance = load_series_instance(series,block_size,current_k,stopping_k,dm,rep_value,lseries,nseries,series_dc,ord_dc);
    _ClustInstance = load_clust_instance(k_cent,series_clust,weights,search_range,dc_mode)

    # Dictionary to keep the min distances and respective markers/min_dist found in each iteration
    _DistUpdate = Dict{Vector{Bool}, DistUpdate}()

    # Dictionaries to store series_clust and k_cent
    _SeriesUpdate = Dict{String,SeriesInstance}()
    _ClustUpdate = Dict{String,ClustInstance}();

    if parallel
        test_parallel(series,block_size,
            stopping_k,
            current_k,
            dm,
            rep_value,
            lseries,
            nseries,
            series_dc,
            ord_dc,
            k_cent,
            weights,
            series_clust,
            nclusters,
            search_range,
            dc_mode,
            _SeriesInstance,
            _ClustInstance,
            _DistUpdate,
            _SeriesUpdate,
            _ClustUpdate)
    else
        test_serial(series,block_size,
            stopping_k,
            current_k,
            dm,
            rep_value,
            lseries,
            nseries,
            series_dc,
            ord_dc,
            k_cent,
            weights,
            series_clust,
            nclusters,
            search_range,
            dc_mode,
            _SeriesInstance,
            _ClustInstance,
            _DistUpdate,
            _SeriesUpdate,
            _ClustUpdate)
    end
end

execute_inst (generic function with 1 method)

## Test 1: 100 lines random series

In [117]:
execute_inst(100,4,true)

  0.621945 seconds (876.02 k allocations: 177.997 MiB, 5.78% gc time, 70.29% compilation time)


In [118]:
execute_inst(100,4,false)

  0.040128 seconds (214.85 k allocations: 22.252 MiB)


## Test 2: 200 lines random series

In [119]:
execute_inst(200,4,true)

  1.485150 seconds (1.32 M allocations: 1.085 GiB, 5.22% gc time)


In [120]:
execute_inst(200,4,false)

  0.196800 seconds (839.81 k allocations: 111.477 MiB, 7.11% gc time)


## Test 3: 500 lines random series

In [121]:
# execute_inst(500,4,true)

In [122]:
# execute_inst(500,4,false)

## Test 4: parallelisation

In [123]:
execute_inst(300,4,true,parallel=false)

  5.588317 seconds (3.00 M allocations: 3.734 GiB, 8.40% gc time)


In [124]:
execute_inst(300,4,true,parallel=true)

LoadError: TaskFailedException

[91m    nested task error: [39mArgumentError: reducing over an empty collection is not allowed
    Stacktrace:
      [1] [0m[1m_empty_reduce_error[22m[0m[1m([22m[0m[1m)[22m
    [90m    @ [39m[90mBase[39m [90m./[39m[90m[4mreduce.jl:301[24m[39m
      [2] [0m[1mreduce_empty[22m[0m[1m([22m[90mop[39m::[0mFunction, [90m#unused#[39m::[0mType[90m{Int64}[39m[0m[1m)[22m
    [90m    @ [39m[90mBase[39m [90m./[39m[90m[4mreduce.jl:311[24m[39m
      [3] [0m[1mmapreduce_empty[22m[0m[1m([22m[90m#unused#[39m::[0mtypeof(identity), [90mop[39m::[0mFunction, [90mT[39m::[0mType[0m[1m)[22m
    [90m    @ [39m[90mBase[39m [90m./[39m[90m[4mreduce.jl:345[24m[39m
      [4] [0m[1mreduce_empty[22m[0m[1m([22m[90mop[39m::[0mBase.MappingRF[90m{typeof(identity), typeof(min)}[39m, [90m#unused#[39m::[0mType[90m{Int64}[39m[0m[1m)[22m
    [90m    @ [39m[90mBase[39m [90m./[39m[90m[4mreduce.jl:331[24m[39m
      [5] [0m[1mreduce_empty_iter[22m
    [90m    @ [39m[90m./[39m[90m[4mreduce.jl:357[24m[39m[90m [inlined][39m
      [6] [0m[1mmapreduce_empty_iter[22m[0m[1m([22m[90mf[39m::[0mFunction, [90mop[39m::[0mFunction, [90mitr[39m::[0mMatrix[90m{Int64}[39m, [90mItrEltype[39m::[0mBase.HasEltype[0m[1m)[22m
    [90m    @ [39m[90mBase[39m [90m./[39m[90m[4mreduce.jl:353[24m[39m
      [7] [0m[1m_mapreduce[22m[0m[1m([22m[90mf[39m::[0mtypeof(identity), [90mop[39m::[0mtypeof(min), [90m#unused#[39m::[0mIndexLinear, [90mA[39m::[0mMatrix[90m{Int64}[39m[0m[1m)[22m
    [90m    @ [39m[90mBase[39m [90m./[39m[90m[4mreduce.jl:402[24m[39m
      [8] [0m[1m_mapreduce_dim[22m
    [90m    @ [39m[90m./[39m[90m[4mreducedim.jl:330[24m[39m[90m [inlined][39m
      [9] [0m[1m#mapreduce#725[22m
    [90m    @ [39m[90m./[39m[90m[4mreducedim.jl:322[24m[39m[90m [inlined][39m
     [10] [0m[1mmapreduce[22m
    [90m    @ [39m[90m./[39m[90m[4mreducedim.jl:322[24m[39m[90m [inlined][39m
     [11] [0m[1m#_minimum#747[22m
    [90m    @ [39m[90m./[39m[90m[4mreducedim.jl:894[24m[39m[90m [inlined][39m
     [12] [0m[1m_minimum[22m
    [90m    @ [39m[90m./[39m[90m[4mreducedim.jl:894[24m[39m[90m [inlined][39m
     [13] [0m[1m#_minimum#746[22m
    [90m    @ [39m[90m./[39m[90m[4mreducedim.jl:893[24m[39m[90m [inlined][39m
     [14] [0m[1m_minimum[22m
    [90m    @ [39m[90m./[39m[90m[4mreducedim.jl:893[24m[39m[90m [inlined][39m
     [15] [0m[1m#minimum#744[22m
    [90m    @ [39m[90m./[39m[90m[4mreducedim.jl:889[24m[39m[90m [inlined][39m
     [16] [0m[1mminimum[22m[0m[1m([22m[90ma[39m::[0mMatrix[90m{Int64}[39m[0m[1m)[22m
    [90m    @ [39m[90mBase[39m [90m./[39m[90m[4mreducedim.jl:889[24m[39m
     [17] [0m[1mmacro expansion[22m
    [90m    @ [39m[90m./[39m[90m[4mIn[99]:46[24m[39m[90m [inlined][39m
     [18] [0m[1m(::var"#564#threadsfor_fun#217"{Matrix{Float64}, Int64, Symbol, Symbol, Matrix{Int64}, Bool, UnitRange{Int64}, Vector{Float64}, UnitRange{Int64}})[22m[0m[1m([22m[90monethread[39m::[0mBool[0m[1m)[22m
    [90m    @ [39m[35mMain[39m [90m./[39m[90m[4mthreadingconstructs.jl:85[24m[39m
     [19] [0m[1m(::var"#564#threadsfor_fun#217"{Matrix{Float64}, Int64, Symbol, Symbol, Matrix{Int64}, Bool, UnitRange{Int64}, Vector{Float64}, UnitRange{Int64}})[22m[0m[1m([22m[0m[1m)[22m
    [90m    @ [39m[35mMain[39m [90m./[39m[90m[4mthreadingconstructs.jl:52[24m[39m