Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

refactor examples a bit and add some stuff for threadpool parallel runs

  • Loading branch information...
commit 3e71d7180e58ad8965844361e7ec8f542c377d8f 1 parent a3c8f96
@posborne authored
View
0  anvil/examples/__init__.py
No changes.
View
47 anvil/examples/commit_histogram.py
@@ -0,0 +1,47 @@
+# Copyright (c) 2012 Paul Osborne <osbpau@gmail.com>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the "Software"), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+
+from anvil import Anvil
+from anvil.examples.helpers import find_changesets_for_authors
+from matplotlib import pyplot as pp
+import datetime
+import times
+
+START_DT = datetime.datetime(2011, 11, 1)
+
+
+def main():
+ anvil = Anvil("spectrum")
+ anvil.create_session_by_prompting()
+
+ print "collecting related changesets"
+ changesets = find_changesets_for_authors(
+ anvil, ['Paul Osborne', ], datetime.datetime(2009, 1, 1)).values()[0]
+
+ commit_hour = [times.to_local(c.date_time, 'US/Central').hour
+ for c in changesets]
+ pp.hist(commit_hour, 24)
+ a = pp.gca()
+ a.set_xlim([0, 23])
+ pp.show()
+
+
+if __name__ == '__main__':
+ main()
View
41 examples/compute_linked_changesets.py → anvil/examples/compute_linked_changesets.py
@@ -19,52 +19,13 @@
# IN THE SOFTWARE.
from anvil import Anvil
+from anvil.examples.helpers import find_changesets_for_authors
import datetime
-import functools
-import threading
AUTHORS = ["Paul Osborne", "Tom Manley"]
START_DT = datetime.datetime(2011, 11, 1)
-def find_changesets_for_authors(anvil, authors, start_dt,
- end_dt=datetime.datetime.now()):
- """parallel search for changesets by some set of users in a date range"""
- repos = anvil.get_repos()
-
- kiln_start_date = start_dt.strftime("%m/%d/%Y")
- kiln_end_date = end_dt.strftime("%m/%d/%Y")
-
- results = {}
-
- def compute_and_store(repo, person, start_date, end_date):
- result = repo.search_changesets(
- 'edited:"{}:{}" author:"{}"'.format(kiln_start_date, kiln_end_date, person))
- results[(repo, person)] = result
-
- computations = []
- for repo in repos:
- for person in authors:
- f = functools.partial(compute_and_store,
- repo, person, "11/1/11", "4/6/12")
- computations.append(f)
-
- threads = [threading.Thread(target=computation)
- for computation in computations]
-
- for thread in threads:
- thread.start()
-
- for thread in threads:
- thread.join()
-
- changesets_by_author = {}
- for (repo, author), changesets in results.items():
- changesets_by_author.setdefault(author, []).extend(changesets)
-
- return changesets_by_author
-
-
def main():
anvil = Anvil("spectrum")
anvil.create_session_by_prompting()
View
55 anvil/examples/helpers.py
@@ -0,0 +1,55 @@
+# Copyright (c) 2012 Paul Osborne <osbpau@gmail.com>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the "Software"), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+# sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+
+from anvil.utils import parallel_execute
+import datetime
+import functools
+
+
+def find_changesets_for_authors(anvil, authors, start_dt,
+ end_dt=datetime.datetime.now()):
+ """parallel search for changesets by some set of users in a date range"""
+ results = {}
+
+ def compute_and_store(repo, person, start_date, end_date):
+ result = repo.search_changesets(
+ 'edited:"{}:{}" author:"{}"'.format(start_date,
+ end_date, person))
+ results[(repo, person)] = result
+
+ kiln_start_date = start_dt.strftime("%m/%d/%Y")
+ kiln_end_date = end_dt.strftime("%m/%d/%Y")
+
+ computations = []
+ repos = anvil.get_repos()
+ for repo in repos:
+ for person in authors:
+ f = functools.partial(
+ compute_and_store,
+ repo, person, kiln_start_date, kiln_end_date)
+ computations.append(f)
+
+ parallel_execute(*computations)
+
+ changesets_by_author = {}
+ for (repo, author), changesets in results.items():
+ changesets_by_author.setdefault(author, []).extend(changesets)
+
+ return changesets_by_author
View
62 anvil/utils.py
@@ -18,8 +18,10 @@
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
+import Queue
import datetime
import functools
+import threading
def iso_8601_to_datetime(dt):
@@ -54,3 +56,63 @@ def __repr__(self):
def __get__(self, obj, objtype):
"""Support instance methods."""
return functools.partial(self.__call__, obj)
+
+
+class ThreadPoolWorker(threading.Thread):
+ """A very simmple threadpool worker (off a queue)"""
+
+ def __init__(self, queue, timeout=0.1):
+ threading.Thread.__init__(self)
+ self.queue = queue
+ self.timeout = timeout
+ self.has_exit = False
+
+ def stop(self):
+ self.has_exit = True
+
+ def run(self):
+ while True:
+ try:
+ action, args, kwargs = self.queue.get(timeout=self.timeout)
+ except Queue.Empty:
+ if self.has_exit:
+ return # only quit when no work to do
+ else:
+ action(*args, **kwargs)
+
+
+class ThreadPool(object):
+ """Dirt simple threadpool implementation"""
+
+ def __init__(self, workers=50):
+ self.workers = workers
+ self._workers = []
+ self._queue = Queue.Queue()
+
+ def start(self):
+ for _ in xrange(50):
+ worker = ThreadPoolWorker(self._queue)
+ self._workers.append(worker)
+ worker.start()
+
+ def queue_action(self, fn, *args, **kwargs):
+ self._queue.put((fn, args, kwargs))
+
+ def stop(self):
+ for worker in self._workers:
+ worker.stop()
+
+ def join(self):
+ for worker in self._workers:
+ worker.join()
+
+
+def parallel_execute(*executables, **kwargs):
+ """Execute a set of functions in parallel (on a threadpool)"""
+ workers = min(len(executables), kwargs.get('workers', 500))
+ threadpool = ThreadPool(workers=workers)
+ threadpool.start()
+ for fn in executables:
+ threadpool.queue_action(fn)
+ threadpool.stop()
+ threadpool.join()
View
65 examples/commit_histogram.py
@@ -1,65 +0,0 @@
-from anvil import Anvil
-import datetime
-import functools
-import threading
-import times
-from matplotlib import pyplot as pp
-
-START_DT = datetime.datetime(2011, 11, 1)
-
-
-def find_changesets_for_authors(anvil, authors, start_dt,
- end_dt=datetime.datetime.now()):
- """parallel search for changesets by some set of users in a date range"""
- repos = anvil.get_repos()
-
- kiln_start_date = start_dt.strftime("%m/%d/%Y")
- kiln_end_date = end_dt.strftime("%m/%d/%Y")
-
- results = {}
-
- def compute_and_store(repo, person, start_date, end_date):
- result = repo.search_changesets(
- 'edited:"{}:{}" author:"{}"'.format(kiln_start_date, kiln_end_date, person))
- results[(repo, person)] = result
-
- computations = []
- for repo in repos:
- for person in authors:
- f = functools.partial(compute_and_store,
- repo, person, "11/1/11", "4/6/12")
- computations.append(f)
-
- threads = [threading.Thread(target=computation)
- for computation in computations]
-
- for thread in threads:
- thread.start()
-
- for thread in threads:
- thread.join()
-
- changesets_by_author = {}
- for (repo, author), changesets in results.items():
- changesets_by_author.setdefault(author, []).extend(changesets)
-
- return changesets_by_author
-
-
-def main():
- anvil = Anvil("spectrum")
- anvil.create_session_by_prompting()
-
- print "collecting related changesets"
- changesets = find_changesets_for_authors(
- anvil, ['Paul Osborne', ], datetime.datetime(2009, 1, 1)).values()[0]
-
- commit_hour = [times.to_local(c.date_time, 'US/Central').hour for c in changesets]
- pp.hist(commit_hour, 24)
- a = pp.gca()
- a.set_xlim([0, 23])
- pp.show()
-
-
-if __name__ == '__main__':
- main()
Please sign in to comment.
Something went wrong with that request. Please try again.