Skip to content

Commit

Permalink
add amqp
Browse files Browse the repository at this point in the history
  • Loading branch information
thangchung committed Oct 26, 2022
1 parent f74b4a5 commit 8ed9baa
Show file tree
Hide file tree
Showing 34 changed files with 2,631 additions and 184 deletions.
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ run-product:
CGO_ENABLED=0 go run -tags migrate github.com/thangchung/go-coffeeshop/cmd/product
.PHONY: run-product

run-counter:
cd cmd/counter && go mod tidy && go mod download && \
CGO_ENABLED=0 go run -tags migrate github.com/thangchung/go-coffeeshop/cmd/counter
.PHONY: run-counter

run-barista:
cd cmd/counter && go mod tidy && go mod download && \
CGO_ENABLED=0 go run -tags migrate github.com/thangchung/go-coffeeshop/cmd/barista
.PHONY: run-barista

run-proxy:
cd cmd/proxy && go mod tidy && go mod download && \
CGO_ENABLED=0 go run -tags migrate github.com/thangchung/go-coffeeshop/cmd/proxy
Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

The coffeeshop with golang stack

## Services

No. | Service | URI
--- | --- | ---
1 | grpc-gateway | [http://localhost:5000](http://localhost:5000)
2 | product service | [http://localhost:5001](http://localhost:5001)
3 | counter service | [http://localhost:5002](http://localhost:5002)
4 | barista service | [http://localhost:5003](http://localhost:5003)
5 | kitchen service | [http://localhost:5004](http://localhost:5004)

## Package

```go
Expand Down
6 changes: 3 additions & 3 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
version: v1
plugins:
- remote: buf.build/protocolbuffers/plugins/go:v1.28.0-1
out: proto
out: proto/gen/
opt: paths=source_relative
- remote: buf.build/grpc/plugins/go:v1.2.0-1
out: proto
out: proto/gen/
opt: paths=source_relative,require_unimplemented_servers=false
- remote: buf.build/grpc-ecosystem/plugins/grpc-gateway:v2.7.2-1
out: proto
out: proto/gen/
opt: paths=source_relative
- remote: buf.build/grpc-ecosystem/plugins/openapiv2:v2.7.2-1
out: third_party/OpenAPI
4 changes: 4 additions & 0 deletions client.http
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,8 @@ content-type: application/json

###
GET {{host}}/v1/api/items-by-types/1,2,3 HTTP/1.1
content-type: application/json

###
GET {{host}}/v1/fulfillment-orders HTTP/1.1
content-type: application/json
114 changes: 114 additions & 0 deletions cmd/barista/event/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package event

import (
"encoding/json"
"fmt"
"log"

amqp "github.com/rabbitmq/amqp091-go"
)

const ExchangeName = "orders_topic"

type Consumer struct {
conn *amqp.Connection
}

func NewConsumer(conn *amqp.Connection) (Consumer, error) {
consumer := Consumer{
conn: conn,
}

err := consumer.setup()
if err != nil {
return Consumer{}, err
}

return consumer, nil
}

func (c *Consumer) setup() error {
channel, err := c.conn.Channel()
if err != nil {
return err
}

return channel.ExchangeDeclare(
ExchangeName, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
}

type Payload struct {
Name string `json:"name"`
}

func (c *Consumer) Listen(topics []string) error {
ch, err := c.conn.Channel()
if err != nil {
return err
}
defer ch.Close()

q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}

for _, s := range topics {
err = ch.QueueBind(
q.Name,
s,
ExchangeName,
false,
nil,
)

if err != nil {
log.Println(err)

return err
}
}

messages, err := ch.Consume(q.Name, "", true, false, false, false, nil)
if err != nil {
return err
}

forever := make(chan bool)

go func() {
for d := range messages {
var payload Payload

_ = json.Unmarshal(d.Body, &payload)

go func() {
switch payload.Name {
case "drink_made":
fmt.Println("Got it")
default:
fmt.Println("default")
}
}()
}
}()

log.Printf("[*] Waiting for message [Exchange, Queue][%s, %s].", ExchangeName, q.Name)
<-forever

return nil
}
75 changes: 75 additions & 0 deletions cmd/barista/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"fmt"
"log"
"math"
"os"
"time"

amqp "github.com/rabbitmq/amqp091-go"
"github.com/thangchung/go-coffeeshop/cmd/barista/event"
)

const (
RetryTimes = 5
PowOf = 2
)

func main() {
rabbitConn, err := connect()
if err != nil {
log.Println(err)
os.Exit(1)
}

defer rabbitConn.Close()

log.Println("Listening for and consuming RabbitMQ messages...")

consumer, err := event.NewConsumer(rabbitConn)
if err != nil {
panic(err)
}

err = consumer.Listen([]string{"log.INFO", "log.WARNING", "log.ERROR"})
if err != nil {
log.Println(err)
}
}

func connect() (*amqp.Connection, error) {
var (
counts int64
backOff = 1 * time.Second
connection *amqp.Connection
rabbitURL = "amqp://guest:guest@172.28.177.17:5672/"
)

for {
c, err := amqp.Dial(rabbitURL)
if err != nil {
fmt.Println("RabbitMQ not yet ready...")
counts++
} else {
connection = c
fmt.Println()

break
}

if counts > RetryTimes {
fmt.Println(err)

return nil, err
}

fmt.Printf("Backing off for %d seconds...\n", int(math.Pow(float64(counts), PowOf)))
backOff = time.Duration(math.Pow(float64(counts), PowOf)) * time.Second
time.Sleep(backOff)

continue
}

return connection, nil
}
11 changes: 11 additions & 0 deletions cmd/counter/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
app:
name: 'counter-service'
version: '1.0.0'

http:
host: '0.0.0.0'
port: 5002

logger:
log_level: 'debug'
rollbar_env: 'counter-service'
42 changes: 42 additions & 0 deletions cmd/counter/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package config

import (
"fmt"
"log"
"os"

"github.com/ilyakaznacheev/cleanenv"
configs "github.com/thangchung/go-coffeeshop/pkg/config"
)

type (
Config struct {
configs.App `yaml:"app"`
configs.HTTP `yaml:"http"`
configs.Log `yaml:"logger"`
}
)

func NewConfig() (*Config, error) {
cfg := &Config{}

dir, err := os.Getwd()
if err != nil {
log.Fatal(err)
}

// debug
fmt.Println(dir)

err = cleanenv.ReadConfig(dir+"/config.yml", cfg)
if err != nil {
return nil, fmt.Errorf("config error: %w", err)
}

err = cleanenv.ReadEnv(cfg)
if err != nil {
return nil, err
}

return cfg, nil
}
30 changes: 30 additions & 0 deletions cmd/counter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package main

import (
"context"
"fmt"
"os"
"reflect"

"github.com/golang/glog"
"github.com/thangchung/go-coffeeshop/cmd/counter/config"
"github.com/thangchung/go-coffeeshop/internal/counter/app"
mylogger "github.com/thangchung/go-coffeeshop/pkg/logger"
)

func main() {
cfg, err := config.NewConfig()
if err != nil {
glog.Fatal(err)
}

fmt.Println(reflect.TypeOf(struct{}{}))

mylog := mylogger.New(cfg.Level)

a := app.New(mylog, cfg)
if err = a.Run(context.Background()); err != nil {
glog.Fatal(err)
os.Exit(1)
}
}
21 changes: 4 additions & 17 deletions cmd/product/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,14 @@ import (
"os"

"github.com/ilyakaznacheev/cleanenv"
configs "github.com/thangchung/go-coffeeshop/pkg/config"
)

type (
Config struct {
App `yaml:"app"`
HTTP `yaml:"http"`
Log `yaml:"logger"`
}

App struct {
Name string `env-required:"true" yaml:"name" env:"APP_NAME"`
Version string `env-required:"true" yaml:"version" env:"APP_VERSION"`
}

HTTP struct {
Host string `env-required:"true" yaml:"host" env:"HTTP_HOST"`
Port int `env-required:"true" yaml:"port" env:"HTTP_PORT"`
}

Log struct {
Level string `env-required:"true" yaml:"log_level" env:"LOG_LEVEL"`
configs.App `yaml:"app"`
configs.HTTP `yaml:"http"`
configs.Log `yaml:"logger"`
}
)

Expand Down
4 changes: 4 additions & 0 deletions cmd/product/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package main

import (
"context"
"fmt"
"os"
"reflect"

"github.com/golang/glog"
"github.com/thangchung/go-coffeeshop/cmd/product/config"
Expand All @@ -16,6 +18,8 @@ func main() {
glog.Fatal(err)
}

fmt.Println(reflect.TypeOf(struct{}{}))

mylog := mylogger.New(cfg.Level)

a := app.New(mylog, cfg)
Expand Down
2 changes: 2 additions & 0 deletions cmd/proxy/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ http:
grpc:
product_host: '0.0.0.0'
product_port: 5001
counter_host: '0.0.0.0'
counter_port: 5002

logger:
log_level: 'debug'
Expand Down
Loading

0 comments on commit 8ed9baa

Please sign in to comment.