Feat/event distribution #15
Walkthrough
This PR introduces WAL-backed event persistence, protobuf-based event serialization, and gRPC streaming infrastructure to the matching engine, adds a new WebSocket server for event subscription, integrates Kafka for event emission, and restructures symbol configuration from string lists to typed configuration structs. The API gateway's cancel order endpoint is updated from POST to DELETE.
Sequence Diagram(s)
sequenceDiagram
participant Client as WebSocket Client
participant WSGateway as WebSocket Gateway
participant MatchingEngine as Matching Engine (gRPC)
participant ActorRegistry as Actor Registry
participant WAL as Write-Ahead Log
participant Kafka as Kafka Broker
Client->>WSGateway: Connect + Subscribe(Symbol)
WSGateway->>MatchingEngine: SubscribeSymbol(symbol, gateway_id)
MatchingEngine->>ActorRegistry: Lookup actor
ActorRegistry->>WAL: ReplayWal(0)
WAL-->>ActorRegistry: Replay entries
ActorRegistry->>MatchingEngine: Return gRPC stream
MatchingEngine-->>WSGateway: Stream established
Note over MatchingEngine: Order arrives
MatchingEngine->>ActorRegistry: AddOrderInternal()
ActorRegistry->>WAL: WriteEntry(event)
WAL-->>ActorRegistry: Offset confirmed
ActorRegistry->>Kafka: EmitToKafka(event)
Kafka-->>ActorRegistry: Sent
ActorRegistry->>MatchingEngine: Emit via gRPC stream
MatchingEngine-->>WSGateway: EngineEvent (protobuf)
WSGateway->>WSGateway: Unmarshal + Broadcast
WSGateway-->>Client: JSON event payload
Client->>WSGateway: Unsubscribe
WSGateway->>MatchingEngine: Stream.Context cancellation
MatchingEngine->>ActorRegistry: Remove stream
sequenceDiagram
participant ActorRegistry as Actor Registry
participant KafkaWorker as Kafka Producer Worker
participant WAL as Write-Ahead Log
participant Kafka as Kafka Broker
participant Checkpoint as Checkpoint File
loop Periodic (Emit Interval)
KafkaWorker->>Checkpoint: loadCheckpoint()
Checkpoint-->>KafkaWorker: Last offset
KafkaWorker->>WAL: ReadFromTo(lastOffset, latest)
WAL-->>KafkaWorker: Batch of WAL_Entry
KafkaWorker->>Kafka: SendMessages(batch)
Kafka-->>KafkaWorker: Success
KafkaWorker->>Checkpoint: saveCheckpoint(newOffset)
Checkpoint-->>KafkaWorker: Fsynced
end
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
Warning
There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.
🔧 Buf (1.63.0)
packages/proto-defs/proto/common/engine_event_log.proto
Failure: no .proto files were targeted. This can occur if no .proto files are found in your input, --path points to files that do not exist, or --exclude-path excludes all files.
packages/proto-defs/proto/engine/order_matching.proto
Failure: no .proto files were targeted. This can occur if no .proto files are found in your input, --path points to files that do not exist, or --exclude-path excludes all files.
packages/proto-defs/proto/common/order_types.proto
Failure: no .proto files were targeted. This can occur if no .proto files are found in your input, --path points to files that do not exist, or --exclude-path excludes all files.
Actionable comments posted: 12
Note
Due to the large number of review comments, Critical severity comments were prioritized as inline comments.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
apps/api-gateway/src/routers/order.route.ts (1)
36-41: Fix validator to match DELETE request expectations or change HTTP method.
The CancelOrderValidator requires body.symbol as a mandatory field, but DELETE requests conventionally omit request bodies. Clients following REST conventions will fail validation. Either:
- Make symbol optional in the validator schema, or
- Change the route back to POST, or
- Document that DELETE requests must include a JSON body with symbol.
Currently, line 34 of packages/validator/src/Order.ts defines symbol: z.string() without .optional(), causing the middleware to reject valid DELETE requests without a body.
apps/matching-engine/internal/engine.go (1)
869-888: Reply sent before WAL persistence in CancelOrderMsg handler.
At Line 876, the reply is sent to the client before events are written to WAL (Line 884). If the WAL write fails or the system crashes after the reply but before persistence, the client believes the cancel succeeded but the state is lost on recovery.
The same issue exists in the ModifyOrderMsg handler (Line 897 vs 905).
🔧 Proposed fix - persist before replying
  case CancelOrderMsg:
      response, events, err := a.engine.CancelOrderInternal(m.ID, m.UserID, m.Symbol)
      if err != nil {
          m.Err <- err
          continue
      }
-     m.Reply <- response
      for _, event := range events {
          data, err := proto.Marshal(event)
          if err != nil {
              m.Err <- err
              continue
          }
          if err := a.wal.writeEntry(data); err != nil {
              m.Err <- err
              continue
          }
      }
+     m.Reply <- response
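The ordering the comment asks for can be sketched in isolation. This is a minimal illustration, not the engine's code: walWriter, memWAL and persistThenReply are hypothetical names, and byte slices stand in for the marshalled events. One choice made here that the proposed diff does not make: a failed write returns immediately, so the caller receives exactly one error and never a success reply.

```go
package main

import (
	"errors"
	"fmt"
)

// walWriter is a hypothetical stand-in for the actor's WAL write path.
type walWriter interface {
	WriteEntry(data []byte) error
}

// memWAL is an in-memory WAL used only for this illustration.
type memWAL struct {
	entries [][]byte
	failAt  int // index of the write that should fail; -1 disables failures
}

func (w *memWAL) WriteEntry(data []byte) error {
	if w.failAt == len(w.entries) {
		return errors.New("simulated WAL failure")
	}
	w.entries = append(w.entries, data)
	return nil
}

// persistThenReply writes every event first and only then sends the reply.
// On the first failed write it reports one error and sends no reply.
func persistThenReply(w walWriter, events [][]byte, response string, reply chan<- string, errc chan<- error) {
	for _, ev := range events {
		if err := w.WriteEntry(ev); err != nil {
			errc <- fmt.Errorf("persist cancel event: %w", err)
			return
		}
	}
	reply <- response
}

func main() {
	reply, errc := make(chan string, 1), make(chan error, 1)
	persistThenReply(&memWAL{failAt: -1}, [][]byte{[]byte("cancelled")}, "ok", reply, errc)
	fmt.Println(<-reply)
}
```

The same shape would apply to the ModifyOrderMsg handler mentioned above.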
🤖 Fix all issues with AI agents
In `@apps/matching-engine/internal/actor_registy.go`:
- Around line 142-149: The loop iterates actor.grpcStreams without holding
actor.mu, causing a data race; fix by acquiring the mutex before accessing the
slice (either Lock() for the whole find-and-remove or RLock() to locate index
then Lock() to remove) so all reads/writes to actor.grpcStreams are protected;
update the code around the loop that references actor.grpcStreams and actor.mu
(the for i, s := range actor.grpcStreams { ... } block) to lock before iterating
and unlock after the remove (or use RLock->Lock upgrade pattern) and ensure you
break/unlock correctly to avoid deadlocks.
In `@apps/matching-engine/internal/engine.go`:
- Line 1011: The code incorrectly uses order.Price when indexing
obs.PriceLevels; replace uses of order.Price with restingOrder.Price (the
resting book entry) wherever a price level lookup for the book-side is required
(e.g., the lookup currently using order.Price and the similar lookup later) so
that obs.PriceLevels is indexed by the restingOrder's Price rather than the
aggressor order's Price; update both occurrences to reference restingOrder.Price
to avoid using market/aggressor orders without a valid price.
- Around line 1049-1050: Handler code for ORDER_CANCELLED (and similarly for
ORDER_REDUCED and ORDER_FILLED) reads order := a.engine.AllOrders[event.OrderId]
and then uses order.PriceLevel without checking for nil, which can panic if the
order is missing; update each handler to first test the map lookup (e.g., order,
ok := a.engine.AllOrders[event.OrderId]) and bail out or log/skip the event when
ok is false instead of dereferencing a nil order, ensuring subsequent uses of
order.PriceLevel or other fields occur only when order is non-nil.
- Around line 1103-1108: The ORDER_FILLED case incorrectly unmarshals into
pb.OrderReducedEvent; change it to unmarshal into pb.OrderStatusEvent (the type
produced by EncodeOrderStatusEvent) so the handler for
pbTypes.EventType_ORDER_FILLED uses the correct protobuf type; update the
variable declaration and the proto.Unmarshal call in the switch branch handling
ORDER_FILLED to use pb.OrderStatusEvent instead of pb.OrderReducedEvent and
adjust any subsequent references to the event fields accordingly.
- Around line 805-810: The field grpcStreams is iterated in Run() without using
the declared mu RWMutex, causing a data race; update Run() to acquire mu.RLock()
before iterating grpcStreams and release it after, and ensure any functions that
append/remove entries from grpcStreams (the methods that add or remove server
streams) use mu.Lock()/mu.Unlock() when mutating the slice; reference the
grpcStreams field and the mu sync.RWMutex, and modify the Run(),
subscribe/unsubscribe (or equivalent add/remove stream) methods to use RLock for
reads and Lock for writes to eliminate the race.
- Around line 932-938: The switch handlers are unmarshalling from the wrong
source: replace calls like proto.Unmarshal(log.GetData(), &event) with
proto.Unmarshal(logData.Data, &event) so the payload is read from the
EngineEvent payload rather than the top-level log; update every case in the
switch on logData.EventType (e.g., where pb.OrderStatusEvent, other pb.* event
structs are decoded) to use logData.Data for proto.Unmarshal and keep the same
target structs (e.g., pb.OrderStatusEvent) and error handling.
- Around line 978-987: When replaying trade events in the matching engine,
AllOrders lookups for buyOrder and sellOrder (variables buyOrder, sellOrder) are
used without nil checks which can panic if an order is missing; update the code
that reads a.engine.AllOrders[event.BuyOrderId] and
a.engine.AllOrders[event.SellOrderId] to check for nil before proceeding, and if
either is nil handle it gracefully (e.g., log an error including the event IDs
and return/continue/skip applying that trade) instead of dereferencing; retain
the existing IsBuyerMaker branch (restingOrder/order assignment) but only
perform that assignment and subsequent logic after confirming both orders are
non-nil.
In `@apps/matching-engine/internal/kafka.go`:
- Around line 57-60: The checkpoint file is opened with os.OpenFile using the
O_APPEND flag which conflicts with saveCheckpoint's use of Seek(0,0) and
Truncate(0); remove the O_APPEND flag when opening the file (keep O_CREATE and
O_RDWR) so writes respect Seek/Truncate in saveCheckpoint and ensure the
os.OpenFile call that constructs filepath.Join(dirPath, symbol,
"checkpoint.meta") is updated accordingly.
In `@apps/matching-engine/internal/utils.go`:
- Around line 76-95: In EncodeTradeEvent, the pb.TradeEvent field BuyerId is
incorrectly populated from trade.BuyOrderID; update the mapping so BuyerId uses
trade.BuyerID (leave BuyOrderId mapped to trade.BuyOrderID and SellerId/other
fields unchanged) to correctly reflect the Trade struct's buyer identifier when
constructing the proto message.
In `@apps/matching-engine/internal/wal.go`:
- Around line 189-194: New segment files are created without the ".log"
extension causing OpenWAL's suffix-based discovery to miss them; when building
the new segment path in the WAL writer (the filepath.Join call that assigns
filePath using sw.dirPath and sw.currentSegmentIndex+1), append the ".log"
suffix (e.g., fmt.Sprintf("%v.log", ...)) so the created file and the
os.OpenFile call create a file with the expected ".log" extension; update any
related references in the WAL writer that compute file names to use the same
".log" format so OpenWAL's .log filtering finds the segment.
- Around line 146-151: The CRC currently appends only byte(sw.nextOffset) which
truncates the uint64 sequence number; update the CRC computation in the WAL
write path (where crc := crc32.ChecksumIEEE(append(data, byte(sw.nextOffset))))
to append the full 8-byte representation of sw.nextOffset (use a fixed
endianness, e.g., little-endian) before computing CRC, and make the analogous
change in unmarshalAndVerifyEntry so CRC verification uses the same 8-byte
encoding of the sequence number.
- Around line 379-382: The validation in SymbolWAL.ReadFromToLast is inverted:
it currently rejects any from > 0 (effectively only allowing from == 0). Since
from is a uint64, update the check to reject the zero value instead (e.g.,
change the condition to test from == 0 and return an "invalid from" error) or
remove the check entirely if all uint64 values are valid; modify the validation
in the ReadFromToLast function accordingly.
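To make the wal.go CRC item above (lines 146-151) concrete, here is a small sketch of why appending only byte(sw.nextOffset) is weak. The function names are hypothetical and little-endian is an assumed choice, as the prompt suggests; the WAL's real entry layout is not shown in this review.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// checksum covers the payload followed by the full 8-byte little-endian offset.
func checksum(data []byte, offset uint64) uint32 {
	buf := make([]byte, len(data)+8)
	copy(buf, data)
	binary.LittleEndian.PutUint64(buf[len(data):], offset)
	return crc32.ChecksumIEEE(buf)
}

// truncatedChecksum mirrors the flagged behaviour: only the low byte of the
// offset is covered, so offsets that differ by a multiple of 256 collide.
func truncatedChecksum(data []byte, offset uint64) uint32 {
	buf := make([]byte, 0, len(data)+1)
	buf = append(buf, data...)
	buf = append(buf, byte(offset))
	return crc32.ChecksumIEEE(buf)
}

func main() {
	data := []byte("event")
	// Offsets 1 and 257 share the same low byte.
	fmt.Println("truncated CRCs equal:", truncatedChecksum(data, 1) == truncatedChecksum(data, 257))
	fmt.Println("full CRCs equal:", checksum(data, 1) == checksum(data, 257))
}
```

Whatever encoding is chosen, the write path and unmarshalAndVerifyEntry need to agree on it, otherwise every existing entry fails verification.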
🟠 Major comments (16)
apps/websocket-server/cmd/websocket-server/main.go-12-13 (1)
12-13: Add error handling for ConnectToEngine call.
The ConnectToEngine method returns an error, but it's currently being ignored. If the connection fails, the server will start anyway and serve requests without a working backend connection, leading to confusing failures for clients.
Proposed fix
  func main() {
      wsg := internal.NewWSGateway()
-     wsg.ConnectToEngine("localhost:50052")
+     if err := wsg.ConnectToEngine("localhost:50052"); err != nil {
+         log.Fatalf("Failed to connect to engine: %v", err)
+     }
apps/matching-engine/internal/kafka.go-50-55 (1)
50-55: Hardcoded Kafka broker addresses.
Broker addresses should be configurable (e.g., from environment variables or config) rather than hardcoded. Hardcoding them will cause issues when deploying to different environments.
apps/websocket-server/internal/gateway.go-231-239 (1)
231-239: Stream not actually closed on unsubscribe.
When there are no more subscribers, the stream is deleted from the map but not actually closed. This leaves the gRPC stream open and the receiveFromSymbolStream goroutine running indefinitely.
Consider calling a close/cancel method on the stream or using a cancellable context.
apps/matching-engine/internal/wal.go-340-341 (1)
340-341: Type mismatch: reading uint32 but writing int32.
writeEntryToBuffer writes size as int32 (line 164), but ReadFromTo reads it as uint32. This mismatch could cause incorrect size interpretation on some platforms.
- var size uint32
+ var size int32
  if err := binary.Read(file, binary.LittleEndian, &size); err != nil {
Apply the same fix in ReadFromToLast (line 409).
apps/matching-engine/internal/kafka.go-90-96 (1)
90-96: Silently ignored error from ReadFromTo.
The error from wal.ReadFromTo is discarded. This could mask WAL corruption or I/O errors, leading to silent data loss or infinite loops if the WAL consistently fails.
- events, _ := kpw.wal.ReadFromTo(
+ events, err := kpw.wal.ReadFromTo(
      startOffset+1,
      startOffset+uint64(kpw.batchSize),
  )
+ if err != nil {
+     log.Println("WAL read failed:", err)
+     return
+ }
apps/websocket-server/internal/gateway.go-304-328 (1)
304-328: writePump exits on single unmarshal error.
If one event fails to unmarshal (line 312-315), the entire writePump returns, killing the WebSocket write loop for that user. This means one corrupted event disconnects the user. Consider logging and continuing instead.
  if err := proto.Unmarshal(event.Data, &unmarshalData); err != nil {
      log.Printf("failed to unmarshal order status event for user %s: %v", u.ID, err)
-     return
+     continue
  }
Apply similar changes to other unmarshal error handlers in this switch (lines 333-336, 354-357, 374-377, 394-397).
apps/websocket-server/internal/gateway.go-141-146 (1)
141-146: Blocking send for order events could deadlock.
Order events (lines 141-146) use a direct channel send without the non-blocking select pattern used for trades and depth events. If the user's channel buffer is full, this will block the broadcast loop.
  if user, exist := wsg.UsersByID[msg.UserId]; exist {
-     user.Channel <- &Event{
-         EventType: msg.EventType,
-         Data: msg.Data,
+     select {
+     case user.Channel <- &Event{
+         EventType: msg.EventType,
+         Data: msg.Data,
+     }:
+     default:
+         log.Printf("User %s buffer full for order event", user.ID)
      }
  }
apps/matching-engine/internal/wal.go-333-337 (1)
333-337: Deferred file.Close() inside loop accumulates open file handles.
Each iteration defers file.Close(), but defers don't execute until the function returns. This keeps all segment files open simultaneously until the function completes.
  file, err := os.Open(path)
  if err != nil {
      return nil, err
  }
- defer file.Close()
  for {
      // ... read entries ...
  }
+ file.Close()
Apply the same fix in ReadFromToLast (lines 402-406).
apps/websocket-server/internal/gateway.go-86-92 (1)
86-92: Hardcoded GatewayId.
The gateway ID is hardcoded as "123". This should be configurable or auto-generated to uniquely identify each gateway instance.
+ // In WSGateway struct, add:
+ // GatewayID string
  req := &pb.SubscribeRequest{
      Symbol: symbol,
-     GatewayId: "123",
+     GatewayId: wsg.GatewayID,
      Timestamp: time.Now().UnixNano(),
  }
apps/matching-engine/internal/utils.go-12-39 (1)
12-39: Unused statusMessage parameter.
The statusMessage parameter is accepted but never assigned to the OrderStatusEvent. Either use it or remove it.
- func EncodeOrderStatusEvent(order *Order, statusMessage *string) ([]byte, error) {
+ func EncodeOrderStatusEvent(order *Order, statusMessage *string) ([]byte, error) {
      eventByte, err := proto.Marshal(&pb.OrderStatusEvent{
          OrderId: order.ClientOrderID,
          UserId: order.UserID,
          Symbol: order.Symbol,
          Status: order.Status,
+         StatusMessage: statusMessage,
          Side: order.Side,
          Type: order.Type,
apps/matching-engine/internal/actor_registy.go-24-44 (1)
24-44: Race condition: actor stored in map after goroutines start.
The actor is stored in actors[sym.Name] (line 43) after starting goroutines (lines 36-41). If PlaceOrder or other functions are called before the map is populated, they'll fail with "unknown symbol". Move the map assignment before starting goroutines.
+ actors[sym.Name] = actor
+
  // 3. Start other workers owned by actor
  go actor.wal.keepSyncing()
  go actor.kafkaEmitter.Run()
  // 4. Start actor loop LAST
  go actor.Run()
-
- actors[sym.Name] = actor
apps/matching-engine/internal/wal.go-447-449 (1)
447-449: panic on unmarshal failure is inappropriate.
A corrupted WAL entry should not crash the entire application. Return an error instead.
  entry := &pbTypes.WAL_Entry{}
  if err := proto.Unmarshal(data, entry); err != nil {
-     panic(fmt.Sprintf("unmarshal should never fail (%v)", err))
+     return nil, fmt.Errorf("failed to unmarshal WAL entry: %w", err)
  }
apps/matching-engine/internal/engine.go-554-559 (1)
554-559: Encoding error ignored here as well.
Same issue as in buildEvents: the error from EncodeOrderStatusEvent is discarded.
apps/matching-engine/internal/engine.go-447-461 (1)
447-461: Errors from encoding functions are silently ignored.
At Line 450 and Line 473, the errors from EncodeOrderStatusEvent and EncodeTradeEvent are discarded. If encoding fails, data could be nil or invalid, leading to corrupted events being published to WAL, Kafka, and gRPC streams.
🔧 Proposed fix - propagate errors
  func (me *MatchingEngine) buildEvents(
      order *Order,
      trades []Trade,
- ) []*pb.EngineEvent {
+ ) ([]*pb.EngineEvent, error) {
      events := []*pb.EngineEvent{}
-     data, _ := EncodeOrderStatusEvent(order, StrPtr(""))
+     data, err := EncodeOrderStatusEvent(order, StrPtr(""))
+     if err != nil {
+         return nil, fmt.Errorf("encode order status: %w", err)
+     }
      // ---------- REJECT ----------
      if order.Status == pbTypes.OrderStatus_REJECTED {
-         return []*pb.EngineEvent{
+         return []*pb.EngineEvent{
              {
                  EventType: pbTypes.EventType_ORDER_REJECTED,
                  Data: data,
              },
-         }
+         }, nil
      }
Similar changes needed for EncodeTradeEvent at Line 473 and callers of buildEvents.
apps/matching-engine/internal/engine.go-315-315 (1)
315-315: Remove debug statement before production.
This fmt.Println("inside the loop start") appears to be debugging code that should be removed.
🧹 Proposed fix
- fmt.Println("inside the loop start")
apps/matching-engine/internal/engine.go-318-320 (1)
318-320: Self-trade prevention is disabled without a configuration option.
The STP check at lines 318-320 is commented out and has no replacement. The Order struct includes the UserID field needed for STP, but the matching engine currently executes trades between orders from the same user, and there are no configuration flags or alternative mechanisms in place.
For production use, implement a configurable STP mode (e.g., CancelNewest, CancelOldest, CancelBoth, None) rather than leaving this disabled. This is typically required for regulatory compliance on exchanges and prevents wash trading.
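A configurable STP mode along the lines suggested might be sketched as follows. Everything here is hypothetical (STPMode, stpDecision, resolveSelfTrade); it assumes the incoming order is the newest and the resting order the oldest, and it only decides which side to cancel, leaving the actual book mutation to the engine.

```go
package main

import "fmt"

// STPMode selects how a would-be self-trade is handled.
type STPMode int

const (
	STPNone STPMode = iota
	STPCancelNewest
	STPCancelOldest
	STPCancelBoth
)

// stpDecision reports which side of a would-be self-trade to cancel.
type stpDecision struct {
	CancelIncoming bool
	CancelResting  bool
}

// resolveSelfTrade returns the zero decision when the two orders belong to
// different users or when STP is turned off.
func resolveSelfTrade(mode STPMode, incomingUser, restingUser string) stpDecision {
	if mode == STPNone || incomingUser != restingUser {
		return stpDecision{}
	}
	switch mode {
	case STPCancelNewest:
		return stpDecision{CancelIncoming: true}
	case STPCancelOldest:
		return stpDecision{CancelResting: true}
	default: // STPCancelBoth
		return stpDecision{CancelIncoming: true, CancelResting: true}
	}
}

func main() {
	fmt.Println(resolveSelfTrade(STPCancelNewest, "u1", "u1"))
	fmt.Println(resolveSelfTrade(STPCancelNewest, "u1", "u2"))
}
```

The mode would most naturally live in the per-symbol configuration struct this PR introduces, so each market can choose its own policy.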
🟡 Minor comments (9)
apps/websocket-server/package.json-9-9 (1)
9-9: Clean script targets wrong directory.
The clean script removes dist, but the Makefile builds to bin/. This inconsistency means clean won't remove built artifacts.
Proposed fix
- "clean": "rm -rf dist"
+ "clean": "rm -rf bin"
apps/websocket-server/makefile-9-10 (1)
9-10: Go version mismatch in generated go.mod.
The post-install target writes go 1.21 to the proto-generated go.mod, but the websocket-server module uses go 1.25.4. This inconsistency may cause compatibility issues.
Proposed fix - use consistent Go version
- @printf "module github.com/sameerkrdev/nerve/packages/proto-defs/go/generated\n\ngo 1.21\n" > \
+ @printf "module github.com/sameerkrdev/nerve/packages/proto-defs/go/generated\n\ngo 1.25.4\n" > \
      $(ROOT_DIR)/packages/proto-defs/go/generated/go.mod
apps/websocket-server/cmd/websocket-server/main.go-16-16 (1)
16-16: Typo in handler method name: HandelWebsocket.
The method name appears to be misspelled. It should likely be HandleWebsocket.
Proposed fix (if renaming in gateway.go)
- mux.HandleFunc("GET /api/v1/ws", wsg.HandelWebsocket)
+ mux.HandleFunc("GET /api/v1/ws", wsg.HandleWebsocket)
apps/websocket-server/internal/gateway.go-132-132 (1)
132-132: Typo: boardcastToUsers should be broadcastToUsers.
packages/proto-defs/proto/engine/order_matching.proto-103-104 (1)
103-104: Comment references incorrect message name.
The comment says "SymbolEvent.data" but the actual message is EngineEvent. Update for consistency.
  // ============= EVENT DATA STRUCTURES =============
- // These are encoded as JSON in SymbolEvent.data field
+ // These are encoded as protobuf in EngineEvent.data field
apps/matching-engine/internal/utils.go-41-74 (1)
41-74: Typo in parameter name.
oldRemainingQuantiy should be oldRemainingQuantity.
- func EncodeOrderReducedEvent(order *Order, oldQuantity int64, oldRemainingQuantiy int64) ([]byte, error) {
+ func EncodeOrderReducedEvent(order *Order, oldQuantity int64, oldRemainingQuantity int64) ([]byte, error) {
And update line 66:
- OldRemainingQuantity: oldRemainingQuantiy,
+ OldRemainingQuantity: oldRemainingQuantity,
apps/matching-engine/internal/wal.go-293-293 (1)
293-293: Typo: "invaild" should be "invalid".
- return nil, fmt.Errorf("invaild entry size %d", size)
+ return nil, fmt.Errorf("invalid entry size %d", size)
apps/matching-engine/internal/actor_registy.go-27-29 (1)
27-29: Log message dumps entire symbols slice on failure.
Line 28 logs the entire symbols slice instead of just the failing symbol sym. This makes debugging harder and could log sensitive configuration.
- log.Fatalln("Failed to start actor", symbols, err)
+ log.Fatalln("Failed to start actor for symbol", sym.Name, err)
apps/matching-engine/internal/engine.go-857-859 (1)
857-859: gRPC stream Send errors are not handled.
If a client disconnects or the stream fails, stream.Send(event) will return an error that is silently ignored. This could lead to stale/dead streams accumulating in grpcStreams and repeated failed sends.
🔧 Proposed fix
- for _, stream := range a.grpcStreams {
-     stream.Send(event)
- }
+ a.mu.RLock()
+ for _, stream := range a.grpcStreams {
+     if err := stream.Send(event); err != nil {
+         // Mark stream for removal or log error
+         // Consider maintaining a separate cleanup mechanism
+     }
+ }
+ a.mu.RUnlock()
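The "separate cleanup mechanism" left open in the proposed fix could take the shape below: send to a snapshot taken under the read lock, remember which streams failed, then drop them under the write lock. The sender interface and fanout type are hypothetical stand-ins for the gRPC server-stream type and the actor; streams are assumed to be pointers so they can be used as map keys.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// sender is a hypothetical stand-in for the gRPC server-stream interface.
type sender interface {
	Send(event string) error
}

type fanout struct {
	mu      sync.RWMutex
	streams []sender
}

// broadcast sends to a snapshot of the streams, then removes the ones whose
// Send failed. It returns the number of streams that were dropped.
func (f *fanout) broadcast(event string) int {
	f.mu.RLock()
	snapshot := make([]sender, len(f.streams))
	copy(snapshot, f.streams)
	f.mu.RUnlock()

	dead := make(map[sender]bool)
	for _, s := range snapshot {
		if err := s.Send(event); err != nil {
			dead[s] = true
		}
	}
	if len(dead) == 0 {
		return 0
	}

	f.mu.Lock()
	defer f.mu.Unlock()
	alive := make([]sender, 0, len(f.streams))
	for _, s := range f.streams {
		if !dead[s] {
			alive = append(alive, s)
		}
	}
	f.streams = alive
	return len(dead)
}

// fakeStream simulates a healthy or a disconnected subscriber.
type fakeStream struct {
	fail bool
	got  []string
}

func (s *fakeStream) Send(event string) error {
	if s.fail {
		return errors.New("stream closed")
	}
	s.got = append(s.got, event)
	return nil
}

func main() {
	ok, bad := &fakeStream{}, &fakeStream{fail: true}
	f := &fanout{streams: []sender{ok, bad}}
	fmt.Println(f.broadcast("trade"), len(f.streams))
}
```

Sending outside the lock keeps a slow subscriber from blocking subscribe and unsubscribe calls, at the cost of possibly sending one last event to a stream that was removed concurrently.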
🧹 Nitpick comments (17)
apps/websocket-server/makefile (1)
6-21: Consider adding .PHONY declarations and missing targets.
Makefile targets that don't produce files with matching names should be declared as .PHONY to avoid conflicts with files and ensure they always run. Additionally, a clean target would be useful.
Proposed additions
  APP_NAME := websocket-server
  CMD_PATH := ./cmd/websocket-server
  BUILD_DIR := bin
  ROOT_DIR := ../..
+
+ .PHONY: post-install dev build tidy clean
  post-install:
      @echo "➡ Generating go.mod for proto-generated code..."
      @mkdir -p $(ROOT_DIR)/packages/proto-defs/go/generated
      @printf "module github.com/sameerkrdev/nerve/packages/proto-defs/go/generated\n\ngo 1.21\n" > \
          $(ROOT_DIR)/packages/proto-defs/go/generated/go.mod
  dev:
      air
  build:
      @echo "➡ Building $(APP_NAME)..."
      go build -o $(BUILD_DIR)/$(APP_NAME) $(CMD_PATH)
  tidy:
      @echo "➡ Tidying modules..."
      go mod tidy
+
+ clean:
+     @echo "➡ Cleaning build artifacts..."
+     rm -rf $(BUILD_DIR)
packages/proto-defs/proto/common/order_types.proto (2)
29-38: Consider adding EVENT_TYPE_UNSPECIFIED = 0 per proto3 conventions.
Proto3 best practices recommend that the first enum value be an "unspecified" or "unknown" sentinel with value 0, as this is the default for unset fields. Currently ORDER_ACCEPTED = 0 will be indistinguishable from an unset/default value.
Proposed fix
  enum EventType {
-     ORDER_ACCEPTED = 0;
-     ORDER_FILLED= 1;
-     ORDER_CANCELLED= 2;
-     ORDER_REJECTED= 3;
-     ORDER_REDUCED= 4;
-     TRADE_EXECUTED= 5;
-     DEPTH= 6;
-     TICKER= 7;
+     EVENT_TYPE_UNSPECIFIED = 0;
+     ORDER_ACCEPTED = 1;
+     ORDER_FILLED = 2;
+     ORDER_CANCELLED = 3;
+     ORDER_REJECTED = 4;
+     ORDER_REDUCED = 5;
+     TRADE_EXECUTED = 6;
+     DEPTH = 7;
+     TICKER = 8;
  }
31-37: Inconsistent whitespace formatting.
Mixed tabs and spaces, and inconsistent spacing around =. This won't affect functionality but harms readability.
apps/matching-engine/wal/SOLUSD/checkpoint.meta (1)
1-1: Consider excluding checkpoint files from version control.
This checkpoint metadata file appears to be runtime-generated data that tracks WAL processing state. Committing it to the repository could cause issues:
- Other developers cloning the repo will inherit stale checkpoint state
- Conflicts during merges when multiple environments generate different checkpoint values
Consider adding apps/matching-engine/wal/ or **/checkpoint.meta to .gitignore and removing this file from the repository.
apps/websocket-server/cmd/websocket-server/main.go (1)
13-19: Consider externalizing configuration values.
The engine address (localhost:50052) and server port (:50053) are hardcoded. For flexibility across environments (development, staging, production), consider reading these from environment variables or a configuration file.
apps/matching-engine/internal/server.go (1)
98-107: Simplify the return statement.
The error check and return can be condensed since we're just returning the error (or nil) from the delegated call.
Proposed simplification
  func (s *Server) SubscribeSymbol(req *pb.SubscribeRequest, stream pb.MatchingEngine_SubscribeSymbolServer) error {
      symbol := req.Symbol
      gatewayId := req.GatewayId
-
-     if err := SubscribeSymbol(symbol, gatewayId, stream); err != nil {
-         return err
-     }
-
-     return nil
+     return SubscribeSymbol(symbol, gatewayId, stream)
  }
apps/matching-engine/cmd/matching-engine/main.go (1)
35-41: Consider externalizing symbol configuration.
Symbol configuration is hardcoded with repeated values across all symbols. For maintainability and operational flexibility:
- Extract configuration to environment variables, config files, or a database
- Consider defining default values for common fields (MaxWalFileSize, WalSyncInterval, etc.) to reduce duplication
This would allow adding/modifying symbols without code changes and redeployment.
packages/proto-defs/proto/common/engine_event_log.proto (1)
7-14: Field 4 is skipped and naming conventions diverge from protobuf style guides.
The observations are valid:
Field number 4 gap: Add reserved 4; to document the gap and prevent accidental field reuse.
Naming conventions: Protobuf style guides recommend:
- WalEntry instead of WAL_Entry (PascalCase for message names)
- crc instead of CRC (lowercase snake_case for field names)
However, WAL_Entry is actively used throughout the codebase (wal.go, kafka.go, and generated Go bindings). Renaming would require coordinated updates across multiple files and is a breaking change to the generated Go API. Evaluate whether the style consistency benefit justifies the coordination effort.
Proposed changes (if refactoring is approved)
- message WAL_Entry {
+ message WalEntry {
      uint64 sequence_number = 1;
      bytes data = 2;
-     uint32 CRC = 3;
-
+     uint32 crc = 3;
+     reserved 4;
      // Optional field for checkpointing.
      optional bool is_checkpoint = 5;
  }
packages/proto-defs/proto/engine/order_matching.proto (1)
158-164: Inconsistent field naming convention.
Order on line 159 uses PascalCase while all other fields in this file use snake_case (e.g., old_quantity). Protobuf convention is snake_case for field names.
  message OrderReducedEvent {
-     OrderStatusEvent Order = 1;
+     OrderStatusEvent order = 1;
      int64 old_quantity = 2;
      int64 new_quantity = 3;
      int64 old_remaining_quantity = 4;
      int64 new_remaining_quantity = 5;
  }
apps/matching-engine/internal/kafka.go (1)
74-88: No graceful shutdown for the Kafka worker.
The ctx field is set to context.Background() and never replaced with a cancellable context. The Run() loop checks ctx.Done() but it will never trigger. Consider passing a cancellable context from the caller or storing the cancel function.
apps/websocket-server/internal/gateway.go (3)
51-53: Permissive CORS configuration allows all origins.
CheckOrigin always returns true, which allows WebSocket connections from any origin. This is a security concern for production. Consider validating against allowed origins.
67-74: Unreachable return statement after log.Fatalf.
log.Fatalf terminates the program, so the return nil on line 74 is never executed. Either remove the return or change to non-fatal error handling that returns the error.
  conn, err := grpc.NewClient(uri, opt...)
  if err != nil {
-     log.Fatalf("fail to dial: %v", err)
+     return fmt.Errorf("failed to connect to engine: %w", err)
  }
  wsg.engineConnection = conn
  return nil
109-128: Recursive reconnect can cause stack overflow.
When a stream error occurs, receiveFromSymbolStream calls subscribeToSymbol, which spawns a new goroutine calling receiveFromSymbolStream. However, the current goroutine then returns, so it doesn't stack. The issue is the time.Sleep(time.Second) without exponential backoff: rapid repeated failures will spam reconnection attempts.
Consider implementing exponential backoff for reconnection attempts.
apps/matching-engine/internal/wal.go (1)
393-393: Excessive pre-allocation of 1,000,000 entries.
Pre-allocating capacity for 1 million entries allocates significant memory upfront. Consider starting smaller and letting the slice grow dynamically.
- results := make([]*pbTypes.WAL_Entry, 0, 1_000_000)
+ results := make([]*pbTypes.WAL_Entry, 0, 1024)
apps/matching-engine/internal/engine.go (3)
819-819: Typo in variable name: kakfaWoker.
Minor typo that could cause confusion.
✏️ Proposed fix
- kakfaWoker, err := NewKafkaProducerWorker(
+ kafkaWorker, err := NewKafkaProducerWorker(
(And update Line 835 accordingly)
919-919: Function name typo: ReplyWal should be ReplayWAL.
The function replays WAL entries, not replies to them.
- func (a *SymbolActor) ReplyWal(from uint64) error {
+ func (a *SymbolActor) ReplayWAL(from uint64) error {
1100-1101: TODO comment indicates incomplete implementation.
The ORDER_REJECTED case is empty with a TODO comment. Rejected orders may need replay handling depending on whether they affect state (e.g., sequence numbers, metrics).
Would you like me to help implement the ORDER_REJECTED replay handler, or is this intentionally a no-op since rejected orders don't affect book state?
wal *SymbolWAL
kafkaEmitter *KafkaProducerWorker
grpcStreams []pb.MatchingEngine_SubscribeSymbolServer
mu sync.RWMutex
Data race: grpcStreams accessed without synchronization.
The mu sync.RWMutex is declared but never used. In Run() (Line 857-858), grpcStreams is iterated without holding a lock, while streams may be added/removed concurrently from other goroutines.
🔒 Proposed fix
When iterating in Run():
+ a.mu.RLock()
+ streams := make([]pb.MatchingEngine_SubscribeSymbolServer, len(a.grpcStreams))
+ copy(streams, a.grpcStreams)
+ a.mu.RUnlock()
+
- for _, stream := range a.grpcStreams {
+ for _, stream := range streams {
stream.Send(event)
}
And ensure any method that adds/removes streams acquires mu.Lock().