Skip to content

Commit

Permalink
Merge branch 'branch-0.9' of https://github.com/apache/zeppelin into …
Browse files Browse the repository at this point in the history
…apache-0.9
  • Loading branch information
xiejiajun committed Jul 7, 2020
2 parents 0e5f65a + 95624c9 commit 3d57719
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ object ParagraphParser {

val GENERIC_STATEMENT_PREFIX: Regex =
"""(?is)\s*(?:INSERT|UPDATE|DELETE|SELECT|CREATE|ALTER|
|DROP|GRANT|REVOKE|TRUNCATE|LIST|USE)\s+""".r
|DROP|GRANT|REVOKE|TRUNCATE|LIST|USE|[a-z]\w+)\s+""".r

val VALID_IDENTIFIER = "[a-z][a-z0-9_]*"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,8 @@ def show(self, obj, **kwargs):
super(IPySparkZeppelinContext, self).show(obj, **kwargs)

z = __zeppelin__ = IPySparkZeppelinContext(intp.getZeppelinContext(), gateway)

# add jars to path
import sys
jarlist = map(lambda url: url.replace("file:/", "/"), (conf.get("spark.jars") or "").split(","))
sys.path.extend(filter(lambda jar: jar not in sys.path, jarlist))
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,8 @@ def show(self, obj, **kwargs):

z = __zeppelin__ = PySparkZeppelinContext(intp.getZeppelinContext(), gateway)
__zeppelin__._setup_matplotlib()

# add jars to path
import sys
jarlist = map(lambda url: url.replace("file:/", "/"), (conf.get("spark.jars") or "").split(","))
sys.path.extend(filter(lambda jar: jar not in sys.path, jarlist))
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,20 @@ public void testConfInterpreter() throws IOException {
p1.setText("%spark\nimport com.databricks.spark.csv._");
note.run(p1.getId(), true);
assertEquals(Status.FINISHED, p1.getStatus());

// test pyspark imports path
Paragraph p2 = note.addNewParagraph(anonymous);
p2.setText("%spark.pyspark\nimport sys\nsys.path");
note.run(p2.getId(), true);
assertEquals(Status.FINISHED, p2.getStatus());
assertTrue(p2.getReturn().toString().contains("databricks_spark"));

Paragraph p3 = note.addNewParagraph(anonymous);
p3.setText("%spark.ipyspark\nimport sys\nsys.path");
note.run(p3.getId(), true);
assertEquals(Status.FINISHED, p3.getStatus());
assertTrue(p3.getReturn().toString().contains("databricks_spark"));

} finally {
if (null != note) {
TestUtils.getInstance(Notebook.class).removeNote(note, anonymous);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ public List<NoteJobInfo> getNoteJobInfoByUnixTime(long lastUpdateServerUnixTime,
if (!conf.isJobManagerEnabled()) {
return new ArrayList<>();
}
List<NoteJobInfo> notesJobInfo = new ArrayList<>();
notebook.getNoteStream()

List<NoteJobInfo> notesJobInfo = notebook.getNoteStream()
.filter(note -> authorizationService.isOwner(context.getUserAndRoles(), note.getId()))
.map(note -> new NoteJobInfo(note))
.filter(noteJobInfo -> noteJobInfo.unixTimeLastRun > lastUpdateServerUnixTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,10 @@ public void onFailure(Exception ex, ServiceContext context) throws IOException {
});
}

public void broadcastUpdateNoteJobInfo(long lastUpdateUnixTime) throws IOException {
getJobManagerService().getNoteJobInfoByUnixTime(lastUpdateUnixTime, null,
public void broadcastUpdateNoteJobInfo(Note note, long lastUpdateUnixTime) throws IOException {
ServiceContext context = new ServiceContext(new AuthenticationInfo(),
getNotebookAuthorizationService().getOwners(note.getId()));
getJobManagerService().getNoteJobInfoByUnixTime(lastUpdateUnixTime, context,
new WebSocketServiceCallback<List<JobManagerService.NoteJobInfo>>(null) {
@Override
public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo,
Expand Down Expand Up @@ -1799,7 +1801,9 @@ public void run() {
@Override
public void onParagraphRemove(Paragraph p) {
try {
getJobManagerService().getNoteJobInfoByUnixTime(System.currentTimeMillis() - 5000, null,
ServiceContext context = new ServiceContext(new AuthenticationInfo(),
getNotebookAuthorizationService().getOwners(p.getNote().getId()));
getJobManagerService().getNoteJobInfoByUnixTime(System.currentTimeMillis() - 5000, context,
new JobManagerServiceCallback());
} catch (IOException e) {
LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
Expand All @@ -1809,7 +1813,7 @@ public void onParagraphRemove(Paragraph p) {
@Override
public void onNoteRemove(Note note, AuthenticationInfo subject) {
try {
broadcastUpdateNoteJobInfo(System.currentTimeMillis() - 5000);
broadcastUpdateNoteJobInfo(note, System.currentTimeMillis() - 5000);
} catch (IOException e) {
LOG.warn("can not broadcast for job manager: " + e.getMessage(), e);
}
Expand Down Expand Up @@ -1918,7 +1922,7 @@ public void onStatusChange(Paragraph p, Status before, Status after) {
p.setStatusToUserParagraph(p.getStatus());
broadcastParagraph(p.getNote(), p);
try {
broadcastUpdateNoteJobInfo(System.currentTimeMillis() - 5000);
broadcastUpdateNoteJobInfo(p.getNote(), System.currentTimeMillis() - 5000);
} catch (IOException e) {
LOG.error("can not broadcast for job manager {}", e);
}
Expand Down

0 comments on commit 3d57719

Please sign in to comment.