From 958ecd500b6e42f8ac96673222f530c900b10863 Mon Sep 17 00:00:00 2001
From: Ted Xu
Date: Fri, 21 Jun 2024 15:58:01 +0800
Subject: [PATCH] feat: adding grouped virtual resource allocator (#33669)

See #33559

---------

Signed-off-by: Ted Xu
---
 pkg/util/vralloc/alloc.go            |  94 +++++++++++++++--
 pkg/util/vralloc/alloc_test.go       |  59 +++++++++++
 pkg/util/vralloc/sharedalloc.go      | 147 +++++++++++++++++++++++++++
 pkg/util/vralloc/sharedalloc_test.go | 100 ++++++++++++++++++
 4 files changed, 392 insertions(+), 8 deletions(-)
 create mode 100644 pkg/util/vralloc/sharedalloc.go
 create mode 100644 pkg/util/vralloc/sharedalloc_test.go

diff --git a/pkg/util/vralloc/alloc.go b/pkg/util/vralloc/alloc.go
index 32eb5442ccd6..ceee169af421 100644
--- a/pkg/util/vralloc/alloc.go
+++ b/pkg/util/vralloc/alloc.go
@@ -25,6 +25,8 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/hardware"
 )
 
+var zero = &Resource{0, 0, 0}
+
 type Resource struct {
 	Memory int64 // Memory occupation in bytes
 	CPU    int64 // CPU in cycles per second
@@ -63,13 +65,22 @@ func (r Resource) Le(limit *Resource) bool {
 type Allocator[T comparable] interface {
 	// Allocate allocates the resource, returns true if the resource is allocated. If allocation failed, returns the short resource.
 	// The short resource is a positive value, e.g., if there is additional 8 bytes in disk needed, returns (0, 0, 8).
+	// Allocate on identical id is not allowed, in which case it returns (false, nil). Use #Reallocate instead.
 	Allocate(id T, r *Resource) (allocated bool, short *Resource)
+	// Reallocate re-allocates the resource on given id with delta resource. Delta can be negative, in which case the resource is released.
+	// If delta is negative and the allocated resource is less than the delta, returns (false, nil).
+	Reallocate(id T, delta *Resource) (allocated bool, short *Resource)
 	// Release releases the resource
-	Release(id T)
+	Release(id T) *Resource
 	// Used returns the used resource
 	Used() Resource
+	// Wait waits for new release. Releases could be initiated by #Release or #Reallocate.
+	Wait()
 	// Inspect returns the allocated resources
 	Inspect() map[T]*Resource
+
+	// notify notifies the waiters.
+	notify()
 }
 
 type FixedSizeAllocator[T comparable] struct {
@@ -78,17 +89,23 @@ type FixedSizeAllocator[T comparable] struct {
 	lock   sync.RWMutex
 	used   Resource
 	allocs map[T]*Resource
+	cond   sync.Cond
 }
 
 func (a *FixedSizeAllocator[T]) Allocate(id T, r *Resource) (allocated bool, short *Resource) {
+	if r.Le(zero) {
+		return false, nil
+	}
 	a.lock.Lock()
 	defer a.lock.Unlock()
+
+	_, ok := a.allocs[id]
+	if ok {
+		// Re-allocate on identical id is not allowed
+		return false, nil
+	}
+
 	if a.used.Add(r).Le(a.limit) {
-		_, ok := a.allocs[id]
-		if ok {
-			// Re-allocate on identical id is not allowed
-			return false, nil
-		}
 		a.allocs[id] = r
 		return true, nil
 	}
@@ -97,15 +114,47 @@ func (a *FixedSizeAllocator[T]) Allocate(id T, r *Resource) (allocated bool, sho
 	return false, short
 }
 
-func (a *FixedSizeAllocator[T]) Release(id T) {
+func (a *FixedSizeAllocator[T]) Reallocate(id T, delta *Resource) (allocated bool, short *Resource) {
+	a.lock.Lock()
+	r, ok := a.allocs[id]
+	a.lock.Unlock()
+
+	if !ok {
+		return a.Allocate(id, delta)
+	}
+
+	a.lock.Lock()
+	defer a.lock.Unlock()
+	r.Add(delta)
+	if !zero.Le(r) {
+		r.Sub(delta)
+		return false, nil
+	}
+
+	if a.used.Add(delta).Le(a.limit) {
+		if !zero.Le(delta) {
+			// If delta is negative, notify waiters
+			a.notify()
+		}
+		return true, nil
+	}
+	short = a.used.Diff(a.limit)
+	r.Sub(delta)
+	a.used.Sub(delta)
+	return false, short
+}
+
+func (a *FixedSizeAllocator[T]) Release(id T) *Resource {
 	a.lock.Lock()
 	defer a.lock.Unlock()
 	r, ok := a.allocs[id]
 	if !ok {
-		return
+		return zero
 	}
 	delete(a.allocs, id)
 	a.used.Sub(r)
+	a.notify()
+	return r
 }
 
 func (a *FixedSizeAllocator[T]) Used() Resource {
@@ -120,14 +169,26 @@ func (a *FixedSizeAllocator[T]) Inspect() map[T]*Resource {
 	return maps.Clone(a.allocs)
 }
 
+func (a *FixedSizeAllocator[T]) Wait() {
+	a.cond.L.Lock()
+	a.cond.Wait()
+	a.cond.L.Unlock()
+}
+
+func (a *FixedSizeAllocator[T]) notify() {
+	a.cond.Broadcast()
+}
+
 func NewFixedSizeAllocator[T comparable](limit *Resource) *FixedSizeAllocator[T] {
 	return &FixedSizeAllocator[T]{
 		limit:  limit,
 		allocs: make(map[T]*Resource),
+		cond:   sync.Cond{L: &sync.Mutex{}},
 	}
 }
 
 // PhysicalAwareFixedSizeAllocator allocates resources with additional consideration of physical resource usage.
+// Note: wait on PhysicalAwareFixedSizeAllocator may only be notified if there is virtual resource released.
 type PhysicalAwareFixedSizeAllocator[T comparable] struct {
 	FixedSizeAllocator[T]
 
@@ -155,6 +216,23 @@ func (a *PhysicalAwareFixedSizeAllocator[T]) Allocate(id T, r *Resource) (alloca
 	return false, expected.Diff(a.hwLimit)
 }
 
+func (a *PhysicalAwareFixedSizeAllocator[T]) Reallocate(id T, delta *Resource) (allocated bool, short *Resource) {
+	memoryUsage := int64(hardware.GetUsedMemoryCount())
+	diskUsage := int64(0)
+	if usageStats, err := disk.Usage(a.dir); err == nil { // only read usageStats when the call succeeded
+		diskUsage = int64(usageStats.Used)
+	}
+
+	expected := &Resource{
+		Memory: a.Used().Memory + delta.Memory + memoryUsage,
+		Disk:   a.Used().Disk + delta.Disk + diskUsage,
+	}
+	if expected.Le(a.hwLimit) {
+		return a.FixedSizeAllocator.Reallocate(id, delta)
+	}
+	return false, expected.Diff(a.hwLimit)
+}
+
 func NewPhysicalAwareFixedSizeAllocator[T comparable](limit *Resource, hwMemoryLimit, hwDiskLimit int64, dir string) *PhysicalAwareFixedSizeAllocator[T] {
 	return &PhysicalAwareFixedSizeAllocator[T]{
 		FixedSizeAllocator: FixedSizeAllocator[T]{
diff --git a/pkg/util/vralloc/alloc_test.go b/pkg/util/vralloc/alloc_test.go
index 6b50d39ac442..d50df585849e 100644
--- a/pkg/util/vralloc/alloc_test.go
+++ b/pkg/util/vralloc/alloc_test.go
@@ -22,13 +22,21 @@ import (
 	"testing"
 
 	"github.com/stretchr/testify/assert"
+	"go.uber.org/zap"
 
+	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/hardware"
 )
 
+func inspect[T comparable](a Allocator[T]) {
+	m := a.Inspect()
+	log.Info("Allocation", zap.Any("allocations", m), zap.Any("used", a.Used()))
+}
+
 func TestFixedSizeAllocator(t *testing.T) {
 	a := NewFixedSizeAllocator[string](&Resource{100, 100, 100})
 
+	// Allocate
 	allocated, _ := a.Allocate("a1", &Resource{10, 10, 10})
 	assert.Equal(t, true, allocated)
 	allocated, _ = a.Allocate("a2", &Resource{90, 90, 90})
@@ -36,13 +44,36 @@ func TestFixedSizeAllocator(t *testing.T) {
 	allocated, short := a.Allocate("a3", &Resource{10, 0, 0})
 	assert.Equal(t, false, allocated)
 	assert.Equal(t, &Resource{10, 0, 0}, short)
+	allocated, _ = a.Allocate("a0", &Resource{-10, 0, 0})
+	assert.Equal(t, false, allocated)
+	inspect[string](a)
+
+	// Release
 	a.Release("a2")
 	allocated, _ = a.Allocate("a3", &Resource{10, 0, 0})
 	assert.Equal(t, true, allocated)
+
+	// Inspect
 	m := a.Inspect()
 	assert.Equal(t, 2, len(m))
+
+	// Allocate on identical id is not allowed
 	allocated, _ = a.Allocate("a1", &Resource{10, 0, 0})
 	assert.Equal(t, false, allocated)
+
+	// Reallocate
+	allocated, _ = a.Reallocate("a1", &Resource{10, 0, 0})
+	assert.Equal(t, true, allocated)
+	allocated, _ = a.Reallocate("a1", &Resource{-10, 0, 0})
+	assert.Equal(t, true, allocated)
+	allocated, _ = a.Reallocate("a1", &Resource{-20, 0, 0})
+	assert.Equal(t, false, allocated)
+	allocated, _ = a.Reallocate("a1", &Resource{80, 0, 0})
+	assert.Equal(t, true, allocated)
+	allocated, _ = a.Reallocate("a1", &Resource{10, 0, 0})
+	assert.Equal(t, false, allocated)
+	allocated, _ = a.Reallocate("a4", &Resource{0, 10, 0})
+	assert.Equal(t, true, allocated)
 }
 
 func TestFixedSizeAllocatorRace(t *testing.T) {
@@ -61,6 +92,28 @@ func TestFixedSizeAllocatorRace(t *testing.T) {
 	assert.Equal(t, 100, len(m))
 }
 
+func TestWait(t *testing.T) {
+	a := NewFixedSizeAllocator[string](&Resource{100, 100, 100})
+	allocated, _ := a.Allocate("a1", &Resource{100, 100, 100})
+	assert.True(t, allocated)
+	for i := 0; i < 100; i++ {
+		go func(index int) {
+			allocated, _ := a.Reallocate("a1", &Resource{-1, -1, -1})
+			assert.Equal(t, true, allocated)
+		}(i)
+	}
+
+	allocated, _ = a.Allocate("a2", &Resource{100, 100, 100})
+	i := 1
+	for !allocated {
+		a.Wait()
+		allocated, _ = a.Allocate("a2", &Resource{100, 100, 100})
+		i++
+	}
+	assert.True(t, allocated)
+	assert.True(t, i < 100 && i > 1)
+}
+
 func TestPhysicalAwareFixedSizeAllocator(t *testing.T) {
 	hwMemoryLimit := int64(float32(hardware.GetMemoryCount()) * 0.9)
 	hwDiskLimit := int64(1<<63 - 1)
@@ -73,4 +126,10 @@ func TestPhysicalAwareFixedSizeAllocator(t *testing.T) {
 	allocated, short := a.Allocate("a3", &Resource{10, 0, 0})
 	assert.Equal(t, false, allocated)
 	assert.Equal(t, &Resource{10, 0, 0}, short)
+
+	// Reallocate
+	allocated, _ = a.Reallocate("a1", &Resource{0, -10, 0})
+	assert.True(t, allocated)
+	allocated, _ = a.Reallocate("a1", &Resource{10, 0, 0})
+	assert.False(t, allocated)
 }
diff --git a/pkg/util/vralloc/sharedalloc.go b/pkg/util/vralloc/sharedalloc.go
new file mode 100644
index 000000000000..7944e98b5025
--- /dev/null
+++ b/pkg/util/vralloc/sharedalloc.go
@@ -0,0 +1,147 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package vralloc
+
+// SharedAllocator wraps an Allocator and mirrors each successful change to its parent group under its own name.
+type SharedAllocator struct {
+	Allocator[string]
+	parent *GroupedAllocator
+	name   string
+}
+
+// GroupedAllocator is a shared allocator that can be grouped with other shared allocators. The sum of used resources of all
+// children should not exceed the limit.
+type GroupedAllocator struct {
+	SharedAllocator
+	name     string
+	children map[string]Allocator[string]
+}
+
+// Allocate allocates the resource, returns true if the resource is allocated. If allocation failed, returns the short resource.
+// The short resource is a positive value, e.g., if there is additional 8 bytes in disk needed, returns (0, 0, 8).
+func (sa *SharedAllocator) Allocate(id string, r *Resource) (allocated bool, short *Resource) {
+	allocated, short = sa.Allocator.Allocate(id, r)
+	if !allocated {
+		return
+	}
+	if sa.parent != nil {
+		// Hand the parent its own copy: allocators keep the pointer they are given and mutate it in place.
+		charge := *r
+		allocated, short = sa.parent.Reallocate(sa.name, &charge) // Ask for allocation on self name.
+		if !allocated {
+			sa.Allocator.Release(id)
+		}
+	}
+
+	return
+}
+
+// Reallocate re-allocates the resource on given id with delta resource. Delta can be negative, in which case the resource is released.
+// If delta is negative and the allocated resource is less than the delta, returns (false, nil).
+func (sa *SharedAllocator) Reallocate(id string, delta *Resource) (allocated bool, short *Resource) {
+	allocated, short = sa.Allocator.Reallocate(id, delta)
+	if !allocated {
+		return
+	}
+	if sa.parent != nil {
+		// Hand the parent its own copy so the entry stored for id and the parent's entry never alias.
+		charge := *delta
+		allocated, short = sa.parent.Reallocate(sa.name, &charge)
+		if !allocated {
+			sa.Allocator.Reallocate(id, zero.Diff(delta))
+		}
+	}
+	return
+}
+
+// Release releases the resource
+func (sa *SharedAllocator) Release(id string) *Resource {
+	r := sa.Allocator.Release(id)
+	if sa.parent != nil {
+		sa.parent.Reallocate(sa.name, zero.Diff(r))
+	}
+	return r
+}
+
+// Allocate is not supported on a grouped allocator: it always returns (false, nil).
+// Resources are allocated through the child allocators (see GetAllocator), which
+// charge this group by calling Reallocate with their own name.
+func (ga *GroupedAllocator) Allocate(id string, r *Resource) (allocated bool, short *Resource) {
+	return false, nil
+}
+
+// Release is a no-op on a grouped allocator and always returns nil.
+func (ga *GroupedAllocator) Release(id string) *Resource {
+	return nil
+}
+
+// Reallocate applies delta to the child named id and, on success, notifies the waiters of its siblings.
+func (ga *GroupedAllocator) Reallocate(id string, delta *Resource) (allocated bool, short *Resource) {
+	// The embedded SharedAllocator already forwards delta to ga.parent and rolls back
+	// on failure; forwarding it again here would charge the parent group twice.
+	allocated, short = ga.SharedAllocator.Reallocate(id, delta)
+	if !allocated {
+		return
+	}
+	// Notify siblings of id.
+	for name := range ga.children {
+		if name != id {
+			ga.children[name].notify()
+		}
+	}
+	return
+}
+
+func (ga *GroupedAllocator) GetAllocator(name string) Allocator[string] {
+	return ga.children[name]
+}
+
+type GroupedAllocatorBuilder struct {
+	ga GroupedAllocator
+}
+
+func NewGroupedAllocatorBuilder(name string, limit *Resource) *GroupedAllocatorBuilder {
+	return &GroupedAllocatorBuilder{
+		ga: GroupedAllocator{
+			SharedAllocator: SharedAllocator{
+				Allocator: NewFixedSizeAllocator[string](limit),
+				name:      name,
+			},
+			name:     name,
+			children: make(map[string]Allocator[string]),
+		},
+	}
+}
+
+func (b *GroupedAllocatorBuilder) AddChild(name string, limit *Resource) *GroupedAllocatorBuilder {
+	b.ga.children[name] = &SharedAllocator{
+		Allocator: NewFixedSizeAllocator[string](limit),
+		parent:    &b.ga,
+		name:      name,
+	}
+	return b
+}
+
+func (b *GroupedAllocatorBuilder) AddChildGroup(allocator *GroupedAllocator) *GroupedAllocatorBuilder {
+	allocator.parent = &b.ga
+	b.ga.children[allocator.name] = allocator
+	return b
+}
+
+func (b *GroupedAllocatorBuilder) Build() *GroupedAllocator {
+	return &b.ga
+}
diff --git a/pkg/util/vralloc/sharedalloc_test.go b/pkg/util/vralloc/sharedalloc_test.go
new file mode 100644
index 000000000000..0250d500287f
--- /dev/null
+++ b/pkg/util/vralloc/sharedalloc_test.go
@@ -0,0 +1,100 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package vralloc
+
+import (
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestGroupedAllocator(t *testing.T) {
+	t.Run("test allocator", func(t *testing.T) {
+		a := NewGroupedAllocatorBuilder("a", &Resource{100, 100, 100}).
+			AddChild("c1", &Resource{10, 10, 10}).
+			AddChild("c2", &Resource{10, 10, 10}).
+			AddChild("c3", &Resource{90, 90, 90}).
+			Build()
+
+		c1 := a.GetAllocator("c1")
+		c2 := a.GetAllocator("c2")
+		c3 := a.GetAllocator("c3")
+
+		// Allocate
+		allocated, _ := c1.Allocate("x11", &Resource{10, 10, 10})
+		assert.Equal(t, true, allocated)
+		allocated, short := c1.Allocate("x12", &Resource{90, 90, 90})
+		assert.Equal(t, false, allocated)
+		assert.Equal(t, &Resource{90, 90, 90}, short) // c1's own limit of 10 is already taken by x11
+		allocated, _ = c2.Allocate("x21", &Resource{10, 10, 10})
+		assert.Equal(t, true, allocated)
+		allocated, short = c3.Allocate("x31", &Resource{90, 90, 90})
+		assert.Equal(t, false, allocated)
+		assert.Equal(t, &Resource{10, 10, 10}, short) // group "a" already holds 20 of its 100 for c1 and c2
+		inspect[string](a)
+
+		// Release
+		c1.Release("x11")
+		allocated, _ = c3.Allocate("x31", &Resource{90, 90, 90})
+		assert.Equal(t, true, allocated)
+
+		// Inspect
+		m := a.Inspect()
+		assert.Equal(t, 3, len(m))
+	})
+
+	t.Run("test 3 level", func(t *testing.T) {
+		//      a
+		//   c1   c2
+		//       c3 c4
+		// Leaf nodes: c1, c3, c4
+
+		root := NewGroupedAllocatorBuilder("a", &Resource{100, 100, 100}).
+			AddChild("c1", &Resource{100, 100, 100}).
+			AddChildGroup(NewGroupedAllocatorBuilder("c2", &Resource{100, 100, 100}).AddChild("c3", &Resource{100, 100, 100}).AddChild("c4", &Resource{100, 100, 100}).Build()).
+			Build()
+
+		c1 := root.GetAllocator("c1")
+		c2 := root.GetAllocator("c2").(*GroupedAllocator)
+		c3 := c2.GetAllocator("c3")
+		// c4 := c2.GetAllocator("c4")
+
+		// Allocate
+		allocated, _ := c1.Allocate("x11", &Resource{100, 100, 100})
+		assert.Equal(t, true, allocated)
+		allocated, _ = c2.Allocate("x12", &Resource{90, 90, 90})
+		assert.Equal(t, false, allocated) // allocation on grouped allocator is not allowed
+		allocated, _ = c3.Allocate("x21", &Resource{10, 10, 10})
+		assert.Equal(t, false, allocated) // not enough resource: root "a" is fully used by c1
+
+		wg := new(sync.WaitGroup)
+		wg.Add(1)
+		go func() {
+			allocated, _ = c3.Allocate("x21", &Resource{10, 10, 10})
+			if !allocated {
+				c3.Wait()
+				allocated, _ = c3.Allocate("x21", &Resource{10, 10, 10})
+				assert.Equal(t, true, allocated)
+			}
+			wg.Done()
+		}()
+
+		c1.Release("x11")
+		wg.Wait()
+	})
+}