diff --git a/api/config.yaml b/api/config.yaml index 16a1ed5615..7061effe1a 100644 --- a/api/config.yaml +++ b/api/config.yaml @@ -133,6 +133,9 @@ components: grpcEndpoint: type: string description: "gRPC endpoint for Pokeshop API" + kafkaBroker: + type: string + description: "kafka broker for Pokeshop API" DemoOpenTelemetryStore: type: object description: "Represents the settings of the Open Telemetry Store demonstration." diff --git a/cli/installer/templates/provision.yaml.tpl b/cli/installer/templates/provision.yaml.tpl index 81dcb61fc7..8d6a7a3386 100644 --- a/cli/installer/templates/provision.yaml.tpl +++ b/cli/installer/templates/provision.yaml.tpl @@ -50,7 +50,8 @@ spec: enabled: true pokeshop: httpEndpoint: {{ .pokeshopHttp }} - grpcEndpoint: {{ .pokeshopGrpc }}{{end}}{{ if eq .enableOtelDemo "true" }} + grpcEndpoint: {{ .pokeshopGrpc }} + kafkaBroker: {{ .pokeshopKafka }}{{end}}{{ if eq .enableOtelDemo "true" }} --- type: Demo spec: diff --git a/cli/installer/tracetest.go b/cli/installer/tracetest.go index f76ef3807f..93f4b0b763 100644 --- a/cli/installer/tracetest.go +++ b/cli/installer/tracetest.go @@ -18,6 +18,7 @@ func configureDemoApp(conf configuration, ui cliUI.UI) configuration { case "docker-compose": conf.set("demo.endpoint.pokeshop.http", "http://demo-api:8081") conf.set("demo.endpoint.pokeshop.grpc", "demo-rpc:8082") + conf.set("demo.endpoint.pokeshop.kafka", "stream:9092") conf.set("demo.endpoint.otel.frontend", "http://otel-frontend:8084") conf.set("demo.endpoint.otel.product_catalog", "otel-productcatalogservice:3550") conf.set("demo.endpoint.otel.cart", "otel-cartservice:7070") @@ -25,6 +26,7 @@ func configureDemoApp(conf configuration, ui cliUI.UI) configuration { case "kubernetes": conf.set("demo.endpoint.pokeshop.http", "http://demo-pokemon-api.demo") conf.set("demo.endpoint.pokeshop.grpc", "demo-pokemon-api.demo:8082") + conf.set("demo.endpoint.pokeshop.kafka", "stream.demo:9092") conf.set("demo.endpoint.otel.frontend", "http://otel-frontend.otel-demo:8084") conf.set("demo.endpoint.otel.product_catalog", "otel-productcatalogservice.otel-demo:3550") conf.set("demo.endpoint.otel.cart", "otel-cartservice.otel-demo:7070") @@ -111,6 +113,7 @@ func getTracetestProvisionFileContents(ui cliUI.UI, config configuration) []byte "enableOtelDemo": fmt.Sprintf("%t", config.Bool("demo.enable.otel")), "pokeshopHttp": config.String("demo.endpoint.pokeshop.http"), "pokeshopGrpc": config.String("demo.endpoint.pokeshop.grpc"), + "pokeshopKafka": config.String("demo.endpoint.pokeshop.kafka"), "otelFrontend": config.String("demo.endpoint.otel.frontend"), "otelProductCatalog": config.String("demo.endpoint.otel.product_catalog"), "otelCart": config.String("demo.endpoint.otel.cart"), diff --git a/cli/openapi/api_resource_api.go b/cli/openapi/api_resource_api.go index 6c8c669206..cb89fa02d6 100644 --- a/cli/openapi/api_resource_api.go +++ b/cli/openapi/api_resource_api.go @@ -471,7 +471,6 @@ func (r ApiCreateVariableSetRequest) Execute() (*VariableSetResource, *http.Resp /* CreateVariableSet Create a VariableSet -CreateVariableSet Create a VariableSet Create a VariableSet that can be used by tests and test suites @@ -1034,9 +1033,9 @@ func (r ApiDeleteVariableSetRequest) Execute() (*http.Response, error) { } /* -DeleteVariableSet Delete an variable set +DeleteVariableSet Delete a variable set -Delete an variable set from Tracetest +Delete a variable set from Tracetest @param ctx context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background(). @param variableSetId ID of a VariableSet used on Tracetest to inject values into tests and TestSuites diff --git a/cli/openapi/model_demo_pokeshop.go b/cli/openapi/model_demo_pokeshop.go index 53d1901454..db6bb3fe34 100644 --- a/cli/openapi/model_demo_pokeshop.go +++ b/cli/openapi/model_demo_pokeshop.go @@ -23,6 +23,8 @@ type DemoPokeshop struct { HttpEndpoint *string `json:"httpEndpoint,omitempty"` // gRPC endpoint for Pokeshop API GrpcEndpoint *string `json:"grpcEndpoint,omitempty"` + // kafka broker for Pokeshop API + KafkaBroker *string `json:"kafkaBroker,omitempty"` } // NewDemoPokeshop instantiates a new DemoPokeshop object @@ -106,6 +108,38 @@ func (o *DemoPokeshop) SetGrpcEndpoint(v string) { o.GrpcEndpoint = &v } +// GetKafkaBroker returns the KafkaBroker field value if set, zero value otherwise. +func (o *DemoPokeshop) GetKafkaBroker() string { + if o == nil || isNil(o.KafkaBroker) { + var ret string + return ret + } + return *o.KafkaBroker +} + +// GetKafkaBrokerOk returns a tuple with the KafkaBroker field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *DemoPokeshop) GetKafkaBrokerOk() (*string, bool) { + if o == nil || isNil(o.KafkaBroker) { + return nil, false + } + return o.KafkaBroker, true +} + +// HasKafkaBroker returns a boolean if a field has been set. +func (o *DemoPokeshop) HasKafkaBroker() bool { + if o != nil && !isNil(o.KafkaBroker) { + return true + } + + return false +} + +// SetKafkaBroker gets a reference to the given string and assigns it to the KafkaBroker field. +func (o *DemoPokeshop) SetKafkaBroker(v string) { + o.KafkaBroker = &v +} + func (o DemoPokeshop) MarshalJSON() ([]byte, error) { toSerialize, err := o.ToMap() if err != nil { @@ -122,6 +156,9 @@ func (o DemoPokeshop) ToMap() (map[string]interface{}, error) { if !isNil(o.GrpcEndpoint) { toSerialize["grpcEndpoint"] = o.GrpcEndpoint } + if !isNil(o.KafkaBroker) { + toSerialize["kafkaBroker"] = o.KafkaBroker + } return toSerialize, nil } diff --git a/examples/docker-compose.demo.yaml b/examples/docker-compose.demo.yaml index f78bbec55d..211ffe7464 100644 --- a/examples/docker-compose.demo.yaml +++ b/examples/docker-compose.demo.yaml @@ -18,6 +18,34 @@ services: timeout: 5s retries: 60 + stream: + image: confluentinc/cp-kafka:latest-ubi8 + ports: + - 29092:29092 + environment: + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://stream:9092,PLAINTEXT_HOST://127.0.0.1:29092 + - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,PLAINTEXT_HOST://:29092 + - KAFKA_CONTROLLER_QUORUM_VOTERS=1@0.0.0.0:9093 + - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER + - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 + - KAFKA_PROCESS_ROLES=controller,broker + - KAFKA_NODE_ID=1 + - KAFKA_METADATA_LOG_SEGMENT_MS=15000 + - KAFKA_METADATA_MAX_RETENTION_MS=60000 + - KAFKA_METADATA_LOG_MAX_RECORD_BYTES_BETWEEN_SNAPSHOTS=2800 + - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 + - KAFKA_HEAP_OPTS=-Xmx200m -Xms200m + - CLUSTER_ID=ckjPoprWQzOf0-FuNkGfFQ + healthcheck: + test: nc -z stream 9092 + start_period: 10s + interval: 5s + timeout: 10s + retries: 10 + demo-api: image: kubeshop/demo-pokemon-api:latest restart: unless-stopped @@ -88,3 +116,25 @@ services: condition: service_healthy queue: condition: service_healthy + + demo-streaming-worker: + image: kubeshop/demo-pokemon-api:latest + environment: + DATABASE_URL: postgresql://postgres:postgres@postgres:5432/postgres?schema=public + POKE_API_BASE_URL: https://pokeapi.co/api/v2 + COLLECTOR_ENDPOINT: http://otel-collector:4317 + ZIPKIN_URL: http://localhost:9411 + NPM_RUN_COMMAND: stream-worker + KAFKA_BROKER: 'stream:9092' + KAFKA_TOPIC: 'pokemon' + KAFKA_CLIENT_ID: 'streaming-worker' + REDIS_URL: cache + depends_on: + postgres: + condition: service_healthy + stream: + condition: service_healthy + cache: + condition: service_healthy + otel-collector: + condition: service_started diff --git a/server/config/demo/demo_entities.go b/server/config/demo/demo_entities.go index ddfd7ba463..b578711d41 100644 --- a/server/config/demo/demo_entities.go +++ b/server/config/demo/demo_entities.go @@ -40,6 +40,7 @@ func (d Demo) Validate() error { type PokeshopDemo struct { HTTPEndpoint string `json:"httpEndpoint,omitempty"` GRPCEndpoint string `json:"grpcEndpoint,omitempty"` + KafkaBroker string `json:"kafkaBroker,omitempty"` } type OpenTelemetryStoreDemo struct { diff --git a/server/config/demo/demo_repository_test.go b/server/config/demo/demo_repository_test.go index ebfa5d634a..ea87cc574d 100644 --- a/server/config/demo/demo_repository_test.go +++ b/server/config/demo/demo_repository_test.go @@ -21,6 +21,7 @@ func TestPokeshopDemoResource(t *testing.T) { Pokeshop: &demo.PokeshopDemo{ HTTPEndpoint: "http://dev-endpoint:1234", GRPCEndpoint: "dev-grpc:9091", + KafkaBroker: "dev-kafka:9092", }, } @@ -32,6 +33,7 @@ func TestPokeshopDemoResource(t *testing.T) { Pokeshop: &demo.PokeshopDemo{ HTTPEndpoint: "http://stg-endpoint:1234", GRPCEndpoint: "stg-grpc:9091", + KafkaBroker: "stg-kafka:9092", }, } @@ -43,6 +45,7 @@ func TestPokeshopDemoResource(t *testing.T) { Pokeshop: &demo.PokeshopDemo{ HTTPEndpoint: "http://prod-endpoint:1234", GRPCEndpoint: "prod-grpc:9091", + KafkaBroker: "prod-kafka:9092", }, } @@ -85,7 +88,8 @@ func TestPokeshopDemoResource(t *testing.T) { "type": "pokeshop", "pokeshop": { "httpEndpoint": "http://dev-endpoint:1234", - "grpcEndpoint": "dev-grpc:9091" + "grpcEndpoint": "dev-grpc:9091", + "kafkaBroker": "dev-kafka:9092" } } }`, @@ -98,7 +102,8 @@ func TestPokeshopDemoResource(t *testing.T) { "type": "pokeshop", "pokeshop": { "httpEndpoint": "http://new-dev-endpoint:1234", - "grpcEndpoint": "new-dev-grpc:9091" + "grpcEndpoint": "new-dev-grpc:9091", + "kafkaBroker": "new-dev-kafka:9092" } } }`, diff --git a/server/go.mod b/server/go.mod index b0d6c7ae2b..fc53ca2f34 100644 --- a/server/go.mod +++ b/server/go.mod @@ -97,12 +97,12 @@ require ( github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/puddle/v2 v2.2.0 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/jackc/puddle/v2 v2.2.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress v1.16.6 // indirect github.com/knadh/koanf v1.4.0 // indirect diff --git a/server/openapi/api_resource_api.go b/server/openapi/api_resource_api.go index 181f2e7641..09e5c0dc34 100644 --- a/server/openapi/api_resource_api.go +++ b/server/openapi/api_resource_api.go @@ -463,7 +463,7 @@ func (c *ResourceApiApiController) DeleteTestSuite(w http.ResponseWriter, r *htt } -// DeleteVariableSet - Delete an variable set +// DeleteVariableSet - Delete a variable set func (c *ResourceApiApiController) DeleteVariableSet(w http.ResponseWriter, r *http.Request) { params := mux.Vars(r) variableSetIdParam := params["variableSetId"] diff --git a/server/openapi/api_resource_api_service.go b/server/openapi/api_resource_api_service.go index b0764d8152..3e02d1e332 100644 --- a/server/openapi/api_resource_api_service.go +++ b/server/openapi/api_resource_api_service.go @@ -178,7 +178,7 @@ func (s *ResourceApiApiService) DeleteTestSuite(ctx context.Context, testSuiteId return Response(http.StatusNotImplemented, nil), errors.New("DeleteTestSuite method not implemented") } -// DeleteVariableSet - Delete an variable set +// DeleteVariableSet - Delete a variable set func (s *ResourceApiApiService) DeleteVariableSet(ctx context.Context, variableSetId string) (ImplResponse, error) { // TODO - update DeleteVariableSet with the required logic for this service method. // Add api_resource_api_service.go to the .openapi-generator-ignore to avoid overwriting this service implementation when updating open api generation. diff --git a/server/openapi/model_demo_pokeshop.go b/server/openapi/model_demo_pokeshop.go index 4f080bc793..40ec16d621 100644 --- a/server/openapi/model_demo_pokeshop.go +++ b/server/openapi/model_demo_pokeshop.go @@ -17,6 +17,9 @@ type DemoPokeshop struct { // gRPC endpoint for Pokeshop API GrpcEndpoint string `json:"grpcEndpoint,omitempty"` + + // kafka broker for Pokeshop API + KafkaBroker string `json:"kafkaBroker,omitempty"` } // AssertDemoPokeshopRequired checks if the required fields are not zero-ed diff --git a/web/src/components/Settings/Demo/PokeshopFields.tsx b/web/src/components/Settings/Demo/PokeshopFields.tsx index d58098f805..cdb286b8da 100644 --- a/web/src/components/Settings/Demo/PokeshopFields.tsx +++ b/web/src/components/Settings/Demo/PokeshopFields.tsx @@ -17,6 +17,12 @@ const PokeshopFields = () => { + + + + + + ); }; diff --git a/web/src/constants/Demo.constants.ts b/web/src/constants/Demo.constants.ts index bebd78897d..f322b357c3 100644 --- a/web/src/constants/Demo.constants.ts +++ b/web/src/constants/Demo.constants.ts @@ -14,7 +14,11 @@ const userId = '2491f868-88f1-4345-8836-d5d8511a9f83'; export function getPokeshopDemo(demoSettings: Demo) { const { - pokeshop: {httpEndpoint: pokeshopHttp = '', grpcEndpoint: pokeshopGrpc = ''}, + pokeshop: { + httpEndpoint: pokeshopHttp = '', + grpcEndpoint: pokeshopGrpc = '', + kafkaBroker: pokeshopKafka = '', + }, } = demoSettings; return { @@ -114,6 +118,17 @@ export function getPokeshopDemo(demoSettings: Demo) { command: `curl -XPOST -H "Content-type: application/json" --data '{"id":52}' '${pokeshopHttp}/pokemon/import'`, }, ], + [SupportedPlugins.Kafka]: [ + { + name: 'Pokeshop - Import from Stream', + brokerUrls: [ `${pokeshopKafka}` ], + topic: 'pokemon', + headers: [], + messageKey: 'snorlax-key', + messageValue: '{"id":143}', + description: 'Import a Pokemon via Stream', + } + ] }; } @@ -267,7 +282,7 @@ export function getDemoByPluginMap(demos: Demo[]) { [SupportedPlugins.Postman]: (pokeshopDemoMap && pokeshopDemoMap[SupportedPlugins.Postman]) || [], [SupportedPlugins.CURL]: (pokeshopDemoMap && pokeshopDemoMap[SupportedPlugins.CURL]) || [], [SupportedPlugins.TraceID]: [], - [SupportedPlugins.Kafka]: [], + [SupportedPlugins.Kafka]: (pokeshopDemoMap && pokeshopDemoMap[SupportedPlugins.Kafka]) || [], [SupportedPlugins.OpenAPI]: [], }; } diff --git a/web/src/models/Demo.model.ts b/web/src/models/Demo.model.ts index a1f96ae8d6..f10a1f4a97 100644 --- a/web/src/models/Demo.model.ts +++ b/web/src/models/Demo.model.ts @@ -9,7 +9,7 @@ const Demo = ({ name = '', type = 'pokeshop', enabled, - pokeshop: {httpEndpoint = '', grpcEndpoint = ''} = {}, + pokeshop: {httpEndpoint = '', grpcEndpoint = '', kafkaBroker = ''} = {}, opentelemetryStore: { frontendEndpoint = '', productCatalogEndpoint = '', @@ -26,6 +26,7 @@ const Demo = ({ pokeshop: { httpEndpoint, grpcEndpoint, + kafkaBroker, }, opentelemetryStore: { frontendEndpoint, diff --git a/web/src/types/Generated.types.ts b/web/src/types/Generated.types.ts index 1cd53fce34..5b59a25b3a 100644 --- a/web/src/types/Generated.types.ts +++ b/web/src/types/Generated.types.ts @@ -18,17 +18,17 @@ export interface paths { /** delete a TestSuite */ delete: operations["deleteTestSuite"]; }; - "/testsuites/{testsuiteId}/version/{version}": { + "/testsuites/{testSuiteId}/version/{version}": { /** get a TestSuite specific version */ get: operations["getTestSuiteVersion"]; }; - "/testsuites/{testsuiteId}/run": { + "/testsuites/{testSuiteId}/run": { /** Get all runs from a particular TestSuite */ get: operations["getTestSuiteRuns"]; /** run a particular TestSuite */ post: operations["runTestSuite"]; }; - "/testsuites/{testsuiteId}/run/{runId}": { + "/testsuites/{testSuiteId}/run/{runId}": { /** Get a specific run from a particular TestSuite */ get: operations["getTestSuiteRun"]; /** Delete a specific run from a particular TestSuite */ @@ -181,7 +181,7 @@ export interface paths { get: operations["getVariableSet"]; /** Update a VariableSet used on Tracetest */ put: operations["updateVariableSet"]; - /** Delete an variable set from Tracetest */ + /** Delete a variable set from Tracetest */ delete: operations["deleteVariableSet"]; }; "/version": { @@ -1006,7 +1006,7 @@ export interface operations { }; }; }; - /** Delete an variable set from Tracetest */ + /** Delete a variable set from Tracetest */ deleteVariableSet: { parameters: {}; responses: { @@ -1216,6 +1216,8 @@ export interface external { httpEndpoint?: string; /** @description gRPC endpoint for Pokeshop API */ grpcEndpoint?: string; + /** @description kafka broker for Pokeshop API */ + kafkaBroker?: string; }; /** @description Represents the settings of the Open Telemetry Store demonstration. */ DemoOpenTelemetryStore: { @@ -1611,7 +1613,7 @@ export interface external { /** @description version of the test */ version: number; /** @description id of the TestSuite */ - testsuiteId: string; + testSuiteId: string; /** @description indicates how many resources can be returned by each page */ take: number; /** @description indicates how many resources will be skipped when paginating */