Skip to content

Commit

Permalink
[BEAM-14081] [CdapIO] Add context classes for CDAP plugins (apache#17104
Browse files Browse the repository at this point in the history
)

* [BEAM-14048] Add ConfigWrapper for building CDAP PluginConfigs

* [BEAM-14048] Fix checkstyle

* [BEAM-14048] Fix warnings

* [BEAM-14048] Fix warnings

* [BEAM-14048] Fix warning

* [BEAM-14048] Fix warning

* [BEAM-14048] Remove unused dependencies

* [BEAM-14048] Add needed dependencies

* [BEAM-14048] Fix spotless

* [BEAM-14048] Fix typo

* [BEAM-14048] Use fori instead of stream

* [BEAM-14048] Suppress warning

* [BEAM-14048] Add used undeclared artifacts

* [BEAM-14048] Change dependencies to test

* Add context.

* Fix dependencies issue

* Add null annotation

* [BEAM-14048] Refactoring

* Add SuppressWarning.

* Fix style.

* Determine dependencies.

* [BEAM-14048] Use CDAP InstantiatorFactory for creating config objects

* [BEAM-14048] Suppress warning

* [BEAM-14081] Refactoring

* Update maven repo

* Update build.gradle

* [BEAM-14081] Refactoring

* [BEAM-14048] Use ServiceNow CDAP dependency from Maven central

* [BEAM-14048] Set macroFields

* [BEAM-14081] Fix javadoc

* [BEAM-14081] Make BatchContextImpl class abstract

Co-authored-by: vitaly.terentyev <vitaly.terentyev@akvelon.com>
Co-authored-by: Alex Kosolapov <alex.kosolapov@gmail.com>
Co-authored-by: Elizaveta Lomteva <elizaveta.lomteva@akvelon.com>
Co-authored-by: Elizaveta Lomteva <57974525+Lizzfox@users.noreply.github.com>
  • Loading branch information
5 people committed May 11, 2022
1 parent 43cc865 commit f30e4e5
Show file tree
Hide file tree
Showing 10 changed files with 585 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,10 @@ class BeamModulePlugin implements Plugin<Project> {
cassandra_driver_core : "com.datastax.cassandra:cassandra-driver-core:$cassandra_driver_version",
cassandra_driver_mapping : "com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver_version",
cdap_api : "io.cdap.cdap:cdap-api:$cdap_version",
cdap_api_commons : "io.cdap.cdap:cdap-api-common:$cdap_version",
cdap_common : "io.cdap.cdap:cdap-common:$cdap_version",
cdap_etl_api : "io.cdap.cdap:cdap-etl-api:$cdap_version",
cdap_etl_api_spark : "io.cdap.cdap:cdap-etl-api-spark:$cdap_version",
cdap_plugin_service_now : "io.cdap.plugin:servicenow-plugins:1.1.0",
checker_qual : "org.checkerframework:checker-qual:$checkerframework_version",
classgraph : "io.github.classgraph:classgraph:$classgraph_version",
Expand Down Expand Up @@ -693,6 +695,7 @@ class BeamModulePlugin implements Plugin<Project> {
spark3_sql : "org.apache.spark:spark-sql_2.12:$spark3_version",
spark3_streaming : "org.apache.spark:spark-streaming_2.12:$spark3_version",
stax2_api : "org.codehaus.woodstox:stax2-api:4.2.1",
tephra : "org.apache.tephra:tephra-api:0.15.0-incubating",
testcontainers_base : "org.testcontainers:testcontainers:$testcontainers_version",
testcontainers_clickhouse : "org.testcontainers:clickhouse:$testcontainers_version",
testcontainers_elasticsearch : "org.testcontainers:elasticsearch:$testcontainers_version",
Expand Down
11 changes: 9 additions & 2 deletions sdks/java/io/cdap/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,22 @@ interface for integration with CDAP plugins."""
*/

dependencies {
implementation library.java.guava
implementation library.java.cdap_api
implementation library.java.cdap_common
implementation library.java.cdap_api_commons
implementation (library.java.cdap_common) {
exclude module: "log4j-over-slf4j"
}
implementation library.java.cdap_etl_api
implementation library.java.cdap_etl_api_spark
implementation library.java.jackson_core
implementation library.java.jackson_databind
implementation library.java.guava
implementation library.java.slf4j_api
implementation library.java.tephra
implementation project(path: ":sdks:java:core", configuration: "shadow")
testImplementation library.java.cdap_plugin_service_now
testImplementation library.java.cdap_etl_api
testImplementation library.java.vendored_guava_26_0_jre
testImplementation library.java.junit
testImplementation project(path: ":runners:direct-java", configuration: "shadow")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.cdap.context;

import io.cdap.cdap.api.data.DatasetInstantiationException;
import io.cdap.cdap.api.data.batch.InputFormatProvider;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.Dataset;
import io.cdap.cdap.api.dataset.DatasetManagementException;
import io.cdap.cdap.api.dataset.DatasetProperties;
import io.cdap.cdap.api.metadata.Metadata;
import io.cdap.cdap.api.metadata.MetadataEntity;
import io.cdap.cdap.api.metadata.MetadataException;
import io.cdap.cdap.api.metadata.MetadataScope;
import io.cdap.cdap.api.plugin.PluginProperties;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.Lookup;
import io.cdap.cdap.etl.api.StageMetrics;
import io.cdap.cdap.etl.api.SubmitterLifecycle;
import io.cdap.cdap.etl.api.action.SettableArguments;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.lineage.field.FieldOperation;
import java.net.URL;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

/** Class for Batch, Sink and Stream CDAP wrapper classes that use it to provide common details. */
@SuppressWarnings({"TypeParameterUnusedInFormals", "nullness"})
public abstract class BatchContextImpl implements BatchContext {

private final FailureCollectorWrapper failureCollector = new FailureCollectorWrapper();

/**
* This should be set after {@link SubmitterLifecycle#prepareRun(Object)} call with passing this
* context object as a param.
*/
protected InputFormatProvider inputFormatProvider;

private final Timestamp startTime = new Timestamp(System.currentTimeMillis());

public InputFormatProvider getInputFormatProvider() {
return inputFormatProvider;
}

@Override
public String getStageName() {
return null;
}

@Override
public String getNamespace() {
return null;
}

@Override
public String getPipelineName() {
return null;
}

@Override
public long getLogicalStartTime() {
return this.startTime.getTime();
}

@Override
public StageMetrics getMetrics() {
return null;
}

@Override
public PluginProperties getPluginProperties() {
return null;
}

@Override
public PluginProperties getPluginProperties(String pluginId) {
return null;
}

@Override
public <T> Class<T> loadPluginClass(String pluginId) {
return null;
}

@Override
public <T> T newPluginInstance(String pluginId) throws InstantiationException {
return null;
}

@Nullable
@Override
public Schema getInputSchema() {
return null;
}

@Override
public @Nullable Map<String, Schema> getInputSchemas() {
return null;
}

@Override
public @Nullable Schema getOutputSchema() {
return null;
}

@Override
public Map<String, Schema> getOutputPortSchemas() {
return null;
}

@Override
public void createDataset(String datasetName, String typeName, DatasetProperties properties)
throws DatasetManagementException {}

@Override
public boolean datasetExists(String datasetName) throws DatasetManagementException {
return false;
}

@Override
public SettableArguments getArguments() {
return null;
}

@Override
public FailureCollector getFailureCollector() {
return this.failureCollector;
}

@Nullable
@Override
public URL getServiceURL(String applicationId, String serviceId) {
return null;
}

@Nullable
@Override
public URL getServiceURL(String serviceId) {
return null;
}

@Override
public Map<MetadataScope, Metadata> getMetadata(MetadataEntity metadataEntity)
throws MetadataException {
return null;
}

@Override
public Metadata getMetadata(MetadataScope scope, MetadataEntity metadataEntity)
throws MetadataException {
return null;
}

@Override
public void addProperties(MetadataEntity metadataEntity, Map<String, String> properties) {}

@Override
public void addTags(MetadataEntity metadataEntity, String... tags) {}

@Override
public void addTags(MetadataEntity metadataEntity, Iterable<String> tags) {}

@Override
public void removeMetadata(MetadataEntity metadataEntity) {}

@Override
public void removeProperties(MetadataEntity metadataEntity) {}

@Override
public void removeProperties(MetadataEntity metadataEntity, String... keys) {}

@Override
public void removeTags(MetadataEntity metadataEntity) {}

@Override
public void removeTags(MetadataEntity metadataEntity, String... tags) {}

@Override
public void record(List<FieldOperation> fieldOperations) {}

@Override
public <T extends Dataset> T getDataset(String name) throws DatasetInstantiationException {
return null;
}

@Override
public <T extends Dataset> T getDataset(String namespace, String name)
throws DatasetInstantiationException {
return null;
}

@Override
public <T extends Dataset> T getDataset(String name, Map<String, String> arguments)
throws DatasetInstantiationException {
return null;
}

@Override
public <T extends Dataset> T getDataset(
String namespace, String name, Map<String, String> arguments)
throws DatasetInstantiationException {
return null;
}

@Override
public void releaseDataset(Dataset dataset) {}

@Override
public void discardDataset(Dataset dataset) {}

@Override
public <T> Lookup<T> provide(String table, Map<String, String> arguments) {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.cdap.context;

import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;

/** Class for creating context object of different CDAP classes with batch sink type. */
public class BatchSinkContextImpl extends BatchContextImpl implements BatchSinkContext {

@Override
public void addOutput(Output output) {}

@Override
public boolean isPreviewEnabled() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.cdap.context;

import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;

/** Class for creating context object of different CDAP classes with batch source type. */
public class BatchSourceContextImpl extends BatchContextImpl implements BatchSourceContext {

@Override
public void setInput(Input input) {
this.inputFormatProvider = ((Input.InputFormatProviderInput) input).getInputFormatProvider();
}

@Override
public boolean isPreviewEnabled() {
return false;
}

@Override
public int getMaxPreviewRecords() {
return 0;
}
}
Loading

0 comments on commit f30e4e5

Please sign in to comment.