Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrates to json-event-parser #102

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions lib/ContextTree.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import {JsonLdContextNormalized} from "jsonld-context-parser";
*/
export class ContextTree {

private readonly subTrees: {[key: string]: ContextTree} = {};
private readonly subTrees: {[key: string | number]: ContextTree} = {};
private context: Promise<JsonLdContextNormalized> | null;

public getContext(keys: string[]): Promise<{ context: JsonLdContextNormalized, depth: number }> | null {
public getContext(keys: (string | number)[]): Promise<{ context: JsonLdContextNormalized, depth: number }> | null {
if (keys.length > 0) {
const [head, ...tail] = keys;
const subTree = this.subTrees[head];
Expand All @@ -25,7 +25,7 @@ export class ContextTree {
return this.context ? this.context.then((context) => ({ context, depth: 0 })) : null;
}

public setContext(keys: any[], context: Promise<JsonLdContextNormalized> | null) {
public setContext(keys: (string | number)[], context: Promise<JsonLdContextNormalized> | null) {
if (keys.length === 0) {
this.context = context;
} else {
Expand Down
179 changes: 95 additions & 84 deletions lib/JsonLdParser.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import * as RDF from "@rdfjs/types";
// tslint:disable-next-line:no-var-requires
const Parser = require('jsonparse');
import {ERROR_CODES, ErrorCoded, IDocumentLoader, JsonLdContext, Util as ContextUtil} from "jsonld-context-parser";
import {PassThrough, Transform, Readable} from "readable-stream";
// @ts-ignore The types are not updated yet
import {PassThrough, Transform, Stream, pipeline} from "readable-stream";
import {EntryHandlerArrayValue} from "./entryhandler/EntryHandlerArrayValue";
import {EntryHandlerContainer} from "./entryhandler/EntryHandlerContainer";
import {EntryHandlerInvalidFallback} from "./entryhandler/EntryHandlerInvalidFallback";
Expand All @@ -20,6 +19,9 @@ import {EntryHandlerKeywordValue} from "./entryhandler/keyword/EntryHandlerKeywo
import {ParsingContext} from "./ParsingContext";
import {Util} from "./Util";
import {parse as parseLinkHeader} from "http-link-header";
import {JsonEventParser} from "json-event-parser";
import {JsonEvent} from "json-event-parser/lib/JsonEventParser";


/**
* A stream transformer that parses JSON-LD (text) streams to an {@link RDF.Stream}.
Expand All @@ -46,11 +48,10 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
private readonly parsingContext: ParsingContext;
private readonly util: Util;

private readonly jsonParser: any;
// Jobs that are not started yet that process a @context (only used if streamingProfile is false)
private readonly contextJobs: (() => Promise<void>)[][];
// Jobs that are not started yet that process a @type (only used if streamingProfile is false)
private readonly typeJobs: { job: () => Promise<void>, keys: string[] }[];
private readonly typeJobs: { job: () => Promise<void>, keys: (string | number)[] }[];
// Jobs that are not started yet because of a missing @context or @type (only used if streamingProfile is false)
private readonly contextAwaitingJobs: { job: () => Promise<void>, keys: string[] }[];

Expand All @@ -60,30 +61,27 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
private lastKeys: any[];
// A promise representing the last job
private lastOnValueJob: Promise<void>;
// The keys inside of the JSON tree
private readonly jsonKeyStack: (string | number | undefined)[];
// The value inside of the JSON tree
private readonly jsonValueStack: any[];

constructor(options?: IJsonLdParserOptions) {
super({ readableObjectMode: true });
super({ readableObjectMode: true, writableObjectMode: true });
options = options || {};
this.options = options;
this.parsingContext = new ParsingContext({ parser: this, ...options });
this.util = new Util({ dataFactory: options.dataFactory, parsingContext: this.parsingContext });

this.jsonParser = new Parser();
this.contextJobs = [];
this.typeJobs = [];
this.contextAwaitingJobs = [];

this.lastDepth = 0;
this.lastKeys = [];
this.lastOnValueJob = Promise.resolve();

this.attachJsonParserListeners();

this.on('end', () => {
if (typeof this.jsonParser.mode !== 'undefined') {
this.emit('error', new Error('Unclosed document'))
}
})
this.jsonKeyStack = [];
this.jsonValueStack = [];
}

/**
Expand Down Expand Up @@ -157,22 +155,20 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
* @return {RDF.Stream} A quad stream.
*/
public import(stream: EventEmitter): RDF.Stream {
if('pipe' in stream) {
stream.on('error', (error) => parsed.emit('error', error));
const parsed = (<Readable>stream).pipe(new JsonLdParser(this.options));
return parsed;
} else {
const output = new PassThrough({ readableObjectMode: true });
stream.on('error', (error) => parsed.emit('error', error));
stream.on('data', (data) => output.push(data));
stream.on('end', () => output.push(null));
const parsed = output.pipe(new JsonLdParser(this.options));
return parsed;
let input: Stream = (<Stream>stream);
if(!('pipe' in stream)) {
input = new PassThrough({ readableObjectMode: true });
stream.on('error', (error) => input.emit('error', error));
stream.on('data', (data) => input.push(data));
stream.on('end', () => input.push(null));
}
return pipeline(input, new JsonEventParser(), new JsonLdParser(this.options), (err: any) => {
// We ignore the error?
});
}

public _transform(chunk: any, encoding: string, callback: (error?: Error | null, data?: any) => void): void {
this.jsonParser.write(chunk);
public _transform(event: any, _encoding: string, callback: (error?: Error | null, data?: any) => void): void {
this.onJsonEvent(event);
this.lastOnValueJob
.then(() => callback(), (error) => callback(error));
}
Expand All @@ -199,7 +195,7 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
if (listPointer) {
// Terminate the list if the had at least one value
if (listPointer.value) {
this.emit('data', this.util.dataFactory.quad(listPointer.value, this.util.rdfRest, this.util.rdfNil,
this.push(this.util.dataFactory.quad(listPointer.value, this.util.rdfRest, this.util.rdfNil,
this.util.getDefaultGraph()));
}

Expand Down Expand Up @@ -413,65 +409,71 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
*
* This should only be called once.
*/
protected attachJsonParserListeners() {
// Listen to json parser events
this.jsonParser.onValue = (value: any) => {
const depth = this.jsonParser.stack.length;
const keys = (new Array(depth + 1).fill(0)).map((v, i) => {
return i === depth ? this.jsonParser.key : this.jsonParser.stack[i].key;
});

if (!this.isParsingContextInner(depth)) { // Don't parse inner nodes inside @context
const valueJobCb = () => this.newOnValueJob(keys, value, depth, true);
if (!this.parsingContext.streamingProfile
&& !this.parsingContext.contextTree.getContext(keys.slice(0, -1))) {
// If an out-of-order context is allowed,
// we have to buffer everything.
// We store jobs for @context's and @type's separately,
// because at the end, we have to process them first.
// We also handle @type because these *could* introduce a type-scoped context.
if (keys[depth] === '@context') {
let jobs = this.contextJobs[depth];
if (!jobs) {
jobs = this.contextJobs[depth] = [];
}
jobs.push(valueJobCb);
} else if (keys[depth] === '@type'
|| typeof keys[depth] === 'number' && keys[depth - 1] === '@type') { // Also capture @type with array values
// Remove @type from keys, because we want it to apply to parent later on
this.typeJobs.push({ job: valueJobCb, keys: keys.slice(0, keys.length - 1) });
} else {
this.contextAwaitingJobs.push({ job: valueJobCb, keys });
}
} else {
// Make sure that our value jobs are chained synchronously
this.lastOnValueJob = this.lastOnValueJob.then(valueJobCb);
}

// Execute all buffered jobs on deeper levels
if (!this.parsingContext.streamingProfile && depth === 0) {
this.lastOnValueJob = this.lastOnValueJob
.then(() => this.executeBufferedJobs());
}
/**
 * Handle a single streaming JSON event emitted by the upstream JsonEventParser.
 *
 * Maintains two parallel stacks while the document is parsed:
 * - `jsonKeyStack`: the key path from the root down to the current position;
 * - `jsonValueStack`: the partially-built container values along that path.
 *
 * Events occurring inside an `@context` entry are not forwarded to
 * {@link onValue}, because context contents are processed as a whole
 * once the `@context` container closes.
 * @param event A JSON event ('open-object', 'open-array', 'value',
 *              'close-object' or 'close-array').
 */
protected onJsonEvent(event: JsonEvent) {
  switch (event.type) {
  case 'open-object':
    this.jsonKeyStack.push(event.key);
    this.insertContainerInValueStack(event.key, {});
    break;
  case 'open-array':
    this.jsonKeyStack.push(event.key);
    this.insertContainerInValueStack(event.key, []);
    break;
  case 'value':
    if (!this.jsonKeyStack.includes('@context')) { // Don't parse inner nodes inside @context
      this.onValue(event.key, event.value, this.jsonKeyStack);
    }
    // Record the scalar inside its parent container so that, when the
    // container closes, the fully materialized value can be emitted.
    if (typeof event.key === 'string') {
      this.jsonValueStack[this.jsonValueStack.length - 1][event.key] = event.value;
    } else if (typeof event.key === 'number') {
      this.jsonValueStack[this.jsonValueStack.length - 1].push(event.value);
    }
    break;
  case 'close-object':
  case 'close-array': {
    // Pop first: for the closing of the @context container itself,
    // '@context' is no longer on the stack, so it IS forwarded below.
    const key = this.jsonKeyStack.pop();
    const value = this.jsonValueStack.pop();
    if (!this.jsonKeyStack.includes('@context')) { // Don't parse inner nodes inside @context
      this.onValue(key, value, this.jsonKeyStack);
    }
    break;
  }
  }
}

/**
* Check if the parser is currently parsing an element that is part of an @context entry.
* @param {number} depth A depth.
* @return {boolean} A boolean.
*/
protected isParsingContextInner(depth: number) {
for (let i = depth; i > 0; i--) {
if (this.jsonParser.stack[i - 1].key === '@context') {
return true;
/**
 * Process a completed JSON value (scalar or fully closed container)
 * at the given position in the document.
 *
 * Under the streaming profile the value job is chained immediately;
 * otherwise jobs may be buffered until their `@context`/`@type` becomes
 * available, and all buffered jobs are flushed once the root closes.
 * @param key The key (or array index) under which the value occurred,
 *            or undefined for the document root.
 * @param value The parsed JSON value.
 * @param keyStack The key path of the value's parent, from the root.
 */
protected onValue(key: string | number | undefined, value: any, keyStack: (string | number | undefined)[]) {
  const depth = keyStack.length;
  const keys = <string[]> <any[]> [...keyStack, key];

  const valueJobCb = () => this.newOnValueJob(keys, value, depth, true);
  if (!this.parsingContext.streamingProfile
    && !this.parsingContext.contextTree.getContext(keys.slice(0, -1))) {
    // If an out-of-order context is allowed,
    // we have to buffer everything.
    // We store jobs for @context's and @type's separately,
    // because at the end, we have to process them first.
    // We also handle @type because these *could* introduce a type-scoped context.
    if (key === '@context') {
      let jobs = this.contextJobs[depth];
      if (!jobs) {
        jobs = this.contextJobs[depth] = [];
      }
      jobs.push(valueJobCb);
    } else if (key === '@type'
      || typeof key === 'number' && keys[keys.length - 2] === '@type') { // Also capture @type with array values
      // Remove @type from keys, because we want it to apply to parent later on
      this.typeJobs.push({ job: valueJobCb, keys: keys.slice(0, keys.length - 1) });
    } else {
      this.contextAwaitingJobs.push({ job: valueJobCb, keys });
    }
  } else {
    // Make sure that our value jobs are chained synchronously
    this.lastOnValueJob = this.lastOnValueJob.then(valueJobCb);
  }

  // Execute all buffered jobs on deeper levels
  if (!this.parsingContext.streamingProfile && depth === 0) {
    this.lastOnValueJob = this.lastOnValueJob
      .then(() => this.executeBufferedJobs());
  }
}

/**
Expand All @@ -497,7 +499,7 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
// We check all possible parent nodes for the current job, from root to leaves.
if (this.typeJobs.length > 0) {
// First collect all applicable type jobs
const applicableTypeJobs: { job: () => Promise<void>, keys: string[] }[] = [];
const applicableTypeJobs: { job: () => Promise<void>, keys: (string | number)[] }[] = [];
const applicableTypeJobIds: number[] = [];
for (let i = 0; i < this.typeJobs.length; i++) {
const typeJob = this.typeJobs[i];
Expand Down Expand Up @@ -526,6 +528,15 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
await job.job();
}
}

private insertContainerInValueStack(key: string | number | undefined, value: any): void {
if (typeof key === 'string') {
this.jsonValueStack[this.jsonValueStack.length - 1][key] = value;
} else if (typeof key === 'number') {
this.jsonValueStack[this.jsonValueStack.length - 1].push(value);
}
this.jsonValueStack.push(value);
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion lib/Util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ export class Util {
* @param needle An array to check if it is a prefix.
* @param haystack An array to look in.
*/
public static isPrefixArray(needle: string[], haystack: string[]): boolean {
public static isPrefixArray(needle: any[], haystack: any[]): boolean {
if (needle.length > haystack.length) {
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"canonicalize": "^1.0.1",
"http-link-header": "^1.0.2",
"jsonld-context-parser": "^2.1.3",
"jsonparse": "^1.3.1",
"json-event-parser": "1.0.0-beta.1",
"rdf-data-factory": "^1.1.0",
"readable-stream": "^4.0.0"
},
Expand Down
Loading