Skip to content
This repository has been archived by the owner on Oct 18, 2019. It is now read-only.

Commit

Permalink
Merge a1b5f23 into d2f624b
Browse files Browse the repository at this point in the history
  • Loading branch information
SimonRichardson committed Nov 6, 2017
2 parents d2f624b + a1b5f23 commit 4562c7c
Show file tree
Hide file tree
Showing 50 changed files with 4,727 additions and 161 deletions.
38 changes: 25 additions & 13 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ install:
go get github.com/mattn/goveralls
go get github.com/golang/mock/mockgen
go get github.com/prometheus/client_golang/prometheus
glide install
glide install --strip-vendor

.PHONY: build
build: dist/courier
Expand All @@ -26,36 +26,46 @@ dist/courier:

pkg/audit/mocks/log.go:
mockgen -package=mocks -destination=pkg/audit/mocks/log.go ${PATH_COURIER}/pkg/audit Log
$(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/audit/mocks/log.go
@ $(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/audit/mocks/log.go

pkg/metrics/mocks/metrics.go:
mockgen -package=mocks -destination=pkg/metrics/mocks/metrics.go ${PATH_COURIER}/pkg/metrics Gauge,HistogramVec,Counter
$(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/metrics/mocks/metrics.go
@ $(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/metrics/mocks/metrics.go

pkg/metrics/mocks/observer.go:
mockgen -package=mocks -destination=pkg/metrics/mocks/observer.go github.com/prometheus/client_golang/prometheus Observer
$(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/metrics/mocks/observer.go
@ $(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/metrics/mocks/observer.go

pkg/models/mocks/transaction.go:
mockgen -package=mocks -destination=pkg/models/mocks/transaction.go ${PATH_COURIER}/pkg/models Transaction
$(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/models/mocks/transaction.go
@ $(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/models/mocks/transaction.go

pkg/models/mocks/record.go:
mockgen -package=mocks -destination=pkg/models/mocks/record.go ${PATH_COURIER}/pkg/models Record
$(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/models/mocks/record.go
@ $(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/models/mocks/record.go

pkg/queue/mocks/queue.go:
mockgen -package=mocks -destination=pkg/queue/mocks/queue.go ${PATH_COURIER}/pkg/queue Queue
$(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/queue/mocks/queue.go
@ $(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/queue/mocks/queue.go

pkg/store/cluster/mocks/peer.go:
mockgen -package=mocks -destination=pkg/store/cluster/mocks/peer.go ${PATH_COURIER}/pkg/store/cluster Peer
@ $(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/store/cluster/mocks/peer.go

pkg/store/members/mocks/members.go:
mockgen -package=mocks -destination=pkg/store/members/mocks/members.go ${PATH_COURIER}/pkg/store/members Members,MemberList,Member
@ $(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/store/members/mocks/members.go

.PHONY: build-mocks
build-mocks: FORCE
$(MAKE) pkg/audit/mocks/log.go
$(MAKE) pkg/metrics/mocks/metrics.go
$(MAKE) pkg/metrics/mocks/observer.go
$(MAKE) pkg/models/mocks/record.go
$(MAKE) pkg/models/mocks/transaction.go
$(MAKE) pkg/queue/mocks/queue.go
@ $(MAKE) pkg/audit/mocks/log.go
@ $(MAKE) pkg/metrics/mocks/metrics.go
@ $(MAKE) pkg/metrics/mocks/observer.go
@ $(MAKE) pkg/models/mocks/record.go
@ $(MAKE) pkg/models/mocks/transaction.go
@ $(MAKE) pkg/queue/mocks/queue.go
@ $(MAKE) pkg/store/cluster/mocks/peer.go
@ $(MAKE) pkg/store/members/mocks/members.go

.PHONY: clean-mocks
clean-mocks: FORCE
Expand All @@ -65,6 +75,8 @@ clean-mocks: FORCE
rm -f pkg/models/mocks/record.go
rm -f pkg/models/mocks/transaction.go
rm -f pkg/queue/mocks/queue.go
rm -f pkg/store/cluster/mocks/peer.go
rm -f pkg/store/members/mocks/members.go

.PHONY: clean
clean: FORCE
Expand Down
152 changes: 130 additions & 22 deletions cmd/courier/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ import (
"github.com/SimonRichardson/gexec"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pborman/uuid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/trussle/courier/pkg/audit"
"github.com/trussle/courier/pkg/consumer"
h "github.com/trussle/courier/pkg/http"
"github.com/trussle/courier/pkg/queue"
"github.com/trussle/courier/pkg/status"
"github.com/trussle/courier/pkg/store"
"github.com/trussle/courier/pkg/store/cluster"
"github.com/trussle/courier/pkg/store/members"
"github.com/trussle/fsys"
)

Expand All @@ -29,6 +33,10 @@ const (
defaultAuditLogRootPath = "bin"
defaultFilesystem = "nop"

defaultStore = "nop"
defaultStoreSize = 1000
defaultReplicationFactor = 2

defaultAWSID = ""
defaultAWSSecret = ""
defaultAWSToken = ""
Expand All @@ -52,28 +60,29 @@ func runIngest(args []string) error {
debug = flags.Bool("debug", false, "debug logging")
apiAddr = flags.String("api", defaultAPIAddr, "listen address for ingest API")

awsID = flags.String("aws.id", defaultAWSID, "AWS configuration id")
awsSecret = flags.String("aws.secret", defaultAWSSecret, "AWS configuration secret")
awsToken = flags.String("aws.token", defaultAWSToken, "AWS configuration token")
awsRegion = flags.String("aws.region", defaultAWSRegion, "AWS configuration region")

awsSQSQueue = flags.String("aws.sqs.queue", defaultAWSSQSQueue, "AWS configuration queue")
awsFirehoseStream = flags.String("aws.firehose.stream", defaultAWSFirehoseStream, "AWS configuration stream")

queueType = flags.String("queue", defaultQueue, "type of queue to use (remote, virtual, nop)")
auditLogType = flags.String("auditlog", defaultAuditLog, "type of audit log to use (remote, local, nop)")
auditLogRootPath = flags.String("auditlog.path", defaultAuditLogRootPath, "audit log root directory for the filesystem to use")
filesystemType = flags.String("filesystem", defaultFilesystem, "type of filesystem backing (local, virtual, nop)")

recipientURL = flags.String("recipient.url", defaultRecipientURL, "URL to hit with the message payload")
numConsumers = flags.Int("num.consumers", defaultNumConsumers, "number of consumers to run at once")

maxNumberOfMessages = flags.Int("max.messages", defaultMaxNumberOfMessages, "max number of messages to dequeue at once")
visibilityTimeout = flags.String("visibility.timeout", defaultVisibilityTimeout, "how long the visibility of a message should extended by in seconds")

metricsRegistration = flags.Bool("metrics.registration", defaultMetricsRegistration, "Registration of metrics on launch")
awsID = flags.String("aws.id", defaultAWSID, "AWS configuration id")
awsSecret = flags.String("aws.secret", defaultAWSSecret, "AWS configuration secret")
awsToken = flags.String("aws.token", defaultAWSToken, "AWS configuration token")
awsRegion = flags.String("aws.region", defaultAWSRegion, "AWS configuration region")
awsSQSQueue = flags.String("aws.sqs.queue", defaultAWSSQSQueue, "AWS configuration queue")
awsFirehoseStream = flags.String("aws.firehose.stream", defaultAWSFirehoseStream, "AWS configuration stream")
queueType = flags.String("queue", defaultQueue, "type of queue to use (remote, virtual, nop)")
auditLogType = flags.String("auditlog", defaultAuditLog, "type of audit log to use (remote, local, nop)")
auditLogRootPath = flags.String("auditlog.path", defaultAuditLogRootPath, "audit log root directory for the filesystem to use")
filesystemType = flags.String("filesystem", defaultFilesystem, "type of filesystem backing (local, virtual, nop)")
storeType = flags.String("store", defaultStore, "type of temporary store to use (remote, virtual, nop)")
storeSize = flags.Int("store.size", defaultStoreSize, "number items the store should hold")
storeReplicationFactor = flags.Int("store.replication.factor", defaultReplicationFactor, "replication factor for remote configuration")
recipientURL = flags.String("recipient.url", defaultRecipientURL, "URL to hit with the message payload")
numConsumers = flags.Int("num.consumers", defaultNumConsumers, "number of consumers to run at once")
maxNumberOfMessages = flags.Int("max.messages", defaultMaxNumberOfMessages, "max number of messages to dequeue at once")
visibilityTimeout = flags.String("visibility.timeout", defaultVisibilityTimeout, "how long the visibility of a message should extended by in seconds")
metricsRegistration = flags.Bool("metrics.registration", defaultMetricsRegistration, "Registration of metrics on launch")
clusterBindAddr = flags.String("cluster", defaultClusterAddr, "listen address for cluster")
clusterAdvertiseAddr = flags.String("cluster.advertise-addr", "", "optional, explicit address to advertise in cluster")
clusterPeers = stringslice{}
)

flags.Var(&clusterPeers, "peer", "cluster peer host:port (repeatable)")
flags.Usage = usageFor(flags, "ingest [flags]")
if err := flags.Parse(args); err != nil {
return nil
Expand Down Expand Up @@ -225,6 +234,20 @@ func runIngest(args []string) error {
return errors.Wrap(err, "queue config")
}

// Configuration for the store
cache, err := configureStore(
logger,
*storeType,
*storeSize,
*storeReplicationFactor,
*clusterBindAddr,
*clusterAdvertiseAddr,
clusterPeers.Slice(),
)
if err != nil {
return errors.Wrap(err, "store")
}

// Execution group.
g := gexec.NewGroup()
gexec.Block(g)
Expand Down Expand Up @@ -264,6 +287,7 @@ func runIngest(args []string) error {
h.NewClient(timeoutClient, *recipientURL),
consumerQueue,
consumerLog,
cache,
consumedSegments,
consumedRecords,
replicatedSegments,
Expand All @@ -283,9 +307,15 @@ func runIngest(args []string) error {
{
g.Add(func() error {
mux := http.NewServeMux()
mux.Handle("/store/", http.StripPrefix("/store", store.NewAPI(
cache,
log.With(logger, "component", "store_api"),
connectedClients.WithLabelValues("store"),
apiDuration,
)))
mux.Handle("/status/", http.StripPrefix("/status", status.NewAPI(
log.With(logger, "component", "status_api"),
connectedClients.WithLabelValues("ingest"),
connectedClients.WithLabelValues("status"),
apiDuration,
)))

Expand All @@ -300,3 +330,81 @@ func runIngest(args []string) error {
gexec.Interrupt(g)
return g.Run()
}

type membersLogOutput struct {
logger log.Logger
}

func (m membersLogOutput) Write(b []byte) (int, error) {
level.Debug(m.logger).Log("fwd_msg", string(b))
return len(b), nil
}

func configureStore(logger log.Logger,
storeType string,
size, replicationFactor int,
bindAddr, advertiseAddr string,
peers []string,
) (store.Store, error) {

clusterBindHost, clusterBindPort, err := parseClusterAddr(bindAddr, defaultClusterPort)
if err != nil {
return nil, err
}
level.Info(logger).Log("cluster_bind", fmt.Sprintf("%s:%d", clusterBindHost, clusterBindPort))

var (
clusterAdvertiseHost string
clusterAdvertisePort int
)
if advertiseAddr != "" {
clusterAdvertiseHost, clusterAdvertisePort, err = parseClusterAddr(advertiseAddr, defaultClusterPort)
if err != nil {
return nil, err
}
level.Info(logger).Log("cluster_advertise", fmt.Sprintf("%s:%d", clusterAdvertiseHost, clusterAdvertisePort))
}

// Safety warning.
if addr, err := cluster.CalculateAdvertiseAddress(clusterBindHost, clusterAdvertiseHost); err != nil {
level.Warn(logger).Log("err", "couldn't deduce an advertise address: "+err.Error())
} else if hasNonlocal(peers) && isUnroutable(addr.String()) {
level.Warn(logger).Log("err", "this node advertises itself on an unroutable address", "addr", addr.String())
level.Warn(logger).Log("err", "this node will be unreachable in the cluster")
level.Warn(logger).Log("err", "provide -cluster.advertise-addr as a routable IP address or hostname")
}

storeMembersConfig, err := members.Build(
members.WithPeerType(cluster.PeerTypeStore),
members.WithNodeName(uuid.New()),
members.WithBindAddrPort(clusterBindHost, clusterAdvertisePort),
members.WithAdvertiseAddrPort(clusterAdvertiseHost, clusterAdvertisePort),
members.WithLogOutput(membersLogOutput{
logger: log.With(logger, "component", "cluster"),
}),
)
if err != nil {
return nil, errors.Wrap(err, "members remote config")
}

storeMembers, err := members.NewRealMembers(storeMembersConfig, log.With(logger, "component", "members"))
if err != nil {
return nil, errors.Wrap(err, "members remote")
}

storeRemoteConfig, err := store.BuildConfig(
store.WithReplicationFactor(replicationFactor),
store.WithPeer(cluster.NewPeer(storeMembers, log.With(logger, "component", "peer"))),
)
if err != nil {
return nil, errors.Wrap(err, "store remote config")
}

storeConfig, err := store.Build(
store.With(storeType),
store.WithSize(size),
store.WithRemoteConfig(storeRemoteConfig),
)

return store.New(storeConfig, log.With(logger, "component", "store"))
}
8 changes: 5 additions & 3 deletions cmd/courier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ import (
var version = "dev"

const (
defaultAPIPort = 8080
defaultAddr = "0.0.0.0:0"
defaultAPIPort = 8080
defaultClusterPort = 8079
defaultAddr = "0.0.0.0:0"
)

var (
defaultAPIAddr = fmt.Sprintf("tcp://0.0.0.0:%d", defaultAPIPort)
defaultAPIAddr = fmt.Sprintf("tcp://0.0.0.0:%d", defaultAPIPort)
defaultClusterAddr = fmt.Sprintf("tcp://0.0.0.0:%d", defaultClusterPort)
)

type command func([]string) error
Expand Down
Loading

0 comments on commit 4562c7c

Please sign in to comment.