From 483dc3f2bb46d18b7bbb41d72118c356bd9de403 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Tue, 6 Mar 2018 17:17:35 +0800 Subject: [PATCH] ZEPPELIN-3296. Reorg livy integration test to minimize livy session ### What is this PR for? Just refactor the livy integration tests to minimize the number of livy sessions so that we can reduce the livy build time. ### What type of PR is it? [Refactoring] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3296 ### How should this be tested? * Travis CI pass ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang Closes #2844 from zjffdu/ZEPPELIN-3296 and squashes the following commits: 206ea3e [Jeff Zhang] ZEPPELIN-3296. Reorg livy integration test to minimize livy session --- .travis.yml | 9 +- .../zeppelin/livy/LivyInterpreterIT.java | 646 +++++++----------- 2 files changed, 250 insertions(+), 405 deletions(-) diff --git a/.travis.yml b/.travis.yml index a6f72c87a79..9edb1986841 100644 --- a/.travis.yml +++ b/.travis.yml @@ -114,17 +114,16 @@ matrix: dist: trusty env: PYTHON="3" SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pspark-1.6 -Phadoop2 -Phadoop-2.6 -Pscala-2.10" SPARKR="true" BUILD_FLAG="package -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl spark/interpreter,spark/spark-dependencies" TEST_PROJECTS="-Dtest=org.apache.zeppelin.spark.* -DfailIfNoTests=false" - # Test python/pyspark with python 2, livy 0.5 - sudo: required dist: trusty jdk: "openjdk7" - env: PYTHON="2" SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" LIVY_VER="0.5.0-incubating" PROFILE="-Pspark-1.6 -Phadoop2 -Phadoop-2.6 -Pscala-2.10" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl livy" TEST_PROJECTS="-Dpyspark.test.exclude='' -DfailIfNoTests=false" + env: PYTHON="2" SPARK_VER="1.6.3" HADOOP_VER="2.6" LIVY_VER="0.5.0-incubating" PROFILE="" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl livy" TEST_PROJECTS="" - # Test python/pyspark with python 3, livy 0.5 + # Test livy 0.5 with spark 2.2.0 under python3 - sudo: required dist: trusty - jdk: "openjdk7" - env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.6" LIVY_VER="0.5.0-incubating" PROFILE="-Pspark-2.0 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl livy" TEST_PROJECTS="-Dpyspark.test.exclude='' -DfailIfNoTests=false" + jdk: "openjdk8" + env: PYTHON="3" SPARK_VER="2.2.0" HADOOP_VER="2.6" LIVY_VER="0.5.0-incubating" PROFILE="" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl livy" TEST_PROJECTS="" before_install: # check files included in commit range, clear bower_components if a bower.json file has changed.
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java index 3dfeb363f8f..96fdbea876b 100644 --- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java +++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java @@ -18,17 +18,27 @@ package org.apache.zeppelin.livy; +import org.apache.commons.io.IOUtils; import org.apache.livy.test.framework.Cluster; import org.apache.livy.test.framework.Cluster$; -import org.apache.commons.io.IOUtils; -import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterOutput; +import org.apache.zeppelin.interpreter.InterpreterOutputListener; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResultMessage; +import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; +import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.user.AuthenticationInfo; -import org.junit.*; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Properties; @@ -78,128 +88,7 @@ public static boolean checkPreCondition() { @Test - public void testSparkInterpreterRDD() throws InterpreterException { - if (!checkPreCondition()) { - return; - } - InterpreterGroup interpreterGroup = new InterpreterGroup("group_1"); - interpreterGroup.put("session_1", new ArrayList()); - final LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties); - sparkInterpreter.setInterpreterGroup(interpreterGroup); - interpreterGroup.get("session_1").add(sparkInterpreter); - AuthenticationInfo authInfo = new AuthenticationInfo("user1"); - MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener(); - InterpreterOutput output = new InterpreterOutput(outputListener); - final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark", - "title", "text", authInfo, null, null, null, null, null, null, output); - sparkInterpreter.open(); - - try { - // detect spark version - InterpreterResult result = sparkInterpreter.interpret("sc.version", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, result.message().size()); - - boolean isSpark2 = isSpark2(sparkInterpreter, context); - - // test RDD api - result = sparkInterpreter.interpret("sc.parallelize(1 to 10).sum()", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, result.message().size()); - assertTrue(result.message().get(0).getData().contains("Double = 55.0")); - - // single line comment - String singleLineComment = "println(1)// my comment"; - result = sparkInterpreter.interpret(singleLineComment, context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, result.message().size()); - - // multiple line comment - String multipleLineComment = "println(1)/* multiple \n" + "line \n" + "comment */"; - result = sparkInterpreter.interpret(multipleLineComment, context); - assertEquals(InterpreterResult.Code.SUCCESS, 
result.code()); - assertEquals(1, result.message().size()); - - // multi-line string - String multiLineString = "val str = \"\"\"multiple\n" + - "line\"\"\"\n" + - "println(str)"; - result = sparkInterpreter.interpret(multiLineString, context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, result.message().size()); - assertTrue(result.message().get(0).getData().contains("multiple\nline")); - - // case class - String caseClassCode = "case class Person(id:Int, \n" + - "name:String)\n" + - "val p=Person(1, \"name_a\")"; - result = sparkInterpreter.interpret(caseClassCode, context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, result.message().size()); - assertTrue(result.message().get(0).getData().contains("p: Person = Person(1,name_a)")); - - // object class - String objectClassCode = "object Person {}"; - result = sparkInterpreter.interpret(objectClassCode, context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, result.message().size()); - if (!isSpark2) { - assertTrue(result.message().get(0).getData().contains("defined module Person")); - } else { - assertTrue(result.message().get(0).getData().contains("defined object Person")); - } - - // html output - String htmlCode = "println(\"%html
<h1> hello </h1>
\")"; - result = sparkInterpreter.interpret(htmlCode, context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, result.message().size()); - assertEquals(InterpreterResult.Type.HTML, result.message().get(0).getType()); - - // error - result = sparkInterpreter.interpret("println(a)", context); - assertEquals(InterpreterResult.Code.ERROR, result.code()); - assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); - assertTrue(result.message().get(0).getData().contains("error: not found: value a")); - - // incomplete code - result = sparkInterpreter.interpret("if(true){", context); - assertEquals(InterpreterResult.Code.ERROR, result.code()); - assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); - assertTrue(result.message().get(0).getData().contains("incomplete statement")); - - // cancel - if (sparkInterpreter.livyVersion.newerThanEquals(LivyVersion.LIVY_0_3_0)) { - Thread cancelThread = new Thread() { - @Override - public void run() { - // invoke cancel after 1 millisecond to wait job starting - try { - Thread.sleep(1); - } catch (InterruptedException e) { - e.printStackTrace(); - } - sparkInterpreter.cancel(context); - } - }; - cancelThread.start(); - result = sparkInterpreter - .interpret("sc.parallelize(1 to 10).foreach(e=>Thread.sleep(10*1000))", context); - assertEquals(InterpreterResult.Code.ERROR, result.code()); - String message = result.message().get(0).getData(); - // 2 possibilities, sometimes livy doesn't return the real cancel exception - assertTrue(message.contains("cancelled part of cancelled job group") || - message.contains("Job is cancelled")); - } - - } finally { - sparkInterpreter.close(); - } - } - - - @Test - public void testSparkInterpreterDataFrame() throws InterpreterException { + public void testSparkInterpreter() throws InterpreterException { if (!checkPreCondition()) { return; } @@ -227,309 +116,229 @@ public void testSparkInterpreterDataFrame() throws InterpreterException { assertEquals(1, result.message().size()); boolean isSpark2 = isSpark2(sparkInterpreter, context); + testRDD(sparkInterpreter, isSpark2); + testDataFrame(sparkInterpreter, sqlInterpreter, isSpark2); - // test DataFrame api - if (!isSpark2) { - result = sparkInterpreter.interpret( - "val df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n" - + "df.collect()", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, result.message().size()); - assertTrue(result.message().get(0).getData() - .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])")); - } else { - result = sparkInterpreter.interpret( - "val df=spark.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n" - + "df.collect()", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, result.message().size()); - assertTrue(result.message().get(0).getData() - .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])")); - } - sparkInterpreter.interpret("df.registerTempTable(\"df\")", context); - // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter - result = sqlInterpreter.interpret("select * from df where col_1='hello'", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); - assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData()); - // double quotes - result = 
sqlInterpreter.interpret("select * from df where col_1=\"hello\"", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); - assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData()); - - // only enable this test in spark2 as spark1 doesn't work for this case - if (isSpark2) { - result = sqlInterpreter.interpret("select * from df where col_1=\"he\\\"llo\" ", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); - } - - // single quotes inside attribute value - result = sqlInterpreter.interpret("select * from df where col_1=\"he'llo\"", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); - - // test sql with syntax error - result = sqlInterpreter.interpret("select * from df2", context); - assertEquals(InterpreterResult.Code.ERROR, result.code()); - assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); - - if (!isSpark2) { - assertTrue(result.message().get(0).getData().contains("Table not found")); - } else { - assertTrue(result.message().get(0).getData().contains("Table or view not found")); - } } finally { sparkInterpreter.close(); sqlInterpreter.close(); } } - @Test - public void testSparkSQLInterpreter() throws InterpreterException { - if (!checkPreCondition()) { - return; - } - InterpreterGroup interpreterGroup = new InterpreterGroup("group_1"); - interpreterGroup.put("session_1", new ArrayList()); - LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter( - new LivySparkInterpreter(properties)); - sparkInterpreter.setInterpreterGroup(interpreterGroup); - interpreterGroup.get("session_1").add(sparkInterpreter); - LazyOpenInterpreter sqlInterpreter = new LazyOpenInterpreter( - new LivySparkSQLInterpreter(properties)); - interpreterGroup.get("session_1").add(sqlInterpreter); - sqlInterpreter.setInterpreterGroup(interpreterGroup); - sqlInterpreter.open(); - - try { - AuthenticationInfo authInfo = new AuthenticationInfo("user1"); - MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener(); - InterpreterOutput output = new InterpreterOutput(outputListener); - InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.sql", - "title", "text", authInfo, null, null, null, null, null, null, output); - InterpreterResult result = sqlInterpreter.interpret("show tables", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); - assertTrue(result.message().get(0).getData().contains("tableName")); - int r = sqlInterpreter.getProgress(context); - assertTrue(r == 0); - } finally { - sqlInterpreter.close(); - } - } - - - @Test - public void testSparkSQLCancellation() throws InterpreterException { - if (!checkPreCondition()) { - return; - } - InterpreterGroup interpreterGroup = new InterpreterGroup("group_1"); - interpreterGroup.put("session_1", new ArrayList()); - LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties); - sparkInterpreter.setInterpreterGroup(interpreterGroup); - interpreterGroup.get("session_1").add(sparkInterpreter); + private void testRDD(final LivySparkInterpreter sparkInterpreter, boolean isSpark2) { AuthenticationInfo authInfo = new AuthenticationInfo("user1"); 
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener(); InterpreterOutput output = new InterpreterOutput(outputListener); final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark", "title", "text", authInfo, null, null, null, null, null, null, output); - sparkInterpreter.open(); - - final LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties); - interpreterGroup.get("session_1").add(sqlInterpreter); - sqlInterpreter.setInterpreterGroup(interpreterGroup); - sqlInterpreter.open(); - - try { - // detect spark version - InterpreterResult result = sparkInterpreter.interpret("sc.version", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, result.message().size()); - - boolean isSpark2 = isSpark2(sparkInterpreter, context); - - // test DataFrame api - if (!isSpark2) { - result = sparkInterpreter.interpret( - "val df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n" - + "df.collect()", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, result.message().size()); - assertTrue(result.message().get(0).getData() - .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])")); - } else { - result = sparkInterpreter.interpret( - "val df=spark.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n" - + "df.collect()", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, result.message().size()); - assertTrue(result.message().get(0).getData() - .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])")); - } - sparkInterpreter.interpret("df.registerTempTable(\"df\")", context); - // cancel - if (sqlInterpreter.getLivyVersion().newerThanEquals(LivyVersion.LIVY_0_3_0)) { - Thread cancelThread = new Thread() { - @Override - public void run() { - sqlInterpreter.cancel(context); + InterpreterResult result = sparkInterpreter.interpret("sc.parallelize(1 to 10).sum()", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData().contains("Double = 55.0")); + + // single line comment + String singleLineComment = "println(1)// my comment"; + result = sparkInterpreter.interpret(singleLineComment, context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + + // multiple line comment + String multipleLineComment = "println(1)/* multiple \n" + "line \n" + "comment */"; + result = sparkInterpreter.interpret(multipleLineComment, context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + + // multi-line string + String multiLineString = "val str = \"\"\"multiple\n" + + "line\"\"\"\n" + + "println(str)"; + result = sparkInterpreter.interpret(multiLineString, context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData().contains("multiple\nline")); + + // case class + String caseClassCode = "case class Person(id:Int, \n" + + "name:String)\n" + + "val p=Person(1, \"name_a\")"; + result = sparkInterpreter.interpret(caseClassCode, context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData().contains("p: Person = Person(1,name_a)")); + + // object class + 
String objectClassCode = "object Person {}"; + result = sparkInterpreter.interpret(objectClassCode, context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + if (!isSpark2) { + assertTrue(result.message().get(0).getData().contains("defined module Person")); + } else { + assertTrue(result.message().get(0).getData().contains("defined object Person")); + } + + // html output + String htmlCode = "println(\"%html
<h1> hello </h1>
\")"; + result = sparkInterpreter.interpret(htmlCode, context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertEquals(InterpreterResult.Type.HTML, result.message().get(0).getType()); + + // error + result = sparkInterpreter.interpret("println(a)", context); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertTrue(result.message().get(0).getData().contains("error: not found: value a")); + + // incomplete code + result = sparkInterpreter.interpret("if(true){", context); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + assertTrue(result.message().get(0).getData().contains("incomplete statement")); + + // cancel + if (sparkInterpreter.livyVersion.newerThanEquals(LivyVersion.LIVY_0_3_0)) { + Thread cancelThread = new Thread() { + @Override + public void run() { + // invoke cancel after 1 millisecond to wait job starting + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); } - }; - cancelThread.start(); - //sleep so that cancelThread performs a cancel. - try { - Thread.sleep(1); - } catch (InterruptedException e) { - e.printStackTrace(); - } - result = sqlInterpreter - .interpret("select count(1) from df", context); - if (result.code().equals(InterpreterResult.Code.ERROR)) { - String message = result.message().get(0).getData(); - // 2 possibilities, sometimes livy doesn't return the real cancel exception - assertTrue(message.contains("cancelled part of cancelled job group") || - message.contains("Job is cancelled")); + sparkInterpreter.cancel(context); } - } - } catch (LivyException e) { - } finally { - sparkInterpreter.close(); - sqlInterpreter.close(); + }; + cancelThread.start(); + result = sparkInterpreter + .interpret("sc.parallelize(1 to 10).foreach(e=>Thread.sleep(10*1000))", context); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + String message = result.message().get(0).getData(); + // 2 possibilities, sometimes livy doesn't return the real cancel exception + assertTrue(message.contains("cancelled part of cancelled job group") || + message.contains("Job is cancelled")); } } - @Test - public void testStringWithTruncation() throws InterpreterException { - if (!checkPreCondition()) { - return; - } - InterpreterGroup interpreterGroup = new InterpreterGroup("group_1"); - interpreterGroup.put("session_1", new ArrayList()); - LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties); - sparkInterpreter.setInterpreterGroup(interpreterGroup); - interpreterGroup.get("session_1").add(sparkInterpreter); + private void testDataFrame(LivySparkInterpreter sparkInterpreter, + final LivySparkSQLInterpreter sqlInterpreter, + boolean isSpark2) throws LivyException { AuthenticationInfo authInfo = new AuthenticationInfo("user1"); MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener(); InterpreterOutput output = new InterpreterOutput(outputListener); - InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark", + final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark", "title", "text", authInfo, null, null, null, null, null, null, output); - sparkInterpreter.open(); - - LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties); - 
interpreterGroup.get("session_1").add(sqlInterpreter); - sqlInterpreter.setInterpreterGroup(interpreterGroup); - sqlInterpreter.open(); - try { - // detect spark version - InterpreterResult result = sparkInterpreter.interpret("sc.version", context); + InterpreterResult result = null; + // test DataFrame api + if (!isSpark2) { + result = sparkInterpreter.interpret( + "val df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n" + + "df.collect()", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(1, result.message().size()); - - boolean isSpark2 = isSpark2(sparkInterpreter, context); - - // test DataFrame api - if (!isSpark2) { - result = sparkInterpreter.interpret( - "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n" - + "df.collect()", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, result.message().size()); - assertTrue(result.message().get(0).getData() - .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])")); - } else { - result = sparkInterpreter.interpret( - "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n" - + "df.collect()", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, result.message().size()); - assertTrue(result.message().get(0).getData() - .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])")); - } - sparkInterpreter.interpret("df.registerTempTable(\"df\")", context); - // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter - result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context); + assertTrue(result.message().get(0).getData() + .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])")); + } else { + result = sparkInterpreter.interpret( + "val df=spark.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n" + + "df.collect()", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData() + .contains("Array[org.apache.spark.sql.Row] = Array([hello,20])")); + } + sparkInterpreter.interpret("df.registerTempTable(\"df\")", context); + // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter + result = sqlInterpreter.interpret("select * from df where col_1='hello'", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); + assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData()); + // double quotes + result = sqlInterpreter.interpret("select * from df where col_1=\"hello\"", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); + assertEquals("col_1\tcol_2\nhello\t20", result.message().get(0).getData()); + + // only enable this test in spark2 as spark1 doesn't work for this case + if (isSpark2) { + result = sqlInterpreter.interpret("select * from df where col_1=\"he\\\"llo\" ", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); - assertEquals("col_1\tcol_2\n12characters12cha...\t20", result.message().get(0).getData()); - } finally { - 
sparkInterpreter.close(); - sqlInterpreter.close(); } - } + // single quotes inside attribute value + result = sqlInterpreter.interpret("select * from df where col_1=\"he'llo\"", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); - @Test - public void testStringWithoutTruncation() throws InterpreterException { - if (!checkPreCondition()) { - return; - } - InterpreterGroup interpreterGroup = new InterpreterGroup("group_1"); - interpreterGroup.put("session_1", new ArrayList()); - Properties newProps = new Properties(); - for (Object name: properties.keySet()) { - newProps.put(name, properties.get(name)); + // test sql with syntax error + result = sqlInterpreter.interpret("select * from df2", context); + assertEquals(InterpreterResult.Code.ERROR, result.code()); + assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType()); + + if (!isSpark2) { + assertTrue(result.message().get(0).getData().contains("Table not found")); + } else { + assertTrue(result.message().get(0).getData().contains("Table or view not found")); } - newProps.put(LivySparkSQLInterpreter.ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE, "false"); - LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(newProps); - sparkInterpreter.setInterpreterGroup(interpreterGroup); - interpreterGroup.get("session_1").add(sparkInterpreter); - AuthenticationInfo authInfo = new AuthenticationInfo("user1"); - MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener(); - InterpreterOutput output = new InterpreterOutput(outputListener); - InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark", - "title", "text", authInfo, null, null, null, null, null, null, output); - sparkInterpreter.open(); - LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(newProps); - interpreterGroup.get("session_1").add(sqlInterpreter); - sqlInterpreter.setInterpreterGroup(interpreterGroup); - sqlInterpreter.open(); + // test sql cancel + if (sqlInterpreter.getLivyVersion().newerThanEquals(LivyVersion.LIVY_0_3_0)) { + Thread cancelThread = new Thread() { + @Override + public void run() { + sqlInterpreter.cancel(context); + } + }; + cancelThread.start(); + //sleep so that cancelThread performs a cancel. 
+ try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + result = sqlInterpreter + .interpret("select count(1) from df", context); + if (result.code().equals(InterpreterResult.Code.ERROR)) { + String message = result.message().get(0).getData(); + // 2 possibilities, sometimes livy doesn't return the real cancel exception + assertTrue(message.contains("cancelled part of cancelled job group") || + message.contains("Job is cancelled")); + } + } - try { - // detect spark version - InterpreterResult result = sparkInterpreter.interpret("sc.version", context); + // test result string truncate + if (!isSpark2) { + result = sparkInterpreter.interpret( + "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n" + + "df.collect()", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(1, result.message().size()); - - boolean isSpark2 = isSpark2(sparkInterpreter, context); - - // test DataFrame api - if (!isSpark2) { - result = sparkInterpreter.interpret( - "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n" - + "df.collect()", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, result.message().size()); - assertTrue(result.message().get(0).getData() - .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])")); - } else { - result = sparkInterpreter.interpret( - "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n" - + "df.collect()", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(1, result.message().size()); - assertTrue(result.message().get(0).getData() - .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])")); - } - sparkInterpreter.interpret("df.registerTempTable(\"df\")", context); - // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter - result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context); + assertTrue(result.message().get(0).getData() + .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])")); + } else { + result = sparkInterpreter.interpret( + "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n" + + "df.collect()", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); - assertEquals("col_1\tcol_2\n12characters12characters\t20", result.message().get(0).getData()); - } finally { - sparkInterpreter.close(); - sqlInterpreter.close(); + assertEquals(1, result.message().size()); + assertTrue(result.message().get(0).getData() + .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])")); } + sparkInterpreter.interpret("df.registerTempTable(\"df\")", context); + // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter + result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); + assertEquals("col_1\tcol_2\n12characters12cha...\t20", result.message().get(0).getData()); + } @Test - public void testPySparkInterpreter() throws LivyException, InterpreterException { 
+ public void testPySparkInterpreter() throws InterpreterException { if (!checkPreCondition()) { return; } @@ -549,7 +358,7 @@ public void testPySparkInterpreter() throws LivyException, InterpreterException // for livy version >=0.3 , input some erroneous spark code, check the shown result is more than one line InterpreterResult result = pysparkInterpreter.interpret("sc.parallelize(wrongSyntax(1, 2)).count()", context); assertEquals(InterpreterResult.Code.ERROR, result.code()); - assertTrue(result.message().get(0).getData().split("\n").length>1); + assertTrue(result.message().get(0).getData().split("\n").length > 1); assertTrue(result.message().get(0).getData().contains("Traceback")); } catch (APINotFoundException e) { // only livy 0.2 can throw this exception since it doesn't have /version endpoint @@ -557,17 +366,17 @@ public void testPySparkInterpreter() throws LivyException, InterpreterException // traceback InterpreterResult result = pysparkInterpreter.interpret("print(a)", context); assertEquals(InterpreterResult.Code.ERROR, result.code()); - assertTrue(result.message().get(0).getData().split("\n").length>1); + assertTrue(result.message().get(0).getData().split("\n").length > 1); assertTrue(result.message().get(0).getData().contains("Traceback")); } // test utf-8 Encoding try { String utf8Str = "你你你你你你好"; - InterpreterResult result = pysparkInterpreter.interpret("print(\""+utf8Str+"\")", context); + InterpreterResult result = pysparkInterpreter.interpret("print(\"" + utf8Str + "\")", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertTrue(result.message().get(0).getData().contains(utf8Str)); - }catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); } @@ -650,7 +459,7 @@ public void run() { } @Test - public void testSparkInterpreterWithDisplayAppInfo() throws InterpreterException { + public void testSparkInterpreterWithDisplayAppInfo_StringWithoutTruncation() throws InterpreterException { if (!checkPreCondition()) { return; } @@ -660,6 +469,7 @@ public void testSparkInterpreterWithDisplayAppInfo() throws InterpreterException properties2.put("zeppelin.livy.displayAppInfo", "true"); // enable spark ui because it is disabled by livy integration test properties2.put("livy.spark.ui.enabled", "true"); + properties2.put(LivySparkSQLInterpreter.ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE, "false"); LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties2); sparkInterpreter.setInterpreterGroup(interpreterGroup); interpreterGroup.get("session_1").add(sparkInterpreter); @@ -670,6 +480,11 @@ public void testSparkInterpreterWithDisplayAppInfo() throws InterpreterException "title", "text", authInfo, null, null, null, null, null, null, output); sparkInterpreter.open(); + LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties2); + interpreterGroup.get("session_1").add(sqlInterpreter); + sqlInterpreter.setInterpreterGroup(interpreterGroup); + sqlInterpreter.open(); + try { InterpreterResult result = sparkInterpreter.interpret("sc.version", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); @@ -683,13 +498,44 @@ public void testSparkInterpreterWithDisplayAppInfo() throws InterpreterException assertEquals(2, result.message().size()); assertEquals(InterpreterResult.Type.HTML, result.message().get(0).getType()); + // detect spark version + result = sparkInterpreter.interpret("sc.version", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(2, result.message().size()); + + 
boolean isSpark2 = isSpark2(sparkInterpreter, context); + + if (!isSpark2) { + result = sparkInterpreter.interpret( + "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n" + + "df.collect()", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(2, result.message().size()); + assertTrue(result.message().get(0).getData() + .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])")); + } else { + result = sparkInterpreter.interpret( + "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n" + + "df.collect()", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(2, result.message().size()); + assertTrue(result.message().get(0).getData() + .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])")); + } + sparkInterpreter.interpret("df.registerTempTable(\"df\")", context); + // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter + result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType()); + assertEquals("col_1\tcol_2\n12characters12characters\t20", result.message().get(0).getData()); } finally { sparkInterpreter.close(); + sqlInterpreter.close(); } } @Test - public void testSparkRInterpreter() throws LivyException, InterpreterException { + public void testSparkRInterpreter() throws InterpreterException { if (!checkPreCondition()) { return; } @@ -847,7 +693,7 @@ public void testSharedInterpreter() throws InterpreterException { assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(1, result.message().size()); - boolean isSpark2 = isSpark2((BaseLivyInterpreter)sparkInterpreter.getInnerInterpreter(), context); + boolean isSpark2 = isSpark2((BaseLivyInterpreter) sparkInterpreter.getInnerInterpreter(), context); if (!isSpark2) { result = sparkInterpreter.interpret( @@ -891,10 +737,10 @@ public void testSharedInterpreter() throws InterpreterException { assertEquals(1, result.message().size()); assertTrue(result.message().get(0).getData() .contains("+-----+-----+\n" + - "|col_1|col_2|\n" + - "+-----+-----+\n" + - "|hello| 20|\n" + - "+-----+-----+")); + "|col_1|col_2|\n" + + "+-----+-----+\n" + + "|hello| 20|\n" + + "+-----+-----+")); // access table from sparkr result = sparkRInterpreter.interpret("head(sql(\"select * from df\"))", context);
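
For readers skimming the diff above, a minimal sketch of the consolidation pattern this patch applies: several former one-session-per-`@Test` methods are folded into a single `@Test` (`testSparkInterpreter`) that opens one interpreter session and drives private helpers (`testRDD`, `testDataFrame`) against it. The `FakeSession` class below is a hypothetical stand-in for the real `LivySparkInterpreter`, which needs a running Livy test cluster; only the structure mirrors the patch.

```java
import static org.junit.Assert.assertTrue;

import org.junit.Test;

public class ConsolidatedSessionSketch {

  // Hypothetical stand-in for a slow-to-start Livy session; in LivyInterpreterIT
  // the equivalent object is a LivySparkInterpreter opened against the test cluster.
  static class FakeSession {
    void open() { }
    String run(String code) { return "res: " + code; }
    void close() { }
  }

  @Test
  public void testSparkInterpreter() {
    FakeSession session = new FakeSession();
    session.open();
    try {
      // One @Test drives every scenario against the shared session,
      // instead of each scenario opening and closing its own session.
      checkRdd(session);
      checkDataFrame(session);
    } finally {
      session.close();
    }
  }

  private void checkRdd(FakeSession session) {
    assertTrue(session.run("sc.parallelize(1 to 10).sum()").startsWith("res:"));
  }

  private void checkDataFrame(FakeSession session) {
    assertTrue(session.run("df.collect()").startsWith("res:"));
  }
}
```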