Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(instill): drop support for "external mode" #101

Merged
merged 1 commit into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/base/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type IConnector interface {
IComponent

LoadConnectorDefinition(definitionJSON []byte, tasksJSON []byte, additionalJSONBytes map[string][]byte) error

// Note: Some content in the definition JSON schema needs to be generated by sysVars or component setting.
GetConnectorDefinition(sysVars map[string]any, component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error)

CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*ExecutionWrapper, error)
Expand Down
3 changes: 3 additions & 0 deletions pkg/base/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ type IOperator interface {
IComponent

LoadOperatorDefinition(definitionJSON []byte, tasksJSON []byte, additionalJSONBytes map[string][]byte) error

// Note: Some content in the definition JSON schema needs to be generated by sysVars or component setting.
GetOperatorDefinition(sysVars map[string]any, component *pipelinePB.OperatorComponent) (*pipelinePB.OperatorDefinition, error)

CreateExecution(sysVars map[string]any, task string) (*ExecutionWrapper, error)
}

Expand Down
49 changes: 1 addition & 48 deletions pkg/connector/instill/v0/config/definition.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,54 +22,7 @@
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": true,
"oneOf": [
{
"title": "Instill Model Connector Internal Mode",
"properties": {
"mode": {
"const": "Internal Mode"
}
}
},
{
"title": "Instill Model Connector External Mode",
"properties": {
"api_token": {
"description": "To access models on Instill Core/Cloud, enter your Instill Core/Cloud API Token. You can find your tokens by visiting your Console's Settings > API Tokens page.",
"instillUpstreamTypes": [
"reference"
],
"instillAcceptFormats": [
"string"
],
"instillCredentialField": true,
"instillUIOrder": 0,
"title": "API Token",
"type": "string"
},
"mode": {
"const": "External Mode"
},
"server_url": {
"default": "https://api.instill.tech",
"description": "Base URL for the Instill Cloud API. To access models on Instill Cloud, use the base URL `https://api.instill.tech`. To access models on your local Instill Core, use the base URL `http://api-gateway:8080`.",
"instillUpstreamTypes": [
"value"
],
"instillAcceptFormats": [
"string"
],
"instillUIOrder": 1,
"title": "Server URL",
"type": "string"
}
},
"required": [
"api_token",
"server_url"
]
}
],
"properties": {},
"title": "Instill Model Connector",
"type": "object"
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/image_classification.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (e *execution) executeImageClassification(grpcClient modelPB.ModelPublicSer
Name: modelName,
TaskInputs: taskInputs,
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/image_to_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (e *execution) executeImageToImage(grpcClient modelPB.ModelPublicServiceCli
Name: modelName,
TaskInputs: []*modelPB.TaskInput{{Input: taskInput}},
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/instance_segmentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (e *execution) executeInstanceSegmentation(grpcClient modelPB.ModelPublicSe
Name: modelName,
TaskInputs: taskInputs,
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/keypoint_detection.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (e *execution) executeKeyPointDetection(grpcClient modelPB.ModelPublicServi
Name: modelName,
TaskInputs: taskInputs,
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
166 changes: 70 additions & 96 deletions pkg/connector/instill/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"strings"
"sync"
"time"

"go.uber.org/zap"
"google.golang.org/grpc/metadata"
Expand All @@ -21,10 +22,6 @@ import (
pipelinePB "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta"
)

const (
internalMode = "Internal Mode"
)

var (
//go:embed config/definition.json
definitionJSON []byte
Expand Down Expand Up @@ -64,57 +61,33 @@ func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb
}}, nil
}

func getMode(config *structpb.Struct) string {
return config.GetFields()["mode"].GetStringValue()
}

func getAPIKey(vars map[string]any, config *structpb.Struct) string {
if getMode(config) == internalMode {
return vars["__PIPELINE_HEADER_AUTHORIZATION"].(string)
func getHeaderAuthorization(vars map[string]any) string {
if v, ok := vars["__PIPELINE_HEADER_AUTHORIZATION"]; ok {
return v.(string)
}
return fmt.Sprintf("Bearer %s", config.GetFields()["api_token"].GetStringValue())
return ""
}
func getInstillUserUID(vars map[string]any, config *structpb.Struct) string {
func getInstillUserUID(vars map[string]any) string {
return vars["__PIPELINE_USER_UID"].(string)
}

func getModelServerURL(vars map[string]any, config *structpb.Struct) string {
if getMode(config) == internalMode {
return vars["__MODEL_BACKEND"].(string)
func getModelServerURL(vars map[string]any) string {
if v, ok := vars["__MODEL_BACKEND"]; ok {
return v.(string)
}
serverURL := config.GetFields()["server_url"].GetStringValue()
if strings.HasPrefix(serverURL, "https://") {
if len(strings.Split(serverURL, ":")) == 2 {
serverURL = serverURL + ":443"
}
} else if strings.HasPrefix(serverURL, "http://") {
if len(strings.Split(serverURL, ":")) == 2 {
serverURL = serverURL + ":80"
}
}
return serverURL
return ""
}

func getMgmtServerURL(vars map[string]any, config *structpb.Struct) string {
if getMode(config) == internalMode {
return vars["__MGMT_BACKEND"].(string)
func getMgmtServerURL(vars map[string]any) string {
if v, ok := vars["__MGMT_BACKEND"]; ok {
return v.(string)
}
serverURL := config.GetFields()["server_url"].GetStringValue()
if strings.HasPrefix(serverURL, "https://") {
if len(strings.Split(serverURL, ":")) == 2 {
serverURL = serverURL + ":443"
}
} else if strings.HasPrefix(serverURL, "http://") {
if len(strings.Split(serverURL, ":")) == 2 {
serverURL = serverURL + ":80"
}
}
return serverURL
return ""
}
func getRequestMetadata(vars map[string]any, cfg *structpb.Struct) metadata.MD {
func getRequestMetadata(vars map[string]any) metadata.MD {
return metadata.Pairs(
"Authorization", getAPIKey(vars, cfg),
"Instill-User-Uid", getInstillUserUID(vars, cfg),
"Authorization", getHeaderAuthorization(vars),
"Instill-User-Uid", getInstillUserUID(vars),
"Instill-Auth-Type", "user",
)
}
Expand All @@ -126,18 +99,21 @@ func (e *execution) Execute(inputs []*structpb.Struct) ([]*structpb.Struct, erro
return inputs, fmt.Errorf("invalid input")
}

gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(e.SystemVariables, e.Connection))
gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(e.SystemVariables))
if gRPCCLientConn != nil {
defer gRPCCLientConn.Close()
}

mgmtGRPCCLient, mgmtGRPCCLientConn := initMgmtPublicServiceClient(getMgmtServerURL(e.SystemVariables, e.Connection))
mgmtGRPCCLient, mgmtGRPCCLientConn := initMgmtPublicServiceClient(getMgmtServerURL(e.SystemVariables))
if mgmtGRPCCLientConn != nil {
defer mgmtGRPCCLientConn.Close()
}

modelNameSplits := strings.Split(inputs[0].GetFields()["model_name"].GetStringValue(), "/")
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

ctx = metadata.NewOutgoingContext(ctx, getRequestMetadata(e.SystemVariables))
nsResp, err := mgmtGRPCCLient.CheckNamespace(ctx, &mgmtPB.CheckNamespaceRequest{
Id: modelNameSplits[0],
})
Expand Down Expand Up @@ -187,11 +163,14 @@ func (e *execution) Execute(inputs []*structpb.Struct) ([]*structpb.Struct, erro
}

func (c *connector) Test(sysVars map[string]any, connection *structpb.Struct) error {
gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(sysVars, connection))
gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(sysVars))
if gRPCCLientConn != nil {
defer gRPCCLientConn.Close()
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(sysVars, connection))
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

ctx = metadata.NewOutgoingContext(ctx, getRequestMetadata(sysVars))
_, err := gRPCCLient.ListModels(ctx, &modelPB.ListModelsRequest{})
if err != nil {
return err
Expand All @@ -200,74 +179,69 @@ func (c *connector) Test(sysVars map[string]any, connection *structpb.Struct) er
return nil
}

// func (c *connector) GetConnectorDefinitionByID(defID string, component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) {
// def, err := c.Connector.GetConnectorDefinitionByID(defID, component)
// if err != nil {
// return nil, err
// }

// return c.GetConnectorDefinitionByUID(uuid.FromStringOrNil(def.Uid), component)
// }

type ModelsResp struct {
Models []struct {
Name string `json:"name"`
Task string `json:"task"`
} `json:"models"`
}

// Generate the model_name enum based on the task
// Generate the `model_name` enum based on the task.
// This implementation is a temporary solution due to the incomplete feature set of Instill Model.
// We'll re-implement this after Instill Model is stable.
func (c *connector) GetConnectorDefinition(sysVars map[string]any, component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) {
oriDef, err := c.BaseConnector.GetConnectorDefinition(nil, nil)
if err != nil {
return nil, err
}
def := proto.Clone(oriDef).(*pipelinePB.ConnectorDefinition)

if component != nil && component.Connection != nil {
if getModelServerURL(sysVars, component.Connection) == "" {
return def, nil
}
if getModelServerURL(sysVars) == "" {
return def, nil
}

gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(sysVars, component.Connection))
if gRPCCLientConn != nil {
defer gRPCCLientConn.Close()
gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(sysVars))
if gRPCCLientConn != nil {
defer gRPCCLientConn.Close()
}

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

ctx = metadata.NewOutgoingContext(ctx, getRequestMetadata(sysVars))

pageToken := ""
models := []*modelPB.Model{}
for {
resp, err := gRPCCLient.ListModels(ctx, &modelPB.ListModelsRequest{PageToken: &pageToken})
if err != nil {

return def, nil
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(sysVars, component.Connection))
// We should query by pages and accumulate them in the future

pageToken := ""
models := []*modelPB.Model{}
for {
resp, err := gRPCCLient.ListModels(ctx, &modelPB.ListModelsRequest{PageToken: &pageToken})
if err != nil {
return def, nil
}
models = append(models, resp.Models...)
pageToken = resp.NextPageToken
if pageToken == "" {
break
}
models = append(models, resp.Models...)
pageToken = resp.NextPageToken
if pageToken == "" {
break
}
}

modelNameMap := map[string]*structpb.ListValue{}
modelNameMap := map[string]*structpb.ListValue{}

modelName := &structpb.ListValue{}
for _, model := range models {
if _, ok := modelNameMap[model.Task.String()]; !ok {
modelNameMap[model.Task.String()] = &structpb.ListValue{}
}
namePaths := strings.Split(model.Name, "/")
modelName.Values = append(modelName.Values, structpb.NewStringValue(fmt.Sprintf("%s/%s", namePaths[1], namePaths[3])))
modelNameMap[model.Task.String()].Values = append(modelNameMap[model.Task.String()].Values, structpb.NewStringValue(fmt.Sprintf("%s/%s", namePaths[1], namePaths[3])))
modelName := &structpb.ListValue{}
for _, model := range models {
if _, ok := modelNameMap[model.Task.String()]; !ok {
modelNameMap[model.Task.String()] = &structpb.ListValue{}
}
for _, sch := range def.Spec.ComponentSpecification.Fields["oneOf"].GetListValue().Values {
task := sch.GetStructValue().Fields["properties"].GetStructValue().Fields["task"].GetStructValue().Fields["const"].GetStringValue()
if _, ok := modelNameMap[task]; ok {
addModelEnum(sch.GetStructValue().Fields, modelNameMap[task])
}

namePaths := strings.Split(model.Name, "/")
modelName.Values = append(modelName.Values, structpb.NewStringValue(fmt.Sprintf("%s/%s", namePaths[1], namePaths[3])))
modelNameMap[model.Task.String()].Values = append(modelNameMap[model.Task.String()].Values, structpb.NewStringValue(fmt.Sprintf("%s/%s", namePaths[1], namePaths[3])))
}
for _, sch := range def.Spec.ComponentSpecification.Fields["oneOf"].GetListValue().Values {
task := sch.GetStructValue().Fields["properties"].GetStructValue().Fields["task"].GetStructValue().Fields["const"].GetStringValue()
if _, ok := modelNameMap[task]; ok {
addModelEnum(sch.GetStructValue().Fields, modelNameMap[task])
}

}
return def, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/object_detection.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (e *execution) executeObjectDetection(grpcClient modelPB.ModelPublicService
TaskInputs: taskInputs,
}

ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/ocr.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (e *execution) executeOCR(grpcClient modelPB.ModelPublicServiceClient, mode
Name: modelName,
TaskInputs: []*modelPB.TaskInput{{Input: taskInput}},
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/semantic_segmentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (e *execution) executeSemanticSegmentation(grpcClient modelPB.ModelPublicSe
Name: modelName,
TaskInputs: taskInputs,
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/text_generation.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (e *execution) executeTextGeneration(grpcClient modelPB.ModelPublicServiceC
Name: modelName,
TaskInputs: []*modelPB.TaskInput{{Input: taskInput}},
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
Loading
Loading