Skip to content

Commit

Permalink
fix: Reduce memory consumption (#932)
Browse files Browse the repository at this point in the history
  • Loading branch information
ddeboer committed Jun 17, 2024
1 parent b44aa5b commit 89087f1
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 85 deletions.
8 changes: 4 additions & 4 deletions jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ export default {
coverageReporters: ['json-summary', 'text'],
coverageThreshold: {
global: {
lines: 71.97,
statements: 72.16,
branches: 63.57,
functions: 70,
lines: 72,
statements: 72.2,
branches: 64.02,
functions: 69.15,
},
},
transform: {
Expand Down
15 changes: 7 additions & 8 deletions src/crawler.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {RegistrationStore} from './registration.js';
import {DatasetStore, extractIri, extractIris} from './dataset.js';
import {DatasetStore, extractIri} from './dataset.js';
import {dereference, fetch, HttpError, NoDatasetFoundAtUrl} from './fetch.js';
import DatasetExt from 'rdf-ext/lib/Dataset';
import Pino from 'pino';
import {Valid, Validator} from './validator.js';
import {crawlCounter} from './instrumentation.js';
Expand All @@ -24,23 +23,23 @@ export class Crawler {
await this.registrationStore.findRegistrationsReadBefore(dateLastRead);
for (const registration of registrations) {
this.logger.info(`Crawling registration URL ${registration.url}...`);
let datasets: DatasetExt[] = [];
let statusCode = 200;
let isValid = false;
const datasetIris: URL[] = [];

try {
const data = await dereference(registration.url);
const validationResult = await this.validator.validate(data);
isValid = validationResult.state === 'valid';
if (isValid) {
this.logger.info(`${registration.url} passes validation`);
datasets = await fetch(registration.url);
await this.datasetStore.store(datasets);
datasets.map(async dataset => {
for await (const dataset of fetch(registration.url)) {
datasetIris.push(extractIri(dataset));
await this.datasetStore.store(dataset);
const dcatValidationResult = await this.validator.validate(dataset);
const rating = rate(dcatValidationResult as Valid);
await this.ratingStore.store(extractIri(dataset), rating);
});
}
} else {
this.logger.info(`${registration.url} does not pass validation`);
}
Expand All @@ -64,7 +63,7 @@ export class Crawler {
});

const updatedRegistration = registration.read(
[...extractIris(datasets).keys()],
datasetIris,
statusCode,
isValid
);
Expand Down
9 changes: 1 addition & 8 deletions src/dataset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export interface DatasetStore {
/**
* Store a dataset description, replacing any triples that were previously stored for that dataset.
*/
store(datasets: DatasetCore[]): Promise<void>;
store(dataset: DatasetCore): Promise<void>;

countDatasets(): Promise<number>;

Expand All @@ -30,13 +30,6 @@ export function extractIri(dataset: DatasetCore): URL {
return new URL(quad.subject.value);
}

/**
 * Build a map from each dataset's IRI to the dataset itself.
 *
 * Iteration order of the returned Map follows the order of the input array.
 */
export function extractIris(datasets: DatasetCore[]): Map<URL, DatasetCore> {
  const byIri = new Map<URL, DatasetCore>();
  for (const dataset of datasets) {
    byIri.set(extractIri(dataset), dataset);
  }
  return byIri;
}

export async function load(
stream: Readable,
contentType: 'application/ld+json' | string
Expand Down
21 changes: 10 additions & 11 deletions src/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,22 @@ export class NoDatasetFoundAtUrl extends Error {
}
}

export async function fetch(url: URL): Promise<DatasetExt[]> {
let datasets = await doFetch(url);
export async function* fetch(url: URL): AsyncGenerator<DatasetExt> {
yield* doFetch(url);

const nextPage = await findNextPage(url);
if (nextPage !== null && nextPage !== url) {
datasets = [...datasets, ...(await fetch(nextPage))];
}

if (datasets.length === 0) {
throw new NoDatasetFoundAtUrl();
yield* fetch(nextPage);
}

return datasets;
}

export async function doFetch(url: URL) {
export async function* doFetch(url: URL) {
let datasets: DatasetExt[];
try {
return await query(url);
datasets = await query(url);
for (const dataset of datasets) {
yield dataset;
}
} catch (e) {
handleComunicaError(e);
}
Expand Down
9 changes: 3 additions & 6 deletions src/graphdb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
Registration,
RegistrationStore,
} from './registration.js';
import {DatasetStore, extractIris} from './dataset.js';
import {DatasetStore, extractIri} from './dataset.js';
import {Rating, RatingStore} from './rate.js';
import http from 'node:http';
import {DatasetCore, Quad, Quad_Object, Quad_Predicate} from '@rdfjs/types';
Expand Down Expand Up @@ -370,12 +370,9 @@ export class GraphDbDatasetStore implements DatasetStore {
*
* @see https://graphdb.ontotext.com/documentation/standard/replace-graph.html
*/
public async store(datasets: DatasetCore[]) {
public async store(dataset: DatasetCore) {
// Store the dataset in a named graph identified by the dataset’s own IRI.
for (const [iri, dataset] of [...extractIris(datasets)]) {
// Serialize requests: wait for each response before sending next request to prevent GraphDB from running OOM.
await this.storeDataset(dataset, iri);
}
await this.storeDataset(dataset, extractIri(dataset));
}

private async storeDataset(dataset: DatasetCore, graphIri: URL) {
Expand Down
18 changes: 9 additions & 9 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
Registration,
RegistrationStore,
} from './registration.js';
import {DatasetStore, extractIris, load} from './dataset.js';
import {DatasetStore, extractIri, load} from './dataset.js';
import {IncomingMessage, Server} from 'http';
import * as psl from 'psl';
import {rdfSerializer} from './rdf.js';
Expand Down Expand Up @@ -169,18 +169,18 @@ export async function server(
await registrationStore.store(registration);

// Fetch dataset descriptions and store them.
const datasets = await fetch(url);
const datasetIris: URL[] = [];
for await (const dataset of fetch(url)) {
datasetIris.push(extractIri(dataset));
await datasetStore.store(dataset);
}

request.log.info(
`Found ${datasets.length} datasets at ${url.toString()}`
`Found ${datasetIris.length} datasets at ${url.toString()}`
);
await datasetStore.store(datasets);

// Update registration with dataset descriptions that we found.
const updatedRegistration = registration.read(
[...extractIris(datasets).keys()],
200,
true
);
const updatedRegistration = registration.read(datasetIris, 200, true);
await registrationStore.store(updatedRegistration);
}

Expand Down
59 changes: 22 additions & 37 deletions test/fetch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe('Fetch', () => {
.get('/valid-dcat-dataset')
.reply(200, response);

const datasets = await fetch(
const datasets = await fetchDatasetsAsArray(
new URL('https://example.com/valid-dcat-dataset')
);

Expand Down Expand Up @@ -54,7 +54,7 @@ describe('Fetch', () => {
.get('/minimal-valid-schema-org-dataset')
.reply(200, response);

const datasets = await fetch(
const datasets = await fetchDatasetsAsArray(
new URL('https://example.com/minimal-valid-schema-org-dataset')
);

Expand All @@ -70,7 +70,7 @@ describe('Fetch', () => {
.get('/valid-schema-org-dataset')
.reply(200, response);

const datasets = await fetch(
const datasets = await fetchDatasetsAsArray(
new URL('https://example.com/valid-schema-org-dataset')
);

Expand Down Expand Up @@ -195,7 +195,7 @@ describe('Fetch', () => {
.get('/valid-schema-org-dataset')
.reply(200, response);

const datasets = await fetch(
const datasets = await fetchDatasetsAsArray(
new URL('https://example.com/valid-schema-org-dataset')
);

Expand All @@ -209,7 +209,7 @@ describe('Fetch', () => {
.get('/valid-schema-org-dataset')
.reply(200, response);

const datasets = await fetch(
const datasets = await fetchDatasetsAsArray(
new URL('https://example.com/valid-schema-org-dataset')
);

Expand All @@ -223,7 +223,7 @@ describe('Fetch', () => {
.get('/valid-schema-org-dataset')
.reply(200, response);

const datasets = await fetch(
const datasets = await fetchDatasetsAsArray(
new URL('https://example.com/valid-schema-org-dataset')
);

Expand All @@ -234,7 +234,7 @@ describe('Fetch', () => {
nock('https://example.com').get('/404').reply(404);
expect.assertions(2);
try {
await fetch(new URL('https://example.com/404'));
await fetchDatasetsAsArray(new URL('https://example.com/404'));
} catch (e) {
expect(e).toBeInstanceOf(HttpError);
expect((e as HttpError).statusCode).toBe(404);
Expand All @@ -245,7 +245,7 @@ describe('Fetch', () => {
nock('https://example.com').get('/500').reply(500);
expect.assertions(2);
try {
await fetch(new URL('https://example.com/500'));
await fetchDatasetsAsArray(new URL('https://example.com/500'));
} catch (e) {
expect(e).toBeInstanceOf(HttpError);
expect((e as HttpError).statusCode).toBe(500);
Expand All @@ -254,35 +254,12 @@ describe('Fetch', () => {

it('handles empty dataset response', async () => {
nock('https://example.com').get('/200').reply(200);
expect.assertions(1);
try {
await fetch(new URL('https://example.com/200'));
} catch (e) {
expect(e).toBeInstanceOf(NoDatasetFoundAtUrl);
}
});

it('handles paginated responses', async () => {
nock('https://example.com')
.get('/datasets/hydra-page1.ttl')
.replyWithFile(200, 'test/datasets/hydra-page1.ttl', {
'Content-Type': 'text/turtle',
});

nock('https://example.com')
.get('/datasets/hydra-page2.ttl')
.replyWithFile(200, 'test/datasets/hydra-page2.ttl', {
'Content-Type': 'text/turtle',
});

const datasets = await fetch(
new URL('https://example.com/datasets/hydra-page1.ttl')
);

expect(datasets).toHaveLength(2);
expect(
async () => await fetchDatasetsAsArray(new URL('https://example.com/200'))
).rejects.toThrow(NoDatasetFoundAtUrl);
});

it('handles paginated JSON-ld responses', async () => {
it('handles paginated JSON-LD responses', async () => {
nock('https://example.com')
.get('/datasets/hydra-page1.jsonld')
.replyWithFile(200, 'test/datasets/hydra-page1.jsonld', {
Expand All @@ -295,7 +272,7 @@ describe('Fetch', () => {
'Content-Type': 'application/ld+json',
});

const datasets = await fetch(
const datasets = await fetchDatasetsAsArray(
new URL('https://example.com/datasets/hydra-page1.jsonld')
);

Expand All @@ -315,10 +292,18 @@ describe('Fetch', () => {
'Content-Type': 'text/turtle',
});

const datasets = await fetch(
const datasets = await fetchDatasetsAsArray(
new URL('https://example.com/datasets/hydra-page1.ttl')
);

expect(datasets).toHaveLength(2);
});
});

/** Drain the async generator returned by fetch() into a plain array. */
const fetchDatasetsAsArray = async (url: URL) => {
  const collected = [];
  const iterator = fetch(url)[Symbol.asyncIterator]();
  let step = await iterator.next();
  while (!step.done) {
    collected.push(step.value);
    step = await iterator.next();
  }
  return collected;
};
4 changes: 2 additions & 2 deletions test/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ export class MockAllowedRegistrationDomainStore
export class MockDatasetStore implements DatasetStore {
private datasets: DatasetExt[] = [];

async store(datasets: DatasetExt[]): Promise<void> {
this.datasets.push(...datasets);
async store(datasets: DatasetExt): Promise<void> {
this.datasets.push(datasets);
}

countDatasets(): Promise<number> {
Expand Down

0 comments on commit 89087f1

Please sign in to comment.