Skip to content
This repository has been archived by the owner on May 11, 2022. It is now read-only.

Commit

Permalink
issue #53: implement workplace counter
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilsk committed Oct 24, 2018
1 parent 2d5f25a commit 560475c
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 27 deletions.
7 changes: 6 additions & 1 deletion pkg/service/guard/internal/const.go
@@ -1,5 +1,10 @@
package internal

import "time"

const (
maxLicenses = 5000
maxLicenses = 5000
maxWorkplaces = 25
licenseTTL = 15 * time.Minute
workplaceTTL = 24 * time.Hour
)
169 changes: 159 additions & 10 deletions pkg/service/guard/internal/counter.go
@@ -1,14 +1,22 @@
package internal

import (
"sort"
"sync"
"sync/atomic"
"time"

domain "github.com/kamilsk/guard/pkg/service/types"
)

// LicenseRequests is a blob of memory to store request counters.
var LicenseRequests = NewLicenseRequestCounter(maxLicenses)
// TODO issue#draft {

var (
// LicenseRequests is a blob of memory to store request counters.
LicenseRequests = NewLicenseRequestCounter(maxLicenses)
// LicenseWorkplaces is a blob of memory to store workplace counters.
LicenseWorkplaces = NewLicenseWorkplaceCounter(maxLicenses, maxWorkplaces)
)

// NewLicenseRequestCounter returns a blob of memory
// to store license request counters.
Expand All @@ -17,15 +25,14 @@ func NewLicenseRequestCounter(capacity int) interface {
Rollback(license domain.ID)
} {
return &lrCounter{
mu: &sync.RWMutex{},
pool: make([]*uint32, 0, capacity),
pool: make([]uint32, 0, capacity),
idx: make(map[domain.ID]int, capacity),
}
}

type lrCounter struct {
mu *sync.RWMutex
pool []*uint32
mu sync.RWMutex
pool []uint32
idx map[domain.ID]int
}

Expand All @@ -40,7 +47,7 @@ func (c *lrCounter) Increment(license domain.ID) uint32 {
i = c.init(license)
}

return atomic.AddUint32(c.pool[i], 1)
return atomic.AddUint32(&c.pool[i], 1)
}

// Rollback decrements request counter of the license by 1.
Expand All @@ -57,7 +64,7 @@ func (c *lrCounter) Rollback(license domain.ID) {
i := c.idx[license]
c.mu.RUnlock()

atomic.AddUint32(c.pool[i], ^uint32(0))
atomic.AddUint32(&c.pool[i], ^uint32(0))
}

func (c *lrCounter) init(license domain.ID) int {
Expand All @@ -73,8 +80,150 @@ func (c *lrCounter) init(license domain.ID) int {
panic("segmentation fault: increase license request counter capacity")
}

i, counter := len(c.pool), new(uint32)
i = len(c.pool)
c.idx[license] = i
c.pool = append(c.pool, 0)
return i
}

// NewLicenseWorkplaceCounter returns a blob of memory
// to store license workplace counters.
func NewLicenseWorkplaceCounter(capacity, wpCapacity int) interface {
Acquire(license, workplace domain.ID, capacity int) bool
} {
return &lwCounter{
pool: make([]slot, 0, capacity),
idx: make(map[domain.ID]int, capacity),
wpc: wpCapacity,
}
}

type lwCounter struct {
mu sync.RWMutex
pool []slot
idx map[domain.ID]int
wpc int
}

// Acquire tries to hold available workplace slot.
func (c *lwCounter) Acquire(license, workplace domain.ID, capacity int) bool {
if capacity == 0 {
return true
}

c.mu.RLock()
i, found := c.idx[license]
c.mu.RUnlock()

if !found {
i = c.init(license)
}

return c.pool[i].acquire(workplace, capacity)
}

func (c *lwCounter) init(license domain.ID) int {
c.mu.Lock()
defer c.mu.Unlock()

i, found := c.idx[license]
if found {
return i
}

if len(c.pool)+1 > cap(c.pool) {
panic("segmentation fault: increase license workplace counter capacity")
}

i = len(c.pool)
c.idx[license] = i
c.pool = append(c.pool, counter)
c.pool = append(c.pool, slot{
pool: make([]record, 0, c.wpc),
idx: make(map[domain.ID]int, c.wpc),
wpc: c.wpc,
})
return i
}

type slot struct {
mu sync.Mutex
pool []record
idx map[domain.ID]int
wpc int // to check assertion fast without lock to prevent data race
}

func (s *slot) acquire(workplace domain.ID, capacity int) bool {
// assert(capacity <= cap(s.pool))
if capacity > s.wpc {
panic("segmentation fault: increase license workplace slot capacity")
}

s.mu.Lock()
defer s.mu.Unlock()

if len(s.pool) > capacity {
s.shrink(capacity)
}

i, found := s.idx[workplace]
if found {
s.pool[i].touch()
return true
}

if i = len(s.pool); i < capacity {
s.idx[workplace] = i
s.pool = append(s.pool, record{id: workplace, lastActive: time.Now()})
return true
}

// try to displace
now := time.Now()
for i = range s.pool {
if now.Sub(s.pool[i].lastActive) > workplaceTTL {
delete(s.idx, s.pool[i].id)
s.idx[workplace] = i
s.pool[i] = record{id: workplace, lastActive: now}
return true
}
}
return false
}

func (s *slot) shrink(size int) {
s.idx = make(map[domain.ID]int, size)
sort.Sort(sort.Reverse(recordsByActivity(s.pool)))
s.pool = s.pool[:size]
for i := range s.pool {
s.idx[s.pool[i].id] = i
}
}

type record struct {
id domain.ID
lastActive time.Time
}

func (r *record) touch() {
r.lastActive = time.Now()
}

type recordsByActivity []record

// Len is the number of elements in the collection.
func (rr recordsByActivity) Len() int {
return len(rr)
}

// Less reports whether the element with
// index i should sort before the element with index j.
func (rr recordsByActivity) Less(i, j int) bool {
return rr[i].lastActive.Before(rr[j].lastActive)
}

// Swap swaps the elements with indexes i and j.
func (rr recordsByActivity) Swap(i, j int) {
rr[i], rr[j] = rr[j], rr[i]
}

// issue#draft }
57 changes: 56 additions & 1 deletion pkg/service/guard/internal/counter_test.go
Expand Up @@ -5,6 +5,7 @@ import (
"math/rand"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -29,12 +30,14 @@ func TestLicenseRequestCounter(t *testing.T) {
wg.Add(requests)
for i := 0; i < requests; i++ {
go func() {
defer wg.Done()

r := rand.New(rand.NewSource(time.Now().Unix()))
<-starter

license := licenses[r.Intn(len(licenses))]
counter.Increment(license)
counter.Rollback(license)
wg.Done()
}()
}
close(starter)
Expand Down Expand Up @@ -63,3 +66,55 @@ func BenchmarkLicenseRequestCounter(b *testing.B) {
counter.Increment(licenses[r.Intn(len(licenses))])
}
}

func TestLicenseWorkplaceCounter(t *testing.T) {
var wg sync.WaitGroup

threads := runtime.GOMAXPROCS(0)
starter := make(chan struct{})
counter := NewLicenseWorkplaceCounter(threads, threads)

ids := make([]domain.ID, threads)
for i := range ids {
ids[i] = domain.ID(fmt.Sprintf("10000000-2000-4000-8000-1600000000%02d", i+1))
}

passed, requests := uint32(0), 3*threads
wg.Add(requests)
for i := 0; i < requests; i++ {
go func() {
defer wg.Done()

r := rand.New(rand.NewSource(time.Now().Unix()))
<-starter

license, workplace := ids[r.Intn(len(ids))], ids[r.Intn(len(ids))]
if counter.Acquire(license, workplace, threads) {
atomic.AddUint32(&passed, 1)
}
}()
}
close(starter)
wg.Wait()

if passed == 0 {
t.Fail()
}
}

func BenchmarkLicenseWorkplaceCounter(b *testing.B) {
r := rand.New(rand.NewSource(time.Now().Unix()))

ids := make([]domain.ID, runtime.GOMAXPROCS(0))
for i := range ids {
ids[i] = domain.ID(fmt.Sprintf("10000000-2000-4000-8000-1600000000%02d", i+1))
}
counter := NewLicenseWorkplaceCounter(len(ids), len(ids))

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
counter.Acquire(ids[r.Intn(len(ids))], ids[r.Intn(len(ids))], len(ids))
}
}
32 changes: 17 additions & 15 deletions pkg/service/guard/license.go
Expand Up @@ -46,21 +46,24 @@ func (service *licenseService) Check(ctx context.Context, req request.CheckLicen
license.ID, license.Contract = entity.ID, entity.Contract
}

checkers := []func(*domain.License) error{
service.checkLifetimeLimits,
service.checkRateLimits,
service.checkRequestLimits,
service.checkWorkplaceLimits,
// TODO issue#composite
if err := service.checkLifetimeLimits(&license, req.Workplace); err != nil {
return resp.With(err)
}
results := make([]error, len(checkers))
for i, check := range checkers {
results[i] = check(&license)
if err := service.checkRateLimits(&license, req.Workplace); err != nil {
return resp.With(err)
}
if err := service.checkRequestLimits(&license, req.Workplace); err != nil {
return resp.With(err)
}
if err := service.checkWorkplaceLimits(&license, req.Workplace); err != nil {
return resp.With(err)
}

return
}

func (service *licenseService) checkLifetimeLimits(license *domain.License) error {
func (service *licenseService) checkLifetimeLimits(license *domain.License, _ domain.ID) error {
now := time.Now()
if license.Since != nil {
if license.Since.After(now) {
Expand All @@ -75,7 +78,7 @@ func (service *licenseService) checkLifetimeLimits(license *domain.License) erro
return nil
}

func (service *licenseService) checkRateLimits(license *domain.License) error {
func (service *licenseService) checkRateLimits(license *domain.License, _ domain.ID) error {
if license.Rate.IsValid() {
// TODO issue#future
// errors.New(http.StatusText(http.StatusTooManyRequests))
Expand All @@ -84,7 +87,7 @@ func (service *licenseService) checkRateLimits(license *domain.License) error {
return nil
}

func (service *licenseService) checkRequestLimits(license *domain.License) error {
func (service *licenseService) checkRequestLimits(license *domain.License, _ domain.ID) error {
counter := internal.LicenseRequests
if license.Requests > 0 && license.Requests < counter.Increment(license.ID) {
go counter.Rollback(license.ID)
Expand All @@ -93,10 +96,9 @@ func (service *licenseService) checkRequestLimits(license *domain.License) error
return nil
}

func (service *licenseService) checkWorkplaceLimits(license *domain.License) error {
if license.Workplaces > 0 {
// errors.New(http.StatusText(http.StatusPaymentRequired))
return nil
func (service *licenseService) checkWorkplaceLimits(license *domain.License, workplace domain.ID) error {
if !internal.LicenseWorkplaces.Acquire(license.ID, workplace, int(license.Workplaces)) {
return errors.New(http.StatusText(http.StatusPaymentRequired))
}
return nil
}

0 comments on commit 560475c

Please sign in to comment.