Skip to content

Commit

Permalink
Merge 360766d into c712e58
Browse files Browse the repository at this point in the history
  • Loading branch information
platinummonkey committed May 19, 2019
2 parents c712e58 + 360766d commit 9e98b29
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 0 deletions.
26 changes: 26 additions & 0 deletions strategy/lookup_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,32 @@ func NewLookupPartitionStrategyWithMetricRegistry(
return strategy, nil
}

// AddPartition will dynamically add a partition
// will return false if this partition is already defined, otherwise true if successfully added
func (s *LookupPartitionStrategy) AddPartition(name string, partition *LookupPartition) bool {
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.partitions[name]
if ok {
return false
}
s.partitions[name] = partition
return true
}

// RemovePartition will remove a given partition dynamically
// will return the busy count from that partition, along with true if the partition was found, otherwise false.
func (s *LookupPartitionStrategy) RemovePartition(name string) (int, bool) {
s.mu.Lock()
defer s.mu.Unlock()
partition, ok := s.partitions[name]
if !ok {
return 0, false
}
delete(s.partitions, name)
return partition.BusyCount(), true
}

// TryAcquire a token from a partition
func (s *LookupPartitionStrategy) TryAcquire(ctx context.Context) (token core.StrategyToken, ok bool) {
s.mu.Lock()
Expand Down
31 changes: 31 additions & 0 deletions strategy/lookup_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,4 +312,35 @@ func TestLookupPartitionStrategy(t *testing.T) {
asrt.Equal(1, busyCount)
asrt.Equal(1, strategy.BusyCount())
})

t.Run("AddRemoveDynamically", func(t2 *testing.T) {
t2.Parallel()
asrt := assert.New(t2)
testPartitions := makeTestLookupPartitions()
strategy, err := NewLookupPartitionStrategyWithMetricRegistry(
testPartitions,
nil,
1,
core.EmptyMetricRegistryInstance,
)
asrt.NoError(err, "failed to create strategy")
asrt.NotNil(strategy)

// add a partition
testPartition := NewLookupPartitionWithMetricRegistry(
"test1",
0.7,
1,
core.EmptyMetricRegistryInstance,
)
strategy.AddPartition(testPartition.Name(), testPartition)
binLimit, err := strategy.BinLimit("test1")
asrt.NoError(err)
asrt.Equal(1, binLimit)

// remove a partition
strategy.RemovePartition("test1")
binLimit, err = strategy.BinLimit("test1")
asrt.Error(err)
})
}
37 changes: 37 additions & 0 deletions strategy/predicate_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,43 @@ func NewPredicatePartitionStrategyWithMetricRegistry(
return strategy, nil
}

// AddPartition will dynamically add a partition
// will return false if this partition is already defined, otherwise true if successfully added
func (s *PredicatePartitionStrategy) AddPartition(partition *PredicatePartition) bool {
s.mu.Lock()
defer s.mu.Unlock()
exists := false
for _, p := range s.partitions {
if p == partition {
exists = true
break
}
}
if exists {
return false
}
s.partitions = append(s.partitions, partition)
return true
}

// RemovePartitionsMatching will remove partitions dynamically
// will return the removed matching partitions, and true if there are at least 1 removed partition
func (s *PredicatePartitionStrategy) RemovePartitionsMatching(matcher context.Context) ([]*PredicatePartition, bool) {
s.mu.Lock()
defer s.mu.Unlock()
kept := make([]*PredicatePartition, 0)
removed := make([]*PredicatePartition, 0)
for _, p := range s.partitions {
if p.predicate(matcher) {
removed = append(removed, p)
} else {
kept = append(kept, p)
}
}
s.partitions = kept
return removed, len(removed) > 0
}

// TryAcquire a token from a partition
func (s *PredicatePartitionStrategy) TryAcquire(ctx context.Context) (core.StrategyToken, bool) {
s.mu.Lock()
Expand Down
34 changes: 34 additions & 0 deletions strategy/predicate_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,4 +310,38 @@ func TestPredicatePartition(t *testing.T) {
asrt.Equal(1, busyCount)
asrt.Equal(1, strategy.BusyCount())
})

t.Run("AddRemoveDynamically", func(t2 *testing.T) {
t2.Parallel()
asrt := assert.New(t2)

testPartitions := makeTestPartitions()
strategy, err := NewPredicatePartitionStrategyWithMetricRegistry(
testPartitions,
1,
core.EmptyMetricRegistryInstance)
asrt.NoError(err, "failed to create strategy")
asrt.NotNil(strategy)
strategy.SetLimit(10)

// add a partition
testPartition := NewPredicatePartitionWithMetricRegistry(
"test1",
0.7,
matchers.StringPredicateMatcher("test1", false),
core.EmptyMetricRegistryInstance,
)
strategy.AddPartition(testPartition)
ctxTest := context.WithValue(context.Background(), matchers.StringPredicateContextKey, "test1")
token, ok := strategy.TryAcquire(ctxTest)
asrt.True(ok)
asrt.NotNil(token)

// remove a partition
strategy.RemovePartitionsMatching(ctxTest)
// we get the default token now
token, ok = strategy.TryAcquire(ctxTest)
asrt.False(ok)
asrt.False(token.IsAcquired())
})
}

0 comments on commit 9e98b29

Please sign in to comment.