
Commit 5829482

time read api

mingjialiu committed Aug 17, 2020
1 parent 55e1de7 commit 5829482

Showing 3 changed files with 71 additions and 20 deletions.
BigQueryInputPartitionReader.java
@@ -21,6 +21,8 @@
 import com.google.common.collect.ImmutableList;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.util.Iterator;
@@ -32,6 +34,7 @@ class BigQueryInputPartitionReader implements InputPartitionReader<InternalRow>
   private ReadRowsHelper readRowsHelper;
   private Iterator<InternalRow> rows = ImmutableList.<InternalRow>of().iterator();
   private InternalRow currentRow;
+  private static final Logger logger = LoggerFactory.getLogger(BigQueryInputPartitionReader.class);

   BigQueryInputPartitionReader(
       Iterator<ReadRowsResponse> readRowsResponses,
@@ -42,13 +45,30 @@ class BigQueryInputPartitionReader implements InputPartitionReader<InternalRow>
     this.readRowsHelper = readRowsHelper;
   }

+  private long lastCallTime = -1;
+
   @Override
   public boolean next() throws IOException {
+
     while (!rows.hasNext()) {
-      if (!readRowsResponses.hasNext()) {
+
+      if (lastCallTime > 0) {
+        logger.warn("Time between hasNext calls : " + (System.currentTimeMillis() - lastCallTime));
+      }
+
+      long startTime = System.currentTimeMillis();
+      boolean hasNext = readRowsResponses.hasNext();
+      long endTime = System.currentTimeMillis();
+      logger.warn("Time of hasNext call : " + (endTime - startTime));
+      if (!hasNext) {
         return false;
       }
+
+      lastCallTime = System.currentTimeMillis();
+
       ReadRowsResponse readRowsResponse = readRowsResponses.next();
+
+      // logger.warn("read " + readRowsResponse.getSerializedSize() + " bytes");
       rows = converter.convert(readRowsResponse);
     }
     currentRow = rows.next();
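Note: the instrumentation above times each hasNext call and also logs the gap since the previous call — the gap approximates time the downstream consumer spent processing rows, while the call time approximates time blocked waiting on the server. The same pattern is repeated verbatim in the Scala ReadRowsIterator below; a reusable wrapper could capture it once. A minimal sketch (TimedIterator is hypothetical, not part of this commit):

import org.slf4j.LoggerFactory

class TimedIterator[T](underlying: Iterator[T], name: String) extends Iterator[T] {
  private val logger = LoggerFactory.getLogger(classOf[TimedIterator[_]])
  private var lastCallTime = -1L

  override def hasNext: Boolean = {
    if (lastCallTime > 0) {
      // Gap since the previous call: roughly the time the consumer spent on the rows.
      logger.warn(s"$name: time between hasNext calls: ${System.currentTimeMillis() - lastCallTime} ms")
    }
    val start = System.currentTimeMillis()
    val result = underlying.hasNext
    // Time spent inside hasNext itself: roughly the time blocked on the server.
    logger.warn(s"$name: time of hasNext call: ${System.currentTimeMillis() - start} ms")
    lastCallTime = System.currentTimeMillis()
    result
  }

  override def next(): T = underlying.next()
}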
BigQueryRDD.scala
@@ -81,9 +81,10 @@ class BigQueryRDD(sc: SparkContext,
   */
 case class ArrowConverter(columnsInOrder: Seq[String],
                           rawArrowSchema: ByteString,
-                          rowResponseIterator: Iterator[ReadRowsResponse])
+                          rowResponseIterator: Iterator[ReadRowsResponse]) extends Logging
 {
   def getIterator(): Iterator[InternalRow] = {
+    logWarning("ArrowConverter.getIterator called")
     rowResponseIterator.flatMap(readRowResponse =>
       new ArrowBinaryIterator(columnsInOrder.asJava,
         rawArrowSchema,
@@ -102,20 +103,26 @@ case class ArrowConverter(columnsInOrder: Seq[String],
 case class AvroConverter(bqSchema: Schema,
                          columnsInOrder: Seq[String],
                          rawAvroSchema: String,
-                         rowResponseIterator: Iterator[ReadRowsResponse])
+                         rowResponseIterator: Iterator[ReadRowsResponse]) extends Logging
 {
   @transient private lazy val avroSchema = new AvroSchema.Parser().parse(rawAvroSchema)

   def getIterator(): Iterator[InternalRow] =
   {
+    logWarning("AvroConverter.getIterator called")
     rowResponseIterator.flatMap(toRows)
   }

-  def toRows(response: ReadRowsResponse): Iterator[InternalRow] = new AvroBinaryIterator(
-    bqSchema,
-    columnsInOrder.asJava,
-    avroSchema,
-    response.getAvroRows.getSerializedBinaryRows).asScala
+  def toRows(response: ReadRowsResponse): Iterator[InternalRow] =
+  {
+    logWarning("AvroConverter.toRows called")
+    new AvroBinaryIterator(
+      bqSchema,
+      columnsInOrder.asJava,
+      avroSchema,
+      response.getAvroRows.getSerializedBinaryRows).asScala
+  }
+
 }

 case class BigQueryPartition(stream: String, index: Int) extends Partition
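Note: the extends Logging added to ArrowConverter and AvroConverter above is what supplies the logWarning calls. For case classes that are serialized into Spark tasks, the key property of such a trait is a transient, lazily re-created logger. A minimal sketch of what a Spark-style Logging trait typically looks like (an illustration under that assumption; the connector's actual trait may differ):

import org.slf4j.{Logger, LoggerFactory}

trait Logging extends Serializable {
  // transient + lazy: the logger is not serialized with the instance and is
  // rebuilt on first use after deserialization on an executor.
  @transient private lazy val log: Logger = LoggerFactory.getLogger(getClass)

  // By-name msg: the message string is only built if the level is enabled.
  protected def logDebug(msg: => String): Unit = if (log.isDebugEnabled) log.debug(msg)
  protected def logWarning(msg: => String): Unit = if (log.isWarnEnabled) log.warn(msg)
}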
@@ -138,15 +145,37 @@ class ReadRowsIterator (val helper: ReadRowsHelper,
   extends Logging with Iterator[ReadRowsResponse] {
   var readRowsCount: Long = 0
   var retries: Int = 0
+  var lastCallTime: Long = -1

-  override def hasNext: Boolean = serverResponses.hasNext
+  override def hasNext: Boolean = {
+    if (lastCallTime > 0) {
+      val currentTime = System.currentTimeMillis()
+      logWarning(
+        s"""
+           |time between hasNext calls: ${currentTime - lastCallTime}
+         """
+          .stripMargin.replace('\n', ' ').trim)
+    }
+
+    val startTime = System.currentTimeMillis()
+    val hasNextVariable = serverResponses.hasNext
+    val endTime = System.currentTimeMillis()
+    logWarning(
+      s"""
+         |time of hasNext call: ${endTime - startTime}
+       """
+        .stripMargin.replace('\n', ' ').trim)
+
+    lastCallTime = System.currentTimeMillis()
+    hasNextVariable
+  }

   override def next(): ReadRowsResponse = {
     do {
       try {
         val response = serverResponses.next
         readRowsCount += response.getRowCount
-        logDebug(s"read ${response.getSerializedSize} bytes")
+        logWarning(s"read ${response.getSerializedSize} bytes")
         return response
       } catch {
         case e: Exception =>
@@ -169,8 +198,9 @@ case class ReadRowsHelper(
     client: ReadRowsClient,
     request: ReadRowsRequest.Builder,
     maxReadRowsRetries: Int
-) {
+) extends Logging {
   def readRows(): Iterator[ReadRowsResponse] = {
+    logWarning("ReadRowsHelper.readRows called")
     val serverResponses = fetchResponses(request)
     new ReadRowsIterator(this, serverResponses)
   }
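Note: System.currentTimeMillis() is wall-clock time — it can be coarse-grained and can jump if the system clock is adjusted, so for elapsed-time measurement System.nanoTime() (which is monotonic) is usually preferable. A sketch of the same measurement factored into a helper (timed is illustrative, not in the codebase):

def timed[T](label: String, log: String => Unit)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  // nanoTime is monotonic, so the difference is safe even if the clock is adjusted.
  log(s"$label took ${(System.nanoTime() - start) / 1000000} ms")
  result
}

// Usage inside hasNext, assuming logWarning comes from the Logging trait:
// val hasNextVariable = timed("hasNext", m => logWarning(m))(serverResponses.hasNext)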
@@ -194,6 +224,7 @@ object BigQueryRDD {
       options: SparkBigQueryOptions,
       getClient: SparkBigQueryOptions => BigQueryReadClient,
       bigQueryClient: SparkBigQueryOptions => BigQuery): BigQueryRDD = {
+
     new BigQueryRDD(sqlContext.sparkContext,
       parts,
       session,
DirectBigQueryRelation.scala
@@ -91,7 +91,7 @@ private[bigquery] class DirectBigQueryRelation(
   }

   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
-    logInfo(
+    logWarning(
       s"""
          |Querying table $tableName, parameters sent from Spark:
          |requiredColumns=[${requiredColumns.mkString(",")}],
@@ -100,7 +100,7 @@ private[bigquery] class DirectBigQueryRelation(
     val filter = getCompiledFilter(filters)
     val actualTable = getActualTable(requiredColumns, filter)

-    logInfo(
+    logWarning(
       s"""
          |Going to read from ${BigQueryUtil.friendlyTableName(actualTable.getTableId)}
          |columns=[${requiredColumns.mkString(", ")}],
@@ -111,7 +111,7 @@ private[bigquery] class DirectBigQueryRelation(
       generateEmptyRowRDD(actualTable, filter)
     } else {
       if (requiredColumns.isEmpty) {
-        logDebug(s"Not using optimized empty projection")
+        logWarning(s"Not using optimized empty projection")
       }
       val actualTableDefinition = actualTable.getDefinition[StandardTableDefinition]
       val actualTablePath = DirectBigQueryRelation.toTablePath(actualTable.getTableId)
@@ -189,7 +189,7 @@ private[bigquery] class DirectBigQueryRelation(
       val result = bigQuery.query(QueryJobConfiguration.of(sql))
       result.iterateAll.iterator.next.get(0).getLongValue
     }
-    logDebug(s"Creating a DataFrame of empty rows of size $numberOfRows")
+    logWarning(s"Creating a DataFrame of empty rows of size $numberOfRows")
     sqlContext.sparkContext.range(0, numberOfRows)
       .map(_ => InternalRow.empty)
       .asInstanceOf[RDD[Row]]
@@ -206,7 +206,7 @@ private[bigquery] class DirectBigQueryRelation(
         TableDefinition.Type.MATERIALIZED_VIEW == tableType)) {
       // get it from the view
       val querySql = createSql(tableDefinition.getSchema, requiredColumns, filtersString)
-      logDebug(s"querySql is $querySql")
+      logWarning(s"querySql is $querySql")
       destinationTableCache.get(querySql, DestinationTableBuilder(querySql))
     } else {
       // use the default one
@@ -216,15 +216,15 @@ private[bigquery] class DirectBigQueryRelation(

   def createTableFromQuery(querySql: String): TableInfo = {
     val destinationTable = createDestinationTable
-    logDebug(s"destinationTable is $destinationTable")
+    logWarning(s"destinationTable is $destinationTable")
     val jobInfo = JobInfo.of(
       QueryJobConfiguration
         .newBuilder(querySql)
         .setDestinationTable(destinationTable)
         .build())
-    logDebug(s"running query $jobInfo")
+    logWarning(s"running query $jobInfo")
     val job = bigQuery.create(jobInfo).waitFor()
-    logDebug(s"job has finished. $job")
+    logWarning(s"job has finished. $job")
     if (job.getStatus.getError != null) {
       BigQueryUtil.convertAndThrow(job.getStatus.getError)
     }
@@ -327,7 +327,7 @@ private[bigquery] class DirectBigQueryRelation(
     }

     val unhandled = filters.filterNot(handledFilters(filters).contains)
-    logDebug(s"unhandledFilters: ${unhandled.mkString(" ")}")
+    logWarning(s"unhandledFilters: ${unhandled.mkString(" ")}")
     unhandled
   }
 }
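Note: the changes in this file only promote logInfo/logDebug calls to logWarning so their output appears under the default log level. An alternative that needs no source edits is to lower the effective level at runtime — SparkContext.setLogLevel accepts ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE and WARN. A sketch, assuming spark is an active SparkSession:

// Surface logDebug/logInfo output without editing the log calls:
spark.sparkContext.setLogLevel("DEBUG")

// Or scope it to the connector alone in log4j.properties, e.g.:
// log4j.logger.com.google.cloud.spark.bigquery=DEBUG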
