-
Notifications
You must be signed in to change notification settings - Fork 1
/
task_queue.py
278 lines (237 loc) · 10.9 KB
/
task_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
"""
Manages the queue of compute tasks (renders and benchmarks) for the daemon.
"""
import datetime as dt
import io
import os
import shlex
import tempfile
import uuid
import zipfile

import pymysql
import requests

from config import DAEMON_LOGGER, app, db, Task
from utils import run_shell_cmd, calculate_frame_times, get_last_frame_completed
def push_task(params):
    """
    add a task to the queue; could be benchmark or render

    params keys read here: task_id, render_file (bytes or None), start_frame,
    end_frame, blender_version, is_cpu, cuda_visible_devices, is_zip
    """
    task_id = params.get("task_id")
    render_file = params.get("render_file")
    start_frame = params.get("start_frame")
    end_frame = params.get("end_frame")
    blender_version = params.get("blender_version")
    is_cpu = params.get("is_cpu")
    is_cpu = 1 if is_cpu else 0
    cuda_visible_devices = params.get("cuda_visible_devices")
    # normalize falsy values to None so the parameterized query writes SQL NULL
    if not cuda_visible_devices:
        cuda_visible_devices = None
    is_zip = params.get("is_zip")
    is_render = render_file is not None
    DAEMON_LOGGER.debug(f"Pushing task {task_id}...")
    # prevent duplicate tasks from being created in case of network delays or failures
    with app.app_context():
        existing_task = Task.query.filter_by(task_id=task_id).first()
        if existing_task:
            DAEMON_LOGGER.info(f"Task {task_id} already in queue! Exiting...")
            return
    # create directory for task and write render file there
    task_dir = os.path.join(FILE_DIR, str(task_id))
    os.makedirs(task_dir)
    # create task straight away to add it to queue so we don't restart crypto miner if we have to take a few minutes to process a large render file
    with app.app_context():
        task = Task(task_dir=task_dir, task_id=task_id)
        db.session.add(task)
        db.session.commit()
    if is_render:
        if is_zip:
            # NOTE: partially duplicated in job_queue.py and scan.py
            with io.BytesIO(render_file) as archive:
                archive.seek(0)
                with zipfile.ZipFile(archive, mode='r') as zipf:
                    main_subfile = ""
                    for subfile in zipf.namelist():
                        # parse zip file and look for the main animation file to identify which software is used
                        # guaranteed to exist since rentaflop servers already found it
                        sub_extension = os.path.splitext(subfile)[1]
                        if sub_extension in [".blend"]:
                            main_subfile = subfile
                            break
                    zipf.extractall(task_dir)
            render_path = os.path.join(task_dir, main_subfile)
        else:
            render_path = os.path.join(task_dir, "render_file.blend")
            with open(render_path, "wb") as f:
                f.write(render_file)
        uuid_str = uuid.uuid4().hex
        # encrypt render file at rest; shlex.quote the path since zip member names feed into it
        # NOTE(review): passphrase on the command line is visible in the process list — consider --passphrase-fd
        quoted_path = shlex.quote(render_path)
        os.system(f"gpg --passphrase {uuid_str} --batch --no-tty -c {quoted_path} && mv {shlex.quote(render_path + '.gpg')} {quoted_path}")
        task_id = int(task_id)
        # updating task with pymysql instead of flask sqlalchemy because the initial commit in this function is causing the connection to sometimes close,
        # and trying to use it again here fails intermittently
        # single parameterized UPDATE so values can't inject into the SQL and NULL handling is automatic
        sql = ("UPDATE task SET main_file_path=%s, start_frame=%s, end_frame=%s, uuid_str=%s, "
               "blender_version=%s, is_cpu=%s, cuda_visible_devices=%s WHERE task_id=%s")
        sql_args = (render_path, start_frame, end_frame, uuid_str, blender_version, is_cpu,
                    cuda_visible_devices, task_id)
        connection = pymysql.connect(host="localhost", user="root", password="daemon", database="daemon")
        with connection:
            with connection.cursor() as cursor:
                cursor.execute(sql, sql_args)
            connection.commit()
    DAEMON_LOGGER.debug(f"Added task {task_id}")
def _delete_task_with_id(task_id):
    """
    delete task from db if it exists
    return task_dir if deleted, None otherwise
    """
    deleted_dir = None
    with app.app_context():
        match = Task.query.filter_by(task_id=task_id).first()
        if match is not None:
            deleted_dir = match.task_dir
            # merge reattaches the row to this session before deleting it
            db.session.delete(db.session.merge(match))
            db.session.commit()
    return deleted_dir
def pop_task(params):
    """
    remove task from queue
    does nothing if already removed from queue

    params must contain "task_id"
    """
    task_id = params["task_id"]
    DAEMON_LOGGER.debug(f"Popping task {task_id}...")
    with app.app_context():
        task_running = Task.query.first()
        # compare against the external task_id column (used everywhere else in this file),
        # not the autoincrement primary key "id"
        is_currently_running = bool(task_running and task_running.task_id == task_id)
    task_dir = _delete_task_with_id(task_id)
    # kill task if running and clean up files
    if is_currently_running:
        run_shell_cmd(f'pkill -f "task_{task_id}"', very_quiet=True)
        run_shell_cmd('pkill -f octane', very_quiet=True)
        run_shell_cmd('pkill -f blender', very_quiet=True)
    # only remove files when the task actually existed, so we never run "rm -rf None"
    if task_dir:
        run_shell_cmd(f"rm -rf {task_dir}", very_quiet=True)
    DAEMON_LOGGER.debug(f"Removed task {task_id}...")
def queue_status(params):
    """
    return contents of queue
    params is empty dict
    """
    last_frame_completed = None
    first_frame_time = None
    subsequent_frames_avg = None
    with app.app_context():
        queued_tasks = Task.query.all()
        # must include benchmark so we can set status to gpc
        task_ids = [queued.task_id for queued in queued_tasks]
        try:
            if queued_tasks:
                head = queued_tasks[0]
                last_frame_completed = get_last_frame_completed(head.task_dir, head.start_frame)
                first_frame_time, subsequent_frames_avg = calculate_frame_times(head.task_dir, head.start_frame)
        except Exception as e:
            DAEMON_LOGGER.exception(f"Caught exception in queue status: {e}")
    # need this because connection pool not getting cleared for some reason
    with app.app_context():
        db.close_all_sessions()
    return {"queue": task_ids, "last_frame_completed": last_frame_completed,
            "first_frame_time": first_frame_time, "subsequent_frames_avg": subsequent_frames_avg}
def _read_benchmark():
    """
    parse benchmark.txt file for benchmark info
    return obh value
    """
    # awk prints the line following "Total score:" in the octane output
    raw_score = run_shell_cmd(
        "awk '/Total score:/{getline; print}' octane/benchmark.txt",
        quiet=True,
        format_output=False,
    )
    return raw_score.strip()
def _handle_benchmark():
    """
    start benchmark task or check for benchmark output
    if output exists, send to rentaflop servers
    return True if benchmark task finished, False otherwise
    """
    # check if benchmark started and start if necessary
    if not os.path.exists("octane/started.txt"):
        run_shell_cmd("touch octane/started.txt", quiet=True)
        DAEMON_LOGGER.debug("Starting benchmark...")
        # run octane in the background; its score lands in octane/benchmark.txt
        os.system("./octane/octane --benchmark -a octane/benchmark.txt --no-gui &")
        return False
    # check if benchmark still running
    if not os.path.exists("octane/benchmark.txt"):
        # set timeout on queued task and kill if exceeded time limit
        start_time = os.path.getmtime("octane/started.txt")
        start_time = dt.datetime.fromtimestamp(start_time)
        # must use now instead of utcnow since getmtime is local timestamp on local filesystem timezone
        current_time = dt.datetime.now()
        # timeout for benchmark is less than normal tasks
        timeout = dt.timedelta(minutes=20)
        if timeout < (current_time-start_time):
            # end benchmark task and let pop_task handle killing octane process
            DAEMON_LOGGER.info("Benchmark timed out! Exiting...")
            return True
        return False
    # benchmark job has finished running, so send output and exit container
    server_url = "https://api.rentaflop.com/host/output"
    benchmark = _read_benchmark()
    sandbox_id = os.getenv("SANDBOX_ID")
    data = {"benchmark": str(benchmark), "sandbox_id": str(sandbox_id)}
    DAEMON_LOGGER.debug(f"Sending benchmark score {benchmark} to servers")
    # timeout so an unresponsive server can't block the daemon's queue loop forever
    requests.post(server_url, json=data, timeout=60)
    DAEMON_LOGGER.debug("Finished benchmark")
    return True
def update_queue(params=None):
    """
    checks for any finished tasks and sends results back to servers
    cleans up and removes files afterwards
    starts the next task, if available

    params is unused; default changed from the mutable {} to None (backward-compatible)
    """
    # get first queued task
    with app.app_context():
        task = Task.query.first()
        if not task:
            return
        task_id = task.task_id
        # check if task finished
        if os.path.exists(os.path.join(task.task_dir, "finished.txt")):
            pop_task({"task_id": task_id})
            DAEMON_LOGGER.debug(f"Finished task {task_id}")
            # make another call to update_queue to start the next task immediately
            return update_queue()
        # check if task started
        if os.path.exists(os.path.join(task.task_dir, "started.txt")):
            # set timeout on queued task and kill if exceeded time limit
            start_time = os.path.getmtime(os.path.join(task.task_dir, "started.txt"))
            start_time = dt.datetime.fromtimestamp(start_time)
            # must use now instead of utcnow since getmtime is local timestamp on local filesystem timezone
            current_time = dt.datetime.now()
            # NOTE: if timeout updated, make sure to also update in retask_task lambda
            timeout = dt.timedelta(hours=24)
            if timeout < (current_time-start_time):
                DAEMON_LOGGER.info("Task timed out! Exiting...")
                pop_task({"task_id": task_id})
                return update_queue()
            return
        # task_id will be -1 iff benchmark task
        if task_id == -1:
            is_finished = _handle_benchmark()
            if is_finished:
                pop_task({"task_id": task_id})
                # delete these after removing from db so we don't start it again
                run_shell_cmd('rm octane/started.txt', quiet=True)
                run_shell_cmd('rm octane/benchmark.txt', quiet=True)
                return update_queue()
            return
        # task exists in db, but now we check to see if fields are set and it's ready to be started
        if task.uuid_str:
            # start task in bg
            DAEMON_LOGGER.debug(f"Starting task {task_id}...")
            cmd = f"python3 run.py {task.task_dir} '{task.main_file_path}' {task.start_frame} {task.end_frame} {task.uuid_str} {task.blender_version}"
            # task directives
            cmd += f" {task.is_cpu} {task.cuda_visible_devices}"
            # run in background
            cmd += " &"
            os.system(cmd)
# create tmp dir that's cleaned up when TEMP_DIR is destroyed
# FILE_DIR is the root under which push_task creates one directory per task;
# keeping a module-level reference to TEMP_DIR prevents premature cleanup
TEMP_DIR = tempfile.TemporaryDirectory()
FILE_DIR = TEMP_DIR.name