[Bug] Poor performance when processing large amounts of LLM text data #45928
Hi, you should try map_batches() to process the data as vectorized batches. map() is not efficient here, as it processes each row in its own task... You may also be able to change the degree of parallelism by setting ...
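For illustration, a minimal sketch of the difference (the dataset and column here are made up, not from the original report):

```python
import ray

ds = ray.data.range(10_000)

# map() launches one task per row, so per-task scheduling overhead
# dominates on large datasets.
per_row = ds.map(lambda row: {"id": row["id"] * 2})

# map_batches() hands the UDF a whole batch (a dict of column -> numpy
# array by default), amortizing that overhead across many rows.
def double_batch(batch):
    batch["id"] = batch["id"] * 2
    return batch

per_batch = ds.map_batches(double_batch, batch_size=1024)
```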
Thanks very much, I'll try it.
If your setup allows, please also grab the latest version of Ray; there were a bunch of perf and stability updates between 2.10 and 2.24, which is the latest. (We've started releasing Ray weekly to keep up with the rapid pace of change in the AI/ML space in recent months.)
Thanks. Following the suggestions above, I switched to map_batches and tuned the batch_size, concurrency, and memory parameters, and performance improved. But it still does not meet expectations: it currently takes about 1 hour on 2 nodes, which is still well short of the half hour on a single machine. I am on 2.10, though; I will continue testing with the latest version. Thanks.

```python
import ftfy
from ray.data import read_json


def FixUnicodeMapperBatches(samples):
    text_key = 'content'
    normalization = 'NFKC'
    normalize_text = list(map(lambda t: ftfy.fix_text(t, normalization=normalization), samples[text_key]))
    samples[text_key] = normalize_text
    return samples


# data_path is defined elsewhere in the full script
dataset = read_json(data_path)
res_datasets = dataset.map_batches(
    FixUnicodeMapperBatches,
    batch_size=1024 * 2,
    # concurrency=256*2
)
```
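One way to pass the memory parameter mentioned above is as a Ray remote arg, which map_batches forwards to its underlying tasks so the scheduler reserves heap per task. A sketch with illustrative values (not the poster's actual settings):

```python
# Illustrative values: reserve ~2 GiB of worker heap per task so Ray's
# scheduler avoids packing more tasks onto a node than its memory allows.
res_datasets = dataset.map_batches(
    FixUnicodeMapperBatches,
    batch_size=1024 * 2,
    memory=2 * 1024 ** 3,  # bytes; forwarded as a Ray remote arg
)
```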
@Bye-legumes @anyscalesam I upgraded to version 2.24, but performance still has not improved. Is there any way to speed this up? On a single node (without Ray), processing a single 58 GB file with multiple processes took about 30 minutes, which was faster than splitting it into many small files. I also tried merging the files into larger files and processing them with Ray. There was a slight improvement, but it still did not meet expectations.
Hi, your plot is very useful! Thanks @Cathy0908
@Bye-legumes Thank you for your suggestions. I switched to an actor and tried several sets of parameters, but it didn't seem to help. Later I increased the machine's memory to 512 GB. That was a clear improvement: performance now meets my expectations, and workers are no longer killed frequently. However, once memory usage reaches a certain proportion, performance seems to degrade unless there is enough headroom.

```python
import ftfy
import time
import ray
from ray.data import read_json
from ray.data.context import DatasetContext

DatasetContext.max_errored_blocks = 2


class FixUnicodeMapperActor:
    def __init__(self):
        self.text_key = 'content'
        self.normalization = 'NFKC'

    def __call__(self, samples):
        normalize_text = list(map(
            lambda t: ftfy.fix_text(t, normalization=self.normalization), samples[self.text_key]))
        samples[self.text_key] = normalize_text
        return samples


if __name__ == "__main__":
    ray.init()
    num_nodes = ray.nodes()
    print(">>>>>> Number of nodes:", len(num_nodes))
    data_path = "/mnt/data/json_parts_500w_v4/"
    export_path = '/mnt/data/outputs/actor_v4/'
    s_time = time.time()
    dataset = read_json(data_path)
    res_datasets = dataset.map_batches(
        FixUnicodeMapperActor,
        batch_size=1024 * 2,
        num_cpus=0.25,
        concurrency=1024,
    )
    res_datasets.write_json(export_path, force_ascii=False)
    print('cost time: ', time.time() - s_time)
    ray.shutdown()
```
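As a side note on the fixed concurrency=1024 above: recent Ray Data releases also accept a (min, max) tuple for concurrency when the UDF is a class, which lets the actor pool autoscale instead of holding all the actors (and their memory) for the whole job. A sketch with illustrative bounds:

```python
# Illustrative bounds: autoscale between 32 and 256 actors instead of
# pinning 1024, so idle actors can be released under memory pressure.
res_datasets = dataset.map_batches(
    FixUnicodeMapperActor,
    batch_size=1024 * 2,
    num_cpus=0.25,
    concurrency=(32, 256),
)
```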
Great to hear! So based on the above discussion / screenshots, it looks like the main issue is that Ray is not dedicating enough memory to the Ray Data tasks. @Cathy0908 What commands were you using to adjust the heap memory / object store capacity / etc.?
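For reference, the knob in question can be set when starting Ray; a sketch with illustrative sizes (not the reporter's actual values):

```python
import ray

# Illustrative: cap the preallocated object store at 64 GiB so more of the
# node's 256 GiB is left for worker heaps. By default Ray reserves a fixed
# fraction of the node's RAM for the object store at startup.
ray.init(object_store_memory=64 * 1024 ** 3)
```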
@scottjlee I think the main friction here is that there's currently no way to adjust the heap / preallocated object store ratio dynamically; in many cases the user simply goes with the default value, and the preallocated store just sits there taking up heap that would otherwise be available.
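One partial workaround that exists today (assuming Ray 2.x Data's ExecutionOptions API) is to cap how much object store memory the Data execution itself will use, rather than resizing the store; a sketch with an illustrative limit:

```python
from ray.data import DataContext

# Illustrative: tell Ray Data's streaming executor to keep its object
# store footprint under 32 GiB, leaving the rest for other usage.
ctx = DataContext.get_current()
ctx.execution_options.resource_limits.object_store_memory = 32 * 1024 ** 3
```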
Yes, that sounds like it could work. Please keep us updated with a PR, or if you have any questions. Thanks!
What happened + What you expected to happen
I want to use Ray for large-scale text data cleaning tasks. I extracted 5 million records (about 58 GB) from the open-source RedPajama GitHub dataset for testing. Using two 128 vCPU, 256 GiB nodes, the job took more than 2 hours.
However, on a single 128 vCPU, 256 GiB node without the Ray framework, using the datasets API with multiple processes, it only took about 20 minutes. When using Ray, it always reports an error: (raylet) A worker died or was killed while executing a task by an unexpected system error. The workers appear to be killed repeatedly due to out-of-memory. I have increased --object-store-memory, reduced concurrency, and tried other tuning settings, but the situation does not improve.
What should I do to improve performance? Right now the gap between Ray on 2 nodes and a single machine without Ray is very large.
Versions / Dependencies
Reproduction script
ray script:
Issue Severity
None