Skip to content

Commit

Permalink
feat(controller): add model state monitoring with controller (#323)
Browse files Browse the repository at this point in the history
Because

- support model state monitoring with controller

This commit

- add /watch endpoint for querying model state from etcd through
controller
- add private /check endpoint for controller to query model state from
triton
- use /watch in integration-test to reflect model current state
- model state in postgresql now only has two states, STATE_ONLINE and
STATE_OFFLINE

---------

Co-authored-by: Phelan164 <phelan.nguyen@instill.tech>
Co-authored-by: Ping-Lin Chang <ping-lin.chang@instill.tech>
  • Loading branch information
3 people committed Apr 7, 2023
1 parent ad30c7a commit 4397826
Show file tree
Hide file tree
Showing 37 changed files with 974 additions and 303 deletions.
5 changes: 4 additions & 1 deletion cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ func main() {
redisClient := redis.NewClient(&config.Config.Cache.Redis.RedisOptions)
defer redisClient.Close()

controllerClient, controllerClientConn := external.InitControllerPrivateServiceClient()
defer controllerClientConn.Close()

temporalClient, err := client.Dial(client.Options{
// ZapAdapter implements log.Logger interface and can be passed
// to the client constructor using client using client.Options.
Expand All @@ -150,7 +153,7 @@ func main() {

repository := repository.NewRepository(db)

service := service.NewService(repository, triton, pipelinePublicServiceClient, redisClient, temporalClient)
service := service.NewService(repository, triton, pipelinePublicServiceClient, redisClient, temporalClient, controllerClient)

modelPB.RegisterModelPublicServiceServer(
publicGrpcS,
Expand Down
6 changes: 5 additions & 1 deletion cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.temporal.io/sdk/worker"

"github.com/instill-ai/model-backend/config"
"github.com/instill-ai/model-backend/pkg/external"
"github.com/instill-ai/model-backend/pkg/logger"
"github.com/instill-ai/model-backend/pkg/repository"
"github.com/instill-ai/model-backend/pkg/triton"
Expand All @@ -37,6 +38,9 @@ func main() {
triton := triton.NewTriton()
defer triton.Close()

controllerClient, controllerClientConn := external.InitControllerPrivateServiceClient()
defer controllerClientConn.Close()

clientNamespace, err := client.NewNamespaceClient(client.Options{
HostPort: config.Config.Temporal.ClientOptions.HostPort,
})
Expand All @@ -57,7 +61,7 @@ func main() {
}
}

cw := modelWorker.NewWorker(repository.NewRepository(db), triton)
cw := modelWorker.NewWorker(repository.NewRepository(db), triton, controllerClient)

c, err := client.Dial(client.Options{
// ZapAdapter implements log.Logger interface and can be passed
Expand Down
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ type PipelineBackendConfig struct {
}
}

// ControllerConfig related to controller
type ControllerConfig struct {
Host string `koanf:"host"`
PrivatePort int `koanf:"privateport"`
HTTPS struct {
Cert string `koanf:"cert"`
Key string `koanf:"key"`
}
}

// MaxBatchSizeConfig defines the maximum size of the batch of a AI task
type MaxBatchSizeConfig struct {
Unspecified int `koanf:"unspecified"`
Expand Down Expand Up @@ -116,6 +126,7 @@ type AppConfig struct {
PipelineBackend PipelineBackendConfig `koanf:"pipelinebackend"`
MaxBatchSizeLimitation MaxBatchSizeConfig `koanf:"maxbatchsizelimitation"`
Temporal TemporalConfig `koanf:"temporal"`
Controller ControllerConfig `koanf:"controller"`
}

// Config - Global variable to export
Expand Down
6 changes: 6 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,9 @@ maxbatchsizelimitation:
temporal:
clientoptions:
hostport: temporal:7233
controller:
host: controller
privateport: 3085
https:
cert:
key:
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1298,8 +1298,6 @@ github.com/imdario/mergo v0.3.10/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230329080556-5a5b33087eee h1:2lPiCnIm24S6gD3G5GxfW4O5ICSyYlrqaBDJFS/iSiM=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230329080556-5a5b33087eee/go.mod h1:7/Jj3ATVozPwB0WmKRM612o/k5UJF8K9oRCNKYH8iy0=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230402125221-c8f1a70b6b8b h1:BI97L8e4pkbQVcqRyQsR9/Q1/4pXB+zFGUWOEL/hZ6U=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230402125221-c8f1a70b6b8b/go.mod h1:7/Jj3ATVozPwB0WmKRM612o/k5UJF8K9oRCNKYH8iy0=
github.com/instill-ai/usage-client v0.2.2-alpha h1:EQyHpgzZ26TEIL9UoaqchTf+LnKaidUGhKlUEFR68I8=
Expand Down
2 changes: 1 addition & 1 deletion integration-test/grpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export default () => {
queryModelPrivate.GetModel()
queryModelPrivate.ListModels()
queryModelPrivate.LookUpModel()
}
}

// Create model API
createModel.CreateModel()
Expand Down
10 changes: 5 additions & 5 deletions integration-test/grpc_create_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ export function CreateModel() {
let currentTime = new Date().getTime();
let timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/GetModelOperation', {
name: createOperationRes.message.operation.name
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.operation.done === true) {
if (res.message.state === "STATE_OFFLINE") {
break
}
sleep(1)
Expand All @@ -80,10 +80,10 @@ export function CreateModel() {
currentTime = new Date().getTime();
timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
var res = client.invoke('vdp.model.v1alpha.ModelPublicService/GetModel', {
var res = client.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.model.state === "STATE_ONLINE") {
if (res.message.state === "STATE_ONLINE") {
break
}
sleep(1)
Expand Down
10 changes: 5 additions & 5 deletions integration-test/grpc_deploy_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ export function DeployUndeployModel() {
let currentTime = new Date().getTime();
let timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/GetModelOperation', {
name: createClsModelRes.json().operation.name
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.operation.done === true) {
if (res.message.state === "STATE_OFFLINE") {
break
}
sleep(1)
Expand All @@ -77,10 +77,10 @@ export function DeployUndeployModel() {
currentTime = new Date().getTime();
timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
var res = client.invoke('vdp.model.v1alpha.ModelPublicService/GetModel', {
var res = client.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.model.state === "STATE_ONLINE") {
if (res.message.state === "STATE_ONLINE") {
break
}
sleep(1)
Expand Down
20 changes: 10 additions & 10 deletions integration-test/grpc_infer_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ export function InferModel() {
let currentTime = new Date().getTime();
let timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/GetModelOperation', {
name: createClsModelRes.json().operation.name
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.operation.done === true) {
if (res.message.state === "STATE_OFFLINE") {
break
}
sleep(1)
Expand All @@ -78,10 +78,10 @@ export function InferModel() {
currentTime = new Date().getTime();
timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
var res = client.invoke('vdp.model.v1alpha.ModelPublicService/GetModel', {
var res = client.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.model.state === "STATE_ONLINE") {
if (res.message.state === "STATE_ONLINE") {
break
}
sleep(1)
Expand Down Expand Up @@ -166,10 +166,10 @@ export function InferModel() {
let currentTime = new Date().getTime();
let timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/GetModelOperation', {
name: createClsModelRes.json().operation.name
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.operation.done === true) {
if (res.message.state === "STATE_OFFLINE") {
break
}
sleep(1)
Expand All @@ -190,10 +190,10 @@ export function InferModel() {
currentTime = new Date().getTime();
timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
var res = client.invoke('vdp.model.v1alpha.ModelPublicService/GetModel', {
var res = client.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.model.state === "STATE_ONLINE") {
if (res.message.state === "STATE_ONLINE") {
break
}
sleep(1)
Expand Down
33 changes: 29 additions & 4 deletions integration-test/grpc_model_operation.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ export function ListModelOperations() {
"ListModelOperations response operations[0].metadata": (r) => r.message.operations[0].metadata === null,
});

// Check model creation finished
let currentTime = new Date().getTime();
let timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.state === "STATE_OFFLINE") {
break
}
sleep(1)
currentTime = new Date().getTime();
}

check(client.invoke('vdp.model.v1alpha.ModelPublicService/DeleteModel', {
name: "models/" + model_id
}), {
Expand Down Expand Up @@ -99,10 +113,10 @@ export function CancelModelOperation() {
let currentTime = new Date().getTime();
let timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/GetModelOperation', {
name: createClsModelRes.json().operation.name
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.operation.done === true) {
if (res.message.state === "STATE_OFFLINE") {
break
}
sleep(1)
Expand All @@ -123,11 +137,22 @@ export function CancelModelOperation() {
'CancelModelOperation status is OK': (r) => r && r.status === grpc.StatusOK,
});

while (timeoutTime > currentTime) {
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.state !== "STATE_UNSPECIFIED") {
break
}
sleep(1)
currentTime = new Date().getTime();
}

check(client.invoke('vdp.model.v1alpha.ModelPublicService/DeleteModel', {
name: "models/" + model_id
}), {
'Delete model status is OK': (r) => r && r.status === grpc.StatusOK,
});
client.close();
});
};
};
6 changes: 3 additions & 3 deletions integration-test/grpc_publish_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ export function PublishUnPublishModel() {
let currentTime = new Date().getTime();
let timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/GetModelOperation', {
name: createClsModelRes.json().operation.name
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.operation.done === true) {
if (res.message.state === "STATE_OFFLINE") {
break
}
sleep(1)
Expand Down
18 changes: 9 additions & 9 deletions integration-test/grpc_query_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ export function GetModel() {
let currentTime = new Date().getTime();
let timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/GetModelOperation', {
name: createClsModelRes.json().operation.name
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.operation.done === true) {
if (res.message.state === "STATE_OFFLINE") {
break
}
sleep(1)
Expand Down Expand Up @@ -123,10 +123,10 @@ export function ListModels() {
let currentTime = new Date().getTime();
let timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/GetModelOperation', {
name: createClsModelRes.json().operation.name
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.operation.done === true) {
if (res.message.state === "STATE_OFFLINE") {
break
}
sleep(1)
Expand Down Expand Up @@ -186,10 +186,10 @@ export function LookupModel() {
let currentTime = new Date().getTime();
let timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/GetModelOperation', {
name: createClsModelRes.json().operation.name
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.operation.done === true) {
if (res.message.state === "STATE_OFFLINE") {
break
}
sleep(1)
Expand Down
18 changes: 9 additions & 9 deletions integration-test/grpc_query_model_private.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ export function GetModel() {
let currentTime = new Date().getTime();
let timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
let res = publicClient.invoke('vdp.model.v1alpha.ModelPublicService/GetModelOperation', {
name: createClsModelRes.json().operation.name
let res = publicClient.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.operation.done === true) {
if (res.message.state === "STATE_OFFLINE") {
break
}
sleep(1)
Expand Down Expand Up @@ -136,10 +136,10 @@ export function ListModels() {
let currentTime = new Date().getTime();
let timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
let res = publicClient.invoke('vdp.model.v1alpha.ModelPublicService/GetModelOperation', {
name: createClsModelRes.json().operation.name
let res = publicClient.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.operation.done === true) {
if (res.message.state === "STATE_OFFLINE") {
break
}
sleep(1)
Expand Down Expand Up @@ -205,10 +205,10 @@ export function LookUpModel() {
let currentTime = new Date().getTime();
let timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
let res = publicClient.invoke('vdp.model.v1alpha.ModelPublicService/GetModelOperation', {
name: createClsModelRes.json().operation.name
let res = publicClient.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.operation.done === true) {
if (res.message.state === "STATE_OFFLINE") {
break
}
sleep(1)
Expand Down
6 changes: 3 additions & 3 deletions integration-test/grpc_update_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ export function UpdateModel() {
let currentTime = new Date().getTime();
let timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/GetModelOperation', {
name: createClsModelRes.json().operation.name
let res = client.invoke('vdp.model.v1alpha.ModelPublicService/WatchModel', {
name: `models/${model_id}`
}, {})
if (res.message.operation.done === true) {
if (res.message.state === "STATE_OFFLINE") {
break
}
sleep(1)
Expand Down

0 comments on commit 4397826

Please sign in to comment.