
Polish partition.md documentation

WenboZhao authored and icexelloss committed Nov 14, 2017
1 parent 1c2342a commit d1e47d47d57dd961a6b549d255a72d4f0d842f42
Showing with 61 additions and 47 deletions.
  1. +60 −46 doc/partition.md
  2. +1 −1 src/main/scala/org/apache/spark/sql/PartitionPreservingOperation.scala
@@ -5,11 +5,12 @@ What are partition preserving operations
----------------------------------------
**Definition 1: Partitioning**
```
"partitioning" of DataFrame is the mapping from the key space to
partitions. Key range of a partition is a range that covers all keys
in the partition (key range doesn't need to be tight). For the ordered
key spaces we deal with here, partitioning is equivalent to the key
ranges of each partition and the key ordering within each partition.
The partitioning of DataFrame is a mapping from key space to
its partitions. The key range of a partition is a range that covers all keys
in the partition. here, key range doesn't need to be tight. For the ordered
key spaces we deal with here, partitioning includes the key
ranges of each partition and the key ordering within each partition
and we say
df1 and df2 have the same partitioning iff:
* df1 and df2 have the same number of partitions
@@ -21,11 +22,11 @@ df1 and df2 have the same partitioning iff:
**Notation 1: ->**
```
We use -> to represent a DataFrame operation. A DataFrame operation is
a function that transforms one DataFrame into another. For instance,
select("col") is a DataFrame operation.
For the rest of the document,
df1 -> df2 means df2 is the result of applying a DataFrame operation -> to df1.
```
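
To make the notation concrete, here is a minimal Scala sketch (the variable names are illustrative, and df1 is assumed to be in scope):
```
import org.apache.spark.sql.DataFrame

// A DataFrame operation in the sense of Notation 1: a function from one
// DataFrame to another. select("col") is one such operation.
val op: DataFrame => DataFrame = _.select("col")
val df2 = op(df1) // df1 -> df2
```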

@@ -35,35 +36,35 @@ A partition preserving operation is a DataFrame operation that doesn't
change the partitioning of the input DataFrame, or equivalently:
Given a DataFrame operation df1 -> df2, -> is a partition preserving operation iff:
for any DataFrame df1, df2 has the same partitioning as df1.
```

Why partition preserving operations are important
-------------------------------------------------
`OrderedRDD` maintains metadata of each partition (i.e. dependencies
between `OrderedRDD` partitions and `DataFrame` partitions, key range
of each `OrderedRDD` partition). If a `DataFrame` operation is not
partition preserving, we will need to reconstruct the metadata when
creating a `OrderedRDD`, which can be as expensive as resorting the
entire DataFrame. On the other hand, if a `DataFrame` operation is
partition preserving, we can reuse old partition metadata and the new
`DataFrame` and create a new `OrderedRDD` with relatively low cost.

Catalyst's operation push down (predicates push down, column pruning)
can be problematic for Flint. For instance, one would think
`select("col")` is a partition preserving operation; however, this is
not true: `df.orderBy("col").select("col")` has
different partitioning from `df.orderBy("col")`. This is because
`df.orderBy("col").select("col")` is optimized to
`df.orderBy("col").select("col")` is optimized by Catalyst to
`df.select("col").orderBy("col")`. `df.select("col").orderBy("col")`
and `df.orderBy("col")` have different partitioning because the hashes
used by `orderBy()` are different.
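
A short Scala sketch of the pitfall, assuming a df with a column "col" in scope; the rewritten form is the one Catalyst may produce, as described above:
```
// Column pruning may push the projection below the sort, so these two
// DataFrames can end up with different partitioning:
val a = df.orderBy("col")
val b = df.orderBy("col").select("col")
// Catalyst may rewrite b to the equivalent of
//   df.select("col").orderBy("col")
// which sorts a different (pruned) DataFrame and partitions differently.
```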

In fact, it's hard to prove any operation is a partition preserving
operation. However it's easy to prove certain operations are partition
preserving operations on a subset of DataFrames. For the rest of the
doc, we will study partition preserving operations on a particular
subset of `DataFrame`s.

Partition Preserving Physical Node
----------------------------------
@@ -86,39 +87,39 @@ node_n
Catalyst rules might push a projection or a filter into a leaf
node. For instance, `ParquetRelation("time", "value") :: Projection("time")`
might be optimized to `ParquetRelation("time")`. Some types of leaf
node are "partition preserving" in the sense that their partitioning
node are partition preserving in the sense that their partitioning
won't be changed by such optimizations. Because of this property,
these nodes make it easy to reason about the partition preserving
property of `DataFrame` operations. To give a formal definition:

**Definition 3: PartitionPreservingLeafNode**
```
For any given DataFrame df1, where df1.executedPlan = leaf :: node_1 :: node_2 :: ... :: node_n
and a DataFrame operation df1 -> df2, where df2.executedPlan = leaf' :: node_1' :: node_2' :: ... :: node_m',
if leaf and leaf' always have the same partitioning, then leaf is a PartitionPreservingLeafNode.
```
Note 1: It's important that df2 is the result of *one* operation on
df1, because one can always break partitioning by `orderBy("col").cache()`.
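
For orientation, the plan chains in these definitions are the physical plans Spark exposes; here is a small sketch of how to inspect one (standard Spark 2.x API, df assumed in scope):
```
// Print the executed (physical) plan of a DataFrame. The leaf sits at the
// bottom of the tree, e.g. a Parquet scan, with unary nodes such as
// Project and Filter stacked above it.
println(df.queryExecution.executedPlan.treeString)
```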

**Definition 4: PartitionPreservingUnaryNode**
```
Given DataFrames df1 and df2, where df1.executedPlan = leaf and df2.executedPlan = leaf :: node, node is a PartitionPreservingUnaryNode iff:
df1 and df2 have the same partitioning
```

**Definition 5: PartitionPreservingDataFrame**
```
A DataFrame df is a PartitionPreservingDataFrame iff:
df.executedPlan = leaf :: node_1 :: node_2 :: .. :: node_n, where leaf is PartitionPreservingLeafNode and node_1 ... node_n are all PartitionPreservingUnaryNodes
```
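
A hypothetical Scala sketch of the check Definition 5 implies; which concrete physical nodes count as partition preserving is an assumption here (Flint maintains its own node lists):
```
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}

// Walk the executed plan from the top: the DataFrame qualifies iff every
// node above the leaf is a known PartitionPreservingUnaryNode and the leaf
// is a known PartitionPreservingLeafNode. The two unary cases below are
// illustrative.
def isPartitionPreservingDataFrame(df: DataFrame): Boolean = {
  def check(plan: SparkPlan): Boolean = plan match {
    case p: ProjectExec                => check(p.child) // assumed preserving
    case f: FilterExec                 => check(f.child) // assumed preserving
    case leaf if leaf.children.isEmpty => true           // assumed preserving leaf
    case _                             => false
  }
  check(df.queryExecution.executedPlan)
}
```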

Partition Info
--------------
`TimeSeriesRDD` consists of a `DataFrame` and an
`OrderedRDD`. `OrderedRDD` maintains partition metadata named
`PartitionInfo`. `PartitionInfo` consists of:
* Time ranges
* Partition dependencies between `OrderedRDD` and `DataFrame`
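
A minimal sketch of the shape this metadata might take; the field names and types are illustrative assumptions, not Flint's actual definition:
```
import org.apache.spark.Dependency

// Illustrative only: the time range covered by each OrderedRDD partition,
// plus the dependency of each OrderedRDD partition on DataFrame partitions.
case class PartitionInfo(
  timeRanges: Seq[(Long, Long)],
  deps: Seq[Dependency[_]]
)
```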

**Theorem 1**
```
@@ -157,30 +158,30 @@ df2 is PartitionPreservingDataFrame, Theorem 1 => df1 and df2 have the same part
df1 and df2 have the same partitioning => we can reuse tsrdd1.partitionInfo
```

From Theorem 2, for a `DataFrame` operation df1 -> df2, we just need to
check if df2 is a `PartitionPreservingDataFrame` to decide if we can reuse
partition info, or throw an exception if we cannot.
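
A hypothetical sketch of that decision in code, assuming df1 and tsrdd1 are in scope; the method name isPartitionPreserving and its signature are assumptions based on this doc, not a confirmed Flint API:
```
// Reuse tsrdd1.partitionInfo when the operation preserved partitioning;
// otherwise fail fast instead of silently rebuilding the metadata.
val df2 = df1.select("time", "value")
val info =
  if (PartitionPreservingOperation.isPartitionPreserving(df1, df2)) {
    tsrdd1.partitionInfo // same partitioning, safe to reuse
  } else {
    throw new IllegalStateException("operation does not preserve partitioning")
  }
```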


TimeSeriesDataFrame (Python)
----------------------------
Unlike `TimeSeriesRDD`, a `TimeSeriesDataFrame` is not always sorted, nor
does it always have partitionInfo.

For a given operation df1 -> df2, we need to decide:
* If df1 has partitionInfo, whether df2 has the same partitioning as
df1. Note that if df1 doesn't have partitionInfo, we don't care whether df2
has the same partitioning as df1, as there is no partitionInfo to
reuse.
* If df1 is sorted, whether or not df2 is sorted. Note that if df1 is not
sorted, we don't care whether df2 is sorted - we treat df2 as if it's
not sorted. df2 could be sorted if the operation is
orderBy("time"), but that's beyond the scope of this doc.

### Partition Info
The first problem is similar to Scala. From Theorem 2, we know we can
reuse partitionInfo iff:
* df1 and df2 are both `PartitionPreservingDataFrame`s

In Python, we ensure the following invariant:

@@ -193,7 +194,7 @@ the DataFrame is a `PartitionPreservingDataFrame`, therefore
there's no need to store partitionInfo if it's not.

From Theorem 1 and Invariant 2, we conclude that we can reuse
df1.partitionInfo iff df2 is a `PartitionPreservingDataFrame`.

### Ordering
#### Difference between Ordering and Partitioning
@@ -211,11 +212,11 @@ look at the logical plan to see whether or not the ordering is changed
by a DataFrame operation.

#### Analyzed Plan
There are four stages of logical plan transformations in QueryExecution;
each one is the output of some transformation of the previous one. Using ==> to
describe a transformation, we have:

logicalPlan ==> analyzedPlan ==> withCachedData ==> optimizedPlan
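
All four are reachable from a DataFrame's QueryExecution; a small sketch using the standard Spark 2.x accessors (df assumed in scope):
```
// The four logical plans, in transformation order:
val qe = df.queryExecution
println(qe.logical.treeString)        // logicalPlan
println(qe.analyzed.treeString)       // analyzedPlan
println(qe.withCachedData.treeString) // withCachedData
println(qe.optimizedPlan.treeString)  // optimizedPlan
```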

For an operation df1 -> df2, we *believe* analyzedPlan has the following properties:
* -> will only add logical nodes to df1.analyzedPlan, i.e., if
@@ -230,21 +231,34 @@ For an operation df1 -> df2, we *believe* analyzedPlan has the following propert
or they are ordering preserving (compared to the logical plan, in which
Project(sum("v")) has not yet been transformed to Aggregate(sum("v")))

To give a few examples: for `df1 = sqlContext.read.parquet("path").orderBy("time")`,
we have `df1.analyzedPlan = Relation :: Sort`.

Order Preserving:
```
df2 = df1.select("time"), df2.analyzedPlan = Relation :: Sort :: Project
df2 = df1.filter(df1("time") > 0), df2.analyzedPlan = Relation :: Sort :: Filter
df2 = df1.withColumn("time2", df1("time") * 2), df2.analyzedPlan = Relation :: Sort :: Project
df2 = df1.withColumn("time2", udf({time: Long => time * 2})(df1("time"))), df2.analyzedPlan = Relation :: Sort :: EvalutePython :: Project :: Project
df2 = df1.select("time")
df2.analyzedPlan = Relation :: Sort :: Project
df2 = df1.filter(df1("time") > 0)
df2.analyzedPlan = Relation :: Sort :: Filter
df2 = df1.withColumn("time2", df1("time") * 2)
df2.analyzedPlan = Relation :: Sort :: Project
df2 = df1.withColumn("time2", udf({time: Long => time * 2})(df1("time")))
df2.analyzedPlan = Relation :: Sort :: EvalutePython :: Project :: Project
```

Not Order Preserving (non-order-preserving logical nodes are marked with *):
```
df2 = df1.select(sum("time")), df2.analyzedPlan = Relation :: Sort :: *Aggregate*
df2 = df1.groupBy("time").agg(sum("value")) df2.analyzedPlan = Relation :: Sort :: *Aggregate*
df2 = df1.withColumn("v2", percentRank().over(window)) df2.analyzedPlan = Relation :: Sort :: Project :: *Window* :: Project :: Project
df2 = df1.select(sum("time"))
df2.analyzedPlan = Relation :: Sort :: *Aggregate*

df2 = df1.groupBy("time").agg(sum("value"))
df2.analyzedPlan = Relation :: Sort :: *Aggregate*

df2 = df1.withColumn("v2", percentRank().over(window))
df2.analyzedPlan = Relation :: Sort :: Project :: *Window* :: Project :: Project
```
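
One way to check any of these chains is to print the analyzed plan directly; a small sketch using one of the non-order-preserving examples above ("path" is the doc's placeholder):
```
import org.apache.spark.sql.functions.sum

// An Aggregate appearing above the Sort in the analyzed plan signals a
// non-order-preserving step.
val df1 = sqlContext.read.parquet("path").orderBy("time")
val df2 = df1.groupBy("time").agg(sum("value"))
println(df2.queryExecution.analyzed.treeString)
```
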
**Theorem 3**
@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.python.BatchEvalPythonExec
/**
* A class used to check whether a DataFrame operation is partition preserving.
*
* See doc/partition.md
*/
object PartitionPreservingOperation {
