Skip to content

Commit

Permalink
feat: add poloniex websocket client
Browse files Browse the repository at this point in the history
closes #20
  • Loading branch information
namjug-kim committed Jun 15, 2019
1 parent fdfd62e commit 516570f
Show file tree
Hide file tree
Showing 19 changed files with 972 additions and 1 deletion.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Support public market feature (tickData, orderBook)
| ![bitmax](https://user-images.githubusercontent.com/16334718/57548356-b082d480-739b-11e9-9539-b27c60877fb6.jpg) | Bitmax | BITMAX | v1.2 | [ws](https://github.com/bitmax-exchange/api-doc/blob/master/bitmax-api-doc-v1.2.md) |
| ![idax](https://user-images.githubusercontent.com/16334718/58029691-128bc880-7b58-11e9-9aaa-a331f394c8bd.jpg) | Idax | IDAX | * | [ws](https://github.com/idax-exchange/idax-official-api-docs/blob/master/open-ws_en.md) |
| ![coineal](https://user-images.githubusercontent.com/16334718/58037062-7d90cb80-7b67-11e9-9278-e8b03c5ddd86.jpg) | Coineal | COINEAL | 鈿狅笍 | 鈿狅笍 |
| ![poloniex](https://user-images.githubusercontent.com/16334718/59551277-335a0900-8fb2-11e9-9d1e-4ab2a7574148.jpg) | Poloniex | POLONIEX | * | [ws](https://docs.poloniex.com/#websocket-api) |

鈿狅笍 : Uses endpoints that are used by the official web. This is not an official api and should be used with care.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ enum class ExchangeVendor {
BITMAX,
IDAX,
COINEAL,
POLONIEX,
UNKNOWN;

/**
Expand Down
33 changes: 33 additions & 0 deletions reactive-crypto-poloniex/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2019 namjug-kim
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

apply plugin: 'kotlin'
apply plugin: 'org.jetbrains.kotlin.jvm'

version '1.0-SNAPSHOT'

dependencies {
compile project(':reactive-crypto-core')

compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
}

compileKotlin {
kotlinOptions.jvmTarget = "1.8"
}
compileTestKotlin {
kotlinOptions.jvmTarget = "1.8"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Copyright 2019 namjug-kim
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.njkim.reactivecrypto.poloniex

import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.DeserializationContext
import com.fasterxml.jackson.databind.JsonDeserializer
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.njkim.reactivecrypto.core.ExchangeJsonObjectMapper
import com.njkim.reactivecrypto.core.common.model.currency.CurrencyPair
import com.njkim.reactivecrypto.core.common.model.order.OrderBookUnit
import com.njkim.reactivecrypto.core.common.model.order.OrderSideType
import com.njkim.reactivecrypto.core.common.model.order.TradeSideType
import com.njkim.reactivecrypto.poloniex.model.*
import java.math.BigDecimal
import java.time.Instant
import java.time.ZoneId
import java.time.ZonedDateTime

class PoloniexJsonObjectMapper : ExchangeJsonObjectMapper {

companion object {
val instance: ObjectMapper = com.njkim.reactivecrypto.poloniex.PoloniexJsonObjectMapper().objectMapper()
}

override fun zonedDateTimeDeserializer(): JsonDeserializer<ZonedDateTime> {
return object : JsonDeserializer<ZonedDateTime>() {
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): ZonedDateTime {
return Instant.ofEpochSecond(p.valueAsLong).atZone(ZoneId.systemDefault())
}
}
}

/**
* {baseCurrency}_{targetCurrency}
*/
override fun currencyPairDeserializer(): JsonDeserializer<CurrencyPair> {
return object : JsonDeserializer<CurrencyPair>() {
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): CurrencyPair {
val currencyPairRawValue = p.valueAsString
val split = currencyPairRawValue.split("_")

return CurrencyPair.parse(split[1], split[0])
}
}
}

override fun orderSideTypeDeserializer(): JsonDeserializer<OrderSideType>? {
// <1 for bid 0 for ask>
return object : JsonDeserializer<OrderSideType>() {
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): OrderSideType {
return when (p.valueAsInt) {
0 -> OrderSideType.ASK
1 -> OrderSideType.BID
else -> throw IllegalArgumentException()
}
}
}
}

override fun tradeSideTypeDeserializer(): JsonDeserializer<TradeSideType>? {
// <1 for buy 0 for sell>
return object : JsonDeserializer<TradeSideType>() {
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): TradeSideType {
return when (p.valueAsInt) {
0 -> TradeSideType.SELL
1 -> TradeSideType.BUY
else -> throw IllegalArgumentException()
}
}
}
}

override fun customConfiguration(simpleModule: SimpleModule) {
val poloniexMessageFrameDeserializer = object : JsonDeserializer<PoloniexMessageFrame>() {
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): PoloniexMessageFrame {
val jsonNode: JsonNode = p.codec.readTree(p)
val channelId = jsonNode[0].asLong()
val sequenceNumber = jsonNode[1].asLong()
val events = jsonNode[2].map { event ->
val eventType = PoloniexEventType.parse(event[0].asText())
PoloniexJsonObjectMapper.instance.convertValue(event, eventType.classType)
}

return PoloniexMessageFrame(
channelId, sequenceNumber, events
)
}
}

// ["t", "<trade id>", <1 for buy 0 for sell>, "<price>", "<size>", <timestamp>] ]
val tradeEventDeserializer = object : JsonDeserializer<PoloniexTradeEvent>() {
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): PoloniexTradeEvent {
val jsonNode: JsonNode = p.codec.readTree(p)

val tradeId: String = jsonNode[1].asText()
val side: TradeSideType = instance.convertValue(jsonNode[2], TradeSideType::class.java)
val price: BigDecimal = instance.convertValue(jsonNode[3], BigDecimal::class.java)
val size: BigDecimal = instance.convertValue(jsonNode[4], BigDecimal::class.java)
val timestamp: ZonedDateTime = instance.convertValue(jsonNode[5], ZonedDateTime::class.java)

return PoloniexTradeEvent(
tradeId,
side,
price,
size,
timestamp
)
}
}

val orderBookUpdateEventDeserializer = object : JsonDeserializer<PoloniexOrderBookUpdateEvent>() {
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): PoloniexOrderBookUpdateEvent {
val jsonNode: JsonNode = p.codec.readTree(p)

val side: OrderSideType = instance.convertValue(jsonNode[1], OrderSideType::class.java)
val price: BigDecimal = instance.convertValue(jsonNode[2], BigDecimal::class.java)
val size: BigDecimal = instance.convertValue(jsonNode[3], BigDecimal::class.java)

return PoloniexOrderBookUpdateEvent(
side,
price,
size
)
}
}

/**
* [
* "i",
* {
* "currencyPair": "<currency pair name>",
* "orderBook": [
* { "<lowest ask price>": "<lowest ask size>", "<next ask price>": "<next ask size>", ... },
* { "<highest bid price>": "<highest bid size>", "<next bid price>": "<next bid size>", ... }
* ]
* }
* ]
*/
val orderBookSnapshotEventDeserializer = object : JsonDeserializer<PoloniexOrderBookSnapshotEvent>() {
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): PoloniexOrderBookSnapshotEvent {
val jsonNode: JsonNode = p.codec.readTree(p)

val orderBookSnapshotNode = jsonNode.get(1)
val currencyPair =
instance.convertValue(orderBookSnapshotNode.get("currencyPair"), CurrencyPair::class.java)
val orderBookNode = orderBookSnapshotNode.get("orderBook")
val asksNode = orderBookNode.get(0)
val asks = asksNode.fields().asSequence().toList().map { mutableEntry ->
val price = mutableEntry.key
val size = mutableEntry.value

OrderBookUnit(
instance.convertValue(price, BigDecimal::class.java),
instance.convertValue(size, BigDecimal::class.java),
OrderSideType.ASK
)
}

val bidsNode = orderBookNode.get(1)
val bids = bidsNode.fields().asSequence().toList().map { mutableEntry ->
val price = mutableEntry.key
val size = mutableEntry.value

OrderBookUnit(
instance.convertValue(price, BigDecimal::class.java),
instance.convertValue(size, BigDecimal::class.java),
OrderSideType.BID
)
}

return PoloniexOrderBookSnapshotEvent(
currencyPair,
bids,
asks
)
}
}

simpleModule.addDeserializer(PoloniexMessageFrame::class.java, poloniexMessageFrameDeserializer)
simpleModule.addDeserializer(PoloniexTradeEvent::class.java, tradeEventDeserializer)
simpleModule.addDeserializer(PoloniexOrderBookUpdateEvent::class.java, orderBookUpdateEventDeserializer)
simpleModule.addDeserializer(PoloniexOrderBookSnapshotEvent::class.java, orderBookSnapshotEventDeserializer)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.njkim.reactivecrypto.poloniex

import com.fasterxml.jackson.module.kotlin.readValue
import com.njkim.reactivecrypto.core.common.model.currency.CurrencyPair
import com.njkim.reactivecrypto.poloniex.model.PoloniexEventType
import com.njkim.reactivecrypto.poloniex.model.PoloniexMessageFrame
import com.njkim.reactivecrypto.poloniex.model.PoloniexOrderBookSnapshotEvent
import reactor.core.publisher.Flux
import reactor.core.publisher.toFlux
import reactor.netty.http.client.HttpClient

class PoloniexRawWebsocketClient {
private val baseUrl: String = "wss://api2.poloniex.com"

/**
* Subscribe to price aggregated depth of book by currency pair.
* Response includes an initial book snapshot, book modifications, and trades.
* Book modification updates with 0 quantity should be treated as removal of the price level.
* Note that the updates are price aggregated and do not contain individual orders.
*
*/
fun priceAggregatedBook(currencyPairs: List<CurrencyPair>): Flux<PoloniexMessageFrame> {
val channelIdCurrencyPairMap: MutableMap<Long, CurrencyPair> = HashMap()

// { "command": "subscribe", "channel": "<channel id>" }
val subscribeChannels = currencyPairs
.map { "${it.baseCurrency}_${it.targetCurrency}" }
.map { "{ \"command\": \"subscribe\", \"channel\": \"$it\" }" }
.toFlux()

// TODO heartbeat check
return HttpClient.create()
.websocket(655360)
.uri(baseUrl)
.handle { inbound, outbound ->
outbound.sendString(subscribeChannels)
.then()
.thenMany(inbound.receive().asString())
}
.filter { it != "[1010]" } // ping message
.map { PoloniexJsonObjectMapper.instance.readValue<PoloniexMessageFrame>(it) }
// set currencyPair info for each channel
.doOnNext { messageFrame ->
val orderBookSnapshotEvent = messageFrame.events
.filter { it.eventType == PoloniexEventType.ORDER_BOOK_SNAPSHOT }
.map { it as PoloniexOrderBookSnapshotEvent }
.firstOrNull()

if (orderBookSnapshotEvent != null) {
val channelId = messageFrame.channelId
channelIdCurrencyPairMap[channelId] = orderBookSnapshotEvent.currencyPair
}
}
.doOnNext { messageFrame ->
val channelId = messageFrame.channelId
messageFrame.events
.forEach { event ->
event.currencyPair = channelIdCurrencyPairMap[channelId]!!
}
}
.doFinally {
channelIdCurrencyPairMap.clear()
}
}
}

0 comments on commit 516570f

Please sign in to comment.