-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDataAccessGroup.cpp
executable file
·162 lines (123 loc) · 4.06 KB
/
DataAccessGroup.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#include "RxStreamer/DataAccessGroup.h"
namespace Reactor
{
// -----------------------------------------------------------
// class DataAccessGroupState
// -----------------------------------------------------------
DataAccessGroupState::DataAccessGroupState()
{ }
DataAccessGroupState::~DataAccessGroupState()
{ }
void DataAccessGroupState::Chain(DataAccessBase::Ptr access)
{
chain.push_back(access);
}
const DataAccessGroupState::DataAccessList &DataAccessGroupState::accesses() const
{
return chain;
}
DataAccessGroupState::DataAccessList &DataAccessGroupState::accesses()
{
return chain;
}
const DataAccessGroupState::DataAccessList &DataAccessGroupState::CurrentFinishedExecution() const
{
return currentFinishedExecution;
}
DataAccessGroupState::DataAccessList &DataAccessGroupState::CurrentFinishedExecution()
{
return currentFinishedExecution;
}
// -----------------------------------------------------------
// class DataAccessGroup
// -----------------------------------------------------------
DataAccessGroup::DataAccessGroup(DataAccessGroupPolicy policy)
: Templates::ContextObjectShared<DataAccessGroupPolicy, DataAccessGroupState, Status::ExecutionStatus>
(
new DataAccessGroupPolicy(policy.Computation()), new DataAccessGroupState(), new Status::ExecutionStatus()
)
{ }
DataAccessGroup::~DataAccessGroup()
{ }
// -----------------------------------------------------------
// Factory functions
// -----------------------------------------------------------
DataAccessGroup DataAccessGroup::Sequential()
{
return DataAccessGroup(DataAccessGroupPolicy::Sequential());
}
DataAccessGroup DataAccessGroup::Parallel()
{
return DataAccessGroup(DataAccessGroupPolicy::Parallel());
}
// -----------------------------------------------------------
// Access statuses and state
// -----------------------------------------------------------
std::vector<DataAccessBase::Ptr> DataAccessGroup::getList()
{
Locker lock(this);
return this->data()->accesses();
}
bool DataAccessGroup::IsExecuting() const
{
return this->status()->IsExecuting();
}
bool DataAccessGroup::IsDone() const
{
Locker lock(this);
return isDone();
}
bool DataAccessGroup::IsReady() const
{
Locker lock(this);
return !isDone() && !this->data()->accesses().empty();
}
bool DataAccessGroup::IsFailure() const
{
return this->status()->IsLastExecutionFailure();
}
size_t DataAccessGroup::Size() const
{
Locker lock(this);
return this->data()->accesses().size();
}
void DataAccessGroup::Start()
{
Locker lock(this);
this->data()->CurrentFinishedExecution().clear();
this->status()->Start();
}
// -----------------------------------------------------------
// reactor functions - add dataAccess to current group execution status (similar to TCP acknowledgments, "Execution control protocol")
// -----------------------------------------------------------
bool DataAccessGroup::Next(DataAccessBase::Ptr dataAccess)
{
Locker lock(this);
IINFO() << "Time spent to complete " << dataAccess->CacheId() << ": " << dataAccess->StatusConst().executionStatus().Time().TimeSinceLastExecutionTime();
this->data()->CurrentFinishedExecution().push_back(dataAccess);
if(this->data()->CurrentFinishedExecution().size() == this->data()->accesses().size())
{
IINFO() << "Time elapsed to group finish: " << this->status()->Time().TimeSinceLastExecutionTime();
this->status()->Success();
}
return true;
}
bool DataAccessGroup::Error(BaseLib::GeneralException exception)
{
Locker lock(this);
IINFO() << "Time spent to error " << this->status()->Time().TimeSinceLastExecutionTime() << " details: " << exception;
//this->data()->CurrentFinishedExecution().push_back(dataAccess);
this->status()->Failure();
return true;
}
bool DataAccessGroup::Complete()
{
Locker lock(this);
this->status()->Success();
return true;
}
bool DataAccessGroup::isDone() const
{
return this->status()->IsSuccess() || this->data()->CurrentFinishedExecution().size() == this->data()->accesses().size();
}
}