Commit

Initial Kafka integration on outputhost
Guillaume Bailey committed Apr 9, 2017
1 parent 1238704 commit ca37572
Showing 7 changed files with 338 additions and 30 deletions.
5 changes: 5 additions & 0 deletions common/metadata.go
@@ -36,3 +36,8 @@ func IsDLQDestination(dstDesc *shared.DestinationDescription) bool {
func IsDLQDestinationPath(path string) bool {
    return len(path) > 4 && strings.HasSuffix(path, ".dlq")
}

// IsKafkaConsumerGroupExtent determines if a consumer group extent is a Kafka consumption assignment
func IsKafkaConsumerGroupExtent(e *shared.ConsumerGroupExtent) bool {
    return len(e.GetStoreUUIDs()) == 0 // TODO: Replace this heuristic when CGE metadata has been updated
}
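
A quick illustration of the heuristic, in test form (a sketch: the StoreUUIDs field name on the generated thrift type and the testify assertions are assumptions, not part of this commit):

package common

import (
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/uber/cherami-thrift/.generated/go/shared"
)

func TestIsKafkaConsumerGroupExtent(t *testing.T) {
    cge := &shared.ConsumerGroupExtent{}
    assert.True(t, IsKafkaConsumerGroupExtent(cge)) // no store UUIDs: treated as a Kafka consumption assignment

    cge.StoreUUIDs = []string{`store-uuid-1`}
    assert.False(t, IsKafkaConsumerGroupExtent(cge)) // has stores: a regular Cherami extent
}
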
21 changes: 21 additions & 0 deletions common/stdLoggerFromBark.go
@@ -0,0 +1,21 @@
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package common
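
As committed, this file contains only the license header and the package clause; the adapter the filename promises presumably lands in a later change. For orientation, a minimal sketch of what a stdlib-logger-from-bark shim might look like (the names, the Info level choice, and the sarama use case are assumptions, not part of this commit):

package common

import (
    "log"
    "strings"

    "github.com/uber-common/bark"
)

// barkWriter forwards byte writes to a bark.Logger
type barkWriter struct {
    l bark.Logger
}

// Write logs each write as a single Info line; the level is an assumed choice
func (w barkWriter) Write(p []byte) (int, error) {
    w.l.Info(strings.TrimSuffix(string(p), "\n"))
    return len(p), nil
}

// NewStdLoggerFromBark (hypothetical name) wraps a bark.Logger in a stdlib
// *log.Logger, e.g. for libraries like sarama that only accept the latter
func NewStdLoggerFromBark(l bark.Logger) *log.Logger {
    return log.New(barkWriter{l: l}, ``, 0) // no prefix or flags; bark adds its own timestamps
}
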
56 changes: 45 additions & 11 deletions services/outputhost/cgcache.go
@@ -39,6 +39,10 @@ import (
"github.com/uber/cherami-thrift/.generated/go/shared"
)

const (
    kafkaConnectedStoreUUID = `cafca000-0000-0caf-ca00-0000000cafca` // Placeholder connected store for logs
)

type (
    // cacheMsg is the message written to the cg cache
    // it has the actual message and a connection ID
@@ -189,6 +193,12 @@ type (

        // cfgMgr is the reference to the Cassandra-backed config manager
        cfgMgr dconfig.ConfigManager

        // kafkaCluster is the Kafka cluster for this consumer group, if applicable
        kafkaCluster string

        // kafkaTopics is the list of Kafka topics consumed by this consumer group, if applicable
        kafkaTopics []string
    }
)

@@ -293,6 +303,7 @@ func (cgCache *consumerGroupCache) getConsumerGroupTags() map[string]string {

// loadExtentCache loads the extent cache, if it doesn't already exist for this consumer group
func (cgCache *consumerGroupCache) loadExtentCache(ctx thrift.Context, destType shared.DestinationType, cge *shared.ConsumerGroupExtent) {
    var committer Committer
    extUUID := cge.GetExtentUUID()
    if extCache, exists := cgCache.extentCache[extUUID]; !exists {
        extCache = &extentCache{
@@ -336,15 +347,24 @@
        extCache.initialCredits = cgCache.getMessageCacheSize(cfg, defaultNumOutstandingMsgs)
    }

-    committer := NewCheramiCommitter(
-        cgCache.metaClient,
-        cgCache.outputHostUUID,
-        cgCache.cachedCGDesc.GetConsumerGroupUUID(),
-        extCache.extUUID,
-        &extCache.connectedStoreUUID,
-        cgCache.cachedCGDesc.GetIsMultiZone(),
-        cgCache.tClients,
-    )
    if common.IsKafkaConsumerGroupExtent(cge) {
        committer = NewKafkaCommitter(
            cgCache.outputHostUUID,
            cgCache.cachedCGDesc.GetConsumerGroupUUID(),
            extCache.logger,
            &extCache.kafkaClient,
        )
    } else {
        committer = NewCheramiCommitter(
            cgCache.metaClient,
            cgCache.outputHostUUID,
            cgCache.cachedCGDesc.GetConsumerGroupUUID(),
            extCache.extUUID,
            &extCache.connectedStoreUUID,
            cgCache.cachedCGDesc.GetIsMultiZone(),
            cgCache.tClients,
        )
    }

    extCache.ackMgr = newAckManager(
        cgCache,
@@ -364,8 +384,17 @@
    extCache.shutdownWG.Add(1)

    // if load fails we will unload it the usual way
-    go extCache.load(cgCache.outputHostUUID, cgCache.cachedCGDesc.GetConsumerGroupUUID(), cgCache.metaClient, cge)
    go extCache.load(
        cgCache.outputHostUUID,
        cgCache.cachedCGDesc.GetConsumerGroupUUID(),
        cgCache.cachedCGDesc.GetConsumerGroupName(),
        cgCache.kafkaCluster,
        cgCache.kafkaTopics,
        common.UnixNanoTime(cgCache.cachedCGDesc.GetStartFrom()),
        cgCache.metaClient,
        cge,
    )

    // now notify the outputhost
    cgCache.ackMgrLoadCh <- ackMgrLoadMsg{uint32(extCache.ackMgr.ackMgrID), extCache.ackMgr}
}
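
The per-extent committer selection above is the heart of the integration: a regular Cherami extent persists ack levels through the metadata client (Cassandra), while a Kafka extent commits offsets back to the Kafka group coordinator via the shared kafkaClient. Both constructors return implementations of the outputhost's Committer interface; a simplified sketch of the shape involved (illustrative assumption only; the real interface lives elsewhere in this package and is richer):

// Committer (assumed, simplified shape) abstracts where ack levels are persisted
type Committer interface {
    // SetCommitLevel records the latest acked position for the extent
    SetCommitLevel(l CommitterLevel)
    // Flush persists the pending level: Cassandra metadata for Cherami
    // extents, the Kafka offset-commit API for Kafka extents
    Flush() error
}
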
@@ -519,6 +548,11 @@ func (cgCache *consumerGroupCache) refreshCgCache(ctx thrift.Context) error {
        return ErrCgUnloaded
    }

    if dstDesc.GetType() == shared.DestinationType_KAFKA {
        cgCache.kafkaCluster = dstDesc.GetKafkaCluster()
        cgCache.kafkaTopics = dstDesc.GetKafkaTopics()
    }

    readReq := &shared.ReadConsumerGroupRequest{
        DestinationUUID:   common.StringPtr(cgCache.cachedCGDesc.GetDestinationUUID()),
        ConsumerGroupName: common.StringPtr(cgCache.cachedCGDesc.GetConsumerGroupName()),
155 changes: 152 additions & 3 deletions services/outputhost/extcache.go
@@ -21,11 +21,13 @@
package outputhost

import (
"encoding/json"
"fmt"
"math/rand"
"net"
"net/http"
"os"
"strings"
"sync"
"time"

@@ -34,6 +36,8 @@ import (
"github.com/uber-common/bark"
"github.com/uber/tchannel-go/thrift"

"github.com/Shopify/sarama"
sc "github.com/bsm/sarama-cluster"
"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/common/metrics"
"github.com/uber/cherami-server/services/outputhost/load"
@@ -137,13 +141,27 @@ type extentCache struct {

    // consumerM3Client for metrics per consumer group
    consumerM3Client metrics.Client

    // kafkaClient is the client for the Kafka connection, if any
    kafkaClient *sc.Consumer
}

var kafkaLogSetup sync.Once
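
Two details worth noting here: the kafkaClient handle lives on the extentCache because three parties share it (the KafkaCommitter takes a pointer to it, the message stream consumes its Messages() channel, and unload() must Close() it). The kafkaLogSetup Once suggests a once-per-process initialization of sarama's logger, presumably via the stdLoggerFromBark shim added in this commit, although nothing triggers it in this diff yet.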

// extentLoadReportingInterval is the frequency at which load
// metrics are reported to the controller
const extentLoadReportingInterval = 2 * time.Second

-func (extCache *extentCache) load(outputHostUUID string, cgUUID string, metaClient metadata.TChanMetadataService, cge *shared.ConsumerGroupExtent) (err error) {
func (extCache *extentCache) load(
    outputHostUUID,
    cgUUID,
    cgName,
    kafkaCluster string,
    kafkaTopics []string,
    startFrom common.UnixNanoTime,
    metaClient metadata.TChanMetadataService,
    cge *shared.ConsumerGroupExtent,
) (err error) {
    // it is ok to take the local lock for this extent which will not affect
    // others
    extCache.cacheMutex.Lock()
@@ -153,8 +171,14 @@ func (extCache *extentCache) load(
    extCache.ackMgr.start()

    // now try to load the replica streams
-    extCache.connection, extCache.pickedIndex, err =
-        extCache.loadReplicaStream(cge.GetAckLevelOffset(), common.SequenceNumber(cge.GetAckLevelSeqNo()), rand.Intn(len(extCache.storeUUIDs)))
    if common.IsKafkaConsumerGroupExtent(cge) {
        extCache.connectedStoreUUID = kafkaConnectedStoreUUID
        extCache.connection, err = extCache.loadKafkaStream(cgName, outputHostUUID, startFrom, kafkaCluster, kafkaTopics)
    } else {
        extCache.connection, extCache.pickedIndex, err =
            extCache.loadReplicaStream(cge.GetAckLevelOffset(), common.SequenceNumber(cge.GetAckLevelSeqNo()), rand.Intn(len(extCache.storeUUIDs)))
    }
    if err != nil {
        // Exhausted all replica streams.. giving up
        extCache.logger.Error(`unable to load replica stream for extent`)
@@ -306,6 +330,68 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence
    return
}

func (extCache *extentCache) loadKafkaStream(
    cgName string,
    outputHostUUID string,
    startFrom common.UnixNanoTime,
    kafkaCluster string,
    kafkaTopics []string,
) (repl *replicaConnection, err error) {
    groupID := getKafkaGroupIDForCheramiConsumerGroupName(cgName)

    // Configure sarama-cluster
    cfg := sc.NewConfig()

    // Metadata for the Kafka group join
    meta := KafkaGroupMetadata{
        Version:        kafkaGroupMetadataVersion,
        OutputHostUUID: outputHostUUID,
    }
    cfg.Group.Member.UserData, _ = json.Marshal(meta)

    // Get the notifications channel; we will just log it
    cfg.Group.Return.Notifications = true

    // Older startFroms (e.g. 0, or more than a week back) are treated as wanting the oldest offset
    // TODO: Use Sarama GetMetadata to get the list of partitions, then build the offset request
    // to use with GetAvailableOffsets, and then "somehow" manually commit it so that sarama-cluster
    // starts from the right place
    if common.Now()-startFrom > common.UnixNanoTime(time.Hour*24*7) {
        cfg.Config.Consumer.Offsets.Initial = sarama.OffsetOldest
    }

    // This is an ID that may appear in Kafka logs or metadata
    cfg.Config.ClientID = `cherami_` + groupID

    // TODO: Sarama metrics registry

    // Build the Kafka client. Note that we would ideally like to have a factory for this, but the client
    // has consumer-group-specific changes to its configuration
    extCache.kafkaClient, err = sc.NewConsumer(
        getKafkaBrokersForCluster(kafkaCluster),
        groupID,
        kafkaTopics,
        cfg,
    )
    if err != nil {
        extCache.logger.WithField(common.TagErr, err).Error(`couldn't make Kafka client`)
        return nil, err
    }

    // Set up the notification logger
    go kafkaNotificationsLogger(extCache.kafkaClient.Notifications(), extCache.logger)

    // Create the kafkaStream
    call := OpenKafkaStream(extCache.kafkaClient.Messages(), extCache.logger)

    // Set up the replicaConnection
    replicaConnectionName := fmt.Sprintf(`replicaConnection{Extent: %s, kafkaCluster: %s}`, extCache.extUUID, kafkaCluster)
    repl = newReplicaConnection(call, extCache, nil, replicaConnectionName, extCache.logger, 0)
    extCache.shutdownWG.Add(1)
    repl.open()
    return
}
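
The startFrom handling above is deliberately coarse: sarama only offers OffsetOldest and OffsetNewest as initial positions (OffsetNewest being the default from NewConfig), so a startFrom older than a week maps to the oldest retained offset and anything newer starts at the tail. Isolated as a helper, the policy looks like this (a sketch; initialOffset is not a function in this commit):

// initialOffset mirrors the coarse startFrom policy: week-old (or older)
// startFroms read from the beginning of Kafka retention, everything else
// starts from the newest offset
func initialOffset(startFrom, now common.UnixNanoTime) int64 {
    const week = common.UnixNanoTime(7 * 24 * time.Hour)
    if now-startFrom > week {
        return sarama.OffsetOldest
    }
    return sarama.OffsetNewest
}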

// stop stops the extentCache's ackMgr and notifies the cgCache that this extent is done
// Notification to the CG happens only when extent is closed after it is consumed.
// If it is being unloaded by the CG, then no need to notify again
@@ -417,6 +503,11 @@ func (extCache *extentCache) Report(reporter common.LoadReporter) {
func (extCache *extentCache) unload() {
    extCache.cacheMutex.Lock()
    close(extCache.closeChannel)
    if extCache.kafkaClient != nil {
        if err := extCache.kafkaClient.Close(); err != nil {
            extCache.logger.WithField(common.TagErr, err).Error(`error closing Kafka client`)
        }
    }
    extCache.cacheMutex.Unlock()
}
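
Note that closing the sarama-cluster consumer releases its partition claims and leaves the Kafka group, so unloading an extent also hands its partitions back for rebalancing; whether outstanding offsets are committed on Close depends on sarama-cluster's offset-commit settings.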

@@ -431,3 +522,61 @@ func (extCache *extentCache) getState() *admin.OutputCgExtent {

    return cge
}

// KafkaGroupMetadata is a structure used for JSON encoding/decoding of the metadata stored for
// Kafka groups joined by Cherami
type KafkaGroupMetadata struct {
    // Version is the version of this structure
    Version uint

    // CGUUID is the internal Cherami consumer group UUID that committed this offset
    CGUUID string

    // OutputHostUUID is the UUID of the Cherami Outputhost that committed this offset
    OutputHostUUID string
}

const kafkaGroupMetadataVersion = uint(0) // Current version of the KafkaGroupMetadata
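
Since loadKafkaStream populates only Version and OutputHostUUID, the UserData sent on the group join carries an empty CGUUID. A standalone illustration of the resulting encoding (the UUID value is hypothetical):

package main

import (
    "encoding/json"
    "fmt"
)

// KafkaGroupMetadata mirrors the outputhost type above, for illustration
type KafkaGroupMetadata struct {
    Version        uint
    CGUUID         string
    OutputHostUUID string
}

func main() {
    b, _ := json.Marshal(KafkaGroupMetadata{
        Version:        0,
        OutputHostUUID: `11111111-2222-3333-4444-555555555555`, // hypothetical host UUID
    })
    fmt.Println(string(b))
    // prints: {"Version":0,"CGUUID":"","OutputHostUUID":"11111111-2222-3333-4444-555555555555"}
}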

func kafkaNotificationsLogger(ch <-chan *sc.Notification, log bark.Logger) {
    notificationNum := 0
notificationsLoop:
    for {
        select {
        case n, ok := <-ch:
            if !ok {
                break notificationsLoop
            }
            if n == nil {
                log.Warn(`nil notification received`)
                continue notificationsLoop
            }
            notificationNum++
            log = log.WithField(`notificationNum`, notificationNum)
            if len(n.Claimed) > 0 {
                log.WithField(`claimed`, n.Claimed).Info(`claimed partitions`)
            }
            if len(n.Released) > 0 {
                log.WithField(`released`, n.Released).Info(`released partitions`)
            }
            if len(n.Current) > 0 {
                log.WithField(`current`, n.Current).Info(`current partitions`)
            }
        }
    }
    log.Info(`Notifications channel closed`)
}
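
Because Group.Return.Notifications is enabled above, the notifications channel is expected to be drained; otherwise the consumer's rebalance handling can stall. That is why this logger goroutine is started unconditionally as soon as the client is built, before the message stream is opened.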

// getKafkaGroupIDForCheramiConsumerGroupName maps a Cherami consumer group
// path to a Kafka group ID by taking the last path component
// (e.g. /prod/myGroup becomes myGroup)
func getKafkaGroupIDForCheramiConsumerGroupName(cgName string) string {
    s := strings.Split(cgName, `/`)
    return s[len(s)-1]
}

// getKafkaBrokersForCluster returns the list of Kafka brokers to contact;
// for now the cluster argument is unused and the brokers come from the
// CHERAMI_KAFKA_BROKERS environment variable
func getKafkaBrokersForCluster(cluster string) []string {
    // TODO: replace with clusters.yaml config
    s := strings.Split(os.Getenv(`CHERAMI_KAFKA_BROKERS`), `,`)
    if s[0] == `` {
        panic(`CHERAMI_KAFKA_BROKERS must be set`)
    }
    return s
}
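
Operationally, this means every outputhost serving Kafka destinations needs a broker list in its environment, for example CHERAMI_KAFKA_BROKERS=broker1.example.com:9092,broker2.example.com:9092 (hostnames illustrative). The cluster argument is accepted but ignored until the clusters.yaml TODO above is addressed.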
