forked from uwiger/jobs
/
jobs_queue_model.erl
131 lines (112 loc) · 3.35 KB
/
jobs_queue_model.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
%% Queue operations over the #queue{} record (declared in jobs.hrl).
%% The record's `st' field holds a stdlib `queue' of {Timestamp, Element}
%% pairs; the `type' field (fifo | lifo) selects which end is served first.
-module(jobs_queue_model).
-include("jobs.hrl").
%% NOTE(review): export_all exports every function, helpers included.
-compile(export_all).
%% Initialise queue record Q from the option list.
%%
%% Options must contain {type, fifo} or {type, lifo}; any other value
%% (including a missing `type' key) fails with a case_clause error.
%% The backing store always starts as an empty stdlib queue.
new(Options, Q) ->
    case proplists:get_value(type, Options) of
        Type when Type =:= fifo; Type =:= lifo ->
            Q#queue{type = Type, st = queue:new()}
    end.
%% Returns true when the backing store holds no entries, false otherwise.
is_empty(#queue{st = Store}) ->
    queue:is_empty(Store).
%% Report one property of the queue record.
%%   length     - number of entries in the backing store
%%   max_time   - the record's max_time field, as stored
%%   oldest_job - the record's oldest_job field, as stored
%% Any other key fails with function_clause.
info(length, #queue{st = Store}) ->
    queue:len(Store);
info(max_time, #queue{max_time = MaxTime}) ->
    MaxTime;
info(oldest_job, #queue{oldest_job = Oldest}) ->
    Oldest.
%% Split the queue into expired and surviving entries.
%%
%% With no max_time configured the result is a bare empty list rather than
%% a {Timedout, Queue} pair. This return value is highly illogical, but it
%% is what the code returns, so it is kept as is.
%%
%% Otherwise returns {Timedout, NewQueue}: Timedout lists the expired
%% {Timestamp, Element} entries (insertion order for fifo, reversed for
%% lifo), and NewQueue keeps the remaining entries with oldest_job
%% recomputed from them.
timedout(#queue{max_time = undefined}) ->
    [];
timedout(#queue{type = Type, max_time = MaxTime, st = Store} = Q) ->
    Now = jobs_lib:timestamp(),
    Entries = queue:to_list(Store),
    StillValid = fun({TS, _}) -> not is_expired(TS, Now, MaxTime) end,
    {Kept, Expired} = lists:partition(StillValid, Entries),
    Oldest = get_oldest_job(Kept),
    Reported =
        case Type of
            fifo -> Expired;
            lifo -> lists:reverse(Expired)
        end,
    {Reported, Q#queue{oldest_job = Oldest, st = queue:from_list(Kept)}}.
%% True when the age of an entry (Now - TS) is strictly greater than the
%% timeout TO; an entry whose age equals TO is not yet expired.
is_expired(TS, Now, TO) ->
    Age = Now - TS,
    TO < Age.
%% Smallest timestamp among a list of {Timestamp, Element} entries,
%% or `undefined' for the empty list.
get_oldest_job([]) ->
    undefined;
get_oldest_job(Entries) ->
    %% The comprehension skips elements that are not 2-tuples.
    Timestamps = [Stamp || {Stamp, _} <- Entries],
    lists:min(Timestamps).
%% Look at the entry that would be served next, without removing it.
%% fifo inspects the front of the store, lifo the rear.
%% Returns the {Timestamp, Element} entry, or `undefined' when empty.
peek(#queue{type = fifo, st = Store}) ->
    peek_result(queue:peek(Store));
peek(#queue{type = lifo, st = Store}) ->
    peek_result(queue:peek_r(Store)).

%% Unwrap the result of queue:peek/1 or queue:peek_r/1.
peek_result(empty) -> undefined;
peek_result({value, Entry}) -> Entry.
%% List every entry in the queue: insertion order for fifo,
%% reverse insertion order for lifo.
all(#queue{type = fifo, st = Store}) ->
    queue:to_list(Store);
all(#queue{type = lifo, st = Store}) ->
    lists:reverse(queue:to_list(Store)).
%% Append element E, stamped with TS, to the rear of the store.
%% oldest_job is set to TS only if it was previously `undefined';
%% an existing value is left untouched.
in(TS, E, #queue{st = Store, oldest_job = Oldest} = S) ->
    Grown = queue:in({TS, E}, Store),
    NewOldest =
        if
            Oldest =:= undefined -> TS;
            true -> Oldest
        end,
    S#queue{st = Grown, oldest_job = NewOldest}.
%% Remove up to N entries from the queue.
%% Returns {Entries, NewQueue}; NewQueue holds the remaining store and an
%% oldest_job recomputed from it.
out(N, #queue{type = Type, st = Store} = S) ->
    {Taken, Rest} = out(Type, N, Store, []),
    Oldest = set_oldest_job(Type, Rest),
    {Taken, S#queue{st = Rest, oldest_job = Oldest}}.
%% Timestamp of the entry at the front of store Q, or `undefined' when the
%% store is empty. Both queue types read the front of the store; any type
%% other than fifo or lifo fails with function_clause.
set_oldest_job(Type, Q) when Type =:= fifo; Type =:= lifo ->
    case queue:out(Q) of
        {empty, _} ->
            undefined;
        {{value, {Stamp, _}}, _} ->
            Stamp
    end.
%% Remove up to K entries from store Q, collecting them in the accumulator.
%%
%% fifo takes from the front and returns the entries in the order taken.
%% lifo takes from the rear; because entries are prepended to the
%% accumulator and not reversed, the result lists the taken entries in
%% their original insertion order.
%% Stops early when the store runs empty. Returns {Entries, RemainingStore}.
%%
%% K must be a non-negative count for both types. The lifo clause used to
%% lack the K > 0 guard, so a negative K silently drained the whole store;
%% it now fails with function_clause, as fifo always did.
out(fifo, 0, Q, Taken) ->
    {lists:reverse(Taken), Q};
out(lifo, 0, Q, Taken) ->
    {Taken, Q};
out(fifo, K, Q, Acc) when K > 0 ->
    case queue:out(Q) of
        {{value, E}, NQ} ->
            out(fifo, K - 1, NQ, [E | Acc]);
        {empty, NQ} ->
            out(fifo, 0, NQ, Acc)
    end;
out(lifo, K, Q, Acc) when K > 0 ->
    case queue:out_r(Q) of
        {{value, E}, NQ} ->
            out(lifo, K - 1, NQ, [E | Acc]);
        {empty, NQ} ->
            out(lifo, 0, NQ, Acc)
    end.
%% Discard every entry: the store becomes a fresh empty queue and
%% oldest_job is cleared. All other record fields are left as they were.
empty(#queue{} = Q) ->
    Q#queue{oldest_job = undefined, st = queue:new()}.
%% Describe the queue as a property list:
%%   [{oldest_job, OldestJob}, {contents, Entries}]
%% Entries are in insertion order for fifo and reverse insertion order
%% for lifo. Any other argument is printed and the process exits with
%% reason `fail'.
representation(#queue{type = Type, st = Store, oldest_job = Oldest})
  when Type =:= fifo; Type =:= lifo ->
    Entries = queue:to_list(Store),
    Contents =
        case Type of
            fifo -> Entries;
            lifo -> lists:reverse(Entries)
        end,
    [{oldest_job, Oldest},
     {contents, Contents}];
representation(O) ->
    io:format("Otherwise: ~p", [O]),
    exit(fail).