From 3afda586b759ebed800a86b2dcf51bbc147b60dd Mon Sep 17 00:00:00 2001 From: Alex Peats-Bond Date: Wed, 30 Nov 2016 15:51:21 -0800 Subject: [PATCH] oneway redis --- docker-compose.yml | 8 +- glide.lock | 14 +- glide.yaml | 1 + .../crossdock/client/dispatcher/dispatcher.go | 7 + internal/crossdock/server/oneway/server.go | 17 ++ transport/redis/client.go | 46 ++++++ transport/redis/inbound.go | 153 ++++++++++++++++++ transport/redis/inbound_test.go | 54 +++++++ transport/redis/outbound.go | 101 ++++++++++++ transport/redis/outbound_test.go | 133 +++++++++++++++ transport/redis/redis5.go | 79 +++++++++ transport/redis/redistest/client.go | 101 ++++++++++++ 12 files changed, 711 insertions(+), 3 deletions(-) create mode 100644 transport/redis/client.go create mode 100644 transport/redis/inbound.go create mode 100644 transport/redis/inbound_test.go create mode 100644 transport/redis/outbound.go create mode 100644 transport/redis/outbound_test.go create mode 100644 transport/redis/redis5.go create mode 100644 transport/redis/redistest/client.go diff --git a/docker-compose.yml b/docker-compose.yml index 233ebc74b..fa8524e24 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,6 +5,7 @@ services: image: crossdock/crossdock dns_search: . links: + - redis - go - node - java @@ -29,7 +30,7 @@ services: - AXIS_HTTPSERVER=go - AXIS_CLIENT_ONEWAY=go - AXIS_SERVER_ONEWAY=go - - AXIS_TRANSPORT_ONEWAY=http + - AXIS_TRANSPORT_ONEWAY=http,redis - BEHAVIOR_RAW=client,server,transport - BEHAVIOR_JSON=client,server,transport @@ -58,6 +59,8 @@ services: build: . ports: - "8080-8088" + environment: + - REDIS=enabled node: dns_search: . @@ -84,3 +87,6 @@ services: - 8080 environment: - SYNC=1 + + redis: + image: redis diff --git a/glide.lock b/glide.lock index c3a44b961..c0624a85a 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: d07a774c552cf67c25cf338fd6bf8e6eacf7597f8baaa417b01d141b85a13ab4 -updated: 2016-11-14T15:42:41.962533657-08:00 +hash: 20edceacdd207bcd7a41691f98bcf86273401c6aec50d64ee780f59b4b6b0dea +updated: 2016-11-18T14:07:30.713214673-08:00 imports: - name: github.com/apache/thrift version: 4337983d157dd7041c17340107682f26d2c0c795 @@ -87,6 +87,16 @@ imports: version: 1529f889eb4b594d1f047f2fb8d5b3cc85c8f006 subpackages: - go/ast/astutil +- name: gopkg.in/bsm/ratelimit.v1 + version: db14e161995a5177acef654cb0dd785e8ee8bc22 +- name: gopkg.in/redis.v5 + version: b6bfe529a846fbb9a58c832ce71c61b6fde12c15 + subpackages: + - internal + - internal/consistenthash + - internal/hashtag + - internal/pool + - internal/proto - name: gopkg.in/yaml.v2 version: a83829b6f1293c91addabc89d0571c246397bbf4 testImports: [] diff --git a/glide.yaml b/glide.yaml index 21c6609a4..27bde3764 100644 --- a/glide.yaml +++ b/glide.yaml @@ -30,3 +30,4 @@ import: - package: github.com/uber/jaeger-client-go version: ^1 - package: gopkg.in/yaml.v2 +- package: gopkg.in/redis.v5 diff --git a/internal/crossdock/client/dispatcher/dispatcher.go b/internal/crossdock/client/dispatcher/dispatcher.go index c4c7c2d45..8f65d4eb7 100644 --- a/internal/crossdock/client/dispatcher/dispatcher.go +++ b/internal/crossdock/client/dispatcher/dispatcher.go @@ -30,6 +30,7 @@ import ( "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" + "go.uber.org/yarpc/transport/redis" tch "go.uber.org/yarpc/transport/tchannel" "github.com/crossdock/crossdock-go" @@ -94,6 +95,12 @@ func CreateOnewayDispatcher(t crossdock.T, handler raw.OnewayHandler) (yarpc.Dis hostport.PeerIdentifier(fmt.Sprintf("%s:8084", server)), http.NewTransport(), )) + + case "redis": + outbound = redis.NewOnewayOutbound( + redis.NewRedis5Client("redis:6379"), + "yarpc/oneway") + default: fatals.Fail("", "unknown transport %q", trans) } diff --git a/internal/crossdock/server/oneway/server.go b/internal/crossdock/server/oneway/server.go index b3fe5b291..2ede553e1 100644 --- a/internal/crossdock/server/oneway/server.go +++ b/internal/crossdock/server/oneway/server.go @@ -22,6 +22,8 @@ package oneway import ( "fmt" + "os" + "time" "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/json" @@ -29,14 +31,24 @@ import ( "go.uber.org/yarpc/internal/crossdock/thrift/oneway/yarpc/onewayserver" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" + "go.uber.org/yarpc/transport/redis" ) var dispatcher yarpc.Dispatcher +const redisAddr = "redis:6379" + // Start starts the test server that clients will make requests to func Start() { inbounds := []transport.Inbound{http.NewInbound(":8084")} + if redisIsAvailable() { + rds := redis.NewInbound( + redis.NewRedis5Client(redisAddr), + "yarpc/oneway", "yarpc/oneway/processing", time.Second) + inbounds = append(inbounds, rds) + } + dispatcher = yarpc.NewDispatcher(yarpc.Config{ Name: "oneway-server", Inbounds: inbounds, @@ -63,3 +75,8 @@ func Stop() { fmt.Println("failed to stop:", err.Error()) } } + +// redisIsAvailable checks to see if a redis server is available +func redisIsAvailable() bool { + return os.Getenv("REDIS") == "enabled" +} diff --git a/transport/redis/client.go b/transport/redis/client.go new file mode 100644 index 000000000..0098a5449 --- /dev/null +++ b/transport/redis/client.go @@ -0,0 +1,46 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package redis + +import "time" + +//go:generate mockgen -destination=redistest/client.go -package=redistest go.uber.org/yarpc/transport/redis Client + +// Client is a subset of redis commands used to manage a queue +type Client interface { + // Start creates the connection to redis + // + // This MAY be called more than once. + Start() error + // Stop ends the redis connection + // + // This May be called more than once. + Stop() error + + // LPush adds item to the queue + LPush(queue string, item []byte) error + // This MUST return an error if the blocking call does not receive an item + // BRPopLPush moves an item from the primary queue into a processing list. + // within the timeout. + BRPopLPush(from, to string, timeout time.Duration) ([]byte, error) + // LRem removes one item from the queue key + LRem(queue string, item []byte) error +} diff --git a/transport/redis/inbound.go b/transport/redis/inbound.go new file mode 100644 index 000000000..c1bef9a6b --- /dev/null +++ b/transport/redis/inbound.go @@ -0,0 +1,153 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package redis + +import ( + "context" + "time" + + "go.uber.org/yarpc/internal/errors" + "go.uber.org/yarpc/internal/request" + "go.uber.org/yarpc/serialize" + "go.uber.org/yarpc/transport" + "go.uber.org/yarpc/transport/internal" + + "github.com/opentracing/opentracing-go" +) + +// Inbound is a redis inbound that reads from the given queueKey. This will +// wait for an item in the queue or until the timout is reached before trying +// to read again. +type Inbound struct { + registry transport.Registry + tracer opentracing.Tracer + + client Client + timeout time.Duration + queueKey string + processingKey string + + stop chan struct{} +} + +// NewInbound creates a redis transport.Inbound +func NewInbound(client Client, queueKey, processingKey string, timeout time.Duration) *Inbound { + return &Inbound{ + tracer: opentracing.GlobalTracer(), + client: client, + timeout: timeout, + stop: make(chan struct{}), + } +} + +// WithTracer configures a tracer on this inbound. +func (i *Inbound) WithTracer(tracer opentracing.Tracer) *Inbound { + i.tracer = tracer + return i +} + +// WithRegistry configures a registry to handle incoming requests, +// as a chained method for convenience. +func (i *Inbound) WithRegistry(registry transport.Registry) *Inbound { + i.registry = registry + return i +} + +// SetRegistry configures a registry to handle incoming requests. +// This satisfies the transport.Inbound interface, and would be called +// by a dispatcher when it starts. +func (i *Inbound) SetRegistry(registry transport.Registry) { + i.registry = registry +} + +// Start starts the inbound, reading from the queueKey +func (i *Inbound) Start() error { + if i.registry == nil { + return errors.NoRegistryError{} + } + + err := i.client.Start() + if err != nil { + return err + } + + go i.start() + return nil +} + +func (i *Inbound) start() { + for { + select { + case <-i.stop: + return + default: + i.handle() + } + } +} + +// Stop ends the connection to redis +func (i *Inbound) Stop() error { + close(i.stop) + return i.client.Stop() +} + +func (i *Inbound) handle() error { + // TODO: logging + item, err := i.client.BRPopLPush(i.queueKey, i.processingKey, i.timeout) + if err != nil { + return err + } + defer i.client.LRem(i.queueKey, item) + + start := time.Now() + + spanContext, req, err := serialize.FromBytes(i.tracer, item) + if err != nil { + return err + } + + ctx, span := transport.ExtractOpenTracingSpan(context.Background(), spanContext, req, i.tracer, "redis", start) + defer span.Finish() + + v := request.Validator{Request: req} + req, err = v.Validate(ctx) + if err != nil { + return transport.UpdateSpanWithErr(span, err) + } + + spec, err := i.registry.Choose(ctx, req) + if err != nil { + return transport.UpdateSpanWithErr(span, err) + } + + if spec.Type() != transport.Oneway { + err = errors.UnsupportedTypeError{Transport: "redis", Type: string(spec.Type())} + return transport.UpdateSpanWithErr(span, err) + } + + req, err = v.ValidateOneway(ctx) + if err != nil { + return transport.UpdateSpanWithErr(span, err) + } + + return internal.SafelyCallOnewayHandler(ctx, spec.Oneway(), req) +} diff --git a/transport/redis/inbound_test.go b/transport/redis/inbound_test.go new file mode 100644 index 000000000..5e81c5bd6 --- /dev/null +++ b/transport/redis/inbound_test.go @@ -0,0 +1,54 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package redis + +import ( + "testing" + + "go.uber.org/yarpc/transport/redis/redistest" + "go.uber.org/yarpc/transport/transporttest" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +func TestOperationOrder(t *testing.T) { + mockCtrl := gomock.NewController(t) + client := redistest.NewMockClient(mockCtrl) + + startCall := client.EXPECT().Start() + getCall := client.EXPECT(). + BRPopLPush(gomock.Any(), gomock.Any(), gomock.Any()). + After(startCall) + client.EXPECT(). + LRem(gomock.Any(), gomock.Any()). + After(getCall) + client.EXPECT().Stop() + + inbound := NewInbound(client, "queueKey", "processingKey", 0) + inbound.SetRegistry(&transporttest.MockRegistry{}) + + err := inbound.Start() + assert.NoError(t, err, "error starting redis inbound") + + err = inbound.Stop() + assert.NoError(t, err, "error stopping redis inbound") +} diff --git a/transport/redis/outbound.go b/transport/redis/outbound.go new file mode 100644 index 000000000..12ebe7449 --- /dev/null +++ b/transport/redis/outbound.go @@ -0,0 +1,101 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package redis + +import ( + "context" + "time" + + "go.uber.org/atomic" + "go.uber.org/yarpc/internal/errors" + "go.uber.org/yarpc/serialize" + "go.uber.org/yarpc/transport" + + "github.com/opentracing/opentracing-go" +) + +var errOutboundNotStarted = errors.ErrOutboundNotStarted("redis.Outbound") + +// Outbound is a redis OnewayOutbound +type Outbound struct { + client Client + tracer opentracing.Tracer + queueKey string + + started *atomic.Bool +} + +// NewOnewayOutbound creates a redis transport.OnewayOutbound +func NewOnewayOutbound(client Client, queueKey string) *Outbound { + return &Outbound{ + client: client, + tracer: opentracing.GlobalTracer(), + started: atomic.NewBool(false), + } +} + +// WithTracer configures a tracer for the outbound +func (o *Outbound) WithTracer(tracer opentracing.Tracer) *Outbound { + o.tracer = tracer + return o +} + +// Start creates connection to the redis instance +func (o *Outbound) Start() error { + var err error + if !o.started.Swap(true) { + err = o.client.Start() + } + return err +} + +// Stop stops the redis connection +func (o *Outbound) Stop() error { + var err error + if o.started.Swap(false) { + err = o.client.Stop() + } + return err +} + +// CallOneway makes a oneway request using redis +func (o *Outbound) CallOneway(ctx context.Context, req *transport.Request) (transport.Ack, error) { + if !o.started.Load() { + panic(errOutboundNotStarted) + } + + ctx, span := transport.CreateOpenTracingSpan(ctx, req, o.tracer, "redis", time.Now()) + defer span.Finish() + + marshalledRPC, err := serialize.ToBytes(o.tracer, span.Context(), req) + if err != nil { + return nil, transport.UpdateSpanWithErr(span, err) + } + + err = o.client.LPush(o.queueKey, marshalledRPC) + ack := time.Now() + + if err != nil { + return nil, transport.UpdateSpanWithErr(span, err) + } + + return ack, nil +} diff --git a/transport/redis/outbound_test.go b/transport/redis/outbound_test.go new file mode 100644 index 000000000..7c6f4f1a1 --- /dev/null +++ b/transport/redis/outbound_test.go @@ -0,0 +1,133 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package redis + +import ( + "bytes" + "context" + "sync" + "testing" + + "go.uber.org/yarpc/encoding/raw" + "go.uber.org/yarpc/transport" + "go.uber.org/yarpc/transport/redis/redistest" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCall(t *testing.T) { + mockCtrl := gomock.NewController(t) + client := redistest.NewMockClient(mockCtrl) + + client.EXPECT().Start() + client.EXPECT().LPush(gomock.Any(), gomock.Any()) + client.EXPECT().Stop() + + out := NewOnewayOutbound(client, "queueKey") + err := out.Start() + assert.NoError(t, err, "could not start redis outbound") + + ack, err := out.CallOneway(context.Background(), &transport.Request{ + Caller: "caller", + Service: "service", + Encoding: raw.Encoding, + Procedure: "hello", + Body: bytes.NewReader([]byte("hello!")), + }) + assert.NotNil(t, ack) + assert.NoError(t, err) + + assert.NoError(t, out.Stop(), "error stoping redis outbound") +} + +func TestStartMultiple(t *testing.T) { + mockCtrl := gomock.NewController(t) + client := redistest.NewMockClient(mockCtrl) + client.EXPECT().Start().Times(1).Return(nil) + + out := NewOnewayOutbound(client, "queueKey") + + var wg sync.WaitGroup + signal := make(chan struct{}) + + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + <-signal + + err := out.Start() + assert.NoError(t, err) + }() + } + close(signal) + wg.Wait() +} + +func TestStopMultiple(t *testing.T) { + mockCtrl := gomock.NewController(t) + client := redistest.NewMockClient(mockCtrl) + client.EXPECT().Start().Times(1).Return(nil) + client.EXPECT().Stop().Times(1).Return(nil) + + out := NewOnewayOutbound(client, "queueKey") + + err := out.Start() + require.NoError(t, err) + + var wg sync.WaitGroup + signal := make(chan struct{}) + + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + <-signal + + err := out.Stop() + assert.NoError(t, err) + }() + } + close(signal) + wg.Wait() +} + +func TestCallWithoutStarting(t *testing.T) { + mockCtrl := gomock.NewController(t) + client := redistest.NewMockClient(mockCtrl) + client.EXPECT().Start().Times(1).Return(nil) + + out := NewOnewayOutbound(client, "queueKey") + + assert.Panics(t, func() { + out.CallOneway( + context.Background(), + &transport.Request{ + Caller: "caller", + Service: "service", + Encoding: raw.Encoding, + Procedure: "foo", + Body: bytes.NewReader([]byte("sup")), + }) + }) +} diff --git a/transport/redis/redis5.go b/transport/redis/redis5.go new file mode 100644 index 000000000..a6709603b --- /dev/null +++ b/transport/redis/redis5.go @@ -0,0 +1,79 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package redis + +import ( + "errors" + "time" + + redis5 "gopkg.in/redis.v5" +) + +type redis5Client struct { + addr string + client *redis5.Client +} + +// NewRedis5Client creates a new Client implementation using gopkg.in/redis.v5 +func NewRedis5Client(addr string) Client { + return &redis5Client{addr: addr} +} + +func (c *redis5Client) Start() error { + c.client = redis5.NewClient( + &redis5.Options{Addr: c.addr}, + ) + + return c.client.Ping().Err() +} + +func (c *redis5Client) Stop() error { + return c.client.Close() +} + +func (c *redis5Client) LPush(queueKey string, item []byte) error { + cmd := c.client.LPush(queueKey, item) + if cmd.Err() != nil { + return errors.New("could not push item onto queue") + } + return nil +} + +func (c *redis5Client) BRPopLPush(queueKey, processingKey string, timeout time.Duration) ([]byte, error) { + cmd := c.client.BRPopLPush(queueKey, processingKey, timeout) + + item, _ := cmd.Bytes() + // No bytes means that we timed out waiting for something in our queue + // and we should try again + if len(item) == 0 { + return nil, errors.New("no item found in queue") + } + + return item, nil +} + +func (c *redis5Client) LRem(key string, item []byte) error { + removed := c.client.LRem(key, 1, item).Val() + if removed <= 0 { + return errors.New("could not remove item from queue") + } + return nil +} diff --git a/transport/redis/redistest/client.go b/transport/redis/redistest/client.go new file mode 100644 index 000000000..828aab097 --- /dev/null +++ b/transport/redis/redistest/client.go @@ -0,0 +1,101 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Automatically generated by MockGen. DO NOT EDIT! +// Source: go.uber.org/yarpc/transport/redis (interfaces: Client) + +package redistest + +import ( + gomock "github.com/golang/mock/gomock" + time "time" +) + +// Mock of Client interface +type MockClient struct { + ctrl *gomock.Controller + recorder *_MockClientRecorder +} + +// Recorder for MockClient (not exported) +type _MockClientRecorder struct { + mock *MockClient +} + +func NewMockClient(ctrl *gomock.Controller) *MockClient { + mock := &MockClient{ctrl: ctrl} + mock.recorder = &_MockClientRecorder{mock} + return mock +} + +func (_m *MockClient) EXPECT() *_MockClientRecorder { + return _m.recorder +} + +func (_m *MockClient) BRPopLPush(_param0 string, _param1 string, _param2 time.Duration) ([]byte, error) { + ret := _m.ctrl.Call(_m, "BRPopLPush", _param0, _param1, _param2) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockClientRecorder) BRPopLPush(arg0, arg1, arg2 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "BRPopLPush", arg0, arg1, arg2) +} + +func (_m *MockClient) LPush(_param0 string, _param1 []byte) error { + ret := _m.ctrl.Call(_m, "LPush", _param0, _param1) + ret0, _ := ret[0].(error) + return ret0 +} + +func (_mr *_MockClientRecorder) LPush(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "LPush", arg0, arg1) +} + +func (_m *MockClient) LRem(_param0 string, _param1 []byte) error { + ret := _m.ctrl.Call(_m, "LRem", _param0, _param1) + ret0, _ := ret[0].(error) + return ret0 +} + +func (_mr *_MockClientRecorder) LRem(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "LRem", arg0, arg1) +} + +func (_m *MockClient) Start() error { + ret := _m.ctrl.Call(_m, "Start") + ret0, _ := ret[0].(error) + return ret0 +} + +func (_mr *_MockClientRecorder) Start() *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "Start") +} + +func (_m *MockClient) Stop() error { + ret := _m.ctrl.Call(_m, "Stop") + ret0, _ := ret[0].(error) + return ret0 +} + +func (_mr *_MockClientRecorder) Stop() *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "Stop") +}