Skip to content

Commit

Permalink
Refactor proxy store
Browse files Browse the repository at this point in the history
Filtering and sorting needs to operate on unstructured data. It also
needs to operate after the parallel partitioner, higher in the store
stack. This means that the proxy Store needs to return raw, unstructured
data up to the partitioner. This change moves all conversions from
unstructured Kubernetes types to apiserver objects up from the proxy
store into the partitioner.
  • Loading branch information
cmurphy committed Dec 14, 2022
1 parent 1d4a130 commit 60d234d
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 213 deletions.
32 changes: 16 additions & 16 deletions pkg/stores/partition/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"encoding/base64"
"encoding/json"

"github.com/rancher/apiserver/pkg/types"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// Partition represents a named grouping of kubernetes resources,
Expand All @@ -33,7 +33,7 @@ type ParallelPartitionLister struct {
}

// PartitionLister lists objects for one partition.
type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error)
type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, error)

// Err returns the latest error encountered.
func (p *ParallelPartitionLister) Err() error {
Expand Down Expand Up @@ -72,7 +72,7 @@ func indexOrZero(partitions []Partition, name string) int {
// List returns a stream of objects up to the requested limit.
// If the continue token is not empty, it decodes it and returns the stream
// starting at the indicated marker.
func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume string) (<-chan []types.APIObject, error) {
func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume string) (<-chan []unstructured.Unstructured, error) {
var state listState
if resume != "" {
bytes, err := base64.StdEncoding.DecodeString(resume)
Expand All @@ -88,7 +88,7 @@ func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume st
}
}

result := make(chan []types.APIObject)
result := make(chan []unstructured.Unstructured)
go p.feeder(ctx, state, limit, result)
return result, nil
}
Expand Down Expand Up @@ -120,7 +120,7 @@ type listState struct {
// 100000, the result is truncated and a continue token is generated that
// indicates the partition and offset for the client to start on in the next
// request.
func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, limit int, result chan []types.APIObject) {
func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, limit int, result chan []unstructured.Unstructured) {
var (
sem = semaphore.NewWeighted(p.Concurrency)
capacity = limit
Expand Down Expand Up @@ -183,25 +183,25 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
}

if state.Revision == "" {
state.Revision = list.Revision
state.Revision = list.GetResourceVersion()
}

if p.revision == "" {
p.revision = list.Revision
p.revision = list.GetResourceVersion()
}

// We have already seen the first objects in the list, truncate up to the offset.
if state.PartitionName == partition.Name() && state.Offset > 0 && state.Offset < len(list.Objects) {
list.Objects = list.Objects[state.Offset:]
if state.PartitionName == partition.Name() && state.Offset > 0 && state.Offset < len(list.Items) {
list.Items = list.Items[state.Offset:]
}

// Case 1: the capacity has been reached across all goroutines but the list is still only partial,
// so save the state so that the next page can be requested later.
if len(list.Objects) > capacity {
result <- list.Objects[:capacity]
if len(list.Items) > capacity {
result <- list.Items[:capacity]
// save state to redo this list at this offset
p.state = &listState{
Revision: list.Revision,
Revision: list.GetResourceVersion(),
PartitionName: partition.Name(),
Continue: cont,
Offset: capacity,
Expand All @@ -210,16 +210,16 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
capacity = 0
return nil
}
result <- list.Objects
capacity -= len(list.Objects)
result <- list.Items
capacity -= len(list.Items)
// Case 2: all objects have been returned, we are done.
if list.Continue == "" {
if list.GetContinue() == "" {
return nil
}
// Case 3: we started at an offset and truncated the list to skip the objects up to the offset.
// We're not yet up to capacity and have not retrieved every object,
// so loop again and get more data.
state.Continue = list.Continue
state.Continue = list.GetContinue()
state.PartitionName = partition.Name()
state.Offset = 0
}
Expand Down
137 changes: 126 additions & 11 deletions pkg/stores/partition/store.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
// Package partition implements a store with parallel partitioning of data
// so that segmented data can be concurrently collected and returned as a single data set.
package partition

import (
"context"
"fmt"
"net/http"
"reflect"
"strconv"

"github.com/rancher/apiserver/pkg/types"
"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
)

const defaultLimit = 100000
Expand All @@ -15,15 +24,25 @@ const defaultLimit = 100000
type Partitioner interface {
Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (Partition, error)
All(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) ([]Partition, error)
Store(apiOp *types.APIRequest, partition Partition) (types.Store, error)
Store(apiOp *types.APIRequest, partition Partition) (UnstructuredStore, error)
}

// Store implements types.Store for partitions.
type Store struct {
Partitioner Partitioner
}

func (s *Store) getStore(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (types.Store, error) {
// UnstructuredStore is like types.Store but deals in k8s unstructured objects instead of apiserver types.
// The partition Store wraps implementations of this interface and converts
// their results to apiserver types (see toAPI and toAPIEvent).
type UnstructuredStore interface {
// ByID looks up a single object by its ID.
ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error)
// List returns the objects for the schema as an unstructured list.
List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error)
// Create accepts an apiserver object and returns an unstructured object
// (presumably the object as created; confirm against the implementation).
Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, error)
// Update accepts an apiserver object for the given id and returns an unstructured object
// (presumably the object as updated; confirm against the implementation).
Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, error)
// Delete removes the object with the given id and returns an unstructured object
// (presumably the deleted object; confirm against the implementation).
Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error)
// Watch returns a channel of raw k8s watch events rather than apiserver API events.
Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan watch.Event, error)
}

func (s *Store) getStore(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (UnstructuredStore, error) {
p, err := s.Partitioner.Lookup(apiOp, schema, verb, id)
if err != nil {
return nil, err
Expand All @@ -39,7 +58,11 @@ func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id stri
return types.APIObject{}, err
}

return target.Delete(apiOp, schema, id)
obj, err := target.Delete(apiOp, schema, id)
if err != nil {
return types.APIObject{}, err
}
return toAPI(schema, obj), nil
}

// ByID looks up a single object by its ID.
Expand All @@ -49,14 +72,18 @@ func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string
return types.APIObject{}, err
}

return target.ByID(apiOp, schema, id)
obj, err := target.ByID(apiOp, schema, id)
if err != nil {
return types.APIObject{}, err
}
return toAPI(schema, obj), nil
}

func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, schema *types.APISchema, partition Partition,
cont string, revision string, limit int) (types.APIObjectList, error) {
cont string, revision string, limit int) (*unstructured.UnstructuredList, error) {
store, err := s.Partitioner.Store(apiOp, partition)
if err != nil {
return types.APIObjectList{}, err
return nil, err
}

req := apiOp.Clone()
Expand Down Expand Up @@ -88,7 +115,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
}

lister := ParallelPartitionLister{
Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error) {
Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, error) {
return s.listPartition(ctx, apiOp, schema, partition, cont, revision, limit)
},
Concurrency: 3,
Expand All @@ -104,7 +131,10 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
}

for items := range list {
result.Objects = append(result.Objects, items...)
for _, item := range items {
item := item
result.Objects = append(result.Objects, toAPI(schema, &item))
}
}

result.Revision = lister.Revision()
Expand All @@ -119,7 +149,11 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data ty
return types.APIObject{}, err
}

return target.Create(apiOp, schema, data)
obj, err := target.Create(apiOp, schema, data)
if err != nil {
return types.APIObject{}, err
}
return toAPI(schema, obj), nil
}

// Update updates a single object in the store.
Expand All @@ -129,7 +163,11 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data ty
return types.APIObject{}, err
}

return target.Update(apiOp, schema, data, id)
obj, err := target.Update(apiOp, schema, data, id)
if err != nil {
return types.APIObject{}, err
}
return toAPI(schema, obj), nil
}

// Watch returns a channel of events for a list or resource.
Expand Down Expand Up @@ -159,7 +197,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types
return err
}
for i := range c {
response <- i
response <- toAPIEvent(apiOp, schema, i)
}
return nil
})
Expand Down Expand Up @@ -189,3 +227,80 @@ func getLimit(req *http.Request) int {
}
return limit
}

// toAPI wraps a Kubernetes runtime object in an apiserver APIObject.
//
// A nil interface or a nil pointer yields the zero APIObject. Unstructured
// objects are first passed through moveToUnderscore, which modifies them in
// place. The result's Type is the schema ID and its ID is "name", or
// "namespace/name" when the object has a namespace. If object metadata cannot
// be accessed, the result is returned with an empty ID.
func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject {
	if obj == nil || reflect.ValueOf(obj).IsNil() {
		return types.APIObject{}
	}

	if u, isUnstructured := obj.(*unstructured.Unstructured); isUnstructured {
		obj = moveToUnderscore(u)
	}

	result := types.APIObject{Type: schema.ID, Object: obj}

	accessor, err := meta.Accessor(obj)
	if err != nil {
		// No usable metadata: return the object without an ID.
		return result
	}

	result.ID = accessor.GetName()
	if namespace := accessor.GetNamespace(); namespace != "" {
		result.ID = namespace + "/" + result.ID
	}
	return result
}

// moveToUnderscore renames every top-level key of obj that appears in
// types.ReservedFields by prefixing it with an underscore ("k" becomes "_k").
// The object is modified in place and returned; a nil obj returns nil.
func moveToUnderscore(obj *unstructured.Unstructured) *unstructured.Unstructured {
	if obj == nil {
		return nil
	}

	for field := range types.ReservedFields {
		value, present := obj.Object[field]
		if !present {
			continue
		}
		delete(obj.Object, field)
		obj.Object["_"+field] = value
	}

	return obj
}

// toAPIEvent converts a Kubernetes watch event into an apiserver APIEvent.
//
// The event name is derived from event.Type: watch.Deleted maps to
// types.RemoveAPIEvent, watch.Added to types.CreateAPIEvent, watch.Error to
// "resource.error", and every other type to types.ChangeAPIEvent.
//
// For error events only Name and Error are set. The error text is the Message
// of the *metav1.Status carried by the event; if the event carries any other
// object (or a nil status), a generic error naming the unexpected type is
// returned instead of dereferencing a nil status.
//
// For all other events the object is converted with toAPI and Revision is set
// to the object's resource version when its metadata is accessible.
//
// apiOp is currently unused and is kept so the signature stays stable for callers.
func toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, event watch.Event) types.APIEvent {
	name := types.ChangeAPIEvent
	switch event.Type {
	case watch.Deleted:
		name = types.RemoveAPIEvent
	case watch.Added:
		name = types.CreateAPIEvent
	case watch.Error:
		name = "resource.error"
	}

	apiEvent := types.APIEvent{
		Name: name,
	}

	if event.Type == watch.Error {
		status, ok := event.Object.(*metav1.Status)
		if !ok || status == nil {
			// The error event did not carry a Status; report that rather than panic.
			apiEvent.Error = fmt.Errorf("watch error event with unexpected object type %T", event.Object)
			return apiEvent
		}
		// Use an explicit verb so a '%' in the server message is not treated as a format directive.
		apiEvent.Error = fmt.Errorf("%s", status.Message)
		return apiEvent
	}

	apiEvent.Object = toAPI(schema, event.Object)

	m, err := meta.Accessor(event.Object)
	if err != nil {
		// No accessible metadata: return the event without a revision.
		return apiEvent
	}

	apiEvent.Revision = m.GetResourceVersion()
	return apiEvent
}

0 comments on commit 60d234d

Please sign in to comment.