Skip to content

Commit

Permalink
use streaming API for listing identities (#129)
Browse files Browse the repository at this point in the history
This commit modifies the `/v1/identity/list` API
to respond a nd-json stream of identity<->policy
mappings instead of one (eventually large) json
object.

The `/v1/identity/list` API is now similar to
the `/v1/key/list` API.

With the streaming API, a client can iterate over
some/all identities while using just a constant
amount of memory.

This is a major breaking API change.

Signed-off-by: Andreas Auernhammer <aead@mail.de>
  • Loading branch information
Andreas Auernhammer committed Jun 4, 2021
1 parent 9ebb289 commit 984966c
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 51 deletions.
52 changes: 27 additions & 25 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ func (d *DEK) UnmarshalBinary(data []byte) error {
// }
// if err := iterator.Err(); err != nil {
// }
// if err := iterator.Close(); err != nil {
// }
//
// Once done with iterating over the list of KeyDescription
// objects, an iterator should be closed using the Close
Expand All @@ -218,15 +220,16 @@ func (d *DEK) UnmarshalBinary(data []byte) error {
// In general, a KeyIterator does not provide any guarantees
// about ordering or the when its underlying source is modified
// concurrently.
// Particularly, if a key is created or deleted at the KES a
// KeyIterator may or may not be affected by this change.
// Particularly, if a key is created or deleted at the KES server
// the KeyIterator may or may not be affected by this change.
type KeyIterator struct {
response *http.Response
decoder *json.Decoder

last KeyDescription
err error
closed bool
last KeyDescription
nextErr error // error encountered in Next()
closeErr error // error encountered in Close()
closed bool
}

// KeyDescription describes a cryptographic key at a KES server.
Expand All @@ -242,14 +245,14 @@ type KeyDescription struct {
// or if the KeyIterator encountered an error. The error,
// if any, can be retrieved via the Err method.
func (i *KeyIterator) Next() bool {
if i.closed || i.err != nil {
if i.closed || i.nextErr != nil {
return false
}
if err := i.decoder.Decode(&i.last); err != nil {
if err == io.EOF {
i.err = i.Close()
i.nextErr = i.Close()
} else {
i.err = err
i.nextErr = err
}
return false
}
Expand All @@ -266,19 +269,21 @@ func (i *KeyIterator) Value() KeyDescription { return i.last }

// Err returns the first error encountered by the KeyIterator,
// if any.
func (i *KeyIterator) Err() error { return i.err }
func (i *KeyIterator) Err() error { return i.nextErr }

// Close closes the underlying connection to the KES server
// and returns any encountered error, if any.
// and returns any encountered error.
func (i *KeyIterator) Close() error {
i.closed = true
if err := i.response.Body.Close(); err != nil {
return err
}
if err := parseErrorTrailer(i.response.Trailer); err != nil {
return err
if !i.closed {
i.closed = true
if err := i.response.Body.Close(); err != nil {
i.closeErr = err
}
if err := parseErrorTrailer(i.response.Trailer); err != nil && i.closeErr == nil {
i.closeErr = err
}
}
return nil
return i.closeErr
}

// Version tries to fetch the version information from the
Expand Down Expand Up @@ -644,7 +649,7 @@ func (c *Client) AssignIdentity(ctx context.Context, policy string, id Identity)
return nil
}

func (c *Client) ListIdentities(ctx context.Context, pattern string) (map[Identity]string, error) {
func (c *Client) ListIdentities(ctx context.Context, pattern string) (*IdentityIterator, error) {
client := retry(c.HTTPClient)
resp, err := client.Send(ctx, http.MethodGet, c.Endpoints, path.Join("/v1/identity/list", url.PathEscape(pattern)), nil)
if err != nil {
Expand All @@ -653,13 +658,10 @@ func (c *Client) ListIdentities(ctx context.Context, pattern string) (map[Identi
if resp.StatusCode != http.StatusOK {
return nil, parseErrorResponse(resp)
}

const limit = 64 * 1024 * 1024 // There might be many identities
response := map[Identity]string{}
if err = json.NewDecoder(io.LimitReader(resp.Body, limit)).Decode(&response); err != nil {
return nil, err
}
return response, nil
return &IdentityIterator{
response: resp,
decoder: json.NewDecoder(resp.Body),
}, nil
}

func (c *Client) ForgetIdentity(ctx context.Context, id Identity) error {
Expand Down
46 changes: 28 additions & 18 deletions cmd/kes/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ package main

import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
stdlog "log"
"os"
"sort"
"strings"

"github.com/minio/kes"
)
Expand Down Expand Up @@ -132,35 +134,43 @@ func listIdentity(args []string) {
pattern = cli.Arg(0)
}

identityRoles, err := newClient(insecureSkipVerify).ListIdentities(cancelOnSignal(os.Interrupt, os.Kill), pattern)
identities, err := newClient(insecureSkipVerify).ListIdentities(cancelOnSignal(os.Interrupt, os.Kill), pattern)
if err != nil {
if errors.Is(err, context.Canceled) {
os.Exit(1) // When the operation is canceled, don't print an error message
}
stdlog.Fatalf("Error: failed to list identities matching %q: %v", pattern, err)
}
identities := make([]string, 0, len(identityRoles))
for id := range identityRoles {
identities = append(identities, id.String())
}
sort.Strings(identities)

if isTerm(os.Stdout) {
fmt.Println("{")
for _, id := range identities {
fmt.Printf(" %s => %s\n", id, identityRoles[kes.Identity(id)])
sortedIdentities := make([]kes.IdentityDescription, 0, 100)
for identities.Next() {
sortedIdentities = append(sortedIdentities, identities.Value())
}
if err = identities.Err(); err != nil {
stdlog.Fatalf("Error: failed to list identities matching %q: %v", pattern, err)
}
if err = identities.Close(); err != nil {
stdlog.Fatalf("Error: failed to list identities matching %q: %v", pattern, err)
}
sort.Slice(sortedIdentities, func(i, j int) bool {
return strings.Compare(sortedIdentities[i].Identity.String(), sortedIdentities[j].Identity.String()) < 0
})

for _, id := range sortedIdentities {
fmt.Printf("%s => %s\n", id.Identity, id.Policy)
}
fmt.Println("}")
} else {
fmt.Print("{")
for i, id := range identities {
if i < len(identities)-1 {
fmt.Printf(`"%s":"%s",`, id, identityRoles[kes.Identity(id)])
} else {
fmt.Printf(`"%s":"%s"`, id, identityRoles[kes.Identity(id)])
}
encoder := json.NewEncoder(os.Stdout)
for identities.Next() {
encoder.Encode(identities.Value())
}
if err = identities.Err(); err != nil {
stdlog.Fatalf("Error: failed to list identities matching %q: %v", pattern, err)
}
if err = identities.Close(); err != nil {
stdlog.Fatalf("Error: failed to list identities matching %q: %v", pattern, err)
}
fmt.Print("}")
}
}

Expand Down
4 changes: 4 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package kes

import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -119,6 +120,9 @@ func parseErrorResponse(resp *http.Response) error {
}

func parseErrorTrailer(trailer http.Header) error {
if _, ok := trailer["Status"]; !ok {
return errors.New("kes: unexpected EOF: no HTTP status trailer")
}
status, err := strconv.Atoi(trailer.Get("Status"))
if err != nil {
return fmt.Errorf("kes: invalid HTTP trailer - Status: %q", trailer.Get("Status"))
Expand Down
88 changes: 88 additions & 0 deletions identity.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package kes

import (
"encoding/json"
"io"
"net/http"
)

// IdentityUnknown is the identity returned
// by an IdentityFunc if it cannot map a
// particular X.509 certificate to an actual
Expand All @@ -19,3 +25,85 @@ func (id Identity) IsUnknown() bool { return id == IdentityUnknown }
// String returns the string representation of
// the identity.
func (id Identity) String() string { return string(id) }

// IdentityIterator iterates over list of IdentityDescription objects.
// for iterator.Next() {
// _ = iterator.Value() // Use the IdentityDescription
// }
// if err := iterator.Err(); err != nil {
// }
// if err := iterator.Close(); err != nil {
// }
//
// Once done with iterating over the list of IdentityDescription
// objects, an iterator should be closed using the Close
// method.
//
// In general, an IdentityIterator does not provide any guarantees
// about ordering or the when its underlying source is modified
// concurrently.
// Particularly, if an identity is created or deleted at the KES server
// the IdentityIterator may or may not be affected by this change.
type IdentityIterator struct {
response *http.Response
decoder *json.Decoder

last IdentityDescription
nextErr error // error encountered in Next()
closeErr error // error encountered in Close()
closed bool
}

// IdentityDescription describes an identity at a KES server.
type IdentityDescription struct {
Identity Identity `json:"identity"`
Policy string `json:"policy"`
}

// Next returns true if there is another IdentityDescription.
// This IdentityDescription can be retrieved via the Value method.
//
// It returns false once there are no more IdentityDescriptions
// or if the IdentityIterator encountered an error. The error,
// if any, can beretrieved via the Err method.
func (i *IdentityIterator) Next() bool {
if i.closed || i.nextErr != nil {
return false
}
if err := i.decoder.Decode(&i.last); err != nil {
if err == io.EOF {
i.nextErr = i.Close()
} else {
i.nextErr = err
}
return false
}
return true
}

// Value returns the current IdentityDescription. It returns
// the same IdentityDescription until Next is called again.
//
// If the IdentityIterator has been closed or if Next has not
// been called once resp. Next returns false then the behavior
// of Value is undefined.
func (i *IdentityIterator) Value() IdentityDescription { return i.last }

// Err returns the first error encountered by the IdentityIterator,
// if any.
func (i *IdentityIterator) Err() error { return i.nextErr }

// Close closes the underlying connection to the KES server and
// returns any encountered error.
func (i *IdentityIterator) Close() error {
if !i.closed {
i.closed = true
if err := i.response.Body.Close(); err != nil {
i.closeErr = err
}
if err := parseErrorTrailer(i.response.Trailer); err != nil && i.closeErr == nil {
i.closeErr = err
}
}
return i.closeErr
}
54 changes: 46 additions & 8 deletions internal/http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func HandleListKeys(store *secret.Store) http.HandlerFunc {
Name string
}
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Trailer", "Status, Error")
w.Header().Set("Trailer", "Status,Error")

iterator, err := store.List(r.Context())
if err != nil {
Expand Down Expand Up @@ -433,9 +433,12 @@ func HandleListKeys(store *secret.Store) http.HandlerFunc {
}
return
}

if !hasWritten {
w.WriteHeader(http.StatusOK)
}
w.Header().Set("Status", strconv.Itoa(http.StatusOK))
w.Header().Set("Error", "")
w.WriteHeader(http.StatusOK) // Ensure to write HTTP header/trailer even when no keys are listed
}
}

Expand Down Expand Up @@ -540,14 +543,49 @@ func HandleAssignIdentity(roles *auth.Roles) http.HandlerFunc {

func HandleListIdentities(roles *auth.Roles) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
pattern := pathBase(r.URL.Path)
identities := map[kes.Identity]string{}
for id, policy := range roles.Identities() {
if ok, err := path.Match(pattern, id.String()); ok && err == nil {
identities[id] = policy
type Response struct {
Identity kes.Identity `json:"identity"`
Policy string `json:"policy"`
}
w.Header().Set("Trailer", "Status,Error")

var (
pattern = pathBase(r.URL.Path)
encoder = json.NewEncoder(w)
hasWritten bool
)
w.Header().Set("Content-Type", "application/x-ndjson")
for identity, policy := range roles.Identities() {
if ok, err := path.Match(pattern, identity.String()); ok && err == nil {
hasWritten = true
err = encoder.Encode(Response{
Identity: identity,
Policy: policy,
})

// Once we encounter ErrHandlerTimeout the client connection
// has time'd out and we can stop sending responses.
if err == http.ErrHandlerTimeout {
break
}

// If there is another error we be conservative and try to
// inform the client that something went wrong. However,
// if we fail to write to the underlying connection there is
// not really something we can do except stop iterating and
// not waste server resources.
if err != nil {
ErrorTrailer(w, err)
return
}
}
}
json.NewEncoder(w).Encode(identities)

if !hasWritten {
w.WriteHeader(http.StatusOK)
}
w.Header().Set("Status", strconv.Itoa(http.StatusOK))
w.Header().Set("Error", "")
}
}

Expand Down

0 comments on commit 984966c

Please sign in to comment.