HadoopWriteConfigUtils
jaceklaskowski committed May 26, 2021
1 parent e460e1c commit a13531f
Showing 7 changed files with 50 additions and 74 deletions.
2 changes: 1 addition & 1 deletion docs/HadoopMapRedCommitProtocol.md
@@ -1,3 +1,3 @@
-= HadoopMapRedCommitProtocol
+# HadoopMapRedCommitProtocol
 
 `HadoopMapRedCommitProtocol` is...FIXME
2 changes: 1 addition & 1 deletion docs/HadoopMapRedWriteConfigUtil.md
@@ -1,4 +1,4 @@
-= HadoopMapRedWriteConfigUtil
+# HadoopMapRedWriteConfigUtil
 
 `HadoopMapRedWriteConfigUtil` is...FIXME
 
2 changes: 1 addition & 1 deletion docs/HadoopMapReduceCommitProtocol.md
@@ -1,3 +1,3 @@
-= HadoopMapReduceCommitProtocol
+# HadoopMapReduceCommitProtocol
 
 `HadoopMapReduceCommitProtocol` is...FIXME
2 changes: 1 addition & 1 deletion docs/HadoopMapReduceWriteConfigUtil.md
@@ -1,4 +1,4 @@
-= HadoopMapReduceWriteConfigUtil
+# HadoopMapReduceWriteConfigUtil
 
 `HadoopMapReduceWriteConfigUtil` is...FIXME
 
105 changes: 39 additions & 66 deletions docs/HadoopWriteConfigUtil.md
@@ -1,113 +1,86 @@
-= HadoopWriteConfigUtil
+# HadoopWriteConfigUtil
 
-`HadoopWriteConfigUtil[K, V]` is an <<contract, abstraction>> of <<implementations, writer configurers>>.
+`HadoopWriteConfigUtil[K, V]` is an [abstraction](#contract) of [writer configurers](#implementations) for [SparkHadoopWriter](SparkHadoopWriter.md) to [write a key-value RDD](SparkHadoopWriter.md#write) (for [RDD.saveAsNewAPIHadoopDataset](rdd/PairRDDFunctions.md#saveAsNewAPIHadoopDataset) and [RDD.saveAsHadoopDataset](rdd/PairRDDFunctions.md#saveAsHadoopDataset) operators).
 
-`HadoopWriteConfigUtil` is used for <<spark-internal-io-SparkHadoopWriter.md#, SparkHadoopWriter>> utility when requested to <<spark-internal-io-SparkHadoopWriter.md#write, write an RDD of key-value pairs>> (for rdd:PairRDDFunctions.md#saveAsNewAPIHadoopDataset[saveAsNewAPIHadoopDataset] and rdd:PairRDDFunctions.md#saveAsHadoopDataset[saveAsHadoopDataset] transformations).
+## Contract
 
-[[contract]]
-.HadoopWriteConfigUtil Contract
-[cols="30m,70",options="header",width="100%"]
-|===
-| Method
-| Description
-
-| assertConf
-a| [[assertConf]]
+### <span id="assertConf"> assertConf
 
-[source, scala]
-----
+```scala
 assertConf(
   jobContext: JobContext,
   conf: SparkConf): Unit
-----
+```
 
-| closeWriter
-a| [[closeWriter]]
+### <span id="closeWriter"> closeWriter
 
-[source, scala]
-----
+```scala
 closeWriter(
   taskContext: TaskAttemptContext): Unit
-----
+```
 
-| createCommitter
-a| [[createCommitter]]
+### <span id="createCommitter"> createCommitter
 
-[source, scala]
-----
+```scala
 createCommitter(
   jobId: Int): HadoopMapReduceCommitProtocol
-----
+```
+
+Creates a [HadoopMapReduceCommitProtocol](HadoopMapReduceCommitProtocol.md) committer
+
+Used when:
+
+* `SparkHadoopWriter` is requested to [write data out](SparkHadoopWriter.md#write)
 
-| createJobContext
-a| [[createJobContext]]
+### <span id="createJobContext"> createJobContext
 
-[source, scala]
-----
+```scala
 createJobContext(
   jobTrackerId: String,
   jobId: Int): JobContext
-----
+```
 
-| createTaskAttemptContext
-a| [[createTaskAttemptContext]]
+### <span id="createTaskAttemptContext"> createTaskAttemptContext
 
-[source, scala]
-----
+```scala
 createTaskAttemptContext(
   jobTrackerId: String,
   jobId: Int,
   splitId: Int,
   taskAttemptId: Int): TaskAttemptContext
-----
+```
 
-Creates a Hadoop https://hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/mapreduce/TaskAttemptContext.html[TaskAttemptContext]
+Creates a Hadoop [TaskAttemptContext]({{ hadoop.api }}/org/apache/hadoop/mapreduce/TaskAttemptContext.html)
 
-| initOutputFormat
-a| [[initOutputFormat]]
+### <span id="initOutputFormat"> initOutputFormat
 
-[source, scala]
-----
+```scala
 initOutputFormat(
   jobContext: JobContext): Unit
-----
+```
 
-| initWriter
-a| [[initWriter]]
+### <span id="initWriter"> initWriter
 
-[source, scala]
-----
+```scala
 initWriter(
   taskContext: TaskAttemptContext,
   splitId: Int): Unit
-----
+```
 
-| write
-a| [[write]]
+### <span id="write"> write
 
-[source, scala]
-----
+```scala
 write(
   pair: (K, V)): Unit
-----
+```
 
 Writes out the key-value pair
 
-Used when `SparkHadoopWriter` is requested to <<spark-internal-io-SparkHadoopWriter.md#executeTask, executeTask>> (while <<spark-internal-io-SparkHadoopWriter.md#write, writing out key-value pairs of a partition>>)
-
-|===
-
-[[implementations]]
-.HadoopWriteConfigUtils
-[cols="30,70",options="header",width="100%"]
-|===
-| HadoopWriteConfigUtil
-| Description
+Used when:
 
-| <<spark-internal-io-HadoopMapReduceWriteConfigUtil.md#, HadoopMapReduceWriteConfigUtil>>
-| [[HadoopMapReduceWriteConfigUtil]]
+* `SparkHadoopWriter` is requested to [executeTask](SparkHadoopWriter.md#executeTask)
 
-| <<spark-internal-io-HadoopMapRedWriteConfigUtil.md#, HadoopMapRedWriteConfigUtil>>
-| [[HadoopMapRedWriteConfigUtil]]
+## Implementations
 
-|===
+* [HadoopMapReduceWriteConfigUtil](HadoopMapReduceWriteConfigUtil.md)
+* [HadoopMapRedWriteConfigUtil](HadoopMapRedWriteConfigUtil.md)
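The contract in the new Markdown page can be sketched as a plain Scala trait. This is a simplified, self-contained sketch: the Hadoop and Spark types (`JobContext`, `TaskAttemptContext`, `SparkConf`, `HadoopMapReduceCommitProtocol`) are replaced with stand-in classes, and `InMemoryConfigUtil` is a hypothetical configurer for illustration, not one of Spark's implementations.

```scala
// Stand-ins for the real Hadoop/Spark types (assumptions for this sketch;
// the actual ones live in org.apache.hadoop.mapreduce and org.apache.spark).
case class SparkConf(settings: Map[String, String] = Map.empty)
case class JobContext(jobTrackerId: String, jobId: Int)
case class TaskAttemptContext(jobTrackerId: String, jobId: Int, splitId: Int, taskAttemptId: Int)
class HadoopMapReduceCommitProtocol(val jobId: Int)

// The HadoopWriteConfigUtil contract, reduced to a plain trait.
trait HadoopWriteConfigUtil[K, V] {
  def assertConf(jobContext: JobContext, conf: SparkConf): Unit
  def closeWriter(taskContext: TaskAttemptContext): Unit
  def createCommitter(jobId: Int): HadoopMapReduceCommitProtocol
  def createJobContext(jobTrackerId: String, jobId: Int): JobContext
  def createTaskAttemptContext(
      jobTrackerId: String,
      jobId: Int,
      splitId: Int,
      taskAttemptId: Int): TaskAttemptContext
  def initOutputFormat(jobContext: JobContext): Unit
  def initWriter(taskContext: TaskAttemptContext, splitId: Int): Unit
  def write(pair: (K, V)): Unit
}

// Hypothetical configurer that records written pairs in memory.
class InMemoryConfigUtil[K, V] extends HadoopWriteConfigUtil[K, V] {
  val written = scala.collection.mutable.Buffer.empty[(K, V)]
  def assertConf(jobContext: JobContext, conf: SparkConf): Unit = ()
  def closeWriter(taskContext: TaskAttemptContext): Unit = ()
  def createCommitter(jobId: Int): HadoopMapReduceCommitProtocol =
    new HadoopMapReduceCommitProtocol(jobId)
  def createJobContext(jobTrackerId: String, jobId: Int): JobContext =
    JobContext(jobTrackerId, jobId)
  def createTaskAttemptContext(jobTrackerId: String, jobId: Int, splitId: Int, taskAttemptId: Int): TaskAttemptContext =
    TaskAttemptContext(jobTrackerId, jobId, splitId, taskAttemptId)
  def initOutputFormat(jobContext: JobContext): Unit = ()
  def initWriter(taskContext: TaskAttemptContext, splitId: Int): Unit = ()
  def write(pair: (K, V)): Unit = written += pair
}

// The call order mirrors the write/executeTask flow described above.
val config = new InMemoryConfigUtil[String, Int]
val jobContext = config.createJobContext(jobTrackerId = "20210526", jobId = 1)
config.initOutputFormat(jobContext)
val taskContext = config.createTaskAttemptContext("20210526", 1, splitId = 0, taskAttemptId = 0)
config.initWriter(taskContext, splitId = 0)
Seq("a" -> 1, "b" -> 2).foreach(config.write)
config.closeWriter(taskContext)
println(config.written.toList)
```

A real configurer would create Hadoop job and task-attempt contexts and drive an `OutputFormat` writer instead of a buffer.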
9 changes: 6 additions & 3 deletions docs/SparkHadoopWriter.md
@@ -8,11 +8,14 @@ write[K, V: ClassTag](
   config: HadoopWriteConfigUtil[K, V]): Unit
 ```
 
-!!! FIXME
-    Review Me
+`write` [runs a Spark job](SparkContext.md#runJob) to [write out partition records](#executeTask) (for all partitions of the given key-value `RDD`) with the given [HadoopWriteConfigUtil](HadoopWriteConfigUtil.md) and a [HadoopMapReduceCommitProtocol](HadoopMapReduceCommitProtocol.md) committer.
 
 The number of writer tasks (_parallelism_) is the number of the partitions in the given key-value `RDD`.
 
+### <span id="write-internals"> Internals
+
 <span id="write-commitJobId">
-`write` uses the id of the given RDD as the `commitJobId`.
+Internally, `write` uses the id of the given RDD as the `commitJobId`.
 
 <span id="write-jobTrackerId">
 `write` creates a `jobTrackerId` with the current date.
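The `jobTrackerId` mentioned in the diff is a timestamp-based string. A minimal sketch of building such an id with `SimpleDateFormat` follows; the `yyyyMMddHHmmss` pattern and the UTC timezone are assumptions for illustration, not necessarily Spark's exact internal format.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale, TimeZone}

// Formats a date as a compact timestamp, e.g. "20210526" followed by HHmmss.
// The pattern is an assumption for this sketch.
def createJobTrackerId(date: Date): String = {
  val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
  formatter.setTimeZone(TimeZone.getTimeZone("UTC"))  // fixed zone for reproducibility
  formatter.format(date)
}

println(createJobTrackerId(new Date(0L)))  // 19700101000000
```

Pinning the timezone keeps the id deterministic across machines, which matters when the id is embedded in task-attempt paths.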
2 changes: 1 addition & 1 deletion mkdocs.yml
@@ -362,10 +362,10 @@ nav:
       - Workers: workers.md
     - Internal IO:
       - SparkHadoopWriter: SparkHadoopWriter.md
-      - HadoopWriteConfigUtil: HadoopWriteConfigUtil.md
       - FileCommitProtocol: FileCommitProtocol.md
       - HadoopMapReduceCommitProtocol: HadoopMapReduceCommitProtocol.md
       - HadoopMapRedCommitProtocol: HadoopMapRedCommitProtocol.md
+      - HadoopWriteConfigUtil: HadoopWriteConfigUtil.md
       - HadoopMapReduceWriteConfigUtil: HadoopMapReduceWriteConfigUtil.md
       - HadoopMapRedWriteConfigUtil: HadoopMapRedWriteConfigUtil.md
     - Stage-Level Scheduling:
