-
Notifications
You must be signed in to change notification settings - Fork 0
/
adsr_envelope.py
209 lines (176 loc) · 7.84 KB
/
adsr_envelope.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import logging
from copy import deepcopy
from enum import Enum
import time
import numpy as np
from .component import Component
from .signal_type import SignalType
class AdsrEnvelope(Component):
class State(Enum):
IDLE = 0
ADS = 1
RELEASE = 2
def __init__(self, sample_rate, frames_per_chunk, source: Component):
super().__init__(sample_rate, frames_per_chunk, signal_type=SignalType.WAVE, subcomponents=[], name="ADSR")
self.log = logging.getLogger(__name__)
self.add_subcomponent(source)
self._current_amp = 0.0
self._attack = np.float32(0.0)
self._decay = np.float32(0.0)
self._sustain = np.float32(1.0)
self._release = np.float32(0.0)
self._iteration_number = 0
self._sustain_frames_num = self.frames_per_chunk * 2
self._target_amp = 1.0
self.calculate_ads_ramps()
self.calculate_r_ramp()
self.state = AdsrEnvelope.State.IDLE
# np.set_printoptions(threshold=self.sample_rate * 10)
def __iter__(self):
self.source_iter = iter(self.subcomponents[0])
self._stage_trig_time = time.time()
self._props["amp"] = self._current_amp
return self
def __next__(self):
(source_chunk, props) = next(self.source_iter)
if self._target_amp != props["amp"]:
self._target_amp = props["amp"]
self.calculate_ads_ramps()
match self.state:
case AdsrEnvelope.State.ADS:
ramp_index = self._iteration_number * self.frames_per_chunk
if ramp_index + self.frames_per_chunk > len(self._ads_ramp):
ramp_index = len(self._ads_ramp) - self.frames_per_chunk
self._iteration_number += 1
end_index = ramp_index + self.frames_per_chunk # ramp index is guaranteed to be within range
ramp_chunk = self._ads_ramp[ramp_index:end_index]
source_chunk = source_chunk * ramp_chunk
# self.log.debug(f"Ramp chunk:\n{ramp_chunk}")
self._current_amp = ramp_chunk[-1]
now = time.time()
elapsed = now - self._stage_trig_time
if not self.active:
self.state = AdsrEnvelope.State.RELEASE
self._stage_trig_time = time.time()
self._iteration_number = 0
self.calculate_r_ramp()
# self.log.debug(f"{self.name}: Triggered release state after {elapsed}s")
self._props["amp"] = self._current_amp
return (source_chunk, self._props)
case AdsrEnvelope.State.RELEASE:
now = time.time()
elapsed = now - self._stage_trig_time
ramp_index = self._iteration_number * self.frames_per_chunk
self._iteration_number += 1
if self.active:
self.trigger_attack()
if ramp_index + self.frames_per_chunk > len(self._r_ramp):
partial_chunk = self._r_ramp[ramp_index:]
zeros = np.zeros((self.frames_per_chunk - len(partial_chunk)), dtype=np.float32)
ramp_chunk = np.concatenate([partial_chunk, zeros], axis=0)
# self.log.debug(f"{self.name}: End of Release Ramp after {elapsed}s")
if not self.active:
self.state = AdsrEnvelope.State.IDLE
else:
end_index = ramp_index + self.frames_per_chunk
ramp_chunk = self._r_ramp[ramp_index:end_index]
self._current_amp = ramp_chunk[-1]
source_chunk = source_chunk * ramp_chunk
self._props["amp"] = self._current_amp
return (source_chunk, self._props)
case AdsrEnvelope.State.IDLE:
if self.subcomponents[0].active:
self.subcomponents[0].active = False
if self.active:
self.trigger_attack()
self._props["amp"] = 0.0
return (np.zeros(self.frames_per_chunk, dtype=np.float32), self._props)
def __deepcopy__(self, memo):
return AdsrEnvelope(self.sample_rate, self.frames_per_chunk, deepcopy(self.subcomponents[0], memo))
@property
def attack(self):
return self._attack
@attack.setter
def attack(self, value):
try:
float_val = np.float32(value)
self._attack = float_val
self.calculate_ads_ramps()
except ValueError:
self.log.error(f"Couldn't set with value {value}")
@property
def decay(self):
return self._decay
@decay.setter
def decay(self, value):
try:
float_val = np.float32(value)
self._decay = float_val
self.calculate_ads_ramps()
except ValueError:
self.log.error(f"Couldn't set with value {value}")
@property
def sustain(self):
return self._sustain
@sustain.setter
def sustain(self, value):
try:
float_val = np.float32(value)
self._sustain = float_val
self.calculate_ads_ramps()
except ValueError:
self.log.error(f"Couldn't set with value {value}")
@property
def release(self):
return self._release
@release.setter
def release(self, value):
try:
float_val = np.float32(value)
self._release = float_val
except ValueError:
self.log.error(f"Couldn't set with value {value}")
@property
def active(self):
"""
The active status. If a component is active it should do its job, otherwise act as a bypass.
If the component is a generator it should generate zeros when inactive.
"""
return self._active
@active.setter
def active(self, value):
"""
This setter overrides the Component setter and specifically does NOT deactivate its subcomponents automatically
"""
try:
bool_val = bool(value)
self._active = bool_val
except ValueError:
self.log.error(f"Unable to set with value {value}")
def calculate_ads_ramps(self):
attack_frames = int(self.sample_rate * self.attack) # attack is in s
attack_ramp = np.linspace(0, self._target_amp, attack_frames, dtype=np.float32, endpoint=False)
decay_frames = int(self.sample_rate * self.decay)
self._decay_index = attack_frames
decay_ramp = np.linspace(self._target_amp, self.sustain, decay_frames, dtype=np.float32, endpoint=False)
# we need to have at least one full chunk in the sustain ramp.
# Usually we'll partially cross into the sustain region from the decay in one chunk,
# then the next chunks can read from the sustain region over and over until the note is released
sustain_frames = self._sustain_frames_num
self._sustain_index = self._decay_index + decay_frames
sustain_ramp = np.full(sustain_frames, self.sustain, dtype=np.float32)
self._ads_ramp = np.concatenate([attack_ramp, decay_ramp, sustain_ramp], axis=0)
# self.log.debug(f"ADS Ramp:\n{self._ads_ramp}")
def calculate_r_ramp(self):
release_frames = int(self.sample_rate * self.release)
self._release_index = self._sustain_index + (self._sustain_frames_num)
self._r_ramp = np.linspace(self._current_amp, 0, release_frames, endpoint=True)
def trigger_attack(self):
self.state = AdsrEnvelope.State.ADS
self._stage_trig_time = time.time()
self._iteration_number = 0
for sub in self.subcomponents:
sub.active = True
# self.log.info(f"{self.name}: Triggered attack stage")
def is_silent(self):
return self.state == self.State.IDLE and not self.active