Skip to content

Commit

Permalink
feat: add real-time streaming functionality to APIs.
Browse files Browse the repository at this point in the history
  • Loading branch information
alexcodelf authored and thxCode committed Mar 31, 2023
1 parent d66e0ce commit 33518ed
Show file tree
Hide file tree
Showing 10 changed files with 757 additions and 36 deletions.
124 changes: 118 additions & 6 deletions pkg/apis/application/handler.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package application

import (
"context"

"entgo.io/ent/dialect/sql"
"github.com/gin-gonic/gin"
"k8s.io/client-go/rest"

"github.com/seal-io/seal/pkg/apis/application/view"
"github.com/seal-io/seal/pkg/apis/runtime"
"github.com/seal-io/seal/pkg/dao"
"github.com/seal-io/seal/pkg/dao/model"
"github.com/seal-io/seal/pkg/dao/model/application"
"github.com/seal-io/seal/pkg/dao/model/applicationinstance"
"github.com/seal-io/seal/pkg/dao/model/applicationmodulerelationship"
"github.com/seal-io/seal/pkg/dao/model/environment"
"github.com/seal-io/seal/pkg/dao/types"
"github.com/seal-io/seal/pkg/topic/datamessage"
"github.com/seal-io/seal/utils/topic"
)

func Handle(mc model.ClientSet, kc *rest.Config, tc bool) Handler {
Expand Down Expand Up @@ -71,8 +77,12 @@ func (h Handler) Update(ctx *gin.Context, req view.UpdateRequest) error {
}

func (h Handler) Get(ctx *gin.Context, req view.GetRequest) (view.GetResponse, error) {
return h.getApplicationOutput(ctx, req.ID)
}

func (h Handler) getApplicationOutput(ctx context.Context, id types.ID) (*model.ApplicationOutput, error) {
var entity, err = h.modelClient.Applications().Query().
Where(application.ID(req.ID)).
Where(application.ID(id)).
// must extract modules.
WithModules(func(rq *model.ApplicationModuleRelationshipQuery) {
rq.Order(model.Asc(applicationmodulerelationship.FieldCreateTime)).
Expand All @@ -93,6 +103,55 @@ func (h Handler) Get(ctx *gin.Context, req view.GetRequest) (view.GetResponse, e
return model.ExposeApplication(entity), nil
}

func (h Handler) Stream(ctx runtime.RequestStream, req view.StreamRequest) error {
var t, err = topic.Subscribe(datamessage.Application)
if err != nil {
return err
}

defer func() { t.Unsubscribe() }()
for {
var event topic.Event
event, err = t.Receive(ctx)
if err != nil {
return err
}
dm, ok := event.Data.(datamessage.Message)
if !ok {
continue
}
var streamData view.StreamResponse
for _, id := range dm.Data {
if id != req.ID {
continue
}

switch dm.Type {
case datamessage.EventCreate, datamessage.EventUpdate:
entity, err := h.getApplicationOutput(ctx, id)
if err != nil {
return err
}
streamData = view.StreamResponse{
Type: dm.Type,
IDs: dm.Data,
Collection: []*model.ApplicationOutput{entity},
}
case datamessage.EventDelete:
streamData = view.StreamResponse{
Type: dm.Type,
IDs: dm.Data,
}
}
}

err = ctx.SendJSON(streamData)
if err != nil {
return err
}
}
}

// Batch APIs

func (h Handler) CollectionDelete(ctx *gin.Context, req view.CollectionDeleteRequest) error {
Expand Down Expand Up @@ -143,9 +202,17 @@ func (h Handler) CollectionGet(ctx *gin.Context, req view.CollectionGetRequest)
if orders, ok := req.Sorting(sortFields, model.Desc(application.FieldCreateTime)); ok {
query.Order(orders...)
}
entities, err := h.getCollectionQuery(query).All(ctx)
if err != nil {
return nil, 0, err
}

return model.ExposeApplications(entities), cnt, nil
}

func (h Handler) getCollectionQuery(query *model.ApplicationQuery) *model.ApplicationQuery {
// get application with instances and environments
entities, err := query.
return query.
// allow returning without sorting keys.
Unique(false).
// must extract application instances.
Expand Down Expand Up @@ -178,13 +245,58 @@ func (h Handler) CollectionGet(ctx *gin.Context, req view.CollectionGetRequest)
WithEnvironment(func(eq *model.EnvironmentQuery) {
eq.Select(environment.FieldName)
})
}).
All(ctx)
})
}

func (h Handler) CollectionStream(ctx runtime.RequestStream, req view.CollectionStreamRequest) error {
var t, err = topic.Subscribe(datamessage.Application)
if err != nil {
return nil, 0, err
return err
}

return model.ExposeApplications(entities), cnt, nil
var query = h.modelClient.Applications().Query()
if fields, ok := req.Extracting(getFields, getFields...); ok {
query.Select(fields...)
}

defer func() { t.Unsubscribe() }()
for {
var event topic.Event
event, err = t.Receive(ctx)
if err != nil {
return err
}
dm, ok := event.Data.(datamessage.Message)
if !ok {
continue
}

var streamData view.StreamResponse
switch dm.Type {
case datamessage.EventCreate, datamessage.EventUpdate:
entities, err := h.getCollectionQuery(query.Clone()).
Where(application.IDIn(dm.Data...)).
All(ctx)
if err != nil {
return err
}
streamData = view.StreamResponse{
Type: dm.Type,
IDs: dm.Data,
Collection: model.ExposeApplications(entities),
}
case datamessage.EventDelete:
streamData = view.StreamResponse{
Type: dm.Type,
IDs: dm.Data,
}
}

err = ctx.SendJSON(streamData)
if err != nil {
return err
}
}
}

// Extensional APIs
32 changes: 32 additions & 0 deletions pkg/apis/application/view/io.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package view

import (
"context"
"errors"

"github.com/seal-io/seal/pkg/apis/runtime"
"github.com/seal-io/seal/pkg/dao/model"
"github.com/seal-io/seal/pkg/dao/model/application"
"github.com/seal-io/seal/pkg/dao/model/predicate"
"github.com/seal-io/seal/pkg/dao/types"
"github.com/seal-io/seal/pkg/topic/datamessage"
)

// Basic APIs
Expand Down Expand Up @@ -74,6 +77,31 @@ func (r *GetRequest) Validate() error {

type GetResponse = *model.ApplicationOutput

type StreamResponse struct {
Type datamessage.EventType `json:"type"`
IDs []types.ID `json:"ids"`
Collection []*model.ApplicationOutput `json:"collection"`
}

type StreamRequest struct {
ID types.ID `uri:"id"`
}

func (r *StreamRequest) ValidateWith(ctx context.Context, input any) error {
if !r.ID.Valid(0) {
return errors.New("invalid id: blank")
}
var client = input.(model.ClientSet)
exist, err := client.Applications().Query().
Where(application.ID(r.ID)).
Exist(ctx)
if err != nil || !exist {
return runtime.Errorw(err, "invalid id: not found")
}

return nil
}

// Batch APIs

type CollectionDeleteRequest []*model.ApplicationQueryInput
Expand Down Expand Up @@ -111,4 +139,8 @@ func (r *CollectionGetRequest) Validate() error {

type CollectionGetResponse = []*model.ApplicationOutput

type CollectionStreamRequest struct {
runtime.RequestExtracting `query:",inline"`
}

// Extensional APIs
119 changes: 118 additions & 1 deletion pkg/apis/applicationinstance/handler.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
package applicationinstance

import (
"context"

"github.com/gin-gonic/gin"
"k8s.io/client-go/rest"

"github.com/seal-io/seal/pkg/apis/applicationinstance/view"
"github.com/seal-io/seal/pkg/apis/runtime"
"github.com/seal-io/seal/pkg/dao"
"github.com/seal-io/seal/pkg/dao/model"
"github.com/seal-io/seal/pkg/dao/model/applicationinstance"
"github.com/seal-io/seal/pkg/dao/model/applicationresource"
"github.com/seal-io/seal/pkg/dao/model/environment"
"github.com/seal-io/seal/pkg/dao/types"
"github.com/seal-io/seal/pkg/dao/types/status"
"github.com/seal-io/seal/pkg/platform"
"github.com/seal-io/seal/pkg/platform/deployer"
"github.com/seal-io/seal/pkg/platformk8s/intercept"
"github.com/seal-io/seal/pkg/platformtf"
"github.com/seal-io/seal/pkg/topic/datamessage"
"github.com/seal-io/seal/utils/log"
"github.com/seal-io/seal/utils/topic"
)

func Handle(mc model.ClientSet, kc *rest.Config, tc bool) Handler {
Expand Down Expand Up @@ -126,8 +132,12 @@ func (h Handler) Delete(ctx *gin.Context, req view.DeleteRequest) error {
}

func (h Handler) Get(ctx *gin.Context, req view.GetRequest) (view.GetResponse, error) {
return h.getEntityOutput(ctx, req.ID)
}

func (h Handler) getEntityOutput(ctx context.Context, id types.ID) (*model.ApplicationInstanceOutput, error) {
var entity, err = h.modelClient.ApplicationInstances().Query().
Where(applicationinstance.ID(req.ID)).
Where(applicationinstance.ID(id)).
WithEnvironment(func(eq *model.EnvironmentQuery) {
eq.Select(environment.FieldName)
}).
Expand All @@ -139,6 +149,55 @@ func (h Handler) Get(ctx *gin.Context, req view.GetRequest) (view.GetResponse, e
return model.ExposeApplicationInstance(entity), nil
}

func (h Handler) Stream(ctx runtime.RequestStream, req view.StreamRequest) error {
var t, err = topic.Subscribe(datamessage.ApplicationInstance)
if err != nil {
return err
}

defer func() { t.Unsubscribe() }()
for {
var event topic.Event
event, err = t.Receive(ctx)
if err != nil {
return err
}
dm, ok := event.Data.(datamessage.Message)
if !ok {
continue
}

var streamData view.StreamResponse
for _, id := range dm.Data {
if id != req.ID {
continue
}

switch dm.Type {
case datamessage.EventCreate, datamessage.EventUpdate:
entityOutput, err := h.getEntityOutput(ctx, id)
if err != nil {
return err
}
streamData = view.StreamResponse{
Type: dm.Type,
Collection: []*model.ApplicationInstanceOutput{entityOutput},
}
case datamessage.EventDelete:
streamData = view.StreamResponse{
Type: dm.Type,
IDs: dm.Data,
}
}
}

err = ctx.SendJSON(streamData)
if err != nil {
return err
}
}
}

// Batch APIs

var (
Expand Down Expand Up @@ -192,6 +251,64 @@ func (h Handler) CollectionGet(ctx *gin.Context, req view.CollectionGetRequest)
return model.ExposeApplicationInstances(entities), cnt, nil
}

func (h Handler) CollectionStream(ctx runtime.RequestStream, req view.CollectionStreamRequest) error {
var t, err = topic.Subscribe(datamessage.ApplicationInstance)
if err != nil {
return err
}

var query = h.modelClient.ApplicationInstances().Query()
if fields, ok := req.Extracting(getFields, getFields...); ok {
query.Select(fields...)
}

defer func() { t.Unsubscribe() }()
for {
var event topic.Event
event, err = t.Receive(ctx)
if err != nil {
return err
}
dm, ok := event.Data.(datamessage.Message)
if !ok {
continue
}

var streamData view.StreamResponse
switch dm.Type {
case datamessage.EventCreate, datamessage.EventUpdate:
entities, err := query.Clone().
// allow returning without sorting keys.
Unique(false).
// must extract environment.
Select(applicationinstance.FieldEnvironmentID).
Where(applicationinstance.IDIn(dm.Data...)).
WithEnvironment(func(eq *model.EnvironmentQuery) {
eq.Select(environment.FieldName)
}).
All(ctx)

if err != nil {
return err
}
streamData = view.StreamResponse{
Type: dm.Type,
Collection: model.ExposeApplicationInstances(entities),
}
case datamessage.EventDelete:
streamData = view.StreamResponse{
Type: dm.Type,
IDs: dm.Data,
}
}

err = ctx.SendJSON(streamData)
if err != nil {
return err
}
}
}

// Extensional APIs

func (h Handler) RouteUpgrade(ctx *gin.Context, req view.RouteUpgradeRequest) error {
Expand Down

0 comments on commit 33518ed

Please sign in to comment.