From e8ae4e145fc022f3a86a1f4b93b3fe5967bc44a2 Mon Sep 17 00:00:00 2001 From: "Chang, Hui-Tang" Date: Sun, 21 Apr 2024 04:33:47 +0800 Subject: [PATCH] feat: adopt system variables (#92) Because - Some connectors or operators require data from system variables instead of user input, we treat this kind of data as system variables. This commit - Adopts system variables. --- pkg/base/connector.go | 22 +++++--- pkg/base/connector_test.go | 2 +- pkg/base/operator.go | 12 ++-- pkg/base/operator_test.go | 2 +- .../testdata/wantConnectorDefinition.json | 21 +------ pkg/connector/airbyte/v0/main.go | 2 +- pkg/connector/archetypeai/v0/main.go | 2 +- pkg/connector/bigquery/v0/main.go | 2 +- pkg/connector/googlecloudstorage/v0/main.go | 2 +- pkg/connector/googlesearch/v0/main.go | 2 +- pkg/connector/huggingface/v0/main.go | 2 +- .../instill/v0/image_classification.go | 2 +- pkg/connector/instill/v0/image_to_image.go | 2 +- .../instill/v0/instance_segmentation.go | 2 +- .../instill/v0/keypoint_detection.go | 2 +- pkg/connector/instill/v0/main.go | 44 +++++++-------- pkg/connector/instill/v0/object_detection.go | 2 +- pkg/connector/instill/v0/ocr.go | 2 +- .../instill/v0/semantic_segmentation.go | 2 +- pkg/connector/instill/v0/text_generation.go | 2 +- .../instill/v0/text_generation_chat.go | 2 +- pkg/connector/instill/v0/text_to_image.go | 2 +- .../instill/v0/visual_question_answering.go | 2 +- pkg/connector/main.go | 10 ++-- pkg/connector/numbers/v0/main.go | 55 ++++++++++--------- pkg/connector/openai/v0/main.go | 2 +- pkg/connector/pinecone/v0/main.go | 2 +- pkg/connector/redis/v0/main.go | 2 +- pkg/connector/restapi/v0/main.go | 6 +- pkg/connector/stabilityai/v0/main.go | 2 +- pkg/connector/website/v0/main.go | 2 +- pkg/operator/base64/v0/main.go | 2 +- pkg/operator/end/v0/main.go | 2 +- pkg/operator/image/v0/main.go | 2 +- pkg/operator/json/v0/main.go | 2 +- pkg/operator/main.go | 11 ++-- pkg/operator/start/v0/main.go | 2 +- pkg/operator/text/v0/main.go | 2 +- 38 files changed, 116 insertions(+), 125 deletions(-) diff --git a/pkg/base/connector.go b/pkg/base/connector.go index e5d011f1..693c0f9f 100644 --- a/pkg/base/connector.go +++ b/pkg/base/connector.go @@ -18,7 +18,7 @@ type IConnector interface { IComponent LoadConnectorDefinition(definitionJSON []byte, tasksJSON []byte, additionalJSONBytes map[string][]byte) error - GetConnectorDefinition(component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) + GetConnectorDefinition(sysVars map[string]any, component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*ExecutionWrapper, error) Test(sysVars map[string]any, connection *structpb.Struct) error @@ -44,9 +44,10 @@ type IConnectorExecution interface { } type BaseConnectorExecution struct { - Connector IConnector - Connection *structpb.Struct - Task string + Connector IConnector + SystemVariables map[string]any + Connection *structpb.Struct + Task string } func (c *BaseConnector) GetID() string { @@ -63,7 +64,7 @@ func (c *BaseConnector) GetLogger() *zap.Logger { func (c *BaseConnector) GetUsageHandler() UsageHandler { return c.UsageHandler } -func (c *BaseConnector) GetConnectorDefinition(component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) { +func (c *BaseConnector) GetConnectorDefinition(sysVars map[string]any, component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) { return c.definition, nil } @@ -130,13 +131,15 @@ func (c *BaseConnector) LoadConnectorDefinition(definitionJSONBytes []byte, task return err } - // deprecated, will be removed soon - c.definition.Spec.ResourceSpecification, err = c.refineResourceSpec(c.definition.Spec.ResourceSpecification) + connection, err := c.refineResourceSpec(c.definition.Spec.ResourceSpecification) if err != nil { return err } + // deprecated, will be removed soon + c.definition.Spec.ResourceSpecification = &structpb.Struct{} + connectionPropStruct := &structpb.Struct{Fields: map[string]*structpb.Value{}} - connectionPropStruct.Fields["connection"] = structpb.NewStructValue(c.definition.Spec.ResourceSpecification) + connectionPropStruct.Fields["connection"] = structpb.NewStructValue(connection) c.definition.Spec.ComponentSpecification.Fields["properties"] = structpb.NewStructValue(connectionPropStruct) c.definition.Spec.DataSpecifications, err = generateDataSpecs(taskStructs) @@ -265,6 +268,9 @@ func (e *BaseConnectorExecution) GetConnector() IConnector { func (e *BaseConnectorExecution) GetConnection() *structpb.Struct { return e.Connection } +func (e *BaseConnectorExecution) GetSystemVariables() map[string]any { + return e.SystemVariables +} func (e *BaseConnectorExecution) GetLogger() *zap.Logger { return e.Connector.GetLogger() } diff --git a/pkg/base/connector_test.go b/pkg/base/connector_test.go index 24f2232f..8c2afffe 100644 --- a/pkg/base/connector_test.go +++ b/pkg/base/connector_test.go @@ -33,7 +33,7 @@ func TestConnector_ListConnectorDefinitions(t *testing.T) { map[string][]byte{"additional.json": connectorAdditionalJSON}) c.Assert(err, qt.IsNil) - got, err := conn.GetConnectorDefinition(nil) + got, err := conn.GetConnectorDefinition(nil, nil) c.Assert(err, qt.IsNil) c.Check(wantConnectorDefinitionJSON, qt.JSONEquals, got) } diff --git a/pkg/base/operator.go b/pkg/base/operator.go index 425d1aa0..937aaf04 100644 --- a/pkg/base/operator.go +++ b/pkg/base/operator.go @@ -15,7 +15,7 @@ type IOperator interface { IComponent LoadOperatorDefinition(definitionJSON []byte, tasksJSON []byte, additionalJSONBytes map[string][]byte) error - GetOperatorDefinition(component *pipelinePB.OperatorComponent) (*pipelinePB.OperatorDefinition, error) + GetOperatorDefinition(sysVars map[string]any, component *pipelinePB.OperatorComponent) (*pipelinePB.OperatorDefinition, error) CreateExecution(sysVars map[string]any, task string) (*ExecutionWrapper, error) } @@ -36,8 +36,9 @@ type IOperatorExecution interface { } type BaseOperatorExecution struct { - Operator IOperator - Task string + Operator IOperator + SystemVariables map[string]any + Task string } func (o *BaseOperator) GetID() string { @@ -59,7 +60,7 @@ func (o *BaseOperator) GetTaskOutputSchemas() map[string]string { return o.taskOutputSchemas } -func (o *BaseOperator) GetOperatorDefinition(component *pipelinePB.OperatorComponent) (*pipelinePB.OperatorDefinition, error) { +func (o *BaseOperator) GetOperatorDefinition(sysVars map[string]any, component *pipelinePB.OperatorComponent) (*pipelinePB.OperatorDefinition, error) { return o.definition, nil } @@ -130,6 +131,9 @@ func (e *BaseOperatorExecution) GetTask() string { func (e *BaseOperatorExecution) GetOperator() IOperator { return e.Operator } +func (e *BaseOperatorExecution) GetSystemVariables() map[string]any { + return e.SystemVariables +} func (e *BaseOperatorExecution) GetLogger() *zap.Logger { return e.Operator.GetLogger() } diff --git a/pkg/base/operator_test.go b/pkg/base/operator_test.go index 37a952ab..598fd6d1 100644 --- a/pkg/base/operator_test.go +++ b/pkg/base/operator_test.go @@ -28,7 +28,7 @@ func TestOperator_ListOperatorDefinitions(t *testing.T) { err := op.LoadOperatorDefinition(operatorDefJSON, operatorTasksJSON, nil) c.Assert(err, qt.IsNil) - got, err := op.GetOperatorDefinition(nil) + got, err := op.GetOperatorDefinition(nil, nil) c.Assert(err, qt.IsNil) c.Check(wantOperatorDefinitionJSON, qt.JSONEquals, got) } diff --git a/pkg/base/testdata/wantConnectorDefinition.json b/pkg/base/testdata/wantConnectorDefinition.json index 0cb6e34e..874bb161 100644 --- a/pkg/base/testdata/wantConnectorDefinition.json +++ b/pkg/base/testdata/wantConnectorDefinition.json @@ -6,26 +6,7 @@ "documentation_url": "https://www.instill.tech/docs/latest/vdp/ai-connectors/openai", "icon": "OpenAI/openai.svg", "spec": { - "resource_specification": { - "$schema": "http://json-schema.org/draft-07/schema#", - "additionalProperties": true, - "instillShortDescription": "", - "properties": { - "api_key": { - "description": "Fill your OpenAI API key. To find your keys, visit your OpenAI's API Keys page.", - "instillCredentialField": true, - "instillShortDescription": "Fill your OpenAI API key. To find your keys, visit your OpenAI's API Keys page.", - "instillUIOrder": 0, - "title": "API Key", - "type": "string" - } - }, - "required": [ - "api_key" - ], - "title": "OpenAI Connector Resource", - "type": "object" - }, + "resource_specification": {}, "component_specification": { "$schema": "http://json-schema.org/draft-07/schema#", "oneOf": [ diff --git a/pkg/connector/airbyte/v0/main.go b/pkg/connector/airbyte/v0/main.go index 16d1ee51..0c3fa402 100644 --- a/pkg/connector/airbyte/v0/main.go +++ b/pkg/connector/airbyte/v0/main.go @@ -46,7 +46,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector { func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task}, + BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task}, }}, nil } diff --git a/pkg/connector/archetypeai/v0/main.go b/pkg/connector/archetypeai/v0/main.go index 28dda2a5..eeb4ff74 100644 --- a/pkg/connector/archetypeai/v0/main.go +++ b/pkg/connector/archetypeai/v0/main.go @@ -65,7 +65,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector { func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) { e := &execution{ - BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task}, + BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task}, client: newClient(connection, c.Logger), } diff --git a/pkg/connector/bigquery/v0/main.go b/pkg/connector/bigquery/v0/main.go index 35c0e1c6..f171766a 100644 --- a/pkg/connector/bigquery/v0/main.go +++ b/pkg/connector/bigquery/v0/main.go @@ -54,7 +54,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector { func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task}, + BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task}, }}, nil } diff --git a/pkg/connector/googlecloudstorage/v0/main.go b/pkg/connector/googlecloudstorage/v0/main.go index 0a69c099..7e8304be 100644 --- a/pkg/connector/googlecloudstorage/v0/main.go +++ b/pkg/connector/googlecloudstorage/v0/main.go @@ -53,7 +53,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector { func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task}, + BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task}, }}, nil } diff --git a/pkg/connector/googlesearch/v0/main.go b/pkg/connector/googlesearch/v0/main.go index 95cbedd8..74dc18d9 100644 --- a/pkg/connector/googlesearch/v0/main.go +++ b/pkg/connector/googlesearch/v0/main.go @@ -55,7 +55,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector { func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task}, + BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task}, }}, nil } diff --git a/pkg/connector/huggingface/v0/main.go b/pkg/connector/huggingface/v0/main.go index f7d4cecc..83519936 100644 --- a/pkg/connector/huggingface/v0/main.go +++ b/pkg/connector/huggingface/v0/main.go @@ -72,7 +72,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector { func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task}, + BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task}, }}, nil } diff --git a/pkg/connector/instill/v0/image_classification.go b/pkg/connector/instill/v0/image_classification.go index b954496c..33e6b52a 100644 --- a/pkg/connector/instill/v0/image_classification.go +++ b/pkg/connector/instill/v0/image_classification.go @@ -47,7 +47,7 @@ func (e *execution) executeImageClassification(grpcClient modelPB.ModelPublicSer Name: modelName, TaskInputs: taskInputs, } - ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection)) + ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection)) res, err := grpcClient.TriggerUserModel(ctx, &req) if err != nil || res == nil { return nil, err diff --git a/pkg/connector/instill/v0/image_to_image.go b/pkg/connector/instill/v0/image_to_image.go index 1be5f7da..6715f90d 100644 --- a/pkg/connector/instill/v0/image_to_image.go +++ b/pkg/connector/instill/v0/image_to_image.go @@ -65,7 +65,7 @@ func (e *execution) executeImageToImage(grpcClient modelPB.ModelPublicServiceCli Name: modelName, TaskInputs: []*modelPB.TaskInput{{Input: taskInput}}, } - ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection)) + ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection)) res, err := grpcClient.TriggerUserModel(ctx, &req) if err != nil || res == nil { return nil, err diff --git a/pkg/connector/instill/v0/instance_segmentation.go b/pkg/connector/instill/v0/instance_segmentation.go index 2520c1f1..96ba8029 100644 --- a/pkg/connector/instill/v0/instance_segmentation.go +++ b/pkg/connector/instill/v0/instance_segmentation.go @@ -45,7 +45,7 @@ func (e *execution) executeInstanceSegmentation(grpcClient modelPB.ModelPublicSe Name: modelName, TaskInputs: taskInputs, } - ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection)) + ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection)) res, err := grpcClient.TriggerUserModel(ctx, &req) if err != nil || res == nil { return nil, err diff --git a/pkg/connector/instill/v0/keypoint_detection.go b/pkg/connector/instill/v0/keypoint_detection.go index a3e90500..f4be42f7 100644 --- a/pkg/connector/instill/v0/keypoint_detection.go +++ b/pkg/connector/instill/v0/keypoint_detection.go @@ -41,7 +41,7 @@ func (e *execution) executeKeyPointDetection(grpcClient modelPB.ModelPublicServi Name: modelName, TaskInputs: taskInputs, } - ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection)) + ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection)) res, err := grpcClient.TriggerUserModel(ctx, &req) if err != nil || res == nil { return nil, err diff --git a/pkg/connector/instill/v0/main.go b/pkg/connector/instill/v0/main.go index dcf92432..0632f7f8 100644 --- a/pkg/connector/instill/v0/main.go +++ b/pkg/connector/instill/v0/main.go @@ -59,7 +59,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector { func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task}, + BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task}, }}, nil } @@ -67,19 +67,19 @@ func getMode(config *structpb.Struct) string { return config.GetFields()["mode"].GetStringValue() } -func getAPIKey(config *structpb.Struct) string { +func getAPIKey(vars map[string]any, config *structpb.Struct) string { if getMode(config) == internalMode { - return config.GetFields()["header_authorization"].GetStringValue() + return vars["__PIPELINE_HEADER_AUTHORIZATION"].(string) } return fmt.Sprintf("Bearer %s", config.GetFields()["api_token"].GetStringValue()) } -func getInstillUserUID(config *structpb.Struct) string { - return config.GetFields()["instill_user_uid"].GetStringValue() +func getInstillUserUID(vars map[string]any, config *structpb.Struct) string { + return vars["__PIPELINE_USER_UID"].(string) } -func getModelServerURL(config *structpb.Struct) string { +func getModelServerURL(vars map[string]any, config *structpb.Struct) string { if getMode(config) == internalMode { - return config.GetFields()["instill_model_backend"].GetStringValue() + return vars["__MODEL_BACKEND"].(string) } serverURL := config.GetFields()["server_url"].GetStringValue() if strings.HasPrefix(serverURL, "https://") { @@ -94,9 +94,9 @@ func getModelServerURL(config *structpb.Struct) string { return serverURL } -func getMgmtServerURL(config *structpb.Struct) string { +func getMgmtServerURL(vars map[string]any, config *structpb.Struct) string { if getMode(config) == internalMode { - return config.GetFields()["instill_mgmt_backend"].GetStringValue() + return vars["__MGMT_BACKEND"].(string) } serverURL := config.GetFields()["server_url"].GetStringValue() if strings.HasPrefix(serverURL, "https://") { @@ -110,10 +110,10 @@ func getMgmtServerURL(config *structpb.Struct) string { } return serverURL } -func getRequestMetadata(cfg *structpb.Struct) metadata.MD { +func getRequestMetadata(vars map[string]any, cfg *structpb.Struct) metadata.MD { return metadata.Pairs( - "Authorization", getAPIKey(cfg), - "Instill-User-Uid", getInstillUserUID(cfg), + "Authorization", getAPIKey(vars, cfg), + "Instill-User-Uid", getInstillUserUID(vars, cfg), "Instill-Auth-Type", "user", ) } @@ -125,18 +125,18 @@ func (e *execution) Execute(inputs []*structpb.Struct) ([]*structpb.Struct, erro return inputs, fmt.Errorf("invalid input") } - gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(e.Connection)) + gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(e.SystemVariables, e.Connection)) if gRPCCLientConn != nil { defer gRPCCLientConn.Close() } - mgmtGRPCCLient, mgmtGRPCCLientConn := initMgmtPublicServiceClient(getMgmtServerURL(e.Connection)) + mgmtGRPCCLient, mgmtGRPCCLientConn := initMgmtPublicServiceClient(getMgmtServerURL(e.SystemVariables, e.Connection)) if mgmtGRPCCLientConn != nil { defer mgmtGRPCCLientConn.Close() } modelNameSplits := strings.Split(inputs[0].GetFields()["model_name"].GetStringValue(), "/") - ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection)) + ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection)) nsResp, err := mgmtGRPCCLient.CheckNamespace(ctx, &mgmtPB.CheckNamespaceRequest{ Id: modelNameSplits[0], }) @@ -186,11 +186,11 @@ func (e *execution) Execute(inputs []*structpb.Struct) ([]*structpb.Struct, erro } func (c *connector) Test(sysVars map[string]any, connection *structpb.Struct) error { - gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(connection)) + gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(sysVars, connection)) if gRPCCLientConn != nil { defer gRPCCLientConn.Close() } - ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(connection)) + ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(sysVars, connection)) _, err := gRPCCLient.ListModels(ctx, &modelPB.ListModelsRequest{}) if err != nil { return err @@ -216,23 +216,23 @@ type ModelsResp struct { } // Generate the model_name enum based on the task -func (c *connector) GetConnectorDefinition(component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) { - oriDef, err := c.BaseConnector.GetConnectorDefinition(nil) +func (c *connector) GetConnectorDefinition(sysVars map[string]any, component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) { + oriDef, err := c.BaseConnector.GetConnectorDefinition(nil, nil) if err != nil { return nil, err } def := proto.Clone(oriDef).(*pipelinePB.ConnectorDefinition) if component != nil && component.Connection != nil { - if getModelServerURL(component.Connection) == "" { + if getModelServerURL(sysVars, component.Connection) == "" { return def, nil } - gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(component.Connection)) + gRPCCLient, gRPCCLientConn := initModelPublicServiceClient(getModelServerURL(sysVars, component.Connection)) if gRPCCLientConn != nil { defer gRPCCLientConn.Close() } - ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(component.Connection)) + ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(sysVars, component.Connection)) // We should query by pages and accumulate them in the future pageToken := "" diff --git a/pkg/connector/instill/v0/object_detection.go b/pkg/connector/instill/v0/object_detection.go index 8102edd3..29971a77 100644 --- a/pkg/connector/instill/v0/object_detection.go +++ b/pkg/connector/instill/v0/object_detection.go @@ -48,7 +48,7 @@ func (e *execution) executeObjectDetection(grpcClient modelPB.ModelPublicService TaskInputs: taskInputs, } - ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection)) + ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection)) res, err := grpcClient.TriggerUserModel(ctx, &req) if err != nil || res == nil { return nil, err diff --git a/pkg/connector/instill/v0/ocr.go b/pkg/connector/instill/v0/ocr.go index 86ab6045..c7f5092c 100644 --- a/pkg/connector/instill/v0/ocr.go +++ b/pkg/connector/instill/v0/ocr.go @@ -41,7 +41,7 @@ func (e *execution) executeOCR(grpcClient modelPB.ModelPublicServiceClient, mode Name: modelName, TaskInputs: []*modelPB.TaskInput{{Input: taskInput}}, } - ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection)) + ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection)) res, err := grpcClient.TriggerUserModel(ctx, &req) if err != nil || res == nil { return nil, err diff --git a/pkg/connector/instill/v0/semantic_segmentation.go b/pkg/connector/instill/v0/semantic_segmentation.go index bcf1e03c..2da5623e 100644 --- a/pkg/connector/instill/v0/semantic_segmentation.go +++ b/pkg/connector/instill/v0/semantic_segmentation.go @@ -45,7 +45,7 @@ func (e *execution) executeSemanticSegmentation(grpcClient modelPB.ModelPublicSe Name: modelName, TaskInputs: taskInputs, } - ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection)) + ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection)) res, err := grpcClient.TriggerUserModel(ctx, &req) if err != nil || res == nil { return nil, err diff --git a/pkg/connector/instill/v0/text_generation.go b/pkg/connector/instill/v0/text_generation.go index 826eba49..d4ae9a7c 100644 --- a/pkg/connector/instill/v0/text_generation.go +++ b/pkg/connector/instill/v0/text_generation.go @@ -44,7 +44,7 @@ func (e *execution) executeTextGeneration(grpcClient modelPB.ModelPublicServiceC Name: modelName, TaskInputs: []*modelPB.TaskInput{{Input: taskInput}}, } - ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection)) + ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection)) res, err := grpcClient.TriggerUserModel(ctx, &req) if err != nil || res == nil { return nil, err diff --git a/pkg/connector/instill/v0/text_generation_chat.go b/pkg/connector/instill/v0/text_generation_chat.go index 8bf3c612..cca7d6de 100644 --- a/pkg/connector/instill/v0/text_generation_chat.go +++ b/pkg/connector/instill/v0/text_generation_chat.go @@ -44,7 +44,7 @@ func (e *execution) executeTextGenerationChat(grpcClient modelPB.ModelPublicServ Name: modelName, TaskInputs: []*modelPB.TaskInput{{Input: taskInput}}, } - ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection)) + ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection)) res, err := grpcClient.TriggerUserModel(ctx, &req) if err != nil || res == nil { return nil, err diff --git a/pkg/connector/instill/v0/text_to_image.go b/pkg/connector/instill/v0/text_to_image.go index 2b98040b..2fd90889 100644 --- a/pkg/connector/instill/v0/text_to_image.go +++ b/pkg/connector/instill/v0/text_to_image.go @@ -56,7 +56,7 @@ func (e *execution) executeTextToImage(grpcClient modelPB.ModelPublicServiceClie Name: modelName, TaskInputs: []*modelPB.TaskInput{{Input: taskInput}}, } - ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection)) + ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection)) res, err := grpcClient.TriggerUserModel(ctx, &req) if err != nil || res == nil { return nil, err diff --git a/pkg/connector/instill/v0/visual_question_answering.go b/pkg/connector/instill/v0/visual_question_answering.go index f30d4a4e..695e52a9 100644 --- a/pkg/connector/instill/v0/visual_question_answering.go +++ b/pkg/connector/instill/v0/visual_question_answering.go @@ -44,7 +44,7 @@ func (e *execution) executeVisualQuestionAnswering(grpcClient modelPB.ModelPubli Name: modelName, TaskInputs: []*modelPB.TaskInput{{Input: taskInput}}, } - ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.Connection)) + ctx := metadata.NewOutgoingContext(context.Background(), getRequestMetadata(e.SystemVariables, e.Connection)) res, err := grpcClient.TriggerUserModel(ctx, &req) if err != nil || res == nil { return nil, err diff --git a/pkg/connector/main.go b/pkg/connector/main.go index 537973b9..511f50c2 100644 --- a/pkg/connector/main.go +++ b/pkg/connector/main.go @@ -82,17 +82,17 @@ func (cs *ConnectorStore) CreateExecution(defUID uuid.UUID, sysVars map[string]a return nil, fmt.Errorf("connector definition not found") } -func (cs *ConnectorStore) GetConnectorDefinitionByUID(defUID uuid.UUID, component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) { +func (cs *ConnectorStore) GetConnectorDefinitionByUID(defUID uuid.UUID, sysVars map[string]any, component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) { if con, ok := cs.connectorUIDMap[defUID]; ok { - return con.con.GetConnectorDefinition(component) + return con.con.GetConnectorDefinition(sysVars, component) } return nil, fmt.Errorf("connector definition not found") } // Get the operator definition by definition id -func (cs *ConnectorStore) GetConnectorDefinitionByID(defID string, component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) { +func (cs *ConnectorStore) GetConnectorDefinitionByID(defID string, sysVars map[string]any, component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) { if con, ok := cs.connectorIDMap[defID]; ok { - return con.con.GetConnectorDefinition(component) + return con.con.GetConnectorDefinition(sysVars, component) } return nil, fmt.Errorf("connector definition not found") } @@ -101,7 +101,7 @@ func (cs *ConnectorStore) GetConnectorDefinitionByID(defID string, component *pi func (cs *ConnectorStore) ListConnectorDefinitions(returnTombstone bool) []*pipelinePB.ConnectorDefinition { defs := []*pipelinePB.ConnectorDefinition{} for _, con := range cs.connectorUIDMap { - def, err := con.con.GetConnectorDefinition(nil) + def, err := con.con.GetConnectorDefinition(nil, nil) if err == nil { if !def.Tombstone || returnTombstone { defs = append(defs, def) diff --git a/pkg/connector/numbers/v0/main.go b/pkg/connector/numbers/v0/main.go index 7812814b..dd44db0c 100644 --- a/pkg/connector/numbers/v0/main.go +++ b/pkg/connector/numbers/v0/main.go @@ -45,22 +45,21 @@ type CommitCustomLicense struct { Document *string `json:"document,omitempty"` } type CommitCustom struct { - AssetCreator *string `json:"assetCreator,omitempty"` - DigitalSourceType *string `json:"digitalSourceType,omitempty"` - MiningPreference *string `json:"miningPreference,omitempty"` - GeneratedThrough string `json:"generatedThrough"` - GeneratedBy *string `json:"generatedBy,omitempty"` - CreatorWallet *string `json:"creatorWallet,omitempty"` - License *CommitCustomLicense `json:"license,omitempty"` - Metadata *struct { - Pipeline *struct { - UID *string `json:"uid,omitempty"` - Recipe interface{} `json:"recipe,omitempty"` - } `json:"pipeline,omitempty"` - Owner *struct { - UID *string `json:"uid,omitempty"` - } `json:"owner,omitempty"` - } `json:"instillMetadata,omitempty"` + AssetCreator *string `json:"assetCreator,omitempty"` + DigitalSourceType *string `json:"digitalSourceType,omitempty"` + MiningPreference *string `json:"miningPreference,omitempty"` + GeneratedThrough string `json:"generatedThrough"` + GeneratedBy *string `json:"generatedBy,omitempty"` + CreatorWallet *string `json:"creatorWallet,omitempty"` + License *CommitCustomLicense `json:"license,omitempty"` + Metadata *CommitCustomMetadata `json:"instillMetadata,omitempty"` +} + +type CommitCustomMetadata struct { + Pipeline struct { + UID string + Recipe interface{} + } } type Meta struct { @@ -90,15 +89,6 @@ type Input struct { Name *string `json:"name,omitempty"` Document *string `json:"document,omitempty"` } `json:"license,omitempty"` - Metadata *struct { - Pipeline *struct { - UID *string `json:"uid,omitempty"` - Recipe interface{} `json:"recipe,omitempty"` - } `json:"pipeline,omitempty"` - Owner *struct { - UID *string `json:"uid,omitempty"` - } `json:"owner,omitempty"` - } `json:"metadata,omitempty"` } type Output struct { @@ -123,7 +113,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector { func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task}, + BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task}, }}, nil } @@ -234,6 +224,17 @@ func (e *execution) Execute(inputs []*structpb.Struct) ([]*structpb.Struct, erro } } + var r any + _ = json.Unmarshal([]byte(e.GetSystemVariables()["__PIPELINE_RECIPE"].(string)), &r) + meta := CommitCustomMetadata{ + Pipeline: struct { + UID string + Recipe interface{} + }{ + UID: e.GetSystemVariables()["__PIPELINE_UID"].(string), + Recipe: r, + }, + } commitCustom := CommitCustom{ AssetCreator: inputStruct.AssetCreator, DigitalSourceType: inputStruct.DigitalSourceType, @@ -241,7 +242,7 @@ func (e *execution) Execute(inputs []*structpb.Struct) ([]*structpb.Struct, erro GeneratedThrough: "https://instill.tech", //TODO: support Core Host GeneratedBy: inputStruct.GeneratedBy, License: commitLicense, - Metadata: inputStruct.Metadata, + Metadata: &meta, } reg := Register{ diff --git a/pkg/connector/openai/v0/main.go b/pkg/connector/openai/v0/main.go index b274d58e..6a22c603 100644 --- a/pkg/connector/openai/v0/main.go +++ b/pkg/connector/openai/v0/main.go @@ -63,7 +63,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector { func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task}, + BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task}, }}, nil } diff --git a/pkg/connector/pinecone/v0/main.go b/pkg/connector/pinecone/v0/main.go index 73d0ac54..2a981091 100644 --- a/pkg/connector/pinecone/v0/main.go +++ b/pkg/connector/pinecone/v0/main.go @@ -55,7 +55,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector { func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task}, + BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task}, }}, nil } diff --git a/pkg/connector/redis/v0/main.go b/pkg/connector/redis/v0/main.go index 5936207e..e7a74cda 100644 --- a/pkg/connector/redis/v0/main.go +++ b/pkg/connector/redis/v0/main.go @@ -54,7 +54,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector { func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task}, + BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task}, }}, nil } diff --git a/pkg/connector/restapi/v0/main.go b/pkg/connector/restapi/v0/main.go index 752266f3..17ae5f66 100644 --- a/pkg/connector/restapi/v0/main.go +++ b/pkg/connector/restapi/v0/main.go @@ -74,7 +74,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector { func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task}, + BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task}, }}, nil } @@ -171,8 +171,8 @@ func (c *connector) Test(sysVars map[string]any, connection *structpb.Struct) er } // Generate the model_name enum based on the task -func (c *connector) GetConnectorDefinition(component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) { - oriDef, err := c.BaseConnector.GetConnectorDefinition(nil) +func (c *connector) GetConnectorDefinition(sysVars map[string]any, component *pipelinePB.ConnectorComponent) (*pipelinePB.ConnectorDefinition, error) { + oriDef, err := c.BaseConnector.GetConnectorDefinition(nil, nil) if err != nil { return nil, err } diff --git a/pkg/connector/stabilityai/v0/main.go b/pkg/connector/stabilityai/v0/main.go index 0b45aa47..18a02704 100644 --- a/pkg/connector/stabilityai/v0/main.go +++ b/pkg/connector/stabilityai/v0/main.go @@ -55,7 +55,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector { func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task}, + BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task}, }}, nil } diff --git a/pkg/connector/website/v0/main.go b/pkg/connector/website/v0/main.go index aee1657c..f40b30ae 100644 --- a/pkg/connector/website/v0/main.go +++ b/pkg/connector/website/v0/main.go @@ -51,7 +51,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *connector { func (c *connector) CreateExecution(sysVars map[string]any, connection *structpb.Struct, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, Connection: connection, Task: task}, + BaseConnectorExecution: base.BaseConnectorExecution{Connector: c, SystemVariables: sysVars, Connection: connection, Task: task}, }}, nil } diff --git a/pkg/operator/base64/v0/main.go b/pkg/operator/base64/v0/main.go index f4e6829c..fe47a234 100644 --- a/pkg/operator/base64/v0/main.go +++ b/pkg/operator/base64/v0/main.go @@ -59,7 +59,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *operator { func (o *operator) CreateExecution(sysVars map[string]any, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseOperatorExecution: base.BaseOperatorExecution{Operator: o, Task: task}, + BaseOperatorExecution: base.BaseOperatorExecution{Operator: o, SystemVariables: sysVars, Task: task}, }}, nil } diff --git a/pkg/operator/end/v0/main.go b/pkg/operator/end/v0/main.go index 006e6a96..80d9e48f 100644 --- a/pkg/operator/end/v0/main.go +++ b/pkg/operator/end/v0/main.go @@ -46,7 +46,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *operator { func (o *operator) CreateExecution(sysVars map[string]any, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseOperatorExecution: base.BaseOperatorExecution{Operator: o, Task: task}, + BaseOperatorExecution: base.BaseOperatorExecution{Operator: o, SystemVariables: sysVars, Task: task}, }}, nil } diff --git a/pkg/operator/image/v0/main.go b/pkg/operator/image/v0/main.go index 8d8fd5c7..7a8fa04f 100644 --- a/pkg/operator/image/v0/main.go +++ b/pkg/operator/image/v0/main.go @@ -57,7 +57,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *operator { func (o *operator) CreateExecution(sysVars map[string]any, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseOperatorExecution: base.BaseOperatorExecution{Operator: o, Task: task}, + BaseOperatorExecution: base.BaseOperatorExecution{Operator: o, SystemVariables: sysVars, Task: task}, }}, nil } diff --git a/pkg/operator/json/v0/main.go b/pkg/operator/json/v0/main.go index 8ce749b6..78fb5912 100644 --- a/pkg/operator/json/v0/main.go +++ b/pkg/operator/json/v0/main.go @@ -61,7 +61,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *operator { func (o *operator) CreateExecution(sysVars map[string]any, task string) (*base.ExecutionWrapper, error) { e := &execution{ - BaseOperatorExecution: base.BaseOperatorExecution{Operator: o, Task: task}, + BaseOperatorExecution: base.BaseOperatorExecution{Operator: o, SystemVariables: sysVars, Task: task}, } switch task { diff --git a/pkg/operator/main.go b/pkg/operator/main.go index 272971da..355eb103 100644 --- a/pkg/operator/main.go +++ b/pkg/operator/main.go @@ -65,18 +65,17 @@ func (os *OperatorStore) CreateExecution(defUID uuid.UUID, sysVars map[string]an return nil, fmt.Errorf("operator definition not found") } -func (os *OperatorStore) GetOperatorDefinitionByUID(defUID uuid.UUID, component *pipelinePB.OperatorComponent) (*pipelinePB.OperatorDefinition, error) { - fmt.Println("defUID", defUID) +func (os *OperatorStore) GetOperatorDefinitionByUID(defUID uuid.UUID, sysVars map[string]any, component *pipelinePB.OperatorComponent) (*pipelinePB.OperatorDefinition, error) { if op, ok := os.operatorUIDMap[defUID]; ok { - return op.op.GetOperatorDefinition(component) + return op.op.GetOperatorDefinition(sysVars, component) } return nil, fmt.Errorf("operator definition not found") } // Get the operator definition by definition id -func (os *OperatorStore) GetOperatorDefinitionByID(defID string, component *pipelinePB.OperatorComponent) (*pipelinePB.OperatorDefinition, error) { +func (os *OperatorStore) GetOperatorDefinitionByID(defID string, sysVars map[string]any, component *pipelinePB.OperatorComponent) (*pipelinePB.OperatorDefinition, error) { if op, ok := os.operatorIDMap[defID]; ok { - return op.op.GetOperatorDefinition(component) + return op.op.GetOperatorDefinition(sysVars, component) } return nil, fmt.Errorf("operator definition not found") } @@ -85,7 +84,7 @@ func (os *OperatorStore) GetOperatorDefinitionByID(defID string, component *pipe func (os *OperatorStore) ListOperatorDefinitions(returnTombstone bool) []*pipelinePB.OperatorDefinition { defs := []*pipelinePB.OperatorDefinition{} for _, op := range os.operatorUIDMap { - def, err := op.op.GetOperatorDefinition(nil) + def, err := op.op.GetOperatorDefinition(nil, nil) if err == nil { if !def.Tombstone || returnTombstone { defs = append(defs, def) diff --git a/pkg/operator/start/v0/main.go b/pkg/operator/start/v0/main.go index 18eca615..17492d24 100644 --- a/pkg/operator/start/v0/main.go +++ b/pkg/operator/start/v0/main.go @@ -46,7 +46,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *operator { func (o *operator) CreateExecution(sysVars map[string]any, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseOperatorExecution: base.BaseOperatorExecution{Operator: o, Task: task}, + BaseOperatorExecution: base.BaseOperatorExecution{Operator: o, SystemVariables: sysVars, Task: task}, }}, nil } diff --git a/pkg/operator/text/v0/main.go b/pkg/operator/text/v0/main.go index 0dbd86f1..f5d1d3f8 100644 --- a/pkg/operator/text/v0/main.go +++ b/pkg/operator/text/v0/main.go @@ -56,7 +56,7 @@ func Init(l *zap.Logger, u base.UsageHandler) *operator { func (o *operator) CreateExecution(sysVars map[string]any, task string) (*base.ExecutionWrapper, error) { return &base.ExecutionWrapper{Execution: &execution{ - BaseOperatorExecution: base.BaseOperatorExecution{Operator: o, Task: task}, + BaseOperatorExecution: base.BaseOperatorExecution{Operator: o, SystemVariables: sysVars, Task: task}, }}, nil }