Skip to content
This repository has been archived by the owner on Aug 24, 2023. It is now read-only.

Commit

Permalink
fix: probe requests block thread (#13)
Browse files Browse the repository at this point in the history
Because

- avoid probing request block subsequent request

This commit

- add nested threading in probing
  • Loading branch information
heiruwu committed Apr 13, 2023
1 parent f44bfbe commit 0294b2a
Show file tree
Hide file tree
Showing 5 changed files with 335 additions and 237 deletions.
51 changes: 36 additions & 15 deletions cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os/signal"
"regexp"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -207,32 +208,52 @@ func main() {

go func() {
logger.Info("[controller] control loop started")
var mainWG sync.WaitGroup
for {
logger.Info("[Controller] --------------Start probing------------")

mainWG.Add(5)

// Backend services
if err := service.ProbeBackend(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil {
logger.Error(err.Error())
}
go func() {
defer mainWG.Done()
if err := service.ProbeBackend(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil {
logger.Error(err.Error())
}
}()

// Models
if err := service.ProbeModels(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil {
logger.Error(err.Error())
}
go func() {
defer mainWG.Done()
if err := service.ProbeModels(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil {
logger.Error(err.Error())
}
}()

// Connectors
if err := service.ProbeSourceConnectors(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil {
logger.Error(err.Error())
}
if err := service.ProbeDestinationConnectors(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil {
logger.Error(err.Error())
}
go func() {
defer mainWG.Done()
if err := service.ProbeSourceConnectors(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil {
logger.Error(err.Error())
}
}()
go func() {
defer mainWG.Done()
if err := service.ProbeDestinationConnectors(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil {
logger.Error(err.Error())
}
}()

// Pipelines
if err := service.ProbePipelines(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil {
logger.Error(err.Error())
}
go func() {
defer mainWG.Done()
if err := service.ProbePipelines(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil {
logger.Error(err.Error())
}
}()

time.Sleep(config.Config.Server.LoopInterval * time.Second)
mainWG.Wait()
}
}()

Expand Down
177 changes: 108 additions & 69 deletions pkg/service/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strconv"
"sync"

"cloud.google.com/go/longrunning/autogen/longrunningpb"
"github.com/instill-ai/controller/internal/logger"
Expand All @@ -16,6 +17,10 @@ import (
func (s *service) ProbeSourceConnectors(ctx context.Context, cancel context.CancelFunc) error {
defer cancel()

logger, _ := logger.GetZapLogger()

var wg sync.WaitGroup

resp, err := s.connectorPublicClient.ListSourceConnectors(ctx, &connectorPB.ListSourceConnectorsRequest{})

if err != nil {
Expand All @@ -26,6 +31,8 @@ func (s *service) ProbeSourceConnectors(ctx context.Context, cancel context.Canc
nextPageToken := &resp.NextPageToken
totalSize := resp.TotalSize

wg.Add(int(totalSize))

for totalSize > util.DefaultPageSize {
resp, err := s.connectorPublicClient.ListSourceConnectors(ctx, &connectorPB.ListSourceConnectorsRequest{
PageToken: nextPageToken,
Expand All @@ -41,50 +48,67 @@ func (s *service) ProbeSourceConnectors(ctx context.Context, cancel context.Canc
}

for _, connector := range connectors {
resourceName := util.ConvertRequestToResourceName(connector.Name)

// if user desires disconnected
if connector.Connector.State == connectorPB.Connector_STATE_DISCONNECTED {
if err := s.UpdateResourceState(ctx, &controllerPB.Resource{
Name: resourceName,
State: &controllerPB.Resource_ConnectorState{
ConnectorState: connectorPB.Connector_STATE_DISCONNECTED,
},
}); err != nil {
return err
}
}
// if user desires connected
workflowId, _ := s.GetResourceWorkflowId(ctx, resourceName)
// check if there is an ongoing workflow
if workflowId != nil {
opInfo, err := s.getOperationInfo(*workflowId, util.RESOURCE_TYPE_SOURCE_CONNECTOR)
if err != nil {
return err
}
if err := s.updateRunningConnector(ctx, resourceName, *opInfo); err != nil {
return err
}
// if not trigger connector check workflow
} else {
resp, err := s.connectorPrivateClient.CheckSourceConnector(ctx, &connectorPB.CheckSourceConnectorRequest{
Name: connector.Name,
})
if err != nil {
return err

go func(connector *connectorPB.SourceConnector) {
defer wg.Done()

resourceName := util.ConvertRequestToResourceName(connector.Name)

// if user desires disconnected
if connector.Connector.State == connectorPB.Connector_STATE_DISCONNECTED {
if err := s.UpdateResourceState(ctx, &controllerPB.Resource{
Name: resourceName,
State: &controllerPB.Resource_ConnectorState{
ConnectorState: connectorPB.Connector_STATE_DISCONNECTED,
},
}); err != nil {
logger.Error(err.Error())
return
}
}
// non grpc/http connector, save workflowid
if err := s.updateStaleConnector(ctx, resourceName, resp.WorkflowId); err != nil {
return err
// if user desires connected
workflowId, _ := s.GetResourceWorkflowId(ctx, resourceName)
// check if there is an ongoing workflow
if workflowId != nil {
opInfo, err := s.getOperationInfo(*workflowId, util.RESOURCE_TYPE_SOURCE_CONNECTOR)
if err != nil {
logger.Error(err.Error())
return
}
if err := s.updateRunningConnector(ctx, resourceName, *opInfo); err != nil {
logger.Error(err.Error())
return
}
// if not trigger connector check workflow
} else {
resp, err := s.connectorPrivateClient.CheckSourceConnector(ctx, &connectorPB.CheckSourceConnectorRequest{
Name: connector.Name,
})
if err != nil {
logger.Error(err.Error())
return
}
// non grpc/http connector, save workflowid
if err := s.updateStaleConnector(ctx, resourceName, resp.WorkflowId); err != nil {
logger.Error(err.Error())
return
}
}
}
}(connector)
}

wg.Wait()

return nil
}

func (s *service) ProbeDestinationConnectors(ctx context.Context, cancel context.CancelFunc) error {
defer cancel()

logger, _ := logger.GetZapLogger()

var wg sync.WaitGroup

resp, err := s.connectorPublicClient.ListDestinationConnectors(ctx, &connectorPB.ListDestinationConnectorsRequest{})

if err != nil {
Expand All @@ -109,44 +133,59 @@ func (s *service) ProbeDestinationConnectors(ctx context.Context, cancel context
connectors = append(connectors, resp.DestinationConnectors...)
}

wg.Add(len(connectors))

for _, connector := range connectors {
resourceName := util.ConvertRequestToResourceName(connector.Name)

// if user desires disconnected
if connector.Connector.State == connectorPB.Connector_STATE_DISCONNECTED {
if err := s.UpdateResourceState(ctx, &controllerPB.Resource{
Name: resourceName,
State: &controllerPB.Resource_ConnectorState{
ConnectorState: connectorPB.Connector_STATE_DISCONNECTED,
},
}); err != nil {
return err
}
}
// if user desires connected
workflowId, _ := s.GetResourceWorkflowId(ctx, resourceName)
// check if there is an ongoing workflow
if workflowId != nil {
opInfo, err := s.getOperationInfo(*workflowId, util.RESOURCE_TYPE_DESTINATION_CONNECTOR)
if err != nil {
return err
}
if err := s.updateRunningConnector(ctx, resourceName, *opInfo); err != nil {
return err
}
// if not trigger connector check workflow
} else {
resp, err := s.connectorPrivateClient.CheckDestinationConnector(ctx, &connectorPB.CheckDestinationConnectorRequest{
Name: connector.Name,
})
if err != nil {
return err

go func(connector *connectorPB.DestinationConnector) {
defer wg.Done()

resourceName := util.ConvertRequestToResourceName(connector.Name)

// if user desires disconnected
if connector.Connector.State == connectorPB.Connector_STATE_DISCONNECTED {
if err := s.UpdateResourceState(ctx, &controllerPB.Resource{
Name: resourceName,
State: &controllerPB.Resource_ConnectorState{
ConnectorState: connectorPB.Connector_STATE_DISCONNECTED,
},
}); err != nil {
logger.Error(err.Error())
return
}
}
if err := s.updateStaleConnector(ctx, resourceName, resp.WorkflowId); err != nil {
return err
// if user desires connected
workflowId, _ := s.GetResourceWorkflowId(ctx, resourceName)
// check if there is an ongoing workflow
if workflowId != nil {
opInfo, err := s.getOperationInfo(*workflowId, util.RESOURCE_TYPE_DESTINATION_CONNECTOR)
if err != nil {
logger.Error(err.Error())
return
}
if err := s.updateRunningConnector(ctx, resourceName, *opInfo); err != nil {
logger.Error(err.Error())
return
}
// if not trigger connector check workflow
} else {
resp, err := s.connectorPrivateClient.CheckDestinationConnector(ctx, &connectorPB.CheckDestinationConnectorRequest{
Name: connector.Name,
})
if err != nil {
logger.Error(err.Error())
return
}
if err := s.updateStaleConnector(ctx, resourceName, resp.WorkflowId); err != nil {
logger.Error(err.Error())
return
}
}
}
}(connector)
}

wg.Wait()

return nil
}

Expand Down
48 changes: 30 additions & 18 deletions pkg/service/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package service
import (
"context"
"fmt"
"sync"

"github.com/instill-ai/controller/internal/logger"
"github.com/instill-ai/controller/internal/util"
Expand All @@ -16,6 +17,8 @@ func (s *service) ProbeModels(ctx context.Context, cancel context.CancelFunc) er

logger, _ := logger.GetZapLogger()

var wg sync.WaitGroup

resp, err := s.modelPublicClient.ListModels(ctx, &modelPB.ListModelsRequest{})

if err != nil {
Expand All @@ -40,28 +43,37 @@ func (s *service) ProbeModels(ctx context.Context, cancel context.CancelFunc) er
models = append(models, resp.Models...)
}

wg.Add(len(models))

for _, model := range models {
resp, err := s.modelPrivateClient.CheckModel(ctx, &modelPB.CheckModelRequest{
Name: model.Name,
})

if err != nil {
return err
}
go func(model *modelPB.Model) {
defer wg.Done()

if resp, err := s.modelPrivateClient.CheckModel(ctx, &modelPB.CheckModelRequest{
Name: model.Name,
}); err == nil {
if err = s.UpdateResourceState(ctx, &controllerPB.Resource{
Name: util.ConvertRequestToResourceName(model.Name),
State: &controllerPB.Resource_ModelState{
ModelState: resp.State,
},
}); err != nil {
logger.Error(err.Error())
return
}
} else {
logger.Error(err.Error())
return
}

logResp, _ := s.GetResourceState(ctx, util.ConvertRequestToResourceName(model.Name))
logger.Info(fmt.Sprintf("[Controller] Got %v", logResp))
}(model)

err = s.UpdateResourceState(ctx, &controllerPB.Resource{
Name: util.ConvertRequestToResourceName(model.Name),
State: &controllerPB.Resource_ModelState{
ModelState: resp.State,
},
})
}

if err != nil {
return err
}
wg.Wait()

logResp, _ := s.GetResourceState(ctx, util.ConvertRequestToResourceName(model.Name))
logger.Info(fmt.Sprintf("[Controller] Got %v", logResp))
}
return nil
}

0 comments on commit 0294b2a

Please sign in to comment.