Skip to content
This repository has been archived by the owner on Nov 23, 2023. It is now read-only.

Commit

Permalink
feat(controller): add connector state monitoring with controller (#72)
Browse files Browse the repository at this point in the history
Because

- support connector state monitoring with controller

This commit

- add public /watch endpoint for querying connector state from etcd through controller
- add private /check endpoint for controller to probe and trigger longrunning checkConnector workflow

Co-authored-by: Ping-Lin Chang <ping-lin.chang@instill.tech>
  • Loading branch information
heiruwu and pinglin committed Apr 7, 2023
1 parent 2b7d917 commit d4092e6
Show file tree
Hide file tree
Showing 33 changed files with 851 additions and 211 deletions.
7 changes: 7 additions & 0 deletions cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ func main() {
defer pipelinePublicServiceClientConn.Close()
}

controllerClient, controllerClientConn := external.InitControllerPrivateServiceClient()
if controllerClientConn != nil {
defer controllerClientConn.Close()
}

repository := repository.NewRepository(db)

privateGrpcS := grpc.NewServer(grpcServerOpts...)
Expand All @@ -153,6 +158,7 @@ func main() {
mgmtPrivateServiceClient,
pipelinePublicServiceClient,
temporalClient,
controllerClient,
)))

connectorPB.RegisterConnectorPublicServiceServer(
Expand All @@ -163,6 +169,7 @@ func main() {
mgmtPrivateServiceClient,
pipelinePublicServiceClient,
temporalClient,
controllerClient,
)))

privateServeMux := runtime.NewServeMux(
Expand Down
1 change: 1 addition & 0 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func main() {
w.RegisterActivity(cw.CheckActivity)
w.RegisterWorkflow(cw.WriteWorkflow)
w.RegisterActivity(cw.WriteActivity)
w.RegisterWorkflow(cw.AddSearchAttributeWorkflow)

err = w.Run(worker.InterruptCh())
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type AppConfig struct {
MgmtBackend MgmtBackendConfig `koanf:"mgmtbackend"`
PipelineBackend PipelineBackendConfig `koanf:"pipelinebackend"`
UsageBackend UsageBackendConfig `koanf:"usagebackend"`
Controller ControllerConfig `koanf:"controller"`
}

// ServerConfig defines HTTP server configurations
Expand Down Expand Up @@ -102,6 +103,16 @@ type UsageBackendConfig struct {
Port int `koanf:"port"`
}

// ControllerConfig related to controller
type ControllerConfig struct {
Host string `koanf:"host"`
Port int `koanf:"port"`
HTTPS struct {
Cert string `koanf:"cert"`
Key string `koanf:"key"`
}
}

// Init - Assign global config to decoded config struct
func Init() error {

Expand Down
6 changes: 6 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,9 @@ usagebackend:
tlsenabled: true
host: usage.instill.tech
port: 443
controller:
host: controller
port: 3085
https:
cert:
key:
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/instill-ai/connector-backend
go 1.19

require (
cloud.google.com/go/longrunning v0.3.0
github.com/allegro/bigcache v1.2.1
github.com/docker/docker v20.10.13+incompatible
github.com/ghodss/yaml v1.0.0
Expand All @@ -13,7 +14,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0
github.com/iancoleman/strcase v0.2.0
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230402125221-c8f1a70b6b8b
github.com/instill-ai/usage-client v0.2.2-alpha
github.com/instill-ai/usage-client v0.2.3-alpha
github.com/instill-ai/x v0.2.0-alpha
github.com/jackc/pgx/v5 v5.2.0
github.com/knadh/koanf v1.5.0
Expand All @@ -23,7 +24,7 @@ require (
go.temporal.io/api v1.16.0
go.temporal.io/sdk v1.21.0
go.uber.org/zap v1.24.0
golang.org/x/net v0.5.0
golang.org/x/net v0.9.0
google.golang.org/genproto v0.0.0-20230202175211-008b39050e57
google.golang.org/grpc v1.52.3
google.golang.org/protobuf v1.28.1
Expand All @@ -33,7 +34,6 @@ require (
)

require (
cloud.google.com/go/longrunning v0.3.0 // indirect
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/catalinc/hashcash v0.0.0-20220723060415-5e3ec3e24f67 // indirect
github.com/containerd/containerd v1.6.1 // indirect
Expand Down Expand Up @@ -71,8 +71,8 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/text v0.6.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.1.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
17 changes: 8 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1066,14 +1066,10 @@ github.com/imdario/mergo v0.3.10/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230314172914-1976737846cb h1:CxIAqYo7klOycw/qzTAmQJ0JHn74UuJjUzssPAF4KNw=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230314172914-1976737846cb/go.mod h1:7/Jj3ATVozPwB0WmKRM612o/k5UJF8K9oRCNKYH8iy0=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230329112911-d3741ca31777 h1:pJ3eyFqd5LW7mXRNbNZM5KkvmkRbz4d4envYT6T/SpM=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230329112911-d3741ca31777/go.mod h1:7/Jj3ATVozPwB0WmKRM612o/k5UJF8K9oRCNKYH8iy0=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230402125221-c8f1a70b6b8b h1:BI97L8e4pkbQVcqRyQsR9/Q1/4pXB+zFGUWOEL/hZ6U=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230402125221-c8f1a70b6b8b/go.mod h1:7/Jj3ATVozPwB0WmKRM612o/k5UJF8K9oRCNKYH8iy0=
github.com/instill-ai/usage-client v0.2.2-alpha h1:EQyHpgzZ26TEIL9UoaqchTf+LnKaidUGhKlUEFR68I8=
github.com/instill-ai/usage-client v0.2.2-alpha/go.mod h1:RpVnioKQBoJZsE1qTiZlPQUQXUALTGzhBl8ju9rm5+U=
github.com/instill-ai/usage-client v0.2.3-alpha h1:jdbCH6WJ7eUVudGtuDWaEsWmGX09ZFUnAHx+8S2MnDY=
github.com/instill-ai/usage-client v0.2.3-alpha/go.mod h1:G0bnSyJw3ul8iNTpAGNslB18Qux0aMZF08h+CRICumw=
github.com/instill-ai/x v0.2.0-alpha h1:8yszKP9DE8bvSRAtEpOwqhG2wwqU3olhTqhwoiLrHfc=
github.com/instill-ai/x v0.2.0-alpha/go.mod h1:/UEx/zFyMo7so2ctBY0pzjmIoJB9Qz5Y4gvwU2FoU74=
github.com/intel/goresctrl v0.2.0/go.mod h1:+CZdzouYFn5EsxgqAQTEzMfwKwuc0fVdMrT9FCCAVRQ=
Expand Down Expand Up @@ -1803,8 +1799,9 @@ golang.org/x/net v0.0.0-20221012135044-0b7e1fb9d458/go.mod h1:YDH+HFinaLZZlnHAfS
golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw=
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/oauth2 v0.0.0-20180227000427-d7d64896b5ff/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down Expand Up @@ -1990,8 +1987,9 @@ golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand All @@ -2012,8 +2010,9 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k=
golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
9 changes: 8 additions & 1 deletion integration-test/const.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,14 @@ export const params = {
};

const randomUUID = uuidv4();
export const paramsWithJwt = {
export const paramsGRPCWithJwt = {
metadata: {
"Content-Type": "application/json",
"Jwt-Sub": randomUUID,
},
}

export const paramsHTTPWithJwt = {
headers: {
"Content-Type": "application/json",
"Jwt-Sub": randomUUID,
Expand Down
4 changes: 2 additions & 2 deletions integration-test/grpc-destination-connector-private.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ export function CheckGet() {
var currentTime = new Date().getTime();
var timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
var res = clientPrivate.invoke('vdp.connector.v1alpha.ConnectorPrivateService/GetDestinationConnectorAdmin', {
var res = clientPublic.invoke('vdp.connector.v1alpha.ConnectorPublicService/WatchDestinationConnector', {
name: `destination-connectors/${resCSVDst.message.destinationConnector.id}`
})
if (res.message.destinationConnector.connector.state === "STATE_CONNECTED") {
if (res.message.state === "STATE_CONNECTED") {
break
}
sleep(1)
Expand Down

0 comments on commit d4092e6

Please sign in to comment.