Skip to content
This repository has been archived by the owner on Nov 23, 2023. It is now read-only.

Commit

Permalink
feat: adopt private public apis for connector (#64)
Browse files Browse the repository at this point in the history
Because

- support private API used for internal services

This commit

- add gRPC test cases
- support private API
- update to the latest protobuf

This PR also resolves INS-127.

Co-authored-by: Ping-Lin Chang <ping-lin.chang@instill.tech>
  • Loading branch information
Phelan164 and pinglin committed Mar 20, 2023
1 parent 0f418cd commit 4162b4b
Show file tree
Hide file tree
Showing 72 changed files with 10,834 additions and 334 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ unit-test: ## Run unit test

.PHONY: integration-test
integration-test: ## Run integration test
@TEST_FOLDER_ABS_PATH=${PWD} k6 run -e MODE=$(MODE) integration-test/grpc.js --no-usage-report --quiet
@TEST_FOLDER_ABS_PATH=${PWD} k6 run -e MODE=$(MODE) integration-test/rest.js --no-usage-report --quiet

.PHONY: integration-test-protocol
Expand Down
99 changes: 74 additions & 25 deletions cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func main() {
// ZapAdapter implements log.Logger interface and can be passed
// to the client constructor using client using client.Options.
Namespace: connWorker.Namespace,
Logger: zapadapter.NewZapAdapter(logger),
HostPort: config.Config.Temporal.ClientOptions.HostPort,
Logger: zapadapter.NewZapAdapter(logger),
HostPort: config.Config.Temporal.ClientOptions.HostPort,
})
if err != nil {
logger.Fatal(err.Error())
Expand All @@ -101,7 +101,7 @@ func main() {
grpc_zap.WithDecider(func(fullMethodName string, err error) bool {
// will not log gRPC calls if it was a call to liveness or readiness and no error was raised
if err == nil {
if match, _ := regexp.MatchString("vdp.connector.v1alpha.ConnectorService/.*ness$", fullMethodName); match {
if match, _ := regexp.MatchString("vdp.connector.v1alpha.ConnectorPublicService/.*ness$", fullMethodName); match {
return false
}
}
Expand All @@ -126,32 +126,61 @@ func main() {
grpcServerOpts = append(grpcServerOpts, grpc.Creds(creds))
}

mgmtAdminServiceClient, mgmtAdminServiceClientConn := external.InitMgmtAdminServiceClient()
if mgmtAdminServiceClientConn != nil {
defer mgmtAdminServiceClientConn.Close()
mgmtPrivateServiceClient, mgmtPrivateServiceClientConn := external.InitMgmtPrivateServiceClient()
if mgmtPrivateServiceClientConn != nil {
defer mgmtPrivateServiceClientConn.Close()
}

pipelineServiceClient, pipelineServiceClientConn := external.InitPipelineServiceClient()
if pipelineServiceClientConn != nil {
defer pipelineServiceClientConn.Close()
pipelinePublicServiceClient, pipelinePublicServiceClientConn := external.InitPipelinePublicServiceClient()
if pipelinePublicServiceClientConn != nil {
defer pipelinePublicServiceClientConn.Close()
}

repository := repository.NewRepository(db)

grpcS := grpc.NewServer(grpcServerOpts...)
reflection.Register(grpcS)
privateGrpcS := grpc.NewServer(grpcServerOpts...)
reflection.Register(privateGrpcS)

connectorPB.RegisterConnectorServiceServer(
grpcS,
handler.NewHandler(
publicGrpcS := grpc.NewServer(grpcServerOpts...)
reflection.Register(publicGrpcS)

connectorPB.RegisterConnectorPrivateServiceServer(
privateGrpcS,
handler.NewPrivateHandler(
service.NewService(
repository,
mgmtAdminServiceClient,
pipelineServiceClient,
mgmtPrivateServiceClient,
pipelinePublicServiceClient,
temporalClient,
)))

gwS := runtime.NewServeMux(
connectorPB.RegisterConnectorPublicServiceServer(
publicGrpcS,
handler.NewPublicHandler(
service.NewService(
repository,
mgmtPrivateServiceClient,
pipelinePublicServiceClient,
temporalClient,
)))

privateServeMux := runtime.NewServeMux(
runtime.WithForwardResponseOption(httpResponseModifier),
runtime.WithErrorHandler(errorHandler),
runtime.WithIncomingHeaderMatcher(customMatcher),
runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{
MarshalOptions: protojson.MarshalOptions{
UseProtoNames: true,
EmitUnpopulated: true,
UseEnumNumbers: false,
},
UnmarshalOptions: protojson.UnmarshalOptions{
DiscardUnknown: true,
},
}),
)

publicServeMux := runtime.NewServeMux(
runtime.WithForwardResponseOption(httpResponseModifier),
runtime.WithErrorHandler(errorHandler),
runtime.WithIncomingHeaderMatcher(customMatcher),
Expand All @@ -177,7 +206,7 @@ func main() {
if usageServiceClientConn != nil {
defer usageServiceClientConn.Close()
}
usg = usage.NewUsage(ctx, repository, mgmtAdminServiceClient, usageServiceClient)
usg = usage.NewUsage(ctx, repository, mgmtPrivateServiceClient, usageServiceClient)
if usg != nil {
usg.StartReporter(ctx)
}
Expand All @@ -191,27 +220,46 @@ func main() {
dialOpts = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
}

if err := connectorPB.RegisterConnectorServiceHandlerFromEndpoint(ctx, gwS, fmt.Sprintf(":%v", config.Config.Server.Port), dialOpts); err != nil {
if err := connectorPB.RegisterConnectorPrivateServiceHandlerFromEndpoint(ctx, privateServeMux, fmt.Sprintf(":%v", config.Config.Server.PrivatePort), dialOpts); err != nil {
logger.Fatal(err.Error())
}

httpServer := &http.Server{
Addr: fmt.Sprintf(":%v", config.Config.Server.Port),
Handler: grpcHandlerFunc(grpcS, gwS, config.Config.Server.CORSOrigins),
if err := connectorPB.RegisterConnectorPublicServiceHandlerFromEndpoint(ctx, publicServeMux, fmt.Sprintf(":%v", config.Config.Server.PublicPort), dialOpts); err != nil {
logger.Fatal(err.Error())
}

privateHTTPServer := &http.Server{
Addr: fmt.Sprintf(":%v", config.Config.Server.PrivatePort),
Handler: grpcHandlerFunc(privateGrpcS, privateServeMux, config.Config.Server.CORSOrigins),
}

publicHTTPServer := &http.Server{
Addr: fmt.Sprintf(":%v", config.Config.Server.PublicPort),
Handler: grpcHandlerFunc(publicGrpcS, publicServeMux, config.Config.Server.CORSOrigins),
}

// Wait for interrupt signal to gracefully shutdown the server with a timeout of 5 seconds.
quitSig := make(chan os.Signal, 1)
errSig := make(chan error)
if config.Config.Server.HTTPS.Cert != "" && config.Config.Server.HTTPS.Key != "" {
go func() {
if err := httpServer.ListenAndServeTLS(config.Config.Server.HTTPS.Cert, config.Config.Server.HTTPS.Key); err != nil {
if err := privateHTTPServer.ListenAndServeTLS(config.Config.Server.HTTPS.Cert, config.Config.Server.HTTPS.Key); err != nil {
errSig <- err
}
}()
go func() {
if err := publicHTTPServer.ListenAndServeTLS(config.Config.Server.HTTPS.Cert, config.Config.Server.HTTPS.Key); err != nil {
errSig <- err
}
}()
} else {
go func() {
if err := httpServer.ListenAndServe(); err != nil {
if err := privateHTTPServer.ListenAndServe(); err != nil {
errSig <- err
}
}()
go func() {
if err := publicHTTPServer.ListenAndServe(); err != nil {
errSig <- err
}
}()
Expand All @@ -231,6 +279,7 @@ func main() {
usg.TriggerSingleReporter(ctx)
}
logger.Info("Shutting down server...")
grpcS.GracefulStop()
privateGrpcS.GracefulStop()
publicGrpcS.GracefulStop()
}
}
17 changes: 9 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ type AppConfig struct {

// ServerConfig defines HTTP server configurations
type ServerConfig struct {
Port int `koanf:"port"`
HTTPS struct {
PrivatePort int `koanf:"privateport"`
PublicPort int `koanf:"publicport"`
HTTPS struct {
Cert string `koanf:"cert"`
Key string `koanf:"key"`
}
Expand Down Expand Up @@ -76,19 +77,19 @@ type TemporalConfig struct {

// MgmtBackendConfig related to mgmt-backend
type MgmtBackendConfig struct {
Host string `koanf:"host"`
AdminPort int `koanf:"adminport"`
HTTPS struct {
Host string `koanf:"host"`
PrivatePort int `koanf:"privateport"`
HTTPS struct {
Cert string `koanf:"cert"`
Key string `koanf:"key"`
}
}

// PipelineBackendConfig related to pipeline-backend
type PipelineBackendConfig struct {
Host string `koanf:"host"`
Port int `koanf:"port"`
HTTPS struct {
Host string `koanf:"host"`
PublicPort int `koanf:"publicport"`
HTTPS struct {
Cert string `koanf:"cert"`
Key string `koanf:"key"`
}
Expand Down
7 changes: 4 additions & 3 deletions config/config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
server:
port: 8082
privateport: 3082
publicport: 8082
https:
cert:
key:
Expand Down Expand Up @@ -31,13 +32,13 @@ temporal:
hostport: temporal:7233
pipelinebackend:
host: pipeline-backend
port: 8081
publicport: 8081
https:
cert:
key:
mgmtbackend:
host: mgmt-backend
adminport: 3084
privateport: 3084
https:
cert:
key:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0
github.com/iancoleman/strcase v0.2.0
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230217111731-b78c700241b2
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230314172914-1976737846cb
github.com/instill-ai/usage-client v0.2.2-alpha
github.com/instill-ai/x v0.2.0-alpha
github.com/jackc/pgx/v5 v5.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1066,8 +1066,8 @@ github.com/imdario/mergo v0.3.10/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230217111731-b78c700241b2 h1:TLK82ewEE54IgE71Er+rY5wq7kXVSS0pQd17C6hW+34=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230217111731-b78c700241b2/go.mod h1:7/Jj3ATVozPwB0WmKRM612o/k5UJF8K9oRCNKYH8iy0=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230314172914-1976737846cb h1:CxIAqYo7klOycw/qzTAmQJ0JHn74UuJjUzssPAF4KNw=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230314172914-1976737846cb/go.mod h1:7/Jj3ATVozPwB0WmKRM612o/k5UJF8K9oRCNKYH8iy0=
github.com/instill-ai/usage-client v0.2.2-alpha h1:EQyHpgzZ26TEIL9UoaqchTf+LnKaidUGhKlUEFR68I8=
github.com/instill-ai/usage-client v0.2.2-alpha/go.mod h1:RpVnioKQBoJZsE1qTiZlPQUQXUALTGzhBl8ju9rm5+U=
github.com/instill-ai/x v0.2.0-alpha h1:8yszKP9DE8bvSRAtEpOwqhG2wwqU3olhTqhwoiLrHfc=
Expand Down
24 changes: 15 additions & 9 deletions integration-test/const.js
Original file line number Diff line number Diff line change
@@ -1,31 +1,37 @@
let proto
let pHost, cHost, mHost
let pPort, cPort, mPort
let cPrivatePort, cPublicPort, pPublicPort, mPublicPort

if (__ENV.MODE == "api-gateway") {
// api-gateway mode
proto = "http"
pHost = cHost = mHost = "api-gateway"
pPort = cPort = mPort = 8080
cPrivatePort = 3082
pPublicPort = cPublicPort = mPublicPort = 8080
} else if (__ENV.MODE == "localhost") {
// localhost mode for GitHub Actions
proto = "http"
pHost = cHost = mHost = "localhost"
pPort = cPort = mPort = 8080
cPrivatePort = 3082
pPublicPort = cPublicPort = mPublicPort = 8080
} else {
// direct microservice mode
proto = "http"
pHost = "pipeline-backend"
cHost = "connector-backend"
mHost = "model-backend"
pPort = 8081
cPort = 8082
mPort = 8083
cPrivatePort = 3082
cPublicPort = 8082
pPublicPort = 8081
mPublicPort = 8083
}

export const pipelineHost = `${proto}://${pHost}:${pPort}`;
export const connectorHost = `${proto}://${cHost}:${cPort}`;
export const modelHost = `${proto}://${mHost}:${mPort}`;
export const connectorPrivateHost = `${proto}://${cHost}:${cPrivatePort}`;
export const connectorPublicHost = `${proto}://${cHost}:${cPublicPort}`;
export const connectorGRPCPrivateHost = `${cHost}:${cPrivatePort}`;
export const connectorGRPCPublicHost = `${cHost}:${cPublicPort}`;
export const pipelinePublicHost = `${proto}://${pHost}:${pPublicPort}`;
export const modelPublicHost = `${proto}://${mHost}:${mPublicPort}`;

export const csvDstDefRscName = "destination-connector-definitions/destination-csv"
export const csvDstDefRscPermalink = "destination-connector-definitions/8be1cf83-fde1-477f-a4ad-318d23c9f3c6"
Expand Down

0 comments on commit 4162b4b

Please sign in to comment.