Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Specify consistency level for all the operations that are not specifi… (
Browse files Browse the repository at this point in the history
#315)

* Specify consistency level for all the operations that are not specified before

* change low to mid in deleting cg operation
  • Loading branch information
kobeyang committed Oct 30, 2017
1 parent 0752a0c commit 83ebee1
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions clients/metadata/metadata_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ func (s *CassandraMetadataService) recordUserOperation(entityName string, entity
operationTime,
operationContent)

batch.Cons = s.highConsLevel

if err := s.session.ExecuteBatch(batch); err != nil {
s.log.WithFields(bark.Fields{common.TagErr: err}).Error("recordUserOperation failed")
return fmt.Errorf("recordUserOperation error: %v", err)
Expand Down Expand Up @@ -476,7 +478,7 @@ func (s *CassandraMetadataService) CreateDestinationUUID(ctx thrift.Context, uui
request.KafkaTopics,
request.DLQConsumerGroupUUID, // may be nil
int64(0), int64(0), // dlq_{purge,merge}_before default to '0'
).Exec(); err != nil {
).Consistency(s.highConsLevel).Exec(); err != nil {
return nil, &shared.InternalServiceError{
Message: fmt.Sprintf("CreateDestination failure while inserting into destinations: %v", err),
}
Expand All @@ -502,7 +504,7 @@ func (s *CassandraMetadataService) CreateDestinationUUID(ctx thrift.Context, uui
request.GetIsMultiZone(),
marshalDstZoneConfigs(request.GetZoneConfigs()),
request.KafkaCluster,
request.KafkaTopics)
request.KafkaTopics).Consistency(s.highConsLevel)
applied, err := query.MapScanCAS(previous)
if err != nil {
return nil, &shared.InternalServiceError{
Expand Down Expand Up @@ -811,6 +813,8 @@ func (s *CassandraMetadataService) UpdateDestination(ctx thrift.Context, updateR
existing.GetPath(),
directoryUUID)

batch.Cons = s.highConsLevel

if err = s.session.ExecuteBatch(batch); err != nil {
return nil, &shared.InternalServiceError{
Message: "UpdateDestination: " + err.Error(),
Expand Down Expand Up @@ -879,7 +883,11 @@ func (s *CassandraMetadataService) DeleteDestination(ctx thrift.Context, deleteR
marshalDstZoneConfigs(existing.GetZoneConfigs()),
existing.GetIsMultiZone(),
existing.GetDestinationUUID())

batch.Query(sqlDeleteDst, directoryUUID, existing.GetPath())

batch.Cons = s.midConsLevel

if err = s.session.ExecuteBatch(batch); err != nil {
return &shared.InternalServiceError{
Message: "DeleteDestination: " + err.Error(),
Expand Down Expand Up @@ -1330,7 +1338,7 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r
createRequest.GetIsMultiZone(),
createRequest.GetActiveZone(),
marshalCgZoneConfigs(createRequest.GetZoneConfigs()),
createRequest.GetOptions()).Exec()
createRequest.GetOptions()).Consistency(s.highConsLevel).Exec()

if err != nil {
return nil, &shared.InternalServiceError{
Expand All @@ -1357,7 +1365,7 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r
createRequest.GetIsMultiZone(),
createRequest.GetActiveZone(),
marshalCgZoneConfigs(createRequest.GetZoneConfigs()),
createRequest.GetOptions())
createRequest.GetOptions()).Consistency(s.highConsLevel)

previous := make(map[string]interface{}) // We actually throw away the old values below, but passing nil causes a panic

Expand Down Expand Up @@ -1674,6 +1682,8 @@ func (s *CassandraMetadataService) UpdateConsumerGroup(ctx thrift.Context, reque
newCG.GetDestinationUUID(),
newCG.GetConsumerGroupName())

batch.Cons = s.highConsLevel

if err = s.session.ExecuteBatch(batch); err != nil {
return nil, &shared.InternalServiceError{
Message: fmt.Sprintf("UpdateConsumerGroup - Batch operation failed, dst=%v cg=%v err=%v",
Expand Down Expand Up @@ -1800,6 +1810,8 @@ func (s *CassandraMetadataService) DeleteConsumerGroup(ctx thrift.Context, reque
existingCG.GetDestinationUUID(),
existingCG.GetConsumerGroupName())

batch.Cons = s.midConsLevel

if e = s.session.ExecuteBatch(batch); e != nil {
return &shared.InternalServiceError{
Message: fmt.Sprintf("DeleteConsumerGroup - Batch operation failed, dst=%v cg=%v err=%v",
Expand Down

0 comments on commit 83ebee1

Please sign in to comment.