Skip to content

Commit

Permalink
feat: adopt system variables (#92)
Browse files Browse the repository at this point in the history
Because

- Some connectors or operators require data from system variables
instead of user input, we treat this kind of data as system variables.

This commit
- Adopts system variables.
  • Loading branch information
donch1989 committed Apr 20, 2024
1 parent 389ee18 commit e8ae4e1
Show file tree
Hide file tree
Showing 38 changed files with 116 additions and 125 deletions.
22 changes: 14 additions & 8 deletions pkg/base/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type IConnector interface {
IComponent

LoadConnectorDefinition(definitionJSON []byte, tasksJSON []byte, additionalJSONBytes map[string][]byte) error
GetConnectorDefinition(component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error)
GetConnectorDefinition(sysVars map[string]any, component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error)

CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*ExecutionWrapper, error)
Test(sysVars map[string]any, connection *structpb.Struct) error
Expand All @@ -44,9 +44,10 @@ type IConnectorExecution interface {
}

type BaseConnectorExecution struct {
Connector IConnector
Connection *structpb.Struct
Task string
Connector IConnector
SystemVariables map[string]any
Connection *structpb.Struct
Task string
}

func (c *BaseConnector) GetID() string {
Expand All @@ -63,7 +64,7 @@ func (c *BaseConnector) GetLogger() *zap.Logger {
func (c *BaseConnector) GetUsageHandler() UsageHandler {
return c.UsageHandler
}
func (c *BaseConnector) GetConnectorDefinition(component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) {
func (c *BaseConnector) GetConnectorDefinition(sysVars map[string]any, component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) {
return c.definition, nil
}

Expand Down Expand Up @@ -130,13 +131,15 @@ func (c *BaseConnector) LoadConnectorDefinition(definitionJSONBytes []byte, task
return err
}

// deprecated, will be removed soon
c.definition.Spec.ResourceSpecification, err = c.refineResourceSpec(c.definition.Spec.ResourceSpecification)
connection, err := c.refineResourceSpec(c.definition.Spec.ResourceSpecification)
if err != nil {
return err
}
// deprecated, will be removed soon
c.definition.Spec.ResourceSpecification = &structpb.Struct{}

connectionPropStruct := &structpb.Struct{Fields: map[string]*structpb.Value{}}
connectionPropStruct.Fields["connection"] = structpb.NewStructValue(c.definition.Spec.ResourceSpecification)
connectionPropStruct.Fields["connection"] = structpb.NewStructValue(connection)
c.definition.Spec.ComponentSpecification.Fields["properties"] = structpb.NewStructValue(connectionPropStruct)

c.definition.Spec.DataSpecifications, err = generateDataSpecs(taskStructs)
Expand Down Expand Up @@ -265,6 +268,9 @@ func (e *BaseConnectorExecution) GetConnector() IConnector {
func (e *BaseConnectorExecution) GetConnection() *structpb.Struct {
return e.Connection
}
func (e *BaseConnectorExecution) GetSystemVariables() map[string]any {
return e.SystemVariables
}
func (e *BaseConnectorExecution) GetLogger() *zap.Logger {
return e.Connector.GetLogger()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/base/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestConnector_ListConnectorDefinitions(t *testing.T) {
map[string][]byte{"additional.json": connectorAdditionalJSON})
c.Assert(err, qt.IsNil)

got, err := conn.GetConnectorDefinition(nil)
got, err := conn.GetConnectorDefinition(nil, nil)
c.Assert(err, qt.IsNil)
c.Check(wantConnectorDefinitionJSON, qt.JSONEquals, got)
}
12 changes: 8 additions & 4 deletions pkg/base/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type IOperator interface {
IComponent

LoadOperatorDefinition(definitionJSON []byte, tasksJSON []byte, additionalJSONBytes map[string][]byte) error
GetOperatorDefinition(component *pipelinePB.OperatorComponent) (*pipelinePB.OperatorDefinition, error)
GetOperatorDefinition(sysVars map[string]any, component *pipelinePB.OperatorComponent) (*pipelinePB.OperatorDefinition, error)
CreateExecution(sysVars map[string]any, task string) (*ExecutionWrapper, error)
}

Expand All @@ -36,8 +36,9 @@ type IOperatorExecution interface {
}

type BaseOperatorExecution struct {
Operator IOperator
Task string
Operator IOperator
SystemVariables map[string]any
Task string
}

func (o *BaseOperator) GetID() string {
Expand All @@ -59,7 +60,7 @@ func (o *BaseOperator) GetTaskOutputSchemas() map[string]string {
return o.taskOutputSchemas
}

func (o *BaseOperator) GetOperatorDefinition(component *pipelinePB.OperatorComponent) (*pipelinePB.OperatorDefinition, error) {
func (o *BaseOperator) GetOperatorDefinition(sysVars map[string]any, component *pipelinePB.OperatorComponent) (*pipelinePB.OperatorDefinition, error) {
return o.definition, nil
}

Expand Down Expand Up @@ -130,6 +131,9 @@ func (e *BaseOperatorExecution) GetTask() string {
func (e *BaseOperatorExecution) GetOperator() IOperator {
return e.Operator
}
func (e *BaseOperatorExecution) GetSystemVariables() map[string]any {
return e.SystemVariables
}
func (e *BaseOperatorExecution) GetLogger() *zap.Logger {
return e.Operator.GetLogger()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/base/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestOperator_ListOperatorDefinitions(t *testing.T) {
err := op.LoadOperatorDefinition(operatorDefJSON, operatorTasksJSON, nil)
c.Assert(err, qt.IsNil)

got, err := op.GetOperatorDefinition(nil)
got, err := op.GetOperatorDefinition(nil, nil)
c.Assert(err, qt.IsNil)
c.Check(wantOperatorDefinitionJSON, qt.JSONEquals, got)
}
21 changes: 1 addition & 20 deletions pkg/base/testdata/wantConnectorDefinition.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,7 @@
"documentation_url": "https://www.instill.tech/docs/latest/vdp/ai-connectors/openai",
"icon": "OpenAI/openai.svg",
"spec": {
"resource_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": true,
"instillShortDescription": "",
"properties": {
"api_key": {
"description": "Fill your OpenAI API key. To find your keys, visit your OpenAI's API Keys page.",
"instillCredentialField": true,
"instillShortDescription": "Fill your OpenAI API key. To find your keys, visit your OpenAI's API Keys page.",
"instillUIOrder": 0,
"title": "API Key",
"type": "string"
}
},
"required": [
"api_key"
],
"title": "OpenAI Connector Resource",
"type": "object"
},
"resource_specification": {},
"component_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"oneOf": [
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/airbyte/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector {

func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) {
return &base.ExecutionWrapper{Execution: &execution{
BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task},
BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task},
}}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/archetypeai/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector {

func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) {
e := &execution{
BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task},
BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task},
client: newClient(connection, c.Logger),
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/bigquery/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector {

func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) {
return &base.ExecutionWrapper{Execution: &execution{
BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task},
BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task},
}}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/googlecloudstorage/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector {

func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) {
return &base.ExecutionWrapper{Execution: &execution{
BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task},
BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task},
}}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/googlesearch/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector {

func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) {
return &base.ExecutionWrapper{Execution: &execution{
BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task},
BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task},
}}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/huggingface/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector {

func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) {
return &base.ExecutionWrapper{Execution: &execution{
BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task},
BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task},
}}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/image_classification.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (e *execution) executeImageClassification(grpcClient modelPB.ModelPublicSer
Name: modelName,
TaskInputs: taskInputs,
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/image_to_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (e *execution) executeImageToImage(grpcClient modelPB.ModelPublicServiceCli
Name: modelName,
TaskInputs: []*modelPB.TaskInput{{Input: taskInput}},
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/instance_segmentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (e *execution) executeInstanceSegmentation(grpcClient modelPB.ModelPublicSe
Name: modelName,
TaskInputs: taskInputs,
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/keypoint_detection.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (e *execution) executeKeyPointDetection(grpcClient modelPB.ModelPublicServi
Name: modelName,
TaskInputs: taskInputs,
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
44 changes: 22 additions & 22 deletions pkg/connector/instill/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,27 +59,27 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector {

func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) {
return &base.ExecutionWrapper{Execution: &execution{
BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task},
BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task},
}}, nil
}

func getMode(config *structpb.Struct) string {
return config.GetFields()["mode"].GetStringValue()
}

func getAPIKey(config *structpb.Struct) string {
func getAPIKey(vars map[string]any, config *structpb.Struct) string {
if getMode(config) == internalMode {
return config.GetFields()["header_authorization"].GetStringValue()
return vars["__PIPELINE_HEADER_AUTHORIZATION"].(string)
}
return fmt.Sprintf("Bearer %s", config.GetFields()["api_token"].GetStringValue())
}
func getInstillUserUID(config *structpb.Struct) string {
return config.GetFields()["instill_user_uid"].GetStringValue()
func getInstillUserUID(vars map[string]any, config *structpb.Struct) string {
return vars["__PIPELINE_USER_UID"].(string)
}

func getModelServerURL(config *structpb.Struct) string {
func getModelServerURL(vars map[string]any, config *structpb.Struct) string {
if getMode(config) == internalMode {
return config.GetFields()["instill_model_backend"].GetStringValue()
return vars["__MODEL_BACKEND"].(string)
}
serverURL := config.GetFields()["server_url"].GetStringValue()
if strings.HasPrefix(serverURL, "https://") {
Expand All @@ -94,9 +94,9 @@ func getModelServerURL(config *structpb.Struct) string {
return serverURL
}

func getMgmtServerURL(config *structpb.Struct) string {
func getMgmtServerURL(vars map[string]any, config *structpb.Struct) string {
if getMode(config) == internalMode {
return config.GetFields()["instill_mgmt_backend"].GetStringValue()
return vars["__MGMT_BACKEND"].(string)
}
serverURL := config.GetFields()["server_url"].GetStringValue()
if strings.HasPrefix(serverURL, "https://") {
Expand All @@ -110,10 +110,10 @@ func getMgmtServerURL(config *structpb.Struct) string {
}
return serverURL
}
func getRequestMetadata(cfg *structpb.Struct) metadata.MD {
func getRequestMetadata(vars map[string]any, cfg *structpb.Struct) metadata.MD {
return metadata.Pairs(
"Authorization", getAPIKey(cfg),
"Instill-User-Uid", getInstillUserUID(cfg),
"Authorization", getAPIKey(vars, cfg),
"Instill-User-Uid", getInstillUserUID(vars, cfg),
"Instill-Auth-Type", "user",
)
}
Expand All @@ -125,18 +125,18 @@ func (e *execution) Execute(inputs []*structpb.Struct) ([]*structpb.Struct, erro
return inputs, fmt.Errorf("invalid input")
}

gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(e.Connection))
gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(e.SystemVariables, e.Connection))
if gRPCCLientConn != nil {
defer gRPCCLientConn.Close()
}

mgmtGRPCCLient, mgmtGRPCCLientConn := initMgmtPublicServiceClient(getMgmtServerURL(e.Connection))
mgmtGRPCCLient, mgmtGRPCCLientConn := initMgmtPublicServiceClient(getMgmtServerURL(e.SystemVariables, e.Connection))
if mgmtGRPCCLientConn != nil {
defer mgmtGRPCCLientConn.Close()
}

modelNameSplits := strings.Split(inputs[0].GetFields()["model_name"].GetStringValue(), "/")
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
nsResp, err := mgmtGRPCCLient.CheckNamespace(ctx, &mgmtPB.CheckNamespaceRequest{
Id: modelNameSplits[0],
})
Expand Down Expand Up @@ -186,11 +186,11 @@ func (e *execution) Execute(inputs []*structpb.Struct) ([]*structpb.Struct, erro
}

func (c *connector) Test(sysVars map[string]any, connection *structpb.Struct) error {
gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(connection))
gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(sysVars, connection))
if gRPCCLientConn != nil {
defer gRPCCLientConn.Close()
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(sysVars, connection))
_, err := gRPCCLient.ListModels(ctx, &modelPB.ListModelsRequest{})
if err != nil {
return err
Expand All @@ -216,23 +216,23 @@ type ModelsResp struct {
}

// Generate the model_name enum based on the task
func (c *connector) GetConnectorDefinition(component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) {
oriDef, err := c.BaseConnector.GetConnectorDefinition(nil)
func (c *connector) GetConnectorDefinition(sysVars map[string]any, component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) {
oriDef, err := c.BaseConnector.GetConnectorDefinition(nil, nil)
if err != nil {
return nil, err
}
def := proto.Clone(oriDef).(*pipelinePB.ConnectorDefinition)

if component != nil && component.Connection != nil {
if getModelServerURL(component.Connection) == "" {
if getModelServerURL(sysVars, component.Connection) == "" {
return def, nil
}

gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(component.Connection))
gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(sysVars, component.Connection))
if gRPCCLientConn != nil {
defer gRPCCLientConn.Close()
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(component.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(sysVars, component.Connection))
// We should query by pages and accumulate them in the future

pageToken := ""
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/object_detection.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (e *execution) executeObjectDetection(grpcClient modelPB.ModelPublicService
TaskInputs: taskInputs,
}

ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/ocr.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (e *execution) executeOCR(grpcClient modelPB.ModelPublicServiceClient, mode
Name: modelName,
TaskInputs: []*modelPB.TaskInput{{Input: taskInput}},
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/semantic_segmentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (e *execution) executeSemanticSegmentation(grpcClient modelPB.ModelPublicSe
Name: modelName,
TaskInputs: taskInputs,
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/instill/v0/text_generation.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (e *execution) executeTextGeneration(grpcClient modelPB.ModelPublicServiceC
Name: modelName,
TaskInputs: []*modelPB.TaskInput{{Input: taskInput}},
}
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection))
ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection))
res, err := grpcClient.TriggerUserModel(ctx, &req)
if err != nil || res == nil {
return nil, err
Expand Down

0 comments on commit e8ae4e1

Please sign in to comment.