diff --git a/README.md b/README.md index 2263651f..213f556e 100644 --- a/README.md +++ b/README.md @@ -125,9 +125,9 @@ await pipeline.run(); Monitoring – Observe pipeline runs and endpoint health - @lde/sparql-monitor - npm - Monitor SPARQL endpoints with periodic checks + @lde/distribution-monitor + npm + Monitor DCAT distributions (SPARQL endpoints and data dumps) with periodic probes @lde/pipeline-console-reporter @@ -200,7 +200,7 @@ graph TD subgraph Monitoring pipeline-console-reporter --> pipeline - sparql-monitor + distribution-monitor --> distribution-probe end subgraph Infrastructure diff --git a/package-lock.json b/package-lock.json index b5b290ee..1e7804ac 100644 --- a/package-lock.json +++ b/package-lock.json @@ -22721,6 +22721,10 @@ "resolved": "packages/distribution-downloader", "link": true }, + "node_modules/@lde/distribution-monitor": { + "resolved": "packages/distribution-monitor", + "link": true + }, "node_modules/@lde/distribution-probe": { "resolved": "packages/distribution-probe", "link": true @@ -22757,10 +22761,6 @@ "resolved": "packages/sparql-importer", "link": true }, - "node_modules/@lde/sparql-monitor": { - "resolved": "packages/sparql-monitor", - "link": true - }, "node_modules/@lde/sparql-qlever": { "resolved": "packages/sparql-qlever", "link": true @@ -38521,9 +38521,40 @@ "tslib": "^2.3.0" } }, + "packages/distribution-monitor": { + "name": "@lde/distribution-monitor", + "version": "0.1.0", + "license": "MIT", + "dependencies": { + "@lde/dataset": "0.7.2", + "@lde/distribution-probe": "0.1.1", + "c12": "^3.3.4", + "commander": "^14.0.3", + "cron": "^4.1.0", + "drizzle-kit": "1.0.0-beta.20", + "drizzle-orm": "1.0.0-beta.22-41a7d21", + "postgres": "^3.4.9", + "tslib": "^2.3.0" + }, + "bin": { + "distribution-monitor": "dist/cli.js" + }, + "devDependencies": { + "@testcontainers/postgresql": "^11.14.0" + } + }, + "packages/distribution-monitor/node_modules/commander": { + "version": "14.0.3", + "resolved": 
"https://registry.npmjs.org/commander/-/commander-14.0.3.tgz", + "integrity": "sha512-H+y0Jo/T1RZ9qPP4Eh1pkcQcLRglraJaSLoyOtHxu6AapkjWVCy2Sit1QQ4x3Dng8qDlSsZEet7g5Pq06MvTgw==", + "license": "MIT", + "engines": { + "node": ">=20" + } + }, "packages/distribution-probe": { "name": "@lde/distribution-probe", - "version": "0.1.0", + "version": "0.1.1", "license": "MIT", "dependencies": { "@lde/dataset": "0.7.2", @@ -39266,12 +39297,12 @@ }, "packages/pipeline": { "name": "@lde/pipeline", - "version": "0.28.9", + "version": "0.28.10", "license": "MIT", "dependencies": { "@lde/dataset": "0.7.2", "@lde/dataset-registry-client": "0.7.5", - "@lde/distribution-probe": "0.1.0", + "@lde/distribution-probe": "0.1.1", "@lde/sparql-importer": "0.6.0", "@lde/sparql-server": "0.4.10", "@rdfjs/types": "^2.0.1", @@ -39289,7 +39320,7 @@ }, "packages/pipeline-console-reporter": { "name": "@lde/pipeline-console-reporter", - "version": "0.19.12", + "version": "0.19.13", "license": "MIT", "dependencies": { "chalk": "^5.4.1", @@ -39300,7 +39331,7 @@ }, "peerDependencies": { "@lde/dataset": "0.7.2", - "@lde/pipeline": "0.28.9" + "@lde/pipeline": "0.28.10" } }, "packages/pipeline-console-reporter/node_modules/ansi-regex": { @@ -39480,7 +39511,7 @@ }, "packages/pipeline-shacl-validator": { "name": "@lde/pipeline-shacl-validator", - "version": "0.10.9", + "version": "0.10.10", "license": "MIT", "dependencies": { "@rdfjs/types": "^2.0.1", @@ -39495,7 +39526,7 @@ }, "peerDependencies": { "@lde/dataset": "0.7.2", - "@lde/pipeline": "0.28.9" + "@lde/pipeline": "0.28.10" } }, "packages/pipeline-shacl-validator/node_modules/n3": { @@ -39514,7 +39545,7 @@ }, "packages/pipeline-void": { "name": "@lde/pipeline-void", - "version": "0.26.10", + "version": "0.26.11", "license": "MIT", "dependencies": { "@rdfjs/types": "^2.0.1", @@ -39524,7 +39555,7 @@ }, "peerDependencies": { "@lde/dataset": "0.7.2", - "@lde/pipeline": "0.28.9" + "@lde/pipeline": "0.28.10" } }, "packages/pipeline-void/node_modules/n3": { 
@@ -39592,30 +39623,13 @@ }, "packages/sparql-monitor": { "name": "@lde/sparql-monitor", - "version": "0.5.13", + "version": "0.6.0", + "deprecated": "Renamed to @lde/distribution-monitor. Update your dependency.", + "extraneous": true, "license": "MIT", "dependencies": { - "c12": "^3.3.4", - "commander": "^14.0.3", - "cron": "^4.1.0", - "drizzle-kit": "1.0.0-beta.20", - "drizzle-orm": "1.0.0-beta.22-41a7d21", - "fetch-sparql-endpoint": "^7.1.0", - "postgres": "^3.4.9", + "@lde/distribution-monitor": "0.1.0", "tslib": "^2.3.0" - }, - "bin": { - "sparql-monitor": "dist/cli.js" - }, - "devDependencies": { - "@testcontainers/postgresql": "^11.14.0" - } - }, - "packages/sparql-monitor/node_modules/commander": { - "version": "14.0.3", - "license": "MIT", - "engines": { - "node": ">=20" } }, "packages/sparql-qlever": { diff --git a/packages/sparql-monitor/CHANGELOG.md b/packages/distribution-monitor/CHANGELOG.md similarity index 100% rename from packages/sparql-monitor/CHANGELOG.md rename to packages/distribution-monitor/CHANGELOG.md diff --git a/packages/distribution-monitor/README.md b/packages/distribution-monitor/README.md new file mode 100644 index 00000000..0b53de46 --- /dev/null +++ b/packages/distribution-monitor/README.md @@ -0,0 +1,180 @@ +# Distribution Monitor + +Monitor DCAT distributions (SPARQL endpoints and data dumps) with periodic probes, storing observations in PostgreSQL. Uses [`@lde/distribution-probe`](../distribution-probe) for the actual health check. + +## Installation + +```bash +npm install @lde/distribution-monitor +``` + +## CLI Usage + +The easiest way to run the monitor is via the CLI with a configuration file. + +### Quick Start + +1. Create a configuration file (TypeScript, JavaScript, JSON, or YAML) +2. 
Run the monitor + +```bash +# Start continuous monitoring +npx distribution-monitor start + +# Run a one-off check for all monitors +npx distribution-monitor check + +# Check a specific monitor +npx distribution-monitor check dbpedia + +# Use a custom config path +npx distribution-monitor start --config ./configs/production.config.ts +``` + +### TypeScript Config (`distribution-monitor.config.ts`) + +```typescript +import { defineConfig } from '@lde/distribution-monitor'; + +export default defineConfig({ + databaseUrl: process.env.DATABASE_URL, + intervalSeconds: 300, + timeoutMs: 30_000, + monitors: [ + { + identifier: 'dbpedia', + distribution: { + accessUrl: 'https://dbpedia.org/sparql', + conformsTo: 'https://www.w3.org/TR/sparql11-protocol/', + }, + sparqlQuery: 'ASK { ?s ?p ?o }', + }, + { + identifier: 'wikidata', + distribution: { + accessUrl: 'https://query.wikidata.org/sparql', + conformsTo: 'https://www.w3.org/TR/sparql11-protocol/', + }, + sparqlQuery: 'SELECT * WHERE { ?s ?p ?o } LIMIT 1', + }, + { + identifier: 'my-dump', + distribution: { + accessUrl: 'https://example.org/data.nt', + mediaType: 'application/n-triples', + }, + }, + ], +}); +``` + +### YAML Config (`distribution-monitor.config.yaml`) + +```yaml +databaseUrl: ${DATABASE_URL} +intervalSeconds: 300 +monitors: + - identifier: dbpedia + distribution: + accessUrl: https://dbpedia.org/sparql + conformsTo: https://www.w3.org/TR/sparql11-protocol/ + sparqlQuery: ASK { ?s ?p ?o } + - identifier: my-dump + distribution: + accessUrl: https://example.org/data.nt + mediaType: application/n-triples +``` + +### Environment Variables + +Create a `.env` file for sensitive configuration: + +``` +DATABASE_URL=postgres://user:pass@localhost:5432/monitoring +``` + +The CLI automatically loads `.env` files. + +### Config Auto-Discovery + +The CLI searches for configuration in this order: + +1. `distribution-monitor.config.{ts,mts,js,mjs,json,yaml,yml}` +2. `.distribution-monitorrc` +3. 
`package.json` → `"distribution-monitor"` key + +## Programmatic Usage + +```typescript +import { Distribution } from '@lde/dataset'; +import { + MonitorService, + PostgresObservationStore, + type MonitorConfig, +} from '@lde/distribution-monitor'; + +const monitors: MonitorConfig[] = [ + { + identifier: 'dbpedia', + distribution: Distribution.sparql(new URL('https://dbpedia.org/sparql')), + sparqlQuery: 'ASK { ?s ?p ?o }', + }, + { + identifier: 'my-dump', + distribution: new Distribution( + new URL('https://example.org/data.nt'), + 'application/n-triples', + ), + }, +]; + +const store = await PostgresObservationStore.create( + 'postgres://user:pass@localhost:5432/db', +); + +const service = new MonitorService({ + store, + monitors, + intervalSeconds: 300, + timeoutMs: 30_000, + headers: new Headers({ 'User-Agent': 'my-monitor/1.0' }), +}); + +service.start(); +// …or run immediate checks +await service.checkAll(); +await service.checkNow('dbpedia'); + +const observations = await store.getLatest(); +for (const [identifier, observation] of observations) { + console.log( + `${identifier}: ${observation.success ? 'OK' : 'FAIL'} (${ + observation.responseTimeMs + }ms)`, + ); +} + +service.stop(); +await store.close(); +``` + +## Distribution shape + +Each monitor targets a DCAT `Distribution`. Supply: + +- `accessUrl` — required. The URL to probe. +- `mediaType` (optional) — plain content-type (e.g. `application/n-triples`) or DCAT-AP 3.0 IANA URI. Omit for SPARQL endpoints that only serve the protocol. +- `conformsTo` (optional) — use `https://www.w3.org/TR/sparql11-protocol/` to mark a distribution as a SPARQL endpoint. Required when `mediaType` doesn’t already identify the distribution as a SPARQL endpoint. +- `sparqlQuery` (optional) — set on the monitor itself, next to `distribution` (not inside it). Used only for SPARQL endpoints and ignored for data dumps. Query type (ASK / SELECT / CONSTRUCT / DESCRIBE) is autodetected. Defaults to a minimal `SELECT` availability probe. 
+ +Distributions with embedded credentials (`https://user:pass@host/path`) are supported: the credentials are stripped from the URL and forwarded as an `Authorization: Basic` header. + +## Database Initialisation + +`PostgresObservationStore.create()` automatically initializes the database schema: + +- `observations` table for storing check results +- `latest_observations` materialized view for efficient queries +- Required indexes + +This is idempotent and safe to call on every startup. diff --git a/packages/sparql-monitor/drizzle.config.ts b/packages/distribution-monitor/drizzle.config.ts similarity index 100% rename from packages/sparql-monitor/drizzle.config.ts rename to packages/distribution-monitor/drizzle.config.ts diff --git a/packages/sparql-monitor/eslint.config.mjs b/packages/distribution-monitor/eslint.config.mjs similarity index 100% rename from packages/sparql-monitor/eslint.config.mjs rename to packages/distribution-monitor/eslint.config.mjs diff --git a/packages/sparql-monitor/package.json b/packages/distribution-monitor/package.json similarity index 71% rename from packages/sparql-monitor/package.json rename to packages/distribution-monitor/package.json index cbf4862c..e10843cf 100644 --- a/packages/sparql-monitor/package.json +++ b/packages/distribution-monitor/package.json @@ -1,10 +1,10 @@ { - "name": "@lde/sparql-monitor", - "version": "0.5.13", - "description": "Monitor SPARQL endpoints with periodic checks", + "name": "@lde/distribution-monitor", + "version": "0.1.0", + "description": "Monitor DCAT distributions (SPARQL endpoints and data dumps) with periodic probes", "repository": { "url": "git+https://github.com/ldelements/lde.git", - "directory": "packages/sparql-monitor" + "directory": "packages/distribution-monitor" }, "license": "MIT", "type": "module", @@ -21,19 +21,20 @@ "module": "./dist/index.js", "types": "./dist/index.d.ts", "bin": { - "sparql-monitor": "dist/cli.js" + "distribution-monitor": "dist/cli.js" }, "files": [ "dist", 
"!**/*.tsbuildinfo" ], "dependencies": { + "@lde/dataset": "0.7.2", + "@lde/distribution-probe": "0.1.1", "c12": "^3.3.4", "commander": "^14.0.3", "cron": "^4.1.0", "drizzle-kit": "1.0.0-beta.20", "drizzle-orm": "1.0.0-beta.22-41a7d21", - "fetch-sparql-endpoint": "^7.1.0", "postgres": "^3.4.9", "tslib": "^2.3.0" }, diff --git a/packages/sparql-monitor/src/cli.ts b/packages/distribution-monitor/src/cli.ts similarity index 79% rename from packages/sparql-monitor/src/cli.ts rename to packages/distribution-monitor/src/cli.ts index 71f898d8..34744310 100644 --- a/packages/sparql-monitor/src/cli.ts +++ b/packages/distribution-monitor/src/cli.ts @@ -5,7 +5,7 @@ import { createRequire } from 'node:module'; import { MonitorService } from './service.js'; import { PostgresObservationStore } from './store.js'; import type { MonitorConfig } from './types.js'; -import { normalizeConfig, type SparqlMonitorConfig } from './config.js'; +import { normalizeConfig, type DistributionMonitorConfig } from './config.js'; const require = createRequire(import.meta.url); const { version } = require('../package.json') as { version: string }; @@ -14,6 +14,7 @@ interface MonitorContext { config: { databaseUrl: string; intervalSeconds?: number; + timeoutMs?: number; monitors: MonitorConfig[]; }; store: PostgresObservationStore; @@ -21,10 +22,10 @@ interface MonitorContext { } async function loadMonitorContext( - configFile?: string + configFile?: string, ): Promise { - const { config: rawConfig } = await loadConfig({ - name: 'sparql-monitor', + const { config: rawConfig } = await loadConfig({ + name: 'distribution-monitor', configFile, dotenv: true, }); @@ -32,7 +33,7 @@ async function loadMonitorContext( if (!rawConfig) { console.error('Error: No configuration found.'); console.error( - 'Create a sparql-monitor.config.ts file or specify --config.' 
+ 'Create a distribution-monitor.config.ts file or specify --config.', ); process.exit(1); } @@ -42,7 +43,7 @@ async function loadMonitorContext( const databaseUrl = config.databaseUrl ?? process.env.DATABASE_URL; if (!databaseUrl) { console.error( - 'Error: databaseUrl required (set in config or DATABASE_URL env).' + 'Error: databaseUrl required (set in config or DATABASE_URL env).', ); process.exit(1); } @@ -57,6 +58,7 @@ async function loadMonitorContext( store, monitors: config.monitors, intervalSeconds: config.intervalSeconds, + timeoutMs: config.timeoutMs, }); return { @@ -69,13 +71,13 @@ async function loadMonitorContext( const program = new Command(); program - .name('sparql-monitor') - .description('Monitor SPARQL endpoints') + .name('distribution-monitor') + .description('Monitor DCAT distributions (SPARQL endpoints and data dumps)') .version(version); program .command('start') - .description('Start monitoring all configured endpoints') + .description('Start monitoring all configured distributions') .option('-c, --config ', 'Config file path') .action(async (options) => { const { config, store, service } = await loadMonitorContext(options.config); @@ -83,7 +85,7 @@ program await service.checkAll(); service.start(); - console.log(`Monitoring ${config.monitors.length} endpoint(s)...`); + console.log(`Monitoring ${config.monitors.length} distribution(s)...`); console.log(`Interval: ${config.intervalSeconds ?? 
300} seconds`); const shutdown = async () => { @@ -107,13 +109,13 @@ program try { if (identifier) { const monitor = config.monitors.find( - (m) => m.identifier === identifier + (m) => m.identifier === identifier, ); if (!monitor) { console.error(`Error: Monitor '${identifier}' not found.`); console.error( 'Available monitors:', - config.monitors.map((m) => m.identifier).join(', ') + config.monitors.map((m) => m.identifier).join(', '), ); process.exit(1); } @@ -121,7 +123,7 @@ program await service.checkNow(identifier); console.log(`Check completed for ${identifier}.`); } else { - console.log(`Checking ${config.monitors.length} endpoint(s)...`); + console.log(`Checking ${config.monitors.length} distribution(s)...`); await service.checkAll(); console.log('All checks completed.'); } diff --git a/packages/distribution-monitor/src/config.ts b/packages/distribution-monitor/src/config.ts new file mode 100644 index 00000000..1e935737 --- /dev/null +++ b/packages/distribution-monitor/src/config.ts @@ -0,0 +1,117 @@ +import { Distribution } from '@lde/dataset'; +import type { MonitorConfig } from './types.js'; + +/** + * Shape of a single monitor entry in a configuration file. URLs may be + * supplied as strings for YAML/JSON ergonomics; they are converted to + * {@link URL} objects by {@link normalizeConfig}. + */ +export interface RawMonitorConfig { + /** Unique identifier for this monitor. */ + identifier: string; + /** The distribution to probe. */ + distribution: { + /** Distribution access URL. */ + accessUrl: string | URL; + /** + * Plain content-type (e.g. `application/n-triples`) or DCAT-AP 3.0 + * IANA media type URI. + */ + mediaType?: string; + /** + * Specification the distribution conforms to, e.g. + * `https://www.w3.org/TR/sparql11-protocol/` for SPARQL endpoints. + */ + conformsTo?: string | URL; + }; + /** + * SPARQL query to run against SPARQL-endpoint distributions. Ignored for + * data-dump distributions. 
+ */ + sparqlQuery?: string; +} + +/** + * Configuration for the distribution monitor. + */ +export interface DistributionMonitorConfig { + /** PostgreSQL connection string. */ + databaseUrl?: string; + /** Polling interval in seconds (default: 300). */ + intervalSeconds?: number; + /** Request timeout in milliseconds (default: 30 000). */ + timeoutMs?: number; + /** Monitor definitions. */ + monitors: RawMonitorConfig[]; +} + +/** + * Type helper for TypeScript config files. + * + * @example + * ```ts + * // distribution-monitor.config.ts + * import { defineConfig } from '@lde/distribution-monitor'; + * + * export default defineConfig({ + * databaseUrl: process.env.DATABASE_URL, + * intervalSeconds: 300, + * monitors: [ + * { + * identifier: 'dbpedia', + * distribution: { + * accessUrl: 'https://dbpedia.org/sparql', + * conformsTo: 'https://www.w3.org/TR/sparql11-protocol/', + * }, + * sparqlQuery: 'ASK { ?s ?p ?o }', + * }, + * { + * identifier: 'my-dump', + * distribution: { + * accessUrl: 'https://example.org/data.nt', + * mediaType: 'application/n-triples', + * }, + * }, + * ], + * }); + * ``` + */ +export function defineConfig( + config: DistributionMonitorConfig, +): DistributionMonitorConfig { + return config; +} + +/** + * Normalize config: convert string URLs to URL objects and construct + * {@link Distribution} instances for each monitor. + */ +export function normalizeConfig(raw: DistributionMonitorConfig): { + databaseUrl?: string; + intervalSeconds?: number; + timeoutMs?: number; + monitors: MonitorConfig[]; +} { + return { + databaseUrl: raw.databaseUrl, + intervalSeconds: raw.intervalSeconds, + timeoutMs: raw.timeoutMs, + monitors: raw.monitors.map((m) => ({ + identifier: m.identifier, + distribution: toDistribution(m.distribution), + sparqlQuery: m.sparqlQuery, + })), + }; +} + +function toDistribution(raw: RawMonitorConfig['distribution']): Distribution { + const accessUrl = + typeof raw.accessUrl === 'string' ? 
new URL(raw.accessUrl) : raw.accessUrl; + const conformsTo = + raw.conformsTo === undefined + ? undefined + : typeof raw.conformsTo === 'string' + ? new URL(raw.conformsTo) + : raw.conformsTo; + return new Distribution(accessUrl, raw.mediaType, conformsTo); +} diff --git a/packages/distribution-monitor/src/index.ts b/packages/distribution-monitor/src/index.ts new file mode 100644 index 00000000..7ddccfc3 --- /dev/null +++ b/packages/distribution-monitor/src/index.ts @@ -0,0 +1,19 @@ +export type { + MonitorConfig, + CheckResult, + Observation, + ObservationStore, +} from './types.js'; +export { PostgresObservationStore } from './store.js'; +export { + MonitorService, + mapProbeResult, + type MonitorServiceOptions, + type Probe, +} from './service.js'; +export { + defineConfig, + normalizeConfig, + type DistributionMonitorConfig, + type RawMonitorConfig, +} from './config.js'; diff --git a/packages/sparql-monitor/src/schema.ts b/packages/distribution-monitor/src/schema.ts similarity index 96% rename from packages/sparql-monitor/src/schema.ts rename to packages/distribution-monitor/src/schema.ts index 57dbf683..10d16d12 100644 --- a/packages/sparql-monitor/src/schema.ts +++ b/packages/distribution-monitor/src/schema.ts @@ -34,9 +34,9 @@ export const observations = pgTable( index('observations_observed_at_idx').on(table.observedAt), index('observations_monitor_observed_at_idx').on( table.monitor, - sql`${table.observedAt} DESC` + sql`${table.observedAt} DESC`, ), - ] + ], ); /** @@ -51,7 +51,7 @@ export const refreshLatestObservationsViewSql = sql` */ export const latestObservations = pgMaterializedView( 'latest_observations', - columns + columns, ).as(sql` SELECT DISTINCT ON (monitor) * FROM ${observations} diff --git a/packages/distribution-monitor/src/service.ts b/packages/distribution-monitor/src/service.ts new file mode 100644 index 00000000..c87580b1 --- /dev/null +++ b/packages/distribution-monitor/src/service.ts @@ -0,0 +1,181 @@ +import { CronJob } from 
'cron'; +import { + probe, + NetworkError, + type ProbeResultType, + type ProbeOptions, +} from '@lde/distribution-probe'; +import type { CheckResult, MonitorConfig, ObservationStore } from './types.js'; + +/** + * Function signature for a probe. Matches `probe()` from + * `@lde/distribution-probe`; injectable so tests can stub it. + */ +export type Probe = typeof probe; + +export interface MonitorServiceOptions { + /** Store for persisting observations. */ + store: ObservationStore; + /** Monitor configurations. */ + monitors: MonitorConfig[]; + /** Polling interval in seconds (default: 300). */ + intervalSeconds?: number; + /** Request timeout in milliseconds passed to the probe (default: 30 000). */ + timeoutMs?: number; + /** HTTP headers forwarded to every probe request (e.g. User-Agent). */ + headers?: Headers; + /** + * Override the probe function. Mostly useful for tests; defaults to + * {@link probe} from `@lde/distribution-probe`. + */ + probe?: Probe; +} + +const DEFAULT_TIMEOUT_MS = 30_000; + +/** + * Orchestrates monitoring of multiple DCAT distributions. + */ +export class MonitorService { + private readonly store: ObservationStore; + private readonly probe: Probe; + private readonly configs: MonitorConfig[]; + private readonly intervalSeconds: number; + private readonly timeoutMs: number; + private readonly headers?: Headers; + private job: CronJob | null = null; + + constructor(options: MonitorServiceOptions) { + this.store = options.store; + this.probe = options.probe ?? probe; + this.configs = options.monitors; + this.intervalSeconds = options.intervalSeconds ?? 300; + this.timeoutMs = options.timeoutMs ?? DEFAULT_TIMEOUT_MS; + this.headers = options.headers; + } + + /** + * Perform an immediate check for a monitor. 
+ */ + async checkNow(identifier: string): Promise { + const config = this.configs.find((c) => c.identifier === identifier); + if (!config) { + throw new Error(`Monitor not found: ${identifier}`); + } + await this.performCheck(config); + await this.refreshView(); + } + + /** + * Perform an immediate check for all monitors. + */ + async checkAll(): Promise { + await Promise.all(this.configs.map((config) => this.performCheck(config))); + await this.refreshView(); + } + + /** + * Start monitoring all configured distributions. + */ + start(): void { + if (!this.job) { + const cronExpression = this.secondsToCron(this.intervalSeconds); + this.job = new CronJob(cronExpression, () => this.checkAll()); + this.job.start(); + } + } + + /** + * Stop monitoring. + */ + stop(): void { + if (this.job) { + this.job.stop(); + this.job = null; + } + } + + /** + * Check whether monitoring is running. + */ + isRunning(): boolean { + return this.job !== null; + } + + /** + * Convert seconds to a cron expression. 
+ */ + private secondsToCron(seconds: number): string { + if (seconds < 60) { + return `*/${seconds} * * * * *`; + } + const minutes = Math.floor(seconds / 60); + if (minutes < 60) { + return `0 */${minutes} * * * *`; + } + const hours = Math.floor(minutes / 60); + return `0 0 */${hours} * * *`; + } + + private async performCheck(config: MonitorConfig): Promise { + const observedAt = new Date(); + const options: ProbeOptions = { timeoutMs: this.timeoutMs }; + if (this.headers) options.headers = this.headers; + if (config.sparqlQuery) options.sparqlQuery = config.sparqlQuery; + + const result = await this.probe(config.distribution, options); + const checkResult = mapProbeResult(result, observedAt); + await this.store.store({ monitor: config.identifier, ...checkResult }); + } + + private async refreshView(): Promise { + try { + await this.store.refreshLatestObservationsView(); + } catch { + // View refresh failure is not critical + } + } +} + +/** + * Collapse a {@link ProbeResultType} into a {@link CheckResult}. Network + * errors become `success: false` with the network error message; HTTP or + * body-validation failures become `success: false` with the probe's + * failureReason (falling back to joined warnings or the HTTP status) as the + * error message; everything else is `success: true`. + */ +export function mapProbeResult( + result: ProbeResultType, + observedAt: Date, +): CheckResult { + if (result instanceof NetworkError) { + return { + success: false, + responseTimeMs: result.responseTimeMs, + errorMessage: result.message, + observedAt, + }; + } + + if (result.isSuccess()) { + return { + success: true, + responseTimeMs: result.responseTimeMs, + errorMessage: null, + observedAt, + }; + } + + const errorMessage = + result.failureReason ?? + (result.warnings.length > 0 + ? 
result.warnings.join('; ') + : `HTTP ${result.statusCode} ${result.statusText}`); + + return { + success: false, + responseTimeMs: result.responseTimeMs, + errorMessage, + observedAt, + }; +} diff --git a/packages/sparql-monitor/src/store.ts b/packages/distribution-monitor/src/store.ts similarity index 94% rename from packages/sparql-monitor/src/store.ts rename to packages/distribution-monitor/src/store.ts index aa72d672..5845df5b 100644 --- a/packages/sparql-monitor/src/store.ts +++ b/packages/distribution-monitor/src/store.ts @@ -26,12 +26,11 @@ export class PostgresObservationStore implements ObservationStore { * See: https://github.com/drizzle-team/drizzle-orm/issues/5293 */ static async create( - connectionString: string + connectionString: string, ): Promise { const store = new PostgresObservationStore(connectionString); - const { generateDrizzleJson, generateMigration } = await import( - 'drizzle-kit/api-postgres' - ); + const { generateDrizzleJson, generateMigration } = + await import('drizzle-kit/api-postgres'); // Generate migration from empty state to our schema const empty = await generateDrizzleJson({}); @@ -59,7 +58,7 @@ export class PostgresObservationStore implements ObservationStore { // Create unique index on materialized view for CONCURRENTLY refresh try { await store.db.execute( - sql`CREATE UNIQUE INDEX latest_observations_monitor_idx ON latest_observations (monitor)` + sql`CREATE UNIQUE INDEX latest_observations_monitor_idx ON latest_observations (monitor)`, ); } catch { // Index may already exist diff --git a/packages/sparql-monitor/src/types.ts b/packages/distribution-monitor/src/types.ts similarity index 63% rename from packages/sparql-monitor/src/types.ts rename to packages/distribution-monitor/src/types.ts index 462ee231..253de70a 100644 --- a/packages/sparql-monitor/src/types.ts +++ b/packages/distribution-monitor/src/types.ts @@ -1,20 +1,30 @@ +import { Distribution } from '@lde/dataset'; + /** - * Configuration for a monitor. 
+ * Configuration for a single monitor. + * + * Monitors target any DCAT {@link Distribution}: a SPARQL endpoint (in which + * case `sparqlQuery` is used for the probe) or a data dump (in which case + * `sparqlQuery` is ignored and the distribution is fetched with HEAD/GET). */ export interface MonitorConfig { /** Unique identifier for this monitor. */ identifier: string; - /** URL of the SPARQL endpoint to monitor. */ - endpointUrl: URL; - /** SPARQL query to execute. */ - query: string; + /** The DCAT distribution to probe. */ + distribution: Distribution; + /** + * SPARQL query to run against the endpoint. Only meaningful when the + * distribution is a SPARQL endpoint. Defaults to a minimal availability + * probe (`SELECT * { ?s ?p ?o } LIMIT 1`). + */ + sparqlQuery?: string; } /** - * Result of a single check against a SPARQL endpoint. + * Result of a single check against a distribution. */ export interface CheckResult { - /** Whether the endpoint responded successfully. */ + /** Whether the distribution responded successfully. */ success: boolean; /** Response time in milliseconds. 
*/ responseTimeMs: number; diff --git a/packages/distribution-monitor/test/config.test.ts b/packages/distribution-monitor/test/config.test.ts new file mode 100644 index 00000000..920a2a98 --- /dev/null +++ b/packages/distribution-monitor/test/config.test.ts @@ -0,0 +1,92 @@ +import { describe, it, expect } from 'vitest'; +import { Distribution } from '@lde/dataset'; +import { defineConfig, normalizeConfig } from '../src/config.js'; + +describe('defineConfig', () => { + it('returns the config as-is', () => { + const config = { + databaseUrl: 'postgres://localhost/test', + intervalSeconds: 60, + monitors: [ + { + identifier: 'test', + distribution: { + accessUrl: 'https://example.org/sparql', + conformsTo: 'https://www.w3.org/TR/sparql11-protocol/', + }, + sparqlQuery: 'ASK { ?s ?p ?o }', + }, + ], + }; + + expect(defineConfig(config)).toEqual(config); + }); +}); + +describe('normalizeConfig', () => { + it('constructs a Distribution from string URLs', () => { + const raw = { + databaseUrl: 'postgres://localhost/test', + intervalSeconds: 300, + monitors: [ + { + identifier: 'dbpedia', + distribution: { + accessUrl: 'https://example.org/sparql', + conformsTo: 'https://www.w3.org/TR/sparql11-protocol/', + }, + sparqlQuery: 'ASK { ?s ?p ?o }', + }, + ], + }; + + const normalized = normalizeConfig(raw); + + expect(normalized.databaseUrl).toBe('postgres://localhost/test'); + expect(normalized.intervalSeconds).toBe(300); + expect(normalized.monitors[0].identifier).toBe('dbpedia'); + expect(normalized.monitors[0].distribution).toBeInstanceOf(Distribution); + expect(normalized.monitors[0].distribution.accessUrl.href).toBe( + 'https://example.org/sparql', + ); + expect(normalized.monitors[0].distribution.isSparql()).toBe(true); + expect(normalized.monitors[0].sparqlQuery).toBe('ASK { ?s ?p ?o }'); + }); + + it('preserves URL objects', () => { + const accessUrl = new URL('https://example.org/data.nt'); + const raw = { + monitors: [ + { + identifier: 'dump', + distribution: { + 
accessUrl, + mediaType: 'application/n-triples', + }, + }, + ], + }; + + const normalized = normalizeConfig(raw); + + expect(normalized.monitors[0].distribution.accessUrl).toBe(accessUrl); + expect(normalized.monitors[0].distribution.mimeType).toBe( + 'application/n-triples', + ); + expect(normalized.monitors[0].distribution.isSparql()).toBe(false); + }); + + it('forwards timeoutMs', () => { + const normalized = normalizeConfig({ + timeoutMs: 10_000, + monitors: [ + { + identifier: 'x', + distribution: { accessUrl: 'https://example.org/data' }, + }, + ], + }); + + expect(normalized.timeoutMs).toBe(10_000); + }); +}); diff --git a/packages/distribution-monitor/test/service.test.ts b/packages/distribution-monitor/test/service.test.ts new file mode 100644 index 00000000..b3c907df --- /dev/null +++ b/packages/distribution-monitor/test/service.test.ts @@ -0,0 +1,302 @@ +import { describe, it, expect, vi } from 'vitest'; +import { Distribution } from '@lde/dataset'; +import { + SparqlProbeResult, + DataDumpProbeResult, + NetworkError, +} from '@lde/distribution-probe'; +import { MonitorService, mapProbeResult } from '../src/service.js'; +import type { ObservationStore, MonitorConfig } from '../src/types.js'; + +function createMockStore(): ObservationStore { + return { + getLatest: vi.fn().mockResolvedValue(new Map()), + get: vi.fn().mockResolvedValue(null), + store: vi.fn().mockImplementation(async (observation) => ({ + id: 'obs-1', + ...observation, + })), + refreshLatestObservationsView: vi.fn().mockResolvedValue(undefined), + close: vi.fn().mockResolvedValue(undefined), + }; +} + +const sparqlDistribution = Distribution.sparql( + new URL('http://example.org/sparql'), +); + +const dumpDistribution = new Distribution( + new URL('http://example.org/data.nt'), + 'application/n-triples', +); + +const testMonitors: MonitorConfig[] = [ + { + identifier: 'sparql-monitor', + distribution: sparqlDistribution, + sparqlQuery: 'ASK { ?s ?p ?o }', + }, + { + identifier: 
'dump-monitor', + distribution: dumpDistribution, + }, +]; + +describe('MonitorService', () => { + describe('checkNow', () => { + it('probes a SPARQL distribution and stores the result', async () => { + const store = createMockStore(); + const probe = vi.fn().mockResolvedValue( + new SparqlProbeResult( + 'http://example.org/sparql', + new Response('{"boolean": true}', { + status: 200, + headers: { 'Content-Type': 'application/sparql-results+json' }, + }), + 42, + 'application/sparql-results+json', + ), + ); + const service = new MonitorService({ + store, + monitors: testMonitors, + probe, + }); + + await service.checkNow('sparql-monitor'); + + expect(probe).toHaveBeenCalledWith( + sparqlDistribution, + expect.objectContaining({ sparqlQuery: 'ASK { ?s ?p ?o }' }), + ); + expect(store.store).toHaveBeenCalledWith( + expect.objectContaining({ + monitor: 'sparql-monitor', + success: true, + responseTimeMs: 42, + errorMessage: null, + }), + ); + }); + + it('probes a data-dump distribution without a sparqlQuery', async () => { + const store = createMockStore(); + const probe = vi.fn().mockResolvedValue( + new DataDumpProbeResult( + 'http://example.org/data.nt', + new Response('', { + status: 200, + headers: { + 'Content-Type': 'application/n-triples', + 'Content-Length': '50000', + }, + }), + 12, + ), + ); + const service = new MonitorService({ + store, + monitors: testMonitors, + probe, + }); + + await service.checkNow('dump-monitor'); + + expect(probe).toHaveBeenCalledWith( + dumpDistribution, + expect.not.objectContaining({ sparqlQuery: expect.anything() }), + ); + expect(store.store).toHaveBeenCalledWith( + expect.objectContaining({ + monitor: 'dump-monitor', + success: true, + responseTimeMs: 12, + }), + ); + }); + + it('records probe failures as success: false', async () => { + const store = createMockStore(); + const probe = vi + .fn() + .mockResolvedValue( + new NetworkError( + 'http://example.org/sparql', + 'Connection refused', + 7, + ), + ); + const service = 
new MonitorService({ + store, + monitors: testMonitors, + probe, + }); + + await service.checkNow('sparql-monitor'); + + expect(store.store).toHaveBeenCalledWith( + expect.objectContaining({ + monitor: 'sparql-monitor', + success: false, + errorMessage: 'Connection refused', + responseTimeMs: 7, + }), + ); + }); + + it('throws when monitor not found', async () => { + const store = createMockStore(); + const service = new MonitorService({ store, monitors: testMonitors }); + + await expect(service.checkNow('nonexistent')).rejects.toThrow( + 'Monitor not found: nonexistent', + ); + }); + + it('forwards configured timeoutMs and headers to the probe', async () => { + const store = createMockStore(); + const probe = vi.fn().mockResolvedValue( + new SparqlProbeResult( + 'http://example.org/sparql', + new Response('{"boolean": true}', { + status: 200, + headers: { 'Content-Type': 'application/sparql-results+json' }, + }), + 1, + 'application/sparql-results+json', + ), + ); + const headers = new Headers({ 'User-Agent': 'TestAgent/1.0' }); + const service = new MonitorService({ + store, + monitors: testMonitors, + probe, + timeoutMs: 10_000, + headers, + }); + + await service.checkNow('sparql-monitor'); + + expect(probe).toHaveBeenCalledWith( + sparqlDistribution, + expect.objectContaining({ timeoutMs: 10_000, headers }), + ); + }); + }); + + describe('checkAll', () => { + it('checks all monitors in parallel', async () => { + const store = createMockStore(); + const probe = vi.fn(async (distribution: Distribution) => { + if (distribution === sparqlDistribution) { + return new SparqlProbeResult( + 'http://example.org/sparql', + new Response('{"boolean": true}', { + status: 200, + headers: { 'Content-Type': 'application/sparql-results+json' }, + }), + 1, + 'application/sparql-results+json', + ); + } + return new DataDumpProbeResult( + 'http://example.org/data.nt', + new Response('', { + status: 200, + headers: { + 'Content-Type': 'application/n-triples', + 'Content-Length': 
'50000', + }, + }), + 1, + ); + }); + const service = new MonitorService({ + store, + monitors: testMonitors, + probe, + }); + + await service.checkAll(); + + expect(probe).toHaveBeenCalledTimes(2); + expect(store.store).toHaveBeenCalledTimes(2); + expect(store.store).toHaveBeenCalledWith( + expect.objectContaining({ monitor: 'sparql-monitor' }), + ); + expect(store.store).toHaveBeenCalledWith( + expect.objectContaining({ monitor: 'dump-monitor' }), + ); + }); + }); + + describe('start/stop', () => { + it('starts and stops monitoring', () => { + const store = createMockStore(); + const service = new MonitorService({ store, monitors: testMonitors }); + + service.start(); + expect(service.isRunning()).toBe(true); + + service.stop(); + expect(service.isRunning()).toBe(false); + }); + }); +}); + +describe('mapProbeResult', () => { + const observedAt = new Date('2026-04-23T10:00:00Z'); + + it('maps NetworkError to success: false', () => { + const result = new NetworkError('http://example.org', 'boom', 50); + expect(mapProbeResult(result, observedAt)).toEqual({ + success: false, + responseTimeMs: 50, + errorMessage: 'boom', + observedAt, + }); + }); + + it('maps successful SparqlProbeResult to success: true', () => { + const result = new SparqlProbeResult( + 'http://example.org', + new Response('{"results":{}}', { + status: 200, + headers: { 'Content-Type': 'application/sparql-results+json' }, + }), + 100, + 'application/sparql-results+json', + ); + expect(mapProbeResult(result, observedAt).success).toBe(true); + }); + + it('maps failureReason into errorMessage when probe is unsuccessful', () => { + const result = new DataDumpProbeResult( + 'http://example.org/data.nt', + new Response('', { + status: 200, + headers: { 'Content-Type': 'text/turtle' }, + }), + 20, + 'Distribution is empty', + ); + expect(mapProbeResult(result, observedAt)).toEqual({ + success: false, + responseTimeMs: 20, + errorMessage: 'Distribution is empty', + observedAt, + }); + }); + + it('falls back 
to HTTP status when there is no failureReason or warnings', () => { + const result = new SparqlProbeResult( + 'http://example.org', + new Response('', { status: 502, statusText: 'Bad Gateway' }), + 30, + 'application/sparql-results+json', + ); + expect(mapProbeResult(result, observedAt).errorMessage).toBe( + 'HTTP 502 Bad Gateway', + ); + }); +}); diff --git a/packages/sparql-monitor/test/store.test.ts b/packages/distribution-monitor/test/store.test.ts similarity index 97% rename from packages/sparql-monitor/test/store.test.ts rename to packages/distribution-monitor/test/store.test.ts index d040d172..4c251f22 100644 --- a/packages/sparql-monitor/test/store.test.ts +++ b/packages/distribution-monitor/test/store.test.ts @@ -17,7 +17,7 @@ describe('PostgresObservationStore', () => { beforeAll(async () => { container = await new PostgreSqlContainer('postgres:18').start(); store = await PostgresObservationStore.create( - container.getConnectionUri() + container.getConnectionUri(), ); }, 60000); @@ -84,7 +84,7 @@ describe('PostgresObservationStore', () => { // Close and recreate store to test idempotent schema push await store.close(); store = await PostgresObservationStore.create( - container.getConnectionUri() + container.getConnectionUri(), ); expect(store).toBeDefined(); }); diff --git a/packages/sparql-monitor/tsconfig.json b/packages/distribution-monitor/tsconfig.json similarity index 100% rename from packages/sparql-monitor/tsconfig.json rename to packages/distribution-monitor/tsconfig.json diff --git a/packages/sparql-monitor/tsconfig.lib.json b/packages/distribution-monitor/tsconfig.lib.json similarity index 79% rename from packages/sparql-monitor/tsconfig.lib.json rename to packages/distribution-monitor/tsconfig.lib.json index d57102cc..5a63f73f 100644 --- a/packages/sparql-monitor/tsconfig.lib.json +++ b/packages/distribution-monitor/tsconfig.lib.json @@ -9,7 +9,14 @@ "types": ["node"] }, "include": ["src/**/*.ts"], - "references": [], + "references": [ + { + 
"path": "../dataset/tsconfig.lib.json" + }, + { + "path": "../distribution-probe/tsconfig.lib.json" + } + ], "exclude": [ "vite.config.ts", "vite.config.mts", diff --git a/packages/sparql-monitor/tsconfig.spec.json b/packages/distribution-monitor/tsconfig.spec.json similarity index 100% rename from packages/sparql-monitor/tsconfig.spec.json rename to packages/distribution-monitor/tsconfig.spec.json diff --git a/packages/sparql-monitor/vite.config.ts b/packages/distribution-monitor/vite.config.ts similarity index 60% rename from packages/sparql-monitor/vite.config.ts rename to packages/distribution-monitor/vite.config.ts index 35949139..31592ac6 100644 --- a/packages/sparql-monitor/vite.config.ts +++ b/packages/distribution-monitor/vite.config.ts @@ -5,17 +5,18 @@ export default mergeConfig( baseConfig, defineConfig({ root: __dirname, - cacheDir: '../../node_modules/.vite/packages/sparql-monitor', + cacheDir: '../../node_modules/.vite/packages/distribution-monitor', test: { coverage: { exclude: ['src/cli.ts', 'drizzle.config.ts'], thresholds: { - functions: 96.96, - lines: 95.23, - branches: 79.06, - statements: 93.8, + autoUpdate: true, + functions: 96.29, + lines: 94.04, + branches: 77.27, + statements: 92.47, }, }, }, - }) + }), ); diff --git a/packages/distribution-probe/src/index.ts b/packages/distribution-probe/src/index.ts index 62c45a2d..c90f233d 100644 --- a/packages/distribution-probe/src/index.ts +++ b/packages/distribution-probe/src/index.ts @@ -3,5 +3,6 @@ export { NetworkError, SparqlProbeResult, DataDumpProbeResult, + type ProbeOptions, type ProbeResultType, } from './probe.js'; diff --git a/packages/distribution-probe/src/probe.ts b/packages/distribution-probe/src/probe.ts index b0cf1ca4..14679177 100644 --- a/packages/distribution-probe/src/probe.ts +++ b/packages/distribution-probe/src/probe.ts @@ -1,6 +1,29 @@ import { Distribution } from '@lde/dataset'; import { Parser } from 'n3'; +/** + * Options for {@link probe}. 
+ */ +export interface ProbeOptions { + /** Request timeout in milliseconds. Defaults to 5 000. */ + timeoutMs?: number; + /** + * Extra HTTP headers to send with the request. Merged with probe-generated + * headers; caller-supplied values take precedence on conflict. + */ + headers?: Headers; + /** + * SPARQL query to use when probing a SPARQL endpoint. The query’s type + * (`ASK` / `SELECT` / `CONSTRUCT` / `DESCRIBE`) determines the `Accept` + * header and the response validation strategy. Ignored for data-dump + * distributions. Defaults to `SELECT * { ?s ?p ?o } LIMIT 1`. + */ + sparqlQuery?: string; +} + +const DEFAULT_SPARQL_QUERY = 'SELECT * { ?s ?p ?o } LIMIT 1'; +const DEFAULT_TIMEOUT_MS = 5000; + /** * Result of a network error during probing. */ @@ -8,6 +31,7 @@ export class NetworkError { constructor( public readonly url: string, public readonly message: string, + public readonly responseTimeMs: number, ) {} } @@ -21,16 +45,19 @@ abstract class ProbeResult { public readonly contentType: string | null; public readonly failureReason: string | null; public readonly warnings: string[] = []; + public readonly responseTimeMs: number; constructor( public readonly url: string, response: Response, + responseTimeMs: number, failureReason: string | null = null, ) { this.statusCode = response.status; this.statusText = response.statusText; this.contentType = response.headers.get('Content-Type'); this.failureReason = failureReason; + this.responseTimeMs = responseTimeMs; const lastModifiedHeader = response.headers.get('Last-Modified'); if (lastModifiedHeader) { this.lastModified = new Date(lastModifiedHeader); @@ -47,12 +74,24 @@ abstract class ProbeResult { } const SPARQL_RESULTS_JSON = 'application/sparql-results+json'; +const SPARQL_RDF_RESULTS = 'application/n-triples'; /** * Result of probing a SPARQL endpoint. 
*/ export class SparqlProbeResult extends ProbeResult { - public readonly acceptedContentType = SPARQL_RESULTS_JSON; + public readonly acceptedContentType: string; + + constructor( + url: string, + response: Response, + responseTimeMs: number, + acceptedContentType: string, + failureReason: string | null = null, + ) { + super(url, response, responseTimeMs, failureReason); + this.acceptedContentType = acceptedContentType; + } override isSuccess(): boolean { return ( @@ -71,9 +110,10 @@ export class DataDumpProbeResult extends ProbeResult { constructor( url: string, response: Response, + responseTimeMs: number, failureReason: string | null = null, ) { - super(url, response, failureReason); + super(url, response, responseTimeMs, failureReason); const contentLengthHeader = response.headers.get('Content-Length'); if (contentLengthHeader) { this.contentSize = parseInt(contentLengthHeader); @@ -86,90 +126,207 @@ export type ProbeResultType = | DataDumpProbeResult | NetworkError; +type SparqlQueryType = 'ASK' | 'SELECT' | 'CONSTRUCT' | 'DESCRIBE'; + /** * Probe a distribution to check availability and gather metadata. * - * For SPARQL endpoints, sends a simple SELECT query. - * For data dumps, sends HEAD (or GET if HEAD returns no Content-Length). + * For SPARQL endpoints, issues the configured SPARQL query (default: a + * minimal `SELECT`). For data dumps, issues `HEAD` (with a `GET` fallback + * for small or unknown-size bodies). * - * Returns pure probe results without mutating the distribution. + * Returns a pure result object; never throws. */ export async function probe( distribution: Distribution, - timeout = 5000, + options?: ProbeOptions, ): Promise { + const resolved = resolveOptions(options); + const url = distribution.accessUrl?.toString() ?? 'unknown'; + const [authUrl, authHeaders] = + distribution.accessUrl !== undefined + ? 
extractUrlCredentials(distribution.accessUrl, resolved.headers) + : [new URL(url), new Headers(resolved.headers)]; + + const start = performance.now(); try { if (distribution.isSparql()) { - return await probeSparqlEndpoint(distribution, timeout); + return await probeSparqlEndpoint( + authUrl.toString(), + distribution, + resolved, + authHeaders, + start, + ); } - return await probeDataDump(distribution, timeout); + return await probeDataDump( + authUrl.toString(), + distribution, + resolved, + authHeaders, + start, + ); } catch (e) { + const responseTimeMs = Math.round(performance.now() - start); return new NetworkError( - distribution.accessUrl?.toString() ?? 'unknown', + url, e instanceof Error ? e.message : String(e), + responseTimeMs, + ); + } +} + +function resolveOptions( + options: ProbeOptions | undefined, +): Required { + return { + timeoutMs: options?.timeoutMs ?? DEFAULT_TIMEOUT_MS, + headers: options?.headers ?? new Headers(), + sparqlQuery: options?.sparqlQuery ?? DEFAULT_SPARQL_QUERY, + }; +} + +/** + * Strip `user:pass@` from a URL and turn it into an `Authorization: Basic` + * header. Returns the cleaned URL and a merged Headers object that preserves + * any caller-supplied headers. + */ +function extractUrlCredentials(url: URL, baseHeaders: Headers): [URL, Headers] { + const headers = new Headers(baseHeaders); + if (url.username === '' && url.password === '') { + return [url, headers]; + } + const credentials = `${decodeURIComponent(url.username)}:${decodeURIComponent( + url.password, + )}`; + if (!headers.has('Authorization')) { + headers.set( + 'Authorization', + `Basic ${Buffer.from(credentials).toString('base64')}`, ); } + const cleanUrl = new URL(url.toString()); + cleanUrl.username = ''; + cleanUrl.password = ''; + return [cleanUrl, headers]; +} + +/** + * Classify a SPARQL query. Comments are stripped; the first keyword match + * wins. 
Falls back to `SELECT` when no keyword is found – robust enough for + * availability probing but not a full SPARQL parser. + */ +function detectSparqlQueryType(query: string): SparqlQueryType { + const withoutComments = query.replace(/#[^\n\r]*/g, ' '); + const match = /\b(ASK|SELECT|CONSTRUCT|DESCRIBE)\b/i.exec(withoutComments); + return (match?.[1].toUpperCase() ?? 'SELECT') as SparqlQueryType; +} + +function acceptHeaderForQueryType(queryType: SparqlQueryType): string { + if (queryType === 'ASK' || queryType === 'SELECT') { + return SPARQL_RESULTS_JSON; + } + return SPARQL_RDF_RESULTS; } async function probeSparqlEndpoint( - distribution: Distribution, - timeout: number, + url: string, + _distribution: Distribution, + options: Required, + authHeaders: Headers, + start: number, ): Promise { - const url = distribution.accessUrl!.toString(); + const queryType = detectSparqlQueryType(options.sparqlQuery); + const accept = acceptHeaderForQueryType(queryType); + const headers = new Headers({ + 'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8', + Accept: accept, + }); + for (const [key, value] of authHeaders) { + headers.set(key, value); + } + const response = await fetch(url, { - signal: AbortSignal.timeout(timeout), + signal: AbortSignal.timeout(options.timeoutMs), method: 'POST', - headers: { - 'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8', - Accept: SPARQL_RESULTS_JSON, - }, - body: `query=${encodeURIComponent('SELECT * { ?s ?p ?o } LIMIT 1')}`, + headers, + body: `query=${encodeURIComponent(options.sparqlQuery)}`, }); - const isJsonResponse = response.headers - .get('Content-Type') - ?.startsWith(SPARQL_RESULTS_JSON); + + const actualContentType = response.headers.get('Content-Type'); + const contentTypeMatches = actualContentType?.startsWith(accept) ?? 
false; let failureReason: string | null = null; - if (response.ok && isJsonResponse) { - failureReason = await validateSparqlResponse(response); + if (response.ok && contentTypeMatches) { + failureReason = await validateSparqlResponse(response, queryType); } else { // Drain unconsumed body to release the underlying connection. await response.body?.cancel(); } - return new SparqlProbeResult(url, response, failureReason); + const responseTimeMs = Math.round(performance.now() - start); + return new SparqlProbeResult( + url, + response, + responseTimeMs, + accept, + failureReason, + ); } async function validateSparqlResponse( response: Response, + queryType: SparqlQueryType, ): Promise { const body = await response.text(); if (body.length === 0) { return 'SPARQL endpoint returned an empty response'; } + if (queryType === 'CONSTRUCT' || queryType === 'DESCRIBE') { + // Body should be RDF; a non-empty response is sufficient to confirm the + // endpoint answered. Deep parse validation is the data-dump path’s job. + return null; + } + + let json: Record; try { - const json = JSON.parse(body) as Record; - if (!json.results || typeof json.results !== 'object') { - return 'SPARQL endpoint did not return a valid results object'; - } + json = JSON.parse(body) as Record; } catch { return 'SPARQL endpoint returned invalid JSON'; } + if (queryType === 'ASK') { + if (typeof json.boolean !== 'boolean') { + return 'SPARQL endpoint did not return a valid ASK result'; + } + return null; + } + + // SELECT + if (!json.results || typeof json.results !== 'object') { + return 'SPARQL endpoint did not return a valid results object'; + } return null; } async function probeDataDump( + url: string, distribution: Distribution, - timeout: number, + options: Required, + authHeaders: Headers, + start: number, ): Promise { - const url = distribution.accessUrl!.toString(); + const headers = new Headers({ + Accept: distribution.mimeType ?? 
'*/*', + 'Accept-Encoding': 'identity', + }); + for (const [key, value] of authHeaders) { + headers.set(key, value); + } + const requestOptions = { - signal: AbortSignal.timeout(timeout), - headers: { - Accept: distribution.mimeType ?? '*/*', - 'Accept-Encoding': 'identity', // Return uncompressed responses. - }, + signal: AbortSignal.timeout(options.timeoutMs), + headers, }; const headResponse = await fetch(url, { @@ -192,12 +349,19 @@ async function probeDataDump( const failureReason = isHttpSuccess ? validateBody(body, getResponse.headers.get('Content-Type')) : null; - const result = new DataDumpProbeResult(url, getResponse, failureReason); + const responseTimeMs = Math.round(performance.now() - start); + const result = new DataDumpProbeResult( + url, + getResponse, + responseTimeMs, + failureReason, + ); checkContentTypeMismatch(result, distribution.mimeType); return result; } - const result = new DataDumpProbeResult(url, headResponse); + const responseTimeMs = Math.round(performance.now() - start); + const result = new DataDumpProbeResult(url, headResponse, responseTimeMs); checkContentTypeMismatch(result, distribution.mimeType); return result; } diff --git a/packages/distribution-probe/test/probe.test.ts b/packages/distribution-probe/test/probe.test.ts index 4a4b7b9a..7b336eaa 100644 --- a/packages/distribution-probe/test/probe.test.ts +++ b/packages/distribution-probe/test/probe.test.ts @@ -348,6 +348,321 @@ describe('probe', () => { expect(result).toBeInstanceOf(NetworkError); expect((result as NetworkError).message).toBe('Connection refused'); + expect((result as NetworkError).responseTimeMs).toBeGreaterThanOrEqual(0); + }); + }); + + describe('options', () => { + it('accepts ProbeOptions with timeoutMs', async () => { + vi.mocked(fetch).mockResolvedValue( + new Response('{"results": {"bindings": []}}', { + status: 200, + headers: { 'Content-Type': 'application/sparql-results+json' }, + }), + ); + + const distribution = Distribution.sparql( + new 
URL('http://example.org/sparql'), + ); + + const result = await probe(distribution, { timeoutMs: 1000 }); + + expect(result).toBeInstanceOf(SparqlProbeResult); + }); + }); + + describe('URL-embedded Basic auth', () => { + it('moves user:pass from URL into Authorization header (SPARQL)', async () => { + let capturedUrl: string | undefined; + let capturedHeaders: Headers | undefined; + vi.mocked(fetch).mockImplementation(async (input, init) => { + capturedUrl = + typeof input === 'string' ? input : (input as URL).toString(); + capturedHeaders = new Headers(init?.headers); + return new Response('{"results": {"bindings": []}}', { + status: 200, + headers: { 'Content-Type': 'application/sparql-results+json' }, + }); + }); + + const distribution = Distribution.sparql( + new URL('http://alice:secret@example.org/sparql'), + ); + + await probe(distribution); + + expect(capturedUrl).toBe('http://example.org/sparql'); + expect(capturedHeaders?.get('Authorization')).toBe( + `Basic ${Buffer.from('alice:secret').toString('base64')}`, + ); + }); + + it('decodes URL-encoded credentials', async () => { + let capturedHeaders: Headers | undefined; + vi.mocked(fetch).mockImplementation(async (_input, init) => { + capturedHeaders = new Headers(init?.headers); + return new Response('{"results": {"bindings": []}}', { + status: 200, + headers: { 'Content-Type': 'application/sparql-results+json' }, + }); + }); + + const distribution = Distribution.sparql( + new URL('http://user%40domain:p%40ss@example.org/sparql'), + ); + + await probe(distribution); + + expect(capturedHeaders?.get('Authorization')).toBe( + `Basic ${Buffer.from('user@domain:p@ss').toString('base64')}`, + ); + }); + + it('applies URL auth to data-dump probes too', async () => { + let capturedUrl: string | undefined; + let capturedHeaders: Headers | undefined; + vi.mocked(fetch).mockImplementation(async (input, init) => { + capturedUrl = + typeof input === 'string' ? 
input : (input as URL).toString(); + capturedHeaders = new Headers(init?.headers); + return new Response('', { + status: 200, + headers: { + 'Content-Type': 'application/n-triples', + 'Content-Length': '50000', + }, + }); + }); + + const distribution = new Distribution( + new URL('http://alice:secret@example.org/data.nt'), + 'application/n-triples', + ); + + await probe(distribution); + + expect(capturedUrl).toBe('http://example.org/data.nt'); + expect(capturedHeaders?.get('Authorization')).toBe( + `Basic ${Buffer.from('alice:secret').toString('base64')}`, + ); + }); + + it('does not overwrite a caller-supplied Authorization header', async () => { + let capturedHeaders: Headers | undefined; + vi.mocked(fetch).mockImplementation(async (_input, init) => { + capturedHeaders = new Headers(init?.headers); + return new Response('{"results": {"bindings": []}}', { + status: 200, + headers: { 'Content-Type': 'application/sparql-results+json' }, + }); + }); + + const callerHeaders = new Headers({ + Authorization: 'Bearer caller-token', + }); + const distribution = Distribution.sparql( + new URL('http://alice:secret@example.org/sparql'), + ); + + await probe(distribution, { headers: callerHeaders }); + + expect(capturedHeaders?.get('Authorization')).toBe('Bearer caller-token'); + }); + }); + + describe('custom headers', () => { + it('merges caller headers with probe-generated ones', async () => { + let capturedHeaders: Headers | undefined; + vi.mocked(fetch).mockImplementation(async (_input, init) => { + capturedHeaders = new Headers(init?.headers); + return new Response('{"results": {"bindings": []}}', { + status: 200, + headers: { 'Content-Type': 'application/sparql-results+json' }, + }); + }); + + const distribution = Distribution.sparql( + new URL('http://example.org/sparql'), + ); + + await probe(distribution, { + headers: new Headers({ 'User-Agent': 'TestAgent/1.0' }), + }); + + expect(capturedHeaders?.get('User-Agent')).toBe('TestAgent/1.0'); + 
expect(capturedHeaders?.get('Accept')).toBe( + 'application/sparql-results+json', + ); + }); + + it('lets caller headers override probe-generated Accept', async () => { + let capturedHeaders: Headers | undefined; + vi.mocked(fetch).mockImplementation(async (_input, init) => { + capturedHeaders = new Headers(init?.headers); + return new Response('{"results": {"bindings": []}}', { + status: 200, + headers: { 'Content-Type': 'application/sparql-results+json' }, + }); + }); + + const distribution = Distribution.sparql( + new URL('http://example.org/sparql'), + ); + + await probe(distribution, { + headers: new Headers({ Accept: 'application/sparql-results+xml' }), + }); + + expect(capturedHeaders?.get('Accept')).toBe( + 'application/sparql-results+xml', + ); + }); + }); + + describe('custom SPARQL query', () => { + it('uses the supplied query instead of the default', async () => { + let capturedBody: string | undefined; + vi.mocked(fetch).mockImplementation(async (_input, init) => { + capturedBody = init?.body?.toString(); + return new Response('{"boolean": true}', { + status: 200, + headers: { 'Content-Type': 'application/sparql-results+json' }, + }); + }); + + const distribution = Distribution.sparql( + new URL('http://example.org/sparql'), + ); + + const result = await probe(distribution, { + sparqlQuery: 'ASK { ?s ?p ?o }', + }); + + expect(capturedBody).toContain(encodeURIComponent('ASK { ?s ?p ?o }')); + expect((result as SparqlProbeResult).isSuccess()).toBe(true); + }); + + it('validates ASK response body', async () => { + vi.mocked(fetch).mockResolvedValue( + new Response('{"results": {}}', { + status: 200, + headers: { 'Content-Type': 'application/sparql-results+json' }, + }), + ); + + const distribution = Distribution.sparql( + new URL('http://example.org/sparql'), + ); + + const result = await probe(distribution, { + sparqlQuery: 'ASK { ?s ?p ?o }', + }); + + const sparqlResult = result as SparqlProbeResult; + expect(sparqlResult.isSuccess()).toBe(false); + 
expect(sparqlResult.failureReason).toBe( + 'SPARQL endpoint did not return a valid ASK result', + ); + }); + + it('requests an RDF media type for CONSTRUCT queries', async () => { + let capturedHeaders: Headers | undefined; + vi.mocked(fetch).mockImplementation(async (_input, init) => { + capturedHeaders = new Headers(init?.headers); + return new Response(' .\n', { + status: 200, + headers: { 'Content-Type': 'application/n-triples' }, + }); + }); + + const distribution = Distribution.sparql( + new URL('http://example.org/sparql'), + ); + + const result = await probe(distribution, { + sparqlQuery: 'CONSTRUCT WHERE { ?s ?p ?o } LIMIT 1', + }); + + expect(capturedHeaders?.get('Accept')).toContain('application/n-triples'); + expect((result as SparqlProbeResult).isSuccess()).toBe(true); + }); + + it('ignores # comments when detecting query type', async () => { + let capturedHeaders: Headers | undefined; + vi.mocked(fetch).mockImplementation(async (_input, init) => { + capturedHeaders = new Headers(init?.headers); + return new Response('{"boolean": true}', { + status: 200, + headers: { 'Content-Type': 'application/sparql-results+json' }, + }); + }); + + const distribution = Distribution.sparql( + new URL('http://example.org/sparql'), + ); + + await probe(distribution, { + sparqlQuery: '# SELECT is in a comment\nASK { ?s ?p ?o }', + }); + + expect(capturedHeaders?.get('Accept')).toBe( + 'application/sparql-results+json', + ); + }); + }); + + describe('responseTimeMs', () => { + it('is set on SparqlProbeResult', async () => { + vi.mocked(fetch).mockResolvedValue( + new Response('{"results": {"bindings": []}}', { + status: 200, + headers: { 'Content-Type': 'application/sparql-results+json' }, + }), + ); + + const distribution = Distribution.sparql( + new URL('http://example.org/sparql'), + ); + + const result = (await probe(distribution)) as SparqlProbeResult; + + expect(result.responseTimeMs).toBeGreaterThanOrEqual(0); + 
expect(Number.isInteger(result.responseTimeMs)).toBe(true); + }); + + it('is set on DataDumpProbeResult', async () => { + vi.mocked(fetch).mockResolvedValue( + new Response('', { + status: 200, + headers: { + 'Content-Type': 'application/n-triples', + 'Content-Length': '50000', + }, + }), + ); + + const distribution = new Distribution( + new URL('http://example.org/data.nt'), + 'application/n-triples', + ); + + const result = (await probe(distribution)) as DataDumpProbeResult; + + expect(result.responseTimeMs).toBeGreaterThanOrEqual(0); + expect(Number.isInteger(result.responseTimeMs)).toBe(true); + }); + + it('is set on NetworkError', async () => { + vi.mocked(fetch).mockRejectedValue(new Error('Connection refused')); + + const distribution = Distribution.sparql( + new URL('http://example.org/sparql'), + ); + + const result = (await probe(distribution)) as NetworkError; + + expect(result.responseTimeMs).toBeGreaterThanOrEqual(0); + expect(Number.isInteger(result.responseTimeMs)).toBe(true); }); }); }); diff --git a/packages/distribution-probe/vite.config.ts b/packages/distribution-probe/vite.config.ts index 3fb583b9..027f2af7 100644 --- a/packages/distribution-probe/vite.config.ts +++ b/packages/distribution-probe/vite.config.ts @@ -11,10 +11,10 @@ export default mergeConfig( coverage: { thresholds: { autoUpdate: true, - lines: 98.68, + lines: 99.16, functions: 100, - branches: 84.48, - statements: 97.46, + branches: 86.36, + statements: 98.37, }, }, }, diff --git a/packages/pipeline/src/distribution/resolver.ts b/packages/pipeline/src/distribution/resolver.ts index a46f52e0..93b953b1 100644 --- a/packages/pipeline/src/distribution/resolver.ts +++ b/packages/pipeline/src/distribution/resolver.ts @@ -69,7 +69,7 @@ export class SparqlDistributionResolver implements DistributionResolver { ): Promise { const results = await Promise.all( dataset.distributions.map(async (distribution) => { - const result = await probe(distribution, this.timeout); + const result = await 
probe(distribution, { timeoutMs: this.timeout }); callbacks?.onProbe?.(distribution, result); return result; }), diff --git a/packages/pipeline/test/distribution/importResolver.test.ts b/packages/pipeline/test/distribution/importResolver.test.ts index e01619de..0af78b61 100644 --- a/packages/pipeline/test/distribution/importResolver.test.ts +++ b/packages/pipeline/test/distribution/importResolver.test.ts @@ -23,6 +23,7 @@ const dataDumpProbeResult = new DataDumpProbeResult( 'Content-Type': 'application/n-triples', }, }), + 0, ); function makeDataset(): Dataset { @@ -220,9 +221,7 @@ describe('ImportResolver', () => { expect(result).toBeInstanceOf(NoDistributionAvailable); const noDistribution = result as NoDistributionAvailable; - expect(noDistribution.message).toBe( - 'No supported import format available', - ); + expect(noDistribution.message).toBe('No supported import format available'); expect(onImportFailed).toHaveBeenCalledWith( dataset.distributions[0], 'No supported import format', diff --git a/packages/pipeline/test/distribution/report.test.ts b/packages/pipeline/test/distribution/report.test.ts index c2b7313c..c11de693 100644 --- a/packages/pipeline/test/distribution/report.test.ts +++ b/packages/pipeline/test/distribution/report.test.ts @@ -35,7 +35,7 @@ function sparqlResponse(overrides?: ResponseInit): Response { describe('probeResultsToQuads', () => { it('yields schema:error literal for a network error', async () => { const results = [ - new NetworkError('http://example.org/sparql', 'ECONNREFUSED'), + new NetworkError('http://example.org/sparql', 'ECONNREFUSED', 0), ]; const store = await collect( probeResultsToQuads(results, 'http://example.org/dataset'), @@ -50,6 +50,8 @@ describe('probeResultsToQuads', () => { const result = new SparqlProbeResult( 'http://example.org/sparql', new Response('', { status: 404, statusText: 'Not Found' }), + 0, + 'application/sparql-results+json', ); const store = await collect( probeResultsToQuads([result], 
'http://example.org/dataset'), @@ -66,6 +68,8 @@ describe('probeResultsToQuads', () => { const result = new SparqlProbeResult( 'http://example.org/sparql', sparqlResponse(), + 0, + 'application/sparql-results+json', ); const store = await collect( probeResultsToQuads([result], 'http://example.org/dataset'), @@ -106,6 +110,7 @@ describe('probeResultsToQuads', () => { 'Last-Modified': 'Wed, 01 Jan 2025 00:00:00 GMT', }, }), + 0, ); const store = await collect( probeResultsToQuads([result], 'http://example.org/dataset'), @@ -156,8 +161,13 @@ describe('probeResultsToQuads', () => { it('yields quads for multiple probe results', async () => { const results = [ - new SparqlProbeResult('http://example.org/sparql', sparqlResponse()), - new NetworkError('http://example.org/other', 'timeout'), + new SparqlProbeResult( + 'http://example.org/sparql', + sparqlResponse(), + 0, + 'application/sparql-results+json', + ), + new NetworkError('http://example.org/other', 'timeout', 0), ]; const store = await collect( probeResultsToQuads(results, 'http://example.org/dataset'), @@ -178,6 +188,7 @@ describe('probeResultsToQuads', () => { status: 200, headers: { 'Content-Length': '1000' }, }), + 0, ); const importError = new ImportFailed(distribution, 'Parse error'); @@ -211,6 +222,7 @@ describe('probeResultsToQuads', () => { status: 200, headers: { 'Content-Type': 'text/turtle' }, }), + 0, 'Distribution is empty', ); const store = await collect( diff --git a/packages/pipeline/test/distribution/resolveDistributions.test.ts b/packages/pipeline/test/distribution/resolveDistributions.test.ts index a327ef79..e7e485b2 100644 --- a/packages/pipeline/test/distribution/resolveDistributions.test.ts +++ b/packages/pipeline/test/distribution/resolveDistributions.test.ts @@ -38,6 +38,8 @@ describe('resolveDistributions', () => { status: 200, headers: { 'Content-Type': 'application/sparql-results+json' }, }), + 0, + 'application/sparql-results+json', ); const dataset = new Dataset({ iri: new 
URL('http://example.org/dataset'), @@ -65,6 +67,7 @@ describe('resolveDistributions', () => { const networkError = new NetworkError( 'http://example.org/sparql', 'Connection refused', + 0, ); const resolver = mockResolver( new NoDistributionAvailable( @@ -102,6 +105,7 @@ describe('resolveDistributions', () => { status: 200, headers: { 'Content-Length': '1000' }, }), + 0, ); const importFailed = new ImportFailed(dataDumpDistribution, 'Parse error'); const resolver = mockResolver( @@ -137,6 +141,8 @@ describe('resolveDistributions', () => { status: 200, headers: { 'Content-Type': 'application/sparql-results+json' }, }), + 0, + 'application/sparql-results+json', ); const dataset = new Dataset({ iri: new URL('http://custom.org/dataset'), diff --git a/packages/pipeline/test/pipeline.test.ts b/packages/pipeline/test/pipeline.test.ts index 677b8c86..60d3d5e1 100644 --- a/packages/pipeline/test/pipeline.test.ts +++ b/packages/pipeline/test/pipeline.test.ts @@ -619,14 +619,18 @@ describe('Pipeline', () => { status: 200, headers: { 'Content-Type': 'application/sparql-results+json' }, }), + 0, + 'application/sparql-results+json', ); const dataDumpResult = new DataDumpProbeResult( 'http://example.org/data.nt', new Response('', { status: 404 }), + 0, ); const networkError = new NetworkError( 'http://example.org/down', 'Connection refused', + 0, ); const pipeline = new Pipeline({ @@ -759,6 +763,7 @@ describe('Pipeline', () => { const networkError = new NetworkError( 'http://example.org/down', 'Connection refused', + 0, ); const pipeline = new Pipeline({ diff --git a/packages/sparql-monitor/README.md b/packages/sparql-monitor/README.md deleted file mode 100644 index 24d09d08..00000000 --- a/packages/sparql-monitor/README.md +++ /dev/null @@ -1,158 +0,0 @@ -# SPARQL Monitor - -Monitor SPARQL endpoints with periodic checks, storing observations in PostgreSQL. 
- -## Installation - -```bash -npm install @lde/sparql-monitor -``` - -## CLI Usage - -The easiest way to use the monitor is via the CLI with a configuration file. - -### Quick Start - -1. Create a configuration file (TypeScript, JavaScript, JSON, or YAML) -2. Run the monitor - -```bash -# Start continuous monitoring -npx sparql-monitor start - -# Run a one-off check -npx sparql-monitor check - -# Check a specific monitor -npx sparql-monitor check dbpedia - -# Use a custom config path -npx sparql-monitor start --config ./configs/production.config.ts -``` - -### TypeScript Config (`sparql-monitor.config.ts`) - -```typescript -import { defineConfig } from '@lde/sparql-monitor'; - -export default defineConfig({ - databaseUrl: process.env.DATABASE_URL, - intervalSeconds: 300, - monitors: [ - { - identifier: 'dbpedia', - endpointUrl: new URL('https://dbpedia.org/sparql'), - query: 'ASK { ?s ?p ?o }', - }, - { - identifier: 'wikidata', - endpointUrl: new URL('https://query.wikidata.org/sparql'), - query: 'SELECT * WHERE { ?s ?p ?o } LIMIT 1', - }, - ], -}); -``` - -### YAML Config (`sparql-monitor.config.yaml`) - -```yaml -databaseUrl: ${DATABASE_URL} -intervalSeconds: 300 -monitors: - - identifier: dbpedia - endpointUrl: https://dbpedia.org/sparql - query: ASK { ?s ?p ?o } - - identifier: wikidata - endpointUrl: https://query.wikidata.org/sparql - query: SELECT * WHERE { ?s ?p ?o } LIMIT 1 -``` - -### Environment Variables - -Create a `.env` file for sensitive configuration: - -``` -DATABASE_URL=postgres://user:pass@localhost:5432/monitoring -``` - -The CLI automatically loads `.env` files. - -### Config Auto-Discovery - -The CLI searches for configuration in this order: - -1. `sparql-monitor.config.{ts,mts,js,mjs,json,yaml,yml}` -2. `.sparql-monitorrc` -3. 
`package.json` → `"sparql-monitor"` key - -## Programmatic Usage - -```typescript -import { - MonitorService, - PostgresObservationStore, - type MonitorConfig, -} from '@lde/sparql-monitor'; - -// Define monitors -const monitors: MonitorConfig[] = [ - { - identifier: 'dbpedia', - endpointUrl: new URL('https://dbpedia.org/sparql'), - query: 'ASK { ?s ?p ?o }', - }, - { - identifier: 'wikidata', - endpointUrl: new URL('https://query.wikidata.org/sparql'), - query: 'SELECT * WHERE { ?s ?p ?o } LIMIT 1', - }, -]; - -// Create store (initializes database schema automatically) -const store = await PostgresObservationStore.create( - 'postgres://user:pass@localhost:5432/db' -); - -// Create service with polling interval -const service = new MonitorService({ - store, - monitors, - intervalSeconds: 300, // Check all endpoints every 5 minutes -}); - -// Start periodic monitoring -service.start(); - -// Or perform immediate checks -await service.checkAll(); -await service.checkNow('dbpedia'); - -// Get latest observations -const observations = await store.getLatest(); -for (const [identifier, observation] of observations) { - console.log( - `${identifier}: ${observation.success ? 'OK' : 'FAIL'} (${ - observation.responseTimeMs - }ms)` - ); -} - -// Stop monitoring and close the store -service.stop(); -await store.close(); -``` - -## Database Initialisation - -`PostgresObservationStore.create()` automatically initializes the database schema: - -- `observations` table for storing check results -- `latest_observations` materialized view for efficient queries -- Required indexes - -This is idempotent and safe to call on every startup. - -## Query Types - -The monitor supports ASK, SELECT, and CONSTRUCT queries. The check is considered successful if the query executes without error. 
diff --git a/packages/sparql-monitor/src/config.ts b/packages/sparql-monitor/src/config.ts deleted file mode 100644 index 48d13fde..00000000 --- a/packages/sparql-monitor/src/config.ts +++ /dev/null @@ -1,70 +0,0 @@ -import type { MonitorConfig } from './types.js'; - -/** - * Raw config as loaded from file (URL can be string or URL). - */ -export interface RawMonitorConfig { - /** Unique identifier for this monitor. */ - identifier: string; - /** URL of the SPARQL endpoint to monitor (string or URL). */ - endpointUrl: string | URL; - /** SPARQL query to execute. */ - query: string; -} - -/** - * Configuration for the SPARQL monitor. - */ -export interface SparqlMonitorConfig { - /** PostgreSQL connection string. */ - databaseUrl?: string; - /** Polling interval in seconds (default: 300). */ - intervalSeconds?: number; - /** Monitor definitions. */ - monitors: RawMonitorConfig[]; -} - -/** - * Type helper for TypeScript config files. - * - * @example - * ```ts - * // sparql-monitor.config.ts - * import { defineConfig } from '@lde/sparql-monitor'; - * - * export default defineConfig({ - * databaseUrl: process.env.DATABASE_URL, - * intervalSeconds: 300, - * monitors: [ - * { - * identifier: 'dbpedia', - * endpointUrl: new URL('https://dbpedia.org/sparql'), - * query: 'ASK { ?s ?p ?o }', - * }, - * ], - * }); - * ``` - */ -export function defineConfig(config: SparqlMonitorConfig): SparqlMonitorConfig { - return config; -} - -/** - * Normalize config (convert string URLs to URL objects). - */ -export function normalizeConfig(raw: SparqlMonitorConfig): { - databaseUrl?: string; - intervalSeconds?: number; - monitors: MonitorConfig[]; -} { - return { - ...raw, - monitors: raw.monitors.map((m) => ({ - ...m, - endpointUrl: - typeof m.endpointUrl === 'string' - ? 
new URL(m.endpointUrl) - : m.endpointUrl, - })), - }; -} diff --git a/packages/sparql-monitor/src/index.ts b/packages/sparql-monitor/src/index.ts deleted file mode 100644 index 22e6aa1a..00000000 --- a/packages/sparql-monitor/src/index.ts +++ /dev/null @@ -1,5 +0,0 @@ -export type { MonitorConfig, Observation, ObservationStore } from './types.js'; -export { SparqlMonitor, type SparqlMonitorOptions } from './monitor.js'; -export { PostgresObservationStore } from './store.js'; -export { MonitorService, type MonitorServiceOptions } from './service.js'; -export { defineConfig, type SparqlMonitorConfig } from './config.js'; diff --git a/packages/sparql-monitor/src/monitor.ts b/packages/sparql-monitor/src/monitor.ts deleted file mode 100644 index 7192a3c4..00000000 --- a/packages/sparql-monitor/src/monitor.ts +++ /dev/null @@ -1,132 +0,0 @@ -import { SparqlEndpointFetcher } from 'fetch-sparql-endpoint'; -import type { CheckResult } from './types.js'; - -/** - * Extract credentials from a URL and convert them to a Basic auth header. - * Returns a tuple of [URL without credentials, Headers with Authorization]. - */ -function extractUrlCredentials( - url: URL, - baseHeaders?: Headers, -): [URL, Headers] { - const headers = new Headers(baseHeaders); - - if (url.username || url.password) { - const credentials = `${decodeURIComponent( - url.username, - )}:${decodeURIComponent(url.password)}`; - headers.set( - 'Authorization', - `Basic ${Buffer.from(credentials).toString('base64')}`, - ); - - const cleanUrl = new URL(url.toString()); - cleanUrl.username = ''; - cleanUrl.password = ''; - return [cleanUrl, headers]; - } - - return [url, headers]; -} - -export interface SparqlMonitorOptions { - /** Timeout in milliseconds for the SPARQL request. */ - timeoutMs?: number; - /** HTTP headers to include in requests (e.g., User-Agent). */ - headers?: Headers; -} - -/** - * Executes SPARQL queries against an endpoint and measures response time. 
- */ -export class SparqlMonitor { - private readonly fetcher: SparqlEndpointFetcher; - private readonly options?: SparqlMonitorOptions; - - constructor(options?: SparqlMonitorOptions) { - this.options = options; - this.fetcher = new SparqlEndpointFetcher({ - timeout: options?.timeoutMs ?? 30000, - defaultHeaders: options?.headers, - }); - } - - /** - * Execute a SPARQL query against an endpoint and return the result. - */ - async check(endpointUrl: URL, query: string): Promise { - const observedAt = new Date(); // UTC - const startTime = performance.now(); - const [url, fetcher] = this.prepareFetcherForUrl(endpointUrl); - - try { - const queryType = fetcher.getQueryType(query); - - switch (queryType) { - case 'ASK': - await fetcher.fetchAsk(url, query); - break; - case 'SELECT': - await this.consumeStream(await fetcher.fetchBindings(url, query)); - break; - case 'CONSTRUCT': - await this.consumeStream(await fetcher.fetchTriples(url, query)); - break; - } - - const responseTimeMs = Math.round(performance.now() - startTime); - return { - success: true, - responseTimeMs, - errorMessage: null, - observedAt, - }; - } catch (error) { - const responseTimeMs = Math.round(performance.now() - startTime); - const errorMessage = - error instanceof Error ? error.message : String(error); - - return { - success: false, - responseTimeMs, - errorMessage, - observedAt, - }; - } - } - - private prepareFetcherForUrl( - endpointUrl: URL, - ): [string, SparqlEndpointFetcher] { - const [url, headers] = extractUrlCredentials( - endpointUrl, - this.options?.headers, - ); - const hasCredentials = - headers.has('Authorization') && - !this.options?.headers?.has('Authorization'); - - if (!hasCredentials) { - return [url.toString(), this.fetcher]; - } - - const fetcher = new SparqlEndpointFetcher({ - timeout: this.options?.timeoutMs ?? 
30000, - defaultHeaders: headers, - }); - - return [url.toString(), fetcher]; - } - - private async consumeStream( - stream: NodeJS.ReadableStream | NodeJS.EventEmitter, - ): Promise { - return new Promise((resolve, reject) => { - stream.on('data', () => { - // Just consume the data - }); - stream.on('end', () => resolve()); - stream.on('error', reject); - }); - } -} diff --git a/packages/sparql-monitor/src/service.ts b/packages/sparql-monitor/src/service.ts deleted file mode 100644 index 422c148e..00000000 --- a/packages/sparql-monitor/src/service.ts +++ /dev/null @@ -1,114 +0,0 @@ -import { CronJob } from 'cron'; -import { SparqlMonitor } from './monitor.js'; -import type { ObservationStore, MonitorConfig } from './types.js'; - -export interface MonitorServiceOptions { - /** Store for persisting observations. */ - store: ObservationStore; - /** Monitor configurations. */ - monitors: MonitorConfig[]; - /** Polling interval in seconds (default: 300). */ - intervalSeconds?: number; - /** Optional custom monitor instance. */ - sparqlMonitor?: SparqlMonitor; -} - -/** - * Orchestrates monitoring of multiple SPARQL endpoints. - */ -export class MonitorService { - private readonly store: ObservationStore; - private readonly sparqlMonitor: SparqlMonitor; - private readonly configs: MonitorConfig[]; - private readonly intervalSeconds: number; - private job: CronJob | null = null; - - constructor(options: MonitorServiceOptions) { - this.store = options.store; - this.sparqlMonitor = options.sparqlMonitor ?? new SparqlMonitor(); - this.configs = options.monitors; - this.intervalSeconds = options.intervalSeconds ?? 300; - } - - /** - * Perform an immediate check for a monitor. 
- */ - async checkNow(identifier: string): Promise { - const config = this.configs.find((c) => c.identifier === identifier); - if (!config) { - throw new Error(`Monitor not found: ${identifier}`); - } - await this.performCheck(config); - await this.refreshView(); - } - - /** - * Perform an immediate check for all monitors. - */ - async checkAll(): Promise { - await Promise.all(this.configs.map((config) => this.performCheck(config))); - await this.refreshView(); - } - - /** - * Start monitoring all configured endpoints. - */ - start(): void { - if (!this.job) { - const cronExpression = this.secondsToCron(this.intervalSeconds); - this.job = new CronJob(cronExpression, () => this.checkAll()); - this.job.start(); - } - } - - /** - * Stop monitoring. - */ - stop(): void { - if (this.job) { - this.job.stop(); - this.job = null; - } - } - - /** - * Check whether monitoring is running. - */ - isRunning(): boolean { - return this.job !== null; - } - - /** - * Convert seconds to a cron expression. 
- */ - private secondsToCron(seconds: number): string { - if (seconds < 60) { - return `*/${seconds} * * * * *`; - } - const minutes = Math.floor(seconds / 60); - if (minutes < 60) { - return `0 */${minutes} * * * *`; - } - const hours = Math.floor(minutes / 60); - return `0 0 */${hours} * * *`; - } - - private async performCheck(config: MonitorConfig): Promise { - const result = await this.sparqlMonitor.check( - config.endpointUrl, - config.query - ); - await this.store.store({ - monitor: config.identifier, - ...result, - }); - } - - private async refreshView(): Promise { - try { - await this.store.refreshLatestObservationsView(); - } catch { - // View refresh failure is not critical - } - } -} diff --git a/packages/sparql-monitor/test/config.test.ts b/packages/sparql-monitor/test/config.test.ts deleted file mode 100644 index 31b47d57..00000000 --- a/packages/sparql-monitor/test/config.test.ts +++ /dev/null @@ -1,63 +0,0 @@ -import { describe, it, expect } from 'vitest'; -import { defineConfig, normalizeConfig } from '../src/config.js'; - -describe('defineConfig', () => { - it('returns the config as-is', () => { - const config = { - databaseUrl: 'postgres://localhost/test', - intervalSeconds: 60, - monitors: [ - { - identifier: 'test', - endpointUrl: 'https://example.org/sparql', - query: 'ASK { ?s ?p ?o }', - }, - ], - }; - - expect(defineConfig(config)).toEqual(config); - }); -}); - -describe('normalizeConfig', () => { - it('converts string URLs to URL objects', () => { - const raw = { - databaseUrl: 'postgres://localhost/test', - intervalSeconds: 300, - monitors: [ - { - identifier: 'test', - endpointUrl: 'https://example.org/sparql', - query: 'ASK { ?s ?p ?o }', - }, - ], - }; - - const normalized = normalizeConfig(raw); - - expect(normalized.databaseUrl).toBe('postgres://localhost/test'); - expect(normalized.intervalSeconds).toBe(300); - expect(normalized.monitors[0].identifier).toBe('test'); - expect(normalized.monitors[0].endpointUrl).toBeInstanceOf(URL); - 
expect(normalized.monitors[0].endpointUrl.href).toBe( - 'https://example.org/sparql' - ); - }); - - it('preserves URL objects', () => { - const url = new URL('https://example.org/sparql'); - const raw = { - monitors: [ - { - identifier: 'test', - endpointUrl: url, - query: 'ASK { ?s ?p ?o }', - }, - ], - }; - - const normalized = normalizeConfig(raw); - - expect(normalized.monitors[0].endpointUrl).toBe(url); - }); -}); diff --git a/packages/sparql-monitor/test/monitor.test.ts b/packages/sparql-monitor/test/monitor.test.ts deleted file mode 100644 index 25fe316f..00000000 --- a/packages/sparql-monitor/test/monitor.test.ts +++ /dev/null @@ -1,207 +0,0 @@ -import { describe, it, expect, vi } from 'vitest'; -import { SparqlMonitor } from '../src/monitor.js'; - -function mockFetchResponse( - body: string, - contentType = 'application/sparql-results+json' -) { - return new Response(body, { - status: 200, - headers: { 'Content-Type': contentType }, - }); -} - -describe('SparqlMonitor', () => { - describe('check with ASK query', () => { - it('returns success when endpoint responds', async () => { - const fetchSpy = vi - .spyOn(globalThis, 'fetch') - .mockResolvedValue(mockFetchResponse('{"boolean": true}')); - - const monitor = new SparqlMonitor(); - const result = await monitor.check( - new URL('http://example.org/sparql'), - 'ASK { ?s ?p ?o }' - ); - - expect(result.success).toBe(true); - expect(result.responseTimeMs).toBeGreaterThanOrEqual(0); - expect(result.errorMessage).toBeNull(); - expect(result.observedAt).toBeInstanceOf(Date); - - fetchSpy.mockRestore(); - }); - - it('returns failure when endpoint throws', async () => { - const fetchSpy = vi - .spyOn(globalThis, 'fetch') - .mockRejectedValue(new Error('Connection refused')); - - const monitor = new SparqlMonitor(); - const result = await monitor.check( - new URL('http://example.org/sparql'), - 'ASK { ?s ?p ?o }' - ); - - expect(result.success).toBe(false); - 
expect(result.responseTimeMs).toBeGreaterThanOrEqual(0); - expect(result.errorMessage).toBe('Connection refused'); - - fetchSpy.mockRestore(); - }); - }); - - describe('check with SELECT query', () => { - it('returns success when query completes', async () => { - const jsonResponse = JSON.stringify({ - results: { bindings: [{ s: { value: 'x' } }] }, - }); - const fetchSpy = vi - .spyOn(globalThis, 'fetch') - .mockResolvedValue(mockFetchResponse(jsonResponse)); - - const monitor = new SparqlMonitor(); - const result = await monitor.check( - new URL('http://example.org/sparql'), - 'SELECT * WHERE { ?s ?p ?o } LIMIT 1' - ); - - expect(result.success).toBe(true); - - fetchSpy.mockRestore(); - }); - }); - - describe('check with CONSTRUCT query', () => { - it('returns success when query completes', async () => { - const turtleResponse = - ' .'; - const fetchSpy = vi - .spyOn(globalThis, 'fetch') - .mockResolvedValue(mockFetchResponse(turtleResponse, 'text/turtle')); - - const monitor = new SparqlMonitor(); - const result = await monitor.check( - new URL('http://example.org/sparql'), - 'CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o } LIMIT 1' - ); - - expect(result.success).toBe(true); - - fetchSpy.mockRestore(); - }); - }); - - describe('invalid query', () => { - it('returns failure for unparseable query', async () => { - const monitor = new SparqlMonitor(); - const result = await monitor.check( - new URL('http://example.org/sparql'), - 'INVALID QUERY' - ); - - expect(result.success).toBe(false); - expect(result.errorMessage).toContain('Parse error'); - }); - }); - - describe('headers configuration', () => { - it('passes headers to fetcher constructor', () => { - const headers = new Headers({ 'User-Agent': 'TestAgent/1.0' }); - - // We can't easily test the internal fetcher construction, - // but we can verify the monitor accepts headers without error - const monitor = new SparqlMonitor({ headers }); - expect(monitor).toBeInstanceOf(SparqlMonitor); - }); - }); - - 
describe('URL with embedded credentials', () => { - it('extracts credentials and converts to Basic auth header', async () => { - let capturedUrl: string | undefined; - let capturedHeaders: Headers | undefined; - - const monitor = new SparqlMonitor(); - - const fetchSpy = vi - .spyOn(globalThis, 'fetch') - .mockImplementation( - async (input: string | URL | Request, init?: RequestInit) => { - capturedUrl = - typeof input === 'string' ? input : (input as URL).toString(); - capturedHeaders = new Headers(init?.headers); - return mockFetchResponse('{"boolean": true}'); - } - ); - - await monitor.check( - new URL('http://user:pass@example.org/sparql'), - 'ASK { ?s ?p ?o }' - ); - - expect(capturedUrl).toContain('http://example.org/sparql'); - expect(capturedUrl).not.toContain('user'); - expect(capturedUrl).not.toContain('pass'); - expect(capturedHeaders?.get('Authorization')).toBe( - `Basic ${Buffer.from('user:pass').toString('base64')}` - ); - - fetchSpy.mockRestore(); - }); - - it('decodes URL-encoded credentials', async () => { - let capturedHeaders: Headers | undefined; - - const monitor = new SparqlMonitor(); - - const fetchSpy = vi - .spyOn(globalThis, 'fetch') - .mockImplementation( - async (_input: string | URL | Request, init?: RequestInit) => { - capturedHeaders = new Headers(init?.headers); - return mockFetchResponse('{"boolean": true}'); - } - ); - - // URL with encoded special characters: user@domain:p@ss - await monitor.check( - new URL('http://user%40domain:p%40ss@example.org/sparql'), - 'ASK { ?s ?p ?o }' - ); - - expect(capturedHeaders?.get('Authorization')).toBe( - `Basic ${Buffer.from('user@domain:p@ss').toString('base64')}` - ); - - fetchSpy.mockRestore(); - }); - - it('preserves existing headers when adding auth', async () => { - let capturedHeaders: Headers | undefined; - - const headers = new Headers({ 'User-Agent': 'TestAgent/1.0' }); - const monitor = new SparqlMonitor({ headers }); - - const fetchSpy = vi - .spyOn(globalThis, 'fetch') - 
.mockImplementation( - async (_input: string | URL | Request, init?: RequestInit) => { - capturedHeaders = new Headers(init?.headers); - return mockFetchResponse('{"boolean": true}'); - } - ); - - await monitor.check( - new URL('http://user:pass@example.org/sparql'), - 'ASK { ?s ?p ?o }' - ); - - expect(capturedHeaders?.get('Authorization')).toBe( - `Basic ${Buffer.from('user:pass').toString('base64')}` - ); - expect(capturedHeaders?.get('User-Agent')).toBe('TestAgent/1.0'); - - fetchSpy.mockRestore(); - }); - }); -}); diff --git a/packages/sparql-monitor/test/service.test.ts b/packages/sparql-monitor/test/service.test.ts deleted file mode 100644 index 5fe7d29a..00000000 --- a/packages/sparql-monitor/test/service.test.ts +++ /dev/null @@ -1,121 +0,0 @@ -import { describe, it, expect, vi } from 'vitest'; -import { MonitorService } from '../src/service.js'; -import { SparqlMonitor } from '../src/monitor.js'; -import type { ObservationStore, MonitorConfig } from '../src/types.js'; - -function createMockStore(): ObservationStore { - return { - getLatest: vi.fn().mockResolvedValue(new Map()), - get: vi.fn().mockResolvedValue(null), - store: vi.fn().mockResolvedValue({ - id: 'obs-1', - monitor: 'test-monitor', - observedAt: new Date(), - success: true, - responseTimeMs: 100, - errorMessage: null, - }), - refreshLatestObservationsView: vi.fn().mockResolvedValue(undefined), - close: vi.fn().mockResolvedValue(undefined), - }; -} - -function createMockMonitor(): SparqlMonitor { - return { - check: vi.fn().mockResolvedValue({ - success: true, - responseTimeMs: 100, - errorMessage: null, - observedAt: new Date(), - }), - } as unknown as SparqlMonitor; -} - -const testMonitors: MonitorConfig[] = [ - { - identifier: 'test-monitor', - endpointUrl: new URL('http://example.org/sparql'), - query: 'ASK { ?s ?p ?o }', - }, -]; - -describe('MonitorService', () => { - describe('checkNow', () => { - it('performs an immediate check', async () => { - const store = createMockStore(); - 
const sparqlMonitor = createMockMonitor(); - const service = new MonitorService({ - store, - monitors: testMonitors, - sparqlMonitor, - }); - - await service.checkNow('test-monitor'); - - expect(sparqlMonitor.check).toHaveBeenCalledWith( - new URL('http://example.org/sparql'), - 'ASK { ?s ?p ?o }' - ); - expect(store.store).toHaveBeenCalledWith( - expect.objectContaining({ - monitor: 'test-monitor', - success: true, - responseTimeMs: 100, - }) - ); - }); - - it('throws when monitor not found', async () => { - const store = createMockStore(); - const service = new MonitorService({ store, monitors: testMonitors }); - - await expect(service.checkNow('nonexistent')).rejects.toThrow( - 'Monitor not found: nonexistent' - ); - }); - }); - - describe('checkAll', () => { - it('checks all monitors', async () => { - const store = createMockStore(); - const sparqlMonitor = createMockMonitor(); - const monitors: MonitorConfig[] = [ - { - identifier: 'monitor-1', - endpointUrl: new URL('http://example.org/sparql1'), - query: 'ASK { ?s ?p ?o }', - }, - { - identifier: 'monitor-2', - endpointUrl: new URL('http://example.org/sparql2'), - query: 'SELECT * WHERE { ?s ?p ?o } LIMIT 1', - }, - ]; - const service = new MonitorService({ store, monitors, sparqlMonitor }); - - await service.checkAll(); - - expect(sparqlMonitor.check).toHaveBeenCalledTimes(2); - expect(store.store).toHaveBeenCalledTimes(2); - expect(store.store).toHaveBeenCalledWith( - expect.objectContaining({ monitor: 'monitor-1' }) - ); - expect(store.store).toHaveBeenCalledWith( - expect.objectContaining({ monitor: 'monitor-2' }) - ); - }); - }); - - describe('start/stop', () => { - it('starts and stops monitoring', () => { - const store = createMockStore(); - const service = new MonitorService({ store, monitors: testMonitors }); - - service.start(); - expect(service.isRunning()).toBe(true); - - service.stop(); - expect(service.isRunning()).toBe(false); - }); - }); -}); diff --git a/tsconfig.json b/tsconfig.json index 
7b2fab6f..63aa7f8b 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -42,7 +42,7 @@ "path": "./packages/docgen" }, { - "path": "./packages/sparql-monitor" + "path": "./packages/distribution-monitor" }, { "path": "./packages/fastify-rdf"