
Commit

Enable user to specify write options, hinting about issues with nested records (#59)

* Add tmpWriteOptions to saveAsBigQueryTable

When writing nested fields to BigQuery, one has to specify a namespace
prefix for Avro; otherwise nested fields get a namespace starting with
a dot, which the BigQuery Avro reader will not accept.

* Add hint to provide recordNamespace in README.md

To load schemas with nested fields, one has to specify an Avro
namespace.

* Add a hint pointing to the BigQuery Avro docs in the README
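
For illustration, here is a minimal sketch of the failure mode and the fix, given an existing DataFrame `df` with nested columns (the bucket paths and namespace are made up; `recordNamespace` is the spark-avro write option that `tmpWriteOptions` passes through):

```scala
import com.databricks.spark.avro._

// Without a namespace, spark-avro derives nested record types whose
// fullname begins with a dot (e.g. ".nestedColumn"); BigQuery's Avro
// loader rejects such names.
df.write.avro("gs://my-bucket/tmp/broken")

// With a namespace, nested types become e.g. "myNamespace.nestedColumn"
// and load cleanly.
df.write
  .option("recordNamespace", "myNamespace")
  .avro("gs://my-bucket/tmp/ok")
```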
jobegrabber authored and nevillelyh committed Apr 17, 2018
1 parent a0ecb3f commit 7f65455
Showing 2 changed files with 23 additions and 5 deletions.
13 changes: 12 additions & 1 deletion README.md
@@ -43,10 +43,21 @@ val table = sqlContext.bigQueryTable("bigquery-public-data:samples.shakespeare")
val df = sqlContext.bigQuerySelect(
"SELECT word, word_count FROM [bigquery-public-data:samples.shakespeare]")

// Save data to a table
df.saveAsBigQueryTable("my-project:my_dataset.my_table")
```

If you'd like to write nested records to BigQuery, be sure to specify an Avro namespace.
BigQuery is unable to load nested records whose Avro namespace starts with a dot (e.g. `.nestedColumn`).

```scala
// BigQuery is able to load fields with namespace 'myNamespace.nestedColumn'
df.saveAsBigQueryTable("my-project:my_dataset.my_table", tmpWriteOptions = Map("recordNamespace" -> "myNamespace"))
```

See also
[Loading Avro Data from Google Cloud Storage](https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro)
for data type mappings and limitations. For example, loading arrays of arrays is not supported.
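
As a small illustration of that limitation (the case classes here are hypothetical, not part of this library):

```scala
// BigQuery's Avro loader rejects arrays of arrays, so a column typed
// Array[Array[Int]] will fail to load. Wrapping the inner array in a
// struct (a repeated record) is a common workaround.
case class Inner(values: Array[Int])
case class Unsupported(matrix: Array[Array[Int]]) // rejected by BigQuery
case class Supported(matrix: Array[Inner])        // loads as a repeated record
```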

# License

Copyright 2016 Spotify AB.
Expand Down
15 changes: 11 additions & 4 deletions src/main/scala/com/spotify/spark/bigquery/BigQueryDataFrame.scala
@@ -47,11 +47,16 @@ class BigQueryDataFrame(df: DataFrame) {
    */
   def saveAsBigQueryTable(tableRef: TableReference,
                           writeDisposition: WriteDisposition.Value,
-                          createDisposition: CreateDisposition.Value): Unit = {
+                          createDisposition: CreateDisposition.Value,
+                          tmpWriteOptions: Map[String, String]): Unit = {
     val bucket = conf.get(BigQueryConfiguration.GCS_BUCKET_KEY)
     val temp = s"spark-bigquery-${System.currentTimeMillis()}=${Random.nextInt(Int.MaxValue)}"
     val gcsPath = s"gs://$bucket/hadoop/tmp/spark-bigquery/$temp"
-    df.write.avro(gcsPath)
+
+    tmpWriteOptions match {
+      case null => df.write.avro(gcsPath)
+      case _ => df.write.options(tmpWriteOptions).avro(gcsPath)
+    }
 
     val fdf = bq.load(gcsPath, tableRef, writeDisposition, createDisposition)
     delete(new Path(gcsPath))
@@ -63,11 +68,13 @@ class BigQueryDataFrame(df: DataFrame) {
    */
   def saveAsBigQueryTable(tableSpec: String,
                           writeDisposition: WriteDisposition.Value = null,
-                          createDisposition: CreateDisposition.Value = null): Unit =
+                          createDisposition: CreateDisposition.Value = null,
+                          tmpWriteOptions: Map[String,String] = null): Unit =
     saveAsBigQueryTable(
       BigQueryStrings.parseTableReference(tableSpec),
       writeDisposition,
-      createDisposition)
+      createDisposition,
+      tmpWriteOptions)
 
   private def delete(path: Path): Unit = {
     val fs = FileSystem.get(path.toUri, conf)
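
A hedged usage sketch of the extended `TableReference` overload (project, dataset, and table names are illustrative; it assumes the implicit conversion and the disposition enumerations from `com.spotify.spark.bigquery._` are in scope):

```scala
import com.google.api.services.bigquery.model.TableReference
import com.spotify.spark.bigquery._

// Build an explicit table reference instead of a "project:dataset.table" string.
val ref = new TableReference()
  .setProjectId("my-project")
  .setDatasetId("my_dataset")
  .setTableId("my_table")

// Pass the Avro namespace through to the temporary GCS write.
df.saveAsBigQueryTable(
  ref,
  WriteDisposition.WRITE_APPEND,
  CreateDisposition.CREATE_IF_NEEDED,
  tmpWriteOptions = Map("recordNamespace" -> "myNamespace"))
```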
