Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' into govet
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillaume Bailey committed Apr 4, 2017
2 parents 427be44 + c449efd commit 2ee2725
Show file tree
Hide file tree
Showing 43 changed files with 1,341 additions and 296 deletions.
6 changes: 3 additions & 3 deletions clients/metadata/client.go
Expand Up @@ -186,7 +186,7 @@ func (c *clientImpl) ReadConsumerGroupExtentsByExtUUID(request *m.ReadConsumerGr
return c.client.ReadConsumerGroupExtentsByExtUUID(ctx, request)
}

func (c *clientImpl) ReadConsumerGroupExtents(request *m.ReadConsumerGroupExtentsRequest) (*m.ReadConsumerGroupExtentsResult_, error) {
func (c *clientImpl) ReadConsumerGroupExtents(request *shared.ReadConsumerGroupExtentsRequest) (*shared.ReadConsumerGroupExtentsResult_, error) {
ctx, cancel := c.createContext()
defer cancel()

Expand All @@ -200,15 +200,15 @@ func (c *clientImpl) ReadConsumerGroupExtent(request *m.ReadConsumerGroupExtentR
return c.client.ReadConsumerGroupExtent(ctx, request)
}

func (c *clientImpl) ReadConsumerGroup(request *m.ReadConsumerGroupRequest) (*shared.ConsumerGroupDescription, error) {
func (c *clientImpl) ReadConsumerGroup(request *shared.ReadConsumerGroupRequest) (*shared.ConsumerGroupDescription, error) {

ctx, cancel := c.createContext()
defer cancel()

return c.client.ReadConsumerGroup(ctx, request)
}

func (c *clientImpl) ReadConsumerGroupByUUID(request *m.ReadConsumerGroupRequest) (*shared.ConsumerGroupDescription, error) {
func (c *clientImpl) ReadConsumerGroupByUUID(request *shared.ReadConsumerGroupRequest) (*shared.ConsumerGroupDescription, error) {

ctx, cancel := c.createContext()
defer cancel()
Expand Down
6 changes: 3 additions & 3 deletions clients/metadata/interface.go
Expand Up @@ -36,11 +36,11 @@ type (
ReadDestination(request *shared.ReadDestinationRequest) (*shared.DestinationDescription, error)
ListDestinations(request *shared.ListDestinationsRequest) (*shared.ListDestinationsResult_, error)
ListDestinationsByUUID(request *shared.ListDestinationsByUUIDRequest) (*shared.ListDestinationsResult_, error)
ReadConsumerGroupExtents(request *m.ReadConsumerGroupExtentsRequest) (*m.ReadConsumerGroupExtentsResult_, error)
ReadConsumerGroupExtents(request *shared.ReadConsumerGroupExtentsRequest) (*shared.ReadConsumerGroupExtentsResult_, error)
ReadConsumerGroupExtent(request *m.ReadConsumerGroupExtentRequest) (*m.ReadConsumerGroupExtentResult_, error)
ReadConsumerGroupExtentsByExtUUID(request *m.ReadConsumerGroupExtentsByExtUUIDRequest) (*m.ReadConsumerGroupExtentsByExtUUIDResult_, error)
ReadConsumerGroupByUUID(request *m.ReadConsumerGroupRequest) (*shared.ConsumerGroupDescription, error)
ReadConsumerGroup(request *m.ReadConsumerGroupRequest) (*shared.ConsumerGroupDescription, error)
ReadConsumerGroupByUUID(request *shared.ReadConsumerGroupRequest) (*shared.ConsumerGroupDescription, error)
ReadConsumerGroup(request *shared.ReadConsumerGroupRequest) (*shared.ConsumerGroupDescription, error)
ListHosts(request *m.ListHostsRequest) (*m.ListHostsResult_, error)
ListConsumerGroups(request *shared.ListConsumerGroupRequest) (*shared.ListConsumerGroupResult_, error)
ListAllConsumerGroups(request *shared.ListConsumerGroupRequest) (*shared.ListConsumerGroupResult_, error)
Expand Down
46 changes: 23 additions & 23 deletions clients/metadata/metadata_cassandra.go
Expand Up @@ -1421,7 +1421,7 @@ func (s *CassandraMetadataService) readConsumerGroupByDstUUID(dstUUID string, cg
// When destination path is specified as input, this method only returns result, if the
// destination has not been DELETED. When destination UUID is specified as input, this
// method will always return result, if the consumer group exist.
func (s *CassandraMetadataService) ReadConsumerGroup(ctx thrift.Context, request *m.ReadConsumerGroupRequest) (*shared.ConsumerGroupDescription, error) {
func (s *CassandraMetadataService) ReadConsumerGroup(ctx thrift.Context, request *shared.ReadConsumerGroupRequest) (*shared.ConsumerGroupDescription, error) {

if request.ConsumerGroupName == nil {
if request.ConsumerGroupUUID != nil {
Expand Down Expand Up @@ -1450,7 +1450,7 @@ func (s *CassandraMetadataService) ReadConsumerGroup(ctx thrift.Context, request
}

// ReadConsumerGroupByUUID returns the ConsumerGroupDescription for the [consumerGroupUUID].
func (s *CassandraMetadataService) ReadConsumerGroupByUUID(ctx thrift.Context, request *m.ReadConsumerGroupRequest) (*shared.ConsumerGroupDescription, error) {
func (s *CassandraMetadataService) ReadConsumerGroupByUUID(ctx thrift.Context, request *shared.ReadConsumerGroupRequest) (*shared.ConsumerGroupDescription, error) {

if request.ConsumerGroupUUID == nil {
return nil, &shared.BadRequestError{Message: "ConsumerGroupUUID cannot be nil"}
Expand Down Expand Up @@ -1522,7 +1522,7 @@ func updateCGDescIfChanged(req *shared.UpdateConsumerGroupRequest, cgDesc *share
// This method can only be called for an existing consumer group
func (s *CassandraMetadataService) UpdateConsumerGroup(ctx thrift.Context, request *shared.UpdateConsumerGroupRequest) (*shared.ConsumerGroupDescription, error) {

readCGReq := &m.ReadConsumerGroupRequest{
readCGReq := &shared.ReadConsumerGroupRequest{
DestinationPath: common.StringPtr(request.GetDestinationPath()),
ConsumerGroupName: common.StringPtr(request.GetConsumerGroupName()),
}
Expand Down Expand Up @@ -1627,7 +1627,7 @@ func (s *CassandraMetadataService) DeleteConsumerGroup(ctx thrift.Context, reque
var existingCG *shared.ConsumerGroupDescription

if request.DestinationPath != nil {
readCGReq := &m.ReadConsumerGroupRequest{
readCGReq := &shared.ReadConsumerGroupRequest{
DestinationPath: common.StringPtr(request.GetDestinationPath()),
ConsumerGroupName: common.StringPtr(request.GetConsumerGroupName()),
}
Expand Down Expand Up @@ -3499,7 +3499,7 @@ func (s *CassandraMetadataService) deleteConsumerGroupExtent(cgUUID string, exte
cge.GetConsumerGroupUUID(),
cge.GetExtentUUID(),
cge.GetOutputHostUUID(),
m.ConsumerGroupExtentStatus_DELETED,
shared.ConsumerGroupExtentStatus_DELETED,
cge.GetAckLevelOffset(),
cge.GetAckLevelSeqNo(),
cge.GetAckLevelSeqNoRate(),
Expand All @@ -3522,14 +3522,14 @@ func (s *CassandraMetadataService) deleteConsumerGroupExtent(cgUUID string, exte
}

// UpdateConsumerGroupExtentStatus updates the consumer group extent status
func (s *CassandraMetadataService) UpdateConsumerGroupExtentStatus(ctx thrift.Context, request *m.UpdateConsumerGroupExtentStatusRequest) error {
func (s *CassandraMetadataService) UpdateConsumerGroupExtentStatus(ctx thrift.Context, request *shared.UpdateConsumerGroupExtentStatusRequest) error {
if request.Status == nil {
return &shared.BadRequestError{
Message: "Empty status",
}
}

if request.GetStatus() == m.ConsumerGroupExtentStatus_DELETED {
if request.GetStatus() == shared.ConsumerGroupExtentStatus_DELETED {
return s.deleteConsumerGroupExtent(request.GetConsumerGroupUUID(), request.GetExtentUUID())
}
query := s.session.Query(sqlCGUpdateStatus, request.GetStatus(), request.GetConsumerGroupUUID(), request.GetExtentUUID())
Expand All @@ -3546,15 +3546,15 @@ func (s *CassandraMetadataService) UpdateConsumerGroupExtentStatus(ctx thrift.Co
// SetAckOffset updates the ack offset for the given [ConsumerGroup, Exent]
// If there is no existing record for a [ConsumerGroup, Extent], this method
// will automatically create a record with the given offset
func (s *CassandraMetadataService) SetAckOffset(ctx thrift.Context, request *m.SetAckOffsetRequest) error {
func (s *CassandraMetadataService) SetAckOffset(ctx thrift.Context, request *shared.SetAckOffsetRequest) error {

var err error
var connectedStore interface{}
if request.IsSetConnectedStoreUUID() {
connectedStore = request.GetConnectedStoreUUID()
}

if request.Status != nil && request.GetStatus() == m.ConsumerGroupExtentStatus_CONSUMED {
if request.Status != nil && request.GetStatus() == shared.ConsumerGroupExtentStatus_CONSUMED {
query := s.session.Query(
sqlCGUpdateAckOffsetConsumed,
request.GetStatus(),
Expand Down Expand Up @@ -3601,13 +3601,13 @@ func (s *CassandraMetadataService) SetAckOffset(ctx thrift.Context, request *m.S

// CreateConsumerGroupExtent creates a [ConsumerGroup, Extent, OutputHost] mapping
// If the mapping already exist, this method will overwrite the existing mapping
func (s *CassandraMetadataService) CreateConsumerGroupExtent(ctx thrift.Context, request *m.CreateConsumerGroupExtentRequest) error {
func (s *CassandraMetadataService) CreateConsumerGroupExtent(ctx thrift.Context, request *shared.CreateConsumerGroupExtentRequest) error {

query := s.session.Query(sqlCGPutExtent,
request.GetConsumerGroupUUID(),
request.GetExtentUUID(),
request.GetOutputHostUUID(),
m.ConsumerGroupExtentStatus_OPEN,
shared.ConsumerGroupExtentStatus_OPEN,
request.GetStoreUUIDs())

query.Consistency(s.midConsLevel)
Expand Down Expand Up @@ -3640,10 +3640,10 @@ func (s *CassandraMetadataService) ReadConsumerGroupExtentsByExtUUID(ctx thrift.
}

result := &m.ReadConsumerGroupExtentsByExtUUIDResult_{
CgExtents: []*m.ConsumerGroupExtent{},
CgExtents: []*shared.ConsumerGroupExtent{},
NextPageToken: listRequest.PageToken,
}
ext := &m.ConsumerGroupExtent{}
ext := &shared.ConsumerGroupExtent{}
ext.ExtentUUID = common.StringPtr(listRequest.GetExtentUUID())
var storeUUIDs []gocql.UUID
count := int64(0)
Expand All @@ -3661,7 +3661,7 @@ func (s *CassandraMetadataService) ReadConsumerGroupExtentsByExtUUID(ctx thrift.
&ext.ConnectedStoreUUID) && count < listRequest.GetLimit() {
// Get a new item within limit
result.CgExtents = append(result.CgExtents, ext)
ext = &m.ConsumerGroupExtent{}
ext = &shared.ConsumerGroupExtent{}
ext.ExtentUUID = common.StringPtr(listRequest.GetExtentUUID())
count++
}
Expand All @@ -3679,7 +3679,7 @@ func (s *CassandraMetadataService) ReadConsumerGroupExtentsByExtUUID(ctx thrift.

// ReadConsumerGroupExtent returns the [Status, AckOffset] corresponding to the given [ConsumerGroup, Extent], if it exist
func (s *CassandraMetadataService) ReadConsumerGroupExtent(ctx thrift.Context, request *m.ReadConsumerGroupExtentRequest) (*m.ReadConsumerGroupExtentResult_, error) {
result := &m.ReadConsumerGroupExtentResult_{Extent: &m.ConsumerGroupExtent{}}
result := &m.ReadConsumerGroupExtentResult_{Extent: &shared.ConsumerGroupExtent{}}
result.Extent.ExtentUUID = common.StringPtr(request.GetExtentUUID())
result.Extent.ConsumerGroupUUID = common.StringPtr(request.GetConsumerGroupUUID())

Expand Down Expand Up @@ -3750,7 +3750,7 @@ func (s *CassandraMetadataService) SetOutputHost(ctx thrift.Context, request *m.
}

// ReadConsumerGroupExtents implements the corresponding TChanMetadataServiceClient API
func (s *CassandraMetadataService) ReadConsumerGroupExtents(ctx thrift.Context, request *m.ReadConsumerGroupExtentsRequest) (*m.ReadConsumerGroupExtentsResult_, error) {
func (s *CassandraMetadataService) ReadConsumerGroupExtents(ctx thrift.Context, request *shared.ReadConsumerGroupExtentsRequest) (*shared.ReadConsumerGroupExtentsResult_, error) {
if request.GetMaxResults() < 1 {
return nil, &shared.BadRequestError{
Message: "MaxResults < 1",
Expand All @@ -3765,7 +3765,7 @@ func (s *CassandraMetadataService) ReadConsumerGroupExtents(ctx thrift.Context,
// entries that map to a single index value
// are a lot (> 10). So, only use secondary
// index for filtering by OPEN status
if request.IsSetStatus() && request.GetStatus() != m.ConsumerGroupExtentStatus_OPEN {
if request.IsSetStatus() && request.GetStatus() != shared.ConsumerGroupExtentStatus_OPEN {
filterLocally = true
}

Expand All @@ -3774,12 +3774,12 @@ func (s *CassandraMetadataService) ReadConsumerGroupExtents(ctx thrift.Context,
return nil, err
}

result := &m.ReadConsumerGroupExtentsResult_{
Extents: []*m.ConsumerGroupExtent{},
result := &shared.ReadConsumerGroupExtentsResult_{
Extents: []*shared.ConsumerGroupExtent{},
NextPageToken: request.PageToken,
}

var status m.ConsumerGroupExtentStatus
var status shared.ConsumerGroupExtentStatus
var ackLevelOffset, ackLevelSeq, readLevelOffset, readLevelSeq, writeTime int64
var extentUUID, outputHostUUID, connectedStore string
var alsr, rlsr float64
Expand Down Expand Up @@ -3819,7 +3819,7 @@ func (s *CassandraMetadataService) ReadConsumerGroupExtents(ctx thrift.Context,
continue
}

item := &m.ConsumerGroupExtent{
item := &shared.ConsumerGroupExtent{
ExtentUUID: common.StringPtr(extentUUID),
ConsumerGroupUUID: common.StringPtr(request.GetConsumerGroupUUID()),
Status: common.MetadataConsumerGroupExtentStatusPtr(status),
Expand Down Expand Up @@ -3850,7 +3850,7 @@ func (s *CassandraMetadataService) ReadConsumerGroupExtents(ctx thrift.Context,
return result, nil
}

func (s *CassandraMetadataService) readConsumerGroupExtentsHelper(request *m.ReadConsumerGroupExtentsRequest, ignoreStatusFilter bool) (*gocql.Iter, error) {
func (s *CassandraMetadataService) readConsumerGroupExtentsHelper(request *shared.ReadConsumerGroupExtentsRequest, ignoreStatusFilter bool) (*gocql.Iter, error) {

sql := sqlCGGetAllExtents
if !ignoreStatusFilter && request.IsSetStatus() {
Expand Down Expand Up @@ -3878,7 +3878,7 @@ func (s *CassandraMetadataService) ReadConsumerGroupExtentsLite(ctx thrift.Conte
}

filterLocally := false
if request.IsSetStatus() && request.GetStatus() != m.ConsumerGroupExtentStatus_OPEN {
if request.IsSetStatus() && request.GetStatus() != shared.ConsumerGroupExtentStatus_OPEN {
filterLocally = true
}

Expand Down

0 comments on commit 2ee2725

Please sign in to comment.