Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dask ms profiling #114

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
170 changes: 128 additions & 42 deletions daskms/table_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import logging
from threading import Lock
import weakref
import os

from functools import wraps
from time import time

from dask.base import normalize_token
import pyrap.tables as pt
Expand All @@ -15,6 +19,18 @@
# Weak-valued mapping: an entry disappears automatically once nothing else
# holds a strong reference to its value.
# NOTE(review): presumably maps a table key to its proxy -- the code that
# fills it is outside this view; confirm.
_table_cache = weakref.WeakValueDictionary()
# NOTE(review): presumably serialises access to _table_cache -- its use is
# outside this view; confirm.
_table_lock = Lock()

# Profiling is opt-in through the DASK_MS_PROFILE environment variable.
# Environment values are always strings, so the text has to be interpreted
# explicitly: "1", "true", "yes" and "on" (case-insensitive) enable
# profiling.  Anything else -- including "False", "0" or an unset
# variable -- leaves it disabled.  Taking the raw string would treat
# DASK_MS_PROFILE=False as enabled, because any non-empty string is truthy.
dask_ms_profile = os.environ.get('DASK_MS_PROFILE', '').strip().lower() in (
    '1', 'true', 'yes', 'on')

# Dictionary to store runtimes, keyed by the profiled method/function name:
# {function_name: (run_times, call_count)}
# where run_times is the list of per-call durations measured with time()
# and call_count is the number of calls made so far.
# The entry is rewritten at the end of every profiled call.
# NOTE: how best to store execution_time and call_count is still an open
# problem.
_function_runs = {}

# CASA Table Locking Modes
# NOLOCK: the proxied call runs without taking a table lock.
NOLOCK = 0
# READLOCK: the proxied call runs between table.lock(write=False) and
# table.unlock().
READLOCK = 1
Expand Down Expand Up @@ -85,39 +101,64 @@ def proxied_method_factory(method, locktype):
if locktype == NOLOCK:
def _impl(table_future, args, kwargs):
    """Call the proxied ``method`` on the table without locking it.

    The call is timed and the statistics are published in
    ``_function_runs[method]`` as ``(run_times, call_count)``.

    Parameters
    ----------
    table_future :
        Object whose ``result()`` returns the table.
    args : tuple
        Positional arguments forwarded to the table method.
    kwargs : dict
        Keyword arguments forwarded to the table method.

    Returns
    -------
    The value returned by the table method.

    Raises
    ------
    Exception
        Anything raised by the table method is re-raised, after being
        logged when the logger is at DEBUG level or lower.
    """
    try:
        _impl.calls += 1
        start_time = time()
        result = getattr(table_future.result(), method)(*args, **kwargs)
        end_time = time()
        # The timings live on the function object, so they must be
        # reached through ``_impl``; a bare ``run_time`` does not exist.
        _impl.run_time.append(end_time - start_time)
        return result
    except Exception:
        if logging.DEBUG >= log.getEffectiveLevel():
            log.exception("Exception in %s", method)
        raise
    finally:
        # Published on failure too: a failed call is counted but
        # contributes no timing.
        _function_runs[method] = (_impl.run_time, _impl.calls)


# Per-function profiling state, initialised once at definition time.
_impl.calls = 0
_impl.run_time = []

elif locktype == READLOCK:
def _impl(table_future, args, kwargs):
    """Call the proxied ``method`` on the table under a read lock.

    The table is locked with ``write=False`` before the call and always
    unlocked afterwards.  The call is timed and the statistics are
    published in ``_function_runs[method]`` as ``(run_times, call_count)``.

    Parameters
    ----------
    table_future :
        Object whose ``result()`` returns the table.
    args : tuple
        Positional arguments forwarded to the table method.
    kwargs : dict
        Keyword arguments forwarded to the table method.

    Returns
    -------
    The value returned by the table method.

    Raises
    ------
    Exception
        Anything raised by the table method is re-raised, after being
        logged when the logger is at DEBUG level or lower.
    """
    table = table_future.result()
    table.lock(write=False)

    try:
        _impl.calls += 1
        start_time = time()
        result = getattr(table, method)(*args, **kwargs)
        end_time = time()
        _impl.run_time.append(end_time - start_time)
        return result
    except Exception:
        if logging.DEBUG >= log.getEffectiveLevel():
            log.exception("Exception in %s", method)
        raise
    finally:
        # Release the lock first so bookkeeping never delays other users.
        table.unlock()
        _function_runs[method] = (_impl.run_time, _impl.calls)


# Per-function profiling state, initialised once at definition time.
_impl.calls = 0
_impl.run_time = []

elif locktype == WRITELOCK:
def _impl(table_future, args, kwargs):
    """Call the proxied ``method`` on the table under a write lock.

    The table is locked with ``write=True`` before the call and always
    unlocked afterwards.  The call is timed and the statistics are
    published in ``_function_runs[method]`` as ``(run_times, call_count)``.

    Parameters
    ----------
    table_future :
        Object whose ``result()`` returns the table.
    args : tuple
        Positional arguments forwarded to the table method.
    kwargs : dict
        Keyword arguments forwarded to the table method.

    Returns
    -------
    The value returned by the table method.

    Raises
    ------
    Exception
        Anything raised by the table method is re-raised, after being
        logged when the logger is at DEBUG level or lower.
    """
    table = table_future.result()
    table.lock(write=True)

    try:
        _impl.calls += 1
        start_time = time()
        result = getattr(table, method)(*args, **kwargs)
        # This must be ``end_time``: rebinding ``start_time`` here would
        # leave ``end_time`` undefined on the next line.
        end_time = time()
        _impl.run_time.append(end_time - start_time)
        return result
    except Exception:
        if logging.DEBUG >= log.getEffectiveLevel():
            log.exception("Exception in %s", method)
        raise
    finally:
        # Release the lock first so bookkeeping never delays other users.
        table.unlock()
        _function_runs[method] = (_impl.run_time, _impl.calls)


# Per-function profiling state, initialised once at definition time.
_impl.calls = 0
_impl.run_time = []

else:
raise ValueError("Invalid locktype %s" % locktype)
Expand Down Expand Up @@ -194,44 +235,89 @@ def taql_factory(query, style='Python', tables=(), readonly=True):
t.unlock()


def _nolock_runner(table_future, fn, args, kwargs):
    """Run ``fn`` on the table held by ``table_future`` without locking it.

    Parameters
    ----------
    table_future :
        Object whose ``result()`` returns the table.
    fn : callable
        Called as ``fn(table, *args, **kwargs)``.
    args : tuple
        Extra positional arguments for ``fn``.
    kwargs : dict
        Keyword arguments for ``fn``.

    Returns
    -------
    Whatever ``fn`` returns.

    Raises
    ------
    Exception
        Anything raised while resolving the future or running ``fn`` is
        re-raised, after being logged when the logger is at DEBUG level
        or lower.
    """
    try:
        return fn(table_future.result(), *args, **kwargs)
    except Exception:
        if logging.DEBUG >= log.getEffectiveLevel():
            log.exception("Exception in %s", fn.__name__)
        raise


def _readlock_runner(table_future, fn, args, kwargs):
    """Run ``fn`` on the table held by ``table_future`` under a read lock.

    The table is locked with ``write=False`` before ``fn`` runs and is
    always unlocked afterwards, whether or not ``fn`` raised.

    Parameters
    ----------
    table_future :
        Object whose ``result()`` returns the table.
    fn : callable
        Called as ``fn(table, *args, **kwargs)``.
    args : tuple
        Extra positional arguments for ``fn``.
    kwargs : dict
        Keyword arguments for ``fn``.

    Returns
    -------
    Whatever ``fn`` returns.

    Raises
    ------
    Exception
        Anything raised by ``fn`` is re-raised, after being logged when
        the logger is at DEBUG level or lower.
    """
    table = table_future.result()
    table.lock(write=False)

    try:
        return fn(table, *args, **kwargs)
    except Exception:
        if logging.DEBUG >= log.getEffectiveLevel():
            log.exception("Exception in %s", fn.__name__)
        raise
    finally:
        table.unlock()


def _writelock_runner(table_future, fn, args, kwargs):
    """Run ``fn`` on the table held by ``table_future`` under a write lock.

    The table is locked with ``write=True``, ``fn`` is run, the table is
    flushed, and the lock is always released afterwards.

    Parameters
    ----------
    table_future :
        Object whose ``result()`` returns the table.
    fn : callable
        Called as ``fn(table, *args, **kwargs)``.
    args : tuple
        Extra positional arguments for ``fn``.
    kwargs : dict
        Keyword arguments for ``fn``.

    Returns
    -------
    Whatever ``fn`` returns.

    Raises
    ------
    Exception
        Anything raised by ``fn`` or by the flush is re-raised, after
        being logged when the logger is at DEBUG level or lower.
    """
    table = table_future.result()
    table.lock(write=True)

    try:
        result = fn(table, *args, **kwargs)
        # Flush while the write lock is still held.
        table.flush()
    except Exception:
        if logging.DEBUG >= log.getEffectiveLevel():
            log.exception("Exception in %s", fn.__name__)
        raise
    else:
        return result
    finally:
        table.unlock()
def _nolock_runner(fn):
    """Decorate ``fn`` so it runs on a table future without locking.

    The returned wrapper is called as ``wrapper(table_future, *args,
    **kwargs)``.  It resolves the future, calls ``fn(table, *args,
    **kwargs)`` and times the call.

    Profiling state is kept on the wrapper itself: ``wrapper.calls``
    counts invocations and ``wrapper.run_time`` lists the duration of
    each successful call.  After every call the pair is published in
    ``_function_runs[fn.__name__]`` as ``(run_time, calls)``.

    Parameters
    ----------
    fn : callable
        Function taking the table as its first argument.

    Returns
    -------
    callable
        The profiling wrapper, carrying ``fn``'s metadata.
    """
    @wraps(fn)
    def wrapper(table_future, *args, **kwargs):
        try:
            wrapper.calls += 1
            start_time = time()
            result = fn(table_future.result(), *args, **kwargs)
            end_time = time()
            wrapper.run_time.append(end_time - start_time)
            return result
        except Exception:
            if logging.DEBUG >= log.getEffectiveLevel():
                log.exception("Exception in %s", fn.__name__)
            raise
        finally:
            # Published on failure too: a failed call is counted but
            # contributes no timing.
            _function_runs[fn.__name__] = (wrapper.run_time, wrapper.calls)

    wrapper.calls = 0
    wrapper.run_time = []
    return wrapper


def _readlock_runner(fn):
    """Decorate ``fn`` so it runs on a table future under a read lock.

    The returned wrapper is called as ``wrapper(table_future, *args,
    **kwargs)``.  It resolves the future, locks the table with
    ``write=False``, calls ``fn(table, *args, **kwargs)``, times the call
    and always unlocks the table afterwards.

    Profiling state is kept on the wrapper itself: ``wrapper.calls``
    counts invocations and ``wrapper.run_time`` lists the duration of
    each successful call.  After every call the pair is published in
    ``_function_runs[fn.__name__]`` as ``(run_time, calls)``.

    Parameters
    ----------
    fn : callable
        Function taking the table as its first argument.

    Returns
    -------
    callable
        The profiling wrapper, carrying ``fn``'s metadata.
    """
    @wraps(fn)
    def wrapper(table_future, *args, **kwargs):
        table = table_future.result()
        table.lock(write=False)

        try:
            wrapper.calls += 1
            start_time = time()
            # Pass the table that was just locked instead of resolving
            # the future a second time.
            result = fn(table, *args, **kwargs)
            end_time = time()
            wrapper.run_time.append(end_time - start_time)
            return result
        except Exception:
            if logging.DEBUG >= log.getEffectiveLevel():
                log.exception("Exception in %s", fn.__name__)
            raise
        finally:
            table.unlock()
            _function_runs[fn.__name__] = (wrapper.run_time, wrapper.calls)

    wrapper.calls = 0
    wrapper.run_time = []
    return wrapper
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As _nolock_runner



def _writelock_runner(fn):
    """Decorate ``fn`` so it runs on a table future under a write lock.

    The returned wrapper is called as ``wrapper(table_future, *args,
    **kwargs)``.  It resolves the future, locks the table with
    ``write=True``, calls ``fn(table, *args, **kwargs)``, times the call,
    flushes the table and always unlocks it afterwards.

    Profiling state is kept on the wrapper itself: ``wrapper.calls``
    counts invocations and ``wrapper.run_time`` lists the duration of
    each successful call.  After every call the pair is published in
    ``_function_runs[fn.__name__]`` as ``(run_time, calls)``.

    Parameters
    ----------
    fn : callable
        Function taking the table as its first argument.

    Returns
    -------
    callable
        The profiling wrapper, carrying ``fn``'s metadata.
    """
    @wraps(fn)
    def wrapper(table_future, *args, **kwargs):
        table = table_future.result()
        table.lock(write=True)

        try:
            wrapper.calls += 1
            start_time = time()
            # Pass the table that was just locked instead of resolving
            # the future a second time.
            result = fn(table, *args, **kwargs)
            end_time = time()
            # The timings live in ``wrapper.run_time``; the wrapper
            # function itself has no ``append``.
            wrapper.run_time.append(end_time - start_time)
            # Flush while the write lock is still held, as the
            # non-profiling write runner did.
            table.flush()
            return result
        except Exception:
            if logging.DEBUG >= log.getEffectiveLevel():
                log.exception("Exception in %s", fn.__name__)
            raise
        finally:
            table.unlock()
            _function_runs[fn.__name__] = (wrapper.run_time, wrapper.calls)

    wrapper.calls = 0
    wrapper.run_time = []
    return wrapper
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As _writelock_runner



def _iswriteable(table_future):
Expand Down
Loading