Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor bigquery client #1439

Merged
merged 9 commits into from Oct 12, 2018
@@ -23,102 +23,104 @@ import com.google.api.client.googleapis.json.GoogleJsonResponseException
import com.google.api.services.bigquery.model.{TableFieldSchema, TableSchema}
import com.google.cloud.storage.Storage.BlobListOption
import com.google.cloud.storage.{Blob, StorageOptions}
import com.spotify.scio.bigquery.client.BigQuery
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers
import org.scalatest.{FlatSpec, Matchers}

import scala.collection.JavaConverters._
import scala.util.Success

class BigQueryClientIT extends FlatSpec with Matchers {

val bq = BigQueryClient.defaultInstance()
val bq = BigQuery.defaultInstance()

val legacyQuery =
"SELECT word, word_count FROM [bigquery-public-data:samples.shakespeare] LIMIT 10"
val sqlQuery =
"SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare` LIMIT 10"

"extractLocation" should "work with legacy syntax" in {
"QueryService.extractLocation" should "work with legacy syntax" in {
val query = "SELECT word FROM [data-integration-test:samples_%s.shakespeare]"
bq.extractLocation(query.format("eu")) shouldBe Some("EU")
bq.query.extractLocation(query.format("eu")) shouldBe Some("EU")
}

it should "work with SQL syntax" in {
val query = "SELECT word FROM `data-integration-test.samples_%s.shakespeare`"
bq.extractLocation(query.format("eu")) shouldBe Some("EU")
bq.query.extractLocation(query.format("eu")) shouldBe Some("EU")
}

it should "support missing source tables" in {
bq.extractLocation("SELECT 6") shouldBe None
bq.query.extractLocation("SELECT 6") shouldBe None
}

"extractTables" should "work with legacy syntax" in {
"QueryService.extractTables" should "work with legacy syntax" in {
val tableSpec = BigQueryHelpers.parseTableSpec("bigquery-public-data:samples.shakespeare")
bq.extractTables(legacyQuery) shouldBe Set(tableSpec)
bq.query.extractTables(legacyQuery) shouldBe Set(tableSpec)
}

it should "work with SQL syntax" in {
val tableSpec = BigQueryHelpers.parseTableSpec("bigquery-public-data:samples.shakespeare")
bq.extractTables(sqlQuery) shouldBe Set(tableSpec)
bq.query.extractTables(sqlQuery) shouldBe Set(tableSpec)
}

"getQuerySchema" should "work with legacy syntax" in {
"QueryService.getSchema" should "work with legacy syntax" in {
val expected = new TableSchema().setFields(List(
new TableFieldSchema().setName("word").setType("STRING").setMode("REQUIRED"),
new TableFieldSchema().setName("word_count").setType("INTEGER").setMode("REQUIRED")
).asJava)
bq.getQuerySchema(legacyQuery) shouldBe expected
bq.query.schema(legacyQuery) shouldBe expected
}

it should "work with SQL syntax" in {
val expected = new TableSchema().setFields(List(
new TableFieldSchema().setName("word").setType("STRING").setMode("NULLABLE"),
new TableFieldSchema().setName("word_count").setType("INTEGER").setMode("NULLABLE")
).asJava)
bq.getQuerySchema(sqlQuery) shouldBe expected
bq.query.schema(sqlQuery) shouldBe expected
}

// scalastyle:off no.whitespace.before.left.bracket
it should "fail invalid legacy syntax" in {
(the [GoogleJsonResponseException] thrownBy {
bq.getQuerySchema("SELECT word, count FROM [bigquery-public-data:samples.shakespeare]")
bq.query.schema("SELECT word, count FROM [bigquery-public-data:samples.shakespeare]")
}).getDetails.getCode shouldBe 400
}

it should "fail invalid SQL syntax" in {
(the [GoogleJsonResponseException] thrownBy {
bq.getQuerySchema("SELECT word, count FROM `bigquery-public-data.samples.shakespeare`")
bq.query.schema("SELECT word, count FROM `bigquery-public-data.samples.shakespeare`")
}).getDetails.getCode shouldBe 400
}
// scalastyle:on no.whitespace.before.left.bracket

"getQueryRows" should "work with legacy syntax" in {
val rows = bq.getQueryRows(legacyQuery).toList
"QueryService.getRows" should "work with legacy syntax" in {
val rows = bq.query.rows(legacyQuery).toList
rows.size shouldBe 10
all(rows.map(_.keySet().asScala)) shouldBe Set("word", "word_count")
}

it should "work with SQL syntax" in {
val rows = bq.getQueryRows(sqlQuery).toList
val rows = bq.query.rows(sqlQuery).toList
rows.size shouldBe 10
all(rows.map(_.keySet().asScala)) shouldBe Set("word", "word_count")
}

"getTableSchema" should "work" in {
val schema = bq.getTableSchema("bigquery-public-data:samples.shakespeare")
"TableService.getTableSchema" should "work" in {
val schema = bq.tables.schema("bigquery-public-data:samples.shakespeare")
val fields = schema.getFields.asScala
fields.size shouldBe 4
fields.map(_.getName) shouldBe Seq("word", "word_count", "corpus", "corpus_date")
fields.map(_.getType) shouldBe Seq("STRING", "INTEGER", "STRING", "INTEGER")
fields.map(_.getMode) shouldBe Seq("REQUIRED", "REQUIRED", "REQUIRED", "REQUIRED")
}

"getTableRows" should "work" in {
val rows = bq.getTableRows("bigquery-public-data:samples.shakespeare").take(10).toList
"TableService.getRows" should "work" in {
val rows = bq.tables.rows("bigquery-public-data:samples.shakespeare").take(10).toList
val columns = Set("word", "word_count", "corpus", "corpus_date")
all(rows.map(_.keySet().asScala)) shouldBe columns
}

"loadTableFromCsv" should "work" in {
"Load.csv" should "work" in {
val schema = BigQueryUtil.parseSchema(
"""
|{
@@ -131,15 +133,15 @@ class BigQueryClientIT extends FlatSpec with Matchers {
|}
""".stripMargin)
val sources = List("gs://data-integration-test-eu/shakespeare-sample-10.csv")
val table = bq.temporaryTable(location = "EU")
val tableRef = bq.loadTableFromCsv(sources, table.asTableSpec, skipLeadingRows = 1,
val table = bq.tables.createTemporary(location = "EU")
val tableRef = bq.load.csv(sources, table.asTableSpec, skipLeadingRows = 1,
schema = Some(schema))
val createdTable = bq.getTable(tableRef)
createdTable.getNumRows.intValue() shouldBe 10
bq.deleteTable(tableRef)
val createdTable = tableRef.map(bq.tables.table)
createdTable.map(_.getNumRows.intValue()) shouldBe Success(10)
tableRef.map(bq.tables.delete)
}

"loadTableFromJson" should "work" in {
"Load.json" should "work" in {
val schema = BigQueryUtil.parseSchema(
"""
|{
@@ -152,54 +154,54 @@ class BigQueryClientIT extends FlatSpec with Matchers {
|}
""".stripMargin)
val sources = List("gs://data-integration-test-eu/shakespeare-sample-10.json")
val table = bq.temporaryTable(location = "EU")
val tableRef = bq.loadTableFromJson(sources, table.asTableSpec, schema = Some(schema))
val createdTable = bq.getTable(tableRef)
createdTable.getNumRows.intValue() shouldBe 10
bq.deleteTable(tableRef)
val table = bq.tables.createTemporary(location = "EU")
val tableRef = bq.load.json(sources, table.asTableSpec, schema = Some(schema))
val createdTable = tableRef.map(bq.tables.table)
createdTable.map(_.getNumRows.intValue()) shouldBe Success(10)
tableRef.map(bq.tables.delete)
}

"loadTableFromAvro" should "work" in {
"Load.avro" should "work" in {
val sources = List("gs://data-integration-test-eu/shakespeare-sample-10.avro")
val table = bq.temporaryTable(location = "EU")
val tableRef = bq.loadTableFromAvro(sources, table.asTableSpec)
val createdTable = bq.getTable(tableRef)
createdTable.getNumRows.intValue() shouldBe 10
bq.deleteTable(tableRef)
val table = bq.tables.createTemporary(location = "EU")
val tableRef = bq.load.avro(sources, table.asTableSpec)
val createdTable = tableRef.map(bq.tables.table)
createdTable.map(_.getNumRows.intValue()) shouldBe Success(10)
tableRef.map(bq.tables.delete)
}

"exportTableAsCsv" should "work" in {
"extract.asCsv" should "work" in {
val sourceTable = "bigquery-public-data:samples.shakespeare"
val (bucket, prefix) = ("data-integration-test-eu", s"extract/csv/${UUID.randomUUID}")
GcsUtils.exists(bucket, prefix) shouldBe false
val destination = List(
s"gs://$bucket/$prefix"
)
bq.exportTableAsCsv(sourceTable, destination)
bq.extract.asCsv(sourceTable, destination)
GcsUtils.exists(bucket, prefix) shouldBe true
GcsUtils.remove(bucket, prefix)
}

"exportTableAsJson" should "work" in {
"extract.asJson" should "work" in {
val sourceTable = "bigquery-public-data:samples.shakespeare"
val (bucket, prefix) = ("data-integration-test-eu", s"extract/json/${UUID.randomUUID}")
GcsUtils.exists(bucket, prefix) shouldBe false
val destination = List(
s"gs://$bucket/$prefix"
)
bq.exportTableAsJson(sourceTable, destination)
bq.extract.asJson(sourceTable, destination)
GcsUtils.exists(bucket, prefix) shouldBe true
GcsUtils.remove(bucket, prefix)
}

"exportTableAsAvro" should "work" in {
"extract.asAvro" should "work" in {
val sourceTable = "bigquery-public-data:samples.shakespeare"
val (bucket, prefix) = ("data-integration-test-eu", s"extract/avro/${UUID.randomUUID}")
GcsUtils.exists(bucket, prefix) shouldBe false
val destination = List(
s"gs://$bucket/$prefix"
)
bq.exportTableAsAvro(sourceTable, destination)
bq.extract.asAvro(sourceTable, destination)
GcsUtils.exists(bucket, prefix) shouldBe true
GcsUtils.remove(bucket, prefix)
}
@@ -17,7 +17,7 @@

package com.spotify.scio.bigquery.types

import com.spotify.scio.bigquery.BigQueryClient
import com.spotify.scio.bigquery.client.BigQuery
import org.scalatest.{Assertion, FlatSpec, Matchers}

import scala.annotation.StaticAnnotation
@@ -73,7 +73,7 @@ object BigQueryTypeIT {

// run this to re-populate tables used for this test and BigQueryPartitionUtilIT
def main(args: Array[String]): Unit = {
val bq = BigQueryClient.defaultInstance()
val bq = BigQuery.defaultInstance()
val data = List(ToTableT("a", 1), ToTableT("b", 2))
bq.writeTypedRows("data-integration-test:partition_a.table_20170101", data)
bq.writeTypedRows("data-integration-test:partition_a.table_20170102", data)
@@ -88,7 +88,7 @@ class BigQueryTypeIT extends FlatSpec with Matchers {

import BigQueryTypeIT._

val bq = BigQueryClient.defaultInstance()
val bq = BigQuery.defaultInstance()

val legacyQuery =
"SELECT word, word_count FROM [bigquery-public-data:samples.shakespeare] WHERE word = 'Romeo'"
@@ -127,15 +127,15 @@ class BigQueryTypeIT extends FlatSpec with Matchers {

it should "round trip rows with legacy syntax" in {
val bqt = BigQueryType[LegacyT]
val rows = bq.getQueryRows(legacyQuery).toList
val rows = bq.query.rows(legacyQuery).toList
val typed = Seq(LegacyT("Romeo", 117L))
rows.map(bqt.fromTableRow) shouldBe typed
typed.map(bqt.toTableRow).map(bqt.fromTableRow) shouldBe typed
}

it should "round trip rows with SQL syntax" in {
val bqt = BigQueryType[SqlT]
val rows = bq.getQueryRows(sqlQuery).toList
val rows = bq.query.rows(sqlQuery).toList
val typed = Seq(SqlT(Some("Romeo"), Some(117L)))
rows.map(bqt.fromTableRow) shouldBe typed
typed.map(bqt.toTableRow).map(bqt.fromTableRow) shouldBe typed
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.