Skip to content

Commit

Permalink
Merge pull request #7 from runreveal/alan/rename
Browse files Browse the repository at this point in the history
今日は, 川
  • Loading branch information
abraithwaite committed Jul 27, 2023
2 parents 8a93cbb + 59d390f commit dc40e14
Show file tree
Hide file tree
Showing 20 changed files with 122 additions and 122 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This workflow will build a golang project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go

name: cheetahd
name: kawa

on:
push:
Expand Down
14 changes: 7 additions & 7 deletions .goreleaser.public.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@ before:
# you may remove this if you don't need go generate
# - go generate ./...
builds:
- main: ./cmd/cheetahd
id: cheetahd
binary: cheetahd
- main: ./cmd/kawa
id: kawa
binary: kawa
goos:
- linux
- darwin
- windows

archives:
- format: tar.gz
id: cheetahd
id: kawa
# this name template makes the OS and Arch compatible with the results of uname.
name_template: >-
cheetahd-
kawa-
{{- .Os }}-
{{- if eq .Arch "386" }}i386
{{- else }}{{ .Arch }}{{ end }}
{{- if .Arm }}v{{ .Arm }}{{ end }}
# use zip for windows archives
builds: [cheetahd]
builds: [kawa]
format_overrides:
- goos: windows
format: zip
Expand All @@ -44,7 +44,7 @@ release:
# Default is extracted from the origin remote URL or empty if its private hosted.
github:
owner: runreveal
name: cheetah
name: kawa

checksum:
name_template: 'checksums.txt'
Expand Down
34 changes: 17 additions & 17 deletions cmd/cheetahd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package main
import (
"os"

"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/destinations"
"github.com/runreveal/chta/internal/destinations/runreveal"
"github.com/runreveal/chta/internal/sources"
"github.com/runreveal/chta/internal/sources/journald"
"github.com/runreveal/chta/internal/sources/syslog"
"github.com/runreveal/chta/internal/types"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/internal/destinations"
"github.com/runreveal/kawa/internal/destinations/runreveal"
"github.com/runreveal/kawa/internal/sources"
"github.com/runreveal/kawa/internal/sources/journald"
"github.com/runreveal/kawa/internal/sources/syslog"
"github.com/runreveal/kawa/internal/types"
"github.com/runreveal/lib/loader"
"golang.org/x/exp/slog"
// We could register and configure these in a separate package
Expand All @@ -19,28 +19,28 @@ import (
)

func init() {
loader.Register("scanner", func() loader.Builder[chta.Source[types.Event]] {
loader.Register("scanner", func() loader.Builder[kawa.Source[types.Event]] {
return &ScannerConfig{}
})
loader.Register("syslog", func() loader.Builder[chta.Source[types.Event]] {
loader.Register("syslog", func() loader.Builder[kawa.Source[types.Event]] {
return &SyslogConfig{}
})
loader.Register("journald", func() loader.Builder[chta.Source[types.Event]] {
loader.Register("journald", func() loader.Builder[kawa.Source[types.Event]] {
return &JournaldConfig{}
})

loader.Register("printer", func() loader.Builder[chta.Destination[types.Event]] {
loader.Register("printer", func() loader.Builder[kawa.Destination[types.Event]] {
return &PrinterConfig{}
})
loader.Register("runreveal", func() loader.Builder[chta.Destination[types.Event]] {
loader.Register("runreveal", func() loader.Builder[kawa.Destination[types.Event]] {
return &RunRevealConfig{}
})
}

type ScannerConfig struct {
}

func (c *ScannerConfig) Configure() (chta.Source[types.Event], error) {
func (c *ScannerConfig) Configure() (kawa.Source[types.Event], error) {
slog.Info("configuring scanner")
return sources.NewScanner(os.Stdin), nil
}
Expand All @@ -49,7 +49,7 @@ type SyslogConfig struct {
Addr string `json:"addr"`
}

func (c *SyslogConfig) Configure() (chta.Source[types.Event], error) {
func (c *SyslogConfig) Configure() (kawa.Source[types.Event], error) {
slog.Info("configuring syslog")
return syslog.NewSyslogSource(syslog.SyslogCfg{
Addr: c.Addr,
Expand All @@ -59,7 +59,7 @@ func (c *SyslogConfig) Configure() (chta.Source[types.Event], error) {
type PrinterConfig struct {
}

func (c *PrinterConfig) Configure() (chta.Destination[types.Event], error) {
func (c *PrinterConfig) Configure() (kawa.Destination[types.Event], error) {
slog.Info("configuring printer")
return destinations.NewPrinter(os.Stdout), nil
}
Expand All @@ -68,7 +68,7 @@ type RunRevealConfig struct {
WebhookURL string `json:"webhookURL"`
}

func (c *RunRevealConfig) Configure() (chta.Destination[types.Event], error) {
func (c *RunRevealConfig) Configure() (kawa.Destination[types.Event], error) {
slog.Info("configuring runreveal")
return runreveal.New(
runreveal.WithWebhookURL(c.WebhookURL),
Expand All @@ -78,7 +78,7 @@ func (c *RunRevealConfig) Configure() (chta.Destination[types.Event], error) {
type JournaldConfig struct {
}

func (c *JournaldConfig) Configure() (chta.Source[types.Event], error) {
func (c *JournaldConfig) Configure() (kawa.Source[types.Event], error) {
slog.Info("configuring journald")
return journald.New(), nil
}
14 changes: 7 additions & 7 deletions cmd/cheetahd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"path"
"path/filepath"

"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/queue"
"github.com/runreveal/chta/internal/types"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/internal/queue"
"github.com/runreveal/kawa/internal/types"
"github.com/runreveal/lib/loader"
"github.com/spf13/cobra"
"golang.org/x/exp/slog"
Expand Down Expand Up @@ -75,8 +75,8 @@ variety of destinations.`,
}

type Config struct {
Sources []loader.Loader[chta.Source[types.Event]] `json:"sources"`
Destinations []loader.Loader[chta.Destination[types.Event]] `json:"destinations"`
Sources []loader.Loader[kawa.Source[types.Event]] `json:"sources"`
Destinations []loader.Loader[kawa.Destination[types.Event]] `json:"destinations"`

PProfAddr string `json:"pprof"`
}
Expand All @@ -103,7 +103,7 @@ func NewRunCommand() *cobra.Command {
slog.Info(fmt.Sprintf("config: %+v", config))

ctx := context.Background()
srcs := []chta.Source[types.Event]{}
srcs := []kawa.Source[types.Event]{}
for _, v := range config.Sources {
src, err := v.Configure()
if err != nil {
Expand All @@ -112,7 +112,7 @@ func NewRunCommand() *cobra.Command {
srcs = append(srcs, src)
}

dsts := []chta.Destination[types.Event]{}
dsts := []kawa.Destination[types.Event]{}
for _, v := range config.Destinations {
dst, err := v.Configure()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/runreveal/chta
module github.com/runreveal/kawa

go 1.18

Expand Down
6 changes: 3 additions & 3 deletions internal/destinations/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"io"

"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/types"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/internal/types"
)

type Printer struct {
Expand All @@ -17,7 +17,7 @@ func NewPrinter(writer io.Writer) *Printer {
return &Printer{writer: writer}
}

func (p *Printer) Send(ctx context.Context, ack func(), msg ...chta.Message[types.Event]) error {
func (p *Printer) Send(ctx context.Context, ack func(), msg ...kawa.Message[types.Event]) error {
for _, m := range msg {
_, err := fmt.Fprintf(p.writer, "%s\n", m.Value.RawLog)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions internal/destinations/runreveal/runreveal.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"time"

"github.com/carlmjohnson/requests"
"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/types"
batch "github.com/runreveal/chta/x/batcher"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/internal/types"
batch "github.com/runreveal/kawa/x/batcher"
"golang.org/x/exp/slog"
)

Expand Down Expand Up @@ -66,16 +66,16 @@ func (r *RunReveal) Run(ctx context.Context) error {
return r.batcher.Run(ctx)
}

func (r *RunReveal) Send(ctx context.Context, ack func(), msgs ...chta.Message[types.Event]) error {
func (r *RunReveal) Send(ctx context.Context, ack func(), msgs ...kawa.Message[types.Event]) error {
return r.batcher.Send(ctx, ack, msgs...)
}

func (r *RunReveal) newReq() *requests.Builder {
return requests.New(r.reqConf)
}

// Flush sends the given messages of type chta.Message[type.Event] to the RunReveal api
func (r *RunReveal) Flush(ctx context.Context, msgs []chta.Message[types.Event]) error {
// Flush sends the given messages of type kawa.Message[type.Event] to the RunReveal api
func (r *RunReveal) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error {
batch := make([]json.RawMessage, len(msgs))
var err error
for i, msg := range msgs {
Expand Down
20 changes: 10 additions & 10 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,30 @@ import (
"context"
"errors"

"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/types"
"github.com/runreveal/chta/x/multi"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/internal/types"
"github.com/runreveal/kawa/x/multi"
"github.com/runreveal/lib/await"
"golang.org/x/exp/slog"
)

type Option func(*Queue)

func WithSources(srcs []chta.Source[types.Event]) Option {
func WithSources(srcs []kawa.Source[types.Event]) Option {
return func(q *Queue) {
q.Sources = srcs
}
}

func WithDestinations(dsts []chta.Destination[types.Event]) Option {
func WithDestinations(dsts []kawa.Destination[types.Event]) Option {
return func(q *Queue) {
q.Destinations = dsts
}
}

type Queue struct {
Sources []chta.Source[types.Event]
Destinations []chta.Destination[types.Event]
Sources []kawa.Source[types.Event]
Destinations []kawa.Destination[types.Event]
}

var (
Expand Down Expand Up @@ -84,13 +84,13 @@ func (q *Queue) Run(ctx context.Context) error {
multiSrc := multi.NewMultiSource(q.Sources)
w.Add(multiSrc.Run)

p, err := chta.New(chta.Config[types.Event, types.Event]{
p, err := kawa.New(kawa.Config[types.Event, types.Event]{
Source: multiSrc,
Destination: multiDst,
Handler: chta.Pipe[types.Event](),
Handler: kawa.Pipe[types.Event](),
// NOTE(alan): don't increase parallelism on this processor until we've
// verified thread safety thread-safe story.
}, chta.Parallelism(1))
}, kawa.Parallelism(1))
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions internal/sources/journald/journald.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"sync"
"time"

"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/types"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/internal/types"
"golang.org/x/exp/slog"
)

Expand All @@ -24,7 +24,7 @@ type Journald struct {
}

type msgAck struct {
msg chta.Message[types.Event]
msg kawa.Message[types.Event]
ack func()
}

Expand All @@ -40,7 +40,7 @@ func (s *Journald) Run(ctx context.Context) error {

func (s *Journald) recvLoop(ctx context.Context) error {
// Open file to check and save high watermark
hwmFile, err := os.OpenFile("/tmp/chtad-journald-hwm", os.O_RDWR|os.O_CREATE, os.FileMode(0644))
hwmFile, err := os.OpenFile("/tmp/kawad-journald-hwm", os.O_RDWR|os.O_CREATE, os.FileMode(0644))
if err != nil {
return err
}
Expand Down Expand Up @@ -117,7 +117,7 @@ func (s *Journald) recvLoop(ctx context.Context) error {
wg.Add(1)
select {
case s.msgC <- msgAck{
msg: chta.Message[types.Event]{
msg: kawa.Message[types.Event]{
Value: types.Event{
Timestamp: ts,
SourceType: "journald",
Expand Down Expand Up @@ -150,10 +150,10 @@ func (s *Journald) recvLoop(ctx context.Context) error {
return cmd.Wait()
}

func (s *Journald) Recv(ctx context.Context) (chta.Message[types.Event], func(), error) {
func (s *Journald) Recv(ctx context.Context) (kawa.Message[types.Event], func(), error) {
select {
case <-ctx.Done():
return chta.Message[types.Event]{}, nil, ctx.Err()
return kawa.Message[types.Event]{}, nil, ctx.Err()
case pass := <-s.msgC:
return pass.msg, pass.ack, nil
}
Expand Down
12 changes: 6 additions & 6 deletions internal/sources/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"sync"
"time"

"github.com/runreveal/chta"
"github.com/runreveal/chta/internal/types"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/internal/types"
)

type Scanner struct {
Expand All @@ -18,7 +18,7 @@ type Scanner struct {
}

type msgAck struct {
msg chta.Message[types.Event]
msg kawa.Message[types.Event]
ack func()
}

Expand All @@ -45,7 +45,7 @@ func (s *Scanner) recvLoop(ctx context.Context) error {
wg.Add(1)
select {
case s.msgC <- msgAck{
msg: chta.Message[types.Event]{
msg: kawa.Message[types.Event]{
Value: types.Event{
Timestamp: time.Now(),
SourceType: "reader",
Expand Down Expand Up @@ -79,10 +79,10 @@ func (s *Scanner) recvLoop(ctx context.Context) error {
return nil
}

func (s *Scanner) Recv(ctx context.Context) (chta.Message[types.Event], func(), error) {
func (s *Scanner) Recv(ctx context.Context) (kawa.Message[types.Event], func(), error) {
select {
case <-ctx.Done():
return chta.Message[types.Event]{}, nil, ctx.Err()
return kawa.Message[types.Event]{}, nil, ctx.Err()
case pass := <-s.msgC:
return pass.msg, pass.ack, nil
}
Expand Down
Loading

0 comments on commit dc40e14

Please sign in to comment.