Skip to content

Commit

Permalink
Create integration for using a atomic broadcast
Browse files Browse the repository at this point in the history
backed by etcd. Using this approach we should
have a primitive that is consistent and totally
orders all messages.

Along with this changes, added a generic
interface to be possible to use different
atomic broadcast protocols.
  • Loading branch information
jabolina committed Feb 14, 2021
1 parent 0d99c86 commit 6c98f37
Show file tree
Hide file tree
Showing 15 changed files with 631 additions and 298 deletions.
19 changes: 8 additions & 11 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,11 @@ jobs:
- name: Check out code into the Go module directory
uses: actions/checkout@v2

- name: Setup RabbitMQ with username and password
uses: getong/rabbitmq-action@v1.2
with:
rabbitmq version: '3.8.2-management-alpine'
host port: 5672
rabbitmq user: 'guest'
rabbitmq password: 'guest'
rabbitmq vhost: '/'

- name: All
run: make ci
- name: Setup etcd server and make
run: |
ETCD_VER=$(curl --silent https://api.github.com/repos/etcd-io/etcd/releases/latest | grep "tag_name" | cut -d ' ' -f4 | awk -F'"' '$0=$2')
curl -sL https://storage.googleapis.com/etcd/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o ./etcd.tar.gz
mkdir etcd && tar xzvf etcd.tar.gz -C etcd --strip-components=1
etcd/etcd > /dev/null &
sleep 30
make ci
10 changes: 8 additions & 2 deletions _examples/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

func produce(r *relt.Relt, reader io.Reader) {
for {
println("Write message:")
scan := bufio.NewScanner(reader)
for scan.Scan() {
message := relt.Send{
Expand All @@ -27,9 +28,14 @@ func produce(r *relt.Relt, reader io.Reader) {
}

func consume(r *relt.Relt, ctx context.Context) {
listener, err := r.Consume()
if err != nil {
return
}

for {
select {
case message := <-r.Consume():
case message := <-listener:
if message.Error != nil {
log.Errorf("message with error: %#v", message)
}
Expand All @@ -43,7 +49,7 @@ func consume(r *relt.Relt, ctx context.Context) {
func main() {
conf := relt.DefaultReltConfiguration()
conf.Name = "local-test"
relt := relt.NewRelt(*conf)
relt, _ := relt.NewRelt(*conf)
ctx, done := context.WithCancel(context.Background())

go func() {
Expand Down
14 changes: 13 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,20 @@ go 1.14
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 // indirect
github.com/coreos/etcd v3.3.25+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/envoyproxy/go-control-plane v0.9.5 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.4.3 // indirect
github.com/google/go-cmp v0.5.0 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
github.com/sirupsen/logrus v1.6.0 // indirect
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.5.1 // indirect
go.uber.org/zap v1.16.0 // indirect
google.golang.org/grpc v1.26.0 // indirect
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
)
147 changes: 147 additions & 0 deletions go.sum

Large diffs are not rendered by default.

31 changes: 31 additions & 0 deletions internal/atomic_flag.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package internal

import "sync/atomic"

const (
// Constant to represent the `active` state on the Flag.
active = 0x0

// Constant to represent the `inactive` state on the Flag.
inactive = 0x1
)

// An atomic boolean implementation, to act specifically as a flag.
type Flag struct {
flag int32
}

// Verify if the flag still on `active` state.
func (f *Flag) IsActive() bool {
return atomic.LoadInt32(&f.flag) == active
}

// Verify if the flag is on `inactive` state.
func (f *Flag) IsInactive() bool {
return atomic.LoadInt32(&f.flag) == inactive
}

// Transition the flag from `active` to `inactive`.
func (f *Flag) Inactivate() bool {
return atomic.CompareAndSwapInt32(&f.flag, active, inactive)
}
119 changes: 119 additions & 0 deletions internal/coordinator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package internal

import (
"context"
"github.com/coreos/etcd/clientv3"
"io"
"time"
)

// Configuration for the coordinator.
type CoordinatorConfiguration struct {
// Each Coordinator will handle only a single partition.
// This will avoid peers with overlapping partitions.
Partition string

// Address for etcd server.
Server string

// Context that the Coordinator will work.
Ctx context.Context

// Handler for managing goroutines.
Handler *GoRoutineHandler
}

// Coordinator interface that should be implemented by the
// atomic broadcast handler.
// Commands should be issued through the coordinator to be delivered
// to other peers
type Coordinator interface {
io.Closer

// Watch for changes on the partition.
// After called, this method will start a new goroutine that only
// returns when the Coordinator context is done.
Watch(received chan<- Event) error

// Issues an Event.
Write(event Event) error
}

// Create a new Coordinator using the given configuration.
// The current implementation is the EtcdCoordinator, backed by etcd.
func NewCoordinator(configuration CoordinatorConfiguration) (Coordinator, error) {
cli, err := clientv3.New(clientv3.Config{
DialTimeout: 30 * time.Second,
Endpoints: []string{configuration.Server},
})
if err != nil {
return nil, err
}
kv := clientv3.NewKV(cli)
coord := &EtcdCoordinator{
configuration: configuration,
cli: cli,
kv: kv,
}
return coord, nil
}

// EtcdCoordinator will use etcd for atomic broadcast.
type EtcdCoordinator struct {
// Configuration parameters.
configuration CoordinatorConfiguration

// A client for the etcd server.
cli *clientv3.Client

// The key-value entry point for issuing requests.
kv clientv3.KV
}

// Starts a new coroutine for watching the Coordinator partition.
// All received information will be published back through the channel
// received as parameter.
//
// After calling a routine will run bounded to the application lifetime.
func (e *EtcdCoordinator) Watch(received chan<- Event) error {
watchChan := e.cli.Watch(e.configuration.Ctx, e.configuration.Partition)
watchChanges := func() {
for {
select {
case <-e.configuration.Ctx.Done():
return
case response := <-watchChan:
e.handleResponse(response, received)
}
}
}
e.configuration.Handler.Spawn(watchChanges)
return nil
}

// Write the given event using the KV interface.
func (e *EtcdCoordinator) Write(event Event) error {
_, err := e.kv.Put(e.configuration.Ctx, event.Key, string(event.Value))
return err
}

// Stop the etcd client connection.
func (e *EtcdCoordinator) Close() error {
return e.cli.Close()
}

// This method is responsible for handling events from the etcd client.
//
// This method will transform each received event into Event object and
// publish it back using the given channel. This method can block the whole
// event loop if the channel is not consumed.
// TODO: asynchronously publish events.
func (e *EtcdCoordinator) handleResponse(response clientv3.WatchResponse, received chan<- Event) {
for _, event := range response.Events {
received <- Event{
Key: string(event.Kv.Key),
Value: event.Kv.Value,
Error: nil,
}
}
}

0 comments on commit 6c98f37

Please sign in to comment.