Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions pyaml/control/controlsystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,16 @@ def fill_device(self,elements:list[Element]):

elif isinstance(e,RFPlant):
attachedTrans: list[RFTransmitter] = []
for t in e._cfg.transmitters:
voltage = RWRFVoltageScalar(t)
phase = RWRFPhaseScalar(t)
nt = t.attach(self,voltage,phase)
self.add_rf_transnmitter(nt)
attachedTrans.append(nt)
if e._cfg.transmitters:
for t in e._cfg.transmitters:
voltage = RWRFVoltageScalar(t)
phase = RWRFPhaseScalar(t)
nt = t.attach(self,voltage,phase)
self.add_rf_transnmitter(nt)
attachedTrans.append(nt)

frequency = RWRFFrequencyScalar(e)
voltage = RWTotalVoltage(attachedTrans)
voltage = RWTotalVoltage(attachedTrans) if e._cfg.transmitters else None
ne = e.attach(self,frequency,voltage)
self.add_rf_plant(ne)

Expand Down
60 changes: 51 additions & 9 deletions pyaml/lattice/abstract_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,8 @@ class RWRFVoltageScalar(abstract.ReadWriteFloatScalar):
Class providing read write access to a cavity voltage of a simulator for a given RF trasnmitter.
"""

def __init__(self, elements:list[at.Element], transmitter:RFTransmitter):
def __init__(self, elements:list[at.Element]):
self.__elements = elements
self.__transmitter = transmitter

def get(self) -> float:
sum = 0
Expand All @@ -324,7 +323,7 @@ def set_and_wait(self, value:float):
raise NotImplementedError("Not implemented yet.")

def unit(self) -> str:
return self.__transmitter._cfg.voltage.unit()
return "V"

#------------------------------------------------------------------------------

Expand All @@ -333,9 +332,8 @@ class RWRFPhaseScalar(abstract.ReadWriteFloatScalar):
Class providing read write access to a cavity phase of a simulator for a given RF trasnmitter.
"""

def __init__(self, elements:list[at.Element], transmitter:RFTransmitter):
def __init__(self, elements:list[at.Element]):
self.__elements = elements
self.__transmitter = transmitter

def get(self) -> float:
# Assume that all cavities of this transmitter have the same Time Lag and Frequency
Expand All @@ -351,7 +349,7 @@ def set_and_wait(self, value:float):
raise NotImplementedError("Not implemented yet.")

def unit(self) -> str:
return self.__transmitter._cfg.phase.unit()
return "rad"

#------------------------------------------------------------------------------

Expand All @@ -360,10 +358,9 @@ class RWRFFrequencyScalar(abstract.ReadWriteFloatScalar):
Class providing read write access to RF frequency of a simulator.
"""

def __init__(self, elements:list[at.Element], harmonics:list[float], rf:RFPlant ):
def __init__(self, elements:list[at.Element], harmonics:list[float]):
self.__elements = elements
self.__harm = harmonics
self.__rf = rf

def get(self) -> float:
# Serialized cavity has the same frequency
Expand All @@ -377,7 +374,52 @@ def set_and_wait(self, value:float):
raise NotImplementedError("Not implemented yet.")

def unit(self) -> str:
return self.__rf._cfg.masterclock.unit()
return "Hz"

#------------------------------------------------------------------------------

class RWRFATFrequencyScalar(abstract.ReadWriteFloatScalar):
"""
Class providing read write access to RF frequency of a simulator using
AT methods.
"""

def __init__(self, ring: at.Lattice):
self.__ring = ring

def get(self) -> float:
return self.__ring.get_rf_frequency()

def set(self,value:float):
self.__ring.set_rf_frequency(value)

def set_and_wait(self, value:float):
raise NotImplementedError("Not implemented yet.")

def unit(self) -> str:
return 'Hz'

#------------------------------------------------------------------------------

class RWRFATotalVoltageScalar(abstract.ReadWriteFloatScalar):
"""
Class providing read write access to a RF voltage of a simulator using AT methods.
"""

def __init__(self, ring: at.Lattice):
self.__ring = ring

def get(self) -> float:
return self.__ring.get_rf_voltage()

def set(self,value:float):
self.__ring.set_rf_voltage(value)

def set_and_wait(self, value:float):
raise NotImplementedError("Not implemented yet.")

def unit(self) -> str:
return 'V'

#------------------------------------------------------------------------------

Expand Down
47 changes: 27 additions & 20 deletions pyaml/lattice/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ..lattice.abstract_impl import RWHardwareScalar,RWHardwareArray
from ..lattice.abstract_impl import RWStrengthScalar,RWStrengthArray
from ..lattice.abstract_impl import RWRFFrequencyScalar,RWRFVoltageScalar,RWRFPhaseScalar
from ..lattice.abstract_impl import RWRFATFrequencyScalar,RWRFATotalVoltageScalar
from ..common.element_holder import ElementHolder
from ..common.abstract_aggregator import ScalarAggregator
from ..lattice.abstract_impl import RBetatronTuneArray
Expand Down Expand Up @@ -115,32 +116,38 @@ def fill_device(self,elements:list[Element]):
self.add_bpm(e)

elif isinstance(e,RFPlant):
cavs: list[at.Element] = []
harmonics: list[float] = []
attachedTrans: list[RFTransmitter] = []
for t in e._cfg.transmitters:
if e._cfg.transmitters:
cavs: list[at.Element] = []
harmonics: list[float] = []
attachedTrans: list[RFTransmitter] = []
for t in e._cfg.transmitters:
cavsPerTrans: list[at.Element] = []
for c in t._cfg.cavities:
# Expect unique name for cavities
cav = self.get_at_elems(Element(c))
if len(cav)>1:
raise PyAMLException(f"RF transmitter {t.get_name()}, multiple cavity definition:{cav[0]}")
if len(cav)==0:
raise PyAMLException(f"RF transmitter {t.get_name()}, No cavity found")
cavsPerTrans.append(cav[0])
harmonics.append(t._cfg.harmonic)

voltage = RWRFVoltageScalar(cavsPerTrans,t)
phase = RWRFPhaseScalar(cavsPerTrans,t)
# Expect unique name for cavities
cav = self.get_at_elems(Element(c))
if len(cav)>1:
raise PyAMLException(f"RF transmitter {t.get_name()}, multiple cavity definition:{cav[0]}")
if len(cav)==0:
raise PyAMLException(f"RF transmitter {t.get_name()}, No cavity found")
cavsPerTrans.append(cav[0])
harmonics.append(t._cfg.harmonic)
voltage = RWRFVoltageScalar(cavsPerTrans)
phase = RWRFPhaseScalar(cavsPerTrans)
nt = t.attach(self,voltage,phase)
attachedTrans.append(nt)
self.add_rf_transnmitter(nt)
cavs.extend(cavsPerTrans)
attachedTrans.append(nt)

frequency = RWRFFrequencyScalar(cavs,harmonics,e)
voltage = RWTotalVoltage(attachedTrans)
ne = e.attach(self,frequency,voltage)
self.add_rf_plant(ne)
frequency = RWRFFrequencyScalar(cavs,harmonics)
voltage = RWTotalVoltage(attachedTrans)
ne = e.attach(self,frequency,voltage)
self.add_rf_plant(ne)
else:
# No transmitter defined switch to AT methods
frequency = RWRFATFrequencyScalar(self.ring)
voltage = RWRFATotalVoltageScalar(self.ring)
ne = e.attach(self,frequency,voltage)
self.add_rf_plant(ne)

elif isinstance(e, BetatronTuneMonitor):
betatron_tune = RBetatronTuneArray(self.ring)
Expand Down
21 changes: 21 additions & 0 deletions tests/config/EBS_rf_notrans.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
type: pyaml.pyaml
instruments:
- type: pyaml.instrument
name: sr
energy: 6e9
simulators:
- type: pyaml.lattice.simulator
lattice: sr/lattices/ebs.mat
name: design
controls:
- type: tango.pyaml.controlsystem
tango_host: ebs-simu-3:10000
name: live
data_folder: /data/store
devices:
- type: pyaml.rf.rf_plant
name: RF
masterclock:
type: tango.pyaml.attribute
attribute: sy/ms/1/Frequency
unit: Hz
34 changes: 34 additions & 0 deletions tests/test_rf.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pyaml.pyaml import pyaml,PyAML
from pyaml.instrument import Instrument
from pyaml.configuration.factory import Factory
from pyaml.common.exception import PyAMLException
import numpy as np
import pytest

Expand Down Expand Up @@ -91,3 +92,36 @@ def test_rf_multi(install_test_package):
assert(np.isclose(RFTRA_HARMONIC._cfg.voltage.get(),3e5))

Factory.clear()


@pytest.mark.parametrize("install_test_package", [{
"name": "tango-pyaml",
"path": "tests/dummy_cs/tango-pyaml"
}], indirect=True)
def test_rf_multi_notrans(install_test_package):

ml:PyAML = pyaml("tests/config/EBS_rf_notrans.yaml")
sr:Instrument = ml.get('sr')

# Simulator
RF = sr.design.get_rf_plant("RF")
RF.frequency.set(3.523e8)
RF.voltage.set(10e6)
# Check that frequency and voltage has been applied on all cavities
ring = sr.design.get_lattice()
for e in ring:
if e.FamName.startswith("CAV"):
assert(np.isclose(e.Frequency,3.523e8))
assert(np.isclose(e.Voltage,10e6/13.))

# Control system
RF = sr.live.get_rf_plant("RF")
RF.frequency.set(3.523e8)
with pytest.raises(PyAMLException) as exc:
RF.voltage.set(10e6)
assert("has no trasmitter device defined" in str(exc))

# Check that frequency and voltage has been applied on the masterclock device
assert(np.isclose(RF.frequency.get(), 3.523e8))

Factory.clear()
Loading