diff --git a/codegen/client.go b/codegen/client.go index 4d1490822..af5e37ece 100644 --- a/codegen/client.go +++ b/codegen/client.go @@ -23,9 +23,9 @@ package codegen import ( "path/filepath" - yaml "github.com/ghodss/yaml" + "github.com/ghodss/yaml" "github.com/pkg/errors" - validator "gopkg.in/validator.v2" + "gopkg.in/validator.v2" ) type clientConfig interface { @@ -231,6 +231,63 @@ type GRPCClientConfig struct { Config *ClientIDLConfig `yaml:"config" json:"config" validate:"nonzero"` } +func newGRPCClientConfig(raw []byte) (*GRPCClientConfig, error) { + config := &GRPCClientConfig{} + if errUnmarshal := yaml.Unmarshal(raw, config); errUnmarshal != nil { + return nil, errors.Wrap( + errUnmarshal, "could not parse gRPC client config data") + } + + validator.SetValidationFunc("exposedMethods", validateExposedMethods) + if errValidate := validator.Validate(config); errValidate != nil { + return nil, errors.Wrap( + errValidate, "grpc client config validation failed") + } + + return config, nil +} + +func newGRPCClientSpec( + clientType string, + config *ClientIDLConfig, + instance *ModuleInstance, + h *PackageHelper, +) (*ClientSpec, error) { + protoFile := filepath.Join(h.ThriftIDLPath(), config.IDLFile) + protoSpec, err := NewProtoModuleSpec(protoFile, false) + if err != nil { + return nil, errors.Wrapf( + err, "could not build proto spec for proto file %s: ", protoFile, + ) + } + + cspec := &ClientSpec{ + ModuleSpec: protoSpec, + YAMLFile: instance.YAMLFileName, + JSONFile: instance.JSONFileName, + ClientType: clientType, + ImportPackagePath: instance.PackageInfo.ImportPackagePath(), + ImportPackageAlias: instance.PackageInfo.ImportPackageAlias(), + ExportName: instance.PackageInfo.ExportName, + ExportType: instance.PackageInfo.ExportType, + ThriftFile: protoFile, + ClientID: instance.InstanceName, + ClientName: instance.PackageInfo.QualifiedInstanceName, + ExposedMethods: config.ExposedMethods, + SidecarRouter: config.SidecarRouter, + } + + return cspec, nil +} + +// NewClientSpec creates a client spec from a client module instance +func (c *GRPCClientConfig) NewClientSpec( + instance *ModuleInstance, + h *PackageHelper, +) (*ClientSpec, error) { + return newGRPCClientSpec(c.Type, c.Config, instance, h) +} + func clientType(raw []byte) (string, error) { clientConfig := ClassConfigBase{} if err := yaml.Unmarshal(raw, &clientConfig); err != nil { @@ -252,6 +309,8 @@ func newClientConfig(raw []byte) (clientConfig, error) { return newHTTPClientConfig(raw) case "tchannel": return newTChannelClientConfig(raw) + case "grpc": + return newGRPCClientConfig(raw) case "custom": return newCustomClientConfig(raw) default: diff --git a/codegen/client_test.go b/codegen/client_test.go index b6fbafd7d..ab658aa6c 100644 --- a/codegen/client_test.go +++ b/codegen/client_test.go @@ -72,6 +72,26 @@ config: exposedMethods: a: method ` + grpcClientYAML = ` +name: test +type: grpc +dependencies: + client: + - a + - b +config: + idlFileSha: idlFileSha + idlFile: clients/echo/echo.proto + customImportPath: path + fixture: + importPath: import + scenarios: + scenario: + - s1 + - s2 + exposedMethods: + a: method +` customClientYAML = ` name: test @@ -107,6 +127,14 @@ func TestNewTChannelClientConfigUnmarshalFilure(t *testing.T) { assert.Equal(t, expectedErr, err.Error()) } +func TestNewGRPCClientConfigUnmarshalFilure(t *testing.T) { + invalidYAML := "{{{" + _, err := newGRPCClientConfig([]byte(invalidYAML)) + expectedErr := "could not parse gRPC client config data: error converting YAML to JSON: yaml: line 1: did not find expected node content" + assert.Error(t, err) + assert.Equal(t, expectedErr, err.Error()) +} + func TestNewCustomClientConfigUnmarshalFilure(t *testing.T) { invalidYAML := "{{{" _, err := newCustomClientConfig([]byte(invalidYAML)) @@ -134,6 +162,7 @@ type: %s func TestNewClientConfigSubConfigMissing(t *testing.T) { doSubConfigMissingTest(t, "http") doSubConfigMissingTest(t, "tchannel") + doSubConfigMissingTest(t, "grpc") doSubConfigMissingTest(t, "custom") } @@ -157,6 +186,7 @@ config: func TestThriftFileMissingValidation(t *testing.T) { doThriftFileMissingTest(t, "http") doThriftFileMissingTest(t, "tchannel") + doThriftFileMissingTest(t, "grpc") } func doThriftFileShaMissingTest(t *testing.T, clientType string) { @@ -175,6 +205,7 @@ config: func TestThriftFileShaMissingValidation(t *testing.T) { doThriftFileShaMissingTest(t, "http") doThriftFileShaMissingTest(t, "tchannel") + doThriftFileShaMissingTest(t, "grpc") } func TestCustomClientRequiresCustomImportPath(t *testing.T) { @@ -215,6 +246,7 @@ config: func TestNewClientConfigDuplicatedMethodsFailure(t *testing.T) { doDuplicatedExposedMethodsTest(t, "http") doDuplicatedExposedMethodsTest(t, "tchannel") + doDuplicatedExposedMethodsTest(t, "grpc") } func TestGetConfigTypeFailure(t *testing.T) { @@ -321,6 +353,34 @@ func TestNewClientConfigGetTChannelClient(t *testing.T) { assert.Equal(t, &expectedClient, client) } +func TestNewClientConfigGetGRPCClient(t *testing.T) { + client, err := newClientConfig([]byte(grpcClientYAML)) + expectedClient := GRPCClientConfig{ + ClassConfigBase: ClassConfigBase{ + Name: "test", + Type: "grpc", + }, + Dependencies: Dependencies{ + Client: []string{"a", "b"}, + }, + Config: &ClientIDLConfig{ + ExposedMethods: map[string]string{ + "a": "method", + }, + IDLFileSha: "idlFileSha", + IDLFile: "clients/echo/echo.proto", + Fixture: &Fixture{ + ImportPath: "import", + Scenarios: map[string][]string{ + "scenario": {"s1", "s2"}, + }, + }, + }, + } + assert.NoError(t, err) + assert.Equal(t, &expectedClient, client) +} + func TestNewClientConfigGetCustomClient(t *testing.T) { client, err := newClientConfig([]byte(customClientYAML)) expectedClient := CustomClientConfig{ @@ -371,21 +431,27 @@ func newTestPackageHelper(t *testing.T) *PackageHelper { } -func TestHTTPClientNewClientSpecFailedWithThriftCompilation(t *testing.T) { - configYAML := ` +func TestClientNewClientSpecFailedWithThriftCompilation(t *testing.T) { + testNewClientSpecFailedWithCompilation(t, "http") + testNewClientSpecFailedWithCompilation(t, "grpc") +} + +func testNewClientSpecFailedWithCompilation(t *testing.T, clientType string) { + configYAML := fmt.Sprintf(` name: test -type: http +type: %s config: idlFileSha: idlFileSha idlFile: NOT_EXIST exposedMethods: a: method -` +`, clientType) client, errClient := newClientConfig([]byte(configYAML)) assert.NoError(t, errClient) h := newTestPackageHelper(t) _, errSpec := client.NewClientSpec(nil /* ModuleInstance */, h) + fmt.Println(errSpec) assert.Error(t, errSpec) } @@ -438,6 +504,45 @@ func TestTChannelClientNewClientSpec(t *testing.T) { doNewClientSpecTest(t, []byte(tchannelClientYAML), "tchannel") } +func TestGRPCClientNewClientSpec(t *testing.T) { + client, errClient := newClientConfig([]byte(grpcClientYAML)) + assert.NoError(t, errClient) + instance := &ModuleInstance{ + YAMLFileName: "YAMLFileName", + JSONFileName: "JSONFileName", + InstanceName: "InstanceName", + PackageInfo: &PackageInfo{ + ExportName: "ExportName", + ExportType: "ExportType", + QualifiedInstanceName: "QualifiedInstanceName", + }, + } + h := newTestPackageHelper(t) + + idlFile := filepath.Join(h.ThriftIDLPath(), "clients/echo/echo.proto") + expectedSpec := &ClientSpec{ + ModuleSpec: nil, + YAMLFile: instance.YAMLFileName, + JSONFile: instance.JSONFileName, + ClientType: "grpc", + ImportPackagePath: instance.PackageInfo.ImportPackagePath(), + ImportPackageAlias: instance.PackageInfo.ImportPackageAlias(), + ExportName: instance.PackageInfo.ExportName, + ExportType: instance.PackageInfo.ExportType, + ThriftFile: idlFile, + ClientID: instance.InstanceName, + ClientName: instance.PackageInfo.QualifiedInstanceName, + ExposedMethods: map[string]string{ + "a": "method", + }, + } + + spec, errSpec := client.NewClientSpec(instance, h) + spec.ModuleSpec = nil // Not interested in ModuleSpec here + assert.NoError(t, errSpec) + assert.Equal(t, expectedSpec, spec) +} + func TestCustomClientNewClientSpec(t *testing.T) { client, errClient := newClientConfig([]byte(customClientYAML)) assert.NoError(t, errClient) diff --git a/codegen/module_system.go b/codegen/module_system.go index 7991482de..b7c09d3ac 100644 --- a/codegen/module_system.go +++ b/codegen/module_system.go @@ -23,12 +23,11 @@ package codegen import ( "encoding/json" "net/textproto" - "path" "path/filepath" "sort" "strings" - yaml "github.com/ghodss/yaml" + "github.com/ghodss/yaml" "github.com/pkg/errors" "go.uber.org/thriftrw/compile" ) @@ -306,7 +305,7 @@ func NewDefaultModuleSystem( ) } - if err := system.RegisterClassType("client", "grpc", &YarpcClientGenerator{ + if err := system.RegisterClassType("client", "grpc", &GRPCClientGenerator{ templates: tmpl, packageHelper: h, }); err != nil { @@ -669,13 +668,13 @@ func (g *TChannelClientGenerator) Generate( // reverse index and validate the exposed methods map func reverseExposedMethods(clientSpec *ClientSpec, instance *ModuleInstance) (map[string]string, error) { reversed := map[string]string{} - for exposedMethod, thriftMethod := range clientSpec.ExposedMethods { - reversed[thriftMethod] = exposedMethod - if !hasMethod(clientSpec, thriftMethod) { + for exposedMethod, idlMethod := range clientSpec.ExposedMethods { + reversed[idlMethod] = exposedMethod + if !hasMethod(clientSpec, idlMethod) { return nil, errors.Errorf( "Invalid exposedMethods for client %q, method %q not found", instance.InstanceName, - thriftMethod, + idlMethod, ) } } @@ -683,12 +682,20 @@ func reverseExposedMethods(clientSpec *ClientSpec, instance *ModuleInstance) (ma return reversed, nil } -func hasMethod(cspec *ClientSpec, thriftMethod string) bool { - segments := strings.Split(thriftMethod, "::") +func hasMethod(cspec *ClientSpec, idlMethod string) bool { + segments := strings.Split(idlMethod, "::") service := segments[0] method := segments[1] - for _, s := range cspec.ModuleSpec.Services { + if cspec.ModuleSpec.Services != nil { + return hasThriftMethod(cspec.ModuleSpec.Services, service, method) + } + + return hasProtoMethod(cspec.ModuleSpec.ProtoServices, service, method) +} + +func hasThriftMethod(thriftSpec []*ServiceSpec, service, method string) bool { + for _, s := range thriftSpec { if s.Name == service { for _, m := range s.Methods { if m.Name == method { @@ -696,7 +703,19 @@ func hasMethod(cspec *ClientSpec, thriftMethod string) bool { } } } + } + return false +} +func hasProtoMethod(protoSpec []*ProtoService, service, method string) bool { + for _, s := range protoSpec { + if s.Name == service { + for _, m := range s.RPC { + if m.Name == method { + return true + } + } + } } return false } @@ -785,60 +804,92 @@ func (g *CustomClientGenerator) Generate( } /* - * yarpc client generator + * gRPC client generator */ -// YarpcClientGenerator generates grpc clients. -type YarpcClientGenerator struct { +// GRPCClientGenerator generates grpc clients. +type GRPCClientGenerator struct { templates *Template packageHelper *PackageHelper } -// ComputeSpec returns the spec for a yarpc client -func (g *YarpcClientGenerator) ComputeSpec( +// ComputeSpec returns the spec for a gRPC client +func (g *GRPCClientGenerator) ComputeSpec( instance *ModuleInstance, ) (interface{}, error) { - return (*ClientSpec)(nil), nil + // Parse the client config from the endpoint YAML file + clientConfig, err := newClientConfig(instance.YAMLFileRaw) + if err != nil { + return nil, errors.Wrapf( + err, + "error reading gRPC client %q YAML config", + instance.InstanceName, + ) + } + + clientSpec, err := clientConfig.NewClientSpec( + instance, + g.packageHelper, + ) + if err != nil { + return nil, errors.Wrapf( + err, + "error initializing gRPCClientSpec for %q", + instance.InstanceName, + ) + } + + return clientSpec, nil } -// Generate returns the yarpc client build result, which contains the files and +// Generate returns the gRPC client build result, which contains the files and // the generated client spec -func (g *YarpcClientGenerator) Generate( +func (g *GRPCClientGenerator) Generate( instance *ModuleInstance, ) (*BuildResult, error) { - clientConfig := &GRPCClientConfig{} - if err := yaml.Unmarshal(instance.YAMLFileRaw, &clientConfig); err != nil { + clientSpecUntyped, err := g.ComputeSpec(instance) + if err != nil { return nil, errors.Wrapf( err, - "Error reading yarpc client %q YAML config", + "error initializing GRPCClientSpec for %q", instance.InstanceName, ) } + clientSpec := clientSpecUntyped.(*ClientSpec) - data := &struct { - Instance *ModuleInstance - GenPkg string - }{ - Instance: instance, + reversedMethods, err := reverseExposedMethods(clientSpec, instance) + if err != nil { + return nil, err } - parts := strings.Split(clientConfig.Config.IDLFile, "/") - genDir := strings.Join(parts[0:len(parts)-1], "/") - - data.GenPkg = path.Join( + parts := strings.Split(clientSpec.ThriftFile, "/") + genDir := strings.Join(parts[len(parts)-3:len(parts)-1], "/") + genPkg := filepath.Join( g.packageHelper.GenCodePackage(), genDir, ) + // @rpatali: Update all struct to use more general field IDLFile instead of thriftFile. + clientMeta := &ClientMeta{ + ProtoServices: clientSpec.ModuleSpec.ProtoServices, + Instance: instance, + ExportName: clientSpec.ExportName, + ExportType: clientSpec.ExportType, + Services: nil, + IncludedPackages: nil, + ClientID: clientSpec.ClientID, + ExposedMethods: reversedMethods, + GenPkg: genPkg, + } client, err := g.templates.ExecTemplate( - "yarpc_client.tmpl", - data, + "grpc_client.tmpl", + clientMeta, g.packageHelper, ) if err != nil { return nil, errors.Wrapf( err, - "Error executing YARPC client template for %q", + "Error executing gRPC client template for %q", instance.InstanceName, ) } @@ -1470,11 +1521,13 @@ type ClientMeta struct { ClientID string IncludedPackages []GoPackageImport Services []*ServiceSpec + ProtoServices []*ProtoService ExposedMethods map[string]string SidecarRouter string Fixture *Fixture StagingReqHeader string DeputyReqHeader string + GenPkg string } func findMethod( diff --git a/codegen/module_test.go b/codegen/module_test.go index 219197aff..cad78af18 100644 --- a/codegen/module_test.go +++ b/codegen/module_test.go @@ -31,13 +31,6 @@ import ( "github.com/stretchr/testify/assert" ) -type handler struct{} - -var testHandler = handler{} -var staticHandler = handler{} -var variableHandler = handler{} -var splatHandler = handler{} - type TestClientSpec struct { Info string } @@ -52,6 +45,9 @@ var tSpec = TestClientSpec{ var hSpec = TestClientSpec{ Info: "http", } +var gSpec = TestClientSpec{ + Info: "grpc", +} type TestHTTPClientGenerator struct{} @@ -85,6 +81,22 @@ func (*TestTChannelClientGenerator) ComputeSpec( return &tSpec, nil } +type TestGRPCClientGenerator struct{} + +func (*TestGRPCClientGenerator) Generate( + instance *ModuleInstance, +) (*BuildResult, error) { + return &BuildResult{ + Spec: &tSpec, + }, nil +} + +func (*TestGRPCClientGenerator) ComputeSpec( + instance *ModuleInstance, +) (interface{}, error) { + return &tSpec, nil +} + type TestHTTPEndpointGenerator struct{} func (*TestHTTPEndpointGenerator) Generate( @@ -147,6 +159,15 @@ func TestExampleService(t *testing.T) { t.Errorf("Unexpected error registering tchannel client class type: %s", err) } + err = moduleSystem.RegisterClassType( + "client", + "grpc", + &TestGRPCClientGenerator{}, + ) + if err != nil { + t.Errorf("Unexpected error regarding grpc client class type :%s", err) + } + err = moduleSystem.RegisterClass(ModuleClass{ Name: "endpoint", NamePlural: "endpoints", @@ -257,6 +278,42 @@ func TestExampleService(t *testing.T) { }, } + expectedGRPCClientInstance := ModuleInstance{ + BaseDirectory: testServiceDir, + ClassName: "client", + ClassType: "grpc", + Directory: "clients/example-example", + InstanceName: "example-grpc", + JSONFileName: "", + YAMLFileName: "client-config.yaml", + PackageInfo: &PackageInfo{ + ExportName: "NewClient", + ExportType: "Client", + GeneratedPackageAlias: "exampleClientGenerated", + GeneratedPackagePath: "github.com/uber/zanzibar/codegen/test-service/build/clients/example-grpc", + IsExportGenerated: true, + PackageAlias: "exampleClientStatic", + PackageName: "exampleClient", + PackagePath: "github.com/uber/zanzibar/codegen/test-service/clients/example-grpc", + }, + Dependencies: []ModuleDependency{ + { + ClassName: "client", + InstanceName: "example-dependency", + }, + }, + ResolvedDependencies: map[string][]*ModuleInstance{ + "client": { + &expectedClientDependency, + }, + }, + RecursiveDependencies: map[string][]*ModuleInstance{ + "client": { + &expectedClientDependency, + }, + }, + } + expectedEmbeddedClient := ModuleInstance{ BaseDirectory: testServiceDir, ClassName: "client", @@ -398,6 +455,7 @@ func TestExampleService(t *testing.T) { &expectedClientInstance, &expectedClientDependency, &expectedEmbeddedClient, + &expectedGRPCClientInstance, } // Note: Stable ordering is not required by POSIX expectedEndpoints := []*ModuleInstance{ @@ -492,6 +550,15 @@ func TestExampleServiceIncremental(t *testing.T) { t.Errorf("Unexpected error registering tchannel client class type: %s", err) } + err = moduleSystem.RegisterClassType( + "client", + "grpc", + &TestGRPCClientGenerator{}, + ) + if err != nil { + t.Errorf("Unexpected error regarding grpc client class type :%s", err) + } + err = moduleSystem.RegisterClass(ModuleClass{ Name: "endpoint", NamePlural: "endpoints", @@ -548,6 +615,10 @@ func TestExampleServiceIncremental(t *testing.T) { ClassName: "client", InstanceName: "example", }, + { + ClassName: "client", + InstanceName: "example-grpc", + }, }, resolvedModules, true, @@ -615,6 +686,42 @@ func TestExampleServiceIncremental(t *testing.T) { }, } + expectedGRPCClientInstance := ModuleInstance{ + BaseDirectory: testServiceDir, + ClassName: "client", + ClassType: "grpc", + Directory: "clients/example-example", + InstanceName: "example-grpc", + JSONFileName: "", + YAMLFileName: "client-config.yaml", + PackageInfo: &PackageInfo{ + ExportName: "NewClient", + ExportType: "Client", + GeneratedPackageAlias: "exampleClientGenerated", + GeneratedPackagePath: "github.com/uber/zanzibar/codegen/test-service/build/clients/example-grpc", + IsExportGenerated: true, + PackageAlias: "exampleClientStatic", + PackageName: "exampleClient", + PackagePath: "github.com/uber/zanzibar/codegen/test-service/clients/example-grpc", + }, + Dependencies: []ModuleDependency{ + { + ClassName: "client", + InstanceName: "example-dependency", + }, + }, + ResolvedDependencies: map[string][]*ModuleInstance{ + "client": { + &expectedClientDependency, + }, + }, + RecursiveDependencies: map[string][]*ModuleInstance{ + "client": { + &expectedClientDependency, + }, + }, + } + expectedHealthEndpointInstance := ModuleInstance{ BaseDirectory: testServiceDir, ClassName: "endpoint", @@ -731,6 +838,7 @@ func TestExampleServiceIncremental(t *testing.T) { expectedClients := []*ModuleInstance{ &expectedClientInstance, + &expectedGRPCClientInstance, } expectedEndpoints := []*ModuleInstance{ &expectedHealthEndpointInstance, @@ -861,7 +969,7 @@ func TestDefaultDependency(t *testing.T) { for className, classInstances := range instances { if className == "client" { - expectedLen := 2 + expectedLen := 3 if len(classInstances) != expectedLen { t.Errorf( "Expected %d client class instance but found %d", @@ -880,7 +988,7 @@ func TestDefaultDependency(t *testing.T) { } for _, instance := range classInstances { - expectedLen = 3 + expectedLen = 4 if len(instance.Dependencies) != expectedLen { t.Errorf( "Expected %s to have %d dependencies but found %d", @@ -889,7 +997,7 @@ func TestDefaultDependency(t *testing.T) { len(instance.Dependencies), ) } - expectedLen = 2 + expectedLen = 3 if len(instance.ResolvedDependencies["client"]) != expectedLen { t.Errorf( "Expected %s to have %d resolved dependencies but found %d", @@ -898,7 +1006,7 @@ func TestDefaultDependency(t *testing.T) { len(instance.ResolvedDependencies["client"]), ) } - expectedLen = 2 + expectedLen = 3 if len(instance.RecursiveDependencies["client"]) != expectedLen { t.Errorf( "Expected %s to have %d recursive dependencies but found %d", @@ -982,7 +1090,7 @@ func TestSingleDefaultDependency(t *testing.T) { for className, classInstances := range instances { if className == "client" { - expectedLen := 2 + expectedLen := 3 if len(classInstances) != expectedLen { t.Errorf( "Expected %d client class instance but found %d", diff --git a/codegen/proto.go b/codegen/proto.go new file mode 100644 index 000000000..90fdc6642 --- /dev/null +++ b/codegen/proto.go @@ -0,0 +1,99 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package codegen + +import ( + "github.com/emicklei/proto" +) + +// ProtoService is an internal representation of Proto service and methods in that service. +type ProtoService struct { + Name string + RPC []*ProtoRPC +} + +// ProtoRPC is an internal representation of Proto RPC method and its request/response types. +type ProtoRPC struct { + Name string + Request *ProtoMessage + Response *ProtoMessage +} + +// ProtoMessage is an internal representation of a Proto Message. +type ProtoMessage struct { + Name string +} + +type visitor struct { + protoServices []*ProtoService +} + +func newVisitor() *visitor { + return &visitor{ + protoServices: make([]*ProtoService, 0), + } +} + +func (v *visitor) Visit(proto *proto.Proto) []*ProtoService { + for _, e := range proto.Elements { + e.Accept(v) + } + return v.protoServices +} + +func (v *visitor) VisitService(e *proto.Service) { + v.protoServices = append(v.protoServices, &ProtoService{ + Name: e.Name, + RPC: make([]*ProtoRPC, 0), + }) + for _, c := range e.Elements { + c.Accept(v) + } +} + +func (v *visitor) VisitRPC(r *proto.RPC) { + s := v.protoServices[len(v.protoServices)-1] + s.RPC = append(s.RPC, &ProtoRPC{ + Name: r.Name, + Request: &ProtoMessage{Name: r.RequestType}, + Response: &ProtoMessage{Name: r.ReturnsType}, + }) +} + +// From the current use case, the following visits are no-op +// since we only require the service, rpc methods and the request/response +// types of those methods. + +func (v *visitor) VisitMessage(e *proto.Message) {} +func (v *visitor) VisitSyntax(e *proto.Syntax) {} +func (v *visitor) VisitPackage(e *proto.Package) {} +func (v *visitor) VisitOption(e *proto.Option) {} +func (v *visitor) VisitImport(e *proto.Import) {} +func (v *visitor) VisitNormalField(e *proto.NormalField) {} +func (v *visitor) VisitEnumField(e *proto.EnumField) {} +func (v *visitor) VisitEnum(e *proto.Enum) {} +func (v *visitor) VisitComment(e *proto.Comment) {} +func (v *visitor) VisitOneof(o *proto.Oneof) {} +func (v *visitor) VisitOneofField(o *proto.OneOfField) {} +func (v *visitor) VisitReserved(r *proto.Reserved) {} +func (v *visitor) VisitMapField(f *proto.MapField) {} +func (v *visitor) VisitGroup(g *proto.Group) {} +func (v *visitor) VisitExtensions(e *proto.Extensions) {} diff --git a/codegen/proto_test.go b/codegen/proto_test.go new file mode 100644 index 000000000..78ef74b7a --- /dev/null +++ b/codegen/proto_test.go @@ -0,0 +1,172 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package codegen + +import ( + "strings" + "testing" + + "github.com/emicklei/proto" + "github.com/stretchr/testify/assert" +) + +const ( + singleServiceSpec = ` + syntax = "proto3"; + package echo; + + message Request { string message = 1; } + message Response { string message = 1; } + + service EchoService { + rpc EchoMethod(Request) returns (Response); + } + ` + multiServiceSpec = ` + syntax = "proto3"; + package echo; + + message Request1 { string message = 1; } + message Response1 { string message = 1; } + message Request2 { string message = 1; } + message Response2 { string message = 1; } + + service EchoService { + rpc EchoMethod1(Request1) returns (Response1); + rpc EchoMethod2(Request2) returns (Response2); + } + ` + mixedServiceSpec = ` + syntax = "proto3"; + package echo; + + message Request1 { string message = 1; } + message Response1 { string message = 1; } + message Response2 { string message = 1; } + + service EchoService { + rpc EchoMethod1(Request1) returns (Response1); + rpc EchoMethod2(Request1) returns (Response2); + } + ` + noServiceSpec = ` + syntax = "proto3"; + package echo; + + message Request { string message = 1; } + message Response { string message = 1; } + ` + emptyServiceSpec = ` + syntax = "proto3"; + package echo; + + message Request { string message = 1; } + message Response { string message = 1; } + + service EchoService {} + ` +) + +var ( + singleServiceSpecList = []*ProtoService{{ + Name: "EchoService", + RPC: []*ProtoRPC{ + { + Name: "EchoMethod", + Request: &ProtoMessage{ + Name: "Request", + }, + Response: &ProtoMessage{ + Name: "Response", + }, + }, + }, + }} + multiServiceSpecList = []*ProtoService{{ + Name: "EchoService", + RPC: []*ProtoRPC{ + { + Name: "EchoMethod1", + Request: &ProtoMessage{ + Name: "Request1", + }, + Response: &ProtoMessage{ + Name: "Response1", + }, + }, + { + Name: "EchoMethod2", + Request: &ProtoMessage{ + Name: "Request2", + }, + Response: &ProtoMessage{ + Name: "Response2", + }, + }, + }, + }} + mixedServiceSpecList = []*ProtoService{{ + Name: "EchoService", + RPC: []*ProtoRPC{ + { + Name: "EchoMethod1", + Request: &ProtoMessage{ + Name: "Request1", + }, + Response: &ProtoMessage{ + Name: "Response1", + }, + }, + { + Name: "EchoMethod2", + Request: &ProtoMessage{ + Name: "Request1", + }, + Response: &ProtoMessage{ + Name: "Response2", + }, + }, + }, + }} + noServiceSpecList = make([]*ProtoService, 0) + emptyServiceSpecList = []*ProtoService{{ + Name: "EchoService", + RPC: make([]*ProtoRPC, 0), + }} +) + +func TestRunner(t *testing.T) { + assertElementMatch(t, singleServiceSpec, singleServiceSpecList) + assertElementMatch(t, multiServiceSpec, multiServiceSpecList) + assertElementMatch(t, mixedServiceSpec, mixedServiceSpecList) + assertElementMatch(t, noServiceSpec, noServiceSpecList) + assertElementMatch(t, emptyServiceSpec, emptyServiceSpecList) +} + +func assertElementMatch(t *testing.T, specRaw string, specParsed []*ProtoService) { + r := strings.NewReader(specRaw) + parser := proto.NewParser(r) + protoSpec, err := parser.Parse() + assert.NoErrorf(t, err, "proto spec parsing failed") + + v := newVisitor().Visit(protoSpec) + assert.ElementsMatch(t, v, specParsed) +} diff --git a/codegen/service.go b/codegen/service.go index 22bb94da9..dec8700c0 100644 --- a/codegen/service.go +++ b/codegen/service.go @@ -22,9 +22,12 @@ package codegen import ( "fmt" + "os" + "path/filepath" "sort" "strings" + "github.com/emicklei/proto" "github.com/pkg/errors" "go.uber.org/thriftrw/compile" ) @@ -34,6 +37,8 @@ type ModuleSpec struct { // CompiledModule is the resolved module from thrift file // that will contain modules and typedefs not directly mounted on AST CompiledModule *compile.Module `json:"omitempty"` + // ProtoModule foo + ProtoModule *proto.Proto // Source thrift file to generate the code. ThriftFile string // Whether the ThriftFile should have annotations or not @@ -47,6 +52,7 @@ type ModuleSpec struct { // Generated imports IncludedPackages []GoPackageImport Services []*ServiceSpec + ProtoServices []*ProtoService } // GoPackageImport ... @@ -71,6 +77,32 @@ type ServiceSpec struct { CompileSpec *compile.ServiceSpec } +// NewProtoModuleSpec returns a specification for a proto module. +func NewProtoModuleSpec(protoFile string, isEndpoint bool) (*ModuleSpec, error) { + reader, err := os.Open(protoFile) + if err != nil { + return nil, errors.Wrap(err, "failed reading proto file") + } + defer func() { _ = reader.Close() }() + + parser := proto.NewParser(reader) + protoModules, err := parser.Parse() + if err != nil { + return nil, errors.Wrap(err, "failed parsing proto file") + } + protoServices := newVisitor().Visit(protoModules) + + moduleSpec := &ModuleSpec{ + ProtoModule: protoModules, + ProtoServices: protoServices, + ThriftFile: protoFile, + WantAnnot: false, + IsEndpoint: isEndpoint, + PackageName: packageName(filepath.Base(protoModules.Filename)), + } + return moduleSpec, nil +} + // NewModuleSpec returns a specification for a thrift module func NewModuleSpec( thrift string, diff --git a/codegen/service_test.go b/codegen/service_test.go index 2139671ea..0141e2ae7 100644 --- a/codegen/service_test.go +++ b/codegen/service_test.go @@ -21,9 +21,12 @@ package codegen_test import ( + "io/ioutil" + "os" "testing" "github.com/stretchr/testify/assert" + "github.com/uber/zanzibar/codegen" ) @@ -33,6 +36,23 @@ func TestModuleSpec(t *testing.T) { assert.NoError(t, err, "unable to parse the thrift file") } +func TestProtoModuleSpec(t *testing.T) { + echoProto := "../examples/example-gateway/idl/clients/echo/echo.proto" + _, err := codegen.NewProtoModuleSpec(echoProto, false) + assert.NoError(t, err, "unable to parse the proto file") +} + +func TestProtoModuleSpecParseError(t *testing.T) { + tmpFile, err := ioutil.TempFile("../examples/example-gateway/idl/clients/", "temp*.proto") + assert.NoError(t, err, "failed to create temp file") + defer os.Remove(tmpFile.Name()) + _, err = tmpFile.WriteString("test") + assert.NoError(t, err, "failed writing to temp file") + + _, err = codegen.NewProtoModuleSpec(tmpFile.Name(), false) + assert.Error(t, err, "failed parsing proto file") +} + func TestExceptionValidation(t *testing.T) { var ( barClientThrift = "../examples/example-gateway/idl/clients/bar/bar.thrift" diff --git a/codegen/template_bundle/template_files.go b/codegen/template_bundle/template_files.go index 0cc5538e2..a6e8defee 100644 --- a/codegen/template_bundle/template_files.go +++ b/codegen/template_bundle/template_files.go @@ -7,6 +7,7 @@ // codegen/templates/endpoint_test.tmpl // codegen/templates/endpoint_test_tchannel_client.tmpl // codegen/templates/fixture_types.tmpl +// codegen/templates/grpc_client.tmpl // codegen/templates/http_client.tmpl // codegen/templates/main.tmpl // codegen/templates/main_test.tmpl @@ -24,7 +25,6 @@ // codegen/templates/workflow.tmpl // codegen/templates/workflow_mock.tmpl // codegen/templates/workflow_mock_clients_type.tmpl -// codegen/templates/yarpc_client.tmpl // DO NOT EDIT! package templates @@ -939,6 +939,185 @@ func fixture_typesTmpl() (*asset, error) { return a, nil } +var _grpc_clientTmpl = []byte(`{{- /* template to render gateway gRPC client code */ -}} +{{- $instance := .Instance }} +{{- $genPkg := .GenPkg }} +package {{$instance.PackageInfo.PackageName}} + +import ( + "context" + + "github.com/afex/hystrix-go/hystrix" + "go.uber.org/yarpc" + + module "{{$instance.PackageInfo.ModulePackagePath}}" + gen "{{$genPkg}}" + zanzibar "github.com/uber/zanzibar/runtime" +) + +{{$clientID := $instance.InstanceName -}} +{{$exposedMethods := .ExposedMethods -}} +{{- $clientName := printf "%sClient" (camel $clientID) }} +{{- $exportName := .ExportName}} + +// Client defines {{$clientID}} client interface. +type Client interface { +{{range $i, $svc := .ProtoServices -}} + {{range $j, $method := $svc.RPC -}} + {{title $method.Name}} ( + ctx context.Context, + request *gen.{{$method.Request.Name}}, + opts ...yarpc.CallOption, + ) (*gen.{{$method.Response.Name}}, error) + {{- end -}} +{{- end}} +} + +// {{$clientName}} is the gRPC client for downstream service. +type {{$clientName}} struct { + client gen.{{pascal $clientID}}YARPCClient + opts *zanzibar.GRPCClientOpts +} + +// NewClient returns a new gRPC client for service {{$clientID}} +func {{$exportName}}(deps *module.Dependencies) Client { + oc := deps.Default.GRPCClientDispatcher.MustOutboundConfig("{{$clientID}}") + var routingKey string + if deps.Default.Config.ContainsKey("clients.{{$clientID}}.routingKey") { + routingKey = deps.Default.Config.MustGetString("clients.{{$clientID}}.routingKey") + } + var requestUUIDHeaderKey string + if deps.Default.Config.ContainsKey("clients.{{$clientID}}.requestUUIDHeaderKey") { + requestUUIDHeaderKey = deps.Default.Config.MustGetString("clients.{{$clientID}}.requestUUIDHeaderKey") + } + timeoutInMS := int(deps.Default.Config.MustGetInt("clients.{{$clientID}}.timeout")) + methodNames := map[string]string{ + {{range $i, $svc := .ProtoServices -}} + {{range $j, $method := $svc.RPC -}} + "{{printf "%s::%s" $svc.Name $method.Name}}": "{{$method.Name}}", + {{- end -}} + {{- end}} + } + return &{{$clientName}}{ + client: gen.New{{pascal $clientID}}YARPCClient(oc), + opts: zanzibar.NewGRPCClientOpts( + deps.Default.Logger, + deps.Default.ContextMetrics, + deps.Default.ContextExtractor, + methodNames, + "{{$clientID}}", // user serviceName + "{{$clientID}}", + routingKey, + requestUUIDHeaderKey, + configureCicruitBreaker(deps, timeoutInMS), + timeoutInMS, + ), + } +} + +func configureCicruitBreaker(deps *module.Dependencies, timeoutVal int) bool { + // circuitBreakerDisabled sets whether circuit-breaker should be disabled + circuitBreakerDisabled := false + if deps.Default.Config.ContainsKey("clients.{{$clientID}}.circuitBreakerDisabled") { + circuitBreakerDisabled = deps.Default.Config.MustGetBoolean("clients.{{$clientID}}.circuitBreakerDisabled") + } + if circuitBreakerDisabled { + return false + } + // sleepWindowInMilliseconds sets the amount of time, after tripping the circuit, + // to reject requests before allowing attempts again to determine if the circuit should again be closed + sleepWindowInMilliseconds := 5000 + if deps.Default.Config.ContainsKey("clients.{{$clientID}}.sleepWindowInMilliseconds") { + sleepWindowInMilliseconds = int(deps.Default.Config.MustGetInt("clients.{{$clientID}}.sleepWindowInMilliseconds")) + } + // maxConcurrentRequests sets how many requests can be run at the same time, beyond which requests are rejected + maxConcurrentRequests := 20 + if deps.Default.Config.ContainsKey("clients.{{$clientID}}.maxConcurrentRequests") { + maxConcurrentRequests = int(deps.Default.Config.MustGetInt("clients.{{$clientID}}.maxConcurrentRequests")) + } + // errorPercentThreshold sets the error percentage at or above which the circuit should trip open + errorPercentThreshold := 20 + if deps.Default.Config.ContainsKey("clients.{{$clientID}}.errorPercentThreshold") { + errorPercentThreshold = int(deps.Default.Config.MustGetInt("clients.{{$clientID}}.errorPercentThreshold")) + } + // requestVolumeThreshold sets a minimum number of requests that will trip the circuit in a rolling window of 10s + // For example, if the value is 20, then if only 19 requests are received in the rolling window of 10 seconds + // the circuit will not trip open even if all 19 failed. + requestVolumeThreshold := 20 + if deps.Default.Config.ContainsKey("clients.{{$clientID}}.requestVolumeThreshold") { + requestVolumeThreshold = int(deps.Default.Config.MustGetInt("clients.{{$clientID}}.requestVolumeThreshold")) + } + + hystrix.ConfigureCommand("{{$clientID}}", hystrix.CommandConfig{ + MaxConcurrentRequests: maxConcurrentRequests, + ErrorPercentThreshold: errorPercentThreshold, + SleepWindow: sleepWindowInMilliseconds, + RequestVolumeThreshold: requestVolumeThreshold, + Timeout: timeoutVal, + }) + return circuitBreakerDisabled +} + +{{range $i, $svc := .ProtoServices -}} +{{range $j, $method := $svc.RPC -}} +{{if $method.Name -}} +// {{$method.Name}} is a client RPC call for method {{printf "%s::%s" $svc.Name $method.Name}}. +func (e *{{$clientName}}) {{$method.Name}}( + ctx context.Context, + request *gen.{{$method.Request.Name}}, + opts ...yarpc.CallOption, +) (*gen.{{$method.Response.Name}}, error) { + var result *gen.{{$method.Response.Name}} + var err error + + ctx, callHelper := zanzibar.NewGRPCClientCallHelper(ctx, "{{printf "%s::%s" $svc.Name $method.Name}}", e.opts) + + if e.opts.RoutingKey != "" { + opts = append(opts, yarpc.WithRoutingKey(e.opts.RoutingKey)) + } + if e.opts.RequestUUIDHeaderKey != "" { + reqUUID := zanzibar.RequestUUIDFromCtx(ctx) + if reqUUID != "" { + opts = append(opts, yarpc.WithHeader(e.opts.RequestUUIDHeaderKey, reqUUID)) + } + } + ctx, cancel := context.WithTimeout(ctx, e.opts.Timeout) + defer cancel() + + runFunc := e.client.{{$method.Name}} + callHelper.Start() + if e.opts.CircuitBreakerDisabled { + result, err = runFunc(ctx, request, opts...) + } else { + err = hystrix.DoC(ctx, "{{$clientID}}", func(ctx context.Context) error { + result, err = runFunc(ctx, request, opts...) + return err + }, nil) + } + callHelper.Finish(ctx, err) + + return result, err +} +{{end -}} +{{end -}} +{{end}} +`) + +func grpc_clientTmplBytes() ([]byte, error) { + return _grpc_clientTmpl, nil +} + +func grpc_clientTmpl() (*asset, error) { + bytes, err := grpc_clientTmplBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "grpc_client.tmpl", size: 6031, mode: os.FileMode(420), modTime: time.Unix(1, 0)} + a := &asset{bytes: bytes, info: info} + return a, nil +} + var _http_clientTmpl = []byte(`{{- /* template to render edge gateway http client code */ -}} {{- $instance := .Instance }} package {{$instance.PackageInfo.PackageName}} @@ -1715,7 +1894,7 @@ func InitializeDependencies( Config: g.Config, Channel: g.Channel, - YARPCClientDispatcher: g.YAPRCClientDispatcher, + GRPCClientDispatcher: g.GRPCClientDispatcher, } {{range $idx, $className := $instance.DependencyOrder}} @@ -1747,7 +1926,7 @@ func module_initializerTmpl() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "module_initializer.tmpl", size: 2454, mode: os.FileMode(420), modTime: time.Unix(1, 0)} + info := bindataFileInfo{name: "module_initializer.tmpl", size: 2452, mode: os.FileMode(420), modTime: time.Unix(1, 0)} a := &asset{bytes: bytes, info: info} return a, nil } @@ -3327,42 +3506,6 @@ func workflow_mock_clients_typeTmpl() (*asset, error) { return a, nil } -var _yarpc_clientTmpl = []byte(`{{- $instance := .Instance }} -{{- $genPkg := .GenPkg }} -{{- $clientID := $instance.InstanceName -}} - -package {{$instance.PackageInfo.PackageName}} - -import ( - module "{{$instance.PackageInfo.ModulePackagePath}}" - gen "{{$genPkg}}" -) - -// Client defines the {{$clientID}} client interface. -type Client = gen.{{pascal $clientID}}YARPCClient - -// NewClient creates a now {{$clientID}} grpc client, panics if config for {{$clientID}} is missing. -func NewClient(deps *module.Dependencies) Client{ - oc := deps.Default.YARPCClientDispatcher.MustOutboundConfig("{{$clientID}}") - return gen.New{{pascal $clientID}}YARPCClient(oc) -} -`) - -func yarpc_clientTmplBytes() ([]byte, error) { - return _yarpc_clientTmpl, nil -} - -func yarpc_clientTmpl() (*asset, error) { - bytes, err := yarpc_clientTmplBytes() - if err != nil { - return nil, err - } - - info := bindataFileInfo{name: "yarpc_client.tmpl", size: 620, mode: os.FileMode(420), modTime: time.Unix(1, 0)} - a := &asset{bytes: bytes, info: info} - return a, nil -} - // Asset loads and returns the asset for the given name. // It returns an error if the asset could not be found or // could not be loaded. @@ -3422,6 +3565,7 @@ var _bindata = map[string]func() (*asset, error){ "endpoint_test.tmpl": endpoint_testTmpl, "endpoint_test_tchannel_client.tmpl": endpoint_test_tchannel_clientTmpl, "fixture_types.tmpl": fixture_typesTmpl, + "grpc_client.tmpl": grpc_clientTmpl, "http_client.tmpl": http_clientTmpl, "main.tmpl": mainTmpl, "main_test.tmpl": main_testTmpl, @@ -3439,7 +3583,6 @@ var _bindata = map[string]func() (*asset, error){ "workflow.tmpl": workflowTmpl, "workflow_mock.tmpl": workflow_mockTmpl, "workflow_mock_clients_type.tmpl": workflow_mock_clients_typeTmpl, - "yarpc_client.tmpl": yarpc_clientTmpl, } // AssetDir returns the file names below a certain @@ -3490,6 +3633,7 @@ var _bintree = &bintree{nil, map[string]*bintree{ "endpoint_test.tmpl": {endpoint_testTmpl, map[string]*bintree{}}, "endpoint_test_tchannel_client.tmpl": {endpoint_test_tchannel_clientTmpl, map[string]*bintree{}}, "fixture_types.tmpl": {fixture_typesTmpl, map[string]*bintree{}}, + "grpc_client.tmpl": {grpc_clientTmpl, map[string]*bintree{}}, "http_client.tmpl": {http_clientTmpl, map[string]*bintree{}}, "main.tmpl": {mainTmpl, map[string]*bintree{}}, "main_test.tmpl": {main_testTmpl, map[string]*bintree{}}, @@ -3507,7 +3651,6 @@ var _bintree = &bintree{nil, map[string]*bintree{ "workflow.tmpl": {workflowTmpl, map[string]*bintree{}}, "workflow_mock.tmpl": {workflow_mockTmpl, map[string]*bintree{}}, "workflow_mock_clients_type.tmpl": {workflow_mock_clients_typeTmpl, map[string]*bintree{}}, - "yarpc_client.tmpl": {yarpc_clientTmpl, map[string]*bintree{}}, }} // RestoreAsset restores an asset under the given directory diff --git a/codegen/templates/grpc_client.tmpl b/codegen/templates/grpc_client.tmpl new file mode 100644 index 000000000..b02680878 --- /dev/null +++ b/codegen/templates/grpc_client.tmpl @@ -0,0 +1,162 @@ +{{- /* template to render gateway gRPC client code */ -}} +{{- $instance := .Instance }} +{{- $genPkg := .GenPkg }} +package {{$instance.PackageInfo.PackageName}} + +import ( + "context" + + "github.com/afex/hystrix-go/hystrix" + "go.uber.org/yarpc" + + module "{{$instance.PackageInfo.ModulePackagePath}}" + gen "{{$genPkg}}" + zanzibar "github.com/uber/zanzibar/runtime" +) + +{{$clientID := $instance.InstanceName -}} +{{$exposedMethods := .ExposedMethods -}} +{{- $clientName := printf "%sClient" (camel $clientID) }} +{{- $exportName := .ExportName}} + +// Client defines {{$clientID}} client interface. +type Client interface { +{{range $i, $svc := .ProtoServices -}} + {{range $j, $method := $svc.RPC -}} + {{title $method.Name}} ( + ctx context.Context, + request *gen.{{$method.Request.Name}}, + opts ...yarpc.CallOption, + ) (*gen.{{$method.Response.Name}}, error) + {{- end -}} +{{- end}} +} + +// {{$clientName}} is the gRPC client for downstream service. +type {{$clientName}} struct { + client gen.{{pascal $clientID}}YARPCClient + opts *zanzibar.GRPCClientOpts +} + +// NewClient returns a new gRPC client for service {{$clientID}} +func {{$exportName}}(deps *module.Dependencies) Client { + oc := deps.Default.GRPCClientDispatcher.MustOutboundConfig("{{$clientID}}") + var routingKey string + if deps.Default.Config.ContainsKey("clients.{{$clientID}}.routingKey") { + routingKey = deps.Default.Config.MustGetString("clients.{{$clientID}}.routingKey") + } + var requestUUIDHeaderKey string + if deps.Default.Config.ContainsKey("clients.{{$clientID}}.requestUUIDHeaderKey") { + requestUUIDHeaderKey = deps.Default.Config.MustGetString("clients.{{$clientID}}.requestUUIDHeaderKey") + } + timeoutInMS := int(deps.Default.Config.MustGetInt("clients.{{$clientID}}.timeout")) + methodNames := map[string]string{ + {{range $i, $svc := .ProtoServices -}} + {{range $j, $method := $svc.RPC -}} + "{{printf "%s::%s" $svc.Name $method.Name}}": "{{$method.Name}}", + {{- end -}} + {{- end}} + } + return &{{$clientName}}{ + client: gen.New{{pascal $clientID}}YARPCClient(oc), + opts: zanzibar.NewGRPCClientOpts( + deps.Default.Logger, + deps.Default.ContextMetrics, + deps.Default.ContextExtractor, + methodNames, + "{{$clientID}}", // user serviceName + "{{$clientID}}", + routingKey, + requestUUIDHeaderKey, + configureCicruitBreaker(deps, timeoutInMS), + timeoutInMS, + ), + } +} + +func configureCicruitBreaker(deps *module.Dependencies, timeoutVal int) bool { + // circuitBreakerDisabled sets whether circuit-breaker should be disabled + circuitBreakerDisabled := false + if deps.Default.Config.ContainsKey("clients.{{$clientID}}.circuitBreakerDisabled") { + circuitBreakerDisabled = deps.Default.Config.MustGetBoolean("clients.{{$clientID}}.circuitBreakerDisabled") + } + if circuitBreakerDisabled { + return false + } + // sleepWindowInMilliseconds sets the amount of time, after tripping the circuit, + // to reject requests before allowing attempts again to determine if the circuit should again be closed + sleepWindowInMilliseconds := 5000 + if deps.Default.Config.ContainsKey("clients.{{$clientID}}.sleepWindowInMilliseconds") { + sleepWindowInMilliseconds = int(deps.Default.Config.MustGetInt("clients.{{$clientID}}.sleepWindowInMilliseconds")) + } + // maxConcurrentRequests sets how many requests can be run at the same time, beyond which requests are rejected + maxConcurrentRequests := 20 + if deps.Default.Config.ContainsKey("clients.{{$clientID}}.maxConcurrentRequests") { + maxConcurrentRequests = int(deps.Default.Config.MustGetInt("clients.{{$clientID}}.maxConcurrentRequests")) + } + // errorPercentThreshold sets the error percentage at or above which the circuit should trip open + errorPercentThreshold := 20 + if deps.Default.Config.ContainsKey("clients.{{$clientID}}.errorPercentThreshold") { + errorPercentThreshold = int(deps.Default.Config.MustGetInt("clients.{{$clientID}}.errorPercentThreshold")) + } + // requestVolumeThreshold sets a minimum number of requests that will trip the circuit in a rolling window of 10s + // For example, if the value is 20, then if only 19 requests are received in the rolling window of 10 seconds + // the circuit will not trip open even if all 19 failed. + requestVolumeThreshold := 20 + if deps.Default.Config.ContainsKey("clients.{{$clientID}}.requestVolumeThreshold") { + requestVolumeThreshold = int(deps.Default.Config.MustGetInt("clients.{{$clientID}}.requestVolumeThreshold")) + } + + hystrix.ConfigureCommand("{{$clientID}}", hystrix.CommandConfig{ + MaxConcurrentRequests: maxConcurrentRequests, + ErrorPercentThreshold: errorPercentThreshold, + SleepWindow: sleepWindowInMilliseconds, + RequestVolumeThreshold: requestVolumeThreshold, + Timeout: timeoutVal, + }) + return circuitBreakerDisabled +} + +{{range $i, $svc := .ProtoServices -}} +{{range $j, $method := $svc.RPC -}} +{{if $method.Name -}} +// {{$method.Name}} is a client RPC call for method {{printf "%s::%s" $svc.Name $method.Name}}. +func (e *{{$clientName}}) {{$method.Name}}( + ctx context.Context, + request *gen.{{$method.Request.Name}}, + opts ...yarpc.CallOption, +) (*gen.{{$method.Response.Name}}, error) { + var result *gen.{{$method.Response.Name}} + var err error + + ctx, callHelper := zanzibar.NewGRPCClientCallHelper(ctx, "{{printf "%s::%s" $svc.Name $method.Name}}", e.opts) + + if e.opts.RoutingKey != "" { + opts = append(opts, yarpc.WithRoutingKey(e.opts.RoutingKey)) + } + if e.opts.RequestUUIDHeaderKey != "" { + reqUUID := zanzibar.RequestUUIDFromCtx(ctx) + if reqUUID != "" { + opts = append(opts, yarpc.WithHeader(e.opts.RequestUUIDHeaderKey, reqUUID)) + } + } + ctx, cancel := context.WithTimeout(ctx, e.opts.Timeout) + defer cancel() + + runFunc := e.client.{{$method.Name}} + callHelper.Start() + if e.opts.CircuitBreakerDisabled { + result, err = runFunc(ctx, request, opts...) + } else { + err = hystrix.DoC(ctx, "{{$clientID}}", func(ctx context.Context) error { + result, err = runFunc(ctx, request, opts...) + return err + }, nil) + } + callHelper.Finish(ctx, err) + + return result, err +} +{{end -}} +{{end -}} +{{end}} diff --git a/codegen/templates/module_initializer.tmpl b/codegen/templates/module_initializer.tmpl index dc6bab5e9..e7cb8d4c7 100644 --- a/codegen/templates/module_initializer.tmpl +++ b/codegen/templates/module_initializer.tmpl @@ -47,7 +47,7 @@ func InitializeDependencies( Config: g.Config, Channel: g.Channel, - YARPCClientDispatcher: g.YAPRCClientDispatcher, + GRPCClientDispatcher: g.GRPCClientDispatcher, } {{range $idx, $className := $instance.DependencyOrder}} diff --git a/codegen/templates/yarpc_client.tmpl b/codegen/templates/yarpc_client.tmpl deleted file mode 100644 index fc7419faf..000000000 --- a/codegen/templates/yarpc_client.tmpl +++ /dev/null @@ -1,19 +0,0 @@ -{{- $instance := .Instance }} -{{- $genPkg := .GenPkg }} -{{- $clientID := $instance.InstanceName -}} - -package {{$instance.PackageInfo.PackageName}} - -import ( - module "{{$instance.PackageInfo.ModulePackagePath}}" - gen "{{$genPkg}}" -) - -// Client defines the {{$clientID}} client interface. -type Client = gen.{{pascal $clientID}}YARPCClient - -// NewClient creates a now {{$clientID}} grpc client, panics if config for {{$clientID}} is missing. -func NewClient(deps *module.Dependencies) Client{ - oc := deps.Default.YARPCClientDispatcher.MustOutboundConfig("{{$clientID}}") - return gen.New{{pascal $clientID}}YARPCClient(oc) -} diff --git a/codegen/test-service/clients/example-grpc/client-config.yaml b/codegen/test-service/clients/example-grpc/client-config.yaml new file mode 100644 index 000000000..709b42212 --- /dev/null +++ b/codegen/test-service/clients/example-grpc/client-config.yaml @@ -0,0 +1,8 @@ +config: + headers: + x-service-name: example-grpc +dependencies: + client: + - example-dependency +name: example-grpc +type: grpc diff --git a/examples/example-gateway/build/app/demo/services/xyz/module/init.go b/examples/example-gateway/build/app/demo/services/xyz/module/init.go index e8c0c90ff..179f27cfa 100644 --- a/examples/example-gateway/build/app/demo/services/xyz/module/init.go +++ b/examples/example-gateway/build/app/demo/services/xyz/module/init.go @@ -88,7 +88,7 @@ func InitializeDependencies( Config: g.Config, Channel: g.Channel, - YARPCClientDispatcher: g.YAPRCClientDispatcher, + GRPCClientDispatcher: g.GRPCClientDispatcher, } initializedClientDependencies := &ClientDependenciesNodes{} diff --git a/examples/example-gateway/build/clients/echo/echo.go b/examples/example-gateway/build/clients/echo/echo.go index 2f1ef967e..c6267f644 100644 --- a/examples/example-gateway/build/clients/echo/echo.go +++ b/examples/example-gateway/build/clients/echo/echo.go @@ -24,15 +24,140 @@ package echoclient import ( + "context" + + "github.com/afex/hystrix-go/hystrix" + "go.uber.org/yarpc" + module "github.com/uber/zanzibar/examples/example-gateway/build/clients/echo/module" gen "github.com/uber/zanzibar/examples/example-gateway/build/gen-code/clients/echo" + zanzibar "github.com/uber/zanzibar/runtime" ) -// Client defines the echo client interface. -type Client = gen.EchoYARPCClient +// Client defines echo client interface. +type Client interface { + Echo( + ctx context.Context, + request *gen.Request, + opts ...yarpc.CallOption, + ) (*gen.Response, error) +} -// NewClient creates a now echo grpc client, panics if config for echo is missing. +// echoClient is the gRPC client for downstream service. +type echoClient struct { + client gen.EchoYARPCClient + opts *zanzibar.GRPCClientOpts +} + +// NewClient returns a new gRPC client for service echo func NewClient(deps *module.Dependencies) Client { - oc := deps.Default.YARPCClientDispatcher.MustOutboundConfig("echo") - return gen.NewEchoYARPCClient(oc) + oc := deps.Default.GRPCClientDispatcher.MustOutboundConfig("echo") + var routingKey string + if deps.Default.Config.ContainsKey("clients.echo.routingKey") { + routingKey = deps.Default.Config.MustGetString("clients.echo.routingKey") + } + var requestUUIDHeaderKey string + if deps.Default.Config.ContainsKey("clients.echo.requestUUIDHeaderKey") { + requestUUIDHeaderKey = deps.Default.Config.MustGetString("clients.echo.requestUUIDHeaderKey") + } + timeoutInMS := int(deps.Default.Config.MustGetInt("clients.echo.timeout")) + methodNames := map[string]string{ + "Echo::Echo": "Echo", + } + return &echoClient{ + client: gen.NewEchoYARPCClient(oc), + opts: zanzibar.NewGRPCClientOpts( + deps.Default.Logger, + deps.Default.ContextMetrics, + deps.Default.ContextExtractor, + methodNames, + "echo", // user serviceName + "echo", + routingKey, + requestUUIDHeaderKey, + configureCicruitBreaker(deps, timeoutInMS), + timeoutInMS, + ), + } +} + +func configureCicruitBreaker(deps *module.Dependencies, timeoutVal int) bool { + // circuitBreakerDisabled sets whether circuit-breaker should be disabled + circuitBreakerDisabled := false + if deps.Default.Config.ContainsKey("clients.echo.circuitBreakerDisabled") { + circuitBreakerDisabled = deps.Default.Config.MustGetBoolean("clients.echo.circuitBreakerDisabled") + } + if circuitBreakerDisabled { + return false + } + // sleepWindowInMilliseconds sets the amount of time, after tripping the circuit, + // to reject requests before allowing attempts again to determine if the circuit should again be closed + sleepWindowInMilliseconds := 5000 + if deps.Default.Config.ContainsKey("clients.echo.sleepWindowInMilliseconds") { + sleepWindowInMilliseconds = int(deps.Default.Config.MustGetInt("clients.echo.sleepWindowInMilliseconds")) + } + // maxConcurrentRequests sets how many requests can be run at the same time, beyond which requests are rejected + maxConcurrentRequests := 20 + if deps.Default.Config.ContainsKey("clients.echo.maxConcurrentRequests") { + maxConcurrentRequests = int(deps.Default.Config.MustGetInt("clients.echo.maxConcurrentRequests")) + } + // errorPercentThreshold sets the error percentage at or above which the circuit should trip open + errorPercentThreshold := 20 + if deps.Default.Config.ContainsKey("clients.echo.errorPercentThreshold") { + errorPercentThreshold = int(deps.Default.Config.MustGetInt("clients.echo.errorPercentThreshold")) + } + // requestVolumeThreshold sets a minimum number of requests that will trip the circuit in a rolling window of 10s + // For example, if the value is 20, then if only 19 requests are received in the rolling window of 10 seconds + // the circuit will not trip open even if all 19 failed. + requestVolumeThreshold := 20 + if deps.Default.Config.ContainsKey("clients.echo.requestVolumeThreshold") { + requestVolumeThreshold = int(deps.Default.Config.MustGetInt("clients.echo.requestVolumeThreshold")) + } + + hystrix.ConfigureCommand("echo", hystrix.CommandConfig{ + MaxConcurrentRequests: maxConcurrentRequests, + ErrorPercentThreshold: errorPercentThreshold, + SleepWindow: sleepWindowInMilliseconds, + RequestVolumeThreshold: requestVolumeThreshold, + Timeout: timeoutVal, + }) + return circuitBreakerDisabled +} + +// Echo is a client RPC call for method Echo::Echo. +func (e *echoClient) Echo( + ctx context.Context, + request *gen.Request, + opts ...yarpc.CallOption, +) (*gen.Response, error) { + var result *gen.Response + var err error + + ctx, callHelper := zanzibar.NewGRPCClientCallHelper(ctx, "Echo::Echo", e.opts) + + if e.opts.RoutingKey != "" { + opts = append(opts, yarpc.WithRoutingKey(e.opts.RoutingKey)) + } + if e.opts.RequestUUIDHeaderKey != "" { + reqUUID := zanzibar.RequestUUIDFromCtx(ctx) + if reqUUID != "" { + opts = append(opts, yarpc.WithHeader(e.opts.RequestUUIDHeaderKey, reqUUID)) + } + } + ctx, cancel := context.WithTimeout(ctx, e.opts.Timeout) + defer cancel() + + runFunc := e.client.Echo + callHelper.Start() + if e.opts.CircuitBreakerDisabled { + result, err = runFunc(ctx, request, opts...) + } else { + err = hystrix.DoC(ctx, "echo", func(ctx context.Context) error { + result, err = runFunc(ctx, request, opts...) + return err + }, nil) + } + callHelper.Finish(ctx, err) + + return result, err } diff --git a/examples/example-gateway/build/services/echo-gateway/module/init.go b/examples/example-gateway/build/services/echo-gateway/module/init.go index 45a3af493..f92b5cd8e 100644 --- a/examples/example-gateway/build/services/echo-gateway/module/init.go +++ b/examples/example-gateway/build/services/echo-gateway/module/init.go @@ -85,7 +85,7 @@ func InitializeDependencies( Config: g.Config, Channel: g.Channel, - YARPCClientDispatcher: g.YAPRCClientDispatcher, + GRPCClientDispatcher: g.GRPCClientDispatcher, } initializedClientDependencies := &ClientDependenciesNodes{} diff --git a/examples/example-gateway/build/services/example-gateway/module/init.go b/examples/example-gateway/build/services/example-gateway/module/init.go index 67570a43d..af76868ad 100644 --- a/examples/example-gateway/build/services/example-gateway/module/init.go +++ b/examples/example-gateway/build/services/example-gateway/module/init.go @@ -130,7 +130,7 @@ func InitializeDependencies( Config: g.Config, Channel: g.Channel, - YARPCClientDispatcher: g.YAPRCClientDispatcher, + GRPCClientDispatcher: g.GRPCClientDispatcher, } initializedClientDependencies := &ClientDependenciesNodes{} diff --git a/examples/example-gateway/clients/echo/client-config.yaml b/examples/example-gateway/clients/echo/client-config.yaml index f839d9677..bc27aa61b 100644 --- a/examples/example-gateway/clients/echo/client-config.yaml +++ b/examples/example-gateway/clients/echo/client-config.yaml @@ -2,3 +2,5 @@ name: echo type: grpc config: idlFile: clients/echo/echo.proto + exposedMethods: + "EchoEcho": "Echo::Echo" diff --git a/examples/example-gateway/config/test.yaml b/examples/example-gateway/config/test.yaml index 20f2f64ef..84b61a091 100644 --- a/examples/example-gateway/config/test.yaml +++ b/examples/example-gateway/config/test.yaml @@ -22,6 +22,7 @@ clients.corge.timeout: 10000 clients.corge.timeoutPerAttempt: 2000 clients.corge.maxConcurrentRequests: 1000 clients.corge.errorPercentThreshold: 20 +clients.echo.timeout: 20 clients.google-now.ip: 127.0.0.1 clients.google-now.port: 14120 clients.google-now.timeout: 10000 diff --git a/glide.lock b/glide.lock index d9282f2ba..6457a45c0 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 4492c157f8db00ea300b73b2f96ab088282ae275a241391eaf5101d828d7049b -updated: 2019-08-21T16:07:47.235227-07:00 +hash: 88d305204a739687702bbb8e6e1395eb0b3496bd67541e187f36d9d64dc7b590 +updated: 2019-08-23T00:40:19.337162-07:00 imports: - name: github.com/afex/hystrix-go version: fa1af6a1f4f56e0e50d427fe901cd604d8c6fb8a @@ -23,6 +23,8 @@ imports: version: d8f796af33cc11cb798c1aaeb27a4ebc5099927d subpackages: - spew +- name: github.com/emicklei/proto + version: 81cafc1ff2e77b4342b32f8824cfb5b722bfd79c - name: github.com/fatih/structtag version: 76ae1d6d2117609598c7d4e8f3e938145f204e8f - name: github.com/ghodss/yaml @@ -337,7 +339,7 @@ imports: - unicode/bidi - unicode/norm - name: golang.org/x/tools - version: 547ecf7b1ef191ac7fb91078460aa55f0fb4416d + version: 65e3620a7ae7ac25e8494a60f0e5ef4e4fba03b3 subpackages: - cmd/goimports - cmd/stringer diff --git a/glide.yaml b/glide.yaml index d3ae46ccd..3d2e12249 100644 --- a/glide.yaml +++ b/glide.yaml @@ -84,4 +84,6 @@ import: subpackages: - prometheus - prometheus/internal - - prometheus/promhttp \ No newline at end of file + - prometheus/promhttp + +- package: github.com/emicklei/proto diff --git a/runtime/gateway.go b/runtime/gateway.go index ac6321b80..b56d32c77 100644 --- a/runtime/gateway.go +++ b/runtime/gateway.go @@ -103,8 +103,8 @@ type Gateway struct { TChannelRouter *TChannelRouter Tracer opentracing.Tracer - // Yarpc client dispatcher for yarpc client lifecycle management - YAPRCClientDispatcher *yarpc.Dispatcher + // gRPC client dispatcher for gRPC client lifecycle management + GRPCClientDispatcher *yarpc.Dispatcher atomLevel *zap.AtomicLevel loggerFile *os.File @@ -137,8 +137,8 @@ type DefaultDependencies struct { Config *StaticConfig Channel *tchannel.Channel - // dispatcher for managing yarpc(grpc) clients - YARPCClientDispatcher *yarpc.Dispatcher + // dispatcher for managing gRPC clients + GRPCClientDispatcher *yarpc.Dispatcher } // CreateGateway func @@ -225,8 +225,8 @@ func CreateGateway( return nil, err } - // setup YARPC client dispatcher afrer metrics, logger and tracer - if err := gateway.setupYARPCClientDispatcher(config); err != nil { + // setup gRPC client dispatcher after metrics, logger and tracer + if err := gateway.setupGRPCClientDispatcher(config); err != nil { return nil, err } @@ -294,9 +294,9 @@ func (gateway *Gateway) Bootstrap() error { gateway.RootScope.Counter("startup.success").Inc(1) - err = gateway.YAPRCClientDispatcher.Start() + err = gateway.GRPCClientDispatcher.Start() if err != nil { - gateway.Logger.Error("Error starting YARPC client dispatcher", zap.Error(err)) + gateway.Logger.Error("error starting gRPC client dispatcher", zap.Error(err)) return err } @@ -401,8 +401,8 @@ func (gateway *Gateway) Shutdown() { swg.Add(1) go func() { defer swg.Done() - if err := gateway.YAPRCClientDispatcher.Stop(); err != nil { - ec <- errors.Wrap(err, "error stopping yarpc client dispatcher") + if err := gateway.GRPCClientDispatcher.Stop(); err != nil { + ec <- errors.Wrap(err, "error stopping gRPC client dispatcher") } }() @@ -788,7 +788,7 @@ func (gateway *Gateway) setupTChannel(config *StaticConfig) error { return nil } -func (gateway *Gateway) setupYARPCClientDispatcher(config *StaticConfig) error { +func (gateway *Gateway) setupGRPCClientDispatcher(config *StaticConfig) error { ip := config.MustGetString("sidecarRouter.default.grpc.ip") port := config.MustGetInt("sidecarRouter.default.grpc.port") address := fmt.Sprintf("%s:%d", ip, port) @@ -823,7 +823,7 @@ func (gateway *Gateway) setupYARPCClientDispatcher(config *StaticConfig) error { Tally: gateway.RootScope, }, }) - gateway.YAPRCClientDispatcher = dispatcher + gateway.GRPCClientDispatcher = dispatcher return nil } diff --git a/runtime/grpc_client.go b/runtime/grpc_client.go new file mode 100644 index 000000000..3f2f77949 --- /dev/null +++ b/runtime/grpc_client.go @@ -0,0 +1,158 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package zanzibar + +import ( + "context" + "strings" + "time" + + "go.uber.org/yarpc/yarpcerrors" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// GRPCClientOpts used to configure various client options. +type GRPCClientOpts struct { + ServiceName string + ClientID string + MethodNames map[string]string + Loggers map[string]*zap.Logger + Metrics ContextMetrics + ContextExtractor ContextExtractor + RoutingKey string + RequestUUIDHeaderKey string + CircuitBreakerDisabled bool + Timeout time.Duration + ScopeTags map[string]map[string]string +} + +// NewGRPCClientOpts creates a new instance of GRPCClientOpts. +func NewGRPCClientOpts( + logger *zap.Logger, + metrics ContextMetrics, + contextExtractor ContextExtractor, + methodNames map[string]string, + clientID, serviceName, routingKey, requestUUIDHeaderKey string, + circuitBreakerDisabled bool, + timeoutInMS int, +) *GRPCClientOpts { + numMethods := len(methodNames) + loggers := make(map[string]*zap.Logger, numMethods) + for serviceMethod, methodName := range methodNames { + loggers[serviceMethod] = logger.With( + zap.String(logFieldClientID, clientID), + zap.String(logFieldClientMethod, methodName), + zap.String(logFieldClientService, serviceName), + ) + } + scopeTags := make(map[string]map[string]string) + for serviceMethod, methodName := range methodNames { + scopeTags[serviceMethod] = map[string]string{ + scopeTagClient: clientID, + scopeTagClientMethod: methodName, + scopeTagsTargetService: serviceName, + } + } + return &GRPCClientOpts{ + ServiceName: serviceName, + ClientID: clientID, + MethodNames: methodNames, + Loggers: loggers, + Metrics: metrics, + ContextExtractor: contextExtractor, + RoutingKey: routingKey, + RequestUUIDHeaderKey: requestUUIDHeaderKey, + CircuitBreakerDisabled: circuitBreakerDisabled, + Timeout: time.Duration(timeoutInMS) * time.Millisecond, + ScopeTags: scopeTags, + } +} + +// GRPCClientCallHelper is used to track internal state of logging and metrics. +type GRPCClientCallHelper interface { + // Start method should be used just before calling the actual gRPC client method call. + Start() + // Finish method should be used right after the actual call to gRPC client method. + Finish(ctx context.Context, err error) context.Context +} + +type callHelper struct { + startTime time.Time + finishTime time.Time + logger *zap.Logger + metrics ContextMetrics + extractor ContextExtractor +} + +// NewGRPCClientCallHelper used to initialize a helper that will +// be used to track logging and metric for a gRPC Client call. +func NewGRPCClientCallHelper(ctx context.Context, serviceMethod string, opts *GRPCClientOpts) (context.Context, GRPCClientCallHelper) { + ctx = WithScopeTags(ctx, opts.ScopeTags[serviceMethod]) + return ctx, &callHelper{ + logger: opts.Loggers[serviceMethod], + metrics: opts.Metrics, + extractor: opts.ContextExtractor, + } +} + +// Start method should be used just before calling the actual gRPC client method call. +// This method starts a timer used for metric. +func (c *callHelper) Start() { + c.startTime = time.Now() +} + +// Finish method should be used right after the actual call to gRPC client method. +// This method emits latency and error metric as well as logging in case of error. +func (c *callHelper) Finish(ctx context.Context, err error) context.Context { + c.finishTime = time.Now() + c.metrics.RecordTimer(ctx, clientLatency, c.finishTime.Sub(c.startTime)) + fields := []zapcore.Field{ + zap.Time(logFieldRequestStartTime, c.startTime), + zap.Time(logFieldRequestFinishedTime, c.finishTime), + } + ctx = WithEndpointRequestHeadersField(ctx, map[string]string{}) + if c.extractor != nil { + fields = append(fields, c.extractor.ExtractLogFields(ctx)...) + } + fields = append(fields, GetLogFieldsFromCtx(ctx)...) + if err != nil { + if yarpcerrors.IsStatus(err) { + yarpcErr := yarpcerrors.FromError(err) + errCode := strings.Builder{} + errCode.WriteString("client.errors.") + errCode.WriteString(yarpcErr.Code().String()) + c.metrics.IncCounter(ctx, errCode.String(), 1) + + fields = append(fields, zap.Int("code", int(yarpcErr.Code()))) + fields = append(fields, zap.String("message", yarpcErr.Message())) + fields = append(fields, zap.String("name", yarpcErr.Name())) + } else { + fields = append(fields, zap.Error(err)) + } + c.metrics.IncCounter(ctx, "client.errors", 1) + c.logger.Warn("Failed to send outgoing client gRPC request", fields...) + return ctx + } + c.logger.Info("Finished an outgoing client gRPC request", fields...) + c.metrics.IncCounter(ctx, "client.success", 1) + return ctx +} diff --git a/runtime/grpc_client_test.go b/runtime/grpc_client_test.go new file mode 100644 index 000000000..165d7f96d --- /dev/null +++ b/runtime/grpc_client_test.go @@ -0,0 +1,164 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package zanzibar + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/uber-go/tally" + "go.uber.org/yarpc/yarpcerrors" + "go.uber.org/zap" + "golang.org/x/net/context" +) + +const ( + serviceName = "Echo" + clientID = "Echo" + methodName = "Echo" + routingKey = "routingKey" + requestUUIDHeaderKey = "reqID" + circuitBreakerDisabled = false + timeoutInMS = 10 + serviceMethod = "Echo::Echo" +) + +var ( + scopeExtractor = func(ctx context.Context) map[string]string { + tags := map[string]string{} + headers := GetEndpointRequestHeadersFromCtx(ctx) + tags["regionname"] = headers["Regionname"] + tags["device"] = headers["Device"] + tags["deviceversion"] = headers["Deviceversion"] + + return tags + } + logFieldsExtractors = func(ctx context.Context) []zap.Field { + reqHeaders := GetEndpointRequestHeadersFromCtx(ctx) + fields := make([]zap.Field, 0, len(reqHeaders)) + for k, v := range reqHeaders { + fields = append(fields, zap.String(k, v)) + } + return fields + } + logger = zap.NewNop() + metrics = NewContextMetrics(tally.NoopScope) + extractors = &ContextExtractors{ + ScopeTagsExtractors: []ContextScopeTagsExtractor{scopeExtractor}, + LogFieldsExtractors: []ContextLogFieldsExtractor{logFieldsExtractors}, + } + methodNames = map[string]string{ + serviceMethod: methodName, + } + expectedTimeout = time.Duration(timeoutInMS) * time.Millisecond + expectedLoggers = map[string]*zap.Logger{ + serviceMethod: logger, + } + expectedScopeTags = map[string]map[string]string{ + serviceMethod: { + scopeTagClient: clientID, + scopeTagClientMethod: methodName, + scopeTagsTargetService: serviceName, + }, + } +) + +func TestNewGRPCClientOpts(t *testing.T) { + actual := NewGRPCClientOpts( + logger, + metrics, + extractors, + methodNames, + clientID, + serviceName, + routingKey, + requestUUIDHeaderKey, + circuitBreakerDisabled, + timeoutInMS, + ) + expected := &GRPCClientOpts{ + serviceName, + clientID, + methodNames, + expectedLoggers, + metrics, + extractors, + routingKey, + requestUUIDHeaderKey, + circuitBreakerDisabled, + expectedTimeout, + expectedScopeTags, + } + assert.Equal(t, expected, actual) +} + +func TestGRPCCallHelper(t *testing.T) { + ctx := context.Background() + opts := NewGRPCClientOpts( + logger, + metrics, + extractors, + methodNames, + clientID, + serviceName, + routingKey, + requestUUIDHeaderKey, + circuitBreakerDisabled, + timeoutInMS, + ) + _, actual := NewGRPCClientCallHelper(ctx, serviceMethod, opts) + expected := &callHelper{ + logger: expectedLoggers[serviceMethod], + metrics: metrics, + extractor: extractors, + } + assert.Equal(t, expected, actual) +} + +func testCallHelper(t *testing.T, err error) { + helper := &callHelper{ + logger: logger, + metrics: metrics, + extractor: extractors, + } + + assert.Zero(t, helper.startTime, "startTime not initialized to zero") + assert.Zero(t, helper.finishTime, "finishTime not initialized to zero") + helper.Start() + assert.NotZero(t, helper.startTime, "startTime didn't update after calling Start()") + assert.Zero(t, helper.finishTime, "finishTime update after calling Start()") + + // Adding sleep just to make sure startTime and finishTime are never same. + time.Sleep(10 * time.Millisecond) + + ctx := context.Background() + helper.Finish(ctx, err) + assert.NotZero(t, helper.startTime, "startTime initialized to zero calling Finish()") + assert.NotZero(t, helper.finishTime, "finishTime initialized to zero calling Finish()") +} + +func TestCallHelper(t *testing.T) { + testCallHelper(t, nil) + testCallHelper(t, errors.New("mock error")) + testCallHelper(t, yarpcerrors.Newf(1, "CodeCancelled")) +} diff --git a/runtime/static_config.go b/runtime/static_config.go index 7bd227c4d..5d8286e7a 100644 --- a/runtime/static_config.go +++ b/runtime/static_config.go @@ -142,9 +142,6 @@ func (conf *StaticConfig) MustGetFloat(key string) float64 { } if value, contains := conf.configValues[key]; contains { - if v, ok := value.(int); ok { - return float64(v) - } return value.(float64) } @@ -159,9 +156,6 @@ func mustConvertableToInt(value interface{}, key string) int64 { } return int64(v) } - if v, ok := value.(int); ok { - return int64(v) - } return value.(int64) } diff --git a/test/clients/echo/echo_test.go b/test/clients/echo/echo_test.go index f910fbdbf..aa052b022 100644 --- a/test/clients/echo/echo_test.go +++ b/test/clients/echo/echo_test.go @@ -29,14 +29,17 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/uber/zanzibar/config" - "github.com/uber/zanzibar/examples/example-gateway/build/clients/echo" - "github.com/uber/zanzibar/examples/example-gateway/build/clients/echo/module" - "github.com/uber/zanzibar/examples/example-gateway/build/gen-code/clients/echo" - "github.com/uber/zanzibar/runtime" + "github.com/uber-go/tally" "go.uber.org/yarpc" "go.uber.org/yarpc/api/transport" "go.uber.org/yarpc/transport/grpc" + "go.uber.org/zap" + + "github.com/uber/zanzibar/config" + echoclient "github.com/uber/zanzibar/examples/example-gateway/build/clients/echo" + "github.com/uber/zanzibar/examples/example-gateway/build/clients/echo/module" + "github.com/uber/zanzibar/examples/example-gateway/build/gen-code/clients/echo" + zanzibar "github.com/uber/zanzibar/runtime" ) type echoServer struct{} @@ -77,7 +80,10 @@ func TestEcho(t *testing.T) { client := echoclient.NewClient(&module.Dependencies{ Default: &zanzibar.DefaultDependencies{ - YARPCClientDispatcher: dispatcher, + GRPCClientDispatcher: dispatcher, + Config: sc, + Logger: zap.NewNop(), + ContextMetrics: zanzibar.NewContextMetrics(tally.NewTestScope("", nil)), }, })