Skip to content

Commit

Permalink
Merge c866592 into a1dcd3b
Browse files Browse the repository at this point in the history
  • Loading branch information
surilindur committed Mar 25, 2024
2 parents a1dcd3b + c866592 commit 47f925b
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 254 deletions.
36 changes: 18 additions & 18 deletions lib/SparqlEndpointFetcher.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { Readable } from 'node:stream';
import type * as RDF from '@rdfjs/types';
import { ReadableWebToNodeStream } from '@smessie/readable-web-to-node-stream';
import * as isStream from 'is-stream';
import { StreamParser } from 'n3';
import { readableFromWeb } from 'readable-from-web';
import type { Readable } from 'readable-stream';
import { type InsertDeleteOperation, type ManagementOperation, Parser as SparqlParser } from 'sparqljs';
import { type ISettings as ISparqlJsonParserArgs, SparqlJsonParser } from 'sparqljson-parse';
import { type ISettings as ISparqlXmlParserArgs, SparqlXmlParser } from 'sparqlxml-parse';
Expand Down Expand Up @@ -41,13 +41,13 @@ export class SparqlEndpointFetcher {
parseBooleanStream: sparqlResponseStream =>
this.sparqlJsonParser.parseJsonBooleanStream(sparqlResponseStream),
parseResultsStream: sparqlResponseStream =>
this.sparqlJsonParser.parseJsonResultsStream(sparqlResponseStream),
<Readable> this.sparqlJsonParser.parseJsonResultsStream(sparqlResponseStream),
},
[SparqlEndpointFetcher.CONTENTTYPE_SPARQL_XML]: {
parseBooleanStream: sparqlResponseStream =>
this.sparqlXmlParser.parseXmlBooleanStream(sparqlResponseStream),
parseResultsStream: sparqlResponseStream =>
this.sparqlXmlParser.parseXmlResultsStream(sparqlResponseStream),
<Readable> this.sparqlXmlParser.parseXmlResultsStream(sparqlResponseStream),
},
};
}
Expand Down Expand Up @@ -97,9 +97,9 @@ export class SparqlEndpointFetcher {
* @see IBindings
* @param {string} endpoint A SPARQL endpoint URL. (without the `?query=` suffix).
* @param {string} query A SPARQL query string.
* @return {Promise<NodeJS.ReadableStream>} A stream of {@link IBindings}.
* @return {Promise<Readable>} A stream of {@link IBindings}.
*/
public async fetchBindings(endpoint: string, query: string): Promise<NodeJS.ReadableStream> {
public async fetchBindings(endpoint: string, query: string): Promise<Readable> {
const [ contentType, responseStream ] = await this.fetchRawStream(
endpoint,
query,
Expand Down Expand Up @@ -143,7 +143,7 @@ export class SparqlEndpointFetcher {
query,
SparqlEndpointFetcher.CONTENTTYPE_TURTLE,
);
return responseStream.pipe(new StreamParser({ format: contentType }));
return <Readable> <unknown> responseStream.pipe(new StreamParser({ format: contentType }));
}

/**
Expand Down Expand Up @@ -184,13 +184,13 @@ export class SparqlEndpointFetcher {
* @param {string} endpoint A SPARQL endpoint URL. (without the `?query=` suffix).
* @param {string} query A SPARQL query string.
* @param {string} acceptHeader The HTTP accept to use.
* @return {Promise<[string, NodeJS.ReadableStream]>} The content type and SPARQL endpoint response stream.
* @return {Promise<[string, Readable]>} The content type and SPARQL endpoint response stream.
*/
public async fetchRawStream(
endpoint: string,
query: string,
acceptHeader: string,
): Promise<[ string, NodeJS.ReadableStream ]> {
): Promise<[ string, Readable ]> {
let url: string = this.method === 'POST' ? endpoint : `${endpoint}?query=${encodeURIComponent(query)}`;

// Initiate request
Expand Down Expand Up @@ -219,15 +219,15 @@ export class SparqlEndpointFetcher {
* @param {string} url The URL to call.
* @param {RequestInit} init Options to pass along to the fetch call.
* @param {any} options Other specific fetch options.
* @return {Promise<[string, NodeJS.ReadableStream]>} The content type and SPARQL endpoint response stream.
* @return {Promise<[string, Readable]>} The content type and SPARQL endpoint response stream.
*/
private async handleFetchCall(
url: string,
init: RequestInit,
options?: { ignoreBody: boolean },
): Promise<[ string, NodeJS.ReadableStream ]> {
let timeout: NodeJS.Timeout | undefined;
let responseStream: NodeJS.ReadableStream | undefined;
): Promise<[ string, Readable ]> {
let timeout;
let responseStream: Readable | undefined;

if (this.timeout) {
const controller = new AbortController();
Expand All @@ -243,9 +243,9 @@ export class SparqlEndpointFetcher {
if (!options?.ignoreBody && httpResponse.body) {
// Wrap WhatWG readable stream into a Node.js readable stream
// If the body already is a Node.js stream (in the case of node-fetch), don't do explicit conversion.
responseStream = <NodeJS.ReadableStream>(
isStream(httpResponse.body) ? httpResponse.body : new ReadableWebToNodeStream(httpResponse.body)
);
responseStream = isStream(httpResponse.body) ?
<Readable> <unknown> httpResponse.body :
readableFromWeb(httpResponse.body);
}

// Emit an error if the server returned an invalid response
Expand Down Expand Up @@ -278,8 +278,8 @@ export interface ISparqlEndpointFetcherArgs extends ISparqlJsonParserArgs, ISpar
}

export interface ISparqlResultsParser {
parseResultsStream: (sparqlResponseStream: NodeJS.ReadableStream) => NodeJS.ReadableStream;
parseBooleanStream: (sparqlResponseStream: NodeJS.ReadableStream) => Promise<boolean>;
parseResultsStream: (sparqlResponseStream: Readable) => Readable;
parseBooleanStream: (sparqlResponseStream: Readable) => Promise<boolean>;
}

export type IBindings = Record<string, RDF.Term>;
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@
},
"dependencies": {
"@rdfjs/types": "*",
"@smessie/readable-web-to-node-stream": "^3.0.3",
"@types/n3": "^1.0.0",
"@types/readable-stream": "^4.0.0",
"@types/sparqljs": "^3.0.0",
"is-stream": "^2.0.0",
"n3": "^1.0.0",
"rdf-string": "^1.0.0",
"readable-from-web": "^1.0.0",
"sparqljs": "^3.0.0",
"sparqljson-parse": "^2.0.0",
"sparqlxml-parse": "^2.0.0",
Expand Down
45 changes: 43 additions & 2 deletions test/SparqlEndpointFetcher-test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import 'jest-rdf';
import { ReadableWebToNodeStream } from '@smessie/readable-web-to-node-stream';
import arrayifyStream from 'arrayify-stream';
import { DataFactory } from 'rdf-data-factory';
import { Readable } from 'readable-stream';
import { SparqlEndpointFetcher } from '../lib/SparqlEndpointFetcher';

// The import of 'readable-stream-node-to-web' is down here because of the above workaround
const readableStreamNodeToWeb = require('readable-stream-node-to-web');
const stringifyStream = require('stream-to-string');
const streamifyString = require('streamify-string');

const DF = new DataFactory();
Expand Down Expand Up @@ -341,6 +342,26 @@ describe('SparqlEndpointFetcher', () => {
]);
});

it('should throw with a failing node stream', async() => {
const expectedError = new Error('Expected error');
const nodeStream = new Readable();
nodeStream._read = () => {
throw expectedError;
};
const fetchCbThis = () => Promise.resolve(<Response> {
body: <any>nodeStream,
headers: new Headers(),
ok: true,
status: 200,
statusText: 'Ok!',
});
const fetcherThis = new SparqlEndpointFetcher({ fetch: fetchCbThis });
const [ contentType, rawStream ] = await fetcherThis.fetchRawStream(endpoint, querySelect, 'myacceptheader');
expect(contentType).toBe('');
expect(rawStream).toBeInstanceOf(Readable);
await expect(stringifyStream(rawStream)).rejects.toBe(expectedError);
});

it('should fetch with a web stream', async() => {
const fetchCbThis = () => Promise.resolve(<Response> {
body: readableStreamNodeToWeb(streamifyString('abc')),
Expand All @@ -352,7 +373,27 @@ describe('SparqlEndpointFetcher', () => {
const fetcherThis = new SparqlEndpointFetcher({ fetch: fetchCbThis });
const [ contentType, rawStream ] = await fetcherThis.fetchRawStream(endpoint, querySelect, 'myacceptheader');
expect(contentType).toBe('');
expect(rawStream).toBeInstanceOf(ReadableWebToNodeStream);
expect(rawStream).toBeInstanceOf(Readable);
});

it('should throw with a failing web stream', async() => {
const expectedError = new Error('Expected error');
const nodeStream = new Readable();
nodeStream._read = () => {
throw expectedError;
};
const fetchCbThis = () => Promise.resolve(<Response> {
body: readableStreamNodeToWeb(nodeStream),
headers: new Headers(),
ok: true,
status: 200,
statusText: 'Ok!',
});
const fetcherThis = new SparqlEndpointFetcher({ fetch: fetchCbThis });
const [ contentType, rawStream ] = await fetcherThis.fetchRawStream(endpoint, querySelect, 'myacceptheader');
expect(contentType).toBe('');
expect(rawStream).toBeInstanceOf(Readable);
await expect(stringifyStream(rawStream)).rejects.toBe(expectedError);
});

it('should throw with a missing stream', async() => {
Expand Down

0 comments on commit 47f925b

Please sign in to comment.