
Commit

Merge branch 'master' of https://github.com/apache/zeppelin into py4jPythonInterpreter
astroshim committed Mar 8, 2017
2 parents 1395875 + 142597b commit c3f5b78
Showing 23 changed files with 948 additions and 641 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -100,6 +100,9 @@ Thumbs.db
target/
**/target/

# maven flattened pom files
**/.flattened-pom.xml

# Generated by Jekyll
docs/_site/

1 change: 0 additions & 1 deletion .travis.yml
@@ -25,7 +25,6 @@ cache:
- ${HOME}/R
- zeppelin-web/node
- zeppelin-web/node_modules
- zeppelin-web/bower_components

addons:
apt:
24 changes: 16 additions & 8 deletions dev/change_scala_version.sh
@@ -34,20 +34,22 @@ if [[ ($# -ne 1) || ( $1 == "--help") || $1 == "-h" ]]; then
usage
fi

TO_VERSION=$1
TO_VERSION="$1"

check_scala_version() {
for i in ${VALID_VERSIONS[*]}; do [ $i = "$1" ] && return 0; done
echo "Invalid Scala version: $1. Valid versions: ${VALID_VERSIONS[*]}" 1>&2
exit 1
}

check_scala_version "$TO_VERSION"
check_scala_version "${TO_VERSION}"

if [ $TO_VERSION = "2.11" ]; then
if [ "${TO_VERSION}" = "2.11" ]; then
FROM_VERSION="2.10"
SCALA_LIB_VERSION="2.11.7"
else
FROM_VERSION="2.11"
SCALA_LIB_VERSION="2.10.5"
fi

sed_i() {
@@ -57,11 +59,17 @@ sed_i() {
export -f sed_i

BASEDIR=$(dirname $0)/..
find "$BASEDIR" -name 'pom.xml' -not -path '*target*' -print \
-exec bash -c "sed_i 's/\(artifactId.*\)_'$FROM_VERSION'/\1_'$TO_VERSION'/g' {}" \;
find "${BASEDIR}" -name 'pom.xml' -not -path '*target*' -print \
-exec bash -c "sed_i 's/\(artifactId.*\)_'${FROM_VERSION}'/\1_'${TO_VERSION}'/g' {}" \;

# Also update <scala.binary.version> in parent POM
# update <scala.binary.version> in parent POM
# Match any scala binary version to ensure idempotency
sed_i '1,/<scala\.binary\.version>[0-9]*\.[0-9]*</s/<scala\.binary\.version>[0-9]*\.[0-9]*</<scala.binary.version>'$TO_VERSION'</' \
"$BASEDIR/pom.xml"
sed_i '1,/<scala\.binary\.version>[0-9]*\.[0-9]*</s/<scala\.binary\.version>[0-9]*\.[0-9]*</<scala.binary.version>'${TO_VERSION}'</' \
"${BASEDIR}/pom.xml"

# update <scala.version> in parent POM
# This ensures that variables in leaf POMs are substituted with real values when the flattened POM is created.
# The maven-flatten plugin does not pick up properties defined under a profile, even when scala-2.11/scala-2.10
# is activated via -Pscala-2.11/-Pscala-2.10, and instead uses the default properties to create the flattened POM.
sed_i '1,/<scala\.version>[0-9]*\.[0-9]*\.[0-9]*</s/<scala\.version>[0-9]*\.[0-9]*\.[0-9]*</<scala.version>'${SCALA_LIB_VERSION}'</' \
"${BASEDIR}/pom.xml"
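The version patterns above deliberately match any existing number, so re-running the script is a no-op after the first pass. A minimal Java model of that idempotency (hypothetical helper, not part of the commit):

```java
public class ScalaVersionBump {
    // Mirrors the script's '<scala.binary.version>[0-9]*\.[0-9]*<' sed pattern:
    // whatever version is currently present gets replaced, so applying the
    // same target version twice changes nothing the second time.
    static String bumpBinaryVersion(String pom, String toVersion) {
        return pom.replaceFirst(
                "<scala\\.binary\\.version>[0-9]+\\.[0-9]+<",
                "<scala.binary.version>" + toVersion + "<");
    }

    public static void main(String[] args) {
        String pom = "<scala.binary.version>2.10</scala.binary.version>";
        String once = bumpBinaryVersion(pom, "2.11");
        String twice = bumpBinaryVersion(once, "2.11");
        System.out.println(once);
        System.out.println(once.equals(twice)); // true: re-running is a no-op
    }
}
```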
9 changes: 9 additions & 0 deletions docs/interpreter/jdbc.md
@@ -167,6 +167,10 @@ There are more JDBC interpreter properties you can specify like below.
<td>default.jceks.credentialKey</td>
<td>jceks credential key</td>
</tr>
<tr>
<td>zeppelin.jdbc.precode</td>
<td>SQL statements executed while the connection is being opened</td>
</tr>
</table>

You can also add more properties by using this [method](http://docs.oracle.com/javase/7/docs/api/java/sql/DriverManager.html#getConnection%28java.lang.String,%20java.util.Properties%29).
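In the interpreter change in this commit, the `zeppelin.jdbc.precode` value is trimmed, and skipped entirely when blank, before being run on a newly opened connection (see `executePrecode` in `JDBCInterpreter.java` below). A sketch of just that guard, using a hypothetical helper name:

```java
public class PrecodeGuard {
    // Models the StringUtils.isNotBlank/trim handling in executePrecode:
    // a null or whitespace-only property means "no precode to run".
    static String normalizePrecode(String precode) {
        if (precode == null || precode.trim().isEmpty()) {
            return null; // nothing to execute when the property is unset or blank
        }
        return precode.trim();
    }

    public static void main(String[] args) {
        System.out.println(normalizePrecode("  SET @testVariable=1  ")); // trimmed SQL
        System.out.println(normalizePrecode("   "));                     // null: skipped
    }
}
```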
@@ -423,8 +427,13 @@ Here are some examples you can refer to. Including the below connectors, you can
<td>default.password</td>
<td>hive_password</td>
</tr>
<tr>
<td>hive.proxy.user</td>
<td>true or false</td>
</tr>
</table>

Connecting to Hive JDBC as a proxy user can be disabled with the `hive.proxy.user` property (set to `true` by default).
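The interpreter change below implements this by splicing `;hive.server2.proxy.user=<user>;` into the connection URL unless `hive.proxy.user` is `false`. A simplified model (hypothetical class and URL; the real code computes the insertion point from the URL's parameter separator):

```java
public class HiveProxyUrl {
    // Simplified model of the proxy-user URL handling added in this commit:
    // when proxying is enabled, the proxy-user fragment is inserted before
    // any URL parameters; when disabled, the URL is left untouched.
    static String withProxyUser(String url, String user, boolean proxyEnabled) {
        if (!proxyEnabled) {
            return url;
        }
        int insertAt = url.indexOf('?');
        if (insertAt == -1) {
            insertAt = url.length();
        }
        return new StringBuilder(url)
                .insert(insertAt, ";hive.server2.proxy.user=" + user + ";")
                .toString();
    }

    public static void main(String[] args) {
        System.out.println(withProxyUser("jdbc:hive2://host:10000/default", "alice", true));
        System.out.println(withProxyUser("jdbc:hive2://host:10000/default", "alice", false));
    }
}
```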

[Apache Hive 1 JDBC Driver Docs](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-JDBC)
[Apache Hive 2 JDBC Driver Docs](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-JDBC)

4 changes: 2 additions & 2 deletions docs/manual/interpreterinstallation.md
@@ -111,9 +111,9 @@ You can also install 3rd party interpreters located in the maven repository by u
./bin/install-interpreter.sh --name interpreter1 --artifact groupId1:artifact1:version1
```

The above command will download maven artifact `groupId1:artifact1:version1` and all of it's transitive dependencies into `interpreter/interpreter1` directory.
The above command will download the maven artifact `groupId1:artifact1:version1` and all of its transitive dependencies into the `interpreter/interpreter1` directory.

After restart Zeppelin, then [create interpreter setting](../manual/interpreters.html#what-is-zeppelin-interpreter) and [bind it with your notebook](../manual/interpreters.html#what-is-zeppelin-interpreter-setting).
After restarting Zeppelin, [create an interpreter setting](../manual/interpreters.html#what-is-zeppelin-interpreter) and [bind it with your note](../manual/interpreters.html#what-is-zeppelin-interpreter-setting).

#### Install multiple 3rd party interpreters at once

55 changes: 38 additions & 17 deletions jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -14,14 +14,7 @@
*/
package org.apache.zeppelin.jdbc;

import static org.apache.commons.lang.StringUtils.containsIgnoreCase;
import static org.apache.commons.lang.StringUtils.isEmpty;
import static org.apache.commons.lang.StringUtils.isNotEmpty;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -37,19 +30,22 @@
import java.util.Properties;
import java.util.Set;

import com.google.common.base.Throwables;
import org.apache.commons.dbcp2.ConnectionFactory;
import org.apache.commons.dbcp2.DriverManagerConnectionFactory;
import org.apache.commons.dbcp2.PoolableConnectionFactory;
import org.apache.commons.dbcp2.PoolingDriver;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.jdbc.security.JDBCSecurityImpl;
@@ -61,9 +57,13 @@
import org.slf4j.LoggerFactory;

import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;

import static org.apache.commons.lang.StringUtils.containsIgnoreCase;
import static org.apache.commons.lang.StringUtils.isEmpty;
import static org.apache.commons.lang.StringUtils.isNotEmpty;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;

/**
* JDBC interpreter for Zeppelin. This interpreter can also be used for accessing HAWQ,
@@ -103,6 +103,7 @@ public class JDBCInterpreter extends Interpreter {
static final String PASSWORD_KEY = "password";
static final String JDBC_JCEKS_FILE = "jceks.file";
static final String JDBC_JCEKS_CREDENTIAL_KEY = "jceks.credentialKey";
static final String ZEPPELIN_JDBC_PRECODE_KEY = "zeppelin.jdbc.precode";
static final String DOT = ".";

private static final char WHITESPACE = ' ';
@@ -340,6 +341,9 @@ private Connection getConnectionFromPool(String url, String user, String propert

if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(propertyKey)) {
createConnectionPool(url, user, propertyKey, properties);
try (Connection connection = DriverManager.getConnection(jdbcDriver)) {
executePrecode(connection);
}
}
return DriverManager.getConnection(jdbcDriver);
}
@@ -374,16 +378,20 @@ public Connection getConnection(String propertyKey, InterpreterContext interpret
if (lastIndexOfUrl == -1) {
lastIndexOfUrl = connectionUrl.length();
}
connectionUrl.insert(lastIndexOfUrl, ";hive.server2.proxy.user=" + user + ";");
boolean hasProxyUser = property.containsKey("hive.proxy.user");
if (!hasProxyUser || !property.getProperty("hive.proxy.user").equals("false")){
logger.debug("Using hive proxy user");
connectionUrl.insert(lastIndexOfUrl, ";hive.server2.proxy.user=" + user + ";");
}
connection = getConnectionFromPool(connectionUrl.toString(),
user, propertyKey, properties);
user, propertyKey, properties);
} else {
UserGroupInformation ugi = null;
try {
ugi = UserGroupInformation.createProxyUser(user,
UserGroupInformation.getCurrentUser());
ugi = UserGroupInformation.createProxyUser(
user, UserGroupInformation.getCurrentUser());
} catch (Exception e) {
logger.error("Error in createProxyUser", e);
logger.error("Error in getCurrentUser", e);
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(e.getMessage()).append("\n");
stringBuilder.append(e.getCause());
@@ -540,6 +548,20 @@ protected ArrayList<String> splitSqlQueries(String sql) {
return queries;
}

private void executePrecode(Connection connection) throws SQLException {
String precode = getProperty(ZEPPELIN_JDBC_PRECODE_KEY);
if (StringUtils.isNotBlank(precode)) {
precode = StringUtils.trim(precode);
logger.info("Run SQL precode '{}'", precode);
try (Statement statement = connection.createStatement()) {
statement.execute(precode);
if (!connection.getAutoCommit()) {
connection.commit();
}
}
}
}

private InterpreterResult executeSql(String propertyKey, String sql,
InterpreterContext interpreterContext) {
Connection connection;
@@ -761,4 +783,3 @@ int getMaxConcurrentConnection() {
}
}
}

6 changes: 6 additions & 0 deletions jdbc/src/main/resources/interpreter-setting.json
@@ -63,6 +63,12 @@
"propertyName": "zeppelin.jdbc.principal",
"defaultValue": "",
"description": "Kerberos principal"
},
"zeppelin.jdbc.precode": {
"envName": null,
"propertyName": "zeppelin.jdbc.precode",
"defaultValue": "",
"description": "SQL statements executed while the connection is being opened"
}
},
"editor": {
@@ -43,6 +43,9 @@
import org.junit.Test;

import com.mockrunner.jdbc.BasicJDBCTestCaseAdapter;

import static org.apache.zeppelin.jdbc.JDBCInterpreter.ZEPPELIN_JDBC_PRECODE_KEY;

/**
* JDBC interpreter unit tests
*/
@@ -386,4 +389,43 @@ public void testMultiTenant() throws SQLException, IOException {
assertNull(user2JDBC2Conf.getPropertyMap("default").get("password"));
jdbc2.close();
}

@Test
public void testPrecode() throws SQLException, IOException {
Properties properties = new Properties();
properties.setProperty("default.driver", "org.h2.Driver");
properties.setProperty("default.url", getJdbcConnection());
properties.setProperty("default.user", "");
properties.setProperty("default.password", "");
properties.setProperty(ZEPPELIN_JDBC_PRECODE_KEY, "SET @testVariable=1");
JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties);
jdbcInterpreter.open();

String sqlQuery = "select @testVariable";

InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, interpreterContext);

assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
assertEquals("@TESTVARIABLE\n1\n", interpreterResult.message().get(0).getData());
}

@Test
public void testIncorrectPrecode() throws SQLException, IOException {
Properties properties = new Properties();
properties.setProperty("default.driver", "org.h2.Driver");
properties.setProperty("default.url", getJdbcConnection());
properties.setProperty("default.user", "");
properties.setProperty("default.password", "");
properties.setProperty(ZEPPELIN_JDBC_PRECODE_KEY, "incorrect command");
JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties);
jdbcInterpreter.open();

String sqlQuery = "select 1";

InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, interpreterContext);

assertEquals(InterpreterResult.Code.ERROR, interpreterResult.code());
assertEquals(InterpreterResult.Type.TEXT, interpreterResult.message().get(0).getType());
}
}
