[SPARK-51540][PS][DOCS] Best practice for distributed-sequence misalignment case #50302

Closed
74 changes: 74 additions & 0 deletions python/docs/source/user_guide/pandas_on_spark/best_practices.rst
@@ -242,6 +242,80 @@ to handle large data in production, make it distributed by configuring the defau
See `Default Index Type <options.rst#default-index-type>`_ for more details about configuring default index.


Handling index misalignment with ``distributed-sequence``
----------------------------------------------------------

While ``distributed-sequence`` ensures a globally sequential index, it does **not** guarantee that the same row-to-index mapping is maintained across different operations.
Operations such as ``apply()``, ``groupby()``, or ``transform()`` may cause the index to be regenerated, leading to misalignment between rows and computed values.

Issue example with ``apply()``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In the following example, we load a dataset in which ``record_id`` acts as a unique identifier, and we compute the duration (the number of business days) with an ``apply()`` function.
However, because the ``distributed-sequence`` index may be regenerated during ``apply()``, the computed values may be assigned to the wrong rows.

.. code-block:: python

    import pyspark.pandas as ps
    import numpy as np

    ps.set_option('compute.default_index_type', 'distributed-sequence')

    df = ps.DataFrame({
        'record_id': ["RECORD_1001", "RECORD_1002"],
        'start_date': ps.to_datetime(["2024-01-01", "2024-01-02"]),
        'end_date': ps.to_datetime(["2024-01-01", "2024-01-03"]),
    })

    df['duration'] = df.apply(
        lambda x: np.busday_count(x['start_date'].date(), x['end_date'].date()),
        axis=1,
    )

Expected output:

.. code-block::

    record_id    start_date  end_date    duration
    RECORD_1001  2024-01-01  2024-01-01  0
    RECORD_1002  2024-01-02  2024-01-03  1

However, because the ``distributed-sequence`` index is regenerated during ``apply()``, the resulting DataFrame might look like this:

.. code-block::

    record_id    start_date  end_date    duration
    RECORD_1002  2024-01-02  2024-01-03  0    # Wrong mapping!
    RECORD_1001  2024-01-01  2024-01-01  1    # Wrong mapping!

Best practices to prevent index misalignment
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To ensure the row-to-index mapping remains consistent, consider the following approaches:

1. **Explicitly set an index column before applying functions:**

   .. code-block:: python

       df = df.set_index("record_id")  # Ensure the index is explicitly set
       df['duration'] = df.apply(
           lambda x: np.busday_count(x['start_date'].date(), x['end_date'].date()),
           axis=1,
       )

2. **Persist the DataFrame before applying functions to maintain row ordering:**

   .. code-block:: python

       df = df.spark.persist()  # Materialize the data so the computed index is not regenerated
       df['duration'] = df.apply(
           lambda x: np.busday_count(x['start_date'].date(), x['end_date'].date()),
           axis=1,
       )

3. **Use the sequence index type instead (be aware of the performance trade-off):**
   the ``sequence`` index is computed without partitioning, so the whole dataset can
   end up on a single node; avoid this option for large data.

   .. code-block:: python

       ps.set_option('compute.default_index_type', 'sequence')

If your application requires strict row-to-index mapping, consider using one of the above approaches rather than relying on the default ``distributed-sequence`` index.
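
Putting these together, here is a minimal end-to-end sketch that keeps a stable row-to-index mapping by setting the index at load time. The file path is hypothetical, assuming a CSV with ``record_id``, ``start_date``, and ``end_date`` columns:

.. code-block:: python

    import pyspark.pandas as ps
    import numpy as np

    # Hypothetical input file; index_col ties the index to record_id up front.
    df = ps.read_csv("/path/to/records.csv", index_col="record_id")
    df['start_date'] = ps.to_datetime(df['start_date'])
    df['end_date'] = ps.to_datetime(df['end_date'])

    # The index is carried by record_id, so the computed values stay aligned
    # with the correct rows even if apply() reshuffles the data.
    df['duration'] = df.apply(
        lambda x: np.busday_count(x['start_date'].date(), x['end_date'].date()),
        axis=1,
    )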

For more information, refer to `Default Index Type <options.rst#default-index-type>`_.



Reduce the operations on different DataFrame/Series
---------------------------------------------------

15 changes: 9 additions & 6 deletions python/docs/source/user_guide/pandas_on_spark/options.rst
@@ -209,14 +209,17 @@ This is conceptually equivalent to the PySpark example as below:
[0, 1, 2]

.. warning::
    Unlike ``sequence``, since ``distributed-sequence`` is executed in a distributed environment,
    the rows corresponding to each index may vary although the index itself still remains globally sequential.

    This happens because the rows are distributed across multiple partitions and nodes,
    leading to non-deterministic row-to-index mappings when the data is loaded.
    Therefore, if the row-to-index mapping is critical for your application, it is recommended
    to explicitly set an index column via the ``index_col`` parameter when creating a ``DataFrame``,
    instead of relying on the default index.
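
    For example (a minimal sketch; the file path, the ``record_id`` column, and the PySpark DataFrame ``sdf`` are hypothetical), the index can be pinned at creation time:

    .. code-block:: python

        import pyspark.pandas as ps

        # Use an existing unique column as the index instead of the default index.
        psdf = ps.read_csv("/path/to/records.csv", index_col="record_id")

        # Or, given an existing PySpark DataFrame `sdf`:
        psdf = sdf.pandas_api(index_col="record_id")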

    Additionally, operations such as ``apply()``, ``groupby()``, or ``transform()`` may generate
    a new ``distributed-sequence`` index that does not necessarily match the original index of the DataFrame.
    This can result in misaligned row-to-index mappings and therefore incorrect calculations.

    To avoid this issue, see `Handling index misalignment with distributed-sequence <best_practices.rst#handling-index-misalignment-with-distributed-sequence>`_.

**distributed**: It implements a monotonically increasing sequence simply by using
PySpark's `monotonically_increasing_id` function in a fully distributed manner. The