From 6c98f378e1a47cfb0b756cb86891772355f4e927 Mon Sep 17 00:00:00 2001 From: jabolina Date: Sat, 13 Feb 2021 14:11:06 -0300 Subject: [PATCH] Create integration for using a atomic broadcast backed by etcd. Using this approach we should have a primitive that is consistent and totally orders all messages. Along with this changes, added a generic interface to be possible to use different atomic broadcast protocols. --- .github/workflows/go.yml | 19 ++-- _examples/peer.go | 10 +- go.mod | 14 ++- go.sum | 147 ++++++++++++++++++++++++ internal/atomic_flag.go | 31 +++++ internal/coordinator.go | 119 ++++++++++++++++++++ internal/core.go | 172 ++++++++++++++++++++++++++++ internal/event.go | 28 +++++ internal/routine_handler.go | 52 +++++++++ pkg/relt/configuration.go | 7 +- pkg/relt/rabbitmq.go | 218 ------------------------------------ pkg/relt/relt.go | 69 +++--------- pkg/relt/transport.go | 13 ++- pkg/relt/util.go | 6 + test/relt_test.go | 24 +++- 15 files changed, 631 insertions(+), 298 deletions(-) create mode 100644 internal/atomic_flag.go create mode 100644 internal/coordinator.go create mode 100644 internal/core.go create mode 100644 internal/event.go create mode 100644 internal/routine_handler.go delete mode 100644 pkg/relt/rabbitmq.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index d2656e9..bd97e0b 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -22,14 +22,11 @@ jobs: - name: Check out code into the Go module directory uses: actions/checkout@v2 - - name: Setup RabbitMQ with username and password - uses: getong/rabbitmq-action@v1.2 - with: - rabbitmq version: '3.8.2-management-alpine' - host port: 5672 - rabbitmq user: 'guest' - rabbitmq password: 'guest' - rabbitmq vhost: '/' - - - name: All - run: make ci + - name: Setup etcd server and make + run: | + ETCD_VER=$(curl --silent https://api.github.com/repos/etcd-io/etcd/releases/latest | grep "tag_name" | cut -d ' ' -f4 | awk -F'"' '$0=$2') + curl -sL https://storage.googleapis.com/etcd/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o ./etcd.tar.gz + mkdir etcd && tar xzvf etcd.tar.gz -C etcd --strip-components=1 + etcd/etcd > /dev/null & + sleep 30 + make ci diff --git a/_examples/peer.go b/_examples/peer.go index 8799662..369f494 100644 --- a/_examples/peer.go +++ b/_examples/peer.go @@ -12,6 +12,7 @@ import ( func produce(r *relt.Relt, reader io.Reader) { for { + println("Write message:") scan := bufio.NewScanner(reader) for scan.Scan() { message := relt.Send{ @@ -27,9 +28,14 @@ func produce(r *relt.Relt, reader io.Reader) { } func consume(r *relt.Relt, ctx context.Context) { + listener, err := r.Consume() + if err != nil { + return + } + for { select { - case message := <-r.Consume(): + case message := <-listener: if message.Error != nil { log.Errorf("message with error: %#v", message) } @@ -43,7 +49,7 @@ func consume(r *relt.Relt, ctx context.Context) { func main() { conf := relt.DefaultReltConfiguration() conf.Name = "local-test" - relt := relt.NewRelt(*conf) + relt, _ := relt.NewRelt(*conf) ctx, done := context.WithCancel(context.Background()) go func() { diff --git a/go.mod b/go.mod index fc4dc61..4408122 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,20 @@ go 1.14 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect + github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 // indirect + github.com/coreos/etcd v3.3.25+incompatible + github.com/coreos/go-semver v0.3.0 // indirect + github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect + github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect + github.com/envoyproxy/go-control-plane v0.9.5 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.4.3 // indirect + github.com/google/go-cmp v0.5.0 // indirect + github.com/google/uuid v1.2.0 // indirect github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 github.com/sirupsen/logrus v1.6.0 // indirect - github.com/streadway/amqp v1.0.0 + github.com/stretchr/testify v1.5.1 // indirect + go.uber.org/zap v1.16.0 // indirect + google.golang.org/grpc v1.26.0 // indirect gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect ) diff --git a/go.sum b/go.sum index 4855d02..9a05cd7 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -7,12 +9,57 @@ github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7 github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20200313221541-5f7e5dd04533/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/coreos/etcd v0.5.0-alpha.5 h1:0Qi6Jzjk2CDuuGlIeecpu+em2nrjhOgz2wsIwCmQHmc= +github.com/coreos/etcd v3.3.25+incompatible h1:0GQEw6h3YnuOVdtwygkIfJ+Omx0tZ8/QkVyXI4LkbeY= +github.com/coreos/etcd v3.3.25+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= +github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf h1:iW4rZ826su+pqaw19uhpSCzhj44qo35pNgKFGqzDKkU= +github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.5/go.mod h1:OXl5to++W0ctG+EHWTFUjiypVxC/Y4VLc/KFU+al13s= +github.com/envoyproxy/go-control-plane v0.9.6/go.mod h1:GFqM7v0B62MraO4PWRedIbhThr/Rf7ev6aHOOPXeaDA= +github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= +github.com/envoyproxy/go-control-plane v0.9.8/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-hclog v0.9.1 h1:9PZfAcVEvez4yhLH2TBU64/h/z4xlFI80cWXRrxuKuM= github.com/hashicorp/go-hclog v0.9.1/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= @@ -28,7 +75,12 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/raft v1.1.2 h1:oxEL5DDeurYxLd3UbcY/hccgSPhLLpiBZ1YxtWEq59c= github.com/hashicorp/raft v1.1.2/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= @@ -37,9 +89,11 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8= github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= @@ -49,14 +103,107 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= +go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= +go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM= +go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5 h1:sM3evRHxE/1RuMe1FYAL3j7C7fUfIjkbE+NiDAYUF8U= golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20210212180131-e7f2df4ecc2d h1:Edhcm0CKDPLQIecHCp5Iz57Lo7MfT6zUFBAlocmOjcY= +google.golang.org/genproto v0.0.0-20210212180131-e7f2df4ecc2d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg= +google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.35.0 h1:TwIQcH3es+MojMVojxxfQ3l3OF2KzlRxML2xZq0kRo8= +google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= +google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/internal/atomic_flag.go b/internal/atomic_flag.go new file mode 100644 index 0000000..c3b90c3 --- /dev/null +++ b/internal/atomic_flag.go @@ -0,0 +1,31 @@ +package internal + +import "sync/atomic" + +const ( + // Constant to represent the `active` state on the Flag. + active = 0x0 + + // Constant to represent the `inactive` state on the Flag. + inactive = 0x1 +) + +// An atomic boolean implementation, to act specifically as a flag. +type Flag struct { + flag int32 +} + +// Verify if the flag still on `active` state. +func (f *Flag) IsActive() bool { + return atomic.LoadInt32(&f.flag) == active +} + +// Verify if the flag is on `inactive` state. +func (f *Flag) IsInactive() bool { + return atomic.LoadInt32(&f.flag) == inactive +} + +// Transition the flag from `active` to `inactive`. +func (f *Flag) Inactivate() bool { + return atomic.CompareAndSwapInt32(&f.flag, active, inactive) +} diff --git a/internal/coordinator.go b/internal/coordinator.go new file mode 100644 index 0000000..4f4ce52 --- /dev/null +++ b/internal/coordinator.go @@ -0,0 +1,119 @@ +package internal + +import ( + "context" + "github.com/coreos/etcd/clientv3" + "io" + "time" +) + +// Configuration for the coordinator. +type CoordinatorConfiguration struct { + // Each Coordinator will handle only a single partition. + // This will avoid peers with overlapping partitions. + Partition string + + // Address for etcd server. + Server string + + // Context that the Coordinator will work. + Ctx context.Context + + // Handler for managing goroutines. + Handler *GoRoutineHandler +} + +// Coordinator interface that should be implemented by the +// atomic broadcast handler. +// Commands should be issued through the coordinator to be delivered +// to other peers +type Coordinator interface { + io.Closer + + // Watch for changes on the partition. + // After called, this method will start a new goroutine that only + // returns when the Coordinator context is done. + Watch(received chan<- Event) error + + // Issues an Event. + Write(event Event) error +} + +// Create a new Coordinator using the given configuration. +// The current implementation is the EtcdCoordinator, backed by etcd. +func NewCoordinator(configuration CoordinatorConfiguration) (Coordinator, error) { + cli, err := clientv3.New(clientv3.Config{ + DialTimeout: 30 * time.Second, + Endpoints: []string{configuration.Server}, + }) + if err != nil { + return nil, err + } + kv := clientv3.NewKV(cli) + coord := &EtcdCoordinator{ + configuration: configuration, + cli: cli, + kv: kv, + } + return coord, nil +} + +// EtcdCoordinator will use etcd for atomic broadcast. +type EtcdCoordinator struct { + // Configuration parameters. + configuration CoordinatorConfiguration + + // A client for the etcd server. + cli *clientv3.Client + + // The key-value entry point for issuing requests. + kv clientv3.KV +} + +// Starts a new coroutine for watching the Coordinator partition. +// All received information will be published back through the channel +// received as parameter. +// +// After calling a routine will run bounded to the application lifetime. +func (e *EtcdCoordinator) Watch(received chan<- Event) error { + watchChan := e.cli.Watch(e.configuration.Ctx, e.configuration.Partition) + watchChanges := func() { + for { + select { + case <-e.configuration.Ctx.Done(): + return + case response := <-watchChan: + e.handleResponse(response, received) + } + } + } + e.configuration.Handler.Spawn(watchChanges) + return nil +} + +// Write the given event using the KV interface. +func (e *EtcdCoordinator) Write(event Event) error { + _, err := e.kv.Put(e.configuration.Ctx, event.Key, string(event.Value)) + return err +} + +// Stop the etcd client connection. +func (e *EtcdCoordinator) Close() error { + return e.cli.Close() +} + +// This method is responsible for handling events from the etcd client. +// +// This method will transform each received event into Event object and +// publish it back using the given channel. This method can block the whole +// event loop if the channel is not consumed. +// TODO: asynchronously publish events. +func (e *EtcdCoordinator) handleResponse(response clientv3.WatchResponse, received chan<- Event) { + for _, event := range response.Events { + received <- Event{ + Key: string(event.Kv.Key), + Value: event.Kv.Value, + Error: nil, + } + } +} diff --git a/internal/core.go b/internal/core.go new file mode 100644 index 0000000..28f88f5 --- /dev/null +++ b/internal/core.go @@ -0,0 +1,172 @@ +package internal + +import ( + "context" + "errors" + "io" +) + +var ( + // Thrown when an action is performed on a core that was closed. + coreWasShutdown = errors.New("core application was shutdown") + + // Thrown when trying to watch more than once. + coreAlreadyWatching = errors.New("already watching partition") +) + +type CoreConfiguration struct { + // Partition the Coordinator will work with. + Partition string + + // Server address for the Coordinator. + Server string +} + +// Holds all flags used to manage the Core state. +type CoreFlags struct { + // Flag for the shutdown state. + shutdown Flag + + // Flag for watching state. + watching Flag +} + +// Core is the interface that will hold the Relt connection to the Coordinator. +// Every command issued will be parsed here, and every command received should +// be handled here before going back to the client. +type Core interface { + io.Closer + + // Start listening for new messages. + Listen() (<-chan Message, error) + + // Send a message asynchronously for the given partition. + Send(dest string, data []byte) <-chan error +} + +// Implements the Core interface. +// Holds all needed configuration for receiving commands +// from external world and sending through the Coordinator. +// +// This structure will be alive throughout the whole lifetime, +// opening some routines for handling requests and responses. +type ReltCore struct { + // Context for bounding the application life. + ctx context.Context + + // Called when the application Closes. + finish context.CancelFunc + + // Manager for spawning goroutines and avoid leaking. + handler *GoRoutineHandler + + // Coordinator to issues commands and receive Event. + coord Coordinator + + // Channel for sending Message to the client. + output chan Message + + // Flags for handling state. + flags CoreFlags +} + +// Create a new ReltCore using the given configuration. +// As an effect, this will instantiate a Coordinator a failures +// can happen while handling connections between the peers. +func NewCore(configuration CoreConfiguration) (Core, error) { + ctx, cancel := context.WithCancel(context.TODO()) + handler := NewRoutineHandler() + coordinatorConf := CoordinatorConfiguration{ + Partition: configuration.Partition, + Server: configuration.Server, + Ctx: ctx, + Handler: handler, + } + coord, err := NewCoordinator(coordinatorConf) + if err != nil { + cancel() + return nil, err + } + core := &ReltCore{ + ctx: ctx, + finish: cancel, + handler: handler, + coord: coord, + output: make(chan Message), + flags: CoreFlags{ + shutdown: Flag{}, + watching: Flag{}, + }, + } + return core, nil +} + +// The Listen method can be called only once. +// This will start a new goroutine that receives updates +// from the Coordinator and parse the information to a Message object. +// +// This goroutine will run while the application is not closed. +func (r *ReltCore) Listen() (<-chan Message, error) { + if r.flags.shutdown.IsInactive() { + return nil, coreWasShutdown + } + + if r.flags.watching.Inactivate() { + events := make(chan Event) + if err := r.coord.Watch(events); err != nil { + return nil, err + } + + receiveAndParse := func() { + for { + select { + case <-r.ctx.Done(): + return + case event := <-events: + r.output <- event.toMessage() + } + } + } + r.handler.Spawn(receiveAndParse) + return r.output, nil + } + + return nil, coreAlreadyWatching +} + +// Send the given data to the peers that listen to the given partition. +// This is a broadcast message, which means that if _N_ nodes are +// subscribed for a partition, every node will receive the message. +func (r *ReltCore) Send(dest string, data []byte) <-chan error { + response := make(chan error, 1) + writeRequest := func() { + defer close(response) + if r.flags.shutdown.IsActive() { + event := Event{ + Key: dest, + Value: data, + } + response <- r.coord.Write(event) + } else { + response <- coreWasShutdown + } + } + r.handler.Spawn(writeRequest) + return response +} + +// The Close method can be called only once. +// This will cancel the application context and start +// shutting down all goroutines. +// +// *This method will block until everything is finished.* +func (r *ReltCore) Close() error { + if r.flags.shutdown.Inactivate() { + if err := r.coord.Close(); err != nil { + return err + } + r.finish() + r.handler.Close() + } + return coreWasShutdown +} diff --git a/internal/event.go b/internal/event.go new file mode 100644 index 0000000..2f69c21 --- /dev/null +++ b/internal/event.go @@ -0,0 +1,28 @@ +package internal + +// Event is a structure handled by the Coordinator. +// Events are received and issued through the atomic broadcast. +type Event struct { + Key string + Value []byte + Error error +} + +func (e Event) isError() bool { + return e.Error != nil +} + +// Parse the Event to the Message object. +func (e Event) toMessage() Message { + return Message{ + Data: e.Value, + Error: e.Error, + } +} + +// Message is the structure handled by the Core. +// Messages are the available data sent to the client. +type Message struct { + Data []byte + Error error +} diff --git a/internal/routine_handler.go b/internal/routine_handler.go new file mode 100644 index 0000000..84f61a1 --- /dev/null +++ b/internal/routine_handler.go @@ -0,0 +1,52 @@ +package internal + +import ( + "sync" +) + +var ( + // Global instance for the handler. + handler *GoRoutineHandler +) + +// GoRoutineHandler is responsible for handling goroutines. +// This is used so go routines do not leak and are spawned without any control. +// Using the handler to spawn new routines will guarantee that any routine that +// is not controller careful will be known when the application finishes. +type GoRoutineHandler struct { + flag *Flag + group *sync.WaitGroup +} + +// Create a new instance of the GoRoutineHandler. +func NewRoutineHandler() *GoRoutineHandler { + handler = &GoRoutineHandler{ + flag: &Flag{}, + group: &sync.WaitGroup{}, + } + return handler +} + +// This method will increase the size of the group count and spawn +// the new go routine. After the routine is done, the group will be decreased. +// +// This method will panic if the handler is already closed. +func (h *GoRoutineHandler) Spawn(f func()) { + if h.flag.IsInactive() { + panic("go routine handler closed!") + } + + h.group.Add(1) + go func() { + defer h.group.Done() + f() + }() +} + +// Blocks while waiting for go routines to stop. This will set the +// working mode to off, so after this is called any spawned go routine will panic. +func (h *GoRoutineHandler) Close() { + if h.flag.Inactivate() { + h.group.Wait() + } +} diff --git a/pkg/relt/configuration.go b/pkg/relt/configuration.go index 891343c..24f788b 100644 --- a/pkg/relt/configuration.go +++ b/pkg/relt/configuration.go @@ -2,11 +2,10 @@ package relt import ( "errors" - "strings" ) var ( - ErrInvalidConfiguration = errors.New("invalid AMQP URL for connection") + ErrInvalidConfiguration = errors.New("invalid URL for connection") DefaultExchangeName = GroupAddress("relt") ) @@ -42,7 +41,7 @@ type ReltConfiguration struct { func DefaultReltConfiguration() *ReltConfiguration { return &ReltConfiguration{ Name: GenerateUID(), - Url: "amqp://guest:guest@127.0.0.1:5672/", + Url: "localhost:2379", Exchange: DefaultExchangeName, } } @@ -61,7 +60,7 @@ func (c *ReltConfiguration) ValidateConfiguration() error { return ErrInvalidConfiguration } - if strings.HasPrefix(c.Url, "amqp://") { + if !IsUrl(c.Url) { return ErrInvalidConfiguration } diff --git a/pkg/relt/rabbitmq.go b/pkg/relt/rabbitmq.go deleted file mode 100644 index 324eacf..0000000 --- a/pkg/relt/rabbitmq.go +++ /dev/null @@ -1,218 +0,0 @@ -package relt - -import ( - "context" - "github.com/streadway/amqp" - "log" - "time" -) - -// The core structure responsible for -// sending and receiving messages through RabbitMQ. -type core struct { - // A reference for the Relt context. - invoker *invoker - - // Context for the core structure. - ctx context.Context - - // Configuration for both Relt and AMQP. - configuration ReltConfiguration - - // Channel to access the RabbitMQ broker. - broker *amqp.Channel - - // Connection to the RabbitMQ broker. - connection *amqp.Connection - - // Channel for publishing received messages. - received chan Recv - - // Channel for receiving messages to be published. - sending chan Send -} - -// Subscribe to a queue and starts consuming. -// -// The queue will be declared as a quorum queue, since we -// need reliable communication and total order delivery. -// The quorum queues are backed by the Raft protocol, more -// information can be found https://www.rabbitmq.com/quorum-queues.html. -// -// The routing exchange will be to consume any message published, -// since we will only use this transport for broadcasting messages, -// if needed it can later be changed to emulate reliable unicast -// or multicast messages using the broadcast primitive, but there -// is no need at the moment. -// -// To any messages received, it will be publish onto the messages channel, and -// this method will be executed until the connections channel is not closed. -func (c core) subscribe(consumer <-chan amqp.Delivery) { - defer func() { - log.Println("closing rabbitmq consumer") - recover() - }() - - for { - select { - case packet, ok := <-consumer: - if !ok { - log.Printf("consumer channel closed.") - break - } - for err := c.broker.Ack(packet.DeliveryTag, false); err != nil; { - log.Printf("failed acking. %v", err) - } - recv := Recv{ - Data: packet.Body, - Error: nil, - } - c.received <- recv - case <-c.ctx.Done(): - return - } - } -} - -// Publish a message on the RabbitMQ exchange. -// See that this will publish in a fanout exchange, -// this means that all queues related to the exchange -// will receive the message all will ignore the configured key. -// -// This will keep polling until the context is cancelled, and -// will receive messages to be published through the channel. -func (c core) publish(confirm <-chan amqp.Confirmation) { - defer func() { - log.Println("closing rabbitmq publisher") - }() - - for { - select { - case _, ok := <-confirm: - if !ok { - log.Println("failed confirmation") - continue - } - case body, running := <-c.sending: - if !running { - return - } - - c.invoker.spawn(func() { - err := c.broker.Publish(string(body.Address), "*", false, false, amqp.Publishing{ - Body: body.Data, - }) - if err != nil { - time.Sleep(150 * time.Millisecond) - select { - case <-time.After(150 * time.Millisecond): - return - case c.sending <- body: - return - } - } - }) - case <-c.ctx.Done(): - return - } - } -} - -// Start all goroutines for publishing and consuming -// values for the queues. -// All spawned routines will be handled by the wait -// group from the Relt structure and this will stop -// when the context is canceled. -func (c core) start() { - defer func() { - close(c.received) - c.broker.Close() - c.connection.Close() - }() - - fails := make(chan *amqp.Error) - c.connection.NotifyClose(fails) - - for !c.connection.IsClosed() { - select { - case <-c.ctx.Done(): - return - case err := <-fails: - log.Printf("error from connection. %v", err) - c.received <- Recv{ - Data: nil, - Error: err, - } - } - } -} - -func (c *core) declarations() error { - conn, err := amqp.Dial(c.configuration.Url) - if err != nil { - return err - } - - ch, err := conn.Channel() - if err != nil { - return err - } - - c.connection = conn - c.broker = ch - - err = ch.ExchangeDeclare(string(c.configuration.Exchange), "fanout", false, true, false, false, nil) - if err != nil { - log.Fatalf("error declaring exchange %s: %v", c.configuration.Exchange, err) - } - - _, err = ch.QueueDeclare(c.configuration.Name, true, false, true, false, nil) - if err != nil { - return err - } - - if err = ch.QueueBind(c.configuration.Name, "*", string(c.configuration.Exchange), false, nil); err != nil { - return err - } - - consumer, err := ch.Consume(c.configuration.Name, "", false, true, false, false, nil) - if err != nil { - return err - } - - confirm := make(chan amqp.Confirmation, 1) - if ch.Confirm(false) == nil { - ch.NotifyPublish(confirm) - } else { - close(confirm) - } - - c.invoker.spawn(func() { - c.subscribe(consumer) - }) - - c.invoker.spawn(func() { - c.publish(confirm) - }) - - c.invoker.spawn(c.start) - - return nil -} - -// Creates a new instance of the core structure. -// This will also start running and consuming messages. -func newCore(relt Relt) (*core, error) { - c := &core{ - invoker: relt.ctx, - ctx: relt.cancel, - configuration: relt.configuration, - received: make(chan Recv), - sending: make(chan Send), - } - err := c.declarations() - if err != nil { - return nil, err - } - return c, nil -} diff --git a/pkg/relt/relt.go b/pkg/relt/relt.go index 0df6335..beb808a 100644 --- a/pkg/relt/relt.go +++ b/pkg/relt/relt.go @@ -1,9 +1,8 @@ package relt import ( - "context" "errors" - "sync" + "github.com/jabolina/relt/internal" "time" ) @@ -14,46 +13,20 @@ var ( ErrPublishTimeout = errors.New("took to long to publish message") ) -// Holds information about the transport context. -// Such as keeping track of spawned goroutines. -type invoker struct { - // Used to track goroutines. - group *sync.WaitGroup -} - -// Spawn a new goroutine and controls it with -// the wait group. -func (c *invoker) spawn(f func()) { - c.group.Add(1) - go func() { - defer c.group.Done() - f() - }() -} - // The implementation for the Transport interface // providing reliable communication between hosts. type Relt struct { - // Context for the transport, used internally only. - ctx *invoker - - // Information about shutdown the transport. - cancel context.Context - - // Cancel the transport context. - finish context.CancelFunc - // Holds the configuration about the core // and the Relt transport. configuration ReltConfiguration - // Holds the core structure. - core *core + // Holds the Core structure. + core internal.Core } // Implements the Transport interface. -func (r Relt) Consume() <-chan Recv { - return r.core.received +func (r Relt) Consume() (<-chan internal.Message, error) { + return r.core.Listen() } // Implements the Transport interface. @@ -67,38 +40,32 @@ func (r Relt) Broadcast(message Send) error { } select { - case <-r.cancel.Done(): - return ErrContextClosed - case <-time.After(200 * time.Millisecond): + case <-time.After(time.Second): return ErrPublishTimeout - case r.core.sending <- message: + case err := <-r.core.Send(string(message.Address), message.Data): + return err } - return nil } // Implements the Transport interface. -func (r *Relt) Close() { - defer close(r.core.sending) - r.finish() - r.ctx.group.Wait() +func (r *Relt) Close() error { + return r.core.Close() } // Creates a new instance of the reliable transport, // and start all needed routines. func NewRelt(configuration ReltConfiguration) (*Relt, error) { - ctx, done := context.WithCancel(context.Background()) - relt := &Relt{ - ctx: &invoker{ - group: &sync.WaitGroup{}, - }, - cancel: ctx, - finish: done, - configuration: configuration, + conf := internal.CoreConfiguration{ + Partition: string(configuration.Exchange), + Server: configuration.Url, } - c, err := newCore(*relt) + core, err := internal.NewCore(conf) if err != nil { return nil, err } - relt.core = c + relt := &Relt{ + configuration: configuration, + core: core, + } return relt, nil } diff --git a/pkg/relt/transport.go b/pkg/relt/transport.go index aa58b26..c1a63f4 100644 --- a/pkg/relt/transport.go +++ b/pkg/relt/transport.go @@ -1,6 +1,9 @@ package relt -import "time" +import ( + "github.com/jabolina/relt/internal" + "io" +) // When broadcasting or multicasting a message must provide // the group address. @@ -37,8 +40,10 @@ type Send struct { // response back, everything is just message passing simulating // events that can occur. type Transport interface { + io.Closer + // A channel for consuming received messages. - Consume() <-chan Recv + Consume() (<-chan internal.Message, error) // Broadcast a message to a given group. // See that this not work in the request - response model, @@ -53,8 +58,4 @@ type Transport interface { // Relt transport is closed, so if the the transport is already // closed this function will panic. Broadcast(message Send) error - - // To shutdown the transport and stop consuming and - // publishing new messages. - Close(timeout time.Duration) } diff --git a/pkg/relt/util.go b/pkg/relt/util.go index 9150258..ae9454e 100644 --- a/pkg/relt/util.go +++ b/pkg/relt/util.go @@ -4,6 +4,7 @@ import ( crand "crypto/rand" "fmt" "net" + "net/url" ) // Generates a random 128-bit UUID, panic if not possible. @@ -31,3 +32,8 @@ func GenerateRandomIP() (string, error) { return listener.Addr().String(), nil } + +func IsUrl(value string) bool { + parsed, err := url.Parse(value) + return err == nil && parsed.Scheme != "" +} diff --git a/test/relt_test.go b/test/relt_test.go index 2512c5b..c7ab0e2 100644 --- a/test/relt_test.go +++ b/test/relt_test.go @@ -32,10 +32,14 @@ func TestRelt_PublishAndReceiveMessage(t *testing.T) { data := []byte("hello") group := &sync.WaitGroup{} group.Add(1) + listener, err := r.Consume() + if err != nil { + t.Fatalf("failed listening. %#v", err) + } go func() { defer group.Done() select { - case recv := <-r.Consume(): + case recv := <-listener: if !bytes.Equal(data, recv.Data) { t.Fatalf("data not equals. expected %s found %s", string(data), string(recv.Data)) } @@ -100,13 +104,18 @@ func Test_LoadPublishAndReceiveMessage(t *testing.T) { group := &sync.WaitGroup{} testSize := 1000 + listener, err := r.Consume() + if err != nil { + t.Fatalf("failed listening. %#v", err) + } + + group.Add(testSize) for i := 0; i < testSize; i++ { - group.Add(1) data := []byte(fmt.Sprintf("%d", i)) go func() { defer group.Done() select { - case recv := <-r.Consume(): + case recv := <-listener: if recv.Error != nil { t.Errorf("error on consumed response. %v", recv.Error) } @@ -165,12 +174,17 @@ func Test_LoadPublishAndReceiveMultipleConnection(t *testing.T) { group := &sync.WaitGroup{} testSize := 1000 + listener, err := second.Consume() + if err != nil { + t.Fatalf("failed listening. %#v", err) + } + + group.Add(testSize) for i := 0; i < testSize; i++ { - group.Add(1) read := func() { defer group.Done() select { - case recv := <-second.Consume(): + case recv := <-listener: if recv.Data == nil || len(recv.Data) == 0 { t.Errorf("received wrong data") }