[SPARK-33402][CORE] Jobs launched in same second have duplicate MapReduce JobIDs

### What changes were proposed in this pull request?

1. Applies the SQL changes in SPARK-33230 to SparkHadoopWriter, so that `rdd.saveAsNewAPIHadoopDataset` passes a unique job UUID down in `spark.sql.sources.writeJobUUID` (a hedged sketch of this write path follows the list).
1. `SparkHadoopWriterUtils.createJobTrackerID` generates the job tracker ID by appending a random long number to the supplied timestamp, so that the probability of a collision is near zero.
1. Adds tests for uniqueness, round-trip parsing, and rejection of negative job IDs.
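As a hedged illustration of the write path this touches (not part of the patch itself): a pair RDD saved through `saveAsNewAPIHadoopDataset` goes through SparkHadoopWriter, which, with this change, should set a fresh UUID on the job configuration copy it uses before the committer's `setupJob()` runs. The output path, key/value classes and the active SparkContext `sc` below are illustrative assumptions.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}

// Assumes an active SparkContext `sc`.
val job = Job.getInstance(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[IntWritable])
job.setOutputFormatClass(classOf[TextOutputFormat[Text, IntWritable]])
FileOutputFormat.setOutputPath(job, new Path("s3a://example-bucket/output"))

sc.parallelize(Seq("a" -> 1, "b" -> 2))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }
  .saveAsNewAPIHadoopDataset(job.getConfiguration)
// With this patch, spark.sql.sources.writeJobUUID is set to a random UUID on the
// configuration copy used by the write, before the committer's setupJob() is called.
```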

### Why are the changes needed?

Without this, any job started in the same second as another job, *where the committer expects job or attempt IDs to be unique*, is at risk of clashing with that other job.

With the fix,

* committers which use the ID set in `spark.sql.sources.writeJobUUID` as their priority ID will pick that up instead, and so be unique (a sketch of this lookup follows the list).
* committers which use the Hadoop JobID for unique paths and filenames will get the randomly generated job ID. Assuming all clocks in a cluster are in sync, the probability of two jobs launched in the same second clashing drops from 1 to 1/(2^63).
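A minimal, hedged sketch of the lookup an output committer could perform if it prefers the Spark-provided UUID and falls back to the Hadoop JobID; the object and method names here are illustrative, not Spark or Hadoop APIs:

```scala
import org.apache.hadoop.mapreduce.JobContext

object CommitterUuidSketch {
  private val SPARK_WRITE_UUID = "spark.sql.sources.writeJobUUID"

  /** Prefer the Spark-supplied write UUID; otherwise fall back to the JobID. */
  def jobUuid(context: JobContext): String =
    Option(context.getConfiguration.get(SPARK_WRITE_UUID))
      .filter(_.nonEmpty)
      .getOrElse(context.getJobID.toString)
}
```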

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests

There's a new test suite, SparkHadoopWriterUtilsSuite, which creates job IDs, verifies that they are unique even for the same timestamp, and checks that they can be marshalled to a string and parsed back by the Hadoop code, which contains some (brittle) assumptions about the format of job IDs.

Functional Integration Tests

1. Hadoop trunk built with [HADOOP-17318], publishing to the local Maven repository.
1. Spark built with hadoop.version=3.4.0-SNAPSHOT to pick up these JARs.
1. Spark + Object store integration tests at [https://github.com/hortonworks-spark/cloud-integration](https://github.com/hortonworks-spark/cloud-integration) were built against that local spark version
1. These tests were executed against AWS London.

The tests were run with `fs.s3a.committer.require.uuid=true`, so the S3A committers fail fast if no job UUID is passed down (a sketch of setting this from Spark follows). This showed that `rdd.saveAsNewAPIHadoopDataset` wasn't setting the UUID option; it was again using the current Date value as the app attempt ID, which is not guaranteed to be unique.
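A hedged sketch of how that option can be pushed down from Spark configuration; the `spark.hadoop.` prefix propagation is standard Spark behaviour, but the exact wiring used in the integration tests is an assumption:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("s3a-committer-uuid-check")
  // Copied into the Hadoop Configuration as fs.s3a.committer.require.uuid,
  // making the S3A committers fail fast when no job UUID is supplied.
  .set("spark.hadoop.fs.s3a.committer.require.uuid", "true")
```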

With the change applied to Spark, the relevant tests pass, so the committers are getting unique job IDs.

Closes apache#30319 from steveloughran/BUG/SPARK-33402-jobuuid.

Authored-by: Steve Loughran <stevel@cloudera.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
steveloughran authored and dongjoon-hyun committed Nov 11, 2020
1 parent 7e86729 commit 318a173
Showing 3 changed files with 131 additions and 3 deletions.
```diff
@@ -18,7 +18,7 @@
 package org.apache.spark.internal.io
 
 import java.text.NumberFormat
-import java.util.{Date, Locale}
+import java.util.{Date, Locale, UUID}
 
 import scala.reflect.ClassTag
 
@@ -70,6 +70,11 @@ object SparkHadoopWriter extends Logging {
     // Assert the output format/key/value class is set in JobConf.
     config.assertConf(jobContext, rdd.conf)
 
+    // propagate the description UUID into the jobs, so that committers
+    // get an ID guaranteed to be unique.
+    jobContext.getConfiguration.set("spark.sql.sources.writeJobUUID",
+      UUID.randomUUID.toString)
+
     val committer = config.createCommitter(commitJobId)
     committer.setupJob(jobContext)
 
```
```diff
@@ -20,7 +20,7 @@ package org.apache.spark.internal.io
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale}
 
-import scala.util.DynamicVariable
+import scala.util.{DynamicVariable, Random}
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.{JobConf, JobID}
@@ -37,14 +37,35 @@ private[spark]
 object SparkHadoopWriterUtils {
 
   private val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
+  private val RAND = new Random()
 
+  /**
+   * Create a job ID.
+   *
+   * @param time (current) time
+   * @param id job number
+   * @return a job ID
+   */
   def createJobID(time: Date, id: Int): JobID = {
+    if (id < 0) {
+      throw new IllegalArgumentException("Job number is negative")
+    }
     val jobtrackerID = createJobTrackerID(time)
     new JobID(jobtrackerID, id)
   }
 
+  /**
+   * Generate an ID for a job tracker.
+   * @param time (current) time
+   * @return a string for a job ID
+   */
   def createJobTrackerID(time: Date): String = {
-    new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
+    val base = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
+    var l1 = RAND.nextLong()
+    if (l1 < 0) {
+      l1 = -l1
+    }
+    base + l1
   }
 
   def createPathFromString(path: String, conf: JobConf): Path = {
```
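As a hedged illustration (not part of the diff) of what the changed helpers produce: two IDs built from the same timestamp should now differ, because the job tracker ID carries a random suffix. Note that `SparkHadoopWriterUtils` is `private[spark]`, so code like this only compiles inside the `org.apache.spark` package tree, as the new test suite below does; the ID shape shown in the comment is indicative only.

```scala
import java.util.Date

import org.apache.spark.internal.io.SparkHadoopWriterUtils.createJobID

val now = new Date()
val id1 = createJobID(now, 1)
val id2 = createJobID(now, 1)
// Indicative shape (random suffix varies): job_20201111123456<random long>_0001
assert(id1.toString != id2.toString)
```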
@@ -0,0 +1,102 @@ (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.internal.io

import java.util.Date

import org.apache.hadoop.mapreduce.JobID

import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.io.SparkHadoopWriterUtils.createJobID

/**
 * Unit tests for functions in SparkHadoopWriterUtils.
 */
class SparkHadoopWriterUtilsSuite extends SparkFunSuite {

  /**
   * Core test of JobID generation:
   * They are created.
   * The job number is converted to the job ID.
   * They round trip to string and back
   * (which implies that the full string matches the regexp
   * in the JobID class).
   */
  test("JobID Generation") {
    val jobNumber = 1010
    val j1 = createJobID(new Date(), jobNumber)
    assert(jobNumber == j1.getId,
      s"Job number mismatch in $j1")

    val jobStr = j1.toString
    // the string value begins with job_
    assert(jobStr.startsWith("job_"),
      s"wrong prefix of $jobStr")
    // and the hadoop code can parse it
    val j2 = roundTrip(j1)
    assert(j1.getId == j2.getId, "Job ID mismatch")
    assert(j1.getJtIdentifier == j2.getJtIdentifier, "Job identifier mismatch")
  }

  /**
   * This is the problem surfacing in situations where committers expect
   * Job IDs to be unique: if the timestamp is (exclusively) used
   * then there will conflict in directories created.
   */
  test("JobIDs generated at same time are different") {
    val now = new Date()
    val j1 = createJobID(now, 1)
    val j2 = createJobID(now, 1)
    assert(j1.toString != j2.toString)
  }

  /**
   * There's nothing explicitly in the Hadoop classes to stop
   * job numbers being negative.
   * There's some big assumptions in the FileOutputCommitter about attempt IDs
   * being positive during any recovery operations; for safety the ID
   * job number is validated.
   */
  test("JobIDs with negative job number") {
    intercept[IllegalArgumentException] {
      createJobID(new Date(), -1)
    }
  }

  /**
   * If someone ever does reinstate use of timestamps,
   * make sure that the case of timestamp == 0 is handled.
   */
  test("JobIDs on Epoch are different") {
    val j1 = createJobID(new Date(0), 0)
    val j2 = createJobID(new Date(0), 0)
    assert(j1.toString != j2.toString)
  }

  /**
   * Do a round trip as a string and back again.
   * This uses the JobID parser.
   * @param jobID job ID
   * @return the returned jobID
   */
  private def roundTrip(jobID: JobID): JobID = {
    val parsedJobId = JobID.forName(jobID.toString)
    assert(jobID == parsedJobId, "Round trip was inconsistent")
    parsedJobId
  }
}
```
