Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
403 changes: 403 additions & 0 deletions .docs/ISSUES.md

Large diffs are not rendered by default.

37 changes: 37 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
services:
mysql:
image: mysql:8.0
environment:
MYSQL_DATABASE: coinflow
MYSQL_USER: coinflow
MYSQL_PASSWORD: ${DB_PASSWORD:-coinflow}
MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD:-root}
ports:
- "3306:3306"
volumes:
- mysql_data:/var/lib/mysql

kafka:
image: confluentinc/cp-kafka:7.6.0
ports:
- "9092:9092"
environment:
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
volumes:
- kafka_data:/var/lib/kafka/data

volumes:
mysql_data:
kafka_data:
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.coinflow.common.exception;

import jakarta.validation.ConstraintViolationException;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.MethodArgumentNotValidException;
import org.springframework.web.bind.annotation.ExceptionHandler;
Expand All @@ -22,4 +23,11 @@ public ResponseEntity<ErrorResponse> handleValidation(MethodArgumentNotValidExce
.status(ErrorCode.INVALID_REQUEST.getStatus())
.body(ErrorResponse.of(ErrorCode.INVALID_REQUEST));
}

@ExceptionHandler(ConstraintViolationException.class)
public ResponseEntity<ErrorResponse> handleConstraintViolation(ConstraintViolationException e) {
return ResponseEntity
.status(ErrorCode.INVALID_REQUEST.getStatus())
.body(ErrorResponse.of(ErrorCode.INVALID_REQUEST));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.coinflow.common.pagination;

import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;

public record OffsetBasedPageRequest(long offset, int limit) implements Pageable {

@Override public int getPageNumber() { return (int) (offset / limit); }
@Override public int getPageSize() { return limit; }
@Override public long getOffset() { return offset; }
@Override public Sort getSort() { return Sort.unsorted(); }

@Override
public Pageable next() {
return new OffsetBasedPageRequest(offset + limit, limit);
}

@Override
public Pageable previousOrFirst() {
return offset <= 0 ? this : new OffsetBasedPageRequest(Math.max(0, offset - limit), limit);
}

@Override
public Pageable first() {
return new OffsetBasedPageRequest(0, limit);
}

@Override
public Pageable withPage(int pageNumber) {
return new OffsetBasedPageRequest((long) pageNumber * limit, limit);
}

@Override
public boolean hasPrevious() {
return offset > 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@
public interface DomainEventRepository extends JpaRepository<DomainEvent, Long> {
List<DomainEvent> findAllByAggregateTypeAndAggregateId(String aggregateType, Long aggregateId);
List<DomainEvent> findAllByEventType(DomainEventType eventType);
List<DomainEvent> findAllByPublishedFalseOrderByIdAsc();
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

import java.time.Instant;
import java.util.Map;

@Service
Expand Down Expand Up @@ -91,7 +92,12 @@ public void recordSettlementCompleted(Trade trade) {
private void save(DomainEventType type, String aggregateType, Long aggregateId,
Long marketId, String marketSymbol, Map<String, Object> payload) {
try {
String json = objectMapper.writeValueAsString(payload);
Map<String, Object> envelope = Map.of(
"schemaVersion", "1.0",
"occurredAt", Instant.now().toString(),
"payload", payload
);
String json = objectMapper.writeValueAsString(envelope);
domainEventRepository.save(
DomainEvent.create(type, aggregateType, aggregateId, marketId, marketSymbol, json)
);
Expand Down
34 changes: 28 additions & 6 deletions src/main/java/com/coinflow/market/api/MarketController.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,37 @@
package com.coinflow.market.api;

import com.coinflow.common.exception.ApiException;
import com.coinflow.common.exception.ErrorCode;
import com.coinflow.market.domain.Market;
import com.coinflow.market.domain.MarketStatus;
import com.coinflow.market.dto.MarketResponse;
import com.coinflow.market.dto.OrderBookResponse;
import com.coinflow.market.repository.MarketRepository;
import com.coinflow.order.matching.MatchingEngine;
import com.coinflow.order.matching.OrderBookEntry;
import com.coinflow.order.service.OrderService;
import lombok.RequiredArgsConstructor;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1/markets")
@Validated
public class MarketController {

private final MarketRepository marketRepository;
private final MatchingEngine matchingEngine;
private final OrderService orderService;

@GetMapping
public List<MarketResponse> getMarkets() {
Expand All @@ -30,11 +42,21 @@ public List<MarketResponse> getMarkets() {
}

@GetMapping("/{market}/orderbook")
public OrderBookResponse getOrderBook(@PathVariable String market) {
return OrderBookResponse.of(
market,
matchingEngine.getBuySide(market),
matchingEngine.getSellSide(market)
);
public OrderBookResponse getOrderBook(
@PathVariable String market,
@RequestParam(defaultValue = "10") @Min(1) @Max(100) int depth
) {
Market found = marketRepository.findBySymbol(market)
.orElseThrow(() -> new ApiException(ErrorCode.MARKET_NOT_FOUND));

ReentrantLock lock = orderService.getMarketLock(found.getId());
lock.lock();
try {
List<OrderBookEntry> buySide = matchingEngine.getBuySide(market);
List<OrderBookEntry> sellSide = matchingEngine.getSellSide(market);
return OrderBookResponse.of(market, buySide, sellSide, depth);
} finally {
lock.unlock();
}
}
}
4 changes: 4 additions & 0 deletions src/main/java/com/coinflow/market/domain/Market.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,8 @@ public class Market {
public boolean isActive() {
return status == MarketStatus.ACTIVE;
}

public void enableCancelOnly() {
this.cancelOnly = true;
}
}
10 changes: 7 additions & 3 deletions src/main/java/com/coinflow/market/dto/MarketResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,31 @@
import com.coinflow.market.domain.Market;

public record MarketResponse(
String symbol,
String market,
String displayName,
String baseAsset,
String quoteAsset,
int amountScale,
String tickSize,
String stepSize,
String minOrderQuantity,
String minOrderAmount,
String status
String status,
boolean cancelOnly
) {
public static MarketResponse from(Market market) {
return new MarketResponse(
market.getSymbol(),
market.getDisplayName(),
market.getBaseAsset(),
market.getQuoteAsset(),
market.getAmountScale(),
market.getTickSize().toPlainString(),
market.getStepSize().toPlainString(),
market.getMinOrderQuantity().toPlainString(),
market.getMinOrderAmount().toPlainString(),
market.getStatus().name()
market.getStatus().name(),
market.isCancelOnly()
);
}
}
38 changes: 29 additions & 9 deletions src/main/java/com/coinflow/market/dto/OrderBookResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,47 @@

import com.coinflow.order.matching.OrderBookEntry;

import java.math.BigDecimal;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public record OrderBookResponse(
String market,
List<PriceLevel> bids,
List<PriceLevel> asks
) {
public record PriceLevel(String price, String quantity) {
public static PriceLevel from(OrderBookEntry entry) {
return new PriceLevel(
entry.price().toPlainString(),
entry.remainingQuantity().toPlainString()
);
}
}

public static OrderBookResponse of(String market, List<OrderBookEntry> buySide, List<OrderBookEntry> sellSide) {
public static OrderBookResponse of(String market, List<OrderBookEntry> buySide, List<OrderBookEntry> sellSide, int depth) {
return new OrderBookResponse(
market,
buySide.stream().map(PriceLevel::from).toList(),
sellSide.stream().map(PriceLevel::from).toList()
aggregate(buySide, Comparator.reverseOrder(), depth),
aggregate(sellSide, Comparator.naturalOrder(), depth)
);
}

private static List<PriceLevel> aggregate(
List<OrderBookEntry> entries,
Comparator<BigDecimal> priceOrder,
int depth
) {
Map<BigDecimal, BigDecimal> quantitiesByPrice = entries.stream()
.collect(Collectors.groupingBy(
OrderBookEntry::price,
() -> new TreeMap<>(priceOrder),
Collectors.reducing(BigDecimal.ZERO, OrderBookEntry::remainingQuantity, BigDecimal::add)
));

return quantitiesByPrice.entrySet().stream()
.limit(depth)
.map(entry -> new PriceLevel(
entry.getKey().toPlainString(),
entry.getValue().toPlainString()
))
.toList();
}
}
10 changes: 8 additions & 2 deletions src/main/java/com/coinflow/order/api/OrderController.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
import com.coinflow.order.dto.OrderSummaryResponse;
import com.coinflow.order.service.OrderService;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.security.oauth2.jwt.Jwt;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
Expand All @@ -25,6 +28,7 @@
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1/orders")
@Validated
public class OrderController {

private final OrderService orderService;
Expand All @@ -51,10 +55,12 @@ public CancelOrderResponse cancelOrder(
@GetMapping
public List<OrderSummaryResponse> getOrders(
@AuthenticationPrincipal Jwt jwt,
@RequestParam(required = false) String market
@RequestParam(required = false) String market,
@RequestParam(defaultValue = "50") @Min(1) @Max(200) int limit,
@RequestParam(defaultValue = "0") @Min(0) int offset
) {
Long userId = Long.parseLong(jwt.getSubject());
return orderService.getOrders(userId, market);
return orderService.getOrders(userId, market, limit, offset);
}

@GetMapping("/{id}")
Expand Down
23 changes: 15 additions & 8 deletions src/main/java/com/coinflow/order/matching/MatchingEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@ public class MatchingEngine {

private final Map<String, MemoryOrderBook> orderBooks = new ConcurrentHashMap<>();

public List<MatchResult> match(Market market, Order taker) {
public List<MatchResult> planMatch(Market market, Order taker) {
MemoryOrderBook book = orderBooks.computeIfAbsent(
market.getSymbol(),
k -> new MemoryOrderBook(market.getAmountScale())
);
return book.planMatch(taker);
}

List<MatchResult> results = book.match(taker);

if (taker.getRemainingQuantity().compareTo(java.math.BigDecimal.ZERO) > 0) {
book.add(taker);
}

return results;
public void applyMatchPlan(Market market, Order taker, List<MatchResult> plan) {
MemoryOrderBook book = orderBooks.computeIfAbsent(
market.getSymbol(),
k -> new MemoryOrderBook(market.getAmountScale())
);
book.applyMatchPlan(taker, plan);
}

public void cancelOrder(String marketSymbol, Order order) {
Expand All @@ -45,6 +46,12 @@ public void addToBook(Market market, Order order) {
book.add(order);
}

public void rebuildBook(Market market, List<Order> orders) {
MemoryOrderBook book = new MemoryOrderBook(market.getAmountScale());
orders.forEach(book::add);
orderBooks.put(market.getSymbol(), book);
}

public boolean hasSelfTrade(String marketSymbol, OrderSide side, BigDecimal price, Long userId) {
MemoryOrderBook book = orderBooks.get(marketSymbol);
return book != null && book.hasSelfTrade(side, price, userId);
Expand Down
Loading
Loading