Skip to content

[price-service] Add get_vaa_ccip endpoint #500

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions price-service/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions price-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"node-fetch": "^2.6.1",
"prom-client": "^14.0.1",
"response-time": "^2.3.2",
"ts-retry-promise": "^0.7.0",
"winston": "^3.3.3",
"ws": "^8.6.0"
},
Expand Down
18 changes: 18 additions & 0 deletions price-service/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,21 @@ export function envOrErr(env: string): string {
}
return String(process.env[env]);
}

export function parseToOptionalNumber(
s: string | undefined
): number | undefined {
if (s === undefined) {
return undefined;
}

return parseInt(s, 10);
}

export function removeLeading0x(s: string): string {
if (s.startsWith("0x")) {
return s.substring(2);
}

return s;
}
8 changes: 6 additions & 2 deletions price-service/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { envOrErr } from "./helpers";
import { envOrErr, parseToOptionalNumber } from "./helpers";
import { Listener } from "./listen";
import { initLogger } from "./logging";
import { PromClient } from "./promClient";
Expand Down Expand Up @@ -38,6 +38,10 @@ async function run() {
10
),
},
cacheCleanupLoopInterval: parseToOptionalNumber(
process.env.REMOVE_EXPIRED_VALUES_INTERVAL_SECONDS
),
cacheTtl: parseToOptionalNumber(process.env.CACHE_TTL_SECONDS),
},
promClient
);
Expand All @@ -59,7 +63,7 @@ async function run() {
const wsAPI = new WebSocketAPI(listener, promClient);

listener.run();
listener.runCacheCleanupLoop();

const server = await restAPI.run();
wsAPI.run(server);
}
Expand Down
28 changes: 17 additions & 11 deletions price-service/src/listen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
} from "@pythnetwork/p2w-sdk-js";
import { HexString, PriceFeed } from "@pythnetwork/pyth-sdk-js";
import LRUCache from "lru-cache";
import { sleep, TimestampInSec } from "./helpers";
import { DurationInSec, sleep, TimestampInSec } from "./helpers";
import { logger } from "./logging";
import { PromClient } from "./promClient";

Expand Down Expand Up @@ -49,11 +49,13 @@ type ListenerConfig = {
readiness: ListenerReadinessConfig;
webApiEndpoint?: string;
webApiCluster?: string;
cacheCleanupLoopInterval?: DurationInSec;
cacheTtl?: DurationInSec;
};

type VaaKey = string;

type VaaConfig = {
export type VaaConfig = {
publishTime: number;
vaa: string;
};
Expand All @@ -62,7 +64,7 @@ export class VaaCache {
private cache: Map<string, VaaConfig[]>;
private ttl: number;

constructor(ttl: number = 300) {
constructor(ttl: DurationInSec = 300) {
this.cache = new Map();
this.ttl = ttl;
}
Expand All @@ -85,6 +87,9 @@ export class VaaCache {
}

find(arr: VaaConfig[], publishTime: number): VaaConfig | undefined {
// If the publishTime is less than the first element we are
// not sure that this VAA is actually the first VAA after that
// time.
if (arr.length === 0 || publishTime < arr[0].publishTime) {
return undefined;
}
Expand Down Expand Up @@ -126,6 +131,7 @@ export class Listener implements PriceStore {
private updateCallbacks: ((priceInfo: PriceInfo) => any)[];
private observedVaas: LRUCache<VaaKey, boolean>;
private vaasCache: VaaCache;
private cacheCleanupLoopInterval: DurationInSec;

constructor(config: ListenerConfig, promClient?: PromClient) {
this.promClient = promClient;
Expand All @@ -137,7 +143,8 @@ export class Listener implements PriceStore {
max: 10000, // At most 10000 items
ttl: 60 * 1000, // 60 seconds
});
this.vaasCache = new VaaCache();
this.vaasCache = new VaaCache(config.cacheTtl);
this.cacheCleanupLoopInterval = config.cacheCleanupLoopInterval ?? 60;
}

private loadFilters(filtersRaw?: string) {
Expand Down Expand Up @@ -170,22 +177,21 @@ export class Listener implements PriceStore {
logger.info("loaded " + this.filters.length + " filters");
}

async runCacheCleanupLoop(interval: number = 60) {
setInterval(this.vaasCache.removeExpiredValues, interval * 1000);
}

async run() {
logger.info(
"pyth_relay starting up, will listen for signed VAAs from " +
this.spyServiceHost
);

setInterval(
this.vaasCache.removeExpiredValues.bind(this.vaasCache),
this.cacheCleanupLoopInterval * 1000
);

while (true) {
let stream: ClientReadableStream<SubscribeSignedVAAResponse> | undefined;
try {
const client = createSpyRPCServiceClient(
process.env.SPY_SERVICE_HOST || ""
);
const client = createSpyRPCServiceClient(this.spyServiceHost);
stream = await subscribeSignedVAA(client, { filters: this.filters });

stream!.on("data", ({ vaaBytes }: { vaaBytes: Buffer }) => {
Expand Down
155 changes: 127 additions & 28 deletions price-service/src/rest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import { Server } from "http";
import { StatusCodes } from "http-status-codes";
import morgan from "morgan";
import fetch from "node-fetch";
import { TimestampInSec } from "./helpers";
import { PriceStore } from "./listen";
import { removeLeading0x, TimestampInSec } from "./helpers";
import { PriceStore, VaaConfig } from "./listen";
import { logger } from "./logging";
import { PromClient } from "./promClient";
import { retry } from "ts-retry-promise";

const MORGAN_LOG_FORMAT =
':remote-addr - :remote-user ":method :url HTTP/:http-version"' +
Expand All @@ -27,9 +28,25 @@ export class RestException extends Error {
static PriceFeedIdNotFound(notFoundIds: string[]): RestException {
return new RestException(
StatusCodes.BAD_REQUEST,
`Price Feeds with ids ${notFoundIds.join(", ")} not found`
`Price Feed(s) with id(s) ${notFoundIds.join(", ")} not found.`
);
}

static DbApiError(): RestException {
return new RestException(StatusCodes.INTERNAL_SERVER_ERROR, `DB API Error`);
}

static VaaNotFound(): RestException {
return new RestException(StatusCodes.NOT_FOUND, "VAA not found.");
}
}

function asyncWrapper(
callback: (req: Request, res: Response, next: NextFunction) => Promise<any>
) {
return function (req: Request, res: Response, next: NextFunction) {
callback(req, res, next).catch(next);
};
}

export class RestAPI {
Expand All @@ -54,6 +71,39 @@ export class RestAPI {
this.promClient = promClient;
}

async getVaaWithDbLookup(priceFeedId: string, publishTime: TimestampInSec) {
// Try to fetch the vaa from the local cache
let vaa = this.priceFeedVaaInfo.getVaa(priceFeedId, publishTime);

// if publishTime is older than cache ttl or vaa is not found, fetch from db
if (vaa === undefined && this.dbApiEndpoint && this.dbApiCluster) {
const priceFeedWithoutLeading0x = removeLeading0x(priceFeedId);

try {
const data = (await retry(
() =>
fetch(
`${this.dbApiEndpoint}/vaa?id=${priceFeedWithoutLeading0x}&publishTime=${publishTime}&cluster=${this.dbApiCluster}`
).then((res) => res.json()),
{ retries: 3 }
)) as any[];
if (data.length > 0) {
vaa = {
vaa: data[0].vaa,
publishTime: Math.floor(
new Date(data[0].publishTime).getTime() / 1000
),
};
}
} catch (e: any) {
logger.error(`DB API Error: ${e}`);
throw RestException.DbApiError();
}
}

return vaa;
}

// Run this function without blocking (`await`) if you want to run it async.
async createApp() {
const app = express();
Expand Down Expand Up @@ -126,43 +176,92 @@ export class RestAPI {
publish_time: Joi.number().required(),
}).required(),
};

app.get(
"/api/get_vaa",
validate(getVaaInputSchema),
(req: Request, res: Response) => {
asyncWrapper(async (req: Request, res: Response) => {
const priceFeedId = req.query.id as string;
const publishTime = Number(req.query.publish_time as string);
const vaa = this.priceFeedVaaInfo.getVaa(priceFeedId, publishTime);
// if publishTime is older than cache ttl or vaa is not found, fetch from db
if (!vaa) {
// cache miss
if (this.dbApiEndpoint && this.dbApiCluster) {
fetch(
`${this.dbApiEndpoint}/vaa?id=${priceFeedId}&publishTime=${publishTime}&cluster=${this.dbApiCluster}`
)
.then((r: any) => r.json())
.then((arr: any) => {
if (arr.length > 0 && arr[0]) {
res.json(arr[0]);
} else {
res.status(StatusCodes.NOT_FOUND).send("VAA not found");
}
});
}

if (
this.priceFeedVaaInfo.getLatestPriceInfo(priceFeedId) === undefined
) {
throw RestException.PriceFeedIdNotFound([priceFeedId]);
}

const vaa = await this.getVaaWithDbLookup(priceFeedId, publishTime);

if (vaa === undefined) {
throw RestException.VaaNotFound();
} else {
// cache hit
const processedVaa = {
publishTime: new Date(vaa.publishTime),
vaa: vaa.vaa,
};
res.json(processedVaa);
res.json(vaa);
}
}
})
);

endpoints.push(
"api/get_vaa?id=<price_feed_id>&publish_time=<publish_time_in_unix_timestamp>"
);

const getVaaCcipInputSchema: schema = {
query: Joi.object({
data: Joi.string()
.regex(/^0x[a-f0-9]{80}$/)
.required(),
}).required(),
};

// CCIP compatible endpoint. Read more information about it from
// https://eips.ethereum.org/EIPS/eip-3668
app.get(
"/api/get_vaa_ccip",
validate(getVaaCcipInputSchema),
asyncWrapper(async (req: Request, res: Response) => {
const dataHex = req.query.data as string;
const data = Buffer.from(removeLeading0x(dataHex), "hex");

const priceFeedId = data.slice(0, 32).toString("hex");
const publishTime = Number(data.readBigInt64BE(32));

if (
this.priceFeedVaaInfo.getLatestPriceInfo(priceFeedId) === undefined
) {
throw RestException.PriceFeedIdNotFound([priceFeedId]);
}

const vaa = await this.getVaaWithDbLookup(priceFeedId, publishTime);

if (vaa === undefined) {
// Returning Bad Gateway error because CCIP expects a 5xx error if it needs to
// retry or try other endpoints. Bad Gateway seems the best choice here as this
// is not an internal error and could happen on two scenarios:
// 1. DB Api is not responding well (Bad Gateway is appropriate here)
// 2. Publish time is a few seconds before current time and a VAA
// Will be available in a few seconds. So we want the client to retry.
res
.status(StatusCodes.BAD_GATEWAY)
.json({ "message:": "VAA not found." });
} else {
const pubTimeBuffer = Buffer.alloc(8);
pubTimeBuffer.writeBigInt64BE(BigInt(vaa.publishTime));

const resData =
"0x" +
pubTimeBuffer.toString("hex") +
Buffer.from(vaa.vaa, "base64").toString("hex");

res.json({
data: resData,
});
}
})
);

endpoints.push(
"api/get_vaa_ccip?data=<0x<price_feed_id_32_bytes>+<publish_time_unix_timestamp_be_8_bytes>>"
);

const latestPriceFeedsInputSchema: schema = {
query: Joi.object({
ids: Joi.array()
Expand Down