diff --git a/pyaml/arrays/bpm_array.py b/pyaml/arrays/bpm_array.py index 508b475c..70123f22 100644 --- a/pyaml/arrays/bpm_array.py +++ b/pyaml/arrays/bpm_array.py @@ -1,7 +1,7 @@ from ..common.abstract import ReadFloatArray from ..bpm.bpm import BPM from ..control.deviceaccesslist import DeviceAccessList -from .element_array import get_peer_from_array +from .element_array import ElementArray import numpy as np @@ -53,7 +53,7 @@ def set_aggregator(self,agg:DeviceAccessList): -class BPMArray(list[BPM]): +class BPMArray(ElementArray): """ Class that implements access to a BPM array """ @@ -72,23 +72,18 @@ def __init__(self,arrayName:str,bpms:list[BPM],use_aggregator = True): use_aggregator : bool Use aggregator to increase performance by using paralell access to underlying devices. """ - super().__init__(i for i in bpms) - self.__name = arrayName - holder = get_peer_from_array(self) + super().__init__(arrayName,bpms,use_aggregator) self.__hvpos = RWBPMPosition(arrayName,bpms) self.__hpos = RWBPMSinglePosition(arrayName,bpms,0) self.__vpos = RWBPMSinglePosition(arrayName,bpms,1) if use_aggregator: - aggs = holder.create_bpm_aggregators(bpms) + aggs = self.get_peer().create_bpm_aggregators(bpms) self.__hvpos.set_aggregator(aggs[0]) self.__hpos.set_aggregator(aggs[1]) self.__vpos.set_aggregator(aggs[2]) - def get_name(self) -> str: - return self.__name - @property def positions(self) -> RWBPMPosition: """ diff --git a/pyaml/arrays/cfm_magnet_array.py b/pyaml/arrays/cfm_magnet_array.py index b43eab42..30b1cd93 100644 --- a/pyaml/arrays/cfm_magnet_array.py +++ b/pyaml/arrays/cfm_magnet_array.py @@ -1,6 +1,7 @@ from ..common.abstract import ReadWriteFloatArray from ..magnet.cfm_magnet import CombinedFunctionMagnet -from .element_array import get_peer_from_array +from .element_array import ElementArray +from ..common.exception import PyAMLException import numpy as np #TODO handle aggregator for CFM @@ -76,7 +77,7 @@ def unit(self) -> list[str]: return r -class CombinedFunctionMagnetArray(list[CombinedFunctionMagnet]): +class CombinedFunctionMagnetArray(ElementArray): """ Class that implements access to a magnet array """ @@ -95,18 +96,13 @@ def __init__(self,arrayName:str,magnets:list[CombinedFunctionMagnet],use_aggrega use_aggregator : bool Use aggregator to increase performance by using paralell access to underlying devices. """ - super().__init__(i for i in magnets) - self.__name = arrayName - holder = get_peer_from_array(self) + super().__init__(arrayName,magnets,use_aggregator) self.__rwstrengths = RWMagnetStrengths(arrayName,magnets) self.__rwhardwares = RWMagnetHardwares(arrayName,magnets) if use_aggregator: - raise("Aggregator not implemented for CombinedFunctionMagnetArray") - - def get_name(self) -> str: - return self.__name + raise(PyAMLException("Aggregator not implemented for CombinedFunctionMagnetArray")) @property def strengths(self) -> RWMagnetStrengths: diff --git a/pyaml/arrays/element_array.py b/pyaml/arrays/element_array.py index baf9f3f5..d05cf5be 100644 --- a/pyaml/arrays/element_array.py +++ b/pyaml/arrays/element_array.py @@ -1,21 +1,18 @@ from ..common.element import Element +from ..magnet.magnet import Magnet +from ..bpm.bpm import BPM +from ..magnet.cfm_magnet import CombinedFunctionMagnet from ..common.exception import PyAMLException -def get_peer_from_array(array): - """ - Returns the peer (Simulator or ControlSystem) of an element list - """ - peer = array[0]._peer if len(array)>0 else None - if peer is None or any([m._peer!=peer for m in array]): - raise PyAMLException(f"{array.__class__.__name__} {array.get_name()}: All elements must be attached to the same instance of either a Simulator or a ControlSystem") - return peer +import importlib +import fnmatch class ElementArray(list[Element]): """ - Class that implements access to a magnet array + Class that implements access to a element array """ - def __init__(self,arrayName:str,elements:list[Element]): + def __init__(self,arrayName:str,elements:list[Element],use_aggregator = True): """ Construct an element array @@ -26,20 +23,100 @@ def __init__(self,arrayName:str,elements:list[Element]): elements: list[Element] Element list, all elements must be attached to the same instance of either a Simulator or a ControlSystem. + use_aggregator : bool + Use aggregator to increase performance by using paralell access to underlying devices. """ super().__init__(i for i in elements) self.__name = arrayName - holder = get_peer_from_array(self) - self.__name = arrayName + self.__peer = self[0]._peer if len(self)>0 else None + self.__use_aggretator = use_aggregator + if self.__peer is None or any([m._peer!=self.__peer for m in self]): + raise PyAMLException(f"{self.__class__.__name__} {self.get_name()}: All elements must be attached to the same instance of either a Simulator or a ControlSystem") + + def get_peer(self): + """ + Returns the peer (Simulator or ControlSystem) of an element list + """ + return self.__peer def get_name(self) -> str: return self.__name def names(self) -> list[str]: return [e.get_name() for e in self] + + def __create_array(self,arrName:str,eltType:type,elements:list): + + if len(elements)==0: + return [] + + if issubclass(eltType,Magnet): + m = importlib.import_module("pyaml.arrays.magnet_array") + arrayClass = getattr(m, "MagnetArray", None) + return arrayClass("",elements,self.__use_aggretator) + elif issubclass(eltType,BPM): + m = importlib.import_module("pyaml.arrays.bpm_array") + arrayClass = getattr(m, "BPMArray", None) + return arrayClass("",elements,self.__use_aggretator) + elif issubclass(eltType,CombinedFunctionMagnet): + m = importlib.import_module("pyaml.arrays.cfm_magnet_array") + arrayClass = getattr(m, "CombinedFunctionMagnetArray", None) + return arrayClass("",elements,self.__use_aggretator) + elif issubclass(eltType,Element): + return ElementArray("",elements,self.__use_aggretator) + else: + raise PyAMLException(f"Unsupported sliced array for type {str(eltType)}") + + def __eval_field(self,attName:str,e:Element) -> str: + funcName = "get_" + attName + func = getattr(e,funcName, None) + return func() if func is not None else "" + + def __getitem__(self,key): + + if isinstance(key,slice): + + # Slicing + eltType = None + r = [] + for i in range(*key.indices(len(self))): + if eltType is None: + eltType = type(self[i]) + elif not isinstance(self[i],eltType): + eltType = Element # Fall back to element + r.append(self[i]) + return self.__create_array("",eltType,r) + + elif isinstance(key,str): + fields = key.split(':') - + if len(fields)<=1: + # Selection by name + eltType = None + r = [] + for e in self: + if fnmatch.fnmatch(e.get_name(), key): + if eltType is None: + eltType = type(e) + elif not isinstance(e,eltType): + eltType = Element # Fall back to element + r.append(e) + else: + # Selection by fields + eltType = None + r = [] + for e in self: + txt = self.__eval_field(fields[0],e) + if fnmatch.fnmatch(txt , fields[1]): + if eltType is None: + eltType = type(e) + elif not isinstance(e,eltType): + eltType = Element # Fall back to element + r.append(e) + return self.__create_array("",eltType,r) - \ No newline at end of file + else: + # Default to super selection + return super().__getitem__(key) diff --git a/pyaml/arrays/magnet_array.py b/pyaml/arrays/magnet_array.py index 288870b1..a67519e5 100644 --- a/pyaml/arrays/magnet_array.py +++ b/pyaml/arrays/magnet_array.py @@ -1,7 +1,7 @@ from ..common.abstract import ReadWriteFloatArray from ..magnet.magnet import Magnet from ..common.abstract_aggregator import ScalarAggregator -from .element_array import get_peer_from_array +from .element_array import ElementArray import numpy as np class RWMagnetStrength(ReadWriteFloatArray): @@ -76,7 +76,7 @@ def unit(self) -> list[str]: def set_aggregator(self,agg:ScalarAggregator): self.__aggregator = agg -class MagnetArray(list[Magnet]): +class MagnetArray(ElementArray): """ Class that implements access to a magnet array """ @@ -95,22 +95,17 @@ def __init__(self,arrayName:str,magnets:list[Magnet],use_aggregator = True): use_aggregator : bool Use aggregator to increase performance by using paralell access to underlying devices. """ - super().__init__(i for i in magnets) - self.__name = arrayName - holder = get_peer_from_array(self) + super().__init__(arrayName,magnets,use_aggregator) self.__rwstrengths = RWMagnetStrength(arrayName,magnets) self.__rwhardwares = RWMagnetHardware(arrayName,magnets) if use_aggregator: - aggs = holder.create_magnet_strength_aggregator(magnets) - aggh = holder.create_magnet_harddware_aggregator(magnets) + aggs = self.get_peer().create_magnet_strength_aggregator(magnets) + aggh = self.get_peer().create_magnet_harddware_aggregator(magnets) self.__rwstrengths.set_aggregator(aggs) self.__rwhardwares.set_aggregator(aggh) - def get_name(self) -> str: - return self.__name - @property def strengths(self) -> RWMagnetStrength: """ diff --git a/pyaml/common/element_holder.py b/pyaml/common/element_holder.py index 81b4eaba..0287251e 100644 --- a/pyaml/common/element_holder.py +++ b/pyaml/common/element_holder.py @@ -107,7 +107,7 @@ def get_cfm_magnets(self,name:str) -> CombinedFunctionMagnetArray: return self.__get("CombinedFunctionMagnet array",name,self.__CFM_MAGNET_ARRAYS) def get_all_cfm_magnets(self) -> list[CombinedFunctionMagnet]: - return [value for key, value in self.__CFM_MAGNET_ARRAYS.items()] + return [value for key, value in self.__CFM_MAGNETS.items()] # BPMs diff --git a/pyaml/magnet/cfm_magnet.py b/pyaml/magnet/cfm_magnet.py index 393cd617..362f776a 100644 --- a/pyaml/magnet/cfm_magnet.py +++ b/pyaml/magnet/cfm_magnet.py @@ -77,6 +77,12 @@ def __init__(self, cfg: ConfigModel, peer = None): # Attach self._peer = peer + def get_model_name(self) -> str: + """ + Returns the model name of this magnet + """ + return self._cfg.name + def __create_virutal_manget(self,name:str,idx:int) -> Magnet: args = {"name":name,"model":self.model} mVirtual:Magnet = _fmap[idx](MagnetConfigModel(**args)) diff --git a/pyaml/magnet/magnet.py b/pyaml/magnet/magnet.py index 2554dd81..63fef983 100644 --- a/pyaml/magnet/magnet.py +++ b/pyaml/magnet/magnet.py @@ -88,8 +88,14 @@ def set_model_name(self, name:str): """ self.__modelName = name + def get_model_name(self) -> str: + """ + Returns the model name of this magnet + """ + return self.__modelName + def __repr__(self): - return "%s(peer='%s', name='%s', model='%s', magnet_model=%s)" % ( + return "%s(peer='%s', name='%s', model_name='%s', magnet_model=%s)" % ( self.__class__.__name__, self.get_peer(), self.get_name(), diff --git a/tests/test_arrays.py b/tests/test_arrays.py index 58a860f5..59ff2f16 100644 --- a/tests/test_arrays.py +++ b/tests/test_arrays.py @@ -3,8 +3,8 @@ from pyaml.instrument import Instrument from pyaml.arrays.element_array import ElementArray from pyaml.arrays.magnet_array import MagnetArray +from pyaml.arrays.cfm_magnet_array import CombinedFunctionMagnetArray from pyaml.arrays.bpm_array import BPMArray -from pyaml.arrays.cfm_magnet_array import CombinedFunctionMagnet import importlib import numpy as np @@ -174,4 +174,24 @@ def test_arrays(install_test_package): Factory.clear() + # Test dynamic arrays + + ml:PyAML = pyaml("tests/config/EBSOrbit.yaml") + sr:Instrument = ml.get('sr') + ae = ElementArray("All",sr.design.get_all_elements()) + acfm = ElementArray("AllCFM",sr.design.get_all_cfm_magnets(),use_aggregator=False) + + bpmC5 = ae['BPM*'][10:20] # All BPM C5 + assert(isinstance(bpmC5,BPMArray) and len(bpmC5)==10) + bpmc10 = ae['BPM*C10*'] # All BPM C10 + assert(isinstance(bpmc10,BPMArray) and len(bpmc10)==10) + magSHV = ae['SH*-V'] # All SH vertical corrector + assert(isinstance(magSHV,MagnetArray) and len(magSHV)==96) + magSH1A = ae['model_name:SH1A-*'] # All SH1A magnets (including the CFMs) + assert(isinstance(magSH1A,ElementArray) and len(magSH1A)==129) + magSH1AC = acfm['model_name:SH1A-*'] # All SH1A magnets (CFMs only) + assert(isinstance(magSH1AC,ElementArray) and len(magSH1AC)==32) + + Factory.clear() +