[ZEPPELIN-1965] Livy SQL Interpreter: Should use df.show(1000, false) to display results

The Livy SQL interpreter truncates result strings longer than 20 characters. In some cases, we want to see the full string. We are adding an interpreter property **zeppelin.livy.spark.sql.field.truncate** to control whether or not to truncate strings. By default, **zeppelin.livy.spark.sql.field.truncate** is set to **true**.

Improvement

https://issues.apache.org/jira/browse/ZEPPELIN-1965

Set zeppelin.livy.spark.sql.field.truncate to true or false.
Run a SQL query that produces string values longer than 20 characters.
Depending on the value of zeppelin.livy.spark.sql.field.truncate, the strings will either be truncated or displayed in full (see the sketch below).
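
As a rough illustration of the verification above, here is a minimal, self-contained Java sketch of what the two settings boil down to at the Spark level. It is not part of the commit: the local SparkSession, the demo data, and the class name are assumptions for demonstration; only the show(numRows, truncate) call comes from the change itself.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hypothetical demo class; not part of the commit.
public class TruncateDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("truncate-demo")
        .getOrCreate();

    Dataset<Row> df = spark.sql("select '12characters12characters' as col_1, 20 as col_2");

    // zeppelin.livy.spark.sql.field.truncate=true (the default):
    // the interpreter emits show(maxResult, true), so fields longer than
    // 20 characters are cut down, e.g. "12characters12cha..."
    df.show(1000, true);

    // zeppelin.livy.spark.sql.field.truncate=false:
    // the interpreter emits show(maxResult, false) and prints the full
    // value "12characters12characters"
    df.show(1000, false);

    spark.stop();
  }
}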

* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Benoy Antony <benoy@apache.org>

Closes apache#2201 from benoyantony/master and squashes the following commits:

bb006c0 [Benoy Antony] changed field name and description
9eae68b [Benoy Antony] added a null check to avoid testcase failures, another nullcheck for backward compatibility and added two new testcases
ab1ead2 [Benoy Antony] documented zeppelin.livy.spark.sql.truncate
b6252be [Benoy Antony] [ZEPPELIN-1965] Livy SQL Interpreter: Should use df.show(1000, false) to display results

(cherry picked from commit 1135fb6)

Change-Id: Iee5341a7890bcdee386d1c22fc36d12692adccc3
prabhjyotsingh committed Jun 7, 2017
1 parent 667e85f commit 2142f92
Showing 5 changed files with 154 additions and 6 deletions.
2 changes: 2 additions & 0 deletions HDP-CHANGES.txt
@@ -1,5 +1,7 @@
Changes only in Hortonworks github and not in zeppelin 0.7.0 Apache Release

+BUG-80721 ZEPPELIN-1965. Livy SQL Interpreter: Should use df.show(1000, false) to display results
BUG-80473 ZEPPELIN-2530: Zeppelin user impersonation with domain name suffix is failing
BUG-80074 jdbc interpreter throwing CNF for SSLContexts class
BUG-79934 [minor] Improve Session
BUG-79635 zeppelin build failure in 2.6-maint
7 changes: 6 additions & 1 deletion docs/interpreter/livy.md
@@ -56,10 +56,15 @@ Example: `spark.driver.memory` to `livy.spark.driver.memory`
    <td>URL where livy server is running</td>
  </tr>
  <tr>
-   <td>zeppelin.livy.spark.maxResult</td>
+   <td>zeppelin.livy.spark.sql.maxResult</td>
    <td>1000</td>
    <td>Max number of Spark SQL result to display.</td>
  </tr>
+ <tr>
+   <td>zeppelin.livy.spark.sql.field.truncate</td>
+   <td>true</td>
+   <td>Whether to truncate field values longer than 20 characters or not</td>
+ </tr>
  <tr>
    <td>zeppelin.livy.session.create_timeout</td>
    <td>120</td>
19 changes: 16 additions & 3 deletions livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
@@ -32,14 +32,25 @@
 */
public class LivySparkSQLInterpreter extends BaseLivyInterpreter {

+  public static final String ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE =
+      "zeppelin.livy.spark.sql.field.truncate";
+
+  public static final String ZEPPELIN_LIVY_SPARK_SQL_MAX_RESULT =
+      "zeppelin.livy.spark.sql.maxResult";
+
  private LivySparkInterpreter sparkInterpreter;

  private boolean isSpark2 = false;
  private int maxResult = 1000;
+  private boolean truncate = true;

  public LivySparkSQLInterpreter(Properties property) {
    super(property);
-    this.maxResult = Integer.parseInt(property.getProperty("zeppelin.livy.spark.sql.maxResult"));
+    this.maxResult = Integer.parseInt(property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_MAX_RESULT));
+    if (property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE) != null) {
+      this.truncate =
+          Boolean.parseBoolean(property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE));
+    }
  }

@Override
@@ -111,9 +122,11 @@ public InterpreterResult interpret(String line, InterpreterContext context) {
    // use triple quote so that we don't need to do string escape.
    String sqlQuery = null;
    if (isSpark2) {
-      sqlQuery = "spark.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
+      sqlQuery = "spark.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ", " +
+          truncate + ")";
    } else {
-      sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
+      sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ", " +
+          truncate + ")";
    }
    InterpreterResult result = sparkInterpreter.interpret(sqlQuery, context.getParagraphId(),
        this.displayAppInfo, true);
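The null check in the constructor above is what keeps older configurations working: when zeppelin.livy.spark.sql.field.truncate is absent, truncate stays true. Below is a minimal standalone sketch of that parsing behavior (hypothetical demo code, not part of the commit; only the property names come from the change).

import java.util.Properties;

// Hypothetical demo class illustrating the constructor's property handling.
public class TruncatePropertyCheck {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("zeppelin.livy.spark.sql.maxResult", "1000");

    // Property absent: the default is kept, so truncate remains true.
    System.out.println(resolveTruncate(props));  // true

    // Property present: its value wins.
    props.setProperty("zeppelin.livy.spark.sql.field.truncate", "false");
    System.out.println(resolveTruncate(props));  // false
  }

  // Mirrors the null-checked parsing in LivySparkSQLInterpreter's constructor.
  static boolean resolveTruncate(Properties props) {
    boolean truncate = true;
    String value = props.getProperty("zeppelin.livy.spark.sql.field.truncate");
    if (value != null) {
      truncate = Boolean.parseBoolean(value);
    }
    return truncate;
  }
}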
5 changes: 5 additions & 0 deletions livy/src/main/resources/interpreter-setting.json
@@ -118,6 +118,11 @@
"defaultValue": "1000",
"description": "Max number of Spark SQL result to display."
},
"zeppelin.livy.spark.sql.field.truncate": {
"propertyName": "zeppelin.livy.spark.sql.field.truncate",
"defaultValue": "true",
"description": "If true, truncate field values longer than 20 characters."
},
"zeppelin.livy.concurrentSQL": {
"propertyName": "zeppelin.livy.concurrentSQL",
"defaultValue": "false",
127 changes: 125 additions & 2 deletions livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -20,7 +20,6 @@

import com.cloudera.livy.test.framework.Cluster;
import com.cloudera.livy.test.framework.Cluster$;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.user.AuthenticationInfo;
@@ -33,7 +32,6 @@
import java.util.Properties;

import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

public class LivyInterpreterIT {
@@ -317,6 +315,131 @@ public void testSparkSQLInterpreter() {
    }
  }

  @Test
  public void testStringWithTruncation() {
    if (!checkPreCondition()) {
      return;
    }
    InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
    interpreterGroup.put("session_1", new ArrayList<Interpreter>());
    LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
    sparkInterpreter.setInterpreterGroup(interpreterGroup);
    interpreterGroup.get("session_1").add(sparkInterpreter);
    AuthenticationInfo authInfo = new AuthenticationInfo("user1");
    MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
    InterpreterOutput output = new InterpreterOutput(outputListener);
    InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
        "title", "text", authInfo, null, null, null, null, null, output);
    sparkInterpreter.open();

    LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
    interpreterGroup.get("session_1").add(sqlInterpreter);
    sqlInterpreter.setInterpreterGroup(interpreterGroup);
    sqlInterpreter.open();

    try {
      // detect spark version
      InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
      assertEquals(1, result.message().size());

      boolean isSpark2 = isSpark2(sparkInterpreter, context);

      // test DataFrame api
      if (!isSpark2) {
        result = sparkInterpreter.interpret(
            "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
                + "df.collect()", context);
        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
        assertEquals(1, result.message().size());
        assertTrue(result.message().get(0).getData()
            .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
      } else {
        result = sparkInterpreter.interpret(
            "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
                + "df.collect()", context);
        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
        assertEquals(1, result.message().size());
        assertTrue(result.message().get(0).getData()
            .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
      }
      sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
      // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
      result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
      assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
      assertEquals("col_1\tcol_2\n12characters12cha...\t20", result.message().get(0).getData());
    } finally {
      sparkInterpreter.close();
      sqlInterpreter.close();
    }
  }

  @Test
  public void testStringWithoutTruncation() {
    if (!checkPreCondition()) {
      return;
    }
    InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
    interpreterGroup.put("session_1", new ArrayList<Interpreter>());
    Properties newProps = new Properties();
    for (Object name : properties.keySet()) {
      newProps.put(name, properties.get(name));
    }
    newProps.put(LivySparkSQLInterpreter.ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE, "false");
    LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(newProps);
    sparkInterpreter.setInterpreterGroup(interpreterGroup);
    interpreterGroup.get("session_1").add(sparkInterpreter);
    AuthenticationInfo authInfo = new AuthenticationInfo("user1");
    MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
    InterpreterOutput output = new InterpreterOutput(outputListener);
    InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
        "title", "text", authInfo, null, null, null, null, null, output);
    sparkInterpreter.open();

    LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(newProps);
    interpreterGroup.get("session_1").add(sqlInterpreter);
    sqlInterpreter.setInterpreterGroup(interpreterGroup);
    sqlInterpreter.open();

    try {
      // detect spark version
      InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
      assertEquals(1, result.message().size());

      boolean isSpark2 = isSpark2(sparkInterpreter, context);

      // test DataFrame api
      if (!isSpark2) {
        result = sparkInterpreter.interpret(
            "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
                + "df.collect()", context);
        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
        assertEquals(1, result.message().size());
        assertTrue(result.message().get(0).getData()
            .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
      } else {
        result = sparkInterpreter.interpret(
            "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
                + "df.collect()", context);
        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
        assertEquals(1, result.message().size());
        assertTrue(result.message().get(0).getData()
            .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
      }
      sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
      // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
      result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
      assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
      assertEquals("col_1\tcol_2\n12characters12characters\t20", result.message().get(0).getData());
    } finally {
      sparkInterpreter.close();
      sqlInterpreter.close();
    }
  }

  @Test
  public void testPySparkInterpreter() {
    if (!checkPreCondition()) {
