Skip to content

Commit

Permalink
Merge pull request #19 from vixen-project/fix-processor-ui
Browse files Browse the repository at this point in the history
Fix processor ui
  • Loading branch information
prabhuramachandran committed Apr 29, 2018
2 parents f4a1f5c + da8d914 commit 6b93306
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 20 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
traits
jigna
tornado
tornado<5.0
mock
pytest
json_tricks>=3.0
Expand Down
2 changes: 1 addition & 1 deletion vixen/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def setup_logger():
logdir = get_project_dir()
fname = os.path.join(logdir, 'vixen.log')
handler = RotatingFileHandler(
filename=fname, maxBytes=2**17, backupCount=3
filename=fname, maxBytes=2**18, backupCount=3
)
formatter = logging.Formatter(
'%(levelname)s|%(asctime)s|%(name)s|%(message)s'
Expand Down
5 changes: 5 additions & 0 deletions vixen/html/css/style.css
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ p.loading {
box-shadow:inset 0 0 10px rgba(0,0,0,0.2);
}

.view-processor {
border: solid;
border-width: 1px;
}

.processed {
color: green;
}
Expand Down
33 changes: 24 additions & 9 deletions vixen/html/vixen_ui.html
Original file line number Diff line number Diff line change
Expand Up @@ -491,16 +491,30 @@ <h3 style="margin: 5px;"> View Project: {{viewer.name}}</h3>

<!-- view-processor template -->
<script type="text/x-template" id="view-processor-template">
<label> Processor status: </label> {{processor.status}}
<br/>
<div class="view-processor">
<b> Processor options </b>
<br/>
<div v-if="processor.status === 'running'">
Completed {{processor.completed.length}} of
{{processor.jobs.length}} jobs
<ul>
<li v-for="job in processor.running">{{job.info}}</li>
</ul>
<button v-on:click="threaded(ui, 'process', project)"
v-bind:disabled="processor.status === 'running'" id="run-processing">
Run processing</button>
{{processor.jobs.length}} jobs (see log for details)

<button v-if="processor.interrupt === 'pause'"
v-on:click="processor.resume()"
id="resume-processing">
Resume</button>
<button v-else
v-on:click="processor.pause()"
id="pause-processing">
Pause</button>
<button v-on:click="processor.stop()"
id="stop-processing">
Stop</button>
</div>
<button v-bind:disabled="processor.status === 'running'"
v-on:click="threaded(ui, 'process', project)"
id="run-processing">
Run processing</button>

<div v-if="processor.status === 'error'">
<label>Errors</label>
<div v-for="job in processor.errored_jobs"
Expand All @@ -509,6 +523,7 @@ <h3 style="margin: 5px;"> View Project: {{viewer.name}}</h3>
</div>
</div>
<br/>
</div>
</script>


Expand Down
39 changes: 34 additions & 5 deletions vixen/processor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import multiprocessing
import os
import shlex
Expand All @@ -11,6 +12,9 @@
Int, List, Str)


logger = logging.getLogger(__name__)


class Job(HasTraits):
func = Callable

Expand Down Expand Up @@ -39,6 +43,7 @@ def reset(self):

def _run(self):
self.status = 'running'
logger.info("Running: %s", self.info)
try:
self.result = self.func(*self.args, **self.kw)
self.status = 'success'
Expand All @@ -47,6 +52,7 @@ def _run(self):
self.error = 'OUTPUT: %s\n' % e.output
self.error += format_exc()
self.status = 'error'
logger.info(self.error)

def _thread_default(self):
t = Thread(target=self._run)
Expand All @@ -69,8 +75,11 @@ class Processor(HasTraits):

status = Enum('none', 'running', 'success', 'error')

interrupt = Enum('', 'pause', 'stop')

def process(self):
self.running = []
self.interrupt = ''
running = self.running
error = None
self.status = 'running'
Expand All @@ -89,14 +98,20 @@ def process(self):
running.remove(j)
time.sleep(0.01)

if error is None:
running.append(job)
job.run()
else:
while self.interrupt == 'pause':
time.sleep(0.5)

if error is not None:
self.status = 'error'
self.errored_jobs.append(error)
break

if self.interrupt == 'stop':
break
else:
running.append(job)
job.run()

# Wait for all remaining jobs to complete.
for job in running:
job.thread.join()
Expand All @@ -110,6 +125,18 @@ def process(self):
if self.status != 'error':
self.status = 'success'

def stop(self):
if self.status == 'running':
self.interrupt = 'stop'

def pause(self):
if self.status == 'running':
self.interrupt = 'pause'

def resume(self):
if self.status == 'running' and self.interrupt == 'pause':
self.interrupt = ''

def _reset_errored_jobs(self):
for job in self.errored_jobs:
job.reset()
Expand Down Expand Up @@ -289,6 +316,7 @@ def _run(self, command, media):
stdout, stderr = p.communicate()
if p.returncode == 0:
tag_types = self._tag_types
updates = {}
for line in stdout.splitlines():
line = line.decode('utf-8')
if len(line) == 0:
Expand All @@ -299,8 +327,9 @@ def _run(self, command, media):
pass
else:
if tag in tag_types:
media.tags[tag] = tag_types[tag](value)
updates[tag] = tag_types[tag](value)

media.tags.update(updates)
self._done[media.path] = True

def _setup_tag_types(self, project):
Expand Down
6 changes: 6 additions & 0 deletions vixen/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,17 @@ def update_tags(self, new_tags):
def copy(self):
"""Make a copy of this project. This does not copy the data but only
the tags, extensions and the other settings of the project.
This will not copy any of the processor states but only their settings.
"""
name = self.name + ' copy'
p = Project(name=name)
traits = ['description', 'extensions', 'path', 'processors', 'tags']
p.copy_traits(self, traits, copy='deep')
# Clear out the _done information from the processors
for proc in p.processors:
proc._done.clear()
return p

# #### CRUD interface to the data ####
Expand Down
74 changes: 74 additions & 0 deletions vixen/tests/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import os
import shutil
import tempfile
from threading import Thread
import time
import unittest

from vixen.processor import Processor, Job, CommandFactory, \
Expand Down Expand Up @@ -67,6 +69,78 @@ def test_processor_completes_jobs(self):
j.func.assert_called_once_with(i)
self.assertEqual(j.result, i)

def test_processor_pauses_correctly(self):
# Given
def _sleep(x):
time.sleep(0.01)
return x

jobs = [Job(func=mock.Mock(side_effect=_sleep), args=[x])
for x in range(10)]
p = Processor(jobs=jobs)
self.addCleanup(p.stop)
self.assertEqual(p.status, 'none')

# When
t = Thread(target=p.process)
t.start()

# Sleep for a tiny bit and pause
time.sleep(0.05)
p.pause()
time.sleep(0.01)

# Then
self.assertEqual(p.status, 'running')
self.assertTrue(len(p.completed) < 10)

# When
p.resume()
count = 0
while p.status == 'running' and count < 10:
time.sleep(0.5)
count += 1

self.assertEqual(len(p.completed), 10)
self.assertEqual(len(p.running), 0)
for i, j in enumerate(jobs):
self.assertEqual(j.status, 'success')
j.func.assert_called_once_with(i)
self.assertEqual(j.result, i)

def test_processor_stops_correctly(self):
# Given
def _sleep(x):
time.sleep(0.01)
return x

jobs = [Job(func=mock.Mock(side_effect=_sleep), args=[x])
for x in range(5)]
p = Processor(jobs=jobs)
self.addCleanup(p.stop)
self.assertEqual(p.status, 'none')

# When
t = Thread(target=p.process)
t.start()

# Sleep for a tiny bit and pause
time.sleep(0.05)
p.stop()
count = 0
while p.status == 'running' and count < 10:
time.sleep(0.05)
count += 1

# Then
self.assertEqual(p.status, 'success')
self.assertTrue(len(p.completed) < 5)
self.assertEqual(len(p.running), 0)
for i, j in enumerate(p.completed):
self.assertEqual(j.status, 'success')
j.func.assert_called_once_with(i)
self.assertEqual(j.result, i)

def test_processor_bails_on_error(self):
# Given
f = mock.Mock()
Expand Down
12 changes: 10 additions & 2 deletions vixen/tests/test_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ def test_project_copy_does_not_copy_data(self):
command='echo $input $output')
p.processors = [cf]
p.scan()
# Update the _done trait the processors to check if it is copied.
m = p.get('root.txt')
cf._done[m.path] = True

# When
p1 = p.copy()
Expand All @@ -118,8 +121,13 @@ def test_project_copy_does_not_copy_data(self):
self.assertEqual(p1.extensions, p.extensions)
self.assertEqual(len(p1._relpath2index), 0)
self.assertEqual(len(p1.processors), len(p.processors))
self.assertEqual(p1.processors[0].trait_get(),
p.processors[0].trait_get())
p1_proc_traits = p1.processors[0].trait_get()
p1_proc_traits.pop('_done')
p_proc_traits = p.processors[0].trait_get()
p_proc_traits.pop('_done')
self.assertEqual(p1_proc_traits, p_proc_traits)
self.assertEqual(len(p.processors[0]._done), 1)
self.assertEqual(len(p1.processors[0]._done), 0)

# When
p.tags[0].type = 'int'
Expand Down
16 changes: 14 additions & 2 deletions vixen/vixen.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,8 +609,20 @@ def process(self, project):
to_process = project.keys()
jobs.extend(proc.make_jobs(to_process, project))
self.processor.jobs = jobs
self.processor.process()
self.info("Remember to save the project once processing completes.")
njobs = len(jobs)
if njobs > 0:
logger.info(
'Processing project: %s having %d jobs', project.name, njobs
)
self.processor.process()
self.info(
"Processing complete. Save the project to persist changes."
)
else:
self.info(
'Nothing to process for project: %s.\n'
'Processing already completed.' % project.name
)

def remove(self, project):
name = project.name
Expand Down

0 comments on commit 6b93306

Please sign in to comment.