Skip to content

Commit

Permalink
refactor namespace #2 #3
Browse files Browse the repository at this point in the history
  • Loading branch information
thangchung committed Nov 2, 2022
1 parent ad655d4 commit 311b5b6
Show file tree
Hide file tree
Showing 12 changed files with 177 additions and 126 deletions.
6 changes: 4 additions & 2 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// new files getting created as root, but you may need to update the USER_UID
// and USER_GID in .devcontainer/Dockerfile to match your user if not 1000.
"-u",
"root",
"vscode",
// Mount go mod cache
"-v",
"coffeeshop-gomodcache:/go/pkg",
Expand Down Expand Up @@ -96,7 +96,9 @@
"bungcip.better-toml",
"eamodio.gitlens",
"casualjim.gotemplate",
"davidanson.vscode-markdownlint"
"davidanson.vscode-markdownlint",
"cweijan.vscode-mysql-client2",
"bierner.markdown-mermaid"
],
"postCreateCommand": "go version",
"features": {
Expand Down
88 changes: 30 additions & 58 deletions internal/counter/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,20 @@ package app

import (
"context"
"errors"
"net"
"time"

amqp "github.com/rabbitmq/amqp091-go"
"github.com/thangchung/go-coffeeshop/cmd/counter/config"
counterGrpc "github.com/thangchung/go-coffeeshop/internal/counter/grpc"
"github.com/thangchung/go-coffeeshop/internal/counter/domain"
mygrpc "github.com/thangchung/go-coffeeshop/internal/counter/grpc"
"github.com/thangchung/go-coffeeshop/internal/counter/usecase"
"github.com/thangchung/go-coffeeshop/internal/counter/usecase/repo"
mylogger "github.com/thangchung/go-coffeeshop/pkg/logger"
"github.com/thangchung/go-coffeeshop/pkg/postgres"
"github.com/thangchung/go-coffeeshop/pkg/rabbitmq"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
OrderTopic = "orders_topic"
RetryTimes = 5
BackOffSeconds = 2
)

var ErrCannotConnectRabbitMQ = errors.New("cannot connect to rabbit")

type App struct {
logger *mylogger.Logger
cfg *config.Config
Expand All @@ -43,26 +35,41 @@ func New(log *mylogger.Logger, cfg *config.Config) *App {
func (a *App) Run(ctx context.Context) error {
a.logger.Info("Init %s %s\n", a.cfg.Name, a.cfg.Version)

// Repository
pg, err := postgres.NewPostgres(a.cfg.PG.URL, postgres.MaxPoolSize(a.cfg.PG.PoolMax))
// PostgresDB
pg, err := postgres.NewPostgresDB(a.cfg.PG.URL, postgres.MaxPoolSize(a.cfg.PG.PoolMax))
if err != nil {
a.logger.Fatal("app - Run - postgres.NewPostgres: %w", err)
a.logger.Fatal("app - Run - postgres.NewPostgres: %s", err.Error())

return err
}
defer pg.Close()

// Use case
queryOrderFulfillmentUseCase := usecase.NewQueryOrderFulfillmentUseCase(repo.NewQueryOrderFulfillmentRepo(pg))

// RabbitMQ
amqpConn, err := a.connectToRabbit()
amqpConn, err := rabbitmq.NewRabbitMQConn(a.cfg.RabbitMQ.URL, a.logger)
if err != nil {
a.logger.Fatal("app - Run - rabbitmq.NewRabbitMQConn: %s", err.Error())

return err
}
defer amqpConn.Close()

// gRPC Client
conn, err := grpc.Dial("0.0.0.0:5001", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}
defer conn.Close()

var productServiceClient domain.ProductServiceClient = mygrpc.NewProductServiceClient(ctx, conn)

// Use case
queryOrderFulfillmentUseCase := usecase.NewQueryOrderFulfillmentUseCase(ctx, repo.NewQueryOrderFulfillmentRepo(ctx, pg))

// gRPC Server
l, err := net.Listen(a.network, a.address)
if err != nil {
a.logger.Fatal("app - Run - net.Listener: %s", err.Error())

return err
}

Expand All @@ -72,50 +79,15 @@ func (a *App) Run(ctx context.Context) error {
}
}()

s := grpc.NewServer()
counterGrpc.NewCounterServiceServerGrpc(s, amqpConn, queryOrderFulfillmentUseCase, a.logger)
server := grpc.NewServer()
mygrpc.NewCounterServiceServerGrpc(server, amqpConn, queryOrderFulfillmentUseCase, productServiceClient, a.logger)

go func() {
defer s.GracefulStop()
defer server.GracefulStop()
<-ctx.Done()
}()

a.logger.Info("Start server at " + a.address + " ...")

return s.Serve(l)
}

func (a *App) connectToRabbit() (*amqp.Connection, error) {
var (
amqpConn *amqp.Connection
counts int64
rabbitMqURL = a.cfg.RabbitMQ.URL
)

for {
connection, err := amqp.Dial(rabbitMqURL)
if err != nil {
a.logger.Error("RabbitMq at %s not ready...\n", rabbitMqURL)
counts++
} else {
amqpConn = connection

break
}

if counts > RetryTimes {
a.logger.LogError(err)

return nil, ErrCannotConnectRabbitMQ
}

a.logger.Info("Backing off for 2 seconds...")
time.Sleep(BackOffSeconds * time.Second)

continue
}

a.logger.Info("Connected to RabbitMQ!")

return amqpConn, nil
return server.Serve(l)
}
19 changes: 19 additions & 0 deletions internal/counter/domain/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package domain

import (
gen "github.com/thangchung/go-coffeeshop/proto/gen"
)

type (
QueryOrderFulfillmentUseCase interface {
GetListOrderFulfillment() ([]gen.OrderDto, error)
}

QueryOrderFulfillmentRepo interface {
GetListOrderFulfillment() ([]gen.OrderDto, error)
}

ProductServiceClient interface {
GetItemsByType(*gen.PlaceOrderRequest, bool) (*gen.GetItemsByTypeResponse, error)
}
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package entity
package domain

import (
"github.com/google/uuid"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
package entity
package domain

import (
"context"
"fmt"
"strings"
"time"

"github.com/google/uuid"
"github.com/samber/lo"
gen "github.com/thangchung/go-coffeeshop/proto/gen"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type Order struct {
Expand All @@ -32,34 +25,19 @@ func NewOrder(orderSource gen.OrderSource, loyaltyMemberID uuid.UUID, orderStatu
}
}

func CreateOrderFrom(request *gen.PlaceOrderRequest) (*Order, error) {
func CreateOrderFrom(request *gen.PlaceOrderRequest, productServiceClient ProductServiceClient) (*Order, error) {
loyaltyMemberID, err := uuid.Parse(request.LoyaltyMemberId)
if err != nil {
return nil, err
}

order := NewOrder(request.OrderSource, loyaltyMemberID, gen.Status_IN_PROGRESS, request.Location)

//TODO: remove hard code URL
conn, err := grpc.Dial("0.0.0.0:5001", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
defer conn.Close()
c := gen.NewProductServiceClient(conn)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

numberOfBaristaItems := len(request.BaristaItems) > 0
numberOfKitchenItems := len(request.KitchenItems) > 0

if numberOfBaristaItems {
itemTypes := lo.Reduce(request.BaristaItems, func(agg string, item *gen.CommandItem, _ int) string {
return fmt.Sprintf("%s,%s", agg, item.ItemType)
}, "")

itemTypesRes, err := c.GetItemsByType(ctx, &gen.GetItemsByTypeRequest{ItemTypes: strings.TrimLeft(itemTypes, ",")})
itemTypesRes, err := productServiceClient.GetItemsByType(request, true)
if err != nil {
return nil, err
}
Expand All @@ -81,11 +59,7 @@ func CreateOrderFrom(request *gen.PlaceOrderRequest) (*Order, error) {
}

if numberOfKitchenItems {
itemTypes := lo.Reduce(request.KitchenItems, func(agg string, item *gen.CommandItem, _ int) string {
return fmt.Sprintf("%s,%s", agg, item.ItemType)
}, "")

itemTypesRes, err := c.GetItemsByType(ctx, &gen.GetItemsByTypeRequest{ItemTypes: strings.TrimLeft(itemTypes, ",")})
itemTypesRes, err := productServiceClient.GetItemsByType(request, false)
if err != nil {
return nil, err
}
Expand Down
20 changes: 13 additions & 7 deletions internal/counter/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import (

"github.com/google/uuid"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/thangchung/go-coffeeshop/internal/counter/entity"
"github.com/thangchung/go-coffeeshop/internal/counter/usecase"
"github.com/thangchung/go-coffeeshop/internal/counter/domain"
events "github.com/thangchung/go-coffeeshop/pkg/event"
mylogger "github.com/thangchung/go-coffeeshop/pkg/logger"
gen "github.com/thangchung/go-coffeeshop/proto/gen"
Expand All @@ -23,9 +22,15 @@ const (
func NewCounterServiceServerGrpc(
grpcServer *grpc.Server,
amqpConn *amqp.Connection,
queryOrderFulfillmentUseCase usecase.QueryOrderFulfillmentUseCase,
queryOrderFulfillmentUseCase domain.QueryOrderFulfillmentUseCase,
productServiceClient domain.ProductServiceClient,
log *mylogger.Logger) {
svc := CounterServiceServerImpl{logger: log, amqpConn: amqpConn, queryOrderFulfillmentUseCase: queryOrderFulfillmentUseCase}
svc := CounterServiceServerImpl{
logger: log,
amqpConn: amqpConn,
queryOrderFulfillmentUseCase: queryOrderFulfillmentUseCase,
productServiceClient: productServiceClient,
}

gen.RegisterCounterServiceServer(grpcServer, &svc)

Expand All @@ -36,15 +41,16 @@ type CounterServiceServerImpl struct {
gen.UnimplementedCounterServiceServer
logger *mylogger.Logger
amqpConn *amqp.Connection
queryOrderFulfillmentUseCase usecase.QueryOrderFulfillmentUseCase
productServiceClient domain.ProductServiceClient
queryOrderFulfillmentUseCase domain.QueryOrderFulfillmentUseCase
}

func (g *CounterServiceServerImpl) GetListOrderFulfillment(ctx context.Context, request *gen.GetListOrderFulfillmentRequest) (*gen.GetListOrderFulfillmentResponse, error) {
g.logger.Info("GET: GetListOrderFulfillment")

res := gen.GetListOrderFulfillmentResponse{}

entities, err := g.queryOrderFulfillmentUseCase.GetListOrderFulfillment(ctx)
entities, err := g.queryOrderFulfillmentUseCase.GetListOrderFulfillment()
if err != nil {
return nil, fmt.Errorf("CounterServiceServerImpl - GetListOrderFulfillment - g.queryOrderFulfillmentUseCase.GetListOrderFulfillment: %w", err)
}
Expand All @@ -64,7 +70,7 @@ func (g *CounterServiceServerImpl) PlaceOrder(ctx context.Context, request *gen.
g.logger.Debug("request: %s", request)

// add order
order, err := entity.CreateOrderFrom(request)
order, err := domain.CreateOrderFrom(request, g.productServiceClient)
if err != nil {
return nil, err
}
Expand Down
40 changes: 40 additions & 0 deletions internal/counter/grpc/product_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package grpc

import (
"context"
"fmt"
"strings"

"github.com/samber/lo"
gen "github.com/thangchung/go-coffeeshop/proto/gen"
"google.golang.org/grpc"
)

type ProductServiceClient struct {
ctx context.Context
conn *grpc.ClientConn
}

func NewProductServiceClient(ctx context.Context, conn *grpc.ClientConn) *ProductServiceClient {
return &ProductServiceClient{
ctx: ctx,
conn: conn,
}
}

func (p *ProductServiceClient) GetItemsByType(request *gen.PlaceOrderRequest, isBarista bool) (*gen.GetItemsByTypeResponse, error) {
c := gen.NewProductServiceClient(p.conn)

itemTypes := ""
if isBarista {
itemTypes = lo.Reduce(request.BaristaItems, func(agg string, item *gen.CommandItem, _ int) string {
return fmt.Sprintf("%s,%s", agg, item.ItemType)
}, "")
} else {
itemTypes = lo.Reduce(request.KitchenItems, func(agg string, item *gen.CommandItem, _ int) string {
return fmt.Sprintf("%s,%s", agg, item.ItemType)
}, "")
}

return c.GetItemsByType(p.ctx, &gen.GetItemsByTypeRequest{ItemTypes: strings.TrimLeft(itemTypes, ",")})
}
17 changes: 0 additions & 17 deletions internal/counter/usecase/interfaces.go

This file was deleted.

11 changes: 7 additions & 4 deletions internal/counter/usecase/order_fulfillment.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,24 @@ import (
"context"
"fmt"

"github.com/thangchung/go-coffeeshop/internal/counter/domain"
gen "github.com/thangchung/go-coffeeshop/proto/gen"
)

type DefaultQueryOrderFulfillmentUseCase struct {
repo QueryOrderFulfillmentRepo
ctx context.Context
repo domain.QueryOrderFulfillmentRepo
}

func NewQueryOrderFulfillmentUseCase(r QueryOrderFulfillmentRepo) *DefaultQueryOrderFulfillmentUseCase {
func NewQueryOrderFulfillmentUseCase(ctx context.Context, r domain.QueryOrderFulfillmentRepo) *DefaultQueryOrderFulfillmentUseCase {
return &DefaultQueryOrderFulfillmentUseCase{
ctx: ctx,
repo: r,
}
}

func (d DefaultQueryOrderFulfillmentUseCase) GetListOrderFulfillment(ctx context.Context) ([]gen.OrderDto, error) {
entities, err := d.repo.GetListOrderFulfillment(ctx)
func (d DefaultQueryOrderFulfillmentUseCase) GetListOrderFulfillment() ([]gen.OrderDto, error) {
entities, err := d.repo.GetListOrderFulfillment()
if err != nil {
return nil, fmt.Errorf("NewQueryOrderFulfillmentUseCase - GetListOrderFulfillment - s.repo.GetListOrderFulfillment: %w", err)
}
Expand Down
Loading

0 comments on commit 311b5b6

Please sign in to comment.