Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' into test-kafka-travis
Browse files Browse the repository at this point in the history
  • Loading branch information
Bo Yang committed Mar 28, 2017
2 parents 84f35b3 + 44d5dde commit 0a4192e
Show file tree
Hide file tree
Showing 12 changed files with 506 additions and 75 deletions.
97 changes: 61 additions & 36 deletions clients/metadata/metadata_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,20 @@ const (
tableDestinationsByPath = "destinations_by_path"
tableHostAddrToUUID = "host_addr_to_uuid"
tableInputHostExtents = "input_host_extents"
tableStoreExtents = "store_extents"
tableUUIDToHostAddr = "uuid_to_host_addr"
tableOperationsByEntityName = "user_operations_by_entity_name"
tableOperationsByEntityUUID = "user_operations_by_entity_uuid"
tableStoreExtents = "store_extents"
tableUUIDToHostAddr = "uuid_to_host_addr"
)

const (
columnAckLevelOffset = "ack_level_offset"
columnAckLevelSequence = "ack_level_sequence"
columnAckLevelSequenceRate = "ack_level_sequence_rate"
columnActiveZone = "active_zone"
columnAllowConsume = "allow_consume"
columnAllowPublish = "allow_publish"
columnAlwaysReplicatedTo = "always_replicate_to"
columnArchivalLocation = "archival_location"
columnAvailableAddress = "available_address"
columnAvailableEnqueueTime = "available_enqueue_time"
Expand All @@ -88,42 +92,56 @@ const (
columnBeginEnqueueTime = "begin_enqueue_time"
columnBeginSequence = "begin_sequence"
columnBeginTime = "begin_time"
columnCallerHostName = "caller_host_name"
columnCallerServiceName = "caller_service_name"
columnChecksumOption = "checksum_option"
columnConnectedStore = "connected_store"
columnConsumedMessagesRetention = "consumed_messages_retention"
columnConsumerGroup = "consumer_group"
columnConsumerGroupUUID = "consumer_group_uuid"
columnConsumerGroupVisibility = "consumer_group_visibility"
columnCreatedTime = "created_time"
columnDLQConsumerGroup = "dlq_consumer_group"
columnDLQMergeBefore = "dlq_merge_before"
columnDLQPurgeBefore = "dlq_purge_before"
columnDeadLetterQueueDestinationUUID = "dead_letter_queue_destination_uuid"
columnDestination = "destination"
columnDestinationUUID = "destination_uuid"
columnDirectoryUUID = "directory_uuid"
columnEndTime = "end_time"
columnEntityName = "entity_name"
columnEntityType = "entity_type"
columnEntityUUID = "entity_uuid"
columnExtent = "extent"
columnExtentUUID = "extent_uuid"
columnHostAddr = "hostaddr"
columnHostName = "hostname"
columnInitiatorInfo = "initiator"
columnInputHostUUID = "input_host_uuid"
columnOriginZone = "origin_zone"
columnRemoteExtentPrimaryStore = "remote_extent_primary_store"
columnIsMultiZone = "is_multi_zone"
columnKafkaCluster = "kafka_cluster"
columnKafkaTopics = "kafka_topics"
columnLastAddress = "last_address"
columnLastEnqueueTime = "last_enqueue_time"
columnLastSequence = "last_sequence"
columnLastSequenceRate = "last_sequence_rate"
columnLockTimeoutSeconds = "lock_timeout_seconds"
columnMaxDeliveryCount = "max_delivery_count"
columnDLQMergeBefore = "dlq_merge_before"
columnName = "name"
columnOpsContent = "operation_content"
columnOpsTime = "operation_time"
columnOpsType = "operation_type"
columnOriginZone = "origin_zone"
columnOutputHostUUID = "output_host_uuid"
columnOwnerEmail = "owner_email"
columnChecksumOption = "checksum_option"
columnPath = "path"
columnDLQPurgeBefore = "dlq_purge_before"
columnReceivedLevelOffset = "received_level_offset"
columnReceivedLevelSequence = "received_level_sequence"
columnReceivedLevelSequenceRate = "received_level_sequence_rate"
columnRemoteExtentPrimaryStore = "remote_extent_primary_store"
columnRemoteExtentReplicaNum = "remote_extent_replica_num"
columnReplicaStats = "replica_stats"
columnReplicationStatus = "replication_status"
columnSizeInBytes = "size_in_bytes"
columnSizeInBytesRate = "size_in_bytes_rate"
columnSkipOlderMessagesSeconds = "skip_older_messages_seconds"
Expand All @@ -137,27 +155,11 @@ const (
columnType = "type"
columnUUID = "uuid"
columnUnconsumedMessagesRetention = "unconsumed_messages_retention"
columnEntityName = "entity_name"
columnEntityUUID = "entity_uuid"
columnEntityType = "entity_type"
columnInitiatorInfo = "initiator"
columnOpsType = "operation_type"
columnOpsTime = "operation_time"
columnOpsContent = "operation_content"
columnUserName = "user_name"
columnUserEmail = "user_email"
columnIsMultiZone = "is_multi_zone"
columnZoneConfigs = "zone_configs"
columnZone = "zone"
columnAllowPublish = "allow_publish"
columnAllowConsume = "allow_consume"
columnAlwaysReplicatedTo = "always_replicate_to"
columnRemoteExtentReplicaNum = "remote_extent_replica_num"
columnUserName = "user_name"
columnVisible = "visible"
columnActiveZone = "active_zone"
columnCallerServiceName = "caller_service_name"
columnCallerHostName = "caller_host_name"
columnReplicationStatus = "replication_status"
columnZone = "zone"
columnZoneConfigs = "zone_configs"
)

const userOperationTTL = "2592000" // 30 days
Expand Down Expand Up @@ -320,16 +322,18 @@ const (
columnZoneConfigs + `: ?}`

sqlInsertDstByUUID = `INSERT INTO ` + tableDestinations +
`(` + columnUUID + `, ` + columnIsMultiZone + `, ` + columnDestination + `, ` + columnDLQConsumerGroup + `) ` +
` VALUES (?, ?, ` + sqlDstType + `, ?)`
`(` + columnUUID + `, ` + columnIsMultiZone + `, ` + columnDestination + `, ` + columnKafkaCluster + `, ` + columnKafkaTopics + `, ` + columnDLQConsumerGroup + `) ` +
` VALUES (?, ?, ` + sqlDstType + `, ?, ?, ?)`

sqlInsertDstByPath = `INSERT INTO ` + tableDestinationsByPath +
`(` +
columnDirectoryUUID + `, ` +
columnPath + `, ` +
columnIsMultiZone + `, ` +
columnDestination +
`) VALUES (?, ?, ?, ` + sqlDstType + `) IF NOT EXISTS`
columnDestination + `, ` +
columnKafkaCluster + `, ` +
columnKafkaTopics +
`) VALUES (?, ?, ?, ` + sqlDstType + `, ?, ?) IF NOT EXISTS`

sqlGetDstByPath = `SELECT ` +
columnDestination + `.` + columnUUID + `, ` +
Expand All @@ -341,7 +345,9 @@ const (
columnDestination + `.` + columnOwnerEmail + `, ` +
columnDestination + `.` + columnChecksumOption + `, ` +
columnDestination + `.` + columnIsMultiZone + `, ` +
columnDestination + `.` + columnZoneConfigs +
columnDestination + `.` + columnZoneConfigs + `, ` +
columnKafkaCluster + `, ` +
columnKafkaTopics +
` FROM ` + tableDestinationsByPath +
` WHERE ` + columnDirectoryUUID + `=? and ` + columnPath + `=?`

Expand All @@ -358,7 +364,9 @@ const (
columnDestination + `.` + columnZoneConfigs + `, ` +
columnDLQConsumerGroup + `, ` +
columnDLQPurgeBefore + `, ` +
columnDLQMergeBefore +
columnDLQMergeBefore + `, ` +
columnKafkaCluster + `, ` +
columnKafkaTopics +
` FROM ` + tableDestinations

sqlUpdateDstByUUID = `UPDATE ` + tableDestinations + ` SET ` +
Expand Down Expand Up @@ -422,6 +430,8 @@ func (s *CassandraMetadataService) CreateDestinationUUID(ctx thrift.Context, uui
request.GetChecksumOption(),
request.GetIsMultiZone(),
marshalDstZoneConfigs(request.GetZoneConfigs()),
request.KafkaCluster,
request.KafkaTopics,
request.DLQConsumerGroupUUID, // may be nil
).Exec(); err != nil {
return nil, &shared.InternalServiceError{
Expand All @@ -447,7 +457,9 @@ func (s *CassandraMetadataService) CreateDestinationUUID(ctx thrift.Context, uui
request.GetOwnerEmail(),
request.GetChecksumOption(),
request.GetIsMultiZone(),
marshalDstZoneConfigs(request.GetZoneConfigs()))
marshalDstZoneConfigs(request.GetZoneConfigs()),
request.KafkaCluster,
request.KafkaTopics)
applied, err := query.MapScanCAS(previous)
if err != nil {
return nil, &shared.InternalServiceError{
Expand Down Expand Up @@ -495,6 +507,8 @@ func (s *CassandraMetadataService) CreateDestinationUUID(ctx thrift.Context, uui
IsMultiZone: common.BoolPtr(request.GetIsMultiZone()),
ZoneConfigs: request.GetZoneConfigs(),
DLQConsumerGroupUUID: common.StringPtr(request.GetDLQConsumerGroupUUID()),
KafkaCluster: common.StringPtr(request.GetKafkaCluster()),
KafkaTopics: request.KafkaTopics,
}, nil
}

Expand All @@ -513,6 +527,8 @@ func getUtilDestinationDescription() *shared.DestinationDescription {
result.DLQMergeBefore = common.Int64Ptr(0)
result.IsMultiZone = common.BoolPtr(false)
result.ZoneConfigs = shared.DestinationDescription_ZoneConfigs_DEFAULT
result.KafkaCluster = common.StringPtr("")
result.KafkaTopics = make([]string, 0)

return result
}
Expand Down Expand Up @@ -621,7 +637,10 @@ func (s *CassandraMetadataService) ReadDestination(ctx thrift.Context, getReques
result.OwnerEmail,
result.ChecksumOption,
result.IsMultiZone,
&zoneConfigsData)
&zoneConfigsData,
result.KafkaCluster,
&result.KafkaTopics,
)
} else {
sql = sqlListDestinationsByUUID + ` WHERE ` + columnUUID + `=?`
query = s.session.Query(sql).Consistency(s.lowConsLevel)
Expand All @@ -641,6 +660,8 @@ func (s *CassandraMetadataService) ReadDestination(ctx thrift.Context, getReques
result.DLQConsumerGroupUUID, //
result.DLQPurgeBefore, // Only a UUID lookup can populate these values; this is OK since DLQ destinations can only be found by UUID anyway
result.DLQMergeBefore, //
result.KafkaCluster,
&result.KafkaTopics,
)
}
result.ZoneConfigs = unmarshalDstZoneConfigs(zoneConfigsData)
Expand Down Expand Up @@ -994,7 +1015,9 @@ func (s *CassandraMetadataService) ListDestinationsByUUID(ctx thrift.Context, li
&zoneConfigsData,
d.DLQConsumerGroupUUID,
d.DLQPurgeBefore,
d.DLQMergeBefore) {
d.DLQMergeBefore,
d.KafkaCluster,
&d.KafkaTopics) {
d.ZoneConfigs = unmarshalDstZoneConfigs(zoneConfigsData)

// Get a new item within limit
Expand Down Expand Up @@ -1062,7 +1085,9 @@ func (s *CassandraMetadataService) ListAllDestinations(ctx thrift.Context, listR
&zoneConfigsData,
d.DLQConsumerGroupUUID,
d.DLQPurgeBefore,
d.DLQMergeBefore) && count < listRequest.GetLimit() {
d.DLQMergeBefore,
d.KafkaCluster,
&d.KafkaTopics) && count < listRequest.GetLimit() {
d.ZoneConfigs = unmarshalDstZoneConfigs(zoneConfigsData)
*d.DLQPurgeBefore = int64(cqlTimestampToUnixNano(*d.DLQPurgeBefore))
*d.DLQMergeBefore = int64(cqlTimestampToUnixNano(*d.DLQMergeBefore))
Expand Down
32 changes: 32 additions & 0 deletions clients/metadata/metadata_cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ func createDestination(s *CassandraSuite, path string, dlqDestination bool) (*sh
}

var destinationOwnerEmail = "lhc@uber.com"
var testKafkaCluster = `kafkaCluster`
var testKafkaTopics = []string{`a`, `bb`}

func (s *CassandraSuite) TestDestinationCRUD() {
// Create
Expand All @@ -193,6 +195,8 @@ func (s *CassandraSuite) TestDestinationCRUD() {
ChecksumOption: common.InternalChecksumOptionPtr(0),
IsMultiZone: common.BoolPtr(true),
ZoneConfigs: []*shared.DestinationZoneConfig{zoneConfig},
KafkaCluster: common.StringPtr(testKafkaCluster),
KafkaTopics: testKafkaTopics,
}

if dlq {
Expand All @@ -214,6 +218,8 @@ func (s *CassandraSuite) TestDestinationCRUD() {
s.Equal(zoneConfig.GetAllowPublish(), destination.GetZoneConfigs()[0].GetAllowPublish())
s.Equal(zoneConfig.GetRemoteExtentReplicaNum(), destination.GetZoneConfigs()[0].GetRemoteExtentReplicaNum())
s.Equal(createDestination.GetDLQConsumerGroupUUID(), destination.GetDLQConsumerGroupUUID())
s.Equal(createDestination.KafkaCluster, destination.KafkaCluster)
s.assertStringUnorderedArrayEqual(createDestination.KafkaTopics, destination.KafkaTopics, `KafkaTopics should match`)

// Duplicated Create
_, err = s.client.CreateDestination(nil, createDestination)
Expand Down Expand Up @@ -253,6 +259,8 @@ func (s *CassandraSuite) TestDestinationCRUD() {
s.Equal(destination.GetDLQConsumerGroupUUID(), loadedDestination.GetDLQConsumerGroupUUID())
s.Equal(destination.GetDLQPurgeBefore(), loadedDestination.GetDLQPurgeBefore())
s.Equal(destination.GetDLQMergeBefore(), loadedDestination.GetDLQMergeBefore())
s.Equal(destination.KafkaCluster, loadedDestination.KafkaCluster)
s.assertStringUnorderedArrayEqual(destination.KafkaTopics, loadedDestination.KafkaTopics, `KafkaTopics should match`)

// By Path
getDestination = &shared.ReadDestinationRequest{
Expand All @@ -277,6 +285,8 @@ func (s *CassandraSuite) TestDestinationCRUD() {
s.Equal(``, loadedDestination.GetDLQConsumerGroupUUID()) //
s.Equal(int64(0), loadedDestination.GetDLQPurgeBefore()) // DLQ destinations are not visible as a 'by path'
s.Equal(int64(0), loadedDestination.GetDLQMergeBefore()) //
s.Equal(destination.KafkaCluster, loadedDestination.KafkaCluster)
s.assertStringUnorderedArrayEqual(destination.KafkaTopics, loadedDestination.KafkaTopics, `KafkaTopics should match`)

// Update
updateDestination := &shared.UpdateDestinationRequest{
Expand Down Expand Up @@ -318,6 +328,8 @@ func (s *CassandraSuite) TestDestinationCRUD() {
s.Equal(updatedDestination.GetDLQConsumerGroupUUID(), loadedDestination.GetDLQConsumerGroupUUID())
s.Equal(updatedDestination.GetDLQPurgeBefore(), loadedDestination.GetDLQPurgeBefore())
s.Equal(updatedDestination.GetDLQMergeBefore(), loadedDestination.GetDLQMergeBefore())
s.Equal(destination.KafkaCluster, loadedDestination.KafkaCluster)
s.assertStringUnorderedArrayEqual(destination.KafkaTopics, loadedDestination.KafkaTopics, `KafkaTopics should match`)

destination.ConsumedMessagesRetention = updateDestination.ConsumedMessagesRetention
destination.UnconsumedMessagesRetention = updateDestination.UnconsumedMessagesRetention
Expand Down Expand Up @@ -364,6 +376,8 @@ func (s *CassandraSuite) TestDestinationCRUD() {
s.Equal(updatedDestination.GetDLQConsumerGroupUUID(), loadedDestination.GetDLQConsumerGroupUUID())
s.Equal(updatedDestination.GetDLQPurgeBefore(), loadedDestination.GetDLQPurgeBefore())
s.Equal(updatedDestination.GetDLQMergeBefore(), loadedDestination.GetDLQMergeBefore())
s.Equal(updatedDestination.KafkaCluster, loadedDestination.KafkaCluster)
s.assertStringUnorderedArrayEqual(updatedDestination.KafkaTopics, loadedDestination.KafkaTopics, `KafkaTopics should match`)

// Alter the update for the 2nd pass
updateDestinationDLQCursors.DLQMergeBefore = common.Int64Ptr(now + int64(time.Hour))
Expand Down Expand Up @@ -1193,17 +1207,35 @@ func (s *CassandraSuite) TestMoveExtent() {
s.Equal(mReq.GetConsumerGroupVisibilityUUID(), extentStats2.GetExtentStats().GetConsumerGroupVisibility(), "%v", mReq.GetExtentUUID())
}

// assertStringUnorderedArrayEqual asserts that two string slices contain
// exactly the same elements, ignoring order but respecting multiplicity.
//
// The previous subset-scan implementation had two defects:
//   - it accepted slices that differ only in element counts
//     (e.g. [x, x] vs [x, y] passed, since lengths match and every
//     element of a appears somewhere in b);
//   - it forwarded msgAndArgs as a single slice value instead of
//     spreading it variadically, garbling testify's failure messages.
func (s *CassandraSuite) assertStringUnorderedArrayEqual(a, b []string, msgAndArgs ...interface{}) {
	s.NotNil(a, msgAndArgs...)
	s.NotNil(b, msgAndArgs...)
	s.Equal(len(a), len(b), msgAndArgs...)

	// Count elements of a, then subtract counts for b; every count must
	// return to zero iff the two slices are equal as multisets.
	counts := make(map[string]int, len(a))
	for _, v := range a {
		counts[v]++
	}
	for _, v := range b {
		counts[v]--
	}
	for v, delta := range counts {
		s.Equal(0, delta, append([]interface{}{v}, msgAndArgs...)...)
	}
}

func (s *CassandraSuite) assertReplicaStatsArrayEqual(a, b []*shared.ExtentReplicaStats, msgAndArgs ...interface{}) {
s.NotNil(a, msgAndArgs)
s.NotNil(b, msgAndArgs)
s.Equal(len(a), len(b), msgAndArgs)

for _, A := range a {
match := false
inner:
for _, B := range b {
if A.GetStoreUUID() == B.GetStoreUUID() {
match = true
s.assertReplicaStatsEqual(A, B, msgAndArgs)
break inner
}
}
s.True(match, msgAndArgs)
Expand Down
6 changes: 5 additions & 1 deletion clients/metadata/schema/metadata.cql
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ CREATE TABLE destinations (
-- DLQ Destination metadata; N.B.: DLQ destinations don't exist in the destinations_by_path table --
dlq_purge_before timestamp, -- Indicates that retention should delete messages before this timestamp in this destination; consumer groups should not read before this
dlq_merge_before timestamp, -- Indicates that extent controller should merge messages/extents created before this timestamp to the consumer group. consumer groups should not read before this
dlq_consumer_group uuid -- If this is a DLQ destination, the consumer group uuid that corresponds to this DLQ destination
dlq_consumer_group uuid, -- If this is a DLQ destination, the consumer group uuid that corresponds to this DLQ destination
kafka_cluster text,
kafka_topics set<text>
);

CREATE INDEX ON destinations (is_multi_zone);
Expand All @@ -103,6 +105,8 @@ CREATE TABLE destinations_by_path (
path text,
is_multi_zone boolean,
destination frozen<destination>,
kafka_cluster text,
kafka_topics set<text>,
PRIMARY KEY (directory_uuid, path)
);

Expand Down
4 changes: 4 additions & 0 deletions clients/metadata/schema/v14/201703240000_add_kafka.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Schema migration: add Kafka source metadata to destination records.
-- kafka_cluster names the origin Kafka cluster; kafka_topics is the set of
-- topics consumed from it. Both columns are added to the canonical
-- destinations table and to the by-path lookup table so either read path
-- can return them.
ALTER TABLE destinations ADD kafka_cluster text;
ALTER TABLE destinations ADD kafka_topics set<text>;
ALTER TABLE destinations_by_path ADD kafka_cluster text;
ALTER TABLE destinations_by_path ADD kafka_topics set<text>;
8 changes: 8 additions & 0 deletions clients/metadata/schema/v14/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"CurrVersion": 14,
"MinCompatibleVersion": 8,
"Description": "Add kafkaCluster and kafkaTopics",
"SchemaUpdateCqlFiles": [
"201703240000_add_kafka.cql"
]
}
Loading

0 comments on commit 0a4192e

Please sign in to comment.