/
actor_registration.h
177 lines (150 loc) · 7.31 KB
/
actor_registration.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#ifndef RAY_RAYLET_ACTOR_REGISTRATION_H
#define RAY_RAYLET_ACTOR_REGISTRATION_H
#include <cstdint>
#include <memory>
#include <unordered_map>

#include "ray/common/id.h"
#include "ray/common/task/task.h"
#include "ray/protobuf/gcs.pb.h"
namespace ray {
namespace raylet {
using rpc::ActorTableData;
using ActorState = rpc::ActorTableData::ActorState;
using rpc::ActorCheckpointData;
/// \class ActorRegistration
///
/// Information about an actor registered in the system. This includes the
/// actor's current node manager location, and if local, information about its
/// current execution state, used for reconstruction purposes, and whether the
/// actor is currently alive or not.
class ActorRegistration {
 public:
  /// Create an actor registration.
  ///
  /// \param actor_table_data Information from the global actor table about
  /// this actor. This includes the actor's node manager location.
  explicit ActorRegistration(const ActorTableData &actor_table_data);

  /// Recreate an actor's registration from a checkpoint.
  ///
  /// \param actor_table_data Information from the global actor table about
  /// this actor.
  /// \param checkpoint_data The checkpoint used to restore the actor.
  ActorRegistration(const ActorTableData &actor_table_data,
                    const ActorCheckpointData &checkpoint_data);

  /// Each actor may have multiple callers, or "handles". A frontier leaf
  /// represents the execution state of the actor with respect to a single
  /// handle.
  struct FrontierLeaf {
    /// The number of tasks submitted by this handle that have executed on the
    /// actor so far.
    int64_t task_counter;
    /// The execution dependency returned by the task submitted by this handle
    /// that most recently executed on the actor.
    ObjectID execution_dependency;
  };

  /// Get the actor table data.
  ///
  /// \return A reference to the actor table data held by this registration.
  const ActorTableData &GetTableData() const { return actor_table_data_; }

  /// Get the actor's current state, as recorded in the actor table data.
  ///
  /// The value is returned without a top-level `const`: on a by-value return
  /// the qualifier gives callers nothing and only provokes ignored-qualifier
  /// warnings.
  ///
  /// \return The actor's current state.
  ActorState GetState() const { return actor_table_data_.state(); }

  /// Update the actor's state.
  ///
  /// \param state The new state to store in the actor table data.
  void SetState(const ActorState &state) { actor_table_data_.set_state(state); }

  /// Get the actor's node manager location.
  ///
  /// NOTE(review): this getter and the other out-of-line getters below still
  /// return `const` by value. Dropping that qualifier requires a matching
  /// change to their definitions, which are not part of this header.
  ///
  /// \return The actor's node manager location. All tasks for the actor should
  /// be forwarded to this node.
  const ClientID GetNodeManagerId() const;

  /// Get the object that represents the actor's initial state. This is the
  /// execution dependency returned by this actor's creation task. If
  /// reconstructed, this will recreate the actor.
  ///
  /// \return The execution dependency returned by the actor's creation task.
  const ObjectID GetActorCreationDependency() const;

  /// Get the actor's job ID.
  ///
  /// \return The ID of the job this actor belongs to.
  const JobID GetJobId() const;

  /// Get the max number of times this actor should be reconstructed.
  ///
  /// \return The maximum number of reconstructions.
  const int64_t GetMaxReconstructions() const;

  /// Get the remaining number of times this actor should be reconstructed.
  ///
  /// \return The remaining number of reconstructions.
  const int64_t GetRemainingReconstructions() const;

  /// Get the object that represents the actor's current state. This is the
  /// execution dependency returned by the task most recently executed on the
  /// actor. The next task to execute on the actor should be marked as
  /// execution-dependent on this object.
  ///
  /// \return The execution dependency returned by the most recently executed
  /// task.
  const ObjectID GetExecutionDependency() const;

  /// Get the execution frontier of the actor, indexed by handle. This captures
  /// the execution state of the actor, a summary of which tasks have executed
  /// so far.
  ///
  /// \return The actor frontier, a map from handle ID to execution state for
  /// that handle.
  const std::unordered_map<ActorHandleID, FrontierLeaf> &GetFrontier() const;

  /// Get all the dummy objects of this actor's tasks.
  ///
  /// \return A reference to the map from dummy object ID to the number of
  /// actor handles that depend on that dummy object.
  const std::unordered_map<ObjectID, int64_t> &GetDummyObjects() const {
    return dummy_objects_;
  }

  /// Extend the frontier of the actor by a single task. This should be called
  /// whenever the actor executes a task.
  ///
  /// \param handle_id The ID of the handle that submitted the task.
  /// \param execution_dependency The object representing the actor's new
  /// state. This is the execution dependency returned by the task.
  /// \return The dummy object that can be released as a result of the executed
  /// task. If no dummy object can be released, then this is nil.
  ObjectID ExtendFrontier(const ActorHandleID &handle_id,
                          const ObjectID &execution_dependency);

  /// Add a new handle to the actor frontier. This does nothing if the actor
  /// handle already exists.
  ///
  /// \param handle_id The ID of the handle to add.
  /// \param execution_dependency This is the expected execution dependency for
  /// the first task submitted on the new handle. If the new handle hasn't been
  /// seen yet, then this dependency will be added to the actor frontier and is
  /// not safe to release until the first task has been submitted.
  void AddHandle(const ActorHandleID &handle_id, const ObjectID &execution_dependency);

  /// Get the number of handles to this actor entry.
  ///
  /// \return The number of handles.
  int NumHandles() const;

  /// Generate checkpoint data based on the actor's current state.
  ///
  /// \param actor_id ID of this actor.
  /// \param task The task that just finished on the actor.
  /// \return A shared pointer to the generated checkpoint data.
  std::shared_ptr<ActorCheckpointData> GenerateCheckpointData(const ActorID &actor_id,
                                                              const Task &task);

 private:
  /// Information from the global actor table about this actor, including the
  /// node manager location.
  ActorTableData actor_table_data_;

  /// The object representing the state following the actor's most recently
  /// executed task. The next task to execute on the actor should be marked as
  /// execution-dependent on this object.
  ObjectID execution_dependency_;

  /// The execution frontier of the actor, which represents which tasks have
  /// executed so far and which tasks may execute next, based on execution
  /// dependencies. This is indexed by handle.
  std::unordered_map<ActorHandleID, FrontierLeaf> frontier_;

  /// This map is used to track all the unreleased dummy objects for this
  /// actor. The map key is the dummy object ID, and the map value is the
  /// number of actor handles that depend on that dummy object. When the map
  /// value decreases to 0, the dummy object is safe to release from the object
  /// manager, since this means that no actor handle will depend on that dummy
  /// object again.
  ///
  /// An actor handle depends on a dummy object when its next unfinished task
  /// depends on the dummy object. For a given dummy object (say D) created by
  /// task (say T) that was submitted by an actor handle (say H), there could
  /// be 2 types of such actor handles:
  /// 1. T is the last task submitted by H that was executed. If the next task
  ///    submitted by H hasn't finished yet, then H still depends on D since D
  ///    will be in the next task's execution dependencies.
  /// 2. Any handles that were forked from H after T finished, and before T's
  ///    next task finishes. Such handles depend on D until their first tasks
  ///    finish since D will be their first tasks' execution dependencies.
  std::unordered_map<ObjectID, int64_t> dummy_objects_;
};
} // namespace raylet
} // namespace ray
#endif // RAY_RAYLET_ACTOR_REGISTRATION_H