Skip to content

Commit

Permalink
rename: cqrs -> eventhus
Browse files Browse the repository at this point in the history
  • Loading branch information
mishudark committed Jan 1, 2017
1 parent e18ccdf commit 0be405c
Show file tree
Hide file tree
Showing 21 changed files with 69 additions and 69 deletions.
2 changes: 1 addition & 1 deletion aggregate.go
@@ -1,4 +1,4 @@
package cqrs
package eventhus

//BaseAggregate contains the basic info
//that all aggregates should have
Expand Down
2 changes: 1 addition & 1 deletion aggregate_test.go
@@ -1,4 +1,4 @@
package cqrs
package eventhus

import "testing"

Expand Down
2 changes: 1 addition & 1 deletion command.go
@@ -1,4 +1,4 @@
package cqrs
package eventhus

//Command contains the methods to retreive basic info about it
type Command interface {
Expand Down
2 changes: 1 addition & 1 deletion command_handler.go
@@ -1,4 +1,4 @@
package cqrs
package eventhus

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion commandbus.go
@@ -1,4 +1,4 @@
package cqrs
package eventhus

//CommandBus serve as the bridge between commands and command handler
//it should manage the queues
Expand Down
22 changes: 11 additions & 11 deletions commandbus/async.go
@@ -1,19 +1,19 @@
package async

import "cqrs"
import "eventhus"

var workerPool = make(chan chan cqrs.Command)
var workerPool = make(chan chan eventhus.Command)

//Worker contains the basic info to manage commands
type Worker struct {
WorkerPool chan chan cqrs.Command
JobChannel chan cqrs.Command
CommandHandler cqrs.CommandHandlerRegister
WorkerPool chan chan eventhus.Command
JobChannel chan eventhus.Command
CommandHandler eventhus.CommandHandlerRegister
}

//Bus stores the command handler
type Bus struct {
CommandHandler cqrs.CommandHandlerRegister
CommandHandler eventhus.CommandHandlerRegister
maxWorkers int
}

Expand Down Expand Up @@ -43,26 +43,26 @@ func (w *Worker) Start() {
}

//NewWorker initialize the values of worker and start it
func NewWorker(commandHandler cqrs.CommandHandlerRegister) {
func NewWorker(commandHandler eventhus.CommandHandlerRegister) {
w := Worker{
WorkerPool: workerPool,
CommandHandler: commandHandler,
JobChannel: make(chan cqrs.Command),
JobChannel: make(chan eventhus.Command),
}

w.Start()
}

//HandleCommand ad a job to the queue
func (b *Bus) HandleCommand(command cqrs.Command) {
go func(c cqrs.Command) {
func (b *Bus) HandleCommand(command eventhus.Command) {
go func(c eventhus.Command) {
workerJobQueue := <-workerPool
workerJobQueue <- c
}(command)
}

//NewBus return a bus with command handler register
func NewBus(register cqrs.CommandHandlerRegister, maxWorkers int) *Bus {
func NewBus(register eventhus.CommandHandlerRegister, maxWorkers int) *Bus {
b := &Bus{
CommandHandler: register,
maxWorkers: maxWorkers,
Expand Down
2 changes: 1 addition & 1 deletion event.go
@@ -1,4 +1,4 @@
package cqrs
package eventhus

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion event_test.go
@@ -1,4 +1,4 @@
package cqrs
package eventhus

import "testing"

Expand Down
2 changes: 1 addition & 1 deletion eventbus.go
@@ -1,4 +1,4 @@
package cqrs
package eventhus

//EventBus defines the methods for manage the events publisher and consumer
type EventBus interface {
Expand Down
4 changes: 2 additions & 2 deletions eventbus/nats/nats.go
@@ -1,7 +1,7 @@
package nats

import (
"cqrs"
"eventhus"
"encoding/json"
"strings"

Expand Down Expand Up @@ -29,7 +29,7 @@ func NewClient(urls string, useTLS bool) (*Client, error) {
}

//Publish a event
func (c *Client) Publish(event cqrs.Event, bucket, subset string) error {
func (c *Client) Publish(event eventhus.Event, bucket, subset string) error {
nc, err := c.Options.Connect()
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions eventbus/rabbitmq/rabbitmq.go
@@ -1,7 +1,7 @@
package rabbitmq

import (
"cqrs"
"eventhus"
"encoding/json"
"fmt"

Expand All @@ -22,7 +22,7 @@ func NewClient(username, password, host string, port int) (*Client, error) {
}

//Publish a event
func (c *Client) Publish(event cqrs.Event, bucket, subset string) error {
func (c *Client) Publish(event eventhus.Event, bucket, subset string) error {
ch, err := c.conn.Channel()
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion eventstore.go
@@ -1,4 +1,4 @@
package cqrs
package eventhus

//EventStore saves the events from an aggregate
type EventStore interface {
Expand Down
22 changes: 11 additions & 11 deletions eventstore/mongo/mongo.go
@@ -1,7 +1,7 @@
package mongo

import (
"cqrs"
"eventhus"
"fmt"
"time"

Expand Down Expand Up @@ -34,7 +34,7 @@ type Client struct {
}

//NewClient generates a new client to access to mongodb
func NewClient(host string, port int, db string) (cqrs.EventStore, error) {
func NewClient(host string, port int, db string) (eventhus.EventStore, error) {
session, err := mgo.Dial(fmt.Sprintf("%s:%d", host, port))
if err != nil {
return nil, err
Expand All @@ -51,7 +51,7 @@ func NewClient(host string, port int, db string) (cqrs.EventStore, error) {
return cli, nil
}

func (c *Client) save(events []cqrs.Event, version int, safe bool) error {
func (c *Client) save(events []eventhus.Event, version int, safe bool) error {
if len(events) == 0 {
return nil
}
Expand Down Expand Up @@ -119,18 +119,18 @@ func (c *Client) save(events []cqrs.Event, version int, safe bool) error {
}

//SafeSave store the events without check the current version
func (c *Client) SafeSave(events []cqrs.Event, version int) error {
func (c *Client) SafeSave(events []eventhus.Event, version int) error {
return c.save(events, version, true)
}

//Save the events ensuring the current version
func (c *Client) Save(events []cqrs.Event, version int) error {
func (c *Client) Save(events []eventhus.Event, version int) error {
return c.save(events, version, false)
}

//Load the stored events for an AggregateID
func (c *Client) Load(aggregateID string) ([]cqrs.Event, error) {
var events []cqrs.Event
func (c *Client) Load(aggregateID string) ([]eventhus.Event, error) {
var events []eventhus.Event

sess := c.session.Copy()
defer sess.Close()
Expand All @@ -143,8 +143,8 @@ func (c *Client) Load(aggregateID string) ([]cqrs.Event, error) {
return events, err
}

events = make([]cqrs.Event, len(aggregate.Events))
register := cqrs.NewEventRegister()
events = make([]eventhus.Event, len(aggregate.Events))
register := eventhus.NewEventRegister()

for i, dbEvent := range aggregate.Events {
// Create an event of the correct type.
Expand All @@ -162,8 +162,8 @@ func (c *Client) Load(aggregateID string) ([]cqrs.Event, error) {
dbEvent.data = dataType
dbEvent.RawData = bson.Raw{}

// Translate dbEvent to cqrs.Event
events[i] = cqrs.Event{
// Translate dbEvent to eventhus.Event
events[i] = eventhus.Event{
AggregateID: aggregateID,
AggregateType: dbEvent.AggregateType,
Version: dbEvent.Version,
Expand Down
12 changes: 6 additions & 6 deletions eventstore/mongo/mongo_test.go
@@ -1,7 +1,7 @@
package mongo

import (
"cqrs"
"eventhus"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -42,22 +42,22 @@ func TestClientSave(t *testing.T) {
entropy := rand.New(rand.NewSource(ta.UnixNano()))
aid := ulid.MustNew(ulid.Timestamp(ta), entropy)

events := []cqrs.Event{
cqrs.Event{
events := []eventhus.Event{
eventhus.Event{
AggregateID: aid.String(),
AggregateType: "order",
Version: 1,
Type: "cqrs.SubEvent2",
Type: "eventhus.SubEvent2",
Data: SubEvent2{
Name: "muñeca",
SKU: "123",
},
},
cqrs.Event{
eventhus.Event{
AggregateID: aid.String(),
AggregateType: "order",
Version: 1,
Type: "cqrs.SubEvent2",
Type: "eventhus.SubEvent2",
Data: SubEvent2{
Name: "muñeca",
SKU: "123",
Expand Down
12 changes: 6 additions & 6 deletions examples/bank/account.go
@@ -1,7 +1,7 @@
package bank

import (
"cqrs"
"eventhus"
"errors"
)

Expand All @@ -10,20 +10,20 @@ var ErrBalanceOut = errors.New("balance out")

//Account of bank
type Account struct {
cqrs.BaseAggregate
eventhus.BaseAggregate
Owner string
Balance int
}

//LoadsFromHistory restore the account to last status
func (a *Account) LoadsFromHistory(events []cqrs.Event) {
func (a *Account) LoadsFromHistory(events []eventhus.Event) {
for _, event := range events {
a.BaseAggregate.ApplyChange(a, event, false)
}
}

//ApplyChange to account
func (a *Account) ApplyChange(event cqrs.Event, commit bool) {
func (a *Account) ApplyChange(event eventhus.Event, commit bool) {
switch e := event.Data.(type) {
case *AccountCreated:
a.Owner = e.Owner
Expand All @@ -36,8 +36,8 @@ func (a *Account) ApplyChange(event cqrs.Event, commit bool) {
}

//Handle a command
func (a *Account) Handle(command cqrs.Command) error {
event := cqrs.Event{
func (a *Account) Handle(command eventhus.Command) error {
event := eventhus.Event{
AggregateID: a.ID,
AggregateType: "Account",
}
Expand Down
18 changes: 9 additions & 9 deletions examples/bank/cmd/main/config.go
@@ -1,17 +1,17 @@
package main

import (
"cqrs"
"cqrs/commandbus"
"cqrs/eventbus/nats"
"cqrs/eventstore/mongo"
"cqrs/examples/bank"
"eventhus"
"eventhus/commandbus"
"eventhus/eventbus/nats"
"eventhus/eventstore/mongo"
"eventhus/examples/bank"
"log"
)

func config() (cqrs.CommandBus, error) {
func config() (eventhus.CommandBus, error) {
//register events
reg := cqrs.NewEventRegister()
reg := eventhus.NewEventRegister()
reg.Set(bank.AccountCreated{})
reg.Set(bank.DepositPerformed{})
reg.Set(bank.WithdrawalPerformed{})
Expand All @@ -32,10 +32,10 @@ func config() (cqrs.CommandBus, error) {
}

//repository
repository := cqrs.NewRepository(eventstore, nat)
repository := eventhus.NewRepository(eventstore, nat)

//handlers
commandHandler := cqrs.NewCommandHandler()
commandHandler := eventhus.NewCommandHandler()
accountHandler := bank.NewCommandHandler(repository)

//add commands to commandhandler
Expand Down
4 changes: 2 additions & 2 deletions examples/bank/cmd/main/main.go
@@ -1,8 +1,8 @@
package main

import (
"cqrs/examples/bank"
"cqrs/utils"
"eventhus/examples/bank"
"eventhus/utils"
"os"
"time"
)
Expand Down
2 changes: 1 addition & 1 deletion examples/bank/cmd/nats/nats.go
Expand Up @@ -3,7 +3,7 @@ package main
import (
"log"

n "cqrs/eventbus/nats"
n "eventhus/eventbus/nats"

"github.com/nats-io/go-nats"
)
Expand Down
10 changes: 5 additions & 5 deletions examples/bank/commands.go
@@ -1,27 +1,27 @@
package bank

import "cqrs"
import "eventhus"

//CreateAccount assigned to an owner
type CreateAccount struct {
cqrs.BaseCommand
eventhus.BaseCommand
Owner string
}

//PerformDeposit to a given account
type PerformDeposit struct {
cqrs.BaseCommand
eventhus.BaseCommand
Ammount int
}

//ChangeOwner of an account
type ChangeOwner struct {
cqrs.BaseCommand
eventhus.BaseCommand
Owner string
}

//PerformWithdrawal to a given account
type PerformWithdrawal struct {
cqrs.BaseCommand
eventhus.BaseCommand
Ammount int
}

0 comments on commit 0be405c

Please sign in to comment.