Skip to content

Commit

Permalink
Fix Horovod pyarrow IndexError: list index out of range (horovod#3255)
Browse files Browse the repository at this point in the history
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
  • Loading branch information
WeichenXu123 committed Nov 4, 2021
1 parent 3b256e4 commit 3efc229
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 0 deletions.
43 changes: 43 additions & 0 deletions horovod/spark/common/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import contextlib
import os
import time

from multiprocessing.pool import ThreadPool
import pyarrow as pa
import numpy as np
import pyspark.sql.functions as f
Expand Down Expand Up @@ -539,6 +541,41 @@ def _train_val_split(df, validation):
return train_df, val_df, validation_ratio


# Maximum number of seconds to wait for written files to become visible to
# readers (see _wait_file_available). Overridable through the
# FILE_AVAILABILITY_WAIT_TIMEOUT_SECS environment variable; read once at
# import time, defaults to 30 seconds.
_FILE_AVAILABILITY_WAIT_TIMEOUT_SECS = \
    int(os.environ.get('FILE_AVAILABILITY_WAIT_TIMEOUT_SECS', '30'))


def _wait_file_available(store, url_list):
"""Waiting about _FILE_AVAILABILITY_WAIT_TIMEOUT_SECS seconds (default 30 seconds) to make sure
all files are available for reading. This is useful in some filesystems, such as S3 which only
providing eventually consistency.
"""
# Import LocalStore here to avoid circular import
from horovod.spark.common.store import LocalStore
if isinstance(store, LocalStore):
return

def wait_for_file(path):
end_time = time.time() + _FILE_AVAILABILITY_WAIT_TIMEOUT_SECS
while time.time() < end_time:
if store.exists(path):
return True
time.sleep(0.1)
return False

pool = ThreadPool(min(len(url_list), 64))
try:
results = pool.map(wait_for_file, url_list)
failed_list = [url for url, result in zip(url_list, results) if not result]
if failed_list:
raise RuntimeError('Timeout while waiting for all parquet-store files to appear at urls {failed_list},'
'Please check whether these files were saved successfully when materializing dataframe.'
.format(failed_list=','.join(failed_list)))
finally:
pool.close()
pool.join()


def _get_or_create_dataset(key, store, df, feature_columns, label_columns,
validation, sample_weight_col, compress_sparse,
num_partitions, num_processes, verbose):
Expand Down Expand Up @@ -602,6 +639,12 @@ def _get_or_create_dataset(key, store, df, feature_columns, label_columns,
.mode('overwrite') \
.parquet(val_data_path)

saved_file_list = list(train_df._jdf.inputFiles())
if val_df:
saved_file_list += list(val_df._jdf.inputFiles())

_wait_file_available(store, saved_file_list)

train_rows, val_rows, pq_metadata, avg_row_size = get_simple_meta_from_parquet(
store, label_columns, feature_columns, sample_weight_col, dataset_idx)

Expand Down
42 changes: 42 additions & 0 deletions test/utils/spark_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
import contextlib
import os
import platform
import pytest
import stat
import sys
import threading
import time

from tempfile import TemporaryDirectory

Expand All @@ -30,6 +33,7 @@

from horovod.runner.common.util import secret
from horovod.spark.common.store import LocalStore
from horovod.spark.common.util import _wait_file_available
from horovod.spark.driver.driver_service import SparkDriverService, SparkDriverClient
from horovod.spark.task.task_service import SparkTaskService, SparkTaskClient

Expand Down Expand Up @@ -232,3 +236,41 @@ def create_mnist_data(spark):

def create_test_data_from_schema(spark, data, schema):
    """Build a Spark DataFrame from `data` using the explicit `schema`."""
    dataframe = spark.createDataFrame(data, schema=schema)
    return dataframe


def test_wait_file_available():
    """Exercise _wait_file_available in three scenarios: all files already
    present, one file permanently missing (expects a timeout error), and one
    file that appears shortly after polling starts.
    """
    class _PollableStore:
        # Minimal stand-in for a remote store: it is not a LocalStore, so
        # _wait_file_available actually polls it; exists() maps file:// urls
        # back to local paths. _wait_file_available(store, ...) requires a
        # store argument — calling it with the url list alone would raise
        # a TypeError before any polling happened.
        def exists(self, url):
            prefix = 'file://'
            path = url[len(prefix):] if url.startswith(prefix) else url
            return os.path.exists(path)

    store = _PollableStore()

    with tempdir() as d:
        pq_dir = os.path.join(d, 'test_ev')
        os.makedirs(pq_dir)
        file1_path = os.path.join(pq_dir, 'file1')
        file2_path = os.path.join(pq_dir, 'file2')
        url1 = 'file://' + file1_path.replace(os.sep, '/')
        url2 = 'file://' + file2_path.replace(os.sep, '/')

        url_list = [url1, url2]

        def create_file(p):
            # Touch an empty file at path p.
            with open(p, 'w'):
                pass

        # 1. test all files exist.
        create_file(file1_path)
        create_file(file2_path)
        _wait_file_available(store, url_list)

        # 2. test one file does not exist. Raise error.
        os.remove(file2_path)
        with pytest.raises(
                RuntimeError,
                match='Timeout while waiting for all parquet-store files to appear at urls'
        ):
            _wait_file_available(store, url_list)

        # 3. test one file accessible after 1 second.
        def delay_create_file2():
            time.sleep(1)
            create_file(file2_path)

        # Pass the callable itself — `target=delay_create_file2()` would call
        # it inline, creating the file synchronously and starting a thread
        # with a no-op target, so the delayed-availability path would never
        # actually be exercised.
        threading.Thread(target=delay_create_file2).start()

        _wait_file_available(store, url_list)

0 comments on commit 3efc229

Please sign in to comment.