Skip to content

Commit

Permalink
oneway redis
Browse files Browse the repository at this point in the history
  • Loading branch information
peats-bond committed Dec 6, 2016
1 parent 2047dff commit 3afda58
Show file tree
Hide file tree
Showing 12 changed files with 711 additions and 3 deletions.
8 changes: 7 additions & 1 deletion docker-compose.yml
Expand Up @@ -5,6 +5,7 @@ services:
image: crossdock/crossdock
dns_search: .
links:
- redis
- go
- node
- java
Expand All @@ -29,7 +30,7 @@ services:
- AXIS_HTTPSERVER=go
- AXIS_CLIENT_ONEWAY=go
- AXIS_SERVER_ONEWAY=go
- AXIS_TRANSPORT_ONEWAY=http
- AXIS_TRANSPORT_ONEWAY=http,redis

- BEHAVIOR_RAW=client,server,transport
- BEHAVIOR_JSON=client,server,transport
Expand Down Expand Up @@ -58,6 +59,8 @@ services:
build: .
ports:
- "8080-8088"
environment:
- REDIS=enabled

node:
dns_search: .
Expand All @@ -84,3 +87,6 @@ services:
- 8080
environment:
- SYNC=1

redis:
image: redis
14 changes: 12 additions & 2 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions glide.yaml
Expand Up @@ -30,3 +30,4 @@ import:
- package: github.com/uber/jaeger-client-go
version: ^1
- package: gopkg.in/yaml.v2
- package: gopkg.in/redis.v5
7 changes: 7 additions & 0 deletions internal/crossdock/client/dispatcher/dispatcher.go
Expand Up @@ -30,6 +30,7 @@ import (
"go.uber.org/yarpc/peer/single"
"go.uber.org/yarpc/transport"
"go.uber.org/yarpc/transport/http"
"go.uber.org/yarpc/transport/redis"
tch "go.uber.org/yarpc/transport/tchannel"

"github.com/crossdock/crossdock-go"
Expand Down Expand Up @@ -94,6 +95,12 @@ func CreateOnewayDispatcher(t crossdock.T, handler raw.OnewayHandler) (yarpc.Dis
hostport.PeerIdentifier(fmt.Sprintf("%s:8084", server)),
http.NewTransport(),
))

case "redis":
outbound = redis.NewOnewayOutbound(
redis.NewRedis5Client("redis:6379"),
"yarpc/oneway")

default:
fatals.Fail("", "unknown transport %q", trans)
}
Expand Down
17 changes: 17 additions & 0 deletions internal/crossdock/server/oneway/server.go
Expand Up @@ -22,21 +22,33 @@ package oneway

import (
"fmt"
"os"
"time"

"go.uber.org/yarpc"
"go.uber.org/yarpc/encoding/json"
"go.uber.org/yarpc/encoding/raw"
"go.uber.org/yarpc/internal/crossdock/thrift/oneway/yarpc/onewayserver"
"go.uber.org/yarpc/transport"
"go.uber.org/yarpc/transport/http"
"go.uber.org/yarpc/transport/redis"
)

var dispatcher yarpc.Dispatcher

const redisAddr = "redis:6379"

// Start starts the test server that clients will make requests to
func Start() {
inbounds := []transport.Inbound{http.NewInbound(":8084")}

if redisIsAvailable() {
rds := redis.NewInbound(
redis.NewRedis5Client(redisAddr),
"yarpc/oneway", "yarpc/oneway/processing", time.Second)
inbounds = append(inbounds, rds)
}

dispatcher = yarpc.NewDispatcher(yarpc.Config{
Name: "oneway-server",
Inbounds: inbounds,
Expand All @@ -63,3 +75,8 @@ func Stop() {
fmt.Println("failed to stop:", err.Error())
}
}

// redisIsAvailable checks to see if a redis server is available
func redisIsAvailable() bool {
return os.Getenv("REDIS") == "enabled"
}
46 changes: 46 additions & 0 deletions transport/redis/client.go
@@ -0,0 +1,46 @@
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package redis

import "time"

//go:generate mockgen -destination=redistest/client.go -package=redistest go.uber.org/yarpc/transport/redis Client

// Client is a subset of redis commands used to manage a queue
type Client interface {
// Start creates the connection to redis
//
// This MAY be called more than once.
Start() error
// Stop ends the redis connection
//
// This May be called more than once.
Stop() error

// LPush adds item to the queue
LPush(queue string, item []byte) error
// This MUST return an error if the blocking call does not receive an item
// BRPopLPush moves an item from the primary queue into a processing list.
// within the timeout.
BRPopLPush(from, to string, timeout time.Duration) ([]byte, error)
// LRem removes one item from the queue key
LRem(queue string, item []byte) error
}
153 changes: 153 additions & 0 deletions transport/redis/inbound.go
@@ -0,0 +1,153 @@
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package redis

import (
"context"
"time"

"go.uber.org/yarpc/internal/errors"
"go.uber.org/yarpc/internal/request"
"go.uber.org/yarpc/serialize"
"go.uber.org/yarpc/transport"
"go.uber.org/yarpc/transport/internal"

"github.com/opentracing/opentracing-go"
)

// Inbound is a redis inbound that reads from the given queueKey. This will
// wait for an item in the queue or until the timout is reached before trying
// to read again.
type Inbound struct {
registry transport.Registry
tracer opentracing.Tracer

client Client
timeout time.Duration
queueKey string
processingKey string

stop chan struct{}
}

// NewInbound creates a redis transport.Inbound
func NewInbound(client Client, queueKey, processingKey string, timeout time.Duration) *Inbound {
return &Inbound{
tracer: opentracing.GlobalTracer(),
client: client,
timeout: timeout,
stop: make(chan struct{}),
}
}

// WithTracer configures a tracer on this inbound.
func (i *Inbound) WithTracer(tracer opentracing.Tracer) *Inbound {
i.tracer = tracer
return i
}

// WithRegistry configures a registry to handle incoming requests,
// as a chained method for convenience.
func (i *Inbound) WithRegistry(registry transport.Registry) *Inbound {
i.registry = registry
return i
}

// SetRegistry configures a registry to handle incoming requests.
// This satisfies the transport.Inbound interface, and would be called
// by a dispatcher when it starts.
func (i *Inbound) SetRegistry(registry transport.Registry) {
i.registry = registry
}

// Start starts the inbound, reading from the queueKey
func (i *Inbound) Start() error {
if i.registry == nil {
return errors.NoRegistryError{}
}

err := i.client.Start()
if err != nil {
return err
}

go i.start()
return nil
}

func (i *Inbound) start() {
for {
select {
case <-i.stop:
return
default:
i.handle()
}
}
}

// Stop ends the connection to redis
func (i *Inbound) Stop() error {
close(i.stop)
return i.client.Stop()
}

func (i *Inbound) handle() error {
// TODO: logging
item, err := i.client.BRPopLPush(i.queueKey, i.processingKey, i.timeout)
if err != nil {
return err
}
defer i.client.LRem(i.queueKey, item)

start := time.Now()

spanContext, req, err := serialize.FromBytes(i.tracer, item)
if err != nil {
return err
}

ctx, span := transport.ExtractOpenTracingSpan(context.Background(), spanContext, req, i.tracer, "redis", start)
defer span.Finish()

v := request.Validator{Request: req}
req, err = v.Validate(ctx)
if err != nil {
return transport.UpdateSpanWithErr(span, err)
}

spec, err := i.registry.Choose(ctx, req)
if err != nil {
return transport.UpdateSpanWithErr(span, err)
}

if spec.Type() != transport.Oneway {
err = errors.UnsupportedTypeError{Transport: "redis", Type: string(spec.Type())}
return transport.UpdateSpanWithErr(span, err)
}

req, err = v.ValidateOneway(ctx)
if err != nil {
return transport.UpdateSpanWithErr(span, err)
}

return internal.SafelyCallOnewayHandler(ctx, spec.Oneway(), req)
}
54 changes: 54 additions & 0 deletions transport/redis/inbound_test.go
@@ -0,0 +1,54 @@
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package redis

import (
"testing"

"go.uber.org/yarpc/transport/redis/redistest"
"go.uber.org/yarpc/transport/transporttest"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
)

func TestOperationOrder(t *testing.T) {
mockCtrl := gomock.NewController(t)
client := redistest.NewMockClient(mockCtrl)

startCall := client.EXPECT().Start()
getCall := client.EXPECT().
BRPopLPush(gomock.Any(), gomock.Any(), gomock.Any()).
After(startCall)
client.EXPECT().
LRem(gomock.Any(), gomock.Any()).
After(getCall)
client.EXPECT().Stop()

inbound := NewInbound(client, "queueKey", "processingKey", 0)
inbound.SetRegistry(&transporttest.MockRegistry{})

err := inbound.Start()
assert.NoError(t, err, "error starting redis inbound")

err = inbound.Stop()
assert.NoError(t, err, "error stopping redis inbound")
}

0 comments on commit 3afda58

Please sign in to comment.