
Discussion on batch Garbage Collection. #2242

Closed

guoyuhong opened this issue Jun 12, 2018 · 112 comments
Comments

@guoyuhong
Contributor

Hi @robertnishihara @pcmoritz, we are planning to add batch Garbage Collection to Ray.

We have a concept called batchId (int64_t) that is used for Garbage Collection. For example, one job will use this batchId to generate all of its objectIds and taskIds, and all of these objectIds and taskIds will be stored in a Garbage Collection table in GCS, keyed by the batchId. When the job is finished, we can simply pass the batchId to the garbage collector, which will look up the Garbage Collection table in GCS and garbage-collect all the related tasks and objects.

In the current id.h implementation, the lowest 32 bits of an ObjectId are used for the Object Index. We can use the 64 bits directly above the Object Index as the batchId and add a new GC table in GCS.

This GC mechanism will help release the memory resources in GCS and plasma.
What do you think of this code change?
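To make the proposed bookkeeping concrete, here is a minimal sketch in plain Python, with an in-memory dict standing in for the GCS table; the names (register_id, gc_batch) are illustrative only and are not part of the actual proposal or code base.

from collections import defaultdict

# Stand-in for the proposed per-batch GC tables in GCS.
gc_table = defaultdict(set)

def register_id(batch_id, ray_id):
    """Record an objectId/taskId under its batch when it is created."""
    gc_table[batch_id].add(ray_id)

def gc_batch(batch_id, release_fn):
    """Look up everything created under batch_id and release it."""
    for ray_id in gc_table.pop(batch_id, set()):
        release_fn(ray_id)  # e.g. drop the GCS entry / evict from plasma

# Example: two ids created by a job, then freed when the job finishes.
register_id(42, "object-1")
register_id(42, "task-1")
gc_batch(42, release_fn=lambda i: print("released", i))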

@guoyuhong
Contributor Author

@eric-jj @raulchen @imzhenyu . FYI.

@eric-jj
Contributor

eric-jj commented Jun 13, 2018

We used the task_id (part of the objectID) to GC objects for a task in the Ant Finance internal version, which produced very promising results for plasma object management. I am thinking of migrating the feature to the open source version, which is why we raised this issue. @pcmoritz @robertnishihara, do you have any comments on it?

@robertnishihara
Collaborator

Can you post a code example showing what this would look like from the application level? Or would this be completely under the hood?

@guoyuhong
Contributor Author

I will post a code change for review as an example shortly. Thank you, Robert.

@robertnishihara
Collaborator

Thanks! I have some reservations about this approach and how general it is. It clearly makes sense for at least one of your use cases, but we need to make sure we have a strategy that works in general.

cc @concretevitamin

@guoyuhong
Contributor Author

Here is the user code change that our scenario needs: alipay#29
In this code change, void gc(Long batchId); is not yet in RayApi.java, but our goal is to add it.

@guoyuhong
Contributor Author

guoyuhong commented Jun 13, 2018

On the user side, here is a sample of the user code. The user wants to run GC to release object memory for finished jobs.

public class RayJobScheduler {
	public void init(JobSchedulerContext jobSchedulerContext) {
		// Set the job id. Do some init work...
	}

	public void scheduleJob(IRayaJobDriver driver) {
		// Set the job id. Do some init work.
		Ray.startBatch(jobId, ...);
	}

	public void finishJob(Long jobId, JobRes jobRes) {
		// Do some cleaning work...
		Ray.endBatch(...);
		if (jobRes.doGc) {
			Ray.gc(jobId);
		}
	}
}

Here we treat JobId and BatchId as the same concept: a whole job/procedure/experiment that generates many Ray tasks and objects to achieve a goal. Once the job finishes and we have the result, we know that the resources of the job are no longer needed, so we can release them for later and ongoing job requests.

@eric-jj
Contributor

eric-jj commented Jun 13, 2018

For Python, the user application would look like the following code. The jobID is optional; it does not affect the existing features/code. Only if a user wants to manage the task and object life cycle themselves do they use start_batch and end_batch to do GC.

import time
import ray

@ray.remote
def f():
    time.sleep(1)
    return 1

ray.init()

jobID = ray.start_batch()
results = ray.get([f.remote() for i in range(4)])
ray.end_batch(jobID, need_gc=True)

@concretevitamin
Contributor

Hey @guoyuhong @eric-jj - I think we have been conservative about two things:

  • Adding new top-level Python APIs
  • Exposing internal control to the user

So I think it'd be great to have a deeper discussion on this proposed feature, perhaps in the form of a design doc? @robertnishihara @pcmoritz what do you guys think?

To clarify, we've been working on addressing the GCS memory issue as well. The difference is that our solution is more transparent to the user, while still being callable directly by the user should she choose to. Each GC batch is not tied to a job. When it's ready, I'd love to get you guys to play with this feature and see if it meets your needs!

@eric-jj
Contributor

eric-jj commented Jun 14, 2018

@concretevitamin
The feature is not for GCS memory; it is for plasma memory management. It is an important feature for our production system: without it, unused objects will not be recycled in plasma, and useful objects may be evicted because of their lifetime. We implemented the feature because we hit this problem when using Ray in our system. Our scenario is graph computation, where many temporary variables are produced during the calculation. So we implemented the feature in our internal version and got the expected results. The system is online now.

I am trying to merge the internal version of Ray and the GitHub version. The first step for me is to deploy the GitHub Ray to our scenario. This feature is a blocker for that; without it we can't apply the GitHub version of Ray to our online system.

@guoyuhong will write a design doc and we can follow up on the detailed design. Yes, the design will introduce a top-level API; that is expected. Some scenarios need RAII-like ability, and users can choose whether to use it. And I don't think it exposes internal control to the user: how the GC logic is implemented is transparent to the user, and the backend is free to do nothing even when the user requests it.

What's your plan to resolve the GCS memory issue? Can you provide more details, e.g., a design doc? Then we can evaluate whether it resolves our problem.

@guoyuhong
Contributor Author

Background

Graph Computing

We have a graph computing scenario based on Ray. Ray can be used as the backend for continuous graph computing requests. The requests have the following characteristics:

  1. In graph computing, the number of objects grows exponentially. If unused memory is not released in time, memory becomes a concern.
  2. There are many concurrent requests, and new requests keep arriving all the time.
  3. The computation costs of different requests differ, so it is not guaranteed that the finishing order matches the creation order.

Currently, the Plasma Store evicts objects according to creation time when memory runs low. However, this mechanism is not enough. For one thing, our scenario needs a large amount of memory, so it won't take long for incoming requests to use it up. For another, if an object is evicted according to creation time while it is still needed by later tasks, the object has to be reconstructed, which hurts performance. Since we know which requests are finished and won't be used in the future, we can release their resources safely.

Streaming Computing

In its further evolution, Ray will support stream computing. The life cycle of streaming objects is different, and even the best automatic garbage collection may not be enough. We need to provide fine-grained memory management, like RAII, so that users can manage object life cycles according to their needs.

Solution

We introduce the concepts of Batch and BatchId (int64_t). A BatchId is an identifier for the set of Ray tasks and Ray objects related to one high-level request/job. After the high-level request/job finishes, a new API in Ray lets users release all the corresponding resources from GCS and Plasma by BatchId.

Implementation Design

API Level

# APIs:
# This function can be called after Ray.init().
# If this function is called, there is a batch concept in this Ray driver.
# If not, Ray will work as normal.
ray.start_batch(batch_id)

# This function can be called by users
# once they know that all of the batch's resources are no longer needed.
ray.end_batch(batch_id, need_gc=True)

###########################################################
# Scenario 1: Normal Mode
@ray.remote
def f():
    time.sleep(1)
    return 1

ray.init()
results = ray.get([f.remote() for i in range(4)])

# Scenario 2: Batch Mode
@ray.remote
def f():
    time.sleep(1)
    return 1

ray.init()
batchId = ray.start_batch()
results = ray.get([f.remote() for i in range(4)])
# Some other work...
ray.end_batch(batchId, need_gc=True)

The Java API can be designed in a similar way.

ObjectId Bits

Currently, the lowest 32 bits are used for the Object Index, and the higher 128 bits are used for the TaskId. We can further split those 128 bits into two parts: the highest 64 bits can be used for the BatchId.
[Figure: proposed ObjectId bit layout: BatchId (highest 64 bits), remaining TaskId bits, Object Index (lowest 32 bits)]
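As a rough illustration of this layout (a sketch only; the real change would live in id.h in C++, and the field widths below are simply taken from the text above):

# Assumed 160-bit ObjectId layout, high bits to low bits:
#   [ BatchId: 64 | rest of TaskId: 64 | Object Index: 32 ]
OBJECT_INDEX_BITS = 32
TASK_BITS = 64
BATCH_ID_BITS = 64

def batch_id_of(object_id):
    """Extract the BatchId field from a packed ObjectId integer."""
    return (object_id >> (OBJECT_INDEX_BITS + TASK_BITS)) & ((1 << BATCH_ID_BITS) - 1)

def with_batch_id(object_id, batch_id):
    """Return a copy of object_id with its BatchId field set to batch_id."""
    shift = OBJECT_INDEX_BITS + TASK_BITS
    mask = ((1 << BATCH_ID_BITS) - 1) << shift
    return (object_id & ~mask) | ((batch_id & ((1 << BATCH_ID_BITS) - 1)) << shift)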

Backend Support

GCS Support

There will be a GCS table recording the ObjectIds and TaskIds under one BatchId if batch mode is enabled. The table name is "GC:${batchId}". Every objectId and taskId will be pushed into this table. After garbage collection is called with a batchId, the GCS will release all the objectIds and taskIds recorded in this table.
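Since the GCS is backed by Redis, the table above can be pictured with plain Redis sets. This is only a sketch of the data model using the standard redis-py client; the real GCS would use its own table abstractions rather than raw commands.

import redis

r = redis.Redis()  # assume this points at the GCS Redis shard

def record_id(batch_id, ray_id_hex):
    """Push an objectId/taskId (hex string) into the batch's GC table."""
    r.sadd("GC:%d" % batch_id, ray_id_hex)

def collect_batch(batch_id):
    """Return the ids recorded for this batch and drop the table."""
    key = "GC:%d" % batch_id
    ids = r.smembers(key)
    r.delete(key)  # deleting this key is the event node managers would subscribe to
    return ids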

Plasma Support

The node manager will subscribe to the delete message for "GC:${batchId}" from GCS. When GCS has deleted the garbage collection table, the node manager will trigger a garbage collection request to Plasma. Plasma will go through all the objectIds and delete the objects under the requested batchId.
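A node-manager-side handler might look roughly like the sketch below. Everything here is illustrative: plasma_client stands for whatever plasma client handle the node manager holds, delete mirrors the plasma store's existing Delete call, and batch_id_of is the bit-extraction helper sketched earlier.

def on_gc_table_deleted(batch_id, local_object_ids, plasma_client):
    """React to the GCS notification that the GC table for batch_id was deleted."""
    # Keep only the locally stored objects that belong to this batch.
    to_delete = [oid for oid in local_object_ids if batch_id_of(oid) == batch_id]
    if to_delete:
        plasma_client.delete(to_delete)  # evict them from the local plasma store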

@eric-jj
Contributor

eric-jj commented Jun 15, 2018

@robertnishihara @concretevitamin any comments?

@robertnishihara
Collaborator

@guoyuhong thanks for the code example! @eric-jj, thanks, I am still thinking through the implications.

Is there a clean way to implement this as a helper utility outside of the backend? E.g., in Python/Java you can track all of the object IDs that are generated between start_batch and end_batch (and use the task table to find tasks that were spawned by those tasks), and then generate new tasks that run on each node and use the plasma client API to evict the relevant IDs.

The advantage of doing it this way is that we can avoid adding functionality to the backend and also avoid adding extra information to the ObjectIDs.
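A very rough sketch of such a driver-side helper, under the assumption that there is one eviction task per node and some per-node call that evicts objects from the local plasma store; local_plasma_evict, track, and end_batch are all made-up names for illustration, not existing APIs.

import ray

def local_plasma_evict(ids):
    # Placeholder for the real per-node eviction (e.g., the plasma client's
    # delete call); printed here so the sketch is self-contained.
    print("evicting", len(ids), "objects on this node")

tracked_ids = []  # object IDs produced between start_batch and end_batch

def track(object_id):
    """Driver-side bookkeeping instead of backend support."""
    tracked_ids.append(object_id)
    return object_id

@ray.remote
def evict_on_this_node(ids):
    local_plasma_evict(ids)

def end_batch(num_nodes):
    # One eviction task per node; actually pinning a task to each node is
    # hand-waved here.
    ray.get([evict_on_this_node.remote(tracked_ids) for _ in range(num_nodes)])
    del tracked_ids[:]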

@robertnishihara
Collaborator

Is it accurate to say that the core issue is that the object store eviction policy is not good enough for some scenarios like your streaming use case, and that you want to provide application-level knowledge to improve the eviction strategy?

@eric-jj
Contributor

eric-jj commented Jun 15, 2018

@robertnishihara your solution can solve some problems, but it can't meet the requirements.

  1. It can only be done partially with your suggestion: we can recycle the objects but not the tasks, since the taskId is transparent to the application.
  2. This is a general requirement, not one for a single application; that solution would make every application reinvent the wheel by itself in different languages (Python, Java, ...).
  3. Application code can't touch the GCS, so the user would have to create an actor to record the object IDs. That actor would become very heavy, and when it fails over, all the calculation must be redone.
  4. The biggest issue to me is that it will make the code ugly. Users must record every objectID by themselves; in most cases, users don't care about the object ID and just pass objectIDs between functions directly.

@raulchen
Contributor

Is it accurate to say that the core issue is that the object store eviction policy is not good enough for some scenarios like your streaming use case and that you want to provide application-level knowledge to improve the eviction strategy?

@robertnishihara exactly

@raulchen
Contributor

I think Ray currently still lacks a good resource management ability.
For GCS, its memory is never recycled.
For the object store, it simply adopts LRU-based eviction now, which definitely isn't the most effective approach.

Thus, I think we can consider introducing a Resource Manager component into the core. It takes responsibility for managing the life cycle of all resources in the system, including GCS, object store, etc.

I haven't thought about the detailed design of the Resource Manager. But I think having a component that unifies resource management will be great.

Also, I'd like to hear @concretevitamin's plan to address GCS memory issue.

@eric-jj
Contributor

eric-jj commented Jun 15, 2018

@robertnishihara The proposal is not only about GC management; it is a starting point for job-level management. When I talked with Ant Finance's AI team, they had strong feature requirements for job-level management, such as observing job status and canceling or re-submitting a job.
The job-level concept/definition is a missing piece in the Ray stack. The GC algorithm is important, but job-level resource management is also required. Without it, it is hard to provide a Ray service for many AI engineers to work on together.
When a Java application exits, all the objects it created are recycled by the OS. We need a similar ability: when a job (just like an application in Java) completes its life cycle, its resources are returned.

@istoica
Contributor

istoica commented Jun 15, 2018

@guoyuhong thanks for raising these issues. These are great points to discuss.

First, regarding a job API. What are the requirements? Something like Spark submit (https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-submit.html)? What else?

Second, regarding object eviction from the object store, it would be great to understand when LRU is not good enough. For what workload? (It looks to me that for the graph workload LRU should be fine, but maybe I'm missing something.)

Of course, by LRU I mean that objects are evicted according to the time since they were last accessed (read/write). And if we have workloads for which LRU is bad, one question here is whether configuring the eviction policy per driver/job would be good enough.

Also, related to the above, we need to add the ability to spill objects over to a persistent store. If you guys have some thoughts here, please let us know. Also, this would be a great task to collaborate on, or maybe for you to lead.

@eric-jj
Contributor

eric-jj commented Jun 16, 2018

@istoica For the job API, Spark submit is a similar one, and Ray should provide an HTTP server so users can submit, view, and cancel jobs. Another important required feature around jobs is separate resource management for different users, to avoid one user occupying all the resources in a cluster. If that feature (resource isolation) is not a general one, we need an interface in the job management module that lets us apply our internal policy.

For LRU: different jobs have different life cycles. Many small jobs create many temp variables and exit, while a few large jobs run for a long time. So the objects created by the long-running job get evicted, because they live longer than the objects created by the small jobs.

We did consider creating a new eviction algorithm to resolve the issue; Ray provides the interface, so it would require fewer code changes. But we couldn't find a good solution for our scenarios. LRU is a GC algorithm, and such GC algorithms are no better than Java's GC algorithms; they can't provide scoped resource management like RAII in C++ or using in C#.

If your concern is adding a top-level API to Ray, then with job-level support we could add a flag to submit that lets the user decide whether to recycle the objects created in the job. That is more natural to the user and does not expose the details inside the job.

Spilling objects over to a persistent store is an option, but it can't resolve the issue, because many temp variables are useless once a job exits in graph computation. Object inflation can't be avoided in graph computation: in our case, one node accesses its 300 neighbor nodes, and this is done 3 times, which means 300^3 temp variables are created. After a job exits, we don't want to keep maintaining these objects.

And in our current thinking, spilling objects to a persistent store needs job-level support: retired objects of a job would be put to the persistent store as a group, and we would not keep entries for these retired objects in the plasma store. We are also trying to apply Ray graph computation to online calculation scenarios now; their performance is very sensitive, we have a budget of less than 300ms, so heavy page-out and page-in to a persistent store is not acceptable.

@imzhenyu
Contributor

imzhenyu commented Jun 16, 2018

@istoica LRU is not enough because the store unnecessarily caches a lot of objects, which brings significant memory/storage/computation overhead in our long-running service case with small jobs. We therefore introduced the batch-based approach above in our internal version, and it is now running in production.

However, we do find limitations with this approach, as it does not allow interactions among different jobs (e.g., requests), and it does not support big jobs nicely (e.g., when the workload does not fit into total memory capacity). We are therefore considering new distributed garbage collection solutions, as mentioned in Tahoe to @robertnishihara etc. We will come back to you later when it is validated in production.

That said, we still think a batch-based approach is valuable, and it would be useful for Ray to gain this capability quickly in order to serve long-running jobs.

@istoica
Contributor

istoica commented Jun 16, 2018

@eric-jj and @imzhenyu thanks for the clarifications. A few things:

  1. Regarding Ray submit, if anyone has cycles to submit a github issue with a short product requirement document (PRD) that would be great. We definitely need help here, especially from people like you that understand well the use case.

  2. Regarding resource management, would per-driver resource management be good enough? As you know, in Ray you can have multiple drivers in the same cluster, so maybe this is enough; just associate a job or a user with a driver. The second question here is what kind of policies we would need to implement: fair sharing, priority? Finally, again it would be great for someone to create a github issue and a PRD.

  3. Regarding **object eviction policy**, there is no question that there are workloads for which LRU won't work well. However, it would be great to get more detailed examples. For instance, I'm still confused about this statement "... objects created by the long run job ... will be deleted because its' life time is longer than the objects' created by the small jobs." LRU will only evict the objects of the long jobs if they were not accessed recently; eviction is not really related to the object's lifetime. If you keep accessing the long job's objects, small jobs won't cause the eviction of these objects. Maybe in your application an object is not accessed for a long time since it was created.

Also, one question here: how useful would an API like ray.unsafe_evict(object_list) be, which would force the eviction of all objects in object_list from the object store? Additionally, we could have ray.unsafe_evict(object_list, recursive=True), which would evict not only the objects in object_list, but also all objects on which these objects depend. For instance, if obj is in object_list, then we evict not only obj, but also all "orphan" objects, i.e., objects whose descendants in the computation graph have all been removed. For example, in the following code:

xid = f.remote(a, b)
# use xid...
ray.unsafe_evict([xid], recursive=True)
# the above call will cause not only the xid object to be evicted from the object store,
# but also "a" and "b"

(I know that the above API might not be as easy to use, but certainly is very flexible.)

An alternative would be to provide an API like ray.pin(object_list), which makes sure that none of the objects in object_list will be removed by LRU.
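Purely to illustrate the pinning alternative (ray.pin is just the proposed name above, and un-pinning, mentioned later in this thread, is equally hypothetical):

model = ray.put([0.0] * 1000)  # stand-in for something a long job keeps reusing
ray.pin([model])      # LRU may no longer evict it, even under memory pressure
# ... many small jobs come and go ...
ray.unpin([model])    # hypothetical counterpart once the long job is done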

@eric-jj
Contributor

eric-jj commented Jun 17, 2018

@istoica I will follow up on the Ray submit issue next week.
For per-driver resource management, I have not thought about it carefully, but it should be OK to resolve our problem. For the policy issue, priority is required, but there are some other features, like security checks, that may not be common for users outside Ant Financial.
For the object eviction policy, a function like ray.unsafe_evict with recursive support should be fine to resolve our issue. I am fine with using that API.

@istoica
Contributor

istoica commented Jun 17, 2018

Thanks, @eric-jj.

BTW, it would still be great to understand the workloads for which LRU doesn't work well. Maybe there is an issue with the current implementation, rather than an LRU-policy problem?

Also, in addition to evicting all objects of a job when it finishes execution, would pinning an object in the object store be good enough? Arguably pinning would be easier to implement than ray.unsafe_evict(), and also more consistent with memory management patterns in OSes: https://lwn.net/Articles/600502/

@eric-jj
Contributor

eric-jj commented Jun 17, 2018

@istoica
Regarding where LRU doesn't work: there are two types of jobs running together. Job type A runs for a long time; job type B runs for a short time and is submitted continually. While type A is still running its first job, type-B jobs may have run 100 times. Plasma gets fully occupied by objects created by the type-B jobs, but 99% of them are useless by then. When new objects are created, the objects created by type A get evicted from memory.
Another reason we want to control GC is that we are trying to apply Ray to an online calculation scenario, as I mentioned. In that scenario the total latency budget is less than 200ms, so we can't afford to run GC during the job's run time.
I think manually evicting all objects is more reasonable for our case, and it is similar to our current solution running in production.
Pinning objects means the user also needs to un-pin them, and it is hard to predict which objects should be pinned when they are created.

@istoica
Contributor

istoica commented Jun 17, 2018

@eric-jj, thanks for the clarification. So I assume that in this case, during B's executions, A won't access its data much.

Let's then design a job API, and free resources under the hood when the job ends. This solution would require no batch API.

Here is a possible API for Python:

# @ray.job assigns (1) a job id which is propagated to all tasks/actors/objects which 
# are in the execution graph rooted at this task, (2) takes some scheduling 
# arguments (e.g., priority), (3) cleans up the objects and task table when the 
# job finishes.  
@ray.job(name = "name", sched_args = sched_arg_list, ...)
def f(...)
      # code of the job here

# if cleanup=True, clean up all state associated with the job when it finishes, including
# its objects in the object store and the associated GCS state; otherwise don't clean
# up any state.
jobId = f.job(..., cleanup=True)
# if you want to wait for finishing the job, or the job returns a result, 
# then just use ray.get()
jobRes = ray.get(jobId)

Alternatively, we can define a class/actor that also has methods to get the job progress. Maybe this is the way to go. Note that under the hood, we could use the batch implementations.

So it seems to me that the most important next steps are to define the job concept (e.g., what are its properties: name, unique id, scheduling parameters, ability to get a progress report, ...) and the API, including a submit API. Would anyone like to start a PRD on this?

@eric-jj
Contributor

eric-jj commented Jun 18, 2018

@istoica Yes, job-level management makes more sense. We introduced the batch because it changes less code. As I mentioned before, I planned to do it step by step: implement the batch first, then build jobs on top of it.
I can ask someone on our side to write the design doc for the job first; then we can finalize the design in the design doc and go on to the detailed implementation.

@istoica
Contributor

istoica commented Jun 18, 2018

Sounds good @eric-jj. I'll help with the PRD document. It would be great to do it over the next couple of days. Thanks.

@pcmoritz
Contributor

pcmoritz commented Jun 18, 2018

A different API I'd propose here is to use context managers for this:

Usage with the with statement:

@ray.remote
def f(something):
    # Do something
    pass

with ray.job(name="jobname"):
    x = f.remote(1)
    # Do other things

# The object "x" can now be cleaned up

with ray.job(name="jobname2"):
    y = f.remote(1)
    # Do more

Context managers can also be used without the with statement if needed, see the python documentation: https://docs.python.org/2/reference/datamodel.html#context-managers

The semantics here would be that there is one default job per driver and you can create custom ones that inherit all objects and function definitions from that default job. It is not allowed to pass objects between these inner jobs, so after the context manager is closed on the driver and all tasks from that job have finished, things can be cleaned up.
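For completeness, the "without the with statement" usage mentioned above would look roughly like this (reusing f from the snippet above); ray.job is still only the proposed API, so this just spells out the standard context-manager protocol:

job = ray.job(name="jobname3")
job.__enter__()
try:
    z = f.remote(1)
    # Do things that may fail
finally:
    # Exiting the context is what allows the job's objects to be cleaned up.
    job.__exit__(None, None, None)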

@eric-jj
Contributor

eric-jj commented Jun 19, 2018

@pcmoritz yes, the with statement seems more reasonable for this scenario. @guoyuhong please add it to the design doc.

@yuyiming
Contributor

yuyiming commented Jul 9, 2018

@istoica For this pattern:

# Actor 1 -----S1.1----------------S1.2---------|--- -> timeline
#           ^                   ^               |
#           | T1.1              | T1.3 (finish) |
# Batch 1 --------------------------------------|--- -> timeline
#               | T1.2                          |
#               v                               |
# Actor 2 ---------S2.1----S2.2-----------------|--- -> timeline
#                       ^                       |
#                       | T2.1                  |
# Batch 2 --------------------------------------|--- -> timeline
#                                               |
#                                    (cleanup moment of batch 1)

S2.1 can NOT be checkpointed when batch 1 finishes, because at that moment Actor 2 is in state S2.2. Although we can reconstruct the actor state S2.1 by re-executing tasks, I think this is too costly: it would need to re-run almost all the tasks to checkpoint all the boundary actor states under highly interleaved batches.

In other words, when T1.2 finishes, we do not know that it is the last task of batch 1 on Actor 2, so we do not have enough information to decide whether to checkpoint the actor at that moment.

If I misunderstand the scenario, please point it out. Thanks a lot.

@imzhenyu
Contributor

@istoica Regarding the previous two questions about sync/async: (1) "other tasks may further emit more actor tasks" Yes, that is correct. (2) Yes, an async API is better for streaming, IMHO. Implementing it with a sync API is doable, I guess, but it would introduce unnecessary delays and complications, because streaming is more like one-way RPC, where the upstream nodes do not care much about what the downstream nodes will do. With an add_return statement within the batch, we can implement the sync API but not the async one. One possible way is to have a ray.end_batch API that can be invoked anywhere within the batch.

@imzhenyu
Contributor

@istoica My previous statement about the unnecessary delay may be wrong, as it is possible to emit the result to the environment before we end a batch (though extra notifications are needed to notify the driver after emitting results). Then I guess we may limit the API design to this sync version, which is simpler to use (less flexible, but already good enough for many cases) and easier to implement.

@istoica
Contributor

istoica commented Jul 10, 2018

@imzhenyu can you provide an example of why we need multiple return statements in the same batch? Can't we emulate the behavior by having one batch per return? If we allow multiple return statements and the batch is async, it could be difficult to figure out when we can clean up the object store. Also the semantics of a batch with multiple return statements seems more complex/confusing.

@istoica
Contributor

istoica commented Jul 10, 2018

Basically, I'm thinking of something like this where batch.end takes as argument a list of objects to return:

x = fun1.remote()
batch = ray.batch()
y = fun2.remote(a)
z = fun3.remote(c)
batch.end([z])  # change ownership of `z` to parent batch

Upon finishing the batch, the above code will clean up y, but not z (as it was returned by the batch) or x (as it was declared outside the batch's scope).

And, here is the with version which is virtually identical:

x = fun1.remote()
with ray.batch() as batch:
  y = fun2.remote(a)
  z = fun3.remote(c)
  batch.return([z])

@istoica
Contributor

istoica commented Jul 10, 2018

@imzhenyu BTW, in the above example garbage collection happens only after the batch ends. Nothing will be garbage collected before then. So as long as you add an object to the return list, you can safely use that object elsewhere as soon as it is created as you know it won't be garbage collected during or after the batch ends (no need to wait for return -- probably need to change the name again to indicate that you do not need to wait for return to use an object).

In the above example, say you do a lot of stuff after creating z. You can still use z safely before the batch ends, as none of the objects will be garbage collected before then.

Wouldn't this be enough for your cases?

@istoica
Contributor

istoica commented Jul 10, 2018

@imzhenyu Maybe rename return to keep, i.e., batch.keep(object_list, actor_list) ?

@istoica
Contributor

istoica commented Jul 10, 2018

@yuyiming and @zhijunfu Thanks for your example illustrating that it is hard to checkpoint actors on the batch frontier. However, the proposal I added to the document should still work correctly. Is the problem you have in mind an efficiency problem or a correctness problem?

Anyway, I was thinking about the following alternative that would make the above discussion moot: how about checkpointing the actors at the beginning of the batch instead of at the end? As long as the actors are checkpointed frequently enough, it shouldn't matter whether we do it at the beginning or the end. In a streaming example, just think of the next batch checkpointing the state of the current batch. This would be much easier to implement, and it has the added benefit of enabling rollback, e.g., in an SGD iteration you can roll back to the initial state of the iteration if an actor fails, which is what you want.

@imzhenyu
Contributor

@istoica Agree that each batch should only have one return statement (when I said anywhere for ray.end_batch I should have said anywhere, but only once :-)). That is also a subtle issue with the async design, as developers have to guarantee this invariant.

As for the return(z) design, I think it is great, though we still need to figure out a way to garbage collect z in the end, especially for online computing services. Currently, in our production scenario, we don't return z but send it to remote services directly in the end-batch task.

@istoica
Contributor

istoica commented Jul 10, 2018

@imzhenyu can you please give an example showing this pattern, i.e., (1) needing a return statement that is not the last statement in the batch, and (2) having to garbage collect z at a later point? For (2), maybe we can use nested batches, so z is garbage collected in the outer batch. For instance, consider the following code snippet:

with ray.batch() as outer:
  x = fun1.remote()
  with ray.batch() as inner:
    y = fun2.remote(a)
    z = fun3.remote(c)
    inner.return([z]) # move z to outer's scope
  # inner done; z is not garbage collected since it is in outer's scope
  # do something more in inner's scope
# outer done; z is garbage collected

@imzhenyu
Contributor

@istoica I don't have an immediate example of both. But for (1), consider a streaming example where the initial point only calls a function without a return value (it is the ingest node), and the batch ends at a later stage deployed in another actor. For (2), consider caching the results of computation requests; then you need to collect z at a later point.

@istoica
Contributor

istoica commented Jul 11, 2018

@imzhenyu Thanks. Correct me if I'm wrong but for (1) I think that you are saying there is no return statement, right?

Also, for (2) you are basically saying that you want to garbage collect an object outside the initial with statement where it was created, e.g., imagine a queue where z is created by an actor method (e.g., enqueue()), and you want to garbage collect it after another method, dequeue(), removes it. Correct?

@yuyiming
Contributor

@istoica I think one solution is to checkpoint actor states at all batch transition points (batch boundaries). For instance, when T2.1 is received by Actor 2, S2.1 should be checkpointed before the execution of T2.1, because T2.1 belongs to a different batch than the task that produced S2.1 (T1.2). We say that S2.1 is a batch transition point from batch 1 (T1.2) to batch 2 (T2.1).

@imzhenyu
Contributor

@istoica yes, there is no returned value or the batch returns empty results. (2) is correct.

@istoica
Contributor

istoica commented Jul 12, 2018

@yuyiming and @imzhenyu, given that there are still questions about the use cases, and hence about the high-level API, how about starting with a low-level API, using it, and building a higher-level API as we get more experience with the use cases? So how about the following low-level API:

  1. ray.free(object_list): delete the objects in object_list from the object store. We already have this functionality, so it's a matter of exposing it. This will help avoid pollution of the object store, as in your streaming app.
  2. ray.checkpoint(actor): checkpoint actor state. That is, instead of transparently checkpointing the actor state, give the developer the ability to do it explicitly. This should be enough for implementing fault-tolerant streaming apps.
  3. @ray.remote(fault-tolerance = false): disable actor fault-tolerance and raise an exception when the actor fails. This will give control to the developer and allow her/him to ignore such failures, e.g., an ML algorithm can ignore some results.
  4. ray.kill(actor) and ray.kill(task): the ability to kill actors and tasks. This is needed to implement expiration timers or straggler mitigation.

Please let me know if the above API could cover all your use cases. Again, the key question here is feasibility not convenience. Thanks.
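None of these calls exist in the open-source version yet; just to make the proposal concrete, an application using them might look like the sketch below (the four calls are the proposed names above, the workload is made up, and the decorator argument is spelled fault_tolerance to be valid Python):

import ray

ray.init()

@ray.remote(fault_tolerance=False)   # (3) don't reconstruct this actor on failure
class Trainer(object):
    def __init__(self):
        self.step = 0

    def train(self, shard):
        self.step += 1
        return sum(shard)

trainer = Trainer.remote()
partials = [trainer.train.remote(list(range(i, i + 10))) for i in range(3)]
results = ray.get(partials)

ray.checkpoint(trainer)   # (2) explicit actor checkpoint
ray.free(partials)        # (1) evict the intermediate objects from the object store
ray.kill(trainer)         # (4) tear the actor down, e.g., on an expiration timer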

@imzhenyu
Contributor

@istoica Agree, we can definitely go with this first. I think @guoyuhong is already working on it :-)

@istoica
Contributor

istoica commented Jul 13, 2018

@imzhenyu sounds good. So let's go with the above plan, then.

According to @stephanie-wang we already have functionality for (2) but we need to expose it. @robertnishihara is looking at (3).

Regarding (4) we need to think a bit of semantics, e.g.,

  • what happens with reconstruction when a task/actor is killed? We probably shouldn't reconstruct a task/actor we explicitly kill. Also, we should revert any state that the task has modified.
  • what are the implications of a method (i.e., an actor task) being killed? A killed method might leave the actor in an inconsistent state, which makes it un-replayable. So once a method of an actor is killed, we might no longer be able to provide fault tolerance for that actor.

Also, a better name for this API is ray.cancel() instead of ray.kill().

@istoica
Contributor

istoica commented Jul 13, 2018

@imzhenyu, @eric-jj, @yuyiming, @robertnishihara, @pcmoritz I added the above proposal to the end of our design document: https://docs.google.com/document/d/1sMgVMajomF5j-GDkzhuejrAwLcxXNKY9lTmlDo7GkqM/edit?ts=5b360317#

Also wrote another small document about (4) here: https://docs.google.com/document/d/1kcl5SHXtQJQ5_F69rSRgbLrJ9LYDPEmY-totALi7BTc/edit#

Please take a look and add any comments you might have.

I really think we should start by implementing this proposal and use it for various example apps to get experience with it. Since (4) is the most complex, in the first phase I propose we do the first three.

@imzhenyu
Contributor

@istoica Sorry for the late response. Regarding the semantics of kill & delete, I wonder whether it is better that an FO system does not have user-facing decreasing APIs, as they go against lineage-based FO (but we can have them internally). Instead, we may use an approach similar to how we handle common objects, using GC.

@istoica
Contributor

istoica commented Jul 18, 2018

@imzhenyu Thanks for the note. Can you please be more precise about GC? When would you GC? Also, what about cancelling tasks/methods on timeouts? A few notes:

  • Please note that ray.free() doesn't remove the lineage, so you can still reconstruct the object. It just removes it from the object store. So for all practical purposes it is a hint. Maybe we want a different name?
  • I renamed ray.kill() to ray.cancel() (see the documents). This is basically needed to implement expiration and is equivalent to it. The expiration concept itself is a decreasing API, so if we want this functionality I am not sure how to get around that.
  • This is an experimental API. It is intended to get experience with the functionality and eventually develop a high level API.

@guoyuhong
Contributor Author

@istoica Hi Ion, I'm implementing ray.free(). In the current implementation, the object list passed to ray.free() will not be deleted immediately. Each worker holds a PlasmaClient, which holds objects and increases the objects' ref counts in the object store, and there is an L3 cache mechanism that delays the PlasmaClient release. The deletion happens when the ref count reaches 0. Therefore, we could name the function ray.garbage_collect() or ray.object_store_gc(), which reminds the user that there is a delay in deletion.

Moreover, since these are low-level functions, can we put them into another namespace to mark that they should be used carefully? For example, we could use ray.internal.garbage_collect().

@istoica
Contributor

istoica commented Jul 20, 2018

@guoyuhong Can you please write down the exact algorithm you are planning to implement? When is the reference counter increased, and when is it decreased? What happens in case of failure? (As a disclaimer, we did consider reference counting early on, but we gave up since maintaining the counter in a distributed system in the presence of failures and message losses is complex; it's equivalent to implementing transactions in a distributed system, as you need to provide exactly-once semantics.)

Again, I think that just deleting the objects or marking them for deletion so you evict them first when the object store is full, would be a fine solution--simple and effective. This is because we do not delete the lineage from GCS (we only flush it) so you can still reconstruct the objects after removing them from the object store, if needed.

Yes, using ray.internal sounds fine.

@guoyuhong
Contributor Author

The current object store has already implemented the ref count. Here is the code. The ref count counts the usage of an object by all the Plasma clients. I have sent a PR to cache the deleted object ids, and these objects will be deleted once their ref count reaches 0. However, the L3 cache in PlasmaClient holds the Release request until the cache condition is satisfied. The current condition is that the release cache has more than 64 items or the cache memory reaches 100MB. Otherwise, the client holds the reference to the target objects even after they are released.

My current object store free algorithm:

  1. The Node Manager has an RPC call to release a list of objects in the object store. The driver will call this RPC.
  2. The Node Manager will call its Object Manager to do the free operation, and its Object Manager will spread the request to all Object Managers through the direct connections among all Object Managers.
  3. All the Object Stores will receive the Delete message. They will wait until the ref counts of the target objects reach 0.

From my current experiments, this algorithm works. But we need to wait until there are more than 64 items pushed to the L3 cache of Plasma Clients.
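To restate the three steps as code, here is a toy single-process model of the propagation; there are no real RPCs here, and ObjectStoreModel and free are invented purely for illustration.

class ObjectStoreModel(object):
    """Toy stand-in for one node's plasma store with per-object ref counts."""

    def __init__(self):
        self.ref_counts = {}       # object_id -> count held by local clients
        self.pending_delete = set()

    def mark_for_delete(self, object_ids):
        # Step 3: remember the ids; actually delete once the ref count is 0.
        self.pending_delete.update(object_ids)
        self._flush()

    def release(self, object_id):
        # Called when a client's L3 cache finally releases the object.
        self.ref_counts[object_id] = max(0, self.ref_counts.get(object_id, 0) - 1)
        self._flush()

    def _flush(self):
        done = {o for o in self.pending_delete if self.ref_counts.get(o, 0) == 0}
        self.pending_delete -= done
        # A real store would free the plasma buffers for `done` here.

def free(object_ids, all_stores):
    # Steps 1-2: the driver's node manager fans the request out to every
    # object manager, which forwards it to its local store.
    for store in all_stores:
        store.mark_for_delete(object_ids)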

@istoica
Contributor

istoica commented Jul 23, 2018

@guoyuhong Thanks for the clarifications. I think there was a confusion: in my original post, I was referring to having global reference counters. The ones in Ray are per node.

At first sight, your solution makes sense.

Also, I think that waiting for the cache condition is not a big deal, as 100MB is relatively tiny for today's machines. But we might want to change this, especially if we want to run Ray on edge devices, as some users do.

@edoakes
Contributor

edoakes commented Feb 10, 2020

I don't think this is relevant anymore now that we're working on doing exact GC for ObjectIDs. Please feel free to re-open if you disagree.

@edoakes edoakes closed this as completed Feb 10, 2020