diff --git a/client/buffer.go b/client/buffer.go deleted file mode 100644 index 72c3c529d7..0000000000 --- a/client/buffer.go +++ /dev/null @@ -1,13 +0,0 @@ -package client - -import ( - "io" -) - -type buffer struct { - io.ReadWriter -} - -func (b *buffer) Close() error { - return nil -} diff --git a/client/client.go b/client/client.go index bb4053258f..bdb8799eaa 100644 --- a/client/client.go +++ b/client/client.go @@ -1,33 +1,15 @@ package client +type RequestFunc func(address string) (err error) + type Client interface { - NewRequest(string, string, interface{}) Request - NewProtoRequest(string, string, interface{}) Request - NewJsonRequest(string, string, interface{}) Request - Call(interface{}, interface{}) error - CallRemote(string, string, interface{}, interface{}) error + NewRequest(service string, f RequestFunc) error } var ( - client = NewRpcClient() + client = NewGRPCClient() ) -func Call(request Request, response interface{}) error { - return client.Call(request, response) -} - -func CallRemote(address, path string, request Request, response interface{}) error { - return client.CallRemote(address, path, request, response) -} - -func NewRequest(service, method string, request interface{}) Request { - return client.NewRequest(service, method, request) -} - -func NewProtoRequest(service, method string, request interface{}) Request { - return client.NewProtoRequest(service, method, request) -} - -func NewJsonRequest(service, method string, request interface{}) Request { - return client.NewJsonRequest(service, method, request) +func NewRequest(service string, f RequestFunc) error { + return client.NewRequest(service, f) } diff --git a/client/grpc_client.go b/client/grpc_client.go new file mode 100644 index 0000000000..2402fbf254 --- /dev/null +++ b/client/grpc_client.go @@ -0,0 +1,56 @@ +package client + +import ( + "fmt" + "github.com/asim/go-micro/registry" + "math/rand" + "net/http" + "time" + + "github.com/asim/go-micro/errors" + "google.golang.org/grpc" +) + +type headerRoundTripper struct { + r http.RoundTripper +} + +type GRPCClient struct{} + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +func (r *GRPCClient) NewRequest(serviceName string, f RequestFunc) error { + service, err := registry.GetService(serviceName) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + if len(service.Nodes()) == 0 { + return errors.NotFound("go.micro.client", "Service not found") + } + + n := rand.Int() % len(service.Nodes()) + node := service.Nodes()[n] + address := fmt.Sprintf("%s:%d", node.Address(), node.Port()) + + return f(address) +} + +func NewGRPCClient() *GRPCClient { + return &GRPCClient{} +} + +func GRPCRequest(f func(cc *grpc.ClientConn) error) RequestFunc { + return func(address string) error { + fmt.Println(address) + cc, err := grpc.Dial(address) + if err != nil { + return err + } + defer cc.Close() + + return f(cc) + } +} diff --git a/client/request.go b/client/request.go deleted file mode 100644 index 9cbe919cdb..0000000000 --- a/client/request.go +++ /dev/null @@ -1,9 +0,0 @@ -package client - -type Request interface { - Service() string - Method() string - ContentType() string - Request() interface{} - Headers() Headers -} diff --git a/client/rpc_client.go b/client/rpc_client.go deleted file mode 100644 index 438a642376..0000000000 --- a/client/rpc_client.go +++ /dev/null @@ -1,157 +0,0 @@ -package client - -import ( - "bytes" - "fmt" - "io/ioutil" - "math/rand" - "net/http" - "net/url" - "time" - - "github.com/asim/go-micro/errors" - "github.com/asim/go-micro/registry" - rpc "github.com/youtube/vitess/go/rpcplus" - js "github.com/youtube/vitess/go/rpcplus/jsonrpc" - pb "github.com/youtube/vitess/go/rpcplus/pbrpc" -) - -type headerRoundTripper struct { - r http.RoundTripper -} - -type RpcClient struct{} - -func init() { - rand.Seed(time.Now().UnixNano()) -} - -func (t *headerRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { - r.Header.Set("X-Client-Version", "1.0") - return t.r.RoundTrip(r) -} - -func (r *RpcClient) call(address, path string, request Request, response interface{}) error { - pReq := &rpc.Request{ - ServiceMethod: request.Method(), - } - - reqB := bytes.NewBuffer(nil) - defer reqB.Reset() - buf := &buffer{ - reqB, - } - - var cc rpc.ClientCodec - switch request.ContentType() { - case "application/octet-stream": - cc = pb.NewClientCodec(buf) - case "application/json": - cc = js.NewClientCodec(buf) - default: - return errors.InternalServerError("go.micro.client", fmt.Sprintf("Unsupported request type: %s", request.ContentType())) - } - - err := cc.WriteRequest(pReq, request.Request()) - if err != nil { - return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error writing request: %v", err)) - } - - client := &http.Client{} - client.Transport = &headerRoundTripper{http.DefaultTransport} - - request.Headers().Set("Content-Type", request.ContentType()) - - hreq := &http.Request{ - Method: "POST", - URL: &url.URL{ - Scheme: "http", - Host: address, - Path: path, - }, - Header: request.Headers().(http.Header), - Body: buf, - ContentLength: int64(reqB.Len()), - Host: address, - } - - rsp, err := client.Do(hreq) - if err != nil { - return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) - } - defer rsp.Body.Close() - - b, err := ioutil.ReadAll(rsp.Body) - if err != nil { - return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error reading response: %v", err)) - } - - rspB := bytes.NewBuffer(b) - defer rspB.Reset() - rBuf := &buffer{ - rspB, - } - - switch rsp.Header.Get("Content-Type") { - case "application/octet-stream": - cc = pb.NewClientCodec(rBuf) - case "application/json": - cc = js.NewClientCodec(rBuf) - default: - return errors.InternalServerError("go.micro.client", string(b)) - } - - pRsp := &rpc.Response{} - err = cc.ReadResponseHeader(pRsp) - if err != nil { - return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error reading response headers: %v", err)) - } - - if len(pRsp.Error) > 0 { - return errors.Parse(pRsp.Error) - } - - err = cc.ReadResponseBody(response) - if err != nil { - return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error reading response body: %v", err)) - } - - return nil -} - -func (r *RpcClient) CallRemote(address, path string, request Request, response interface{}) error { - return r.call(address, path, request, response) -} - -// TODO: Call(..., opts *Options) error { -func (r *RpcClient) Call(request Request, response interface{}) error { - service, err := registry.GetService(request.Service()) - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) - } - - if len(service.Nodes()) == 0 { - return errors.NotFound("go.micro.client", "Service not found") - } - - n := rand.Int() % len(service.Nodes()) - node := service.Nodes()[n] - address := fmt.Sprintf("%s:%d", node.Address(), node.Port()) - return r.call(address, "/_rpc", request, response) -} - -func (r *RpcClient) NewRequest(service, method string, request interface{}) *RpcRequest { - return r.NewProtoRequest(service, method, request) -} - -func (r *RpcClient) NewProtoRequest(service, method string, request interface{}) *RpcRequest { - return newRpcRequest(service, method, request, "application/octet-stream") -} - -func (r *RpcClient) NewJsonRequest(service, method string, request interface{}) *RpcRequest { - return newRpcRequest(service, method, request, "application/json") -} - -func NewRpcClient() *RpcClient { - return &RpcClient{} -} diff --git a/client/rpc_request.go b/client/rpc_request.go deleted file mode 100644 index e95d4ac351..0000000000 --- a/client/rpc_request.go +++ /dev/null @@ -1,45 +0,0 @@ -package client - -import ( - "net/http" -) - -type RpcRequest struct { - service, method, contentType string - request interface{} - headers http.Header -} - -func newRpcRequest(service, method string, request interface{}, contentType string) *RpcRequest { - return &RpcRequest{ - service: service, - method: method, - request: request, - contentType: contentType, - headers: make(http.Header), - } -} - -func (r *RpcRequest) ContentType() string { - return r.contentType -} - -func (r *RpcRequest) Headers() Headers { - return r.headers -} - -func (r *RpcRequest) Service() string { - return r.service -} - -func (r *RpcRequest) Method() string { - return r.method -} - -func (r *RpcRequest) Request() interface{} { - return r.request -} - -func NewRpcRequest(service, method string, request interface{}, contentType string) *RpcRequest { - return newRpcRequest(service, method, request, contentType) -} diff --git a/examples/greeter-client/greeter-client b/examples/greeter-client/greeter-client new file mode 100755 index 0000000000..d01ac5584f Binary files /dev/null and b/examples/greeter-client/greeter-client differ diff --git a/examples/greeter-client/main.go b/examples/greeter-client/main.go new file mode 100644 index 0000000000..1ca90d05b5 --- /dev/null +++ b/examples/greeter-client/main.go @@ -0,0 +1,37 @@ +package main + +import ( + "os" + + "github.com/asim/go-micro/client" + log "github.com/golang/glog" + "golang.org/x/net/context" + "google.golang.org/grpc" + + pb "github.com/asim/go-micro/examples/greeter-service/proto" +) + +const ( + defaultName = "world" +) + +func main() { + name := defaultName + if len(os.Args) > 1 { + name = os.Args[1] + } + + // Create new request to service go.micro.service.go-template + var r *pb.HelloReply + err := client.NewRequest("go.micro.service.greeter", client.GRPCRequest(func(cc *grpc.ClientConn) (err error) { + c := pb.NewGreeterClient(cc) + r, err = c.SayHello(context.Background(), &pb.HelloRequest{Name: name}) + return + })) + if err != nil { + log.Fatalf("could not greet: %v", err) + } + + log.Infof("Greeting: %s", r.Message) + +} diff --git a/examples/greeter-service/Dockerfile b/examples/greeter-service/Dockerfile new file mode 100644 index 0000000000..4149ef7133 --- /dev/null +++ b/examples/greeter-service/Dockerfile @@ -0,0 +1,3 @@ +FROM scratch +ADD template / +ENTRYPOINT [ "/template" ] diff --git a/examples/greeter-service/README.md b/examples/greeter-service/README.md new file mode 100644 index 0000000000..1531841328 --- /dev/null +++ b/examples/greeter-service/README.md @@ -0,0 +1,30 @@ +# Template Service + +An example Go service running with go-micro + +### Prerequisites + +Install Consul +[https://www.consul.io/intro/getting-started/install.html](https://www.consul.io/intro/getting-started/install.html) + +Run Consul +``` +$ consul agent -server -bootstrap-expect 1 -data-dir /tmp/consul +``` + +Run Service +``` +$ go run main.go + +1416690099281057746 [Debug] Rpc handler /_rpc +1416690099281092588 [Debug] Starting server go.micro.service.template id go.micro.service.template-c0bfcb44-728a-11e4-b099-68a86d0d36b6 +1416690099281192941 [Debug] Listening on [::]:58264 +1416690099281215346 [Debug] Registering go.micro.service.template-c0bfcb44-728a-11e4-b099-68a86d0d36b6 +``` + +Test Service +``` +$ go run go-micro/examples/service_client.go + +go.micro.service.template-c0bfcb44-728a-11e4-b099-68a86d0d36b6: Hello John +``` diff --git a/examples/greeter-service/greeter-service b/examples/greeter-service/greeter-service new file mode 100755 index 0000000000..b7a758d44d Binary files /dev/null and b/examples/greeter-service/greeter-service differ diff --git a/examples/greeter-service/handlers/handlers.go b/examples/greeter-service/handlers/handlers.go new file mode 100644 index 0000000000..e52f48db97 --- /dev/null +++ b/examples/greeter-service/handlers/handlers.go @@ -0,0 +1,8 @@ +package handlers + +type handlers struct { +} + +func New() *handlers { + return &handlers{} +} diff --git a/examples/greeter-service/handlers/say_hello.go b/examples/greeter-service/handlers/say_hello.go new file mode 100644 index 0000000000..bf61dadf04 --- /dev/null +++ b/examples/greeter-service/handlers/say_hello.go @@ -0,0 +1,16 @@ +package handlers + +import ( + "fmt" + pb "github.com/asim/go-micro/examples/greeter-service/proto" + + "golang.org/x/net/context" +) + +func (h *handlers) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { + if in.Name == "world" { + return nil, fmt.Errorf("No world here") + } + + return &pb.HelloReply{Message: "Hello " + in.Name}, nil +} diff --git a/examples/greeter-service/hello-service b/examples/greeter-service/hello-service new file mode 100755 index 0000000000..ba47105be6 Binary files /dev/null and b/examples/greeter-service/hello-service differ diff --git a/examples/greeter-service/main.go b/examples/greeter-service/main.go new file mode 100644 index 0000000000..2dbe4ae6c3 --- /dev/null +++ b/examples/greeter-service/main.go @@ -0,0 +1,33 @@ +package main + +//go:generate protoc -I ./proto ./proto/service.proto --go_out=plugins=grpc:proto + +import ( + "github.com/asim/go-micro/cmd" + "github.com/asim/go-micro/server" + log "github.com/golang/glog" + "google.golang.org/grpc" + + "github.com/asim/go-micro/examples/greeter-service/handlers" + + pb "github.com/asim/go-micro/examples/greeter-service/proto" +) + +func main() { + server.Name = "go.micro.service.greeter" + + // Initialise Server + cmd.Init() + server.Init() + + // Register Handlers + handlers := handlers.New() + server.Register(server.GRPCHandler(func(s *grpc.Server) { + pb.RegisterGreeterServer(s, handlers) + })) + + // Run server + if err := server.Run(); err != nil { + log.Fatal(err) + } +} diff --git a/examples/greeter-service/pod.json b/examples/greeter-service/pod.json new file mode 100644 index 0000000000..1bb0e2aef6 --- /dev/null +++ b/examples/greeter-service/pod.json @@ -0,0 +1,21 @@ +{ + "kind": "Pod", + "apiVersion": "v1beta1", + "id": "template-service", + "desiredState": { + "manifest": { + "version": "v1beta1", + "id": "template-service", + "containers": [{ + "name": "template-service", + "image": "chuhnk/go-template", + "ports": [{"name": "template-service", "containerPort": 8080}], + "command": ["--registry=kubernetes", "--bind_address=:8080"] + }], + } + }, + "labels": { + "name": "go.micro.service.template", + } +} + diff --git a/examples/greeter-service/proto/route_guide.proto b/examples/greeter-service/proto/route_guide.proto new file mode 100644 index 0000000000..5ea4fcf5b6 --- /dev/null +++ b/examples/greeter-service/proto/route_guide.proto @@ -0,0 +1,121 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +package proto; + +// Interface exported by the server. +service RouteGuide { + // A simple RPC. + // + // Obtains the feature at a given position. + // + // If no feature is found for the given point, a feature with an empty name + // should be returned. + rpc GetFeature(Point) returns (Feature) {} + + // A server-to-client streaming RPC. + // + // Obtains the Features available within the given Rectangle. Results are + // streamed rather than returned at once (e.g. in a response message with a + // repeated field), as the rectangle may cover a large area and contain a + // huge number of features. + rpc ListFeatures(Rectangle) returns (stream Feature) {} + + // A client-to-server streaming RPC. + // + // Accepts a stream of Points on a route being traversed, returning a + // RouteSummary when traversal is completed. + rpc RecordRoute(stream Point) returns (RouteSummary) {} + + // A Bidirectional streaming RPC. + // + // Accepts a stream of RouteNotes sent while a route is being traversed, + // while receiving other RouteNotes (e.g. from other users). + rpc RouteChat(stream RouteNote) returns (stream RouteNote) {} +} + +// Points are represented as latitude-longitude pairs in the E7 representation +// (degrees multiplied by 10**7 and rounded to the nearest integer). +// Latitudes should be in the range +/- 90 degrees and longitude should be in +// the range +/- 180 degrees (inclusive). +message Point { + int32 latitude = 1; + int32 longitude = 2; +} + +// A latitude-longitude rectangle, represented as two diagonally opposite +// points "lo" and "hi". +message Rectangle { + // One corner of the rectangle. + Point lo = 1; + + // The other corner of the rectangle. + Point hi = 2; +} + +// A feature names something at a given point. +// +// If a feature could not be named, the name is empty. +message Feature { + // The name of the feature. + string name = 1; + + // The point where the feature is detected. + Point location = 2; +} + +// A RouteNote is a message sent while at a given point. +message RouteNote { + // The location from which the message is sent. + Point location = 1; + + // The message to be sent. + string message = 2; +} + +// A RouteSummary is received in response to a RecordRoute rpc. +// +// It contains the number of individual points received, the number of +// detected features, and the total distance covered as the cumulative sum of +// the distance between each point. +message RouteSummary { + // The number of points received. + int32 point_count = 1; + + // The number of known features passed while traversing the route. + int32 feature_count = 2; + + // The distance covered in metres. + int32 distance = 3; + + // The duration of the traversal in seconds. + int32 elapsed_time = 4; +} diff --git a/examples/greeter-service/proto/service.pb.go b/examples/greeter-service/proto/service.pb.go new file mode 100644 index 0000000000..5ca2da7bc7 --- /dev/null +++ b/examples/greeter-service/proto/service.pb.go @@ -0,0 +1,109 @@ +// Code generated by protoc-gen-go. +// source: service.proto +// DO NOT EDIT! + +/* +Package proto is a generated protocol buffer package. + +It is generated from these files: + service.proto + +It has these top-level messages: + HelloRequest + HelloReply +*/ +package proto + +import proto1 "github.com/golang/protobuf/proto" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto1.Marshal + +// The request message containing the user's name. +type HelloRequest struct { + Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` +} + +func (m *HelloRequest) Reset() { *m = HelloRequest{} } +func (m *HelloRequest) String() string { return proto1.CompactTextString(m) } +func (*HelloRequest) ProtoMessage() {} + +// The response message containing the greetings +type HelloReply struct { + Message string `protobuf:"bytes,1,opt,name=message" json:"message,omitempty"` +} + +func (m *HelloReply) Reset() { *m = HelloReply{} } +func (m *HelloReply) String() string { return proto1.CompactTextString(m) } +func (*HelloReply) ProtoMessage() {} + +func init() { +} + +// Client API for Greeter service + +type GreeterClient interface { + // Sends a greeting + SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) +} + +type greeterClient struct { + cc *grpc.ClientConn +} + +func NewGreeterClient(cc *grpc.ClientConn) GreeterClient { + return &greeterClient{cc} +} + +func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) { + out := new(HelloReply) + err := grpc.Invoke(ctx, "/proto.Greeter/SayHello", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Greeter service + +type GreeterServer interface { + // Sends a greeting + SayHello(context.Context, *HelloRequest) (*HelloReply, error) +} + +func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) { + s.RegisterService(&_Greeter_serviceDesc, srv) +} + +func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, buf []byte) (interface{}, error) { + in := new(HelloRequest) + if err := proto1.Unmarshal(buf, in); err != nil { + return nil, err + } + out, err := srv.(GreeterServer).SayHello(ctx, in) + if err != nil { + return nil, err + } + return out, nil +} + +var _Greeter_serviceDesc = grpc.ServiceDesc{ + ServiceName: "proto.Greeter", + HandlerType: (*GreeterServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "SayHello", + Handler: _Greeter_SayHello_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, +} diff --git a/examples/greeter-service/proto/service.proto b/examples/greeter-service/proto/service.proto new file mode 100644 index 0000000000..00d8d5586e --- /dev/null +++ b/examples/greeter-service/proto/service.proto @@ -0,0 +1,48 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +package proto; + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} diff --git a/examples/greeter-service/service.json b/examples/greeter-service/service.json new file mode 100644 index 0000000000..4dc6472775 --- /dev/null +++ b/examples/greeter-service/service.json @@ -0,0 +1,9 @@ +{ + "id": "template-service", + "kind": "Service", + "apiVersion": "v1beta1", + "port": 9091, + "containerPort": 8080, + "selector": { "name": "go.micro.service.template" }, + "labels": { "name": "go.micro.service.template" } +} diff --git a/examples/service_client.go b/examples/service_client.go deleted file mode 100644 index e0e4d08094..0000000000 --- a/examples/service_client.go +++ /dev/null @@ -1,30 +0,0 @@ -package main - -import ( - "fmt" - - "code.google.com/p/goprotobuf/proto" - "github.com/asim/go-micro/client" - example "github.com/asim/go-micro/template/proto/example" -) - -func main() { - // Create new request to service go.micro.service.go-template, method Example.Call - req := client.NewRequest("go.micro.service.template", "Example.Call", &example.Request{ - Name: proto.String("John"), - }) - - // Set arbitrary headers - req.Headers().Set("X-User-Id", "john") - req.Headers().Set("X-From-Id", "script") - - rsp := &example.Response{} - - // Call service - if err := client.Call(req, rsp); err != nil { - fmt.Println(err) - return - } - - fmt.Println(rsp.GetMsg()) -} diff --git a/micro.go b/micro.go new file mode 100644 index 0000000000..3e1d54a81f --- /dev/null +++ b/micro.go @@ -0,0 +1,17 @@ +package micro + +// Config is used to store configuration about the environment +type Config struct { + Registry string + RegistryAddr string + + Store string + StoreAddr string +} + +func DefaultConsulConfig() Config { + return Config{ + Registry: "consul", + Store: "consul", + } +} diff --git a/server/buffer.go b/server/buffer.go deleted file mode 100644 index e3f6ebec11..0000000000 --- a/server/buffer.go +++ /dev/null @@ -1,14 +0,0 @@ -package server - -import ( - "io" -) - -type buffer struct { - io.Reader - io.Writer -} - -func (b *buffer) Close() error { - return nil -} diff --git a/server/context.go b/server/context.go deleted file mode 100644 index f556f0d33f..0000000000 --- a/server/context.go +++ /dev/null @@ -1,35 +0,0 @@ -package server - -import ( - "time" - - "code.google.com/p/go.net/context" -) - -type ctx struct{} - -func (ctx *ctx) Deadline() (deadline time.Time, ok bool) { - return time.Time{}, false -} - -func (ctx *ctx) Done() <-chan struct{} { - return nil -} - -func (ctx *ctx) Err() error { - return nil -} - -func (ctx *ctx) Value(key interface{}) interface{} { - return nil -} - -func newContext(parent context.Context, s *serverContext) context.Context { - return context.WithValue(parent, "serverContext", s) -} - -// return server.Context -func NewContext(ctx context.Context) (Context, bool) { - c, ok := ctx.Value("serverContext").(*serverContext) - return c, ok -} diff --git a/server/grpc_server.go b/server/grpc_server.go new file mode 100644 index 0000000000..3ba5154e23 --- /dev/null +++ b/server/grpc_server.go @@ -0,0 +1,75 @@ +package server + +import ( + "net" + "sync" + + log "github.com/golang/glog" + "google.golang.org/grpc" +) + +type GRPCServer struct { + mtx sync.RWMutex + rpc *grpc.Server + address string + exit chan chan error +} + +func (s *GRPCServer) Address() string { + s.mtx.RLock() + defer s.mtx.RUnlock() + return s.address +} + +func (s *GRPCServer) Init() error { + return nil +} + +func (s *GRPCServer) Register(handler HandlerFunc) error { + handler(s) + return nil +} + +func (s *GRPCServer) Start() error { + l, err := net.Listen("tcp", s.address) + if err != nil { + return err + } + + log.Infof("Listening on %s", l.Addr().String()) + + s.mtx.Lock() + s.address = l.Addr().String() + s.mtx.Unlock() + + go s.rpc.Serve(l) + + go func() { + ch := <-s.exit + ch <- l.Close() + }() + + return nil +} + +func (s *GRPCServer) Stop() error { + ch := make(chan error) + s.exit <- ch + return <-ch +} + +func NewGRPCServer(address string) *GRPCServer { + return &GRPCServer{ + rpc: grpc.NewServer(), + address: address, + exit: make(chan chan error), + } +} + +type GRPCHandlerFunc func(s *grpc.Server) + +func GRPCHandler(f GRPCHandlerFunc) HandlerFunc { + return func(srv Server) { + f(srv.(*GRPCServer).rpc) + } +} diff --git a/server/headers.go b/server/headers.go deleted file mode 100644 index 9cdf78e779..0000000000 --- a/server/headers.go +++ /dev/null @@ -1,8 +0,0 @@ -package server - -type Headers interface { - Add(string, string) - Del(string) - Get(string) string - Set(string, string) -} diff --git a/server/health_checker.go b/server/health_checker.go deleted file mode 100644 index 612aaa58b5..0000000000 --- a/server/health_checker.go +++ /dev/null @@ -1,21 +0,0 @@ -package server - -import ( - "io" - "net/http" - "net/url" -) - -func registerHealthChecker(mux *http.ServeMux) { - req := &http.Request{ - Method: "GET", - URL: &url.URL{ - Path: HealthPath, - }, - } - if _, path := mux.Handler(req); path != HealthPath { - mux.HandleFunc(HealthPath, func(w http.ResponseWriter, r *http.Request) { - io.WriteString(w, "ok") - }) - } -} diff --git a/server/receiver.go b/server/receiver.go deleted file mode 100644 index f7498e3032..0000000000 --- a/server/receiver.go +++ /dev/null @@ -1,6 +0,0 @@ -package server - -type Receiver interface { - Name() string - Handler() interface{} -} diff --git a/server/request.go b/server/request.go deleted file mode 100644 index 6c620f4f92..0000000000 --- a/server/request.go +++ /dev/null @@ -1,6 +0,0 @@ -package server - -type Request interface { - Headers() Headers - Session(string) string -} diff --git a/server/rpc_receiver.go b/server/rpc_receiver.go deleted file mode 100644 index 11d25bb181..0000000000 --- a/server/rpc_receiver.go +++ /dev/null @@ -1,29 +0,0 @@ -package server - -type RpcReceiver struct { - name string - handler interface{} -} - -func newRpcReceiver(name string, handler interface{}) *RpcReceiver { - return &RpcReceiver{ - name: name, - handler: handler, - } -} - -func (r *RpcReceiver) Name() string { - return r.name -} - -func (r *RpcReceiver) Handler() interface{} { - return r.handler -} - -func NewRpcReceiver(handler interface{}) *RpcReceiver { - return newRpcReceiver("", handler) -} - -func NewNamedRpcReceiver(name string, handler interface{}) *RpcReceiver { - return newRpcReceiver(name, handler) -} diff --git a/server/rpc_server.go b/server/rpc_server.go deleted file mode 100644 index 4a72238b99..0000000000 --- a/server/rpc_server.go +++ /dev/null @@ -1,215 +0,0 @@ -package server - -import ( - "bytes" - "fmt" - "io/ioutil" - "net" - "net/http" - "runtime/debug" - "strconv" - "sync" - - "github.com/asim/go-micro/errors" - log "github.com/golang/glog" - rpc "github.com/youtube/vitess/go/rpcplus" - js "github.com/youtube/vitess/go/rpcplus/jsonrpc" - pb "github.com/youtube/vitess/go/rpcplus/pbrpc" -) - -type RpcServer struct { - mtx sync.RWMutex - rpc *rpc.Server - address string - exit chan chan error -} - -var ( - HealthPath = "/_status/health" - RpcPath = "/_rpc" -) - -func executeRequestSafely(c *serverContext, r *http.Request) { - defer func() { - if x := recover(); x != nil { - log.Warningf("Panicked on request: %v", r) - log.Warningf("%v: %v", x, string(debug.Stack())) - err := errors.InternalServerError("go.micro.server", "Unexpected error") - c.WriteHeader(500) - c.Write([]byte(err.Error())) - } - }() - - http.DefaultServeMux.ServeHTTP(c, r) -} - -func (s *RpcServer) handler(w http.ResponseWriter, r *http.Request) { - c := &serverContext{ - req: &serverRequest{r}, - outHeader: w.Header(), - } - - ctxs.Lock() - ctxs.m[r] = c - ctxs.Unlock() - defer func() { - ctxs.Lock() - delete(ctxs.m, r) - ctxs.Unlock() - }() - - // Patch up RemoteAddr so it looks reasonable. - if addr := r.Header.Get("X-Forwarded-For"); len(addr) > 0 { - r.RemoteAddr = addr - } else { - // Should not normally reach here, but pick a sensible default anyway. - r.RemoteAddr = "127.0.0.1" - } - // The address in the headers will most likely be of these forms: - // 123.123.123.123 - // 2001:db8::1 - // net/http.Request.RemoteAddr is specified to be in "IP:port" form. - if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil { - // Assume the remote address is only a host; add a default port. - r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80") - } - - executeRequestSafely(c, r) - c.outHeader = nil // make sure header changes aren't respected any more - - // Avoid nil Write call if c.Write is never called. - if c.outCode != 0 { - w.WriteHeader(c.outCode) - } - if c.outBody != nil { - w.Write(c.outBody) - } -} - -func (s *RpcServer) Address() string { - s.mtx.RLock() - defer s.mtx.RUnlock() - return s.address -} - -func (s *RpcServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { - serveCtx := getServerContext(req) - - // TODO: get user scope from context - // check access - - if req.Method != "POST" { - err := errors.BadRequest("go.micro.server", "Method not allowed") - http.Error(w, err.Error(), http.StatusMethodNotAllowed) - return - } - defer req.Body.Close() - - b, err := ioutil.ReadAll(req.Body) - if err != nil { - errr := errors.InternalServerError("go.micro.server", fmt.Sprintf("Error reading request body: %v", err)) - w.WriteHeader(500) - w.Write([]byte(errr.Error())) - log.Errorf("Erroring reading request body: %v", err) - return - } - - rbq := bytes.NewBuffer(b) - rsp := bytes.NewBuffer(nil) - defer rsp.Reset() - defer rbq.Reset() - - buf := &buffer{ - rbq, - rsp, - } - - var cc rpc.ServerCodec - switch req.Header.Get("Content-Type") { - case "application/octet-stream": - cc = pb.NewServerCodec(buf) - case "application/json": - cc = js.NewServerCodec(buf) - default: - err = errors.InternalServerError("go.micro.server", fmt.Sprintf("Unsupported content-type: %v", req.Header.Get("Content-Type"))) - w.WriteHeader(500) - w.Write([]byte(err.Error())) - return - } - - ctx := newContext(&ctx{}, serveCtx) - err = s.rpc.ServeRequestWithContext(ctx, cc) - if err != nil { - // This should not be possible. - w.WriteHeader(500) - w.Write([]byte(err.Error())) - log.Errorf("Erroring serving request: %v", err) - return - } - - w.Header().Set("Content-Type", req.Header.Get("Content-Type")) - w.Header().Set("Content-Length", strconv.Itoa(rsp.Len())) - w.Write(rsp.Bytes()) -} - -func (s *RpcServer) Init() error { - log.Infof("Rpc handler %s", RpcPath) - http.Handle(RpcPath, s) - return nil -} - -func (s *RpcServer) NewReceiver(handler interface{}) Receiver { - return newRpcReceiver("", handler) -} - -func (s *RpcServer) NewNamedReceiver(name string, handler interface{}) Receiver { - return newRpcReceiver(name, handler) -} - -func (s *RpcServer) Register(r Receiver) error { - if len(r.Name()) > 0 { - s.rpc.RegisterName(r.Name(), r.Handler()) - return nil - } - - s.rpc.Register(r.Handler()) - return nil -} - -func (s *RpcServer) Start() error { - registerHealthChecker(http.DefaultServeMux) - - l, err := net.Listen("tcp", s.address) - if err != nil { - return err - } - - log.Infof("Listening on %s", l.Addr().String()) - - s.mtx.Lock() - s.address = l.Addr().String() - s.mtx.Unlock() - - go http.Serve(l, http.HandlerFunc(s.handler)) - - go func() { - ch := <-s.exit - ch <- l.Close() - }() - - return nil -} - -func (s *RpcServer) Stop() error { - ch := make(chan error) - s.exit <- ch - return <-ch -} - -func NewRpcServer(address string) *RpcServer { - return &RpcServer{ - rpc: rpc.NewServer(), - address: address, - exit: make(chan chan error), - } -} diff --git a/server/server.go b/server/server.go index 31a108bac6..0621ca875a 100644 --- a/server/server.go +++ b/server/server.go @@ -12,12 +12,12 @@ import ( log "github.com/golang/glog" ) +type HandlerFunc func(Server) + type Server interface { Address() string Init() error - NewReceiver(interface{}) Receiver - NewNamedReceiver(string, interface{}) Receiver - Register(Receiver) error + Register(HandlerFunc) error Start() error Stop() error } @@ -41,22 +41,14 @@ func Init() error { } if DefaultServer == nil { - DefaultServer = NewRpcServer(Address) + DefaultServer = NewGRPCServer(Address) } return DefaultServer.Init() } -func NewReceiver(handler interface{}) Receiver { - return DefaultServer.NewReceiver(handler) -} - -func NewNamedReceiver(path string, handler interface{}) Receiver { - return DefaultServer.NewNamedReceiver(path, handler) -} - -func Register(r Receiver) error { - return DefaultServer.Register(r) +func Register(handler HandlerFunc) error { + return DefaultServer.Register(handler) } func Run() error { diff --git a/server/server_context.go b/server/server_context.go deleted file mode 100644 index f986266b9d..0000000000 --- a/server/server_context.go +++ /dev/null @@ -1,120 +0,0 @@ -package server - -import ( - "net/http" - "sync" - - "github.com/asim/go-micro/client" - log "github.com/golang/glog" -) - -var ctxs = struct { - sync.Mutex - m map[*http.Request]*serverContext -}{ - m: make(map[*http.Request]*serverContext), -} - -// A server context interface -type Context interface { - Request() Request // the request made to the server - Headers() Headers // the response headers - NewRequest(string, string, interface{}) client.Request // a new scoped client request - NewProtoRequest(string, string, interface{}) client.Request // a new scoped client request - NewJsonRequest(string, string, interface{}) client.Request // a new scoped client request -} - -// context represents the context of an in-flight HTTP request. -// It implements the appengine.Context and http.ResponseWriter interfaces. -type serverContext struct { - req *serverRequest - outCode int - outHeader http.Header - outBody []byte -} - -// Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status -// codes do not permit a response body (nor response entity headers such as -// Content-Length, Content-Type, etc). -func bodyAllowedForStatus(status int) bool { - switch { - case status >= 100 && status <= 199: - return false - case status == 204: - return false - case status == 304: - return false - } - return true -} - -func getServerContext(req *http.Request) *serverContext { - ctxs.Lock() - c := ctxs.m[req] - ctxs.Unlock() - - if c == nil { - // Someone passed in an http.Request that is not in-flight. - panic("NewContext passed an unknown http.Request") - } - return c -} - -func (c *serverContext) NewRequest(service, method string, request interface{}) client.Request { - req := client.NewRequest(service, method, request) - // TODO: set headers and scope - req.Headers().Set("X-User-Session", c.Request().Session("X-User-Session")) - return req -} - -func (c *serverContext) NewProtoRequest(service, method string, request interface{}) client.Request { - req := client.NewProtoRequest(service, method, request) - // TODO: set headers and scope - req.Headers().Set("X-User-Session", c.Request().Session("X-User-Session")) - return req -} - -func (c *serverContext) NewJsonRequest(service, method string, request interface{}) client.Request { - req := client.NewJsonRequest(service, method, request) - // TODO: set headers and scope - req.Headers().Set("X-User-Session", c.Request().Session("X-User-Session")) - return req -} - -// The response headers -func (c *serverContext) Headers() Headers { - return c.outHeader -} - -// The response headers -func (c *serverContext) Header() http.Header { - return c.outHeader -} - -// The request made to the server -func (c *serverContext) Request() Request { - return c.req -} - -func (c *serverContext) Write(b []byte) (int, error) { - if c.outCode == 0 { - c.WriteHeader(http.StatusOK) - } - if len(b) > 0 && !bodyAllowedForStatus(c.outCode) { - return 0, http.ErrBodyNotAllowed - } - c.outBody = append(c.outBody, b...) - return len(b), nil -} - -func (c *serverContext) WriteHeader(code int) { - if c.outCode != 0 { - log.Error("WriteHeader called multiple times on request.") - return - } - c.outCode = code -} - -func GetContext(r *http.Request) *serverContext { - return getServerContext(r) -} diff --git a/server/server_request.go b/server/server_request.go deleted file mode 100644 index 9058ec526f..0000000000 --- a/server/server_request.go +++ /dev/null @@ -1,25 +0,0 @@ -package server - -import ( - "net/http" -) - -type serverRequest struct { - req *http.Request -} - -func (s *serverRequest) Headers() Headers { - return s.req.Header -} - -func (s *serverRequest) Session(name string) string { - if sess := s.Headers().Get(name); len(sess) > 0 { - return sess - } - - c, err := s.req.Cookie(name) - if err != nil { - return "" - } - return c.Value -}