Merge branch 'master' into py4jPythonInterpreter
astroshim committed Mar 8, 2017
2 parents 3c9585f + 0e19648 commit a48df58
Showing 6 changed files with 49 additions and 36 deletions.

SparkInterpreter.java
@@ -38,6 +38,7 @@
 import org.apache.spark.SparkEnv;

 import org.apache.spark.SecurityManager;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.repl.SparkILoop;
 import org.apache.spark.scheduler.ActiveJob;
 import org.apache.spark.scheduler.DAGScheduler;
@@ -126,6 +127,7 @@ public class SparkInterpreter extends Interpreter {
   private SparkVersion sparkVersion;
   private static File outputDir; // class outputdir for scala 2.11
   private Object classServer; // classserver for scala 2.11
+  private JavaSparkContext jsc;


   public SparkInterpreter(Properties property) {
@@ -152,6 +154,15 @@ public SparkContext getSparkContext() {
     }
   }

+  public JavaSparkContext getJavaSparkContext() {
+    synchronized (sharedInterpreterLock) {
+      if (jsc == null) {
+        jsc = JavaSparkContext.fromSparkContext(sc);
+      }
+      return jsc;
+    }
+  }
+
   public boolean isSparkContextInitialized() {
     synchronized (sharedInterpreterLock) {
       return sc != null;
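
The new accessor lazily wraps the interpreter's existing Scala SparkContext in a Java-facing handle. As a rough illustration of that one call (not part of this commit; the local-mode setup and class name are made up for the sketch), JavaSparkContext.fromSparkContext wraps an existing context instead of starting a second one:

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;

public class JavaSparkContextSketch {
  public static void main(String[] args) {
    // Stand-in for the interpreter's already-created SparkContext "sc".
    SparkContext sc = new SparkContext(
        new SparkConf().setMaster("local[2]").setAppName("jsc-sketch"));

    // Same call the new getJavaSparkContext() makes: wrap, don't re-create.
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);

    // Both handles point at the same underlying context.
    System.out.println(jsc.sc() == sc); // true

    jsc.stop();
  }
}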
@@ -1422,6 +1433,7 @@ public void close() {
     }
     sparkSession = null;
     sc = null;
+    jsc = null;
     if (classServer != null) {
       Utils.invokeMethod(classServer, "stop");
       classServer = null;

SparkRInterpreter.java
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.spark.SparkContext;
 import org.apache.spark.SparkRBackend;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.scheduler.Scheduler;
@@ -45,6 +46,7 @@ public class SparkRInterpreter extends Interpreter {
   private SparkInterpreter sparkInterpreter;
   private ZeppelinR zeppelinR;
   private SparkContext sc;
+  private JavaSparkContext jsc;

   public SparkRInterpreter(Properties property) {
     super(property);
@@ -73,8 +75,10 @@ public void open() {

     this.sparkInterpreter = getSparkInterpreter();
     this.sc = sparkInterpreter.getSparkContext();
+    this.jsc = sparkInterpreter.getJavaSparkContext();
     SparkVersion sparkVersion = new SparkVersion(sc.version());
     ZeppelinRContext.setSparkContext(sc);
+    ZeppelinRContext.setJavaSparkContext(jsc);
     if (Utils.isSpark2()) {
       ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
     }

ZeppelinRContext.java
@@ -18,6 +18,7 @@
 package org.apache.zeppelin.spark;

 import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;

 /**
@@ -28,6 +29,7 @@ public class ZeppelinRContext {
   private static SQLContext sqlContext;
   private static ZeppelinContext zeppelinContext;
   private static Object sparkSession;
+  private static JavaSparkContext javaSparkContext;

   public static void setSparkContext(SparkContext sparkContext) {
     ZeppelinRContext.sparkContext = sparkContext;
@@ -60,4 +62,8 @@ public static ZeppelinContext getZeppelinContext() {
   public static Object getSparkSession() {
     return sparkSession;
   }
+
+  public static void setJavaSparkContext(JavaSparkContext jsc) { javaSparkContext = jsc; }
+
+  public static JavaSparkContext getJavaSparkContext() { return javaSparkContext; }
 }
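
ZeppelinRContext is a plain static holder: the Scala/Java interpreters publish objects into it, and the R side reads them back by fully qualified class name via SparkR:::callJStatic, as the next file shows. A minimal sketch of that flow seen purely from the Java side (not part of this commit; the main() wiring and local-mode setup are hypothetical):

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.zeppelin.spark.ZeppelinRContext;

public class ZeppelinRContextFlowSketch {
  public static void main(String[] args) {
    SparkContext sc = new SparkContext(
        new SparkConf().setMaster("local[2]").setAppName("zrc-sketch"));
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);

    // What SparkRInterpreter.open() now does with the new setter:
    ZeppelinRContext.setSparkContext(sc);
    ZeppelinRContext.setJavaSparkContext(jsc);

    // What zeppelin_sparkr.R effectively does through
    // SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getJavaSparkContext"):
    JavaSparkContext handleSeenByR = ZeppelinRContext.getJavaSparkContext();
    System.out.println(handleSeenByR == jsc); // true: one shared instance

    jsc.stop();
  }
}
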
2 changes: 1 addition & 1 deletion spark/src/main/resources/R/zeppelin_sparkr.R
@@ -45,7 +45,7 @@ assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)
 if (version >= 20000) {
   assign(".sparkRsession", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkSession"), envir = SparkR:::.sparkREnv)
   assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
-  assign(".sparkRjsc", get(".sc", envir = SparkR:::.sparkREnv), envir=SparkR:::.sparkREnv)
+  assign(".sparkRjsc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getJavaSparkContext"), envir = SparkR:::.sparkREnv)
 }
 assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv)
 assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)

Job.java
@@ -182,22 +182,16 @@ public void run() {
       this.exception = null;
       errorMessage = null;
       dateFinished = new Date();
-      progressUpdator.terminate();
-    } catch (NullPointerException e) {
-      LOGGER.error("Job failed", e);
-      progressUpdator.terminate();
-      this.exception = e;
-      setResult(e.getMessage());
-      errorMessage = getStack(e);
-      dateFinished = new Date();
     } catch (Throwable e) {
       LOGGER.error("Job failed", e);
-      progressUpdator.terminate();
       this.exception = e;
       setResult(e.getMessage());
       errorMessage = getStack(e);
       dateFinished = new Date();
     } finally {
+      if (progressUpdator != null) {
+        progressUpdator.interrupt();
+      }
       //aborted = false;
     }
   }

JobProgressPoller.java
@@ -21,48 +21,45 @@
 import org.slf4j.LoggerFactory;

 /**
+ * Polls job progress with given interval
+ *
+ * @see Job#progress()
+ * @see JobListener#onProgressUpdate(org.apache.zeppelin.scheduler.Job, int)
+ *
  * TODO(moon) : add description.
  */
 public class JobProgressPoller extends Thread {
   public static final long DEFAULT_INTERVAL_MSEC = 500;
-  Logger logger = LoggerFactory.getLogger(JobProgressPoller.class);
+  private static final Logger logger = LoggerFactory.getLogger(JobProgressPoller.class);

   private Job job;
   private long intervalMs;
-  boolean terminate = false;

   public JobProgressPoller(Job job, long intervalMs) {
     super("JobProgressPoller, jobId=" + job.getId());
     this.job = job;
-    this.intervalMs = intervalMs;
+    if (intervalMs < 0) {
+      throw new IllegalArgumentException("polling interval can't be " + intervalMs);
+    }
+    this.intervalMs = intervalMs == 0 ? DEFAULT_INTERVAL_MSEC : intervalMs;
   }

   @Override
   public void run() {
-    if (intervalMs < 0) {
-      return;
-    } else if (intervalMs == 0) {
-      intervalMs = DEFAULT_INTERVAL_MSEC;
-    }
-
-    while (terminate == false) {
-      JobListener listener = job.getListener();
-      if (listener != null) {
-        try {
-          if (job.isRunning()) {
-            listener.onProgressUpdate(job, job.progress());
+    try {
+      while (!Thread.interrupted()) {
+        JobListener listener = job.getListener();
+        if (listener != null) {
+          try {
+            if (job.isRunning()) {
+              listener.onProgressUpdate(job, job.progress());
+            }
+          } catch (Exception e) {
+            logger.error("Can not get or update progress", e);
           }
-        } catch (Exception e) {
-          logger.error("Can not get or update progress", e);
         }
-      }
-      try {
         Thread.sleep(intervalMs);
-      } catch (InterruptedException e) {
-        logger.error("Exception in JobProgressPoller while run Thread.sleep", e);
       }
-    }
-  }
-
-  public void terminate() {
-    terminate = true;
+    } catch (InterruptedException ignored) {}
   }
 }
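
Taken together with the Job change above, shutdown no longer relies on a hand-rolled terminate flag: Job.run()'s finally block now interrupts the poller on every exit path, and the poller leaves its loop when Thread.interrupted() reports the interrupt or Thread.sleep throws InterruptedException. A self-contained sketch of that pattern (not part of this commit; the class name and println body are placeholders):

public class InterruptiblePollerSketch extends Thread {
  private final long intervalMs;

  public InterruptiblePollerSketch(long intervalMs) {
    this.intervalMs = intervalMs;
  }

  @Override
  public void run() {
    try {
      // Loop until the owner interrupts us, mirroring the rewritten run().
      while (!Thread.interrupted()) {
        System.out.println("poll"); // stand-in for listener.onProgressUpdate(...)
        Thread.sleep(intervalMs);   // throws InterruptedException once interrupted
      }
    } catch (InterruptedException ignored) {
      // Interruption is the normal shutdown signal; nothing else to do.
    }
  }

  public static void main(String[] args) throws InterruptedException {
    InterruptiblePollerSketch poller = new InterruptiblePollerSketch(100);
    poller.start();
    Thread.sleep(350);
    poller.interrupt(); // what Job.run()'s finally block now does
    poller.join();
  }
}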
