Skip to content

Commit

Permalink
Stdout outputter に iterations オプションを追加 (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
takuo authored Jan 8, 2024
2 parents c123277 + 84a0074 commit 6e37c02
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 53 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ outputter のオプションは基本的に outputter の名前がプレフィ
|オプション|意味|
|----|----|
|--stdout.interval=`INT`|データを出力する間隔(秒)(`default: 60`)|
|--stdout.iterations=`INT`|データを出力する回数(`default: 0(制限なし)`)|

### MQTT Outputter

Expand Down
58 changes: 44 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"regexp"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/alecthomas/kong"
Expand Down Expand Up @@ -48,9 +49,15 @@ type Chissoku struct {

// available outputters
outputters map[string]output.Outputter
// active outputters
activeOutputters atomic.Value

// reader channel
rchan chan *types.Data
// deactivate outputter
dechan chan string
// cancel
cancel func()

// serial device
port serial.Port
Expand All @@ -71,21 +78,26 @@ func (c *Chissoku) AfterApply(opts *options.Options) error {
slog.SetDefault(slog.New(slog.NewJSONHandler(writer, &slog.HandlerOptions{Level: level})))

c.rchan = make(chan *types.Data)
c.dechan = make(chan string)
ctx := output.ContextWithDeactivateChannel(context.Background(), c.dechan)
ctx = context.WithValue(ctx, options.ContextKeyOptions{}, opts)
ctx, c.cancel = context.WithCancel(ctx)

enabled := opts.Output[:0]
for _, v := range opts.Output {
if o, ok := c.outputters[v]; ok {
if err := o.Initialize(opts); err != nil {
// initialize and filter outputters
a := make(map[string]output.Outputter, len(opts.Output))
for _, name := range opts.Output {
if o, ok := c.outputters[name]; ok {
if err := o.Initialize(ctx); err != nil {
slog.Error("Initialize outputter", "outputter", o.Name(), "error", err)
continue
}
enabled = append(enabled, v)
a[name] = o
}
}
opts.Output = enabled
if len(opts.Output) == 0 {
return fmt.Errorf("no outputters are avaiable")
if len(a) == 0 {
return fmt.Errorf("no active outputters are avaiable")
}
c.activeOutputters.Store(a)
return nil
}

Expand All @@ -103,6 +115,7 @@ const (
)

func (c *Chissoku) cleanup() {
c.cancel()
if c.port != nil {
slog.Debug("Closing Serial port")
// nolint: errcheck
Expand All @@ -111,8 +124,9 @@ func (c *Chissoku) cleanup() {
// nolint: errcheck
c.port.Close()
}
for _, v := range c.Options.Output {
c.outputters[v].Close()

for _, o := range c.activeOutputters.Load().(map[string]output.Outputter) {
o.Close()
}
}

Expand Down Expand Up @@ -175,18 +189,34 @@ func (c *Chissoku) readDevice() error {
}
if c.scanner.Err() != nil {
slog.Error("Scanner read error", "error", c.scanner.Err())
c.cleanup()
return c.scanner.Err()
}
return nil
}

func (c *Chissoku) dispatch() {
for d := range c.rchan {
for _, v := range c.Options.Output {
c.outputters[v].Output(d)
defer c.cleanup()
for {
select {
case deactivate := <-c.dechan:
a := c.activeOutputters.Load().(map[string]output.Outputter)
delete(a, deactivate)
if len(a) == 0 {
slog.Debug("No outputers are alive")
return
}
c.activeOutputters.Store(a)
case data, more := <-c.rchan:
if !more {
slog.Debug("Reader channel has ben closed")
return
}
for _, o := range c.activeOutputters.Load().(map[string]output.Outputter) {
o.Output(data)
}
}
}
slog.Debug("Reader channel has ben closed")
}

// initialize and prepare the device
Expand Down
3 changes: 3 additions & 0 deletions options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ type Options struct {
// Debug
Debug bool `short:"d" help:"print debug log"`
}

// ContextKeyOptions context value key for global Options
type ContextKeyOptions struct{}
27 changes: 25 additions & 2 deletions output/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func (k *Kinesis) Name() string {
}

// Initialize initialize outputter
func (k *Kinesis) Initialize(_ *options.Options) (_ error) {
k.Base.Initialize(nil)
func (k *Kinesis) Initialize(ctx context.Context) (_ error) {
k.Base.Initialize(ctx)

// データレシーバの初期化ルーチンを書く

Expand All @@ -52,6 +52,29 @@ func (k *Kinesis) Initialize(_ *options.Options) (_ error) {

`Output()` メソッドは `Base` に最低限で実装されているのでチャンネルで受け取る形で十分であれば実装する必要はありませんが、 `Interval` オプションが不要な場合など `Base` を埋め込まない場合は実装する必要があります。

### context

context には 以下のValueが埋め込まれています。

| Key | Value | 説明 |
|-----|----|----|
|`options.ContextKeyOptions{}`|`*options.Options{}`|グローバルオプション構造体のポインタ|

### outputter 側から自身を無効化する

`Initialize(ctx)` で受け取った `ctx` と自身のポインタを引数として `output.deactivate()` に渡します

```go
func (o *foo) Initialize(ctx context.Context) error {
// ...
go func () {
defer deactivate(ctx, o)
for {
// ...
}
}()
}
```

### プログラム本体に追加する

Expand Down
6 changes: 4 additions & 2 deletions output/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
package output

import (
"context"
"reflect"
"strings"

"github.com/northeye/chissoku/options"
"github.com/northeye/chissoku/types"
)

Expand All @@ -16,6 +16,8 @@ type Base struct {

// receiver channel
r chan *types.Data
// cancel
cancel func()
}

// Close sample implementation
Expand All @@ -33,7 +35,7 @@ func (b *Base) Output(d *types.Data) {
}

// Initialize initialize outputter
func (b *Base) Initialize(_ *options.Options) (_ error) {
func (b *Base) Initialize(ctx context.Context) (_ error) {
b.r = make(chan *types.Data)
return
}
29 changes: 22 additions & 7 deletions output/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"os"
"reflect"
"strings"
"sync"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/northeye/chissoku/options"
"github.com/northeye/chissoku/types"
)

Expand Down Expand Up @@ -42,11 +42,16 @@ type Mqtt struct {

// mqtt mqtt Client interface
client mqtt.Client

// close
close func()
}

// Initialize initialize outputter
func (m *Mqtt) Initialize(_ *options.Options) error {
m.Base.Initialize(nil)
func (m *Mqtt) Initialize(ctx context.Context) error {
m.Base.Initialize(ctx)
ctx, m.cancel = context.WithCancel(ctx)

o := mqtt.NewClientOptions()
o.AddBroker(m.Address)
if m.ClientID != "" {
Expand All @@ -64,12 +69,24 @@ func (m *Mqtt) Initialize(_ *options.Options) error {
return t.Error()
}

m.close = sync.OnceFunc(func() {
deactivate(ctx, m)
close(m.r)
if m.client.IsConnected() {
m.client.Disconnect(1000)
}
})

go func() {
var cur *types.Data
m.publish(<-m.r) // publish first data
m.publish(<-m.r) // publish first data immediately
tick := time.NewTicker(time.Second * time.Duration(m.Interval))
for {
select {
case <-ctx.Done():
cur = nil
m.Close()
tick.Stop()
case <-tick.C:
if cur == nil {
continue
Expand Down Expand Up @@ -98,9 +115,7 @@ func (m *Mqtt) Name() string {
// Close outputter interface method
// clsoe the MQTT connection
func (m *Mqtt) Close() {
if m.client.IsConnected() {
m.client.Disconnect(1000)
}
m.close()
}

func (m *Mqtt) publish(d *types.Data) {
Expand Down
22 changes: 20 additions & 2 deletions output/outputter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
package output

import (
"github.com/northeye/chissoku/options"
"context"
"log/slog"

"github.com/northeye/chissoku/types"
)

Expand All @@ -14,7 +16,7 @@ type Outputter interface {

// Intialize the outputter.
// When it returns non-nil error the outputter will be disabled.
Initialize(*options.Options) error
Initialize(context.Context) error

// Output the data.
// This method must be non-blocking and light-weight.
Expand All @@ -23,3 +25,19 @@ type Outputter interface {
// Close cleanup the outputter.
Close()
}

// contextKeyDeactivateOutputterChannel context value key for DeactivateOutputterChannel
type contextKeyDeactivateOutputterChannel struct{}

// ContextWithDeactivateChannel new context with deactivate channel
func ContextWithDeactivateChannel(ctx context.Context, c chan string) context.Context {
return context.WithValue(ctx, contextKeyDeactivateOutputterChannel{}, c)
}

// deactivate deactivate an outputter
func deactivate(ctx context.Context, o Outputter) {
if c, ok := ctx.Value(contextKeyDeactivateOutputterChannel{}).(chan string); ok {
slog.Debug("Deactivate", "outputter", o.Name())
c <- o.Name()
}
}
Loading

0 comments on commit 6e37c02

Please sign in to comment.