Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions pyaml/arrays/bpm_array.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from ..common.abstract import ReadFloatArray
from ..bpm.bpm import BPM
from ..control.deviceaccesslist import DeviceAccessList
from .element_array import get_peer_from_array
from .element_array import ElementArray

import numpy as np

Expand Down Expand Up @@ -53,7 +53,7 @@ def set_aggregator(self,agg:DeviceAccessList):



class BPMArray(list[BPM]):
class BPMArray(ElementArray):
"""
Class that implements access to a BPM array
"""
Expand All @@ -72,23 +72,18 @@ def __init__(self,arrayName:str,bpms:list[BPM],use_aggregator = True):
use_aggregator : bool
Use aggregator to increase performance by using paralell access to underlying devices.
"""
super().__init__(i for i in bpms)
self.__name = arrayName
holder = get_peer_from_array(self)
super().__init__(arrayName,bpms,use_aggregator)

self.__hvpos = RWBPMPosition(arrayName,bpms)
self.__hpos = RWBPMSinglePosition(arrayName,bpms,0)
self.__vpos = RWBPMSinglePosition(arrayName,bpms,1)

if use_aggregator:
aggs = holder.create_bpm_aggregators(bpms)
aggs = self.get_peer().create_bpm_aggregators(bpms)
self.__hvpos.set_aggregator(aggs[0])
self.__hpos.set_aggregator(aggs[1])
self.__vpos.set_aggregator(aggs[2])

def get_name(self) -> str:
return self.__name

@property
def positions(self) -> RWBPMPosition:
"""
Expand Down
14 changes: 5 additions & 9 deletions pyaml/arrays/cfm_magnet_array.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from ..common.abstract import ReadWriteFloatArray
from ..magnet.cfm_magnet import CombinedFunctionMagnet
from .element_array import get_peer_from_array
from .element_array import ElementArray
from ..common.exception import PyAMLException
import numpy as np

#TODO handle aggregator for CFM
Expand Down Expand Up @@ -76,7 +77,7 @@ def unit(self) -> list[str]:
return r


class CombinedFunctionMagnetArray(list[CombinedFunctionMagnet]):
class CombinedFunctionMagnetArray(ElementArray):
"""
Class that implements access to a magnet array
"""
Expand All @@ -95,18 +96,13 @@ def __init__(self,arrayName:str,magnets:list[CombinedFunctionMagnet],use_aggrega
use_aggregator : bool
Use aggregator to increase performance by using paralell access to underlying devices.
"""
super().__init__(i for i in magnets)
self.__name = arrayName
holder = get_peer_from_array(self)
super().__init__(arrayName,magnets,use_aggregator)

self.__rwstrengths = RWMagnetStrengths(arrayName,magnets)
self.__rwhardwares = RWMagnetHardwares(arrayName,magnets)

if use_aggregator:
raise("Aggregator not implemented for CombinedFunctionMagnetArray")

def get_name(self) -> str:
return self.__name
raise(PyAMLException("Aggregator not implemented for CombinedFunctionMagnetArray"))

@property
def strengths(self) -> RWMagnetStrengths:
Expand Down
105 changes: 91 additions & 14 deletions pyaml/arrays/element_array.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
from ..common.element import Element
from ..magnet.magnet import Magnet
from ..bpm.bpm import BPM
from ..magnet.cfm_magnet import CombinedFunctionMagnet
from ..common.exception import PyAMLException

def get_peer_from_array(array):
"""
Returns the peer (Simulator or ControlSystem) of an element list
"""
peer = array[0]._peer if len(array)>0 else None
if peer is None or any([m._peer!=peer for m in array]):
raise PyAMLException(f"{array.__class__.__name__} {array.get_name()}: All elements must be attached to the same instance of either a Simulator or a ControlSystem")
return peer
import importlib
import fnmatch

class ElementArray(list[Element]):
"""
Class that implements access to a magnet array
Class that implements access to a element array
"""

def __init__(self,arrayName:str,elements:list[Element]):
def __init__(self,arrayName:str,elements:list[Element],use_aggregator = True):
"""
Construct an element array

Expand All @@ -26,20 +23,100 @@ def __init__(self,arrayName:str,elements:list[Element]):
elements: list[Element]
Element list, all elements must be attached to the same instance of
either a Simulator or a ControlSystem.
use_aggregator : bool
Use aggregator to increase performance by using paralell access to underlying devices.
"""
super().__init__(i for i in elements)
self.__name = arrayName
holder = get_peer_from_array(self)
self.__name = arrayName
self.__peer = self[0]._peer if len(self)>0 else None
self.__use_aggretator = use_aggregator
if self.__peer is None or any([m._peer!=self.__peer for m in self]):
raise PyAMLException(f"{self.__class__.__name__} {self.get_name()}: All elements must be attached to the same instance of either a Simulator or a ControlSystem")

def get_peer(self):
"""
Returns the peer (Simulator or ControlSystem) of an element list
"""
return self.__peer

def get_name(self) -> str:
return self.__name

def names(self) -> list[str]:
return [e.get_name() for e in self]

def __create_array(self,arrName:str,eltType:type,elements:list):

if len(elements)==0:
return []

if issubclass(eltType,Magnet):
m = importlib.import_module("pyaml.arrays.magnet_array")
arrayClass = getattr(m, "MagnetArray", None)
return arrayClass("",elements,self.__use_aggretator)
elif issubclass(eltType,BPM):
m = importlib.import_module("pyaml.arrays.bpm_array")
arrayClass = getattr(m, "BPMArray", None)
return arrayClass("",elements,self.__use_aggretator)
elif issubclass(eltType,CombinedFunctionMagnet):
m = importlib.import_module("pyaml.arrays.cfm_magnet_array")
arrayClass = getattr(m, "CombinedFunctionMagnetArray", None)
return arrayClass("",elements,self.__use_aggretator)
elif issubclass(eltType,Element):
return ElementArray("",elements,self.__use_aggretator)
else:
raise PyAMLException(f"Unsupported sliced array for type {str(eltType)}")

def __eval_field(self,attName:str,e:Element) -> str:
funcName = "get_" + attName
func = getattr(e,funcName, None)
return func() if func is not None else ""

def __getitem__(self,key):

if isinstance(key,slice):

# Slicing
eltType = None
r = []
for i in range(*key.indices(len(self))):
if eltType is None:
eltType = type(self[i])
elif not isinstance(self[i],eltType):
eltType = Element # Fall back to element
r.append(self[i])
return self.__create_array("",eltType,r)

elif isinstance(key,str):

fields = key.split(':')


if len(fields)<=1:
# Selection by name
eltType = None
r = []
for e in self:
if fnmatch.fnmatch(e.get_name(), key):
if eltType is None:
eltType = type(e)
elif not isinstance(e,eltType):
eltType = Element # Fall back to element
r.append(e)
else:
# Selection by fields
eltType = None
r = []
for e in self:
txt = self.__eval_field(fields[0],e)
if fnmatch.fnmatch(txt , fields[1]):
if eltType is None:
eltType = type(e)
elif not isinstance(e,eltType):
eltType = Element # Fall back to element
r.append(e)

return self.__create_array("",eltType,r)


else:
# Default to super selection
return super().__getitem__(key)
15 changes: 5 additions & 10 deletions pyaml/arrays/magnet_array.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from ..common.abstract import ReadWriteFloatArray
from ..magnet.magnet import Magnet
from ..common.abstract_aggregator import ScalarAggregator
from .element_array import get_peer_from_array
from .element_array import ElementArray
import numpy as np

class RWMagnetStrength(ReadWriteFloatArray):
Expand Down Expand Up @@ -76,7 +76,7 @@ def unit(self) -> list[str]:
def set_aggregator(self,agg:ScalarAggregator):
self.__aggregator = agg

class MagnetArray(list[Magnet]):
class MagnetArray(ElementArray):
"""
Class that implements access to a magnet array
"""
Expand All @@ -95,22 +95,17 @@ def __init__(self,arrayName:str,magnets:list[Magnet],use_aggregator = True):
use_aggregator : bool
Use aggregator to increase performance by using paralell access to underlying devices.
"""
super().__init__(i for i in magnets)
self.__name = arrayName
holder = get_peer_from_array(self)
super().__init__(arrayName,magnets,use_aggregator)

self.__rwstrengths = RWMagnetStrength(arrayName,magnets)
self.__rwhardwares = RWMagnetHardware(arrayName,magnets)

if use_aggregator:
aggs = holder.create_magnet_strength_aggregator(magnets)
aggh = holder.create_magnet_harddware_aggregator(magnets)
aggs = self.get_peer().create_magnet_strength_aggregator(magnets)
aggh = self.get_peer().create_magnet_harddware_aggregator(magnets)
self.__rwstrengths.set_aggregator(aggs)
self.__rwhardwares.set_aggregator(aggh)

def get_name(self) -> str:
return self.__name

@property
def strengths(self) -> RWMagnetStrength:
"""
Expand Down
2 changes: 1 addition & 1 deletion pyaml/common/element_holder.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def get_cfm_magnets(self,name:str) -> CombinedFunctionMagnetArray:
return self.__get("CombinedFunctionMagnet array",name,self.__CFM_MAGNET_ARRAYS)

def get_all_cfm_magnets(self) -> list[CombinedFunctionMagnet]:
return [value for key, value in self.__CFM_MAGNET_ARRAYS.items()]
return [value for key, value in self.__CFM_MAGNETS.items()]

# BPMs

Expand Down
6 changes: 6 additions & 0 deletions pyaml/magnet/cfm_magnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ def __init__(self, cfg: ConfigModel, peer = None):
# Attach
self._peer = peer

def get_model_name(self) -> str:
"""
Returns the model name of this magnet
"""
return self._cfg.name

def __create_virutal_manget(self,name:str,idx:int) -> Magnet:
args = {"name":name,"model":self.model}
mVirtual:Magnet = _fmap[idx](MagnetConfigModel(**args))
Expand Down
8 changes: 7 additions & 1 deletion pyaml/magnet/magnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,14 @@ def set_model_name(self, name:str):
"""
self.__modelName = name

def get_model_name(self) -> str:
"""
Returns the model name of this magnet
"""
return self.__modelName

def __repr__(self):
return "%s(peer='%s', name='%s', model='%s', magnet_model=%s)" % (
return "%s(peer='%s', name='%s', model_name='%s', magnet_model=%s)" % (
self.__class__.__name__,
self.get_peer(),
self.get_name(),
Expand Down
22 changes: 21 additions & 1 deletion tests/test_arrays.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from pyaml.instrument import Instrument
from pyaml.arrays.element_array import ElementArray
from pyaml.arrays.magnet_array import MagnetArray
from pyaml.arrays.cfm_magnet_array import CombinedFunctionMagnetArray
from pyaml.arrays.bpm_array import BPMArray
from pyaml.arrays.cfm_magnet_array import CombinedFunctionMagnet
import importlib

import numpy as np
Expand Down Expand Up @@ -174,4 +174,24 @@ def test_arrays(install_test_package):

Factory.clear()

# Test dynamic arrays

ml:PyAML = pyaml("tests/config/EBSOrbit.yaml")
sr:Instrument = ml.get('sr')
ae = ElementArray("All",sr.design.get_all_elements())
acfm = ElementArray("AllCFM",sr.design.get_all_cfm_magnets(),use_aggregator=False)

bpmC5 = ae['BPM*'][10:20] # All BPM C5
assert(isinstance(bpmC5,BPMArray) and len(bpmC5)==10)
bpmc10 = ae['BPM*C10*'] # All BPM C10
assert(isinstance(bpmc10,BPMArray) and len(bpmc10)==10)
magSHV = ae['SH*-V'] # All SH vertical corrector
assert(isinstance(magSHV,MagnetArray) and len(magSHV)==96)
magSH1A = ae['model_name:SH1A-*'] # All SH1A magnets (including the CFMs)
assert(isinstance(magSH1A,ElementArray) and len(magSH1A)==129)
magSH1AC = acfm['model_name:SH1A-*'] # All SH1A magnets (CFMs only)
assert(isinstance(magSH1AC,ElementArray) and len(magSH1AC)==32)

Factory.clear()


Loading