Skip to content

Commit

Permalink
allow flattenResults override in BigQueryClient
Browse files Browse the repository at this point in the history
  • Loading branch information
nevillelyh committed Apr 19, 2016
1 parent 1d9b44d commit 23f31a6
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ class BigQueryClient private (private val projectId: String,
}

/** Get rows from a query. */
def getQueryRows(sqlQuery: String): Iterator[TableRow] = {
val queryJob = queryIntoTable(sqlQuery)
def getQueryRows(sqlQuery: String, flattenResults: Boolean = false): Iterator[TableRow] = {
val queryJob = queryIntoTable(sqlQuery, flattenResults)
queryJob.waitForResult()
getTableRows(queryJob.table)
}
Expand Down Expand Up @@ -198,7 +198,7 @@ class BigQueryClient private (private val projectId: String,
}

/** Execute a query and save results into a temporary table. */
def queryIntoTable(sqlQuery: String): QueryJob = {
def queryIntoTable(sqlQuery: String, flattenResults: Boolean): QueryJob = {
try {
val sourceTimes =
BigQueryUtil.extractTables(sqlQuery).map(t => BigInt(getTable(t).getLastModifiedTime))
Expand All @@ -218,15 +218,15 @@ class BigQueryClient private (private val projectId: String,
logger.info(s"Cache invalid for query: $sqlQuery")
logger.info(s"New destination table: ${BigQueryIO.toTableSpec(temp)}")
setCacheDestinationTable(sqlQuery, temp)
makeQueryJob(sqlQuery, temp)
makeQueryJob(sqlQuery, temp, flattenResults)
}
} catch {
case NonFatal(_) =>
val temp = temporaryTable(TABLE_PREFIX)
logger.info(s"Cache miss for query: $sqlQuery")
logger.info(s"New destination table: ${BigQueryIO.toTableSpec(temp)}")
setCacheDestinationTable(sqlQuery, temp)
makeQueryJob(sqlQuery, temp)
makeQueryJob(sqlQuery, temp, flattenResults)
}
}

Expand Down Expand Up @@ -314,15 +314,16 @@ class BigQueryClient private (private val projectId: String,
}

private def makeQueryJob(sqlQuery: String,
destinationTable: TableReference): QueryJob = new QueryJob {
destinationTable: TableReference,
flattenResults: Boolean): QueryJob = new QueryJob {
override def waitForResult(): Unit = self.waitForJobs(this)
override lazy val jobReference: Option[JobReference] = {
prepareStagingDataset()
logger.info(s"Executing query: $sqlQuery")
val queryConfig: JobConfigurationQuery = new JobConfigurationQuery()
.setQuery(sqlQuery)
.setAllowLargeResults(true)
.setFlattenResults(false)
.setFlattenResults(flattenResults)
.setPriority(PRIORITY)
.setCreateDisposition("CREATE_IF_NEEDED")
.setWriteDisposition("WRITE_EMPTY")
Expand Down
5 changes: 3 additions & 2 deletions scio-core/src/main/scala/com/spotify/scio/ScioContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,12 @@ class ScioContext private[scio] (val options: DataflowPipelineOptions,
* Get an SCollection for a BigQuery SELECT query.
* @group input
*/
def bigQuerySelect(sqlQuery: String): SCollection[TableRow] = pipelineOp {
def bigQuerySelect(sqlQuery: String,
flattenResults: Boolean = false): SCollection[TableRow] = pipelineOp {
if (this.isTest) {
this.getTestInput(BigQueryIO(sqlQuery))
} else {
val queryJob = this.bigQueryClient.queryIntoTable(sqlQuery)
val queryJob = this.bigQueryClient.queryIntoTable(sqlQuery, flattenResults)
_queryJobs.append(queryJob)
wrap(this.applyInternal(GBigQueryIO.Read.from(queryJob.table).withoutValidation()))
.setName(sqlQuery)
Expand Down
11 changes: 7 additions & 4 deletions scio-core/src/main/scala/com/spotify/scio/io/Taps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ trait Taps {
mkTap(s"Avro: $path", () => isPathDone(path), () => AvroTap[T](path, schema))

/** Get a `Future[Tap[T]]` for BigQuery SELECT query. */
def bigQuerySelect(sqlQuery: String): Future[Tap[TableRow]] =
mkTap(s"BigQuery SELECT: $sqlQuery", () => isQueryDone(sqlQuery), () => bigQueryTap(sqlQuery))
def bigQuerySelect(sqlQuery: String, flattenResults: Boolean = false): Future[Tap[TableRow]] =
mkTap(
s"BigQuery SELECT: $sqlQuery",
() => isQueryDone(sqlQuery),
() => bigQueryTap(sqlQuery, flattenResults))

/** Get a `Future[Tap[T]]` for BigQuery table. */
def bigQueryTable(table: TableReference): Future[Tap[TableRow]] =
Expand All @@ -70,9 +73,9 @@ trait Taps {
private def tableExists(table: TableReference): Boolean =
Try(BigQueryClient.defaultInstance().getTableSchema(table)).isSuccess

private def bigQueryTap(sqlQuery: String): BigQueryTap = {
private def bigQueryTap(sqlQuery: String, flattenResults: Boolean): BigQueryTap = {
val bq = BigQueryClient.defaultInstance()
val queryJob = bq.queryIntoTable(sqlQuery)
val queryJob = bq.queryIntoTable(sqlQuery, flattenResults)
queryJob.waitForResult()
BigQueryTap(queryJob.table)
}
Expand Down

0 comments on commit 23f31a6

Please sign in to comment.