diff --git a/HDP-CHANGES.txt b/HDP-CHANGES.txt
index 26d77bcd47c..5312972f19b 100644
--- a/HDP-CHANGES.txt
+++ b/HDP-CHANGES.txt
@@ -1,5 +1,7 @@
 Changes only in Hortonworks github and not in zeppelin 0.7.0 Apache Release
+BUG-80721 ZEPPELIN-1965. Livy SQL Interpreter: Should use df.show(1000, false) to display results
+BUG-80473 ZEPPELIN-2530: Zeppelin user impersonation with domain name suffix is failing
 BUG-80074 jdbc interpreter throwing CNF for SSLContexts class
 BUG-79934 [minor] Improve Session
 BUG-79635 zeppelin build failure in 2.6-maint
diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md
index ce1c34add8f..a7b776c0f09 100644
--- a/docs/interpreter/livy.md
+++ b/docs/interpreter/livy.md
@@ -56,10 +56,15 @@ Example: `spark.driver.memory` to `livy.spark.driver.memory`
     <td>URL where livy server is running</td>
   </tr>
   <tr>
-    <td>zeppelin.livy.spark.maxResult</td>
+    <td>zeppelin.livy.spark.sql.maxResult</td>
     <td>1000</td>
     <td>Max number of Spark SQL result to display.</td>
   </tr>
   <tr>
+    <td>zeppelin.livy.spark.sql.field.truncate</td>
+    <td>true</td>
+    <td>Whether to truncate field values longer than 20 characters or not</td>
+  </tr>
+  <tr>
     <td>zeppelin.livy.session.create_timeout</td>
     <td>120</td>
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
index 9a5197139d8..401621dd368 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
@@ -32,14 +32,25 @@
  */
 public class LivySparkSQLInterpreter extends BaseLivyInterpreter {
 
+  public static final String ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE =
+      "zeppelin.livy.spark.sql.field.truncate";
+
+  public static final String ZEPPELIN_LIVY_SPARK_SQL_MAX_RESULT =
+      "zeppelin.livy.spark.sql.maxResult";
+
   private LivySparkInterpreter sparkInterpreter;
   private boolean isSpark2 = false;
   private int maxResult = 1000;
+  private boolean truncate = true;
 
   public LivySparkSQLInterpreter(Properties property) {
     super(property);
-    this.maxResult = Integer.parseInt(property.getProperty("zeppelin.livy.spark.sql.maxResult"));
+    this.maxResult = Integer.parseInt(property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_MAX_RESULT));
+    if (property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE) != null) {
+      this.truncate =
+          Boolean.parseBoolean(property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE));
+    }
   }
 
   @Override
@@ -111,9 +122,11 @@ public InterpreterResult interpret(String line, InterpreterContext context) {
     // use triple quote so that we don't need to do string escape.
     String sqlQuery = null;
     if (isSpark2) {
-      sqlQuery = "spark.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
+      sqlQuery = "spark.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ", " +
+          truncate + ")";
     } else {
-      sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
+      sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ", " +
+          truncate + ")";
     }
     InterpreterResult result = sparkInterpreter.interpret(sqlQuery, context.getParagraphId(),
         this.displayAppInfo, true);
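Reviewer note (commentary, not part of the patch): with the interpreter change above, a %livy.sql paragraph is rewritten into a Scala statement whose second show() argument carries the configured flag. A minimal sketch of the statement shipped to the Livy session, assuming Spark 2.x, the default maxResult of 1000, and truncate set to false; on Spark 1.x the same call is built on sqlContext instead of spark:

    // Generated for the paragraph "select * from df"
    // when zeppelin.livy.spark.sql.field.truncate=false
    spark.sql("""select * from df""").show(1000, false)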
diff --git a/livy/src/main/resources/interpreter-setting.json b/livy/src/main/resources/interpreter-setting.json
index 42f64cf00f0..8d3dea0f395 100644
--- a/livy/src/main/resources/interpreter-setting.json
+++ b/livy/src/main/resources/interpreter-setting.json
@@ -118,6 +118,11 @@
       "defaultValue": "1000",
       "description": "Max number of Spark SQL result to display."
     },
+    "zeppelin.livy.spark.sql.field.truncate": {
+      "propertyName": "zeppelin.livy.spark.sql.field.truncate",
+      "defaultValue": "true",
+      "description": "If true, truncate field values longer than 20 characters."
+    },
     "zeppelin.livy.concurrentSQL": {
       "propertyName": "zeppelin.livy.concurrentSQL",
       "defaultValue": "false",
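Reviewer note (commentary, not part of the patch): the two integration tests below lean on Spark's Dataset.show(numRows, truncate) behavior, where truncate = true cuts string cells longer than 20 characters and appends "...". A quick sketch against the same data the tests create, assuming a Spark 2.x session named spark:

    // col_1 holds a 24-character string
    val df = spark.createDataFrame(Seq(("12characters12characters", 20))).toDF("col_1", "col_2")
    df.show(1, true)   // prints 12characters12cha... (truncated to 20 chars)
    df.show(1, false)  // prints the full value: 12characters12characters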
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
index 94213f55b05..fcc812ef18a 100644
--- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
+++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -20,7 +20,6 @@
 import com.cloudera.livy.test.framework.Cluster;
 import com.cloudera.livy.test.framework.Cluster$;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.user.AuthenticationInfo;
@@ -33,7 +32,6 @@
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class LivyInterpreterIT {
@@ -317,6 +315,131 @@ public void testSparkSQLInterpreter() {
     }
   }
 
+  @Test
+  public void testStringWithTruncation() {
+    if (!checkPreCondition()) {
+      return;
+    }
+    InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
+    interpreterGroup.put("session_1", new ArrayList<Interpreter>());
+    LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
+    sparkInterpreter.setInterpreterGroup(interpreterGroup);
+    interpreterGroup.get("session_1").add(sparkInterpreter);
+    AuthenticationInfo authInfo = new AuthenticationInfo("user1");
+    MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
+    InterpreterOutput output = new InterpreterOutput(outputListener);
+    InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
+        "title", "text", authInfo, null, null, null, null, null, output);
+    sparkInterpreter.open();
+
+    LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
+    interpreterGroup.get("session_1").add(sqlInterpreter);
+    sqlInterpreter.setInterpreterGroup(interpreterGroup);
+    sqlInterpreter.open();
+
+    try {
+      // detect spark version
+      InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(1, result.message().size());
+
+      boolean isSpark2 = isSpark2(sparkInterpreter, context);
+
+      // test DataFrame api
+      if (!isSpark2) {
+        result = sparkInterpreter.interpret(
+            "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+                + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+      } else {
+        result = sparkInterpreter.interpret(
+            "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+                + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+      }
+      sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+      // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
+      result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'",
+          context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+      assertEquals("col_1\tcol_2\n12characters12cha...\t20", result.message().get(0).getData());
+    } finally {
+      sparkInterpreter.close();
+      sqlInterpreter.close();
+    }
+  }
+
+  @Test
+  public void testStringWithoutTruncation() {
+    if (!checkPreCondition()) {
+      return;
+    }
+    InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
+    interpreterGroup.put("session_1", new ArrayList<Interpreter>());
+    Properties newProps = new Properties();
+    for (Object name : properties.keySet()) {
+      newProps.put(name, properties.get(name));
+    }
+    newProps.put(LivySparkSQLInterpreter.ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE, "false");
+    LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(newProps);
+    sparkInterpreter.setInterpreterGroup(interpreterGroup);
+    interpreterGroup.get("session_1").add(sparkInterpreter);
+    AuthenticationInfo authInfo = new AuthenticationInfo("user1");
+    MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
+    InterpreterOutput output = new InterpreterOutput(outputListener);
+    InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
+        "title", "text", authInfo, null, null, null, null, null, output);
+    sparkInterpreter.open();
+
+    LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(newProps);
+    interpreterGroup.get("session_1").add(sqlInterpreter);
+    sqlInterpreter.setInterpreterGroup(interpreterGroup);
+    sqlInterpreter.open();
+
+    try {
+      // detect spark version
+      InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(1, result.message().size());
+
+      boolean isSpark2 = isSpark2(sparkInterpreter, context);
+
+      // test DataFrame api
+      if (!isSpark2) {
+        result = sparkInterpreter.interpret(
+            "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+                + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+      } else {
+        result = sparkInterpreter.interpret(
+            "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+                + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+      }
+      sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+      // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
+      result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'",
+          context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+      assertEquals("col_1\tcol_2\n12characters12characters\t20", result.message().get(0).getData());
+    } finally {
+      sparkInterpreter.close();
+      sqlInterpreter.close();
+    }
+  }
+
   @Test
   public void testPySparkInterpreter() {
     if (!checkPreCondition()) {