KE-19992 s3 read retry before accessing FileStatus properties (apache#191)

Co-authored-by: Yu Gan <yu.gan@kyligence.io>
2 people authored and 7mming7 committed Feb 22, 2021
1 parent 9ffaccf commit 04cd7c2
Showing 4 changed files with 45 additions and 0 deletions.
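
The change guards three FileStatus-dependent reads on S3 (the Parquet footer length in the vectorized record reader, the footer metadata in ParquetFileFormat, and the table content summary in DetermineTableStats) with a new S3FileUtils.tryOpenClose helper that opens and immediately closes the file, so the S3 client's read retries run before the status properties are consumed. A minimal usage sketch of the pattern, assuming a Hadoop Configuration and an s3a:// object path; the object and method names below are illustrative, not part of the commit:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.util.S3FileUtils

object S3ReadRetrySketch {
  // Hypothetical caller mirroring the first hunk below: open/close the object
  // first, then trust FileStatus.getLen for the footer range.
  def parquetFileLength(conf: Configuration, path: String): Long = {
    val file = new Path(path)
    S3FileUtils.tryOpenClose(conf, file) // does nothing for non-s3 schemes
    file.getFileSystem(conf).getFileStatus(file).getLen
  }
}
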
@@ -64,6 +64,7 @@
import org.apache.spark.TaskContext$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.util.S3FileUtils;
import org.apache.spark.util.AccumulatorV2;

/**
@@ -199,6 +200,7 @@ protected void initialize(String path, List<String> columns) throws IOException
config.set("spark.sql.parquet.int96AsTimestamp", "false");

this.file = new Path(path);
S3FileUtils.tryOpenClose(config, this.file);
long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
ParquetMetadata footer = readFooter(config, file, range(0, length));

@@ -49,6 +49,7 @@ import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapCol
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.S3FileUtils
import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}

class ParquetFileFormat
@@ -267,6 +268,7 @@ class ParquetFileFormat

val sharedConf = broadcastedHadoopConf.value.value

S3FileUtils.tryOpenClose(sharedConf, filePath)
lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
// Try to push down filters when filter push-down is enabled.
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.util

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging

object S3FileUtils extends Logging {

@throws(classOf[IOException])
def tryOpenClose(conf: Configuration, fp: Path): Unit = {
val fs = fp.getFileSystem(conf)
if (fs.getScheme.startsWith("s3")) {
// Read retry before accessing FileStatus properties.
logInfo(s"Try open-close $fp")
fs.open(fp).close()
}
}

}
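
The scheme gate above, fs.getScheme.startsWith("s3"), matches s3, s3a and s3n URIs; for any other filesystem the helper only resolves the FileSystem and returns. Callers treat it as best-effort: the IOException it can rethrow surfaces through the paths that already handle the FileStatus access (initialize already declares throws IOException, and the Hive hunk that follows already catches IOException).
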
@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrate
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.execution.HiveScriptTransformationExec
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.util.S3FileUtils


/**
@@ -124,6 +125,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
val hadoopConf = session.sessionState.newHadoopConf()
val tablePath = new Path(table.location)
val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
S3FileUtils.tryOpenClose(hadoopConf, tablePath)
fs.getContentSummary(tablePath).getLength
} catch {
case e: IOException =>

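For the Hive statistics path in the last hunk, the guard slots in between resolving the FileSystem and calling getContentSummary. A sketch of the surrounding method body, assuming the commit's structure; the defaultSizeInBytes fallback is a stand-in, since the diff is cut off inside the catch block:

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.util.S3FileUtils

object TableStatsSketch {
  // Sketch only: the real rule's fallback value is not shown in the diff, so
  // it is passed in explicitly here to keep the example self-contained.
  def tableSizeInBytes(hadoopConf: Configuration, location: String,
                       defaultSizeInBytes: Long): Long = {
    try {
      val tablePath = new Path(location)
      val fs = tablePath.getFileSystem(hadoopConf)
      S3FileUtils.tryOpenClose(hadoopConf, tablePath) // best-effort S3 read retry
      fs.getContentSummary(tablePath).getLength
    } catch {
      case _: IOException => defaultSizeInBytes // fall back as the existing handler does
    }
  }
}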