Skip to content

Commit

Permalink
[bq] 0.2 introduce BigQuery interactive reader
Browse files Browse the repository at this point in the history
  • Loading branch information
dgray16 committed Jan 17, 2023
1 parent 038f973 commit 9c02e11
Show file tree
Hide file tree
Showing 24 changed files with 791 additions and 101 deletions.
8 changes: 4 additions & 4 deletions spring-batch-bigquery/pom.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2002-2022 the original author or authors.
~ Copyright 2002-2023 the original author or authors.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,7 +64,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>2.19.1</version>
<version>2.20.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
Expand All @@ -88,13 +88,13 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.9.1</version>
<version>5.9.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>4.9.0</version>
<version>5.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
@@ -0,0 +1,111 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.batch.extensions.bigquery.reader;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;

import java.util.Iterator;
import java.util.Objects;

/**
* BigQuery {@link ItemReader} that accepts simple query as the input.
* <p>
* Internally BigQuery Java library creates a {@link com.google.cloud.bigquery.JobConfiguration.Type#QUERY} job.
* Which means that result is coming asynchronously.
* <p>
* Also, worth mentioning that you should take into account concurrency limits.
* <p>
* Results of this query by default are stored in a shape of temporary table.
*
* @param <T> your DTO type
* @author Volodymyr Perebykivskyi
* @since 0.2.0
* @see <a href="https://cloud.google.com/bigquery/docs/running-queries#queries">Interactive queries</a>
* @see <a href="https://cloud.google.com/bigquery/quotas#concurrent_rate_interactive_queries">Concurrency limits</a>
*/
public class BigQueryInteractiveQueryItemReader<T> implements ItemReader<T>, InitializingBean {

private final Log logger = LogFactory.getLog(getClass());

private BigQuery bigQuery;
private Converter<FieldValueList, T> rowMapper;
private QueryJobConfiguration jobConfiguration;
private Iterator<FieldValueList> iterator;

/**
* BigQuery service, responsible for API calls.
*
* @param bigQuery BigQuery service
*/
public void setBigQuery(BigQuery bigQuery) {
this.bigQuery = bigQuery;
}

/**
* Row mapper which transforms single BigQuery row into desired type.
*
* @param rowMapper your row mapper
*/
public void setRowMapper(Converter<FieldValueList, T> rowMapper) {
this.rowMapper = rowMapper;
}

/**
* Specifies query to run, destination table, etc.
*
* @param jobConfiguration BigQuery job configuration
*/
public void setJobConfiguration(QueryJobConfiguration jobConfiguration) {
this.jobConfiguration = jobConfiguration;
}

@Override
public T read() throws Exception {
if (Objects.isNull(iterator)) {
doOpen();
}

if (logger.isDebugEnabled()) {
logger.debug("Reading next element");
}

return iterator.hasNext() ? rowMapper.convert(iterator.next()) : null;
}

private void doOpen() throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Executing query");
}
iterator = bigQuery.query(jobConfiguration).getValues().iterator();
}

@Override
public void afterPropertiesSet() {
Assert.notNull(this.bigQuery, "BigQuery service must be provided");
Assert.notNull(this.rowMapper, "Row mapper must be provided");
Assert.notNull(this.jobConfiguration, "Job configuration must be provided");
}

}
@@ -0,0 +1,116 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.batch.extensions.bigquery.reader.builder;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.springframework.batch.extensions.bigquery.reader.BigQueryInteractiveQueryItemReader;
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;

import java.util.Objects;

/**
* A builder for {@link BigQueryInteractiveQueryItemReader}.
*
* @param <T> your DTO type
* @author Volodymyr Perebykivskyi
* @since 0.2.0
* @see <a href="https://github.com/spring-projects/spring-batch-extensions/tree/main/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/BigQueryInteractiveQueryItemReaderBuilderTests.java">Examples</a>
*/
public class BigQueryInteractiveQueryItemReaderBuilder<T> {

private BigQuery bigQuery;
private String query;
private Converter<FieldValueList, T> rowMapper;
private QueryJobConfiguration jobConfiguration;

/**
* BigQuery service, responsible for API calls.
*
* @param bigQuery BigQuery service
* @return {@link BigQueryInteractiveQueryItemReaderBuilder}
* @see BigQueryInteractiveQueryItemReader#setBigQuery(BigQuery)
*/
public BigQueryInteractiveQueryItemReaderBuilder<T> bigQuery(BigQuery bigQuery) {
this.bigQuery = bigQuery;
return this;
}

/**
* Schema of the query: {@code SELECT <column> FROM <dataset>.<table>}.
* <p>
* It is really recommended to use {@code LIMIT n}
* because BigQuery charges you for the amount of data that is being processed.
*
* @param query your query to run
* @return {@link BigQueryInteractiveQueryItemReaderBuilder}
* @see BigQueryInteractiveQueryItemReader#setJobConfiguration(QueryJobConfiguration)
*/
public BigQueryInteractiveQueryItemReaderBuilder<T> query(String query) {
this.query = query;
return this;
}

/**
* Row mapper which transforms single BigQuery row into desired type.
*
* @param rowMapper your row mapper
* @return {@link BigQueryInteractiveQueryItemReaderBuilder}
* @see BigQueryInteractiveQueryItemReader#setRowMapper(Converter)
*/
public BigQueryInteractiveQueryItemReaderBuilder<T> rowMapper(Converter<FieldValueList, T> rowMapper) {
this.rowMapper = rowMapper;
return this;
}

/**
* Specifies query to run, destination table, etc.
*
* @param jobConfiguration BigQuery job configuration
* @return {@link BigQueryInteractiveQueryItemReaderBuilder}
* @see BigQueryInteractiveQueryItemReader#setJobConfiguration(QueryJobConfiguration)
*/
public BigQueryInteractiveQueryItemReaderBuilder<T> jobConfiguration(QueryJobConfiguration jobConfiguration) {
this.jobConfiguration = jobConfiguration;
return this;
}

/**
* Please do not forget about {@link BigQueryInteractiveQueryItemReader#afterPropertiesSet()}.
*
* @return {@link BigQueryInteractiveQueryItemReader}
*/
public BigQueryInteractiveQueryItemReader<T> build() {
BigQueryInteractiveQueryItemReader<T> reader = new BigQueryInteractiveQueryItemReader<>();

reader.setBigQuery(this.bigQuery);
reader.setRowMapper(this.rowMapper);

if (Objects.nonNull(this.jobConfiguration)) {
reader.setJobConfiguration(this.jobConfiguration);
} else {
Assert.isTrue(StringUtils.isNotBlank(this.query), "No query provided");
reader.setJobConfiguration(QueryJobConfiguration.newBuilder(this.query).build());
}

return reader;
}

}

0 comments on commit 9c02e11

Please sign in to comment.