Commit e7b479b
Merge pull request #221 from tigergraph/spark-conn-dev
[Automated] Spark Connector Release 0.1.1
chengjie-qin committed Apr 11, 2024
2 parents 48879dd + f819bb8 commit e7b479b
Showing 29 changed files with 2,133 additions and 41 deletions.
45 changes: 41 additions & 4 deletions tools/etl/tg-spark-connector/pom.xml
@@ -5,11 +5,35 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

+<name>TigerGraph Spark Connector</name>
+<description>TigerGraph Ecosystem - Spark Connector</description>
+<url>https://github.com/tigergraph/ecosys/tree/master/tools/etl/tg-spark-connector</url>

+<licenses>
+<license>
+<name>The Apache Software License, Version 2.0</name>
+<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+</license>
+</licenses>

+<scm>
+<connection>scm:git:git://github.com:tigergraph/ecosys.git</connection>
+<developerConnection>scm:git:ssh://github.com:tigergraph/ecosys.git</developerConnection>
+<url>https://github.com/tigergraph/ecosys/tree/master/tools/etl/tg-spark-connector</url>
+</scm>

+<developers>
+<developer>
+<name>Ping Xie</name>
+<email>ping.xie@tigergraph.com</email>
+<organization>TigerGraph Inc.</organization>
+<organizationUrl>http://www.tigergraph.com</organizationUrl>
+</developer>
+</developers>

<groupId>com.tigergraph</groupId>
<artifactId>tigergraph-spark-connector</artifactId>
-<version>0.1.0</version>

-<name>tigergraph-spark-connector</name>
+<version>0.1.1</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -44,20 +68,33 @@
<groupId>io.github.openfeign</groupId>
<artifactId>feign-hc5</artifactId>
</dependency>
<!-- Testing Frameworks -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.8.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.24.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.wiremock</groupId>
<artifactId>wiremock</artifactId>
<version>3.3.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-maven-plugin</artifactId>
-<version>1.19</version>
+<version>1.23</version>
<executions>
<execution>
<id>check-java-8-compatibility</id>
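The new test dependencies (JUnit 5, AssertJ, WireMock) give the module a self-contained HTTP test stack. As a rough sketch of how the three combine — this is illustrative, not a test from this commit, and the /api/ping endpoint and class name are made up — a unit test can stub a REST++-style endpoint with WireMock and assert on the body with AssertJ:

    import static com.github.tomakehurst.wiremock.client.WireMock.get;
    import static com.github.tomakehurst.wiremock.client.WireMock.okJson;
    import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
    import static org.assertj.core.api.Assertions.assertThat;

    import com.github.tomakehurst.wiremock.WireMockServer;
    import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;

    class RestppStubTest {
      private WireMockServer server;

      @BeforeEach
      void setUp() {
        // Random free port, so parallel builds don't collide on a fixed one
        server = new WireMockServer(WireMockConfiguration.options().dynamicPort());
        server.start();
        server.stubFor(get(urlEqualTo("/api/ping"))
            .willReturn(okJson("{\"error\":false,\"message\":\"pong\"}")));
      }

      @AfterEach
      void tearDown() {
        server.stop();
      }

      @Test
      void pingEndpointAnswers() throws Exception {
        // Plain URL fetch; the connector's real tests exercise its Feign clients instead
        try (java.util.Scanner body = new java.util.Scanner(
            new java.net.URL(server.baseUrl() + "/api/ping").openStream(), "UTF-8")) {
          assertThat(body.useDelimiter("\\A").next()).contains("pong");
        }
      }
    }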
com/tigergraph/spark/TigerGraphConnection.java
@@ -19,9 +19,11 @@
import com.tigergraph.spark.client.Builder;
import com.tigergraph.spark.client.Auth;
import com.tigergraph.spark.client.Misc;
+import com.tigergraph.spark.client.Query;
import com.tigergraph.spark.client.Write;
import com.tigergraph.spark.client.Auth.AuthResponse;
import com.tigergraph.spark.client.common.RestppResponse;
+import com.tigergraph.spark.client.common.RestppStreamDecoder;
import com.tigergraph.spark.util.Options;
import com.tigergraph.spark.util.Utils;
import feign.FeignException;
@@ -61,6 +63,9 @@ public class TigerGraphConnection implements Serializable {
static final String JOB_MACHINE = "all";
private String loadingJobId = null;
private transient Write write;
+// Query variables
+private transient Query query;
+private static final int DEFAULT_QUERY_READ_TIMEOUT_MS = 1800000; // 30 min

/**
* Only be called in driver, serialized and sent to executors. <br>
@@ -206,7 +211,7 @@ public Misc getMisc() {
opts.getInt(Options.IO_RETRY_INTERVAL_MS),
opts.getInt(Options.IO_MAX_RETRY_INTERVAL_MS),
opts.getInt(Options.IO_MAX_RETRY_ATTEMPTS))
-.setRequestInterceptor(basicAuth, token, restAuthEnabled);
+.setAuthInterceptor(basicAuth, token, restAuthEnabled);
if (url.trim().toLowerCase().startsWith("https://")) {
builder.setSSL(
opts.getString(Options.SSL_MODE),
@@ -247,7 +252,7 @@ public Write getWrite() {
opts.getInt(Options.LOADING_RETRY_INTERVAL_MS),
opts.getInt(Options.LOADING_MAX_RETRY_INTERVAL_MS),
opts.getInt(Options.LOADING_MAX_RETRY_ATTEMPTS))
-.setRequestInterceptor(basicAuth, token, restAuthEnabled);
+.setAuthInterceptor(basicAuth, token, restAuthEnabled);
if (url.trim().toLowerCase().startsWith("https://")) {
builder.setSSL(
opts.getString(Options.SSL_MODE),
@@ -260,6 +265,56 @@ public Write getWrite() {
return write;
}

+/** Get query client (restpp built-in queries) */
+public Query getQuery() {
+if (!Options.OptionType.READ.equals(opts.getOptionType())) {
+throw new UnsupportedOperationException(
+"Can't build query client for OptionType " + opts.getOptionType());
+}
+
+if (!restAuthInited) {
+initAuth();
+}
+
+if (query == null) {
+int readTimeout =
+Math.max(DEFAULT_QUERY_READ_TIMEOUT_MS, opts.getInt(Options.IO_READ_TIMEOUT_MS));
+// The read timeout should be a bit longer (5 min) than the GSQL query timeout.
+if (opts.containsOption(Options.QUERY_TIMEOUT_MS)) {
+readTimeout = Math.max(readTimeout, opts.getInt(Options.QUERY_TIMEOUT_MS) + 300000);
+}
+Builder builder =
+new Builder()
+.setDecoder(new RestppStreamDecoder())
+.setRequestOptions(opts.getInt(Options.IO_CONNECT_TIMEOUT_MS), readTimeout)
+.setRetryer(
+getAuth(),
+basicAuth,
+secret,
+token,
+opts.getInt(Options.IO_RETRY_INTERVAL_MS),
+opts.getInt(Options.IO_MAX_RETRY_INTERVAL_MS),
+opts.getInt(Options.IO_MAX_RETRY_ATTEMPTS),
+opts.getInt(Options.IO_RETRY_INTERVAL_MS),
+opts.getInt(Options.IO_MAX_RETRY_INTERVAL_MS),
+opts.getInt(Options.IO_MAX_RETRY_ATTEMPTS))
+.setAuthInterceptor(basicAuth, token, restAuthEnabled)
+.setRetryableCode(502, 503, 504)
+.setQueryInterceptor(
+opts.getInt(Options.QUERY_TIMEOUT_MS),
+opts.getLong(Options.QUERY_MAX_RESPONSE_BYTES));
+if (url.trim().toLowerCase().startsWith("https://")) {
+builder.setSSL(
+opts.getString(Options.SSL_MODE),
+opts.getString(Options.SSL_TRUSTSTORE),
+opts.getString(Options.SSL_TRUSTSTORE_TYPE),
+opts.getString(Options.SSL_TRUSTSTORE_PASSWORD));
+}
+query = builder.build(Query.class, url);
+}
+return query;
+}

/**
* Generate loading job id: <br>
* <graph_name>.<job_name>.file.all.<epoch_timestamp>
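A note on the read-timeout arithmetic in getQuery() above: the HTTP read timeout is floored at 30 minutes (DEFAULT_QUERY_READ_TIMEOUT_MS), never shrinks below the configured IO read timeout, and, when a GSQL query timeout is set, is padded by 5 minutes so the HTTP layer always outlives the server-side query. A worked sketch with assumed option values (all in milliseconds):

    // Worked example of getQuery()'s effective read timeout; inputs are assumptions.
    class ReadTimeoutExample {
      public static void main(String[] args) {
        int ioReadTimeoutMs = 60_000;       // assumed Options.IO_READ_TIMEOUT_MS (1 min)
        int gsqlQueryTimeoutMs = 3_600_000; // assumed Options.QUERY_TIMEOUT_MS (1 hour)

        int readTimeout = Math.max(1_800_000, ioReadTimeoutMs); // 30-min floor -> 1_800_000
        // QUERY_TIMEOUT_MS is set, so pad it by 5 minutes and take the max:
        readTimeout = Math.max(readTimeout, gsqlQueryTimeoutMs + 300_000);

        System.out.println(readTimeout); // 3_900_000, i.e. 65 minutes
      }
    }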
com/tigergraph/spark/TigerGraphTable.java
@@ -13,23 +13,26 @@
*/
package com.tigergraph.spark;

+import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;
+import com.tigergraph.spark.read.TigerGraphScanBuilder;
import com.tigergraph.spark.write.TigerGraphWriteBuilder;

/** The representation of logical structured data set of a TG, with supported capabilities. */
-public class TigerGraphTable implements SupportsWrite {
+public class TigerGraphTable implements SupportsWrite, SupportsRead {

private static final String TABLE_NAME = "TigerGraphTable";
private final StructType schema;
private final long creationTime = Instant.now().toEpochMilli();

-TigerGraphTable(StructType schema) {
+public TigerGraphTable(StructType schema) {
this.schema = schema;
}

@@ -49,6 +52,7 @@ public Set<TableCapability> capabilities() {
{
add(TableCapability.BATCH_WRITE);
add(TableCapability.STREAMING_WRITE);
+add(TableCapability.BATCH_READ);
}
};
}
@@ -57,4 +61,9 @@ public Set<TableCapability> capabilities() {
public TigerGraphWriteBuilder newWriteBuilder(LogicalWriteInfo info) throws RuntimeException {
return new TigerGraphWriteBuilder(info, creationTime);
}

+@Override
+public TigerGraphScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+return new TigerGraphScanBuilder(options, schema);
+}
}
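Declaring SupportsRead and advertising BATCH_READ is what lets Spark's DataSource V2 planner route a DataFrame read through newScanBuilder(). From the user's side that looks roughly like the sketch below; the format short name and option keys are illustrative stand-ins, not values taken from this diff:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class TigerGraphReadSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("tg-read-sketch").getOrCreate();

        // Spark resolves the provider, builds a TigerGraphTable with the negotiated
        // schema, sees BATCH_READ among its capabilities, and only then calls
        // newScanBuilder(options) to plan the batch read.
        Dataset<Row> df = spark.read()
            .format("tigergraph")                 // assumed short name for the provider
            .option("url", "https://host:14240")  // illustrative option keys and values
            .option("graph", "MyGraph")
            .option("query", "my_installed_query")
            .load();

        df.show();
        spark.stop();
      }
    }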
com/tigergraph/spark/client/Builder.java
@@ -19,6 +19,7 @@
import java.io.File;
import java.io.FileInputStream;
import java.security.KeyStore;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -32,10 +33,12 @@
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.spark.SparkFiles;
+import org.slf4j.LoggerFactory;
import com.tigergraph.spark.client.common.RestppAuthInterceptor;
import com.tigergraph.spark.client.common.RestppDecoder;
import com.tigergraph.spark.client.common.RestppEncoder;
import com.tigergraph.spark.client.common.RestppErrorDecoder;
+import com.tigergraph.spark.client.common.RestppQueryInterceptor;
import com.tigergraph.spark.client.common.RestppRetryer;
import com.tigergraph.spark.util.Options;
import com.tigergraph.spark.util.Utils;
@@ -48,6 +51,7 @@

/** Builder for all client, with custom client settings. */
public class Builder {
+private static final org.slf4j.Logger logger = LoggerFactory.getLogger(Builder.class);

private Feign.Builder builder = new Feign.Builder();
// default client settings
@@ -58,7 +62,8 @@ public class Builder {
private Decoder decoder = RestppDecoder.INSTANCE;
private ErrorDecoder errDecoder = new RestppErrorDecoder(RestppDecoder.INSTANCE);
private Retryer retryer = new Retryer.Default();
-private RequestInterceptor reqInterceptor;
+private RequestInterceptor authInterceptor;
+private RequestInterceptor queryInterceptor;
private Request.Options reqOpts = new Request.Options();

public Builder setRequestOptions(int connectTimeoutMs, int readTimeoutMs) {
@@ -70,7 +75,13 @@ public Builder setRequestOptions(int connectTimeoutMs, int readTimeoutMs) {

/** Set response error decoder with the HTTP error codes that will be retried. */
public Builder setRetryableCode(Integer... code) {
-this.errDecoder = new RestppErrorDecoder(decoder, code);
+this.errDecoder = new RestppErrorDecoder(RestppDecoder.INSTANCE, code);
return this;
}

+/** Set custom RESTPP response decoder. */
+public Builder setDecoder(Decoder decoder) {
+this.decoder = decoder;
+return this;
+}

@@ -116,8 +127,18 @@ public Builder setRetryerWithoutAuth(
}

/** Set request interceptor for adding authorization header */
-public Builder setRequestInterceptor(String basicAuth, String token, boolean restAuthEnabled) {
-this.reqInterceptor = new RestppAuthInterceptor(basicAuth, token, restAuthEnabled);
+public Builder setAuthInterceptor(String basicAuth, String token, boolean restAuthEnabled) {
+this.authInterceptor = new RestppAuthInterceptor(basicAuth, token, restAuthEnabled);
return this;
}

+/** Set request interceptor for adding GSQL query headers */
+public Builder setQueryInterceptor(Integer queryTimeoutMs, Long queryMaxRespByte) {
+logger.debug(
+"Query timeout: {}ms, query response size limit: {}bytes. (default value: 0)",
+queryTimeoutMs,
+queryMaxRespByte);
+this.queryInterceptor = new RestppQueryInterceptor(queryTimeoutMs, queryMaxRespByte);
+return this;
+}
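For orientation, a Feign RequestInterceptor mutates each outgoing RequestTemplate before the request is sent, and interceptors registered on a builder apply to every call of the built client. RestppQueryInterceptor's source is not part of this diff; the sketch below shows the general shape of such an interceptor, assuming the TigerGraph REST++ query-control headers GSQL-TIMEOUT and RESPONSE-LIMIT — treat the class as illustrative, not the connector's implementation:

    import feign.RequestInterceptor;
    import feign.RequestTemplate;

    // Illustrative only -- not the connector's RestppQueryInterceptor.
    public class QueryHeaderInterceptor implements RequestInterceptor {
      private final Integer queryTimeoutMs;  // null or 0 means "server default"
      private final Long maxResponseBytes;

      public QueryHeaderInterceptor(Integer queryTimeoutMs, Long maxResponseBytes) {
        this.queryTimeoutMs = queryTimeoutMs;
        this.maxResponseBytes = maxResponseBytes;
      }

      @Override
      public void apply(RequestTemplate template) {
        // Set per-request query controls as REST++ headers when configured
        if (queryTimeoutMs != null && queryTimeoutMs > 0) {
          template.header("GSQL-TIMEOUT", String.valueOf(queryTimeoutMs));
        }
        if (maxResponseBytes != null && maxResponseBytes > 0) {
          template.header("RESPONSE-LIMIT", String.valueOf(maxResponseBytes));
        }
      }
    }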

@@ -170,9 +191,14 @@ public <T> T build(Class<T> apiType, String url) {
.options(reqOpts)
.client(
new ApacheHttp5Client(hc5builder.setConnectionManager(connMgrBuilder.build()).build()));
-if (reqInterceptor != null) {
-builder.requestInterceptor(reqInterceptor);
-}
+List<RequestInterceptor> interceptorChain = new ArrayList<>();
+if (authInterceptor != null) interceptorChain.add(authInterceptor);
+if (queryInterceptor != null) interceptorChain.add(queryInterceptor);
+builder.requestInterceptors(interceptorChain);

+// Required to fetch the iterator after the response is processed; the response must be closed by the caller
+if (Query.class.equals(apiType)) builder.doNotCloseAfterDecode();

return builder.target(new LoadBalanceTarget<T>(apiType, url));
}

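The doNotCloseAfterDecode() call pairs with the RestppStreamDecoder wired in for Query clients: when a decoder returns an object that still reads lazily from the HTTP body, Feign must not close the response at decode time, and closing shifts to the caller. A generic sketch of that pattern, with an illustrative decoder (not the connector's RestppStreamDecoder):

    import feign.Response;
    import feign.codec.Decoder;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.lang.reflect.Type;
    import java.nio.charset.StandardCharsets;

    /**
     * Illustrative lazy decoder -- NOT the connector's RestppStreamDecoder.
     * It returns a reader over the live response body, so the client must be
     * built with doNotCloseAfterDecode(), and the caller closes the reader
     * (releasing the underlying connection) when finished.
     */
    public class LazyBodyDecoder implements Decoder {
      @Override
      public Object decode(Response response, Type type) throws IOException {
        return new BufferedReader(
            new InputStreamReader(response.body().asInputStream(), StandardCharsets.UTF_8));
      }
    }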
(The remaining 25 changed files are not rendered here.)
