Skip to content
This repository has been archived by the owner on Oct 18, 2019. It is now read-only.

Commit

Permalink
Merge fed602f into d2f624b
Browse files Browse the repository at this point in the history
  • Loading branch information
SimonRichardson committed Nov 3, 2017
2 parents d2f624b + fed602f commit f1619e9
Show file tree
Hide file tree
Showing 49 changed files with 4,479 additions and 132 deletions.
38 changes: 25 additions & 13 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ install:
go get github.com/mattn/goveralls
go get github.com/golang/mock/mockgen
go get github.com/prometheus/client_golang/prometheus
glide install
glide install --strip-vendor

.PHONY: build
build: dist/courier
Expand All @@ -26,36 +26,46 @@ dist/courier:

pkg/audit/mocks/log.go:
mockgen -package=mocks -destination=pkg/audit/mocks/log.go ${PATH_COURIER}/pkg/audit Log
$(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/audit/mocks/log.go
@ $(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/audit/mocks/log.go

pkg/metrics/mocks/metrics.go:
mockgen -package=mocks -destination=pkg/metrics/mocks/metrics.go ${PATH_COURIER}/pkg/metrics Gauge,HistogramVec,Counter
$(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/metrics/mocks/metrics.go
@ $(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/metrics/mocks/metrics.go

pkg/metrics/mocks/observer.go:
mockgen -package=mocks -destination=pkg/metrics/mocks/observer.go github.com/prometheus/client_golang/prometheus Observer
$(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/metrics/mocks/observer.go
@ $(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/metrics/mocks/observer.go

pkg/models/mocks/transaction.go:
mockgen -package=mocks -destination=pkg/models/mocks/transaction.go ${PATH_COURIER}/pkg/models Transaction
$(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/models/mocks/transaction.go
@ $(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/models/mocks/transaction.go

pkg/models/mocks/record.go:
mockgen -package=mocks -destination=pkg/models/mocks/record.go ${PATH_COURIER}/pkg/models Record
$(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/models/mocks/record.go
@ $(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/models/mocks/record.go

pkg/queue/mocks/queue.go:
mockgen -package=mocks -destination=pkg/queue/mocks/queue.go ${PATH_COURIER}/pkg/queue Queue
$(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/queue/mocks/queue.go
@ $(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/queue/mocks/queue.go

pkg/store/cluster/mocks/peer.go:
mockgen -package=mocks -destination=pkg/store/cluster/mocks/peer.go ${PATH_COURIER}/pkg/store/cluster Peer
@ $(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/store/cluster/mocks/peer.go

pkg/store/members/mocks/members.go:
mockgen -package=mocks -destination=pkg/store/members/mocks/members.go ${PATH_COURIER}/pkg/store/members Members,MemberList,Member
@ $(SED) 's/github.com\/trussle\/courier\/vendor\///g' ./pkg/store/members/mocks/members.go

.PHONY: build-mocks
build-mocks: FORCE
$(MAKE) pkg/audit/mocks/log.go
$(MAKE) pkg/metrics/mocks/metrics.go
$(MAKE) pkg/metrics/mocks/observer.go
$(MAKE) pkg/models/mocks/record.go
$(MAKE) pkg/models/mocks/transaction.go
$(MAKE) pkg/queue/mocks/queue.go
@ $(MAKE) pkg/audit/mocks/log.go
@ $(MAKE) pkg/metrics/mocks/metrics.go
@ $(MAKE) pkg/metrics/mocks/observer.go
@ $(MAKE) pkg/models/mocks/record.go
@ $(MAKE) pkg/models/mocks/transaction.go
@ $(MAKE) pkg/queue/mocks/queue.go
@ $(MAKE) pkg/store/cluster/mocks/peer.go
@ $(MAKE) pkg/store/members/mocks/members.go

.PHONY: clean-mocks
clean-mocks: FORCE
Expand All @@ -65,6 +75,8 @@ clean-mocks: FORCE
rm -f pkg/models/mocks/record.go
rm -f pkg/models/mocks/transaction.go
rm -f pkg/queue/mocks/queue.go
rm -f pkg/store/cluster/mocks/peer.go
rm -f pkg/store/members/mocks/members.go

.PHONY: clean
clean: FORCE
Expand Down
66 changes: 65 additions & 1 deletion cmd/courier/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ import (
"github.com/SimonRichardson/gexec"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pborman/uuid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/trussle/courier/pkg/audit"
"github.com/trussle/courier/pkg/consumer"
h "github.com/trussle/courier/pkg/http"
"github.com/trussle/courier/pkg/queue"
"github.com/trussle/courier/pkg/status"
"github.com/trussle/courier/pkg/store"
"github.com/trussle/courier/pkg/store/cluster"
"github.com/trussle/courier/pkg/store/members"
"github.com/trussle/fsys"
)

Expand All @@ -29,6 +33,10 @@ const (
defaultAuditLogRootPath = "bin"
defaultFilesystem = "nop"

defaultStore = "nop"
defaultStoreSize = 1000
defaultReplicationFactor = 2

defaultAWSID = ""
defaultAWSSecret = ""
defaultAWSToken = ""
Expand Down Expand Up @@ -65,6 +73,10 @@ func runIngest(args []string) error {
auditLogRootPath = flags.String("auditlog.path", defaultAuditLogRootPath, "audit log root directory for the filesystem to use")
filesystemType = flags.String("filesystem", defaultFilesystem, "type of filesystem backing (local, virtual, nop)")

storeType = flags.String("store", defaultStore, "type of temporary store to use (remote, virtual, nop)")
storeSize = flags.Int("store.size", defaultStoreSize, "number items the store should hold")
storeReplicationFactor = flags.Int("store.replication.factor", defaultReplicationFactor, "replication factor for remote configuration")

recipientURL = flags.String("recipient.url", defaultRecipientURL, "URL to hit with the message payload")
numConsumers = flags.Int("num.consumers", defaultNumConsumers, "number of consumers to run at once")

Expand Down Expand Up @@ -225,6 +237,42 @@ func runIngest(args []string) error {
return errors.Wrap(err, "queue config")
}

// Configuration for the store
storeMembersConfig, err := members.Build(
members.WithPeerType(cluster.PeerTypeStore),
members.WithNodeName(uuid.New()),
members.WithLogOutput(membersLogOutput{
logger: log.With(logger, "component", "cluster"),
}),
)
if err != nil {
return errors.Wrap(err, "members remote config")
}

storeMembers, err := members.NewRealMembers(storeMembersConfig, log.With(logger, "component", "members"))
if err != nil {
return errors.Wrap(err, "members remote")
}

storeRemoteConfig, err := store.BuildConfig(
store.WithReplicationFactor(*storeReplicationFactor),
store.WithPeer(cluster.NewPeer(storeMembers, log.With(logger, "component", "peer"))),
)
if err != nil {
return errors.Wrap(err, "store remote config")
}

storeConfig, err := store.Build(
store.With(*storeType),
store.WithSize(*storeSize),
store.WithRemoteConfig(storeRemoteConfig),
)

cache, err := store.New(storeConfig, log.With(logger, "component", "store"))
if err != nil {
return errors.Wrap(err, "store")
}

// Execution group.
g := gexec.NewGroup()
gexec.Block(g)
Expand Down Expand Up @@ -264,6 +312,7 @@ func runIngest(args []string) error {
h.NewClient(timeoutClient, *recipientURL),
consumerQueue,
consumerLog,
cache,
consumedSegments,
consumedRecords,
replicatedSegments,
Expand All @@ -283,9 +332,15 @@ func runIngest(args []string) error {
{
g.Add(func() error {
mux := http.NewServeMux()
mux.Handle("/store/", http.StripPrefix("/store", store.NewAPI(
cache,
log.With(logger, "component", "store_api"),
connectedClients.WithLabelValues("store"),
apiDuration,
)))
mux.Handle("/status/", http.StripPrefix("/status", status.NewAPI(
log.With(logger, "component", "status_api"),
connectedClients.WithLabelValues("ingest"),
connectedClients.WithLabelValues("status"),
apiDuration,
)))

Expand All @@ -300,3 +355,12 @@ func runIngest(args []string) error {
gexec.Interrupt(g)
return g.Run()
}

type membersLogOutput struct {
logger log.Logger
}

func (m membersLogOutput) Write(b []byte) (int, error) {
level.Debug(m.logger).Log("fwd_msg", string(b))
return len(b), nil
}
18 changes: 0 additions & 18 deletions cmd/courier/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,13 @@ import (
"net/http"
"net/http/pprof"
"net/url"
"os"
"os/signal"
"strconv"
"strings"
"syscall"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func interrupt(cancel <-chan struct{}) error {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
select {
case sig := <-c:
return fmt.Errorf("received signal %s", sig)
case <-cancel:
return errors.New("canceled")
}
}

// "udp://host:1234", 80 => udp host:1234 host 1234
// "host:1234", 80 => tcp host:1234 host 1234
// "host", 80 => tcp host:80 host 80
Expand Down Expand Up @@ -79,7 +65,3 @@ func registerProfile(mux *http.ServeMux) {
mux.Handle("/debug/pprof/heap", pprof.Handler("heap"))
mux.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
}

func envName(name string) string {
return strings.Replace(strings.ToUpper(name), ".", "_", -1)
}
18 changes: 0 additions & 18 deletions cmd/courier/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,3 @@ func TestParseAddr(t *testing.T) {
}
}
}

func TestEnvName(t *testing.T) {
for _, testcase := range []struct {
value string
want string
}{
{"name", "NAME"},
{"name.subname", "NAME_SUBNAME"},
{"name..SubName", "NAME__SUBNAME"},
{".NAmE.", "_NAME_"},
} {
t.Run(testcase.value, func(t *testing.T) {
if expected, actual := testcase.want, envName(testcase.value); expected != actual {
t.Errorf("expected: %s, actual: %s", expected, actual)
}
})
}
}
62 changes: 42 additions & 20 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ import:
- package: github.com/SimonRichardson/betwixt
- package: github.com/SimonRichardson/resilience
- package: github.com/SimonRichardson/flagset
- package: github.com/hashicorp/go-sockaddr
- package: github.com/hashicorp/memberlist
- package: github.com/hashicorp/serf
- package: github.com/gorilla/mux
- package: github.com/trussle/fsys
- package: github.com/trussle/harness
- package: github.com/aws/aws-sdk-go
subpackages:
- aws
Expand Down
Loading

0 comments on commit f1619e9

Please sign in to comment.