Skip to content

Commit

Permalink
Added cache ttl refresh on heartbeat message for Tiingo IEX endpoint (#…
Browse files Browse the repository at this point in the history
…3244)

* add cache ttl refresh feature

* changeset
  • Loading branch information
karen-stepanyan committed Mar 28, 2024
1 parent ebb0af9 commit 8cc43b7
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 29 deletions.
5 changes: 5 additions & 0 deletions .changeset/quick-ravens-switch.md
@@ -0,0 +1,5 @@
---
'@chainlink/tiingo-adapter': patch
---

Added support for cache TTL refresh on heartbeat messages for IEX endpoint
18 changes: 14 additions & 4 deletions packages/sources/tiingo/README.md
Expand Up @@ -4,6 +4,16 @@

This document was generated automatically. Please see [README Generator](../../scripts#readme-generator) for more info.

## Known Issues

### CACHE_MAX_AGE interaction with Heartbeat messages

If `CACHE_MAX_AGE` is set below a current heartbeat interval (120000ms), the extended cache TTL feature for out-of-market-hours in IEX endpoint that relies on heartbeats will not work.

### CACHE_MAX_AGE interaction with WS_SUBSCRIPTION_TTL

If the value of `WS_SUBSCRIPTION_TTL` is less than the value of `CACHE_MAX_AGE`, there will be stale values in the cache.

## Environment Variables

| Required? | Name | Description | Type | Options | Default |
Expand All @@ -26,9 +36,9 @@ This document was generated automatically. Please see [README Generator](../../s

## Input Parameters

| Required? | Name | Description | Type | Options | Default |
| :-------: | :------: | :-----------------: | :----: | :---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------: | :------: |
| | endpoint | The endpoint to use | string | [commodities](#forex-endpoint), [crypto-lwba](#crypto-lwba-endpoint), [crypto-synth](#crypto-endpoint), [crypto-vwap](#vwap-endpoint), [crypto](#crypto-endpoint), [crypto_lwba](#crypto-lwba-endpoint), [cryptolwba](#crypto-lwba-endpoint), [cryptoyield](#cryptoyield-endpoint), [eod](#eod-endpoint), [forex](#forex-endpoint), [fx](#forex-endpoint), [iex](#iex-endpoint), [price](#crypto-endpoint), [price](#crypto-lwba-endpoint), [prices](#crypto-endpoint), [realized-vol](#realized-vol-endpoint), [realized-volatility](#realized-vol-endpoint), [stock](#iex-endpoint), [top](#top-endpoint), [volume](#volume-endpoint), [vwap](#vwap-endpoint), [yield](#cryptoyield-endpoint) | `crypto` |
| Required? | Name | Description | Type | Options | Default |
| :-------: | :------: | :-----------------: | :----: | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------: | :------: |
| | endpoint | The endpoint to use | string | [commodities](#forex-endpoint), [crypto-lwba](#crypto-lwba-endpoint), [crypto-synth](#crypto-endpoint), [crypto-vwap](#vwap-endpoint), [crypto](#crypto-endpoint), [crypto_lwba](#crypto-lwba-endpoint), [cryptolwba](#crypto-lwba-endpoint), [cryptoyield](#cryptoyield-endpoint), [eod](#eod-endpoint), [forex](#forex-endpoint), [fx](#forex-endpoint), [iex](#iex-endpoint), [price](#crypto-endpoint), [prices](#crypto-endpoint), [realized-vol](#realized-vol-endpoint), [realized-volatility](#realized-vol-endpoint), [stock](#iex-endpoint), [top](#top-endpoint), [volume](#volume-endpoint), [vwap](#vwap-endpoint), [yield](#cryptoyield-endpoint) | `crypto` |

## Crypto Endpoint

Expand Down Expand Up @@ -217,7 +227,7 @@ Request:

## Crypto-lwba Endpoint

Supported names for this endpoint are: `crypto-lwba`, `crypto_lwba`, `cryptolwba`, `price`.
Supported names for this endpoint are: `crypto-lwba`, `crypto_lwba`, `cryptolwba`.

### Input Params

Expand Down
9 changes: 9 additions & 0 deletions packages/sources/tiingo/docs/known-issues.md
@@ -0,0 +1,9 @@
## Known Issues

### CACHE_MAX_AGE interaction with Heartbeat messages

If `CACHE_MAX_AGE` is set below a current heartbeat interval (120000ms), the extended cache TTL feature for out-of-market-hours in IEX endpoint that relies on heartbeats will not work.

### CACHE_MAX_AGE interaction with WS_SUBSCRIPTION_TTL

If the value of `WS_SUBSCRIPTION_TTL` is less than the value of `CACHE_MAX_AGE`, there will be stale values in the cache.
40 changes: 24 additions & 16 deletions packages/sources/tiingo/src/config/index.ts
@@ -1,20 +1,28 @@
import { AdapterConfig } from '@chainlink/external-adapter-framework/config'

export const config = new AdapterConfig({
API_ENDPOINT: {
description: 'API endpoint for tiingo',
default: 'https://api.tiingo.com/',
type: 'string',
export const config = new AdapterConfig(
{
API_ENDPOINT: {
description: 'API endpoint for tiingo',
default: 'https://api.tiingo.com/',
type: 'string',
},
API_KEY: {
description: 'API key for tiingo',
type: 'string',
required: true,
sensitive: true,
},
WS_API_ENDPOINT: {
description: 'websocket endpoint for tiingo',
default: 'wss://api.tiingo.com',
type: 'string',
},
},
API_KEY: {
description: 'API key for tiingo',
type: 'string',
required: true,
sensitive: true,
{
envDefaultOverrides: {
CACHE_MAX_AGE: 150_000, // see known issues in readme
WS_SUBSCRIPTION_TTL: 180_000,
},
},
WS_API_ENDPOINT: {
description: 'websocket endpoint for tiingo',
default: 'wss://api.tiingo.com',
type: 'string',
},
})
)
19 changes: 18 additions & 1 deletion packages/sources/tiingo/src/transport/iex-ws.ts
@@ -1,3 +1,4 @@
import { WebSocketTransport } from '@chainlink/external-adapter-framework/transports/websocket'
import { BaseEndpointTypes } from '../endpoint/iex'
import { TiingoWebsocketTransport } from './utils'

Expand Down Expand Up @@ -43,6 +44,15 @@ type WsTransportTypes = BaseEndpointTypes & {
}
}

/*
Tiingo EA currently does not receive asset prices during off-market hours. When a heartbeat message is received during these hours,
we update the TTL of cache entries that EA is requested to provide a price during off-market hours.
*/
const updateTTL = async (transport: WebSocketTransport<WsTransportTypes>, ttl: number) => {
const params = await transport.subscriptionSet.getAll()
transport.responseCache.writeTTL(transport.name, params, ttl)
}

export const wsTransport: TiingoWebsocketTransport<WsTransportTypes> =
new TiingoWebsocketTransport<WsTransportTypes>({
url: (context) => {
Expand All @@ -51,7 +61,14 @@ export const wsTransport: TiingoWebsocketTransport<WsTransportTypes> =
},

handlers: {
message(message) {
message(message, context) {
// Check for a heartbeat message, refresh the TTLs of all requested entries in the cache
if (message.messageType === 'H') {
wsTransport.lastMessageReceivedAt = Date.now()
updateTTL(wsTransport, context.adapterSettings.CACHE_MAX_AGE)
return []
}

const updateType = message.data[0]
// Expects Last Trade (T) or Quote (Q) messages
if (
Expand Down
Expand Up @@ -55,7 +55,7 @@ exports[`websocket iex endpoint Q request should return success 1`] = `
"result": 170.285,
"statusCode": 200,
"timestamps": {
"providerDataReceivedUnixMs": 4048,
"providerDataReceivedUnixMs": 3038,
"providerDataStreamEstablishedUnixMs": 3030,
"providerIndicatedTimeUnixMs": 1645032916595,
},
Expand All @@ -70,7 +70,22 @@ exports[`websocket iex endpoint T request should return success 1`] = `
"result": 106.21,
"statusCode": 200,
"timestamps": {
"providerDataReceivedUnixMs": 4048,
"providerDataReceivedUnixMs": 3038,
"providerDataStreamEstablishedUnixMs": 3030,
"providerIndicatedTimeUnixMs": 1645032916595,
},
}
`;

exports[`websocket iex endpoint should update the ttl after heartbeat is received 1`] = `
{
"data": {
"result": 170.285,
},
"result": 170.285,
"statusCode": 200,
"timestamps": {
"providerDataReceivedUnixMs": 3038,
"providerDataStreamEstablishedUnixMs": 3030,
"providerIndicatedTimeUnixMs": 1645032916595,
},
Expand Down
21 changes: 17 additions & 4 deletions packages/sources/tiingo/test/integration/adapter-ws.test.ts
Expand Up @@ -3,6 +3,7 @@ import {
setEnvVariables,
mockWebSocketProvider,
MockWebsocketServer,
runAllUntilTime,
} from '@chainlink/external-adapter-framework/util/testing-utils'
import {
mockCryptoWebSocketServer,
Expand Down Expand Up @@ -51,6 +52,9 @@ describe('websocket', () => {
oldEnv = JSON.parse(JSON.stringify(process.env))
process.env['WS_API_ENDPOINT'] = wsEndpoint
process.env['API_KEY'] = 'fake-api-key'
process.env['WS_SUBSCRIPTION_UNRESPONSIVE_TTL'] = '180000'
process.env['CACHE_MAX_AGE'] = '150000'
process.env['WS_SUBSCRIPTION_TTL'] = '180000'

// Start mock web socket server
mockWebSocketProvider(WebSocketClassProvider)
Expand Down Expand Up @@ -98,6 +102,13 @@ describe('websocket', () => {
})
})

describe('forex endpoint', () => {
it('should return success', async () => {
const response = await testAdapter.request(priceDataForex)
expect(response.json()).toMatchSnapshot()
})
})

describe('iex endpoint', () => {
it('Q request should return success', async () => {
const response = await testAdapter.request(priceDataAapl)
Expand All @@ -107,11 +118,13 @@ describe('websocket', () => {
const response = await testAdapter.request(priceDataAmzn)
expect(response.json()).toMatchSnapshot()
})
})

describe('forex endpoint', () => {
it('should return success', async () => {
const response = await testAdapter.request(priceDataForex)
it('should update the ttl after heartbeat is received', async () => {
// The cache ttl is 150 seconds. Mocked heartbeat message is sent after 10s after connection which should
// update the ttl and therefore after 153 seconds (from the initial message) we can access the asset
await runAllUntilTime(testAdapter.clock, 153000)
const response = await testAdapter.request(priceDataAapl)
expect(response.statusCode).toBe(200)
expect(response.json()).toMatchSnapshot()
})
})
Expand Down
14 changes: 12 additions & 2 deletions packages/sources/tiingo/test/integration/fixtures.ts
Expand Up @@ -438,11 +438,21 @@ export const mockIexWebSocketServer = (URL: string): MockWebsocketServer => {
0,
],
}
const wsResponseHeartbeat = {
response: { code: 200, message: 'HeartBeat' },
messageType: 'H',
}
const mockWsServer = new MockWebsocketServer(URL, { mock: false })
mockWsServer.on('connection', (socket) => {
let counter = 0
socket.on('message', () => {
socket.send(JSON.stringify(wsResponseQ))
socket.send(JSON.stringify(wsResponseT))
if (counter++ === 0) {
socket.send(JSON.stringify(wsResponseQ))
socket.send(JSON.stringify(wsResponseT))
setTimeout(() => {
socket.send(JSON.stringify(wsResponseHeartbeat))
}, 10000)
}
})
})

Expand Down

0 comments on commit 8cc43b7

Please sign in to comment.