Merge pull request #40 from samelamin/python
Add PySpark and DML query support
samelamin committed Aug 13, 2017
2 parents e028331 + c022707 commit ed716c8
Showing 9 changed files with 288 additions and 220 deletions.
31 changes: 26 additions & 5 deletions README.md
@@ -5,16 +5,20 @@ spark-bigquery

This Spark module allows saving a DataFrame as a BigQuery table.

The project was inspired by [spotify/spark-bigquery](https://github.com/spotify/spark-bigquery), but there are several differences:
The project was inspired by [spotify/spark-bigquery](https://github.com/spotify/spark-bigquery), but there are several differences and enhancements:

* Use of the Structured Streaming API on Spark 2.1

* Use within PySpark

* Allow saving to partitioned tables

* Easy integration with [Databricks](https://github.com/samelamin/spark-bigquery/blob/master/Databricks.md)

* Use of Standard SQL

* Run Data Manipulation Language Queries [DML](https://cloud.google.com/bigquery/docs/reference/standard-sql/data-manipulation-language)

* Update schemas on writes using the [setSchemaUpdateOptions](https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/JobConfigurationQuery.html#setSchemaUpdateOptions(java.util.List))

* JSON is used as an intermediate format instead of Avro. This allows having fields on different levels named the same:
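The example that follows this bullet in the README is collapsed in this view; as a stand-in, here is a minimal PySpark sketch (mine, not the README's) in which the field name `id` appears both at the top level and inside a nested struct:

```python
# Hedged illustration (not the README's own example): "id" appears both at the
# top level and nested inside "user"; the JSON intermediate format copes with
# this kind of schema.
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame([Row(id=1, user=Row(id="u-1", name="Sam"))])
df.printSchema()
# root
#  |-- id: long (nullable = true)
#  |-- user: struct (nullable = true)
#  |    |-- id: string (nullable = true)
#  |    |-- name: string (nullable = true)
```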
@@ -60,7 +64,7 @@ I created a container that launches zepplin with spark and the connector for eas
<dependency>
<groupId>com.github.samelamin</groupId>
<artifactId>spark-bigquery_${scala.binary.version}</artifactId>
<version>0.1.9</version>
<version>0.2.2</version>
</dependency>
</dependencies>
```
@@ -71,7 +75,7 @@ To use it in a local SBT console first add the package as a dependency then set
```sbt
resolvers += Opts.resolver.sonatypeReleases

libraryDependencies += "com.github.samelamin" %% "spark-bigquery" % "0.1.9"
libraryDependencies += "com.github.samelamin" %% "spark-bigquery" % "0.2.2"
```

```scala
@@ -132,6 +136,13 @@ val df = ...
df.saveAsBigQueryTable("project-id:dataset-id.table-name")
```

```python
bq = spark._sc._jvm.com.samelamin.spark.bigquery.BigQuerySQLContext(spark._wrapped._jsqlContext)
df = ...
bqDF = spark._sc._jvm.com.samelamin.spark.bigquery.BigQueryDataFrame(df._jdf)
bqDF.saveAsBigQueryTable("project-id:dataset-id.table-name")
```
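
Project details can be set from PySpark through the same py4j handle. This is a hedged sketch, not from the README: the setter names come from the `BigQuerySQLContext` added in this commit, and the key-file path, project ID and bucket are placeholders:

```python
# Hedged sketch: configure the connector from PySpark via the py4j gateway.
# Setters are defined on BigQuerySQLContext; the values below are placeholders.
bq = spark._sc._jvm.com.samelamin.spark.bigquery.BigQuerySQLContext(spark._wrapped._jsqlContext)
bq.setGcpJsonKeyFile("/path/to/key.json")
bq.setBigQueryProjectId("my-gcp-project")
bq.setBigQueryGcsBucket("my-staging-bucket")
```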

### Reading DataFrame From BigQuery

```scala
@@ -147,6 +158,18 @@ val df = sqlContext.bigQuerySelect(
"SELECT word, word_count FROM [bigquery-public-data:samples.shakespeare]")
```
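
The README does not show a PySpark counterpart; the sketch below is an assumption of mine that follows the same py4j pattern as the save example above — `bigQuerySelect` is defined on the `BigQuerySQLContext` added in this commit, and the returned Java DataFrame is wrapped back into a PySpark `DataFrame`:

```python
from pyspark.sql import DataFrame

# Hedged sketch: run the query through py4j and wrap the resulting Java DataFrame.
bq = spark._sc._jvm.com.samelamin.spark.bigquery.BigQuerySQLContext(spark._wrapped._jsqlContext)
java_df = bq.bigQuerySelect(
    "SELECT word, word_count FROM [bigquery-public-data:samples.shakespeare]")
df = DataFrame(java_df, spark._wrapped)
```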


### Running DML Queries

```scala
import com.samelamin.spark.bigquery._

// Run a DML statement (executed using the Standard SQL dialect)
sqlContext.runDMLQuery("UPDATE dataset-id.table-name SET test_col = new_value WHERE test_col = old_value")
```
Please note that DML queries must be written in Standard SQL.
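
The same call is reachable from PySpark; a hedged sketch via py4j, using the `runDMLQuery` method of the `BigQuerySQLContext` added in this commit:

```python
# Hedged sketch: run a DML statement from PySpark; it executes with Standard SQL.
bq = spark._sc._jvm.com.samelamin.spark.bigquery.BigQuerySQLContext(spark._wrapped._jsqlContext)
bq.runDMLQuery("UPDATE dataset-id.table-name SET test_col = new_value WHERE test_col = old_value")
```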

### Update Schemas

You can also allow a DataFrame save to update the table schema:
@@ -159,10 +182,8 @@ sqlContext.setAllowSchemaUpdates()

Notes on using this API:

* Target data set must already exist
* Structured Streaming needs a partitioned table which is created by default when writing a stream
* Structured Streaming needs a timestamp column from which offsets are retrieved; by default, all tables are created with a `bq_load_timestamp` column whose default value is the current timestamp (see the sketch after this list for overriding it)
* Structured Streaming currently does not support schema updates
* For use with Databricks please follow this [guide](https://github.com/samelamin/spark-bigquery/blob/master/Databricks.md)
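
As a hedged sketch of the timestamp-column note above (the setter comes from the `BigQuerySQLContext` in this commit; `bq_load_timestamp` is the documented default, so the call is only needed when you want a different column):

```python
# Hedged sketch: point the streaming source at a custom offset/timestamp column.
bq = spark._sc._jvm.com.samelamin.spark.bigquery.BigQuerySQLContext(spark._wrapped._jsqlContext)
bq.setBQTableTimestampColumn("bq_load_timestamp")
```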

# License
23 changes: 13 additions & 10 deletions src/main/scala/com/samelamin/spark/bigquery/BigQueryClient.scala
@@ -120,6 +120,12 @@ class BigQueryClient(sqlContext: SQLContext, var bigquery: Bigquery = null) exte
val fullyQualifiedInputTableId = BigQueryStrings.toString(tableReference)
BigQueryConfiguration.configureBigQueryInput(hadoopConf, fullyQualifiedInputTableId)
}

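/** Runs a DML statement as a BigQuery query job with no destination table, using the Standard SQL dialect. */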
def runDMLQuery(dmlQuery:String): Unit = {
logger.info(s"Executing DML Statement $dmlQuery")
val job = createQueryJob(dmlQuery, null, dryRun = false,useStandardSQL = true)
waitForJob(job)
}

private val queryCache: LoadingCache[String, TableReference] =
CacheBuilder.newBuilder()
@@ -131,10 +137,8 @@ class BigQueryClient(sqlContext: SQLContext, var bigquery: Bigquery = null) exte
val location = hadoopConf.get(STAGING_DATASET_LOCATION, STAGING_DATASET_LOCATION_DEFAULT)
val destinationTable = temporaryTable(location)
logger.info(s"Destination table: $destinationTable")
val use_legacy_sql = !hadoopConf.get(USE_STANDARD_SQL_DIALECT,"true").toBoolean

val job = createQueryJob(sqlQuery, destinationTable, dryRun = false,use_legacy_sql)

val useStandardSQL = hadoopConf.get(USE_STANDARD_SQL_DIALECT,"false").toBoolean
val job = createQueryJob(sqlQuery, destinationTable, dryRun = false,useStandardSQL)
waitForJob(job)
destinationTable
}
@@ -183,19 +187,18 @@ class BigQueryClient(sqlContext: SQLContext, var bigquery: Bigquery = null) exte

private def createQueryJob(sqlQuery: String,
destinationTable: TableReference,
dryRun: Boolean,use_legacy_sql: Boolean = false): Job = {
dryRun: Boolean,useStandardSQL: Boolean = false): Job = {
var queryConfig = new JobConfigurationQuery()
.setQuery(sqlQuery)
.setPriority(PRIORITY)
.setCreateDisposition("CREATE_IF_NEEDED")
.setWriteDisposition("WRITE_EMPTY")

logger.info(s"Using legacy Sql: $use_legacy_sql")

queryConfig.setUseLegacySql(use_legacy_sql)
logger.info(s"Using legacy Sql: ${!useStandardSQL}")
queryConfig.setUseLegacySql(!useStandardSQL)

if (destinationTable != null) {
queryConfig = queryConfig
.setCreateDisposition("CREATE_IF_NEEDED")
.setWriteDisposition("WRITE_EMPTY")
.setDestinationTable(destinationTable)
.setAllowLargeResults(true)
}
86 changes: 86 additions & 0 deletions src/main/scala/com/samelamin/spark/bigquery/BigQueryDataFrame.scala
@@ -0,0 +1,86 @@
package com.samelamin.spark.bigquery

import com.google.api.services.bigquery.model.TableReference
import com.google.cloud.hadoop.io.bigquery._
import com.google.gson._
import com.samelamin.spark.bigquery.converters.{BigQueryAdapter, SchemaConverters}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, NullWritable}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.sql.DataFrame
import org.slf4j.LoggerFactory

import scala.util.Random
/**
* Created by samelamin on 12/08/2017.
*/
class BigQueryDataFrame(self: DataFrame) extends Serializable {
val adaptedDf = BigQueryAdapter(self)
private val logger = LoggerFactory.getLogger(classOf[BigQueryClient])

@transient
lazy val hadoopConf = self.sparkSession.sparkContext.hadoopConfiguration
lazy val bq = BigQueryClient.getInstance(self.sqlContext)

@transient
lazy val jsonParser = new JsonParser()

/**
* Save DataFrame data into BigQuery table using Hadoop writer API
*
* @param fullyQualifiedOutputTableId output-table id of the form
* [optional projectId]:[datasetId].[tableId]
* @param isPartitionedByDay partition the table by day
*/
def saveAsBigQueryTable(fullyQualifiedOutputTableId: String,
isPartitionedByDay: Boolean = false,
timePartitionExpiration: Long = 0,
writeDisposition: WriteDisposition.Value = null,
createDisposition: CreateDisposition.Value = null): Unit = {
val destinationTable = BigQueryStrings.parseTableReference(fullyQualifiedOutputTableId)
val bigQuerySchema = SchemaConverters.SqlToBQSchema(adaptedDf)
val gcsPath = writeDFToGoogleStorage(adaptedDf,destinationTable,bigQuerySchema)
bq.load(destinationTable,
bigQuerySchema,
gcsPath,
isPartitionedByDay,
timePartitionExpiration,
writeDisposition,
createDisposition)
delete(new Path(gcsPath))
}

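/** Writes the adapted DataFrame as JSON to a temporary GCS path and returns that path for the subsequent BigQuery load. */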
def writeDFToGoogleStorage(adaptedDf: DataFrame,
destinationTable: TableReference,
bigQuerySchema: String): String = {
val tableName = BigQueryStrings.toString(destinationTable)

BigQueryConfiguration.configureBigQueryOutput(hadoopConf, tableName, bigQuerySchema)
hadoopConf.set("mapreduce.job.outputformat.class", classOf[BigQueryOutputFormat[_, _]].getName)
val bucket = self.sparkSession.conf.get(BigQueryConfiguration.GCS_BUCKET_KEY)
val temp = s"spark-bigquery-${System.currentTimeMillis()}=${Random.nextInt(Int.MaxValue)}"
val gcsPath = s"gs://$bucket/hadoop/tmp/spark-bigquery/$temp"
if(hadoopConf.get(BigQueryConfiguration.TEMP_GCS_PATH_KEY) == null) {
hadoopConf.set(BigQueryConfiguration.TEMP_GCS_PATH_KEY, gcsPath)
}

logger.info(s"Loading $gcsPath into $tableName")
adaptedDf
.toJSON
.rdd
.map(json => (null, jsonParser.parse(json)))
.saveAsNewAPIHadoopFile(gcsPath,
classOf[GsonBigQueryInputFormat],
classOf[LongWritable],
classOf[TextOutputFormat[NullWritable, JsonObject]],
hadoopConf)
gcsPath
}



private def delete(path: Path): Unit = {
val fs = FileSystem.get(path.toUri, hadoopConf)
fs.delete(path, true)
}
}
132 changes: 132 additions & 0 deletions src/main/scala/com/samelamin/spark/bigquery/BigQuerySQLContext.scala
@@ -0,0 +1,132 @@
package com.samelamin.spark.bigquery


import java.math.BigInteger

import com.google.cloud.hadoop.io.bigquery.{AvroBigQueryInputFormat, _}
import com.samelamin.spark.bigquery.converters.SchemaConverters
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hadoop.io.LongWritable
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.slf4j.LoggerFactory

/**
* Created by samelamin on 12/08/2017.
*/
class BigQuerySQLContext(sqlContext: SQLContext) extends Serializable {
lazy val bq = BigQueryClient.getInstance(sqlContext)
@transient
lazy val hadoopConf = sqlContext.sparkSession.sparkContext.hadoopConfiguration
private val logger = LoggerFactory.getLogger(classOf[BigQueryClient])

/**
* Set whether to allow schema updates
*/
def setAllowSchemaUpdates(value: Boolean = true): Unit = {
hadoopConf.set(bq.ALLOW_SCHEMA_UPDATES, value.toString)
}

/**
* Set whether to use the Standard SQL Dialect
*/
def useStandardSQLDialect(value: Boolean = true): Unit = {
hadoopConf.set(bq.USE_STANDARD_SQL_DIALECT, value.toString)
}
/**
* Set GCP project ID for BigQuery.
*/
def setBigQueryProjectId(projectId: String): Unit = {
hadoopConf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
}

def setGSProjectId(projectId: String): Unit = {
// Also set project ID for GCS connector
hadoopConf.set("fs.gs.project.id", projectId)
}

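/** Set the timestamp column used by the streaming source to retrieve offsets (tables are created with a `bq_load_timestamp` column by default). */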
def setBQTableTimestampColumn(timestampColumn: String): Unit = {
hadoopConf.set("timestamp_column", timestampColumn)
}

/**
* Set GCS bucket for temporary BigQuery files.
*/
def setBigQueryGcsBucket(gcsBucket: String): Unit = {
hadoopConf.set(BigQueryConfiguration.GCS_BUCKET_KEY, gcsBucket)
sqlContext.sparkSession.conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, gcsBucket)
}

/**
* Set BigQuery dataset location, e.g. US, EU.
*/
def setBigQueryDatasetLocation(location: String): Unit = {
hadoopConf.set(bq.STAGING_DATASET_LOCATION, location)
}

/**
* Set GCP JSON key file.
*/
def setGcpJsonKeyFile(jsonKeyFile: String): Unit = {
hadoopConf.set("mapred.bq.auth.service.account.json.keyfile", jsonKeyFile)
hadoopConf.set("fs.gs.auth.service.account.json.keyfile", jsonKeyFile)
}

/**
* Set GCP pk12 key file.
*/
def setGcpPk12KeyFile(pk12KeyFile: String): Unit = {
hadoopConf.set("google.cloud.auth.service.account.keyfile", pk12KeyFile)
hadoopConf.set("mapred.bq.auth.service.account.keyfile", pk12KeyFile)
hadoopConf.set("fs.gs.auth.service.account.keyfile", pk12KeyFile)
}

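/** Runs the query into a staging table and loads the result back as a DataFrame via the Avro BigQuery input format. */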
def bigQuerySelect(sqlQuery: String): DataFrame = {
bq.selectQuery(sqlQuery)
val tableData = sqlContext.sparkSession.sparkContext.newAPIHadoopRDD(
hadoopConf,
classOf[AvroBigQueryInputFormat],
classOf[LongWritable],
classOf[GenericData.Record]).map(x=>x._2)
val schemaString = tableData.map(_.getSchema.toString).first()
val schema = new Schema.Parser().parse(schemaString)
val structType = SchemaConverters.avroToSqlType(schema).dataType.asInstanceOf[StructType]
val converter = SchemaConverters.createConverterToSQL(schema)
.asInstanceOf[GenericData.Record => Row]
sqlContext.createDataFrame(tableData.map(converter), structType)
}

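/** Executes a DML statement (e.g. UPDATE, INSERT, DELETE) in BigQuery using Standard SQL. */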
def runDMLQuery(runDMLQuery:String):Unit = {
bq.runDMLQuery(runDMLQuery)
}

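/** Returns the last-modified time of the given table, if available. */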
def getLatestBQModifiedTime(tableReference: String): Option[BigInteger] = {
bq.getLatestModifiedTime(BigQueryStrings.parseTableReference(tableReference))
}

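/** Returns the schema of the given BigQuery table as a Spark SQL StructType. */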
def getBigQuerySchema(tableReference: String): StructType = {
SchemaConverters.BQToSQLSchema(bq.getTableSchema(BigQueryStrings.parseTableReference(tableReference)))
}


/**
* Load a BigQuery table as a [[DataFrame]].
*/
def bigQueryTable(tableSpec: String): DataFrame = {
val tableRef = BigQueryStrings.parseTableReference(tableSpec)
BigQueryConfiguration.configureBigQueryInput(
hadoopConf, tableRef.getProjectId, tableRef.getDatasetId, tableRef.getTableId)
val tableData = sqlContext.sparkContext.newAPIHadoopRDD(
hadoopConf,
classOf[AvroBigQueryInputFormat],
classOf[LongWritable],
classOf[GenericData.Record]).map(x=>x._2)
val schemaString = tableData.map(_.getSchema.toString).first()
val schema = new Schema.Parser().parse(schemaString)
val structType = SchemaConverters.avroToSqlType(schema).dataType.asInstanceOf[StructType]
val converter = SchemaConverters.createConverterToSQL(schema)
.asInstanceOf[GenericData.Record => Row]
sqlContext.createDataFrame(tableData.map(converter), structType)
}
}