Skip to content

Commit

Permalink
Merge branch 'branch-0.9' of https://github.com/apache/zeppelin into …
Browse files Browse the repository at this point in the history
…apache-0.9
  • Loading branch information
xiejiajun committed Jul 6, 2020
2 parents 076668a + ce6fa44 commit 0e5f65a
Show file tree
Hide file tree
Showing 195 changed files with 5,052 additions and 1,773 deletions.
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ jobs:

# Run Spark integration test and unit test

# Run spark integration of in one zeppelin instance: Spark 3.0
- jdk: "openjdk8"
dist: xenial
env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.12" PROFILE="-Phadoop2 -Pintegration" R="true" BUILD_FLAG="install -DskipTests -DskipRat" TEST_FLAG="test -DskipRat" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown -am" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest30,SparkIntegrationTest30 -DfailIfNoTests=false"

# Run spark integration of in one zeppelin instance (2.4, 2.3, 2.2)
- jdk: "openjdk8"
dist: xenial
Expand Down Expand Up @@ -162,6 +167,7 @@ before_install:
- clearcache=$(echo $gitlog | grep -c -E "clear bower|bower clear" || true)
- if [ "$hasbowerchanged" -gt 0 ] || [ "$clearcache" -gt 0 ]; then echo "Clearing bower_components cache"; rm -r zeppelin-web/bower_components; npm cache verify; else echo "Using cached bower_components."; fi
- echo "MAVEN_OPTS='-Xms1024M -Xmx2048M -XX:MaxMetaspaceSize=1024m -XX:-UseGCOverheadLimit -Dorg.slf4j.simpleLogger.defaultLogLevel=warn'" >> ~/.mavenrc
- if [[ -n $R ]]; then ./testing/install_R.sh; fi
- bash -x ./testing/install_external_dependencies.sh
- ls -la .spark-dist ${HOME}/.m2/repository/.cache/maven-download-plugin || true
- ls .node_modules && cp -r .node_modules zeppelin-web/node_modules || echo "node_modules are not cached"
Expand Down
2 changes: 1 addition & 1 deletion cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<description>Zeppelin cassandra support</description>

<properties>
<cassandra.driver.version>4.6.1</cassandra.driver.version>
<cassandra.driver.version>4.7.2</cassandra.driver.version>
<snappy.version>1.1.7.3</snappy.version>
<lz4.version>1.6.0</lz4.version>
<scalate.version>1.7.1</scalate.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.DriverOption;
import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
import com.datastax.oss.driver.internal.core.loadbalancing.DcInferringLoadBalancingPolicy;
import com.datastax.oss.driver.shaded.guava.common.net.InetAddresses;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
Expand All @@ -42,7 +42,9 @@
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static java.lang.Integer.parseInt;
Expand Down Expand Up @@ -112,15 +114,33 @@ public class CassandraInterpreter extends Interpreter {
public static final String CASSANDRA_TRUSTSTORE_PASSWORD =
"cassandra.ssl.truststore.password";


public static final String CASSANDRA_FORMAT_FLOAT_PRECISION =
"cassandra.format.float_precision";
public static final String CASSANDRA_FORMAT_DOUBLE_PRECISION =
"cassandra.format.double_precision";
public static final String CASSANDRA_FORMAT_TIMESTAMP =
"cassandra.format.timestamp";
public static final String CASSANDRA_FORMAT_TIME =
"cassandra.format.time";
public static final String CASSANDRA_FORMAT_DATE =
"cassandra.format.date";
public static final String CASSANDRA_FORMAT_TYPE =
"cassandra.format.output";
public static final String CASSANDRA_FORMAT_TIMEZONE =
"cassandra.format.timezone";
public static final String CASSANDRA_FORMAT_LOCALE =
"cassandra.format.locale";

public static final String NONE_VALUE = "none";
public static final String DEFAULT_VALUE = "DEFAULT";
public static final String DEFAULT_HOST = "127.0.0.1";
public static final String DEFAULT_PORT = "9042";
public static final String DEFAULT_KEYSPACE = "system";
public static final String DEFAULT_PROTOCOL_VERSION = "DEFAULT";
public static final String DEFAULT_COMPRESSION = "none";
public static final String DEFAULT_COMPRESSION = NONE_VALUE;
public static final String DEFAULT_CONNECTIONS_PER_HOST = "1";
public static final String DEFAULT_MAX_REQUEST_PER_CONNECTION = "1024";
public static final String DEFAULT_POLICY = "DEFAULT";
public static final String DEFAULT_POLICY = DEFAULT_VALUE;
public static final String DEFAULT_PARALLELISM = "10";
public static final String DEFAULT_POOL_TIMEOUT = "5000";
public static final String DEFAULT_HEARTBEAT_INTERVAL = "30";
Expand All @@ -133,75 +153,72 @@ public class CassandraInterpreter extends Interpreter {
public static final String DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS = "12";

static final List NO_COMPLETION = new ArrayList<>();
public static final String DATASTAX_JAVA_DRIVER_PREFIX = "datastax-java-driver.";
public static final String MILLISECONDS_STR = " milliseconds";
public static final String SECONDS_STR = " seconds";

InterpreterLogic helper;
CqlSession session;
private JavaDriverConfig driverConfig = new JavaDriverConfig();
private static final Map<String, DriverOption> optionMap = new HashMap<>();

static {
for (DefaultDriverOption opt: DefaultDriverOption.values()) {
optionMap.put(opt.getPath(), opt);
}
}

public CassandraInterpreter(Properties properties) {
super(properties);
}

@Override
public void open() {

final String[] addresses = getProperty(CASSANDRA_HOSTS, DEFAULT_HOST).split(",");
final String[] addresses = getProperty(CASSANDRA_HOSTS, DEFAULT_HOST)
.trim().split(",");
final int port = parseInt(getProperty(CASSANDRA_PORT, DEFAULT_PORT));
Collection<InetSocketAddress> hosts = new ArrayList<>();
for (String address : addresses) {
if (InetAddresses.isInetAddress(address)) {
hosts.add(new InetSocketAddress(address, port));
} else {
// TODO(alex): maybe it won't be necessary in 4.4
hosts.add(InetSocketAddress.createUnresolved(address, port));
if (!StringUtils.isBlank(address)) {
logger.debug("Adding contact point: {}", address);
if (InetAddresses.isInetAddress(address)) {
hosts.add(new InetSocketAddress(address, port));
} else {
hosts.add(InetSocketAddress.createUnresolved(address, port));
}
}
}

LOGGER.info("Bootstrapping Cassandra Java Driver to connect to " +
getProperty(CASSANDRA_HOSTS) + "on port " + port);

// start generation of the config
ProgrammaticDriverConfigLoaderBuilder configBuilder = DriverConfigLoader.programmaticBuilder();

driverConfig.setCompressionProtocol(this, configBuilder);
driverConfig.setPoolingOptions(this, configBuilder);
driverConfig.setProtocolVersion(this, configBuilder);
driverConfig.setQueryOptions(this, configBuilder);
driverConfig.setReconnectionPolicy(this, configBuilder);
driverConfig.setRetryPolicy(this, configBuilder);
driverConfig.setSocketOptions(this, configBuilder);
driverConfig.setSpeculativeExecutionPolicy(this, configBuilder);
LOGGER.info("Bootstrapping Cassandra Java Driver to connect to {} on port {}",
getProperty(CASSANDRA_HOSTS), port);

//
configBuilder.withClass(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS,
DcInferringLoadBalancingPolicy.class);
configBuilder.withBoolean(DefaultDriverOption.RESOLVE_CONTACT_POINTS, false);

configBuilder.withInt(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT,
parseInt(getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS,
DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS)));

DriverConfigLoader loader = configBuilder.endProfile().build();
// TODO(alex): think how to dump built configuration...
logger.debug(loader.toString());
// end generation of config
DriverConfigLoader loader = createLoader();

LOGGER.debug("Creating cluster builder");
CqlSessionBuilder clusterBuilder = CqlSession.builder()
.addContactPoints(hosts)
.withAuthCredentials(getProperty(CASSANDRA_CREDENTIALS_USERNAME),
getProperty(CASSANDRA_CREDENTIALS_PASSWORD))
.withApplicationName("")
.withApplicationName("Zeppelin")
.withApplicationVersion("");
if (!hosts.isEmpty()) {
LOGGER.debug("Adding contact points");
clusterBuilder.addContactPoints(hosts);
}

String username = getProperty(CASSANDRA_CREDENTIALS_USERNAME, NONE_VALUE).trim();
String password = getProperty(CASSANDRA_CREDENTIALS_PASSWORD, NONE_VALUE).trim();
if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password) &&
!NONE_VALUE.equalsIgnoreCase(username) && !NONE_VALUE.equalsIgnoreCase(password)) {
LOGGER.debug("Adding credentials. Username = {}", username);
clusterBuilder.withAuthCredentials(username, password);
}

String keyspace = getProperty(CASSANDRA_KEYSPACE_NAME, DEFAULT_KEYSPACE);
if (StringUtils.isNotBlank(keyspace) && !DEFAULT_KEYSPACE.equalsIgnoreCase(keyspace)) {
LOGGER.debug("Set default keyspace");
clusterBuilder.withKeyspace(keyspace);
}

final String runWithSSL = getProperty(CASSANDRA_WITH_SSL);
if (runWithSSL != null && runWithSSL.equals("true")) {
LOGGER.debug("Cassandra Interpreter: Using SSL");

final String runWithSSL = getProperty(CASSANDRA_WITH_SSL, "false");
if ("true".equalsIgnoreCase(runWithSSL)) {
LOGGER.debug("Using SSL");
try {
final SSLContext sslContext;
{
Expand All @@ -219,19 +236,149 @@ public void open() {
}
clusterBuilder = clusterBuilder.withSslContext(sslContext);
} catch (Exception e) {
LOGGER.error(e.toString());
LOGGER.error("Exception initializing SSL {}", e.toString());
}
} else {
LOGGER.debug("Cassandra Interpreter: Not using SSL");
LOGGER.debug("Not using SSL");
}

LOGGER.debug("Creating CqlSession");
session = clusterBuilder.withConfigLoader(loader).build();
helper = new InterpreterLogic(session);
LOGGER.debug("Session configuration");
for (Map.Entry<String, Object> entry:
session.getContext().getConfig().getDefaultProfile().entrySet()) {
logger.debug("{} = {}", entry.getKey(), entry.getValue().toString());
}
LOGGER.debug("Creating helper");
helper = new InterpreterLogic(session, properties);
}

private DriverConfigLoader createLoader() {
logger.debug("Creating programmatic config loader");
// start generation of the config
ProgrammaticDriverConfigLoaderBuilder configBuilder = DriverConfigLoader.programmaticBuilder();

Map<DriverOption, String> allOptions = new HashMap<>();

// set options from main configuration
String ts = getProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS,
CassandraInterpreter.DEFAULT_CONNECTION_TIMEOUT) + MILLISECONDS_STR;
allOptions.put(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, ts);
allOptions.put(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT, ts);
allOptions.put(DefaultDriverOption.REQUEST_TIMEOUT,
getProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS,
CassandraInterpreter.DEFAULT_READ_TIMEOUT) + MILLISECONDS_STR);
addIfNotBlank(allOptions,
getProperty(CASSANDRA_SOCKET_TCP_NO_DELAY, CassandraInterpreter.DEFAULT_TCP_NO_DELAY),
DefaultDriverOption.SOCKET_TCP_NODELAY);
addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_KEEP_ALIVE),
DefaultDriverOption.SOCKET_KEEP_ALIVE);
addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_RECEIVED_BUFFER_SIZE_BYTES),
DefaultDriverOption.SOCKET_RECEIVE_BUFFER_SIZE);
addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_SEND_BUFFER_SIZE_BYTES),
DefaultDriverOption.SOCKET_SEND_BUFFER_SIZE);
addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_REUSE_ADDRESS),
DefaultDriverOption.SOCKET_REUSE_ADDRESS);
addIfNotBlank(allOptions, getProperty(CASSANDRA_SOCKET_SO_LINGER),
DefaultDriverOption.SOCKET_LINGER_INTERVAL);
addIfNotBlank(allOptions,
getProperty(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE),
DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE);
allOptions.put(DefaultDriverOption.REQUEST_CONSISTENCY,
getProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY,
CassandraInterpreter.DEFAULT_CONSISTENCY));
allOptions.put(DefaultDriverOption.REQUEST_SERIAL_CONSISTENCY,
getProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY,
CassandraInterpreter.DEFAULT_SERIAL_CONSISTENCY));
allOptions.put(DefaultDriverOption.REQUEST_PAGE_SIZE,
getProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE,
CassandraInterpreter.DEFAULT_FETCH_SIZE));
ts = getProperty(CASSANDRA_PROTOCOL_VERSION, DEFAULT_PROTOCOL_VERSION);
if (!DEFAULT_VALUE.equalsIgnoreCase(ts)) {
// for compatibility with previous configurations
if (ts.equals("4") || ts.equals("3")) {
ts = "V" + ts;
}
allOptions.put(DefaultDriverOption.PROTOCOL_VERSION, ts);
}
addIfNotBlank(allOptions, getProperty(CASSANDRA_COMPRESSION_PROTOCOL,
CassandraInterpreter.DEFAULT_COMPRESSION).toLowerCase(),
DefaultDriverOption.PROTOCOL_COMPRESSION);
addIfNotBlankOrDefault(allOptions, getProperty(CASSANDRA_RETRY_POLICY, DEFAULT_POLICY),
DefaultDriverOption.RETRY_POLICY_CLASS);
addIfNotBlankOrDefault(allOptions,
getProperty(CASSANDRA_RECONNECTION_POLICY, DEFAULT_POLICY),
DefaultDriverOption.RECONNECTION_POLICY_CLASS);
addIfNotBlankOrDefault(allOptions,
getProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY, DEFAULT_POLICY),
DefaultDriverOption.SPECULATIVE_EXECUTION_POLICY_CLASS);
allOptions.put(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE,
getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_LOCAL,
DEFAULT_CONNECTIONS_PER_HOST));
allOptions.put(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE,
getProperty(CASSANDRA_POOLING_CONNECTION_PER_HOST_REMOTE,
DEFAULT_CONNECTIONS_PER_HOST));
allOptions.put(DefaultDriverOption.CONNECTION_MAX_REQUESTS,
getProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION,
DEFAULT_MAX_REQUEST_PER_CONNECTION));
allOptions.put(DefaultDriverOption.HEARTBEAT_INTERVAL,
getProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS,
DEFAULT_HEARTBEAT_INTERVAL) + SECONDS_STR);
ts = getProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS,
DEFAULT_POOL_TIMEOUT) + MILLISECONDS_STR;
allOptions.put(DefaultDriverOption.HEARTBEAT_TIMEOUT, ts);
allOptions.put(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, ts);
allOptions.put(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS,
"DcInferringLoadBalancingPolicy");
allOptions.put(DefaultDriverOption.RESOLVE_CONTACT_POINTS, "false");
allOptions.put(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT,
getProperty(CASSANDRA_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS,
DEFAULT_MAX_SCHEMA_AGREEMENT_WAIT_SECONDS) + SECONDS_STR);

// extract additional options that may override values set by main configuration
for (String pname: properties.stringPropertyNames()) {
if (pname.startsWith(DATASTAX_JAVA_DRIVER_PREFIX)) {
String pvalue = properties.getProperty(pname);
logger.info("Custom config values: {} = {}", pname, pvalue);
String shortName = pname.substring(DATASTAX_JAVA_DRIVER_PREFIX.length());
if (optionMap.containsKey(shortName)) {
allOptions.put(optionMap.get(shortName), pvalue);
} else {
logger.warn("Incorrect option name: {}", pname);
}
}
}

for (Map.Entry<DriverOption, String> entry: allOptions.entrySet()) {
configBuilder.withString(entry.getKey(), entry.getValue());
}

DriverConfigLoader loader = configBuilder.endProfile().build();
logger.debug("Config loader is created");

return loader;
}

private static void addIfNotBlank(Map<DriverOption, String> allOptions,
String value,
DefaultDriverOption option) {
if (!StringUtils.isBlank(value)) {
allOptions.put(option, value);
}
}

private static void addIfNotBlankOrDefault(Map<DriverOption, String> allOptions,
String value,
DefaultDriverOption option) {
if (!StringUtils.isBlank(value) && !DEFAULT_VALUE.equalsIgnoreCase(value)) {
allOptions.put(option, value);
}
}

@Override
public void close() {
session.close();
if (session != null)
session.close();
}

@Override
Expand Down
Loading

0 comments on commit 0e5f65a

Please sign in to comment.