Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ matrix:
- os: linux
python: "3.7-dev"
env: DEBUGGER_TEST=true
- os: linux
python: "3.7-dev"
env: FUNCTIONAL_TEST=true
- os: linux
python: "3.7-dev"
env: DEBUGGER_TEST_RELEASE=true
Expand Down
7 changes: 4 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

253 changes: 114 additions & 139 deletions src/client/datascience/jupyterServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import * as fs from 'fs-extra';
import { inject, injectable } from 'inversify';
import * as path from 'path';
import { Observable } from 'rxjs/Observable';
import { Subscriber } from 'rxjs/Subscriber';
import * as uuid from 'uuid/v4';
import * as vscode from 'vscode';

Expand Down Expand Up @@ -150,7 +151,7 @@ export class JupyterServer implements INotebookServer {
output = cells;
},
(error) => {
deferred.resolve(output);
deferred.reject(error);
},
() => {
deferred.resolve(output);
Expand Down Expand Up @@ -199,23 +200,34 @@ export class JupyterServer implements INotebookServer {
}

public executeSilently = (code: string) : Promise<void> => {
// If we have a session, execute the code now.
if (this.session) {
// Generate a new request and wrap it in a promise as we wait for it to finish
const request = this.generateRequest(code, true);

return new Promise((resolve, reject) => {
// Just wait for our observable to finish
const observable = this.generateExecuteObservable(code, 'file', -1, '0', request);
// tslint:disable-next-line:no-empty
observable.subscribe(() => {
},
reject,
resolve);
});
}

return Promise.reject(new Error(localize.DataScience.sessionDisposed()));
return new Promise((resolve, reject) => {
// If we have a session, execute the code now.
if (this.session) {
// Generate a new request and resolve when it's done.
const request = this.generateRequest(code, true);

if (request) {

// // For debugging purposes when silently is failing.
// request.onIOPub = (msg: KernelMessage.IIOPubMessage) => {
// try {
// this.logger.logInformation(`Execute silently message ${msg.header.msg_type} : hasData=${'data' in msg.content}`);
// } catch (err) {
// this.logger.logError(err);
// }
// };

request.done.then(() => {
this.logger.logInformation(`Execute for ${code} silently finished.`);
resolve();
}).catch(reject);
} else {
reject(new Error(localize.DataScience.sessionDisposed()));
}
} else {
reject(new Error(localize.DataScience.sessionDisposed()));
}
});
}

public get onStatusChanged() : vscode.Event<boolean> {
Expand Down Expand Up @@ -299,7 +311,7 @@ export class JupyterServer implements INotebookServer {
return this.session ? this.session.kernel.requestExecute(
{
// Replace windows line endings with unix line endings.
code: code.replace('\r\n', '\n'),
code: code.replace(/\r\n/g, '\n'),
stop_on_error: false,
allow_stdin: false,
silent: silent
Expand Down Expand Up @@ -437,68 +449,6 @@ export class JupyterServer implements INotebookServer {
});
}

private changeDirectoryObservable = (file: string) : Observable<boolean> => {
return new Observable<boolean>(subscriber => {
// Execute some code and when its done, finish our subscriber
const dir = path.dirname(file);
this.executeSilently(`%cd "${dir}"`)
.then(() => {
subscriber.next(true);
subscriber.complete();
})
.catch(err => subscriber.error(err));
});
}

private chainObservables<T>(first : Observable<T>, second : () => Observable<ICell>) : Observable<ICell> {
return new Observable<ICell>(subscriber => {
first.subscribe(
() => { return; },
(err) => subscriber.error(err),
() => {
// When the first completes, tell the second to go
second().subscribe((cell : ICell) => {
subscriber.next(cell);
},
(err) => {
subscriber.error(err);
},
() => {
subscriber.complete();
});
}
);
});
}

private executeCodeObservable = (code: string, file: string, line: number) : Observable<ICell> => {

if (this.session) {
// Send a magic that changes the current directory if we aren't already sending a magic
if (line >= 0 && fs.existsSync(file)) {
return this.chainObservables(
this.changeDirectoryObservable(file),
() => this.executeCodeObservableInternal(code, file, line));
} else {
// We're inside of an execute silently already, don't recurse
return this.executeCodeObservableInternal(code, file, line);
}
}

return new Observable<ICell>(subscriber => {
subscriber.error(new Error(localize.DataScience.sessionDisposed()));
subscriber.complete();
});
}

private executeCodeObservableInternal = (code: string, file: string, line: number) : Observable<ICell> => {
// Send an execute request with this code
const id = uuid();
const request = this.session ? this.generateRequest(code, false) : undefined;

return this.generateExecuteObservable(code, file, line, id, request);
}

private appendLineFeed(arr : string[], modifier? : (s : string) => string) {
return arr.map((s: string, i: number) => {
const out = modifier ? modifier(s) : s;
Expand Down Expand Up @@ -529,7 +479,76 @@ export class JupyterServer implements INotebookServer {
});
}

private generateExecuteObservable(code: string, file: string, line: number, id: string, request: Kernel.IFuture | undefined) : Observable<ICell> {
private changeDirectoryIfPossible = async (file: string, line: number) : Promise<void> => {
if (line >= 0 && await fs.pathExists(file)) {
const dir = path.dirname(file);
await this.executeSilently(`%cd "${dir}"`);
}
}

private handleCodeRequest = (subscriber: Subscriber<ICell>, startTime: number, cell: ICell, code: string) => {
// Generate a new request.
const request = this.generateRequest(code, false);

// Transition to the busy stage
cell.state = CellState.executing;

// Listen to the reponse messages and update state as we go
if (request) {
request.onIOPub = (msg: KernelMessage.IIOPubMessage) => {
try {
if (KernelMessage.isExecuteResultMsg(msg)) {
this.handleExecuteResult(msg as KernelMessage.IExecuteResultMsg, cell);
} else if (KernelMessage.isExecuteInputMsg(msg)) {
this.handleExecuteInput(msg as KernelMessage.IExecuteInputMsg, cell);
} else if (KernelMessage.isStatusMsg(msg)) {
this.handleStatusMessage(msg as KernelMessage.IStatusMsg);
} else if (KernelMessage.isStreamMsg(msg)) {
this.handleStreamMesssage(msg as KernelMessage.IStreamMsg, cell);
} else if (KernelMessage.isDisplayDataMsg(msg)) {
this.handleDisplayData(msg as KernelMessage.IDisplayDataMsg, cell);
} else if (KernelMessage.isErrorMsg(msg)) {
this.handleError(msg as KernelMessage.IErrorMsg, cell);
} else {
this.logger.logWarning(`Unknown message ${msg.header.msg_type} : hasData=${'data' in msg.content}`);
}

// Set execution count, all messages should have it
if (msg.content.execution_count) {
cell.data.execution_count = msg.content.execution_count as number;
}

// Show our update if any new output
subscriber.next(cell);
} catch (err) {
// If not a restart error, then tell the subscriber
if (startTime > this.sessionStartTime) {
this.logger.logError(`Error during message ${msg.header.msg_type}`);
subscriber.error(err);
}
}
};

// Create completion and error functions so we can bind our cell object
// tslint:disable-next-line:no-any
const completion = (error?: any) => {
cell.state = error ? CellState.error : CellState.finished;
// Only do this if start time is still valid. Dont log an error to the subscriber. Error
// state should end up in the cell output.
if (startTime > this.sessionStartTime) {
subscriber.next(cell);
}
subscriber.complete();
};

// When the request finishes we are done
request.done.then(completion).catch(completion);
} else {
subscriber.error(new Error(localize.DataScience.sessionDisposed()));
}
}

private executeCodeObservable(code: string, file: string, line: number) : Observable<ICell> {
return new Observable<ICell>(subscriber => {
// Start out empty;
const cell: ICell = {
Expand All @@ -540,7 +559,7 @@ export class JupyterServer implements INotebookServer {
metadata: {},
execution_count: 0
},
id: id,
id: uuid(),
file: file,
line: line,
state: CellState.init
Expand All @@ -549,64 +568,20 @@ export class JupyterServer implements INotebookServer {
// Keep track of when we started.
const startTime = Date.now();

// Tell our listener.
// Tell our listener. NOTE: have to do this asap so that markdown cells don't get
// run before our cells.
subscriber.next(cell);

// Transition to the busy stage
cell.state = CellState.executing;

// Listen to the reponse messages and update state as we go
if (request) {
request.onIOPub = (msg: KernelMessage.IIOPubMessage) => {
try {
if (KernelMessage.isExecuteResultMsg(msg)) {
this.handleExecuteResult(msg as KernelMessage.IExecuteResultMsg, cell);
} else if (KernelMessage.isExecuteInputMsg(msg)) {
this.handleExecuteInput(msg as KernelMessage.IExecuteInputMsg, cell);
} else if (KernelMessage.isStatusMsg(msg)) {
this.handleStatusMessage(msg as KernelMessage.IStatusMsg);
} else if (KernelMessage.isStreamMsg(msg)) {
this.handleStreamMesssage(msg as KernelMessage.IStreamMsg, cell);
} else if (KernelMessage.isDisplayDataMsg(msg)) {
this.handleDisplayData(msg as KernelMessage.IDisplayDataMsg, cell);
} else if (KernelMessage.isErrorMsg(msg)) {
this.handleError(msg as KernelMessage.IErrorMsg, cell);
} else {
this.logger.logWarning(`Unknown message ${msg.header.msg_type} : hasData=${'data' in msg.content}`);
}

// Set execution count, all messages should have it
if (msg.content.execution_count) {
cell.data.execution_count = msg.content.execution_count as number;
}

// Show our update if any new output
subscriber.next(cell);
} catch (err) {
// If not a restart error, then tell the subscriber
if (this.sessionStartTime && startTime > this.sessionStartTime) {
this.logger.logError(`Error during message ${msg.header.msg_type}`);
subscriber.error(err);
}
}
};

// Create completion and error functions so we can bind our cell object
const completion = (error : boolean) => {
cell.state = error ? CellState.error : CellState.finished;
// Only do this if start time is still valid
if (this.sessionStartTime && startTime > this.sessionStartTime) {
subscriber.next(cell);
}
subscriber.complete();
};

// When the request finishes we are done
request.done.then(() => completion(false), () => completion(true));
} else {
subscriber.error(new Error(localize.DataScience.sessionDisposed()));
}

// Attempt to change to the current directory. When that finishes
// send our real request
this.changeDirectoryIfPossible(file, line)
.then(() => {
this.handleCodeRequest(subscriber, startTime, cell, code);
})
.catch(() => {
// Ignore errors if they occur. Just execute normally
this.handleCodeRequest(subscriber, startTime, cell, code);
});
});
}

Expand Down