BigQueryDataFrame.scala
package com.samelamin.spark.bigquery
import com.google.api.services.bigquery.model.{TableReference, TableSchema}
import com.google.cloud.hadoop.io.bigquery._
import com.google.gson._
import com.samelamin.spark.bigquery.converters.{BigQueryAdapter, SchemaConverters}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, NullWritable}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.sql.DataFrame
import org.slf4j.LoggerFactory
import scala.util.Random
/**
* Created by samelamin on 12/08/2017.
*/
class BigQueryDataFrame(self: DataFrame) extends Serializable {
  val adaptedDf = BigQueryAdapter(self)
  private val logger = LoggerFactory.getLogger(classOf[BigQueryClient])

  @transient
  lazy val hadoopConf = self.sparkSession.sparkContext.hadoopConfiguration
  lazy val bq = BigQueryClient.getInstance(self.sqlContext)

  @transient
  lazy val jsonParser = new JsonParser()
  /**
   * Save DataFrame data into a BigQuery table using the Hadoop writer API.
   *
   * @param fullyQualifiedOutputTableId output-table id of the form
   *                                    [optional projectId]:[datasetId].[tableId]
   * @param isPartitionedByDay partition the table by day
   * @param timePartitionExpiration expiration applied to the time partitions, passed through to the BigQuery load
   * @param writeDisposition BigQuery write disposition for the load job
   * @param createDisposition BigQuery create disposition for the load job
   */
  def saveAsBigQueryTable(fullyQualifiedOutputTableId: String,
                          isPartitionedByDay: Boolean = false,
                          timePartitionExpiration: Long = 0,
                          writeDisposition: WriteDisposition.Value = null,
                          createDisposition: CreateDisposition.Value = null): Unit = {
    val destinationTable = BigQueryStrings.parseTableReference(fullyQualifiedOutputTableId)
    val bigQuerySchema = SchemaConverters.SqlToBQSchema(adaptedDf)
    val gcsPath = writeDFToGoogleStorage(adaptedDf, destinationTable, bigQuerySchema)
    bq.load(destinationTable,
      bigQuerySchema,
      gcsPath,
      isPartitionedByDay,
      timePartitionExpiration,
      writeDisposition,
      createDisposition)
    delete(new Path(gcsPath))
  }
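
  /**
   * Stage the adapted DataFrame on GCS as newline-delimited JSON so BigQuery can
   * load it, and return the temporary GCS path that was written.
   *
   * @param adaptedDf DataFrame already passed through BigQueryAdapter
   * @param destinationTable reference to the target BigQuery table
   * @param bqSchema BigQuery schema derived from the DataFrame
   * @return the GCS path the JSON data was written to
   */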
  def writeDFToGoogleStorage(adaptedDf: DataFrame,
                             destinationTable: TableReference,
                             bqSchema: TableSchema): String = {
    val tableName = BigQueryStrings.toString(destinationTable)

    BigQueryConfiguration.configureBigQueryOutput(hadoopConf, tableName, bqSchema.toPrettyString())
    hadoopConf.set("mapreduce.job.outputformat.class", classOf[BigQueryOutputFormat[_, _]].getName)
    val bucket = self.sparkSession.conf.get(BigQueryConfiguration.GCS_BUCKET_KEY)
    val temp = s"spark-bigquery-${System.currentTimeMillis()}=${Random.nextInt(Int.MaxValue)}"
    val gcsPath = s"gs://$bucket/hadoop/tmp/spark-bigquery/$temp"
    if (hadoopConf.get(BigQueryConfiguration.TEMP_GCS_PATH_KEY) == null) {
      hadoopConf.set(BigQueryConfiguration.TEMP_GCS_PATH_KEY, gcsPath)
    }
    logger.info(s"Loading $gcsPath into $tableName")
    adaptedDf
      .toJSON
      .rdd
      .map(json => (null, jsonParser.parse(json)))
      .saveAsNewAPIHadoopFile(gcsPath,
        classOf[GsonBigQueryInputFormat],
        classOf[LongWritable],
        classOf[TextOutputFormat[NullWritable, JsonObject]],
        hadoopConf)
    gcsPath
  }
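
  /** Recursively delete the temporary staging path used for the BigQuery load. */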
  private def delete(path: Path): Unit = {
    val fs = FileSystem.get(path.toUri, hadoopConf)
    fs.delete(path, true)
  }
}
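
// A minimal usage sketch, not part of the original file: the SparkSession, bucket
// name and table id below are illustrative assumptions. The staging bucket is read
// from the Spark session conf by writeDFToGoogleStorage above, so it must be set
// before saving.
object BigQueryDataFrameExample {
  import org.apache.spark.sql.SparkSession

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("bigquery-dataframe-example").getOrCreate()

    // Bucket used for the intermediate JSON files written to GCS.
    spark.conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, "my-staging-bucket")

    val df = spark.read.json("gs://my-staging-bucket/input/*.json")

    // Wrap the DataFrame and load it into a day-partitioned BigQuery table.
    new BigQueryDataFrame(df).saveAsBigQueryTable(
      "my-project:my_dataset.my_table",
      isPartitionedByDay = true)
  }
}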