Skip to content

Commit

Permalink
Merge 9a20251 into a1dcd3b
Browse files Browse the repository at this point in the history
  • Loading branch information
surilindur committed Mar 25, 2024
2 parents a1dcd3b + 9a20251 commit 6d23a1d
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 275 deletions.
17 changes: 8 additions & 9 deletions lib/SparqlEndpointFetcher.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import type { Readable } from 'node:stream';
import type * as RDF from '@rdfjs/types';
import { ReadableWebToNodeStream } from '@smessie/readable-web-to-node-stream';
import * as isStream from 'is-stream';
import { StreamParser } from 'n3';
import { readableFromWeb } from 'readable-from-web';
import { type InsertDeleteOperation, type ManagementOperation, Parser as SparqlParser } from 'sparqljs';
import { type ISettings as ISparqlJsonParserArgs, SparqlJsonParser } from 'sparqljson-parse';
import { type ISettings as ISparqlXmlParserArgs, SparqlXmlParser } from 'sparqlxml-parse';
Expand Down Expand Up @@ -137,7 +136,7 @@ export class SparqlEndpointFetcher {
* @param {string} query A SPARQL query string.
* @return {Promise<Stream>} A stream of triples.
*/
public async fetchTriples(endpoint: string, query: string): Promise<Readable & RDF.Stream> {
public async fetchTriples(endpoint: string, query: string): Promise<NodeJS.ReadableStream & RDF.Stream> {
const [ contentType, responseStream ] = await this.fetchRawStream(
endpoint,
query,
Expand Down Expand Up @@ -184,7 +183,7 @@ export class SparqlEndpointFetcher {
* @param {string} endpoint A SPARQL endpoint URL. (without the `?query=` suffix).
* @param {string} query A SPARQL query string.
* @param {string} acceptHeader The HTTP accept to use.
* @return {Promise<[string, NodeJS.ReadableStream]>} The content type and SPARQL endpoint response stream.
* @return {Promise<[ string, NodeJS.ReadableStream ]>} The content type and SPARQL endpoint response stream.
*/
public async fetchRawStream(
endpoint: string,
Expand Down Expand Up @@ -219,14 +218,14 @@ export class SparqlEndpointFetcher {
* @param {string} url The URL to call.
* @param {RequestInit} init Options to pass along to the fetch call.
* @param {any} options Other specific fetch options.
* @return {Promise<[string, NodeJS.ReadableStream]>} The content type and SPARQL endpoint response stream.
* @return {Promise<[ string, NodeJS.ReadableStream ]>} The content type and SPARQL endpoint response stream.
*/
private async handleFetchCall(
url: string,
init: RequestInit,
options?: { ignoreBody: boolean },
): Promise<[ string, NodeJS.ReadableStream ]> {
let timeout: NodeJS.Timeout | undefined;
let timeout;
let responseStream: NodeJS.ReadableStream | undefined;

if (this.timeout) {
Expand All @@ -243,9 +242,9 @@ export class SparqlEndpointFetcher {
if (!options?.ignoreBody && httpResponse.body) {
// Wrap WhatWG readable stream into a Node.js readable stream
// If the body already is a Node.js stream (in the case of node-fetch), don't do explicit conversion.
responseStream = <NodeJS.ReadableStream>(
isStream(httpResponse.body) ? httpResponse.body : new ReadableWebToNodeStream(httpResponse.body)
);
responseStream = isStream(httpResponse.body) ?
<NodeJS.ReadableStream> <unknown> httpResponse.body :
readableFromWeb(httpResponse.body);
}

// Emit an error if the server returned an invalid response
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@
},
"dependencies": {
"@rdfjs/types": "*",
"@smessie/readable-web-to-node-stream": "^3.0.3",
"@types/n3": "^1.0.0",
"@types/readable-stream": "^4.0.0",
"@types/sparqljs": "^3.0.0",
"is-stream": "^2.0.0",
"n3": "^1.0.0",
"rdf-string": "^1.0.0",
"readable-from-web": "^1.0.0",
"sparqljs": "^3.0.0",
"sparqljson-parse": "^2.0.0",
"sparqlxml-parse": "^2.0.0",
Expand Down
45 changes: 43 additions & 2 deletions test/SparqlEndpointFetcher-test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import 'jest-rdf';
import { ReadableWebToNodeStream } from '@smessie/readable-web-to-node-stream';
import arrayifyStream from 'arrayify-stream';
import { DataFactory } from 'rdf-data-factory';
import { Readable } from 'readable-stream';
import { SparqlEndpointFetcher } from '../lib/SparqlEndpointFetcher';

// The import of 'readable-stream-node-to-web' is down here because of the above workaround
const readableStreamNodeToWeb = require('readable-stream-node-to-web');
const stringifyStream = require('stream-to-string');
const streamifyString = require('streamify-string');

const DF = new DataFactory();
Expand Down Expand Up @@ -341,6 +342,26 @@ describe('SparqlEndpointFetcher', () => {
]);
});

it('should throw with a failing node stream', async() => {
const expectedError = new Error('Expected error');
const nodeStream = new Readable();
nodeStream._read = () => {
throw expectedError;
};
const fetchCbThis = () => Promise.resolve(<Response> {
body: <any>nodeStream,
headers: new Headers(),
ok: true,
status: 200,
statusText: 'Ok!',
});
const fetcherThis = new SparqlEndpointFetcher({ fetch: fetchCbThis });
const [ contentType, rawStream ] = await fetcherThis.fetchRawStream(endpoint, querySelect, 'myacceptheader');
expect(contentType).toBe('');
expect(rawStream).toBeInstanceOf(Readable);
await expect(stringifyStream(rawStream)).rejects.toBe(expectedError);
});

it('should fetch with a web stream', async() => {
const fetchCbThis = () => Promise.resolve(<Response> {
body: readableStreamNodeToWeb(streamifyString('abc')),
Expand All @@ -352,7 +373,27 @@ describe('SparqlEndpointFetcher', () => {
const fetcherThis = new SparqlEndpointFetcher({ fetch: fetchCbThis });
const [ contentType, rawStream ] = await fetcherThis.fetchRawStream(endpoint, querySelect, 'myacceptheader');
expect(contentType).toBe('');
expect(rawStream).toBeInstanceOf(ReadableWebToNodeStream);
expect(rawStream).toBeInstanceOf(Readable);
});

it('should throw with a failing web stream', async() => {
const expectedError = new Error('Expected error');
const nodeStream = new Readable();
nodeStream._read = () => {
throw expectedError;
};
const fetchCbThis = () => Promise.resolve(<Response> {
body: readableStreamNodeToWeb(nodeStream),
headers: new Headers(),
ok: true,
status: 200,
statusText: 'Ok!',
});
const fetcherThis = new SparqlEndpointFetcher({ fetch: fetchCbThis });
const [ contentType, rawStream ] = await fetcherThis.fetchRawStream(endpoint, querySelect, 'myacceptheader');
expect(contentType).toBe('');
expect(rawStream).toBeInstanceOf(Readable);
await expect(stringifyStream(rawStream)).rejects.toBe(expectedError);
});

it('should throw with a missing stream', async() => {
Expand Down

0 comments on commit 6d23a1d

Please sign in to comment.