Writers and Committers, HadoopMapReduceCommitProtocol and FileCommitProtocol
jaceklaskowski committed Jan 6, 2024
1 parent 4b1a77f commit 6b94a95
Showing 5 changed files with 96 additions and 32 deletions.
36 changes: 24 additions & 12 deletions docs/FileCommitProtocol.md
@@ -8,7 +8,7 @@

## Contract

### <span id="abortJob"> Aborting Job
### Aborting Job { #abortJob }

```scala
abortJob(
@@ -23,7 +23,7 @@ Used when:
* (Spark SQL) `FileFormatWriter` utility is used to write a result of a structured query (and writing fails)
* (Spark SQL) `FileBatchWrite` is requested to `abort`

### <span id="abortTask"> Aborting Task
### Aborting Task { #abortTask }

```scala
abortTask(
@@ -37,7 +37,7 @@ Used when:
* `SparkHadoopWriter` utility is used to [write an RDD partition](SparkHadoopWriter.md#executeTask)
* (Spark SQL) `FileFormatDataWriter` is requested to `abort`

### <span id="commitJob"> Committing Job
### Committing Job { #commitJob }

```scala
commitJob(
@@ -53,7 +53,7 @@ Used when:
* (Spark SQL) `FileFormatWriter` utility is used to write a result of a structured query
* (Spark SQL) `FileBatchWrite` is requested to `commit`
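
The job-side calls (`setupJob`, `commitJob`, `abortJob`) follow a fixed order on the driver. The following is a minimal, illustrative sketch (the names `committer`, `jobContext` and `runTasks` are assumed, not part of this contract) of the commit-on-success, abort-on-failure flow described above:

```scala
import org.apache.hadoop.mapreduce.JobContext
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

// Illustrative sketch only: `committer`, `jobContext` and `runTasks` are assumed.
def writeJob(
    committer: FileCommitProtocol,
    jobContext: JobContext,
    runTasks: () => Seq[TaskCommitMessage]): Unit = {
  committer.setupJob(jobContext)
  try {
    // Each task works against its own (deserialized) copy of the committer
    val taskCommits = runTasks()
    committer.commitJob(jobContext, taskCommits)
  } catch {
    case e: Throwable =>
      committer.abortJob(jobContext)
      throw e
  }
}
```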

### <span id="commitTask"> Committing Task
### Committing Task { #commitTask }

```scala
commitTask(
@@ -65,7 +65,7 @@ Used when:
* `SparkHadoopWriter` utility is used to [write an RDD partition](SparkHadoopWriter.md#executeTask)
* (Spark SQL) `FileFormatDataWriter` is requested to `commit`

### <span id="deleteWithJob"> Deleting Path with Job
### Deleting Path with Job { #deleteWithJob }

```scala
deleteWithJob(
@@ -78,20 +78,32 @@ deleteWithJob(

Used when `InsertIntoHadoopFsRelationCommand` logical command (Spark SQL) is executed

### <span id="newTaskTempFile"> newTaskTempFile
### New Task Temp File { #newTaskTempFile }

```scala
newTaskTempFile(
taskContext: TaskAttemptContext,
dir: Option[String],
ext: String): String
spec: FileNameSpec): String
newTaskTempFile(
taskContext: TaskAttemptContext,
dir: Option[String],
ext: String): String // @deprecated
```

Builds the path of a temporary file for a task to write data to

See:

* [HadoopMapReduceCommitProtocol](HadoopMapReduceCommitProtocol.md#newTaskTempFile)
* `DelayedCommitProtocol` ([Delta Lake]({{ book.delta }}/DelayedCommitProtocol#newTaskTempFile))

Used when:

* (Spark SQL) `SingleDirectoryDataWriter` and `DynamicPartitionDataWriter` are requested to `write` (and in turn `newOutputWriter`)
* ([Spark SQL]({{ book.spark_sql }}/connectors/SingleDirectoryDataWriter/#write)) `SingleDirectoryDataWriter` is requested to `write` a record out
* ([Spark SQL]({{ book.spark_sql }}/connectors/BaseDynamicPartitionDataWriter/#write)) `BaseDynamicPartitionDataWriter` is requested to `renewCurrentWriter`
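
A task-side sketch of how a data writer typically uses `newTaskTempFile` (illustrative only: `committer`, `taskContext` and `writeRecords` are assumed names, and the deprecated `ext`-based overload is used for brevity):

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

// Illustrative sketch only: `committer`, `taskContext` and `writeRecords` are assumed.
def writeTask(
    committer: FileCommitProtocol,
    taskContext: TaskAttemptContext,
    writeRecords: String => Unit): TaskCommitMessage = {
  committer.setupTask(taskContext)
  try {
    // Ask the committer for a temporary path (no partition directory, made-up extension)
    val tmpPath = committer.newTaskTempFile(taskContext, None, ".parquet")
    writeRecords(tmpPath)              // the writer streams rows to the temporary path
    committer.commitTask(taskContext)  // the committer decides on the final location
  } catch {
    case e: Throwable =>
      committer.abortTask(taskContext)
      throw e
  }
}
```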

### <span id="newTaskTempFileAbsPath"> newTaskTempFileAbsPath
### newTaskTempFileAbsPath { #newTaskTempFileAbsPath }

```scala
newTaskTempFileAbsPath(
@@ -104,7 +116,7 @@ Used when:

* (Spark SQL) `DynamicPartitionDataWriter` is requested to `write`

### <span id="onTaskCommit"> On Task Committed
### On Task Committed { #onTaskCommit }

```scala
onTaskCommit(
@@ -115,7 +127,7 @@ Used when:

* (Spark SQL) `FileFormatWriter` is requested to `write`

### <span id="setupJob"> Setting Up Job
### Setting Up Job { #setupJob }

```scala
setupJob(
@@ -128,7 +140,7 @@ Used when:
* (Spark SQL) `FileFormatWriter` utility is used to write a result of a structured query
* (Spark SQL) `FileWriteBuilder` is requested to `buildForBatch`

### <span id="setupTask"> Setting Up Task
### Setting Up Task { #setupTask }

```scala
setupTask(
29 changes: 20 additions & 9 deletions docs/HadoopMapRedWriteConfigUtil.md
@@ -1,15 +1,26 @@
# HadoopMapRedWriteConfigUtil

`HadoopMapRedWriteConfigUtil` is...FIXME
`HadoopMapRedWriteConfigUtil` is a [HadoopWriteConfigUtil](HadoopWriteConfigUtil.md) for the [RDD.saveAsHadoopDataset](rdd/PairRDDFunctions.md#saveAsHadoopDataset) operator.

== [[createCommitter]] `createCommitter` Method
## Creating Instance

[source, scala]
----
createCommitter(
jobId: Int): HadoopMapReduceCommitProtocol
----
`HadoopMapRedWriteConfigUtil` takes the following to be created:

NOTE: `createCommitter` is part of the <<spark-internal-io-HadoopWriteConfigUtil.md#createCommitter, HadoopWriteConfigUtil>> contract to...FIXME.
* <span id="conf"> `SerializableJobConf`

`createCommitter`...FIXME
`HadoopMapRedWriteConfigUtil` is created when:

* `PairRDDFunctions` is requested to [saveAsHadoopDataset](rdd/PairRDDFunctions.md#saveAsHadoopDataset)
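
For reference, a minimal (hypothetical) `RDD.saveAsHadoopDataset` call that goes through this class. The output path and key/value types are made up, and a `SparkContext` (`sc`) is assumed:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}

// Illustrative only: the output path and key/value types are made up.
// saveAsHadoopDataset is what creates a HadoopMapRedWriteConfigUtil under the covers.
val counts = sc.parallelize(Seq(("a", 1), ("b", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }

val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.setOutputKeyClass(classOf[Text])
jobConf.setOutputValueClass(classOf[IntWritable])
jobConf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
FileOutputFormat.setOutputPath(jobConf, new Path("/tmp/demo-old-hadoop-api"))

counts.saveAsHadoopDataset(jobConf)
```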

## Logging

Enable `ALL` logging level for `org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil` logger to see what happens inside.

Add the following line to `conf/log4j2.properties`:

```text
logger.HadoopMapRedWriteConfigUtil.name = org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil
logger.HadoopMapRedWriteConfigUtil.level = all
```

Refer to [Logging](spark-logging.md).
31 changes: 30 additions & 1 deletion docs/HadoopMapReduceCommitProtocol.md
@@ -1,3 +1,32 @@
# HadoopMapReduceCommitProtocol

`HadoopMapReduceCommitProtocol` is...FIXME
`HadoopMapReduceCommitProtocol` is a [FileCommitProtocol](FileCommitProtocol.md).

`HadoopMapReduceCommitProtocol` is `Serializable` ([Java]({{ java.api }}/java/io/Serializable.html)) so that it can be sent out to executors in tasks over the wire.

## Creating Instance

`HadoopMapReduceCommitProtocol` takes the following to be created:

* <span id="jobId"> Job ID
* <span id="path"> Path
* <span id="dynamicPartitionOverwrite"> `dynamicPartitionOverwrite` flag (default: `false`)

`HadoopMapReduceCommitProtocol` is created when:

* `HadoopWriteConfigUtil` is requested to [create a committer](HadoopWriteConfigUtil.md#createCommitter)
* `HadoopMapReduceWriteConfigUtil` is requested to [create a committer](HadoopMapReduceWriteConfigUtil.md#createCommitter)
* `HadoopMapRedWriteConfigUtil` is requested to [create a committer](HadoopMapRedWriteConfigUtil.md#createCommitter)
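
A minimal construction sketch (the job ID and path are made up; in practice one of the write-config utils above creates the committer):

```scala
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Illustrative only: the job ID and path below are made up.
val committer = new HadoopMapReduceCommitProtocol(
  "job-20240106-0001", // job ID
  "/tmp/demo-output",  // path
  false)               // dynamicPartitionOverwrite
```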

## Logging

Enable `ALL` logging level for `org.apache.spark.internal.io.HadoopMapReduceCommitProtocol` logger to see what happens inside.

Add the following line to `conf/log4j2.properties`:

```text
logger.HadoopMapReduceCommitProtocol.name = org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
logger.HadoopMapReduceCommitProtocol.level = all
```

Refer to [Logging](spark-logging.md).
29 changes: 20 additions & 9 deletions docs/HadoopMapReduceWriteConfigUtil.md
@@ -1,15 +1,26 @@
# HadoopMapReduceWriteConfigUtil

`HadoopMapReduceWriteConfigUtil` is...FIXME
`HadoopMapReduceWriteConfigUtil` is a [HadoopWriteConfigUtil](HadoopWriteConfigUtil.md) for the [RDD.saveAsNewAPIHadoopDataset](rdd/PairRDDFunctions.md#saveAsNewAPIHadoopDataset) operator.

== [[createCommitter]] `createCommitter` Method
## Creating Instance

[source, scala]
----
createCommitter(
jobId: Int): HadoopMapReduceCommitProtocol
----
`HadoopMapReduceWriteConfigUtil` takes the following to be created:

NOTE: `createCommitter` is part of the <<spark-internal-io-HadoopWriteConfigUtil.md#createCommitter, HadoopWriteConfigUtil>> contract to...FIXME.
* <span id="conf"> `SerializableConfiguration`

`createCommitter`...FIXME
`HadoopMapReduceWriteConfigUtil` is created when:

* `PairRDDFunctions` is requested to [saveAsNewAPIHadoopDataset](rdd/PairRDDFunctions.md#saveAsNewAPIHadoopDataset)
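
For reference, a minimal (hypothetical) `RDD.saveAsNewAPIHadoopDataset` call that goes through this class. The output path and key/value types are made up, and a `SparkContext` (`sc`) is assumed:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}

// Illustrative only: the output path and key/value types are made up.
// saveAsNewAPIHadoopDataset is what creates a HadoopMapReduceWriteConfigUtil under the covers.
val counts = sc.parallelize(Seq(("a", 1), ("b", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }

val job = Job.getInstance(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[IntWritable])
job.setOutputFormatClass(classOf[TextOutputFormat[Text, IntWritable]])
FileOutputFormat.setOutputPath(job, new Path("/tmp/demo-new-hadoop-api"))

counts.saveAsNewAPIHadoopDataset(job.getConfiguration)
```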

## Logging

Enable `ALL` logging level for `org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil` logger to see what happens inside.

Add the following line to `conf/log4j2.properties`:

```text
logger.HadoopMapReduceWriteConfigUtil.name = org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil
logger.HadoopMapReduceWriteConfigUtil.level = all
```

Refer to [Logging](spark-logging.md).
3 changes: 2 additions & 1 deletion mkdocs.yml
@@ -96,6 +96,7 @@ extra:
property: !ENV GOOGLE_ANALYTICS_KEY
book:
title: Spark Core
delta: https://books.japila.pl/delta-lake-internals
pyspark: https://books.japila.pl/pyspark-internals
spark_k8s: https://jaceklaskowski.github.io/spark-kubernetes-book
spark_sql: https://books.japila.pl/spark-sql-internals
@@ -389,7 +390,7 @@ nav:
- Driver: driver.md
- Master: master.md
- Workers: workers.md
- Internal IO:
- Writers and Committers:
- SparkHadoopWriter: SparkHadoopWriter.md
- HadoopWriteConfigUtil: HadoopWriteConfigUtil.md
- FileCommitProtocol: FileCommitProtocol.md