Skip to content

Commit

Permalink
Merge 8e947a9 into 7fae320
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimvh committed Sep 4, 2020
2 parents 7fae320 + 8e947a9 commit c342091
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 21 deletions.
7 changes: 3 additions & 4 deletions src/storage/FileResourceStore.ts
@@ -1,7 +1,6 @@
import { createReadStream, createWriteStream, promises as fsPromises, Stats } from 'fs';
import { posix } from 'path';
import { Readable } from 'stream';
import arrayifyStream from 'arrayify-stream';
import { contentType as getContentTypeFromExtension } from 'mime-types';
import { Quad } from 'rdf-js';
import streamifyArray from 'streamify-array';
Expand Down Expand Up @@ -69,7 +68,7 @@ export class FileResourceStore implements ResourceStore {
const linkTypes = representation.metadata.linkRel?.type;
let metadata;
if (raw.length > 0) {
metadata = this.metadataController.generateReadableFromQuads(raw);
metadata = this.metadataController.serializeQuads(raw);
}

// Create a new container or resource in the parent container with a specific name based on the incoming headers.
Expand Down Expand Up @@ -252,7 +251,7 @@ export class FileResourceStore implements ResourceStore {
let rawMetadata: Quad[] = [];
try {
const readMetadataStream = createReadStream(`${path}.metadata`);
rawMetadata = await this.metadataController.generateQuadsFromReadable(readMetadataStream);
rawMetadata = await this.metadataController.parseQuads(readMetadataStream);
} catch (_) {
// Metadata file doesn't exist so lets keep `rawMetaData` an empty array.
}
Expand Down Expand Up @@ -289,7 +288,7 @@ export class FileResourceStore implements ResourceStore {
let rawMetadata: Quad[] = [];
try {
const readMetadataStream = createReadStream(joinPath(path, '.metadata'));
rawMetadata = await arrayifyStream(readMetadataStream);
rawMetadata = await this.metadataController.parseQuads(readMetadataStream);
} catch (_) {
// Metadata file doesn't exist so lets keep `rawMetaData` an empty array.
}
Expand Down
17 changes: 8 additions & 9 deletions src/storage/conversion/RdfToQuadConverter.ts
Expand Up @@ -3,7 +3,7 @@ import rdfParser from 'rdf-parse';
import { Representation } from '../../ldp/representation/Representation';
import { RepresentationMetadata } from '../../ldp/representation/RepresentationMetadata';
import { INTERNAL_QUADS } from '../../util/ContentTypes';
import { UnsupportedHttpError } from '../../util/errors/UnsupportedHttpError';
import { pipeStreamsAndErrors } from '../../util/Util';
import { checkRequest } from './ConversionUtil';
import { RepresentationConverterArgs } from './RepresentationConverter';
import { TypedRepresentationConverter } from './TypedRepresentationConverter';
Expand All @@ -30,20 +30,19 @@ export class RdfToQuadConverter extends TypedRepresentationConverter {

private rdfToQuads(representation: Representation, baseIRI: string): Representation {
const metadata: RepresentationMetadata = { ...representation.metadata, contentType: INTERNAL_QUADS };

// Catch parsing errors and emit correct error
// Node 10 requires both writableObjectMode and readableObjectMode
const errorStream = new PassThrough({ writableObjectMode: true, readableObjectMode: true });
const data = rdfParser.parse(representation.data, {
const rawQuads = rdfParser.parse(representation.data, {
contentType: representation.metadata.contentType as string,
baseIRI,
});
data.pipe(errorStream);
data.on('error', (error): boolean => errorStream.emit('error', new UnsupportedHttpError(error.message)));

// Wrap the stream such that errors are transformed
// (Node 10 requires both writableObjectMode and readableObjectMode)
const data = new PassThrough({ writableObjectMode: true, readableObjectMode: true });
pipeStreamsAndErrors(rawQuads, data);

return {
binary: false,
data: errorStream,
data,
metadata,
};
}
Expand Down
11 changes: 6 additions & 5 deletions src/util/MetadataController.ts
Expand Up @@ -6,6 +6,7 @@ import { NamedNode, Quad } from 'rdf-js';
import streamifyArray from 'streamify-array';
import { TEXT_TURTLE } from '../util/ContentTypes';
import { LDP, RDF, STAT, TERMS, XML } from './Prefixes';
import { pipeStreamsAndErrors } from './Util';

export const TYPE_PREDICATE = DataFactory.namedNode(`${RDF}type`);
export const MODIFIED_PREDICATE = DataFactory.namedNode(`${TERMS}modified`);
Expand Down Expand Up @@ -64,13 +65,13 @@ export class MetadataController {
}

/**
* Helper function to convert an array of quads into a Readable object.
* Helper function for serializing an array of quads, with as result a Readable object.
* @param quads - The array of quads.
*
* @returns The Readable object.
*/
public generateReadableFromQuads(quads: Quad[]): Readable {
return streamifyArray(quads).pipe(new StreamWriter({ format: TEXT_TURTLE }));
public serializeQuads(quads: Quad[]): Readable {
return pipeStreamsAndErrors(streamifyArray(quads), new StreamWriter({ format: TEXT_TURTLE }));
}

/**
Expand All @@ -79,7 +80,7 @@ export class MetadataController {
*
* @returns A promise containing the array of quads.
*/
public async generateQuadsFromReadable(readable: Readable): Promise<Quad[]> {
return arrayifyStream(readable.pipe(new StreamParser({ format: TEXT_TURTLE })));
public async parseQuads(readable: Readable): Promise<Quad[]> {
return await arrayifyStream(pipeStreamsAndErrors(readable, new StreamParser({ format: TEXT_TURTLE })));
}
}
17 changes: 16 additions & 1 deletion src/util/Util.ts
@@ -1,5 +1,6 @@
import { Readable } from 'stream';
import { Readable, Writable } from 'stream';
import arrayifyStream from 'arrayify-stream';
import { UnsupportedHttpError } from './errors/UnsupportedHttpError';

/**
* Makes sure the input path has exactly 1 slash at the end.
Expand Down Expand Up @@ -51,3 +52,17 @@ export const matchingMediaType = (mediaA: string, mediaB: string): boolean => {
}
return subTypeA === subTypeB;
};

/**
* Pipes one stream into another.
* Makes sure an error of the first stream gets passed to the second.
* @param readable - Initial readable stream.
* @param destination - The destination for writing data.
*
* @returns The destination stream.
*/
export const pipeStreamsAndErrors = <T extends Writable>(readable: Readable, destination: T): T => {
readable.pipe(destination);
readable.on('error', (error): boolean => destination.emit('error', new UnsupportedHttpError(error.message)));
return destination;
};
5 changes: 3 additions & 2 deletions test/unit/storage/FileResourceStore.test.ts
Expand Up @@ -205,7 +205,7 @@ describe('A FileResourceStore', (): void => {
stats.isFile = jest.fn((): any => true);
(fsPromises.lstat as jest.Mock).mockReturnValueOnce(stats);
(fs.createReadStream as jest.Mock).mockReturnValueOnce(streamifyArray([ rawData ]));
(fs.createReadStream as jest.Mock).mockImplementationOnce((): any => new Error('Metadata file does not exist.'));
(fs.createReadStream as jest.Mock).mockReturnValueOnce(streamifyArray([]));

// Tests
await store.setRepresentation({ path: `${base}file.txt` }, representation);
Expand Down Expand Up @@ -488,7 +488,8 @@ describe('A FileResourceStore', (): void => {
stats.isFile = jest.fn((): any => true);
(fsPromises.lstat as jest.Mock).mockReturnValueOnce(stats);
(fs.createReadStream as jest.Mock).mockReturnValueOnce(streamifyArray([ rawData ]));
(fs.createReadStream as jest.Mock).mockImplementationOnce((): any => new Error('Metadata file does not exist.'));
(fs.createReadStream as jest.Mock).mockReturnValueOnce(new Readable()
.destroy(new Error('Metadata file does not exist.')));

const result = await store.getRepresentation({ path: `${base}.htaccess` });
expect(result).toEqual({
Expand Down

0 comments on commit c342091

Please sign in to comment.