Skip to content

Commit

Permalink
Merge pull request #6 from runreveal/alan/rename
Browse files Browse the repository at this point in the history
Hello, 🐆!
  • Loading branch information
abraithwaite committed Jul 27, 2023
2 parents 61ce08e + a2f99b0 commit 8a93cbb
Show file tree
Hide file tree
Showing 21 changed files with 137 additions and 137 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This workflow will build a golang project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go

name: flowd
name: cheetahd

on:
push:
Expand Down
14 changes: 7 additions & 7 deletions .goreleaser.public.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@ before:
# you may remove this if you don't need go generate
# - go generate ./...
builds:
- main: ./cmd/flowd
id: flowd
binary: flowd
- main: ./cmd/cheetahd
id: cheetahd
binary: cheetahd
goos:
- linux
- darwin
- windows

archives:
- format: tar.gz
id: flowd
id: cheetahd
# this name template makes the OS and Arch compatible with the results of uname.
name_template: >-
flowd-
cheetahd-
{{- .Os }}-
{{- if eq .Arch "386" }}i386
{{- else }}{{ .Arch }}{{ end }}
{{- if .Arm }}v{{ .Arm }}{{ end }}
# use zip for windows archives
builds: [flowd]
builds: [cheetahd]
format_overrides:
- goos: windows
format: zip
Expand All @@ -44,7 +44,7 @@ release:
# Default is extracted from the origin remote URL or empty if its private hosted.
github:
owner: runreveal
name: flow
name: cheetah

checksum:
name_template: 'checksums.txt'
Expand Down
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# flow
# cheetah

flow is an opinionated framework for scalable, reliable stream processing.
cheetah is an opinionated framework for scalable, reliable stream processing.

# flowd
# cheetahd

flowd is a daemon for collecting system logs and metrics which makes is powered
by flow.
cheetahd is a daemon for collecting system logs and metrics which makes is powered
by cheetah.

# Installation

Expand All @@ -18,7 +18,7 @@ working set of APIs, interfaces and data models.

# TODO

- Ensure that consumers of flow aren't subject to all the dependencies of flowd.
- Ensure that consumers of cheetah aren't subject to all the dependencies of cheetahd.
- Consider breaking apart the library from the daemon.

# Source Wishlist
Expand Down
34 changes: 17 additions & 17 deletions cmd/flowd/config.go → cmd/cheetahd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package main
import (
"os"

"github.com/runreveal/flow"
"github.com/runreveal/flow/internal/destinations"
"github.com/runreveal/flow/internal/destinations/runreveal"
"github.com/runreveal/flow/internal/sources"
"github.com/runreveal/flow/internal/sources/journald"
"github.com/runreveal/flow/internal/sources/syslog"
"github.com/runreveal/flow/internal/types"
"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/destinations"
"github.com/runreveal/chta/internal/destinations/runreveal"
"github.com/runreveal/chta/internal/sources"
"github.com/runreveal/chta/internal/sources/journald"
"github.com/runreveal/chta/internal/sources/syslog"
"github.com/runreveal/chta/internal/types"
"github.com/runreveal/lib/loader"
"golang.org/x/exp/slog"
// We could register and configure these in a separate package
Expand All @@ -19,28 +19,28 @@ import (
)

func init() {
loader.Register("scanner", func() loader.Builder[flow.Source[types.Event]] {
loader.Register("scanner", func() loader.Builder[chta.Source[types.Event]] {
return &ScannerConfig{}
})
loader.Register("syslog", func() loader.Builder[flow.Source[types.Event]] {
loader.Register("syslog", func() loader.Builder[chta.Source[types.Event]] {
return &SyslogConfig{}
})
loader.Register("journald", func() loader.Builder[flow.Source[types.Event]] {
loader.Register("journald", func() loader.Builder[chta.Source[types.Event]] {
return &JournaldConfig{}
})

loader.Register("printer", func() loader.Builder[flow.Destination[types.Event]] {
loader.Register("printer", func() loader.Builder[chta.Destination[types.Event]] {
return &PrinterConfig{}
})
loader.Register("runreveal", func() loader.Builder[flow.Destination[types.Event]] {
loader.Register("runreveal", func() loader.Builder[chta.Destination[types.Event]] {
return &RunRevealConfig{}
})
}

type ScannerConfig struct {
}

func (c *ScannerConfig) Configure() (flow.Source[types.Event], error) {
func (c *ScannerConfig) Configure() (chta.Source[types.Event], error) {
slog.Info("configuring scanner")
return sources.NewScanner(os.Stdin), nil
}
Expand All @@ -49,7 +49,7 @@ type SyslogConfig struct {
Addr string `json:"addr"`
}

func (c *SyslogConfig) Configure() (flow.Source[types.Event], error) {
func (c *SyslogConfig) Configure() (chta.Source[types.Event], error) {
slog.Info("configuring syslog")
return syslog.NewSyslogSource(syslog.SyslogCfg{
Addr: c.Addr,
Expand All @@ -59,7 +59,7 @@ func (c *SyslogConfig) Configure() (flow.Source[types.Event], error) {
type PrinterConfig struct {
}

func (c *PrinterConfig) Configure() (flow.Destination[types.Event], error) {
func (c *PrinterConfig) Configure() (chta.Destination[types.Event], error) {
slog.Info("configuring printer")
return destinations.NewPrinter(os.Stdout), nil
}
Expand All @@ -68,7 +68,7 @@ type RunRevealConfig struct {
WebhookURL string `json:"webhookURL"`
}

func (c *RunRevealConfig) Configure() (flow.Destination[types.Event], error) {
func (c *RunRevealConfig) Configure() (chta.Destination[types.Event], error) {
slog.Info("configuring runreveal")
return runreveal.New(
runreveal.WithWebhookURL(c.WebhookURL),
Expand All @@ -78,7 +78,7 @@ func (c *RunRevealConfig) Configure() (flow.Destination[types.Event], error) {
type JournaldConfig struct {
}

func (c *JournaldConfig) Configure() (flow.Source[types.Event], error) {
func (c *JournaldConfig) Configure() (chta.Source[types.Event], error) {
slog.Info("configuring journald")
return journald.New(), nil
}
24 changes: 12 additions & 12 deletions cmd/flowd/main.go → cmd/cheetahd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"path"
"path/filepath"

"github.com/runreveal/flow"
"github.com/runreveal/flow/internal/queue"
"github.com/runreveal/flow/internal/types"
"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/queue"
"github.com/runreveal/chta/internal/types"
"github.com/runreveal/lib/loader"
"github.com/spf13/cobra"
"golang.org/x/exp/slog"
Expand All @@ -29,7 +29,7 @@ func init() {
return a
}
level := slog.LevelInfo
if _, ok := os.LookupEnv("FLOW_DEBUG"); ok {
if _, ok := os.LookupEnv("CHEETAH_DEBUG"); ok {
level = slog.LevelDebug
}

Expand All @@ -49,8 +49,8 @@ func init() {
func main() {
slog.Info(fmt.Sprintf("starting %s", path.Base(os.Args[0])), "version", version)
rootCmd := NewRootCommand()
flowdCmd := NewRunCommand()
rootCmd.AddCommand(flowdCmd)
cheetahdCmd := NewRunCommand()
rootCmd.AddCommand(cheetahdCmd)

if err := rootCmd.Execute(); err != nil {
slog.Error(fmt.Sprintf("%+v", err))
Expand All @@ -62,8 +62,8 @@ func main() {
func NewRootCommand() *cobra.Command {
rootCmd := &cobra.Command{
Use: path.Base(os.Args[0]),
Short: `flowd is an all-in-one event ingestion daemon`,
Long: `flowd is an all-in-one event ingestion daemon.
Short: `cheetahd is an all-in-one event ingestion daemon`,
Long: `cheetahd is an all-in-one event ingestion daemon.
It is designed to be a single binary that can be deployed to a server and
configured to receive events from a variety of sources and send them to a
variety of destinations.`,
Expand All @@ -75,8 +75,8 @@ variety of destinations.`,
}

type Config struct {
Sources []loader.Loader[flow.Source[types.Event]] `json:"sources"`
Destinations []loader.Loader[flow.Destination[types.Event]] `json:"destinations"`
Sources []loader.Loader[chta.Source[types.Event]] `json:"sources"`
Destinations []loader.Loader[chta.Destination[types.Event]] `json:"destinations"`

PProfAddr string `json:"pprof"`
}
Expand All @@ -103,7 +103,7 @@ func NewRunCommand() *cobra.Command {
slog.Info(fmt.Sprintf("config: %+v", config))

ctx := context.Background()
srcs := []flow.Source[types.Event]{}
srcs := []chta.Source[types.Event]{}
for _, v := range config.Sources {
src, err := v.Configure()
if err != nil {
Expand All @@ -112,7 +112,7 @@ func NewRunCommand() *cobra.Command {
srcs = append(srcs, src)
}

dsts := []flow.Destination[types.Event]{}
dsts := []chta.Destination[types.Event]{}
for _, v := range config.Destinations {
dst, err := v.Configure()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/runreveal/flow
module github.com/runreveal/chta

go 1.18

Expand Down
6 changes: 3 additions & 3 deletions internal/destinations/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"io"

"github.com/runreveal/flow"
"github.com/runreveal/flow/internal/types"
"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/types"
)

type Printer struct {
Expand All @@ -17,7 +17,7 @@ func NewPrinter(writer io.Writer) *Printer {
return &Printer{writer: writer}
}

func (p *Printer) Send(ctx context.Context, ack func(), msg ...flow.Message[types.Event]) error {
func (p *Printer) Send(ctx context.Context, ack func(), msg ...chta.Message[types.Event]) error {
for _, m := range msg {
_, err := fmt.Fprintf(p.writer, "%s\n", m.Value.RawLog)
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions internal/destinations/runreveal/runreveal.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"time"

"github.com/carlmjohnson/requests"
"github.com/runreveal/flow"
"github.com/runreveal/flow/internal/types"
batch "github.com/runreveal/flow/x/batcher"
"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/types"
batch "github.com/runreveal/chta/x/batcher"
"golang.org/x/exp/slog"
)

Expand Down Expand Up @@ -57,7 +57,7 @@ func (r *RunReveal) Run(ctx context.Context) error {

r.reqConf = func(rb *requests.Builder) {
rb.
UserAgent("flowd").
UserAgent("cheetahd").
Accept("application/json").
BaseURL(r.webhookURL).
Header("Content-Type", "application/json")
Expand All @@ -66,16 +66,16 @@ func (r *RunReveal) Run(ctx context.Context) error {
return r.batcher.Run(ctx)
}

func (r *RunReveal) Send(ctx context.Context, ack func(), msgs ...flow.Message[types.Event]) error {
func (r *RunReveal) Send(ctx context.Context, ack func(), msgs ...chta.Message[types.Event]) error {
return r.batcher.Send(ctx, ack, msgs...)
}

func (r *RunReveal) newReq() *requests.Builder {
return requests.New(r.reqConf)
}

// Flush sends the given messages of type flow.Message[type.Event] to the RunReveal api
func (r *RunReveal) Flush(ctx context.Context, msgs []flow.Message[types.Event]) error {
// Flush sends the given messages of type chta.Message[type.Event] to the RunReveal api
func (r *RunReveal) Flush(ctx context.Context, msgs []chta.Message[types.Event]) error {
batch := make([]json.RawMessage, len(msgs))
var err error
for i, msg := range msgs {
Expand Down
20 changes: 10 additions & 10 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,30 @@ import (
"context"
"errors"

"github.com/runreveal/flow"
"github.com/runreveal/flow/internal/types"
"github.com/runreveal/flow/x/multi"
"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/types"
"github.com/runreveal/chta/x/multi"
"github.com/runreveal/lib/await"
"golang.org/x/exp/slog"
)

type Option func(*Queue)

func WithSources(srcs []flow.Source[types.Event]) Option {
func WithSources(srcs []chta.Source[types.Event]) Option {
return func(q *Queue) {
q.Sources = srcs
}
}

func WithDestinations(dsts []flow.Destination[types.Event]) Option {
func WithDestinations(dsts []chta.Destination[types.Event]) Option {
return func(q *Queue) {
q.Destinations = dsts
}
}

type Queue struct {
Sources []flow.Source[types.Event]
Destinations []flow.Destination[types.Event]
Sources []chta.Source[types.Event]
Destinations []chta.Destination[types.Event]
}

var (
Expand Down Expand Up @@ -84,13 +84,13 @@ func (q *Queue) Run(ctx context.Context) error {
multiSrc := multi.NewMultiSource(q.Sources)
w.Add(multiSrc.Run)

p, err := flow.New(flow.Config[types.Event, types.Event]{
p, err := chta.New(chta.Config[types.Event, types.Event]{
Source: multiSrc,
Destination: multiDst,
Handler: flow.Pipe[types.Event](),
Handler: chta.Pipe[types.Event](),
// NOTE(alan): don't increase parallelism on this processor until we've
// verified thread safety thread-safe story.
}, flow.Parallelism(1))
}, chta.Parallelism(1))
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 8a93cbb

Please sign in to comment.