Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Merge branch 'docker_fix' of github.com:uber/cherami-server into dock…
Browse files Browse the repository at this point in the history
…er_fix
  • Loading branch information
datoug committed Feb 8, 2017
2 parents 87945c4 + 1ad97fb commit c229e86
Show file tree
Hide file tree
Showing 10 changed files with 611 additions and 12 deletions.
12 changes: 6 additions & 6 deletions .travis.yml
Expand Up @@ -35,13 +35,13 @@ install:
- go get golang.org/x/tools/cmd/cover
- ccm create test -v 2.2.8 -n 1 -s
- sudo ln -sf /home/travis/.local/bin/cqlsh /usr/local/bin/cqlsh
- wget https://github.com/uber/cherami-server/releases/download/rocksdb-4.11.2-trusty/librocksdb.so.4.11.2
- ln -s librocksdb.so.4.11.2 librocksdb.so.4.11
- ln -s librocksdb.so.4.11.2 librocksdb.so.4
- ln -s librocksdb.so.4.11.2 librocksdb.so
- glide install && touch vendor/glide.updated
- pushd vendor/github.com/cockroachdb/c-rocksdb/internal
- travis_wait 20 make shared_lib
- popd
- export CGO_CFLAGS="$CGO_FLAGS -I`pwd`/vendor/github.com/cockroachdb/c-rocksdb/internal/include"
- export CGO_LDFLAGS="$CGO_LDFLAGS -L`pwd` -lrocksdb"
- export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:`pwd`"
- export CGO_LDFLAGS="$CGO_LDFLAGS -L`pwd`/vendor/github.com/cockroachdb/c-rocksdb/internal -lrocksdb"
- export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:`pwd`/vendor/github.com/cockroachdb/c-rocksdb/internal"

script:
- EMBEDROCKSDB=0 make cover_ci
Expand Down
24 changes: 24 additions & 0 deletions clients/storehost/client.go
Expand Up @@ -67,3 +67,27 @@ func (s *StoreClientImpl) ReadMessages(req *store.ReadMessagesRequest) (*store.R

return s.client.ReadMessages(ctx, req)
}

// GetAddressFromTimestamp queries store for the address corresponding to the given timestamp
func (s *StoreClientImpl) GetAddressFromTimestamp(req *store.GetAddressFromTimestampRequest) (*store.GetAddressFromTimestampResult_, error) {
ctx, cancel := tcthrift.NewContext(2 * time.Second)
defer cancel()

return s.client.GetAddressFromTimestamp(ctx, req)
}

// SealExtent seals an extent on the specified store
func (s *StoreClientImpl) SealExtent(req *store.SealExtentRequest) error {
ctx, cancel := tcthrift.NewContext(2 * time.Second)
defer cancel()

return s.client.SealExtent(ctx, req)
}

// PurgeMessages seals an extent on the specified store
func (s *StoreClientImpl) PurgeMessages(req *store.PurgeMessagesRequest) (*store.PurgeMessagesResult_, error) {
ctx, cancel := tcthrift.NewContext(2 * time.Second)
defer cancel()

return s.client.PurgeMessages(ctx, req)
}
71 changes: 71 additions & 0 deletions cmd/tools/admin/main.go
Expand Up @@ -628,6 +628,77 @@ func main() {
},
},
},
{
Name: "seal-check",
Aliases: []string{"sc"},
Usage: "seal-check <dest> [--seal]",
Flags: []cli.Flag{
cli.StringFlag{
Name: "prefix, pf",
Value: "/",
Usage: "only process destinations with prefix",
},
cli.BoolFlag{
Name: "seal",
Usage: "seal extents on replica that are not sealed",
},
cli.BoolFlag{
Name: "verbose, v",
Usage: "verbose output",
},
cli.BoolFlag{
Name: "veryverbose, vv",
Usage: "very verbose output",
},
},
Action: func(c *cli.Context) {
admin.SealConsistencyCheck(c)
},
},
{
Name: "store-seal",
Aliases: []string{"seal"},
Usage: "seal <store_uuid> <extent_uuid> [<seqnum>]",
Action: func(c *cli.Context) {
admin.StoreSealExtent(c)
},
},
{
Name: "store-isextentsealed",
Aliases: []string{"issealed"},
Usage: "issealed <store_uuid> <extent_uuid>",
Action: func(c *cli.Context) {
admin.StoreIsExtentSealed(c)
},
},
{
Name: "store-gaft",
Aliases: []string{"gaft"},
Usage: "gaft <store_uuid> <extent_uuid> <timestamp>",
Action: func(c *cli.Context) {
admin.StoreGetAddressFromTimestamp(c)
},
},
{
Name: "store-purgeextent",
Aliases: []string{"purge"},
Usage: "purge <store_uuid> <extent_uuid> [<address> | --entirely]",
Flags: []cli.Flag{
cli.BoolFlag{
Name: "entirely",
Usage: "deletes extent entirely",
},

cli.Int64Flag{
Name: "address, a",
Value: 0,
Usage: "address to delete upto",
},
},
Action: func(c *cli.Context) {
admin.StorePurgeMessages(c)
},
},
}

app.Run(os.Args)
Expand Down
12 changes: 12 additions & 0 deletions common/mockrpm.go
Expand Up @@ -21,6 +21,7 @@
package common

import (
"math/rand"
"strings"
"sync"
)
Expand Down Expand Up @@ -107,6 +108,17 @@ func (rpm *MockRingpopMonitor) FindHostForKey(service string, key string) (*Host
return &HostInfo{}, ErrInsufficientHosts
}

// FindRandomHost finds and returns a random host responsible for handling the given key
func (rpm *MockRingpopMonitor) FindRandomHost(service string) (*HostInfo, error) {
rpm.RLock()
defer rpm.RUnlock()

if hosts, ok := rpm.serviceHosts[service]; ok && len(hosts) > 0 {
return hosts[rand.Intn(len(hosts))], nil
}
return &HostInfo{}, ErrInsufficientHosts
}

// IsHostHealthy returns true if the given host is healthy and false otherwise
func (rpm *MockRingpopMonitor) IsHostHealthy(service string, uuid string) bool {
rpm.RLock()
Expand Down
16 changes: 12 additions & 4 deletions common/rpm.go
Expand Up @@ -65,6 +65,8 @@ type (
FindHostForAddr(service string, addr string) (*HostInfo, error)
// FindHostForKey finds and returns the host responsible for handling the given (service, key)
FindHostForKey(service string, key string) (*HostInfo, error)
// FindRandomHost finds and returns a random host responsible for handling the given service
FindRandomHost(service string) (*HostInfo, error)
// IsHostHealthy returns true if the given (service, host) is healthy
IsHostHealthy(service string, uuid string) bool
// ResolveUUID resovles a host UUID to an IP address, if the host is found
Expand Down Expand Up @@ -275,6 +277,16 @@ func (rpm *ringpopMonitorImpl) FindHostForKey(service string, key string) (*Host
return &host, nil
}

// FindRandomHost finds and returns a random host responsible for handling the given key
func (rpm *ringpopMonitorImpl) FindRandomHost(service string) (*HostInfo, error) {
members, err := rpm.GetHosts(service)
if err != nil {
return nil, err
}

return members[rand.Intn(len(members))], nil
}

// IsHostHealthy returns true if the given host is healthy and false otherwise
func (rpm *ringpopMonitorImpl) IsHostHealthy(service string, uuid string) bool {
_, err := rpm.ResolveUUID(service, uuid)
Expand Down Expand Up @@ -546,7 +558,3 @@ func (rpm *ringpopMonitorImpl) notifyListeners(info *serviceInfo, eventType Ring
}
info.listenerMutex.RUnlock()
}

func (rpm *ringpopMonitorImpl) random(hosts []*HostInfo) *HostInfo {
return hosts[rand.Intn(len(hosts))]
}
5 changes: 5 additions & 0 deletions services/controllerhost/dfdd_test.go
Expand Up @@ -211,6 +211,11 @@ func (rpm *testRpmImpl) FindHostForKey(service string, key string) (*common.Host
return nil, common.ErrUUIDLookupFailed
}

// FindRandomHost finds and returns a random host responsible for handling the given service
func (rpm *testRpmImpl) FindRandomHost(service string) (*common.HostInfo, error) {
return nil, common.ErrUUIDLookupFailed
}

// IsHostHealthy returns true if the given (service, host) is healthy
func (rpm *testRpmImpl) IsHostHealthy(service string, uuid string) bool { return false }

Expand Down
2 changes: 1 addition & 1 deletion services/storehost/storehost.go
Expand Up @@ -1395,7 +1395,7 @@ func getReplicationArgsFromRemoteReplicateRequest(req *store.RemoteReplicateExte
}

rpm := sCommon.GetRingpopMonitor()
hostInfo, err := rpm.FindHostForKey(common.ReplicatorServiceName, common.ReplicatorServiceName)
hostInfo, err := rpm.FindRandomHost(common.ReplicatorServiceName)
if err != nil {
return nil, newInternalServiceError(fmt.Sprintf("error finding replicator host : (%v)", err))
}
Expand Down
46 changes: 46 additions & 0 deletions test/mocks/metadata/TChanMetadataServiceClient.go
Expand Up @@ -444,6 +444,29 @@ func (_m *TChanMetadataServiceClient) MoveExtent(ctx thrift.Context, request *me
return r0
}

// ReadConsumerGroupExtentsLite provides a mock function with given fields: ctx, request
func (_m *TChanMetadataServiceClient) ReadConsumerGroupExtentsLite(ctx thrift.Context, request *metadata.ReadConsumerGroupExtentsLiteRequest) (*metadata.ReadConsumerGroupExtentsLiteResult_, error) {
ret := _m.Called(ctx, request)

var r0 *metadata.ReadConsumerGroupExtentsLiteResult_
if rf, ok := ret.Get(0).(func(thrift.Context, *metadata.ReadConsumerGroupExtentsLiteRequest) *metadata.ReadConsumerGroupExtentsLiteResult_); ok {
r0 = rf(ctx, request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*metadata.ReadConsumerGroupExtentsLiteResult_)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(thrift.Context, *metadata.ReadConsumerGroupExtentsLiteRequest) error); ok {
r1 = rf(ctx, request)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// ReadConsumerGroup provides a mock function with given fields: ctx, getRequest
func (_m *TChanMetadataServiceClient) ReadConsumerGroup(ctx thrift.Context, getRequest *metadata.ReadConsumerGroupRequest) (*shared.ConsumerGroupDescription, error) {
ret := _m.Called(ctx, getRequest)
Expand Down Expand Up @@ -844,6 +867,29 @@ func (_m *TChanMetadataServiceClient) DeleteHostInfo(ctx thrift.Context, request
return r0
}

// ListDestinationExtents provides a mock function with given fields: ctx, request
func (_m *TChanMetadataServiceClient) ListDestinationExtents(ctx thrift.Context, request *metadata.ListDestinationExtentsRequest) (*metadata.ListDestinationExtentsResult_, error) {
ret := _m.Called(ctx, request)

var r0 *metadata.ListDestinationExtentsResult_
if rf, ok := ret.Get(0).(func(thrift.Context, *metadata.ListDestinationExtentsRequest) *metadata.ListDestinationExtentsResult_); ok {
r0 = rf(ctx, request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*metadata.ListDestinationExtentsResult_)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(thrift.Context, *metadata.ListDestinationExtentsRequest) error); ok {
r1 = rf(ctx, request)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// ReadHostInfo provides a mock function with given fields: ctx, request
func (_m *TChanMetadataServiceClient) ReadHostInfo(ctx thrift.Context, request *metadata.ReadHostInfoRequest) (*metadata.ReadHostInfoResult_, error) {
ret := _m.Called(ctx, request)
Expand Down
36 changes: 36 additions & 0 deletions tools/admin/lib.go
Expand Up @@ -1123,3 +1123,39 @@ func DeleteServiceConfig(c *cli.Context) {
err = mClient.DeleteServiceConfig(req)
toolscommon.ExitIfError(err)
}

// SealConsistencyCheck iterates through every sealed extent for every destination
// and checks to see if the corresponding replicas have been sealed.
func SealConsistencyCheck(c *cli.Context) {

mClient := toolscommon.GetMClient(c, adminToolService)
toolscommon.SealConsistencyCheck(c, mClient)
}

// StoreSealExtent sends a SealExtent command to the specified store.
func StoreSealExtent(c *cli.Context) {

mClient := toolscommon.GetMClient(c, adminToolService)
toolscommon.StoreSealExtent(c, mClient)
}

// StoreIsExtentSealed checks if an extent is sealed on the specified store
func StoreIsExtentSealed(c *cli.Context) {

mClient := toolscommon.GetMClient(c, adminToolService)
toolscommon.StoreIsExtentSealed(c, mClient)
}

// StoreGetAddressFromTimestamp sends a GetAddressFromTimestamp command to the specified store.
func StoreGetAddressFromTimestamp(c *cli.Context) {

mClient := toolscommon.GetMClient(c, adminToolService)
toolscommon.StoreGetAddressFromTimestamp(c, mClient)
}

// StorePurgeMessages sends a purge command for an extent to the specified store.
func StorePurgeMessages(c *cli.Context) {

mClient := toolscommon.GetMClient(c, adminToolService)
toolscommon.StorePurgeMessages(c, mClient)
}

0 comments on commit c229e86

Please sign in to comment.