Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Sparse bitsets instead of uint32 bitsets #15

Merged
merged 2 commits into from
Nov 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions weave/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import strutils
# Static configuration & compile-time options
# ----------------------------------------------------------------------------------

# const WV_MaxWorkers* {.intDefine.} = 256
# ## Influences the size of the global context
# # https://github.com/nim-lang/Nim/blob/v1.0.2/lib/pure/concurrency/threadpool.nim#L319-L322
const WV_MaxWorkers* {.intDefine.} = 255
## Influences the size of the sets of victims
# https://github.com/nim-lang/Nim/blob/v1.0.2/lib/pure/concurrency/threadpool.nim#L319-L322

# WV_Asserts: turn on specific assertions independently from
# --assertions:off or -d:danger
Expand Down
166 changes: 166 additions & 0 deletions weave/datatypes/sparsesets.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

import
../config,
../instrumentation/contracts,
../memory/allocs,
../primitives/c

template selectUint(): typedesc =
  ## Picks the narrowest unsigned integer type able to
  ## represent `WV_MaxWorkers`.
  # The highest value of the chosen type, i.e. high(uint8) = 0xFF,
  # high(uint16) = 0xFFFF or high(uint32) = 0xFFFFFFFF, is kept
  # as a special value that signifies absence from the set.
  # So there is one less value to work with.
  when WV_MaxWorkers > int(high(uint16)):
    uint32
  elif WV_MaxWorkers > int(high(uint8)):
    uint16
  else:
    uint8

# Unsigned word type picked by `selectUint()` from `WV_MaxWorkers`.
# It is used for the elements, positions, length and capacity of a SparseSet.
type Setuint = selectUint()

# Sentinel written into `indices` to mark an integer as absent from the set.
# It is the highest value of `Setuint`, hence never a valid element.
const Empty = high(Setuint)

type
  SparseSet* = object
    ## A set of integers taken from the range [0 .. capacity).
    ##
    ## Provides:
    ## - O(1) inclusion, exclusion and membership test
    ## - O(1) random pick
    ## - O(1) length
    ## - O(length) iteration
    ##
    ## Space: two arrays of `capacity` words (`Setuint`) each,
    ## carved out of a single allocation.
    ##
    ## By contrast a bitset requires:
    ## - random picking: multiple random "contains" + a fallback
    ##   to uncompressing the set
    ## - O(capacity/sizeof(words)) length (via popcounts)
    ## - O(capacity) iteration

    # indices[n] : position of `n` inside `values`, or `Empty` once excluded
    # values     : the first `len` entries are the members of the set
    # rawBuffer  : the allocation backing both `indices` and `values`
    indices, values, rawBuffer: ptr UncheckedArray[Setuint]
    # Number of members currently stored / maximum number of members
    len*, capacity*: Setuint

func allocate*(s: var SparseSet, capacity: SomeInteger) =
  ## Allocates the storage of `s` for integers in the range [0 .. capacity)
  ## and leaves `s` as a valid, empty set.
  ##
  ## `s` must not already own a buffer and `capacity` must not exceed
  ## `WV_MaxWorkers`.
  ## Call `refill` afterwards to include every integer of the range.
  preCondition: capacity <= WV_MaxWorkers
  preCondition: s.indices.isNil
  preCondition: s.values.isNil
  preCondition: s.rawBuffer.isNil

  s.capacity = Setuint capacity
  # One allocation holds both arrays: `indices` first, then `values`.
  s.rawBuffer = wv_alloc(Setuint, 2*capacity)
  s.indices = s.rawBuffer
  s.values = cast[ptr UncheckedArray[Setuint]](s.rawBuffer[capacity].addr)

  # Start from an empty set. Without this, `contains` (and the
  # `n notin s` precondition of `incl`) would read whatever the buffer
  # happens to hold; even zeroed memory would be wrong since the
  # "absent" marker is `Empty`, not 0.
  s.len = 0
  for i in 0 ..< int(capacity):
    s.indices[i] = Empty

func delete*(s: var SparseSet) =
  ## Releases the storage of `s` and resets it to the unallocated state,
  ## so that it can be passed to `allocate` again.
  wv_free(s.rawBuffer)
  # Do not keep a dangling pointer to the freed buffer:
  # `allocate` requires all three pointers to be nil.
  s.rawBuffer = nil
  s.indices = nil
  s.values = nil
  # No storage means no elements and no room for any.
  s.len = 0
  s.capacity = 0


func refill*(s: var SparseSet) {.inline.} =
  ## Resets the set so that it contains every integer
  ## of the range [0 .. capacity).
  ## The set must have been allocated with a non-zero capacity.
  preCondition: not s.indices.isNil
  preCondition: not s.values.isNil
  preCondition: not s.rawBuffer.isNil
  preCondition: s.capacity != 0

  let count = s.capacity

  # Identity mapping: integer `k` is stored at position `k`.
  for k in Setuint(0) ..< count:
    s.values[k] = k
    s.indices[k] = k

  s.len = count

func isEmpty*(s: SparseSet): bool {.inline.} =
  ## True when the set holds no element.
  result = s.len == 0

func contains*(s: SparseSet, n: SomeInteger): bool {.inline.} =
  ## Membership test; also enables the `n in s` / `n notin s` syntax.
  # `Empty` is the reserved "absent" marker, it cannot be an element.
  assert n.int != Empty.int
  result = s.indices[n] != Empty

func incl*(s: var SparseSet, n: SomeInteger) {.inline.} =
  ## Adds `n` to the set in constant time.
  ## `n` must not be the reserved `Empty` value, must not already be
  ## in the set, and the set must not be full.
  # `n` may be any integer type (e.g. an int32 worker ID) while the set
  # stores `Setuint` words. Compare through `int`, as `contains` does,
  # and convert explicitly on store: Nim has no implicit conversion
  # between distinct integer types.
  preCondition: n.int < Empty.int
  preCondition: n notin s
  preCondition: s.len < s.capacity

  s.indices[n] = s.len         # `n` lives at the first free position
  s.values[s.len] = Setuint(n)
  s.len += 1

func peek*(s: SparseSet): int32 {.inline.} =
  ## Returns the element stored last in the set, without removing it.
  ## Note: once an item has been deleted this is not necessarily
  ## the last inserted element.
  preCondition: s.len.int > 0
  let lastPos = s.len - 1
  result = int32(s.values[lastPos])

func excl*(s: var SparseSet, n: SomeInteger) {.inline.} =
  ## Removes `n` from the set in constant time.
  ## `n` must currently be in the set.
  preCondition: n in s

  # Constant time deletion: the slot of the deleted integer
  # is taken over by the last value of the array of values.

  let delIdx = s.indices[n]

  s.len -= 1
  let lastVal = s.values[s.len]

  s.indices[lastVal] = delIdx  # Last value now points to the deleted slot
  # Store the last value itself in the freed slot. It must not be
  # looked up through `values[lastVal]`: that indexes `values` by a
  # value and only happens to work while `values` is the identity.
  s.values[delIdx] = lastVal

  # Erase the item. This is done last so that it also holds
  # when `n` is itself the last value.
  s.indices[n] = Empty

func randomPick*(s: SparseSet, rngState: var uint32): int32 =
  ## Randomly picks an element from the set, advancing `rngState`.
  ## The value is NOT removed from the set.
  ## The set must not be empty.
  # TODO: this would require rejection sampling for proper uniform distribution
  # TODO: use a rng with better speed / distribution

  # An empty set would make the modulo below a division by zero.
  preCondition: s.len.int > 0

  let pickIdx = rand_r(rngState) mod s.len.int32
  result = s.values[pickIdx].int32

func `$`*(s: SparseSet): string =
  ## Renders the elements of the set, in storage order.
  let lastPos = int(s.len) - 1
  result = $toOpenArray(s.values, 0, lastPos)

# Sanity checks
# ------------------------------------------------------------------------------

when isMainModule:
  # Sanity check: fill a set, randomly remove a few elements and verify
  # that membership agrees with what has been removed.

  const
    Size = 10
    Picked = 5

  var S: SparseSet
  S.allocate(Size)
  S.refill()
  echo S

  var
    rng = 123'u32
    picked: seq[int32]

  # Draw `Picked` elements, removing each one from the set
  for _ in 0 ..< Picked:
    let p = S.randomPick(rng)
    picked.add p
    S.excl p
    echo "---"
    echo "picked: ", p
    echo "S indices: ", toOpenArray(S.indices, 0, S.capacity.int - 1)

  echo "---"
  echo "picked: ", picked
  echo "S: ", S
  echo "S indices: ", toOpenArray(S.indices, 0, S.capacity.int - 1)

  # Every integer is either still in the set or among the picked ones
  for x in 0'i32 ..< Size:
    if x in picked:
      echo x, " in picked -> notin S"
      doAssert x notin S
    else:
      echo x, " notin picked -> in S"
      doAssert x in S
4 changes: 2 additions & 2 deletions weave/datatypes/sync_types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# at your option. This file may not be copied, modified, or distributed except according to those terms.

import
./victims_bitsets,
./sparsesets,
../config,
../channels/channels_spsc_single_ptr,
../instrumentation/contracts,
Expand Down Expand Up @@ -75,7 +75,7 @@ type
thiefAddr*: ptr ChannelSpscSinglePtr[Task] # Channel for sending tasks back to the thief
thiefID*: WorkerID
retry*: int32 # 0 <= retry <= num_workers
victims*: VictimsBitset # bitfield of potential victims
victims*: SparseSet # set of potential victims
state*: WorkerState # State of the thief
when StealStrategy == StealKind.adaptative:
stealHalf*: bool # Thief wants half the tasks
Expand Down
64 changes: 0 additions & 64 deletions weave/datatypes/victims_bitsets.nim

This file was deleted.

4 changes: 2 additions & 2 deletions weave/instrumentation/contracts.nim
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ proc inspectInfix(node: NimNode): NimNode =
bindSym"&",
newCall(
bindSym"&",
newCall(bindSym"$", inspect(node[1])),
newCall(ident"$", inspect(node[1])),
newLit(" " & $node[0] & " ")
),
newCall(bindSym"$", inspect(node[2]))
newCall(ident"$", inspect(node[2]))
)
of {nnkIdent, nnkSym}:
return node
Expand Down
4 changes: 2 additions & 2 deletions weave/runtime.nim
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ proc init*(_: type Runtime) =
workforce() = getEnv"WEAVE_NUM_THREADS".parseInt.int32
if workforce() <= 0:
raise newException(ValueError, "WEAVE_NUM_THREADS must be > 0")
# elif workforce() > WV_MaxWorkers:
# echo "WEAVE_NUM_THREADS truncated to ", WV_MaxWorkers
elif workforce() > WV_MaxWorkers:
echo "WEAVE_NUM_THREADS truncated to ", WV_MaxWorkers, " (WV_MaxWorkers)"
else:
workforce() = int32 countProcessors()

Expand Down
10 changes: 7 additions & 3 deletions weave/scheduler.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import
./instrumentation/[contracts, profilers, loggers],
./primitives/barriers,
./datatypes/[sync_types, prell_deques, context_thread_local, flowvars],
./datatypes/[sync_types, prell_deques, context_thread_local, flowvars, sparsesets],
./channels/[channels_mpsc_bounded_lock, channels_spsc_single_ptr, channels_spsc_single_object],
./memory/[persistacks, intrusive_stacks],
./contexts, ./config,
Expand All @@ -29,9 +29,12 @@ proc init*(ctx: var TLContext) =
myWorker().deque = newPrellDeque(Task)
myThieves().initialize(WV_MaxConcurrentStealPerWorker * workforce())
myTodoBoxes().initialize()
localCtx.stealCache.initialize()
myWorker().initialize(maxID = workforce() - 1)

localCtx.stealCache.initialize()
for i in 0 ..< localCtx.stealCache.len:
localCtx.stealCache.access(i).victims.allocate(capacity = workforce())

ascertain: myTodoBoxes().len == WV_MaxConcurrentStealPerWorker

# Workers see their RNG with their myID()
Expand Down Expand Up @@ -144,12 +147,12 @@ proc schedulingLoop() =

proc threadLocalCleanup*() =
myWorker().deque.delete()
`=destroy`(localCtx.taskCache)
myThieves().delete()

for i in 0 ..< WV_MaxConcurrentStealPerWorker:
# No tasks left
ascertain: myTodoBoxes().access(i).isEmpty()
localCtx.stealCache.access(i).victims.delete()
myTodoBoxes().delete()

# A BoundedQueue (work-sharing requests) is on the stack
Expand All @@ -158,6 +161,7 @@ proc threadLocalCleanup*() =

# The task cache is full of tasks
`=destroy`(localCtx.taskCache)
delete(localCtx.stealCache)

proc worker_entry_fn*(id: WorkerID) =
## On the start of the threadpool workers will execute this
Expand Down
Loading