Skip to content

Commit

Permalink
feat: add UsageHandler interface (#87)
Browse files Browse the repository at this point in the history
Because

- We want to collect the usage of each component. We'll provide a
`UsageHandler` interface to allow the pipeline-backend or other services
to pass the usage handler implementation here.

This commit

- Adds `UsageHandler` interface.
  • Loading branch information
donch1989 committed Apr 18, 2024
1 parent a6de70e commit b9d9645
Show file tree
Hide file tree
Showing 32 changed files with 138 additions and 109 deletions.
9 changes: 9 additions & 0 deletions pkg/base/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type IComponent interface {
GetTaskInputSchemas() map[string]string
// Get task output schemas
GetTaskOutputSchemas() map[string]string

// Get usage handler
GetUsageHandler() UsageHandler
}

// Component is the basic component struct
Expand All @@ -76,6 +79,8 @@ type Component struct {
taskInputSchemas map[string]string
taskOutputSchemas map[string]string

UsageHandler UsageHandler

// Logger
Logger *zap.Logger
}
Expand Down Expand Up @@ -543,6 +548,10 @@ func (comp *Component) getDefinitionByID(defID string) (interface{}, error) {
return val, nil
}

func (comp *Component) GetUsageHandler() UsageHandler {
return comp.UsageHandler
}

// ConvertFromStructpb converts from structpb.Struct to a struct
func ConvertFromStructpb(from *structpb.Struct, to interface{}) error {
inputJSON, err := protojson.Marshal(from)
Expand Down
13 changes: 13 additions & 0 deletions pkg/base/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ func (e *Execution) ExecuteWithValidation(inputs []*structpb.Struct) ([]*structp
return nil, err
}

if e.Component.GetUsageHandler() != nil {
if err := e.Component.GetUsageHandler().Check(); err != nil {
return nil, err
}
}

outputs, err := e.ComponentExecution.Execute(inputs)
if err != nil {
return nil, err
Expand All @@ -167,6 +173,13 @@ func (e *Execution) ExecuteWithValidation(inputs []*structpb.Struct) ([]*structp
if err := e.Validate(outputs, e.Component.GetTaskOutputSchemas()[task], "outputs"); err != nil {
return nil, err
}

if e.Component.GetUsageHandler() != nil {
if err := e.Component.GetUsageHandler().Collect(); err != nil {
return nil, err
}
}

return outputs, err
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/base/usage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package base

// TODO: add parameters for usage check and collection
type UsageHandler interface {
Check() error
Collect() error
}
8 changes: 4 additions & 4 deletions pkg/connector/airbyte/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type Execution struct {
connector *Connector
}

func Init(logger *zap.Logger, options ConnectorOptions) base.IConnector {
func Init(logger *zap.Logger, usageHandler base.UsageHandler, options ConnectorOptions) base.IConnector {
once.Do(func() {

dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
Expand All @@ -73,7 +73,7 @@ func Init(logger *zap.Logger, options ConnectorOptions) base.IConnector {

connector = &Connector{
Connector: base.Connector{
Component: base.Component{Logger: logger},
Component: base.Component{Logger: logger, UsageHandler: usageHandler},
},
dockerClient: dockerClient,
cache: cache,
Expand Down Expand Up @@ -103,9 +103,9 @@ func Init(logger *zap.Logger, options ConnectorOptions) base.IConnector {
return connector
}

func (c *Connector) CreateExecution(defUID uuid.UUID, task string, config *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
func (c *Connector) CreateExecution(defUID uuid.UUID, task string, connection *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
e := &Execution{}
e.Execution = base.CreateExecutionHelper(e, c, defUID, task, config, logger)
e.Execution = base.CreateExecutionHelper(e, c, defUID, task, connection, logger)
e.connector = c
return e, nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/connector/archetypeai/v0/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestConnector_Execute(t *testing.T) {
}

logger := zap.NewNop()
connector := Init(logger)
connector := Init(logger, nil)
defID := uuid.Must(uuid.NewV4())

for _, tc := range testcases {
Expand Down Expand Up @@ -287,7 +287,7 @@ func TestConnector_CreateExecution(t *testing.T) {
c := qt.New(t)

logger := zap.NewNop()
connector := Init(logger)
connector := Init(logger, nil)
defID := uuid.Must(uuid.NewV4())

c.Run("nok - unsupported task", func(c *qt.C) {
Expand All @@ -304,7 +304,7 @@ func TestConnector_Test(t *testing.T) {
c := qt.New(t)

logger := zap.NewNop()
connector := Init(logger)
connector := Init(logger, nil)
defID := uuid.Must(uuid.NewV4())

c.Run("ok - connected", func(c *qt.C) {
Expand Down
10 changes: 5 additions & 5 deletions pkg/connector/archetypeai/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ type execution struct {

// Init returns an implementation of IConnector that interacts with Archetype
// AI.
func Init(logger *zap.Logger) base.IConnector {
func Init(logger *zap.Logger, usageHandler base.UsageHandler) base.IConnector {
once.Do(func() {
baseConn = &connector{
Connector: base.Connector{
Component: base.Component{Logger: logger},
Component: base.Component{Logger: logger, UsageHandler: usageHandler},
},
}
if err := baseConn.LoadConnectorDefinition(definitionJSON, tasksJSON, nil); err != nil {
Expand All @@ -62,9 +62,9 @@ func Init(logger *zap.Logger) base.IConnector {
}

// CreateExecution returns an IExecution that executes tasks in Archetype AI.
func (c *connector) CreateExecution(defUID uuid.UUID, task string, config *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
func (c *connector) CreateExecution(defUID uuid.UUID, task string, connection *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
e := &execution{
client: newClient(config, logger),
client: newClient(connection, logger),
}

switch task {
Expand All @@ -81,7 +81,7 @@ func (c *connector) CreateExecution(defUID uuid.UUID, task string, config *struc
)
}

e.Execution = base.CreateExecutionHelper(e, c, defUID, task, config, logger)
e.Execution = base.CreateExecutionHelper(e, c, defUID, task, connection, logger)

return e, nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/connector/bigquery/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ type Execution struct {
base.Execution
}

func Init(logger *zap.Logger) base.IConnector {
func Init(logger *zap.Logger, usageHandler base.UsageHandler) base.IConnector {
once.Do(func() {
connector = &Connector{
Connector: base.Connector{
Component: base.Component{Logger: logger},
Component: base.Component{Logger: logger, UsageHandler: usageHandler},
},
}
err := connector.LoadConnectorDefinition(definitionJSON, tasksJSON, nil)
Expand All @@ -52,9 +52,9 @@ func Init(logger *zap.Logger) base.IConnector {
return connector
}

func (c *Connector) CreateExecution(defUID uuid.UUID, task string, config *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
func (c *Connector) CreateExecution(defUID uuid.UUID, task string, connection *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
e := &Execution{}
e.Execution = base.CreateExecutionHelper(e, c, defUID, task, config, logger)
e.Execution = base.CreateExecutionHelper(e, c, defUID, task, connection, logger)
return e, nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/connector/googlecloudstorage/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ type Execution struct {
base.Execution
}

func Init(logger *zap.Logger) base.IConnector {
func Init(logger *zap.Logger, usageHandler base.UsageHandler) base.IConnector {
once.Do(func() {
connector = &Connector{
Connector: base.Connector{
Component: base.Component{Logger: logger},
Component: base.Component{Logger: logger, UsageHandler: usageHandler},
},
}
err := connector.LoadConnectorDefinition(definitionJSON, tasksJSON, nil)
Expand All @@ -51,9 +51,9 @@ func Init(logger *zap.Logger) base.IConnector {
return connector
}

func (c *Connector) CreateExecution(defUID uuid.UUID, task string, config *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
func (c *Connector) CreateExecution(defUID uuid.UUID, task string, connection *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
e := &Execution{}
e.Execution = base.CreateExecutionHelper(e, c, defUID, task, config, logger)
e.Execution = base.CreateExecutionHelper(e, c, defUID, task, connection, logger)
return e, nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/connector/googlesearch/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ type Execution struct {
base.Execution
}

func Init(logger *zap.Logger) base.IConnector {
func Init(logger *zap.Logger, usageHandler base.UsageHandler) base.IConnector {
once.Do(func() {
connector = &Connector{
Connector: base.Connector{
Component: base.Component{Logger: logger},
Component: base.Component{Logger: logger, UsageHandler: usageHandler},
},
}
err := connector.LoadConnectorDefinition(definitionJSON, tasksJSON, nil)
Expand All @@ -53,9 +53,9 @@ func Init(logger *zap.Logger) base.IConnector {
return connector
}

func (c *Connector) CreateExecution(defUID uuid.UUID, task string, config *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
func (c *Connector) CreateExecution(defUID uuid.UUID, task string, connection *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
e := &Execution{}
e.Execution = base.CreateExecutionHelper(e, c, defUID, task, config, logger)
e.Execution = base.CreateExecutionHelper(e, c, defUID, task, connection, logger)
return e, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/huggingface/v0/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func TestConnector_ExecuteSpeechRecognition(t *testing.T) {

func testTask(c *qt.C, p taskParams) {
logger := zap.NewNop()
connector := Init(logger)
connector := Init(logger, nil)
defID := uuid.Must(uuid.NewV4())

c.Run("nok - HTTP client error - "+p.task, func(c *qt.C) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/connector/huggingface/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ type Execution struct {
base.Execution
}

func Init(logger *zap.Logger) base.IConnector {
func Init(logger *zap.Logger, usageHandler base.UsageHandler) base.IConnector {
once.Do(func() {
connector = &Connector{
Connector: base.Connector{
Component: base.Component{Logger: logger},
Component: base.Component{Logger: logger, UsageHandler: usageHandler},
},
}
err := connector.LoadConnectorDefinition(definitionJSON, tasksJSON, nil)
Expand All @@ -70,9 +70,9 @@ func Init(logger *zap.Logger) base.IConnector {
return connector
}

func (c *Connector) CreateExecution(defUID uuid.UUID, task string, config *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
func (c *Connector) CreateExecution(defUID uuid.UUID, task string, connection *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
e := &Execution{}
e.Execution = base.CreateExecutionHelper(e, c, defUID, task, config, logger)
e.Execution = base.CreateExecutionHelper(e, c, defUID, task, connection, logger)
return e, nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/connector/instill/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ type Execution struct {
base.Execution
}

func Init(logger *zap.Logger) base.IConnector {
func Init(logger *zap.Logger, usageHandler base.UsageHandler) base.IConnector {
once.Do(func() {
connector = &Connector{
Connector: base.Connector{
Component: base.Component{Logger: logger},
Component: base.Component{Logger: logger, UsageHandler: usageHandler},
},
}
err := connector.LoadConnectorDefinition(definitionJSON, tasksJSON, nil)
Expand All @@ -57,9 +57,9 @@ func Init(logger *zap.Logger) base.IConnector {
return connector
}

func (c *Connector) CreateExecution(defUID uuid.UUID, task string, config *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
func (c *Connector) CreateExecution(defUID uuid.UUID, task string, connection *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
e := &Execution{}
e.Execution = base.CreateExecutionHelper(e, c, defUID, task, config, logger)
e.Execution = base.CreateExecutionHelper(e, c, defUID, task, connection, logger)
return e, nil
}

Expand Down
36 changes: 18 additions & 18 deletions pkg/connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,29 @@ type ConnectorOptions struct {
Airbyte airbyte.ConnectorOptions
}

func Init(logger *zap.Logger, options ConnectorOptions) base.IConnector {
func Init(logger *zap.Logger, usageHandler base.UsageHandler, options ConnectorOptions) base.IConnector {
once.Do(func() {

connector = &Connector{
Connector: base.Connector{Component: base.Component{Logger: logger}},
Connector: base.Connector{Component: base.Component{Logger: logger, UsageHandler: usageHandler}},
connectorUIDMap: map[uuid.UUID]base.IConnector{},
connectorIDMap: map[string]base.IConnector{},
}

connector.(*Connector).ImportDefinitions(stabilityai.Init(logger))
connector.(*Connector).ImportDefinitions(instill.Init(logger))
connector.(*Connector).ImportDefinitions(huggingface.Init(logger))
connector.(*Connector).ImportDefinitions(openai.Init(logger))
connector.(*Connector).ImportDefinitions(archetypeai.Init(logger))
connector.(*Connector).ImportDefinitions(numbers.Init(logger))
connector.(*Connector).ImportDefinitions(airbyte.Init(logger, options.Airbyte))
connector.(*Connector).ImportDefinitions(bigquery.Init(logger))
connector.(*Connector).ImportDefinitions(googlecloudstorage.Init(logger))
connector.(*Connector).ImportDefinitions(googlesearch.Init(logger))
connector.(*Connector).ImportDefinitions(pinecone.Init(logger))
connector.(*Connector).ImportDefinitions(redis.Init(logger))
connector.(*Connector).ImportDefinitions(restapi.Init(logger))
connector.(*Connector).ImportDefinitions(website.Init(logger))
connector.(*Connector).ImportDefinitions(stabilityai.Init(logger, usageHandler))
connector.(*Connector).ImportDefinitions(instill.Init(logger, usageHandler))
connector.(*Connector).ImportDefinitions(huggingface.Init(logger, usageHandler))
connector.(*Connector).ImportDefinitions(openai.Init(logger, usageHandler))
connector.(*Connector).ImportDefinitions(archetypeai.Init(logger, usageHandler))
connector.(*Connector).ImportDefinitions(numbers.Init(logger, usageHandler))
connector.(*Connector).ImportDefinitions(airbyte.Init(logger, usageHandler, options.Airbyte))
connector.(*Connector).ImportDefinitions(bigquery.Init(logger, usageHandler))
connector.(*Connector).ImportDefinitions(googlecloudstorage.Init(logger, usageHandler))
connector.(*Connector).ImportDefinitions(googlesearch.Init(logger, usageHandler))
connector.(*Connector).ImportDefinitions(pinecone.Init(logger, usageHandler))
connector.(*Connector).ImportDefinitions(redis.Init(logger, usageHandler))
connector.(*Connector).ImportDefinitions(restapi.Init(logger, usageHandler))
connector.(*Connector).ImportDefinitions(website.Init(logger, usageHandler))

})
return connector
Expand All @@ -77,8 +77,8 @@ func (c *Connector) ImportDefinitions(con base.IConnector) {
}
}

func (c *Connector) CreateExecution(defUID uuid.UUID, task string, config *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
return c.connectorUIDMap[defUID].CreateExecution(defUID, task, config, logger)
func (c *Connector) CreateExecution(defUID uuid.UUID, task string, connection *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
return c.connectorUIDMap[defUID].CreateExecution(defUID, task, connection, logger)
}

func (c *Connector) Test(defUID uuid.UUID, config *structpb.Struct, logger *zap.Logger) error {
Expand Down
8 changes: 4 additions & 4 deletions pkg/connector/numbers/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ type Output struct {
AssetUrls []string `json:"asset_urls"`
}

func Init(logger *zap.Logger) base.IConnector {
func Init(logger *zap.Logger, usageHandler base.UsageHandler) base.IConnector {
once.Do(func() {

connector = &Connector{
Connector: base.Connector{
Component: base.Component{Logger: logger},
Component: base.Component{Logger: logger, UsageHandler: usageHandler},
},
}
err := connector.LoadConnectorDefinition(definitionJSON, tasksJSON, nil)
Expand Down Expand Up @@ -201,9 +201,9 @@ func (e *Execution) registerAsset(data []byte, reg Register) (string, error) {
}
}

func (c *Connector) CreateExecution(defUID uuid.UUID, task string, config *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
func (c *Connector) CreateExecution(defUID uuid.UUID, task string, connection *structpb.Struct, logger *zap.Logger) (base.IExecution, error) {
e := &Execution{}
e.Execution = base.CreateExecutionHelper(e, c, defUID, task, config, logger)
e.Execution = base.CreateExecutionHelper(e, c, defUID, task, connection, logger)
return e, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/connector/openai/v0/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestConnector_Execute(t *testing.T) {
c := qt.New(t)

logger := zap.NewNop()
connector := Init(logger)
connector := Init(logger, nil)
defID := uuid.Must(uuid.NewV4())

testcases := []struct {
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestConnector_Test(t *testing.T) {
c := qt.New(t)

logger := zap.NewNop()
connector := Init(logger)
connector := Init(logger, nil)
defID := uuid.Must(uuid.NewV4())

c.Run("nok - error", func(c *qt.C) {
Expand Down
Loading

0 comments on commit b9d9645

Please sign in to comment.