Skip to content

Commit

Permalink
fix: okex korea change to official v3 websocket
Browse files Browse the repository at this point in the history
closes #96
  • Loading branch information
namjug-kim committed Jul 18, 2019
1 parent 75f5564 commit 07fc5a3
Show file tree
Hide file tree
Showing 9 changed files with 9 additions and 467 deletions.
4 changes: 1 addition & 3 deletions reactive-crypto-okexkorea/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ apply plugin: 'org.jetbrains.kotlin.jvm'
version '1.0-SNAPSHOT'

dependencies {
compile project(':reactive-crypto-core')
compile project(':reactive-crypto-okex')

compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8"

compile group: 'org.apache.commons', name: 'commons-compress', version: '1.18'
}

compileKotlin {
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,115 +19,18 @@ package com.njkim.reactivecrypto.okexkorea
import com.njkim.reactivecrypto.core.common.model.ExchangeVendor
import com.njkim.reactivecrypto.core.common.model.currency.CurrencyPair
import com.njkim.reactivecrypto.core.common.model.order.OrderBook
import com.njkim.reactivecrypto.core.common.model.order.OrderBookUnit
import com.njkim.reactivecrypto.core.common.model.order.TickData
import com.njkim.reactivecrypto.core.common.model.order.TradeSideType
import com.njkim.reactivecrypto.core.common.util.toEpochMilli
import com.njkim.reactivecrypto.core.websocket.ExchangeWebsocketClient
import com.njkim.reactivecrypto.okex.OkexWebsocketClient
import reactor.core.publisher.Flux
import java.math.BigDecimal
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.ZoneId
import java.time.ZonedDateTime
import java.util.concurrent.ConcurrentHashMap

/**
* Okex Korea is based on OKEX. but it has different interface than OKEX websocket
*/
class OkexKoreaWebsocketClient : ExchangeWebsocketClient {
private val okexKoreaRawWebsocketClient = OkexKoreaRawWebsocketClient()

override fun createTradeWebsocket(subscribeTargets: List<CurrencyPair>): Flux<TickData> {
return okexKoreaRawWebsocketClient.createTickDataFlux(subscribeTargets)
.flatMapIterable { messageFrame ->
messageFrame.data
.map {
TickData(
it.uniqueId,
LocalDateTime.of(LocalDate.now(), it.eventTime).atZone(ZoneId.systemDefault()),
it.price,
it.quantity,
messageFrame.currencyPair,
ExchangeVendor.OKEX_KOREA,
it.side
)
}
}
}

class OkexKoreaWebsocketClient : OkexWebsocketClient("wss://okexcomreal.bafang.com:8443/ws/v3?brokerId=151") {
override fun createDepthSnapshot(subscribeTargets: List<CurrencyPair>): Flux<OrderBook> {
val currentOrderBookMap: MutableMap<CurrencyPair, OrderBook> = ConcurrentHashMap()

return okexKoreaRawWebsocketClient.createDepthFlux(subscribeTargets)
.map {
val now = ZonedDateTime.now()

OrderBook(
"${now.toEpochMilli()}",
CurrencyPair(it.base, it.quote),
now,
ExchangeVendor.OKEX_KOREA,
it.data.bids.map { bid ->
OrderBookUnit(bid.price, bid.totalSize, TradeSideType.BUY)
},
it.data.asks.map { ask ->
OrderBookUnit(ask.price, ask.totalSize, TradeSideType.SELL)
}
)
}
.map { orderBook ->
if (!currentOrderBookMap.containsKey(orderBook.currencyPair)) {
currentOrderBookMap[orderBook.currencyPair] = orderBook
return@map orderBook
}

val prevOrderBook = currentOrderBookMap[orderBook.currencyPair]!!

val askMap: MutableMap<BigDecimal, OrderBookUnit> = prevOrderBook.asks
.map { Pair(it.price.stripTrailingZeros(), it) }
.toMap()
.toMutableMap()

orderBook.asks.forEach { updatedAsk ->
askMap.compute(updatedAsk.price.stripTrailingZeros()) { _, oldValue ->
when {
updatedAsk.quantity <= BigDecimal.ZERO -> null
oldValue == null -> updatedAsk
else -> oldValue.copy(
quantity = updatedAsk.quantity,
orderNumbers = updatedAsk.orderNumbers
)
}
}
}

val bidMap: MutableMap<BigDecimal, OrderBookUnit> = prevOrderBook.bids
.map { Pair(it.price.stripTrailingZeros(), it) }
.toMap()
.toMutableMap()

orderBook.bids.forEach { updatedBid ->
bidMap.compute(updatedBid.price.stripTrailingZeros()) { _, oldValue ->
when {
updatedBid.quantity <= BigDecimal.ZERO -> null
oldValue == null -> updatedBid
else -> oldValue.copy(
quantity = updatedBid.quantity,
orderNumbers = updatedBid.orderNumbers
)
}
}
}
return super.createDepthSnapshot(subscribeTargets)
.map { it.copy(exchangeVendor = ExchangeVendor.OKEX_KOREA) }
}

val currentOrderBook = prevOrderBook.copy(
eventTime = orderBook.eventTime,
asks = askMap.values.sortedBy { orderBookUnit -> orderBookUnit.price },
bids = bidMap.values.sortedByDescending { orderBookUnit -> orderBookUnit.price }
)
currentOrderBookMap[currentOrderBook.currencyPair] = currentOrderBook
currentOrderBook
}
.doFinally { currentOrderBookMap.clear() } // cleanup memory limit orderBook when disconnected
override fun createTradeWebsocket(subscribeTargets: List<CurrencyPair>): Flux<TickData> {
return super.createTradeWebsocket(subscribeTargets)
.map { it.copy(exchangeVendor = ExchangeVendor.OKEX_KOREA) }
}
}

This file was deleted.

0 comments on commit 07fc5a3

Please sign in to comment.