Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): add yaml stream encoding for resource manager #2817

Merged
merged 5 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 25 additions & 6 deletions .github/workflows/pull-request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -337,15 +337,31 @@ jobs:
cd ./testing/cli-e2etest
make test

config-e2e:
runs-on: ubuntu-latest
outputs:
CYPRESS_CONTAINERS: ${{ steps.configure-cypress-containers.outputs.CYPRESS_CONTAINERS }}
steps:
- name: Configure Cypress containers
id: configure-cypress-containers
run: |
# env.CYPRESS_RECORD_KEY is required for parallelization, so if it's empty run a single container
if [ "${{env.CYPRESS_RECORD_KEY}}" = "" ]; then
echo "CYPRESS_CONTAINERS=[1]" >> $GITHUB_OUTPUT
else
echo "CYPRESS_CONTAINERS=[1,2,3,4,5,6,7,8]" >> $GITHUB_OUTPUT
fi

e2e:
needs: [build-docker]
needs: [build-docker, config-e2e]
name: WebUI End-to-end tests
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
# run copies of the current job in parallel
containers: [1, 2, 3, 4, 5, 6, 7, 8]

containers: ${{fromJson(needs.config-e2e.outputs.CYPRESS_CONTAINERS)}}

steps:
- name: Checkout
Expand Down Expand Up @@ -381,12 +397,15 @@ jobs:
run: |
docker load --input dist/image.tar

- name: Start services
- name: Run integration tests
run: |
./run.sh down up
./run.sh logstt > /tmp/docker-log &
./scripts/wait-for-port.sh 11633

- name: Run integration tests
run: |
./run.sh cypress-ci || (cat /tmp/docker-log; exit 1)
if [ "${{env.CYPRESS_RECORD_KEY}}" = "" ]; then
# if this is not container #1, the script won't get here, so don't need for additional checks
./run.sh cypress
else
./run.sh cypress-ci
fi
10 changes: 9 additions & 1 deletion run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export TAG=${TAG:-dev}
opts="-f docker-compose.yaml -f examples/docker-compose.demo.yaml"

help_message() {
echo "usage: ./run.sh [cypress|tracetests|up|build|down|tracetest-logs|logs|ps|restart]"
echo "usage: ./run.sh [cypress|tracetests|up|stop|build|down|tracetest-logs|logs|ps|restart]"
}

restart() {
Expand Down Expand Up @@ -43,6 +43,10 @@ up() {
docker compose $opts up -d --remove-orphans
}

stop() {
docker compose $opts stop
}

cypress-ci() {

echo "Running cypress"
Expand Down Expand Up @@ -103,6 +107,10 @@ while [[ $# -gt 0 ]]; do
CMD+=("up")
shift
;;
stop)
CMD+=("stop")
shift
;;
build)
CMD+=("build")
shift
Expand Down
109 changes: 109 additions & 0 deletions server/resourcemanager/encoding.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package resourcemanager

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"reflect"

"github.com/goccy/go-yaml"
)
Expand All @@ -19,6 +22,7 @@ type encoder interface {
var encoders = []encoder{
jsonEncoder,
yamlEncoder,
yamlStreamEncoder{},
}

var defaultEncoder = jsonEncoder
Expand All @@ -35,6 +39,111 @@ var yamlEncoder = basicEncoder{
unmarshalFn: yaml.Unmarshal,
}

type yamlStreamEncoder struct{}

func (e yamlStreamEncoder) ContentType() string {
return "text/yaml-stream"
}

func (e yamlStreamEncoder) Accepts(contentType string) bool {
return contentType == e.ContentType()
}

func (e yamlStreamEncoder) Marshal(in interface{}) (out []byte, err error) {
targetField, err := getYamlStreamField(in, "items", reflect.Slice)
if err != nil {
return nil, err
}

// iterate over each element in the slice
for i := 0; i < targetField.Len(); i++ {
// get the element
elem := targetField.Index(i)

// marshal the element
elemBytes, err := yaml.Marshal(elem.Interface())
if err != nil {
return nil, fmt.Errorf("cannot marshal yaml: %w", err)
}

// append the document separator
out = append(out, []byte("---\n")...)

// append the marshaled element
out = append(out, elemBytes...)
}

return out, nil
}

func (e yamlStreamEncoder) Unmarshal(in []byte, out interface{}) (err error) {
targetField, err := getYamlStreamField(out, "items", reflect.Slice)
if err != nil {
return err
}

decoder := yaml.NewDecoder(bytes.NewReader(in), yaml.Strict())
// iterate over each document in the yaml stream
for {
// we need to create a new instance of the slice element for each document
// 1. get the type of the slice element
elemType := targetField.Type().Elem()
// 2. create a new instance of the slice element
elem := reflect.New(elemType).Elem()

// decode the yaml into the slice element. it needs to be a pointer to an interface{}
err := decoder.Decode(elem.Addr().Interface())
if errors.Is(err, io.EOF) {
// no more documents in the stream
break
}

if err != nil {
// the current document is invalid. return the error
return fmt.Errorf("cannot unmarshal yaml: %w", err)
}

// append the slice element to the target slice
targetField.Set(reflect.Append(targetField, elem))
}

// if there's an error, ignore the count.
countField, _ := getYamlStreamField(out, "count", reflect.Int)
if countField.IsValid() {
countField.SetInt(int64(targetField.Len()))
}

return nil
}

// getYamlStreamField returns the field in `target` with the name as the value of its yamlstream tag.
// it returns an error if the field is not found or if the field is not of the specified kind.
func getYamlStreamField(target interface{}, name string, kind reflect.Kind) (reflect.Value, error) {
v := reflect.ValueOf(target)
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
t := v.Type()
var yamlStreamField reflect.Value
for i := 0; i < t.NumField(); i++ {
field := t.Field(i)
if field.Tag.Get("yamlstream") == name {
yamlStreamField = v.Field(i)
break
}
}

if !yamlStreamField.IsValid() {
return reflect.Value{}, fmt.Errorf("no field defined as yamlstream %s found", name)
}

if yamlStreamField.Kind() != kind {
return reflect.Value{}, fmt.Errorf("field defined as yamlstream %s is not of kind %s", name, kind)
}

return yamlStreamField, nil
}

type basicEncoder struct {
contentType string
marshalFn func(interface{}) ([]byte, error)
Expand Down
86 changes: 86 additions & 0 deletions server/resourcemanager/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,89 @@ name: example
})

}

func TestYamlStreamEncoding(t *testing.T) {
t.Run("Success", func(t *testing.T) {
t.Parallel()

list := resourcemanager.ResourceList[sampleResource]{
Count: 2,
Items: []resourcemanager.Resource[sampleResource]{
{
Type: "sample",
Spec: sampleResource{
ID: "1",
Name: "the name",
SomeValue: "the value",
},
},
{
Type: "sample",
Spec: sampleResource{
ID: "2",
Name: "other name",
SomeValue: "other value",
},
},
},
}

yamlEncoded := `---
type: sample
spec:
id: "1"
name: the name
some_value: the value
---
type: sample
spec:
id: "2"
name: other name
some_value: other value
`

req, _ := http.NewRequest(http.MethodGet, "/", strings.NewReader(yamlEncoded))
req.Header.Set("Accept", "text/yaml-stream")

enc := resourcemanager.EncoderFromRequest(req)

// assert that yaml is correctly marshaled into ResourceList
actualRequestDecoded := resourcemanager.ResourceList[sampleResource]{}
err := enc.DecodeRequestBody(&actualRequestDecoded)
require.NoError(t, err)

assert.Equal(t, list, actualRequestDecoded)

// assert that ResourceList is correctly unmarshaled into yaml multidoc
rec := httptest.NewRecorder()
err = enc.WriteEncodedResponse(rec, 200, list)
require.NoError(t, err)
resp := rec.Result()
response, err := io.ReadAll(resp.Body)
require.NoError(t, err)

assert.Equal(t, yamlEncoded, string(response))
})

t.Run("Unsupported entity", func(t *testing.T) {
t.Parallel()

yamlEncoded := `type: sample
spec:
id: "2"
name: other name
some_value: other value
`

req, _ := http.NewRequest(http.MethodGet, "/", strings.NewReader(yamlEncoded))
req.Header.Set("Accept", "text/yaml-stream")

enc := resourcemanager.EncoderFromRequest(req)

// assert that yaml is correctly marshaled into ResourceList
actualRequestDecoded := sampleResource{}
err := enc.DecodeRequestBody(&actualRequestDecoded)
require.Error(t, err)
})

}
7 changes: 4 additions & 3 deletions server/resourcemanager/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type ResourceSpec interface {
}

type ResourceList[T ResourceSpec] struct {
Count int `json:"count"`
Items []any `json:"items"`
Count int `json:"count" yamlstream:"count"`
Items []Resource[T] `json:"items" yamlstream:"items"`
}

type Resource[T ResourceSpec] struct {
Expand Down Expand Up @@ -418,7 +418,7 @@ func (m *manager[T]) list(w http.ResponseWriter, r *http.Request) {
// of records inside "item"
resourceList := ResourceList[T]{
Count: count,
Items: []any{},
Items: []Resource[T]{},
}

for _, item := range items {
Expand All @@ -431,6 +431,7 @@ func (m *manager[T]) list(w http.ResponseWriter, r *http.Request) {
}

err = encoder.WriteEncodedResponse(w, http.StatusOK, resourceList)

if err != nil {
writeError(w, encoder, http.StatusInternalServerError, fmt.Errorf("cannot marshal entity: %w", err))
}
Expand Down