Skip to content
This repository has been archived by the owner on Sep 6, 2022. It is now read-only.

Commit

Permalink
add balance-based attribute adding (for events and periodic)
Browse files Browse the repository at this point in the history
  • Loading branch information
sergirubio committed Apr 25, 2019
1 parent 84e11f4 commit d4c00ec
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 109 deletions.
103 changes: 72 additions & 31 deletions PyTangoArchiving/hdbpp/config.py
Expand Up @@ -98,6 +98,7 @@ def __init__(self,db_name='',host='',user='',
self.dedicated = {}
self.status = fn.defaultdict(list)
ArchivingDB.__init__(self,db_name,host,user,passwd,)
self.setLogLevel('INFO')
try:
self.get_manager()
self.get_attributes()
Expand Down Expand Up @@ -182,7 +183,7 @@ def get_archivers(self, from_db = True):

return [d for d in p if d.strip()]

@Cached(expire=60.)
@Cached(expire=5.)
def get_archiver_attributes(self, archiver, from_db=False, full=False):
"""
Obtain archiver AttributeList, either from TangoDB or a running device
Expand All @@ -197,7 +198,7 @@ def get_archiver_attributes(self, archiver, from_db=False, full=False):
attrs]
else:
try:
attrs = get_device(archiver, keep=True).AttributeList
attrs = get_device(archiver, keep=True).AttributeList or []
except:
traceback.print_exc()
attrs = []
Expand Down Expand Up @@ -239,21 +240,36 @@ def get_archiver_errors(self,archiver):
return dict((a,e) for a,e in zip(al,er) if e)

def get_archiver_load(self,archiver,use_freq=True):

if use_freq:
return fn.tango.read_attribute(archiver+'/recordfreq')
return fn.tango.read_attribute(archiver+'/attributerecordfreq')
else:
return len(self.get_archiver_attributes(archiver))
return len(self.get_archiver_attributes(archiver,from_db=False))

def get_next_archiver(self,errors=False):
loads = dict(fn.kmap(self.get_archiver_load,self.get_archivers()))
def get_next_archiver(self,errors=False,use_freq=False, attrexp=''):
"""
errors/use_freq are used to compute the archiver load
attrexp can be used to get archivers already archiving attributes
"""

loads = dict((a,self.get_archiver_load(a,use_freq=use_freq))
for a in self.get_archivers())
if errors:
# Errors count twice as load
for a,v in loads.items():
errs = self.get_archiver_errors(a)
loads[a] += 10*len(errs)

if attrexp:
attrs = [a for a in self.get_attributes(True)
if fn.clmatch(attrexp,a)]
archs = [self.get_attribute_archiver(a) for a in attrs]
if archs:
loads = dict((k,v) for k,v in loads.items() if k in archs)

loads = sorted((v,k) for k,v in loads.items())
return loads[-1][-1]

return loads[0][-1]

@Cached(depth=2,expire=60.)
def get_attributes(self,active=None):
Expand Down Expand Up @@ -457,7 +473,7 @@ def add_archiving_manager(self,srv,dev,libname=None):
self.get_manager()
return dev

def add_event_subscriber(self,srv,dev,libpath=''):
def add_event_subscriber(self,srv,dev,libname=''):
if not fn.check_device(self.manager):
raise Exception('%s not running!' % self.manager)
if '/' not in srv: srv = 'hdb++es-srv/'+srv
Expand Down Expand Up @@ -495,14 +511,16 @@ def add_event_subscriber(self,srv,dev,libpath=''):

def add_attribute(self,attribute,archiver=None,period=0,
rel_event=None,per_event=None,abs_event=None,
code_event=False, ttl=None, start=False):
code_event=False, ttl=None, start=False,
use_freq=True):
"""
set _event arguments to -1 to ignore them and not modify the database
"""
attribute = parse_tango_model(attribute,fqdn=True).fullname
archiver = archiver or self.get_next_archiver()
archiver = archiver or self.get_next_archiver(
use_freq=use_freq,attrexp=fn.tango.get_dev_name(attribute)+'/*')
self.info('add_attribute(%s, %s) to %s'
% (attribute,archiver,self.db_name))
config = get_attribute_config(attribute)
Expand All @@ -518,21 +536,23 @@ def add_attribute(self,attribute,archiver=None,period=0,
print('SetAttributeName: %s'%attribute)
d.write_attribute('SetAttributeName',attribute)
time.sleep(0.2)

if period>0:
d.write_attribute('SetPollingPeriod',period)
if per_event not in (None,-1):

if per_event not in (None,-1,0):
d.write_attribute('SetPeriodEvent',per_event)

if not any((abs_event,rel_event,code_event)):
if re.search("short|long",data_type.lower()):
abs_event = 1
elif not re.search("bool|string",data_type.lower()):
rel_event = 1e-2
if abs_event not in (None,-1):
code_event = True

if abs_event not in (None,-1,0):
print('SetAbsoluteEvent: %s'%abs_event)
d.write_attribute('SetAbsoluteEvent',abs_event)
if rel_event not in (None,-1):

if rel_event not in (None,-1,0):
d.write_attribute('SetRelativeEvent',rel_event)

if ttl not in (None,-1):
d.write_attribute('SetTTL',ttl)

Expand Down Expand Up @@ -570,17 +590,26 @@ def add_attributes(self,attributes,*args,**kwargs):
See add_attribute? for more help on arguments
"""
try:
start = kwargs.get('start',True)
for a in attributes:
kwargs['start'] = False #Avoid recursive start
self.add_attribute(a,*args,**kwargs)
time.sleep(3.)

if start:
archs = set(map(self.get_attribute_archiver,attributes))
for h in archs:
self.info('%s.Start()' % h)
fn.get_device(h, keep=True).Start()
attributes = sorted(attributes)
start = kwargs.get('start',True)
devs = fn.defaultdict(list)
[devs[fn.tango.get_dev_name(a)].append(a) for a in attributes]
for dev,attrs in devs.items():
arch = self.get_next_archiver(attrexp=dev+'/*')
for a in attrs:
kwargs['start'] = False #Avoid recursive start
try:
self.add_attribute(a,archiver=arch,*args,**kwargs)
except:
self.warning('add_attribute(%s) failed!\n%s' %
(a,traceback.format_exc()))
time.sleep(3.)

if start:
archs = set(map(self.get_attribute_archiver,attributes))
for h in archs:
self.info('%s.Start()' % h)
fn.get_device(h, keep=True).Start()

except Exception,e:
print('add_attribute(%s) failed!: %s'%(a,traceback.print_exc()))
Expand Down Expand Up @@ -630,17 +659,29 @@ def restart_attribute(self,attr, d=''):
self.start_devices('(.*/)?'+d,do_restart=True)

dp.AttributeStop(attr)
fn.wait(.1)
fn.wait(10.)
dp.AttributeStart(attr)
except:
print('%s.AttributeStart(%s) failed!'%(d,attr))

def restart_attributes(self,attributes=None):
if attributes is None:
attributes = self.get_failed_attributes()

devs = dict(fn.kmap(self.get_attribute_archiver,attributes))

for a in sorted(attributes):
self.restart_attribute(a)
for a,d in sorted(devs.items()):
if not fn.check_device(d):
self.start_devices('(.*/)?'+d,do_restart=True)
else:
dp = fn.get_device(d, keep=True)
dp.AttributeStop(a)

fn.wait(10.)

for a,d in devs.items():
dp = fn.get_device(d, keep=True)
dp.AttributeStart(a)

print('%d attributes restarted' % len(attributes))

Expand Down
122 changes: 57 additions & 65 deletions PyTangoArchiving/hdbpp/multi.py
Expand Up @@ -4,18 +4,13 @@
import fandango.db as fdb
import fandango.tango as ft
from fandango.functional import *
from fandango import Cached

import PyTangoArchiving
import PyTangoArchiving as pta
##############################################################################


def get_archivers_filters(archiver='archiving/es/*'):
filters = fn.SortedDict(sorted((k,v['AttributeFilters']) for k,v in
fn.tango.get_matching_device_properties(
archiver,'AttributeFilters').items()))
return filters

def get_schema_attributes(schema='*'):
rd = pta.Reader(schema)
alls = rd.get_attributes(active=True)
Expand Down Expand Up @@ -45,7 +40,7 @@ def is_attribute_code_pushed(device,attribute,\
return None


def get_hdbpp_databases(archivers=[],dbs={}):
def get_hdbpp_databases(active=True): #archivers=[],dbs={}):
"""
Method to obtain list of dbs/archivers; it allows to match any
archiver list against existing dbs.
Expand All @@ -57,46 +52,33 @@ def get_hdbpp_databases(archivers=[],dbs={}):
db = get_hdbpp_databases(a,dbs).keys()[0]
"""
if not dbs:
dbs = {}
print('Loading databases from Tango')
cms = ft.get_class_devices('HdbConfigurationManager')
for c in cms:
schemas = pta.Schemas.load()
hdbpp = sorted(k for k in schemas if fn.clsearch('hdbpp',str(schemas[k])))
if active:
r = []
for h in hdbpp:
try:
props = ['LibConfiguration','ArchiverList']
props = ft.get_database().get_device_property(c,props)
db = dict(t.split('=')
for t in props['LibConfiguration'])['dbname']
dbs[db] = {c:None}
for c in props['ArchiverList']:
dbs[db][c] = ft.get_device_property(c,'AttributeList')
if fn.check_device(pta.api(h).manager):
r.append(h)
except:
print('Unable to load %s config' % str(c))
traceback.print_exc()
pass
return r
else:
dbs = dbs.copy()

if archivers:
archivers = list(archivers) #Don't use toList here!
targets = []
for a in archivers:
m = fn.parse_tango_model(a,fqdn=True)
targets.extend((m.fullname, m.devicename, m.devicemodel))

print(targets)

for db,archs in dbs.items():
narchs = {}
for a in archs.keys():
if fn.inCl(a,targets):
m = fn.parse_tango_model(a,fqdn=True).fullname
narchs[m] = archs[a]
if not narchs:
dbs.pop(db)
else:
dbs[db] = narchs

return dbs
return hdbpp

def get_hdbpp_filters():
dbs = get_hdbpp_databases(active=True)
sch = pta.Schemas.load()
return dict((d,sch[d].get('filters','*')) for d in dbs)

def get_hdbpp_for_attributes(attrlist):
filters = get_hdbpp_filters()
r = fn.defaultdict(list)
for a in attrlist:
for d,f in filters.items():
if fn.clmatch(f,a,extend=True):
r[d].append(a)
return r

def merge_csv_attrs(exported = True, currents = True, check_dups = True):
"""
Expand Down Expand Up @@ -180,6 +162,27 @@ def check_attribute_in_all_dbs(attr_regexp,reader = None,
result.append((a,s,v and len(v)))
return result

## DEPRECATED
#def get_managers_filters(archiver=''):
##filters = fn.SortedDict(sorted((k,v['AttributeFilters']) for k,v in
##fn.tango.get_matching_device_properties(
##archiver,'AttributeFilters').items()))
#managers = fn.tango.get_class_devices('HdbConfigurationManager')
#filters = [fn.tango.get_device_property(m,'AttributeFilters')
#for m in managers]
#filters = zip(managers,map(fn.toList,filters))
#return filters

## DEPRECATED
#@Cached(expire=60.)
#def get_database_for_attributes(attrs):
#result = dict()
#filters = get_managers_filters()
#for a in attrs:
#for m,f in filters.items():
#pass


def get_archivers_for_attributes(attrs=[],archs='archiving/es/*'):
"""
This method returns matching archivers for a list of attributes
Expand All @@ -201,11 +204,12 @@ def get_archivers_for_attributes(attrs=[],archs='archiving/es/*'):
archattrs = {}

for i,k in enumerate(filters):
v = filters[k]
v = filters[k] # k is the archiver name
k = fn.tango.parse_tango_model(k, fqdn = True).fullname
if 'DEFAULT' in v:
df = k
else:
#filtersmart(list,regexp): returns a clsearch on the list
m = fn.filtersmart(r,v)
currattrs = set(fn.join(*[devattrs[d] for d in m]))
if len(currattrs):
Expand Down Expand Up @@ -276,25 +280,20 @@ def start_archiving_for_attributes(attrs,*args,**kwargs):
See HDBpp.add_attribute.__doc__ for a full description of arguments
"""
archs = get_archivers_for_attributes(attrs)
dbs = get_hdbpp_databases(archs.keys())
#archs = get_archivers_for_attributes(attrs)
#dbs = get_hdbpp_databases(archs.keys())
done = []

if not args and not kwargs:
kwargs['code_event'] = True

for db,devs in dbs.items():
dbs = get_hdbpp_for_attributes(attrs)

for db,attrlist in dbs.items():
print('Launching %d attributes in %s'%(len(attrlist),db))
api = PyTangoArchiving.Schemas.getApi(db)
devs = [d for d in devs if d in archs]
for d in devs:
ts = archs[d]
print('Launching %d attributes in %s.%s'%(len(ts),db,d))
for t in ts:
api.start_archiving(t,d,*args,**kwargs)
done.append(fn.tango.get_full_name(t))

if not kwargs.get('start'):
fn.get_device(d).Start()
api.add_attributes(attrlist,*args,**kwargs)
done.extend(map(fn.tango.get_full_name,attrlist))

if len(done)!=len(attrs):
print('No hdbpp database match for: %s' % str(
Expand Down Expand Up @@ -450,10 +449,3 @@ def migrate_matching_attributes(regexp,simulate=True):
for a in hdb,tdb:
pass


__all__ = [
'get_archivers_for_attributes',
'get_archivers_filters', 'get_hdbpp_databases',
'get_class_archiving',
'is_attribute_code_pushed', 'check_attribute_in_all_dbs',
]

0 comments on commit d4c00ec

Please sign in to comment.