Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/python-poetry/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ inputs:
poetry-version:
required: false
description: Poetry version
default: "1.2.2"
default: "2.1.4"

runs:
using: composite
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build-and-push-image.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
steps:
- uses: actions/checkout@v2
- uses: ./.github/actions/python-poetry
- uses: pre-commit/action@v2.0.3
- uses: pre-commit/action@v3.0.1

run-tests:
runs-on: ubuntu-latest
Expand Down
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
__pycache__/
*.py[cod]
*$py.class
poetry.toml

.DS_Store
.envrc
.coverage

.env
.vscode/
.vscode/
.idea/
2 changes: 1 addition & 1 deletion .python-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.10
3.11
16 changes: 9 additions & 7 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
ARG APP_NAME=pyth-observer
ARG APP_PACKAGE=pyth_observer
ARG APP_PATH=/opt/$APP_NAME
ARG PYTHON_VERSION=3.10.4
ARG POETRY_VERSION=1.2.2
ARG PYTHON_VERSION=3.11
ARG POETRY_VERSION=2.1.4

#
# Stage: base
#

FROM python:$PYTHON_VERSION as base
FROM python:$PYTHON_VERSION AS base

ARG APP_NAME
ARG APP_PATH
Expand All @@ -27,8 +27,9 @@ ENV \
POETRY_NO_INTERACTION=1

# Install Poetry - respects $POETRY_VERSION & $POETRY_HOME
RUN curl -sSL https://install.python-poetry.org | python
RUN curl -sSL https://install.python-poetry.org | python - --version $POETRY_VERSION
ENV PATH="$POETRY_HOME/bin:$PATH"
RUN which poetry && poetry --version

WORKDIR $APP_PATH
COPY . .
Expand All @@ -37,7 +38,7 @@ COPY . .
# Stage: development
#

FROM base as development
FROM base AS development

ARG APP_NAME
ARG APP_PATH
Expand All @@ -54,20 +55,21 @@ CMD ["$APP_NAME"]
# Stage: build
#

FROM base as build
FROM base AS build

ARG APP_NAME
ARG APP_PATH

WORKDIR $APP_PATH
RUN poetry build --format wheel
RUN poetry self add poetry-plugin-export
RUN poetry export --format requirements.txt --output constraints.txt --without-hashes

#
# Stage: production
#

FROM python:$PYTHON_VERSION as production
FROM python:$PYTHON_VERSION AS production

ARG APP_NAME
ARG APP_PATH
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,11 @@ To integrate Telegram events with the Observer, you need the Telegram group chat

Use this ID in the `publishers.yaml` configuration to correctly set up Telegram events.

## Health Endpoints

The Observer exposes HTTP endpoints for health checks, suitable for Kubernetes liveness and readiness probes:

- **Liveness probe**: `GET /live` always returns `200 OK` with body `OK`.
- **Readiness probe**: `GET /ready` returns `200 OK` with body `OK` if the observer is ready, otherwise returns `503 Not Ready`.

By default, these endpoints are served on port 8080. You can use them in your Kubernetes deployment to monitor the application's health.
2,859 changes: 1,810 additions & 1,049 deletions poetry.lock

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[tool.mypy]
python_version = "3.10"
python_version = "3.11"
ignore_missing_imports = true

[tool.poetry]
name = "pyth-observer"
version = "0.3.5"
version = "2.1.4"
description = "Alerts and stuff"
authors = []
readme = "README.md"
Expand All @@ -28,6 +28,7 @@ types-pyyaml = "^6.0.12"
types-pytz = "^2022.4.0.0"
python-dotenv = "^1.0.1"
numpy = "^2.1.3"
cffi = "^1.17"


[tool.poetry.group.dev.dependencies]
Expand All @@ -47,3 +48,6 @@ pyth-observer = "pyth_observer.cli:run"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.poetry.requires-plugins]
poetry-plugin-export = ">=1.8"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newline

163 changes: 86 additions & 77 deletions pyth_observer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
)
from throttler import Throttler

import pyth_observer.health_server as health_server
from pyth_observer.check.price_feed import PriceFeedState
from pyth_observer.check.publisher import PublisherState
from pyth_observer.coingecko import Symbol, get_coingecko_prices
Expand Down Expand Up @@ -72,98 +73,106 @@ def __init__(

async def run(self):
while True:
logger.info("Running checks")

products = await self.get_pyth_products()
coingecko_prices, coingecko_updates = await self.get_coingecko_prices()
crosschain_prices = await self.get_crosschain_prices()

for product in products:
# Skip tombstone accounts with blank metadata
if "base" not in product.attrs:
continue

if not product.first_price_account_key:
continue

# For each product, we build a list of price feed states (one
# for each price account) and a list of publisher states (one
# for each publisher).
states = []
price_accounts = await self.get_pyth_prices(product)

crosschain_price = crosschain_prices.get(
b58decode(product.first_price_account_key.key).hex(), None
)

for _, price_account in price_accounts.items():
# Handle potential None for min_publishers
if (
price_account.min_publishers is None
# When min_publishers is high it means that the price is not production-ready
# yet and it is still being tested. We need no alerting for these prices.
or price_account.min_publishers >= 10
):
try:
logger.info("Running checks")

products = await self.get_pyth_products()
coingecko_prices, coingecko_updates = await self.get_coingecko_prices()
crosschain_prices = await self.get_crosschain_prices()

health_server.observer_ready = True

for product in products:
# Skip tombstone accounts with blank metadata
if "base" not in product.attrs:
continue

if not product.first_price_account_key:
continue

# Ensure latest_block_slot is not None or provide a default value
latest_block_slot = (
price_account.slot if price_account.slot is not None else -1
# For each product, we build a list of price feed states (one
# for each price account) and a list of publisher states (one
# for each publisher).
states = []
price_accounts = await self.get_pyth_prices(product)

crosschain_price = crosschain_prices.get(
b58decode(product.first_price_account_key.key).hex(), None
)

if not price_account.aggregate_price_status:
raise RuntimeError("Price account status is missing")

if not price_account.aggregate_price_info:
raise RuntimeError("Aggregate price info is missing")

states.append(
PriceFeedState(
symbol=product.attrs["symbol"],
asset_type=product.attrs["asset_type"],
schedule=MarketSchedule(product.attrs["schedule"]),
public_key=price_account.key,
status=price_account.aggregate_price_status,
# this is the solana block slot when price account was fetched
latest_block_slot=latest_block_slot,
latest_trading_slot=price_account.last_slot,
price_aggregate=price_account.aggregate_price_info.price,
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
coingecko_price=coingecko_prices.get(product.attrs["base"]),
coingecko_update=coingecko_updates.get(
product.attrs["base"]
),
crosschain_price=crosschain_price,
for _, price_account in price_accounts.items():
# Handle potential None for min_publishers
if (
price_account.min_publishers is None
# When min_publishers is high it means that the price is not production-ready
# yet and it is still being tested. We need no alerting for these prices.
or price_account.min_publishers >= 10
):
continue

# Ensure latest_block_slot is not None or provide a default value
latest_block_slot = (
price_account.slot if price_account.slot is not None else -1
)
)

for component in price_account.price_components:
pub = self.publishers.get(component.publisher_key.key, None)
publisher_name = (
(pub.name if pub else "")
+ f" ({component.publisher_key.key})"
).strip()
if not price_account.aggregate_price_status:
raise RuntimeError("Price account status is missing")

if not price_account.aggregate_price_info:
raise RuntimeError("Aggregate price info is missing")

states.append(
PublisherState(
publisher_name=publisher_name,
PriceFeedState(
symbol=product.attrs["symbol"],
asset_type=product.attrs["asset_type"],
schedule=MarketSchedule(product.attrs["schedule"]),
public_key=component.publisher_key,
confidence_interval=component.latest_price_info.confidence_interval,
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
price=component.latest_price_info.price,
price_aggregate=price_account.aggregate_price_info.price,
slot=component.latest_price_info.pub_slot,
aggregate_slot=price_account.last_slot,
public_key=price_account.key,
status=price_account.aggregate_price_status,
# this is the solana block slot when price account was fetched
latest_block_slot=latest_block_slot,
status=component.latest_price_info.price_status,
aggregate_status=price_account.aggregate_price_status,
latest_trading_slot=price_account.last_slot,
price_aggregate=price_account.aggregate_price_info.price,
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
coingecko_price=coingecko_prices.get(
product.attrs["base"]
),
coingecko_update=coingecko_updates.get(
product.attrs["base"]
),
crosschain_price=crosschain_price,
)
)

await self.dispatch.run(states)
for component in price_account.price_components:
pub = self.publishers.get(component.publisher_key.key, None)
publisher_name = (
(pub.name if pub else "")
+ f" ({component.publisher_key.key})"
).strip()
states.append(
PublisherState(
publisher_name=publisher_name,
symbol=product.attrs["symbol"],
asset_type=product.attrs["asset_type"],
schedule=MarketSchedule(product.attrs["schedule"]),
public_key=component.publisher_key,
confidence_interval=component.latest_price_info.confidence_interval,
confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
price=component.latest_price_info.price,
price_aggregate=price_account.aggregate_price_info.price,
slot=component.latest_price_info.pub_slot,
aggregate_slot=price_account.last_slot,
# this is the solana block slot when price account was fetched
latest_block_slot=latest_block_slot,
status=component.latest_price_info.price_status,
aggregate_status=price_account.aggregate_price_status,
)
)

await self.dispatch.run(states)
except Exception as e:
logger.error(f"Error in run loop: {e}")
health_server.observer_ready = False

logger.debug("Sleeping...")
await asyncio.sleep(5)
Expand Down
7 changes: 6 additions & 1 deletion pyth_observer/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from prometheus_client import start_http_server

from pyth_observer import Observer, Publisher
from pyth_observer.health_server import start_health_server
from pyth_observer.models import ContactInfo


Expand Down Expand Up @@ -61,7 +62,11 @@ def run(config, publishers, coingecko_mapping, prometheus_port):

start_http_server(int(prometheus_port))

asyncio.run(observer.run())
async def main():
asyncio.create_task(start_health_server())
await observer.run()

asyncio.run(main())


logger.remove()
Expand Down
28 changes: 28 additions & 0 deletions pyth_observer/health_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import asyncio

from aiohttp import web

observer_ready = False


async def live_handler(request):
return web.Response(text="OK")


async def ready_handler(request):
if observer_ready:
return web.Response(text="OK")
else:
return web.Response(status=503, text="Not Ready")


async def start_health_server(port=8080):
app = web.Application()
app.router.add_get("/live", live_handler)
app.router.add_get("/ready", ready_handler)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "0.0.0.0", port)
await site.start()
while True:
await asyncio.sleep(3600)
4 changes: 2 additions & 2 deletions tests/test_checks_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def setup_check(

# Seed the cache with the publisher state
PUBLISHER_CACHE[(state.publisher_name, state.symbol)].append(
PriceUpdate(self.current_time, state.price)
PriceUpdate(int(self.current_time), state.price)
)

return check
Expand Down Expand Up @@ -191,7 +191,7 @@ def test_redemption_rate_passes_check(self):
asset_type="Crypto Redemption Rate",
symbol="Crypto.FUSDC/USDC.RR",
)
check = self.setup_check(state, self.current_time)
check = self.setup_check(state, int(self.current_time))

# Should pass even after long period without changes
self.run_check(check, 3600, True) # 1 hour