diff --git a/bouldercaves/synthplayer/playback.py b/bouldercaves/synthplayer/playback.py index 5aea3ed..d212c95 100644 --- a/bouldercaves/synthplayer/playback.py +++ b/bouldercaves/synthplayer/playback.py @@ -19,7 +19,7 @@ import os import warnings from collections import defaultdict -from typing import Generator, Union, Dict, Tuple, Any, Type, List, Callable, Iterable, Optional, Container +from typing import Generator, Union, Dict, Tuple, Any, Type, List, Callable, Iterable, Optional from types import TracebackType from .import params from .sample import Sample diff --git a/bouldercaves/synthplayer/sample.py b/bouldercaves/synthplayer/sample.py index 7898596..ac9e454 100644 --- a/bouldercaves/synthplayer/sample.py +++ b/bouldercaves/synthplayer/sample.py @@ -116,29 +116,31 @@ def from_array(cls, array_or_list: Sequence[Union[int, float]], samplerate: int, frames = audioop.byteswap(frames, samplewidth) return Sample.from_raw_frames(frames, samplewidth, samplerate, numchannels, name=name) + @classmethod + def from_osc_block(cls, block: Iterable[float], samplerate: int, amplitude_scale: Optional[float] = None, + samplewidth: int = params.norm_samplewidth) -> 'Sample': + amplitude_scale = amplitude_scale or 2 ** (8 * samplewidth - 1) + if amplitude_scale and amplitude_scale != 1.0: + block = [amplitude_scale * v for v in block] + intblk = list(map(int, block)) + return cls.from_array(intblk, samplerate, 1) + @classmethod def from_oscillator(cls, osc: Oscillator, duration: float, amplitude_scale: Optional[float] = None, - sample_width: int = params.norm_samplewidth) -> 'Sample': - amplitude_scale = amplitude_scale or 2 ** (8 * sample_width - 1) + samplewidth: int = params.norm_samplewidth) -> 'Sample': + amplitude_scale = amplitude_scale or 2 ** (8 * samplewidth - 1) required_samples = int(duration * osc.samplerate) num_blocks, last_block = divmod(required_samples, params.norm_osc_blocksize) if last_block > 0: num_blocks += 1 block_gen = osc.blocks() - sample = cls(None, osc.__class__.__name__, samplerate=osc.samplerate, nchannels=1, samplewidth=sample_width) - - def sample_from_block(b: Iterable[float]) -> 'Sample': - if amplitude_scale and amplitude_scale != 1.0: - b = [amplitude_scale * v for v in b] - intblk = list(map(int, b)) - return cls.from_array(intblk, osc.samplerate, 1) - + sample = cls(None, osc.__class__.__name__, samplerate=osc.samplerate, nchannels=1, samplewidth=samplewidth) if num_blocks > 0: for block in block_gen: num_blocks -= 1 if num_blocks == 0: block = block[:last_block] - sample.join(sample_from_block(block)) + sample.join(Sample.from_osc_block(block, osc.samplerate, amplitude_scale, samplewidth)) if num_blocks == 0: break return sample diff --git a/bouldercaves/synthsamples.py b/bouldercaves/synthsamples.py index 407f780..82350ba 100644 --- a/bouldercaves/synthsamples.py +++ b/bouldercaves/synthsamples.py @@ -101,33 +101,26 @@ def chunked_frame_data(self, chunksize: int, repeat: bool=False, stopcondition: Callable[[], bool]=lambda: False) -> Generator[memoryview, None, None]: raise NotImplementedError("subclass should implement this") - def render_samples(self, osc: Oscillator, samplebuffer: bytes, - sample_chunksize: int, stopcondition: Callable[[], bool]=lambda: False, - return_remaining_buffer: bool=False) -> Generator[memoryview, None, bytes]: + def render_samples(self, osc: Oscillator, sample_residue: Sample, + sample_chunksize: int, stopcondition: Callable[[], bool] = lambda: False, + return_residue: bool = False) -> Generator[memoryview, None, Sample]: num_frames = sample_chunksize // synth_params.norm_samplewidth // synth_params.norm_nchannels + blocks = osc.blocks() while not stopcondition(): - # render this sample in chunks of the asked size try: - while len(samplebuffer) < sample_chunksize: - # fill up the sample buffer so we have at least one full chunk - print("NUM_FRAMES", num_frames) # XXX - sample = sample_from_osc(osc) - print(sample, len(sample)) - samplebuffer += sample.view_frame_data() # type: ignore - except NoteFinished: - # go to next sample + block = next(blocks) + except StopIteration: break - if samplebuffer: - chunk = samplebuffer[:sample_chunksize] - samplebuffer = samplebuffer[sample_chunksize:] - assert len(chunk) == sample_chunksize - yield memoryview(chunk) - else: - break - if return_remaining_buffer: - return samplebuffer - yield memoryview(samplebuffer) - return samplebuffer + sample = Sample.from_osc_block(block, osc.samplerate, 2 ** (8 * synth_params.norm_samplewidth - 1)).stereo() + sample_residue.join(sample) + while len(sample_residue) >= num_frames: + yield sample_residue.view_frame_data()[:sample_chunksize] + sample_residue = Sample.from_raw_frames(sample_residue.view_frame_data()[sample_chunksize:], + sample_residue.samplewidth, sample_residue.samplerate, sample_residue.nchannels) + if return_residue: + return sample_residue + yield sample_residue.view_frame_data() + return sample_residue class Amoeba(RealtimeSynthesizedSample): @@ -137,12 +130,12 @@ def __init__(self) -> None: def chunked_frame_data(self, chunksize: int, repeat: bool=False, stopcondition: Callable[[], bool]=lambda: False) -> Generator[memoryview, None, None]: assert repeat, "amoeba is a repeating sound" - samplebuffer = b"" + sample_residue = Sample(nchannels=2) while not stopcondition(): freq = random.randint(0x0800, 0x1200) osc = FastTriangle(freq * _sidfreq, amplitude=0.75) filtered = EnvelopeFilter(osc, 0.024, 0.006, 0.0, 0.5, 0.003, stop_at_end=True) - samplebuffer = yield from self.render_samples(filtered, samplebuffer, chunksize, return_remaining_buffer=True) + sample_residue = yield from self.render_samples(filtered, sample_residue, chunksize, return_residue=True) class MagicWall(RealtimeSynthesizedSample): @@ -152,14 +145,14 @@ def __init__(self) -> None: def chunked_frame_data(self, chunksize: int, repeat: bool=False, stopcondition: Callable[[], bool]=lambda: False) -> Generator[memoryview, None, None]: assert repeat, "magic_wall is a repeating sound" - samplebuffer = b"" + sample_residue = Sample(nchannels=2) while not stopcondition(): freq = random.randint(0x8600, 0x9f00) freq &= 0b0001100100000000 freq |= 0b1000011000000000 osc = FastTriangle(freq * _sidfreq, amplitude=0.4) filtered = EnvelopeFilter(osc, 0.002, 0.008, 0.0, 0.6, 0.03, stop_at_end=True) - samplebuffer = yield from self.render_samples(filtered, samplebuffer, chunksize, return_remaining_buffer=True) + sample_residue = yield from self.render_samples(filtered, sample_residue, chunksize, return_residue=True) class Cover(RealtimeSynthesizedSample): @@ -169,12 +162,12 @@ def __init__(self) -> None: def chunked_frame_data(self, chunksize: int, repeat: bool=False, stopcondition: Callable[[], bool]=lambda: False) -> Generator[memoryview, None, None]: assert repeat, "cover is a repeating sound" - samplebuffer = b"" + sample_residue = Sample(nchannels=2) while not stopcondition(): freq = random.randint(0x6000, 0xd800) osc = FastTriangle(freq * _sidfreq, amplitude=0.7) filtered = EnvelopeFilter(osc, 0.002, 0.02, 0.0, 0.5, 0.02, stop_at_end=True) - samplebuffer = yield from self.render_samples(filtered, samplebuffer, chunksize, return_remaining_buffer=True) + sample_residue = yield from self.render_samples(filtered, sample_residue, chunksize, return_residue=True) class Finished(RealtimeSynthesizedSample): @@ -184,16 +177,16 @@ def __init__(self) -> None: def chunked_frame_data(self, chunksize: int, repeat: bool=False, stopcondition: Callable[[], bool]=lambda: False) -> Generator[memoryview, None, None]: assert not repeat - samplebuffer = b"" + sample_residue = Sample(nchannels=2) for n in range(0, 180): if stopcondition(): break freq = 0x8000 - n * 180 osc = FastTriangle(freq * _sidfreq, amplitude=0.8) filtered = EnvelopeFilter(osc, 0.002, 0.004, 0.0, 0.6, 0.02, stop_at_end=True) - samplebuffer = yield from self.render_samples(filtered, samplebuffer, chunksize, return_remaining_buffer=True) - if samplebuffer: - yield memoryview(samplebuffer) + sample_residue = yield from self.render_samples(filtered, sample_residue, chunksize, return_residue=True) + if len(sample_residue) > 0: + yield sample_residue.view_frame_data() class ExtraLife(Sample): @@ -218,7 +211,8 @@ def chunked_frame_data(self, chunksize: int, repeat: bool=False, filtered = EnvelopeFilter(osc, 0.1, 0.3, 1.5, 1.0, 0.07, stop_at_end=True) ampmod = SquareH(10, 9, amplitude=0.5, bias=0.5) modulated = AmpModulationFilter(filtered, ampmod) - yield from self.render_samples(modulated, b"", chunksize, stopcondition=stopcondition) + sample_residue = Sample(nchannels=2) + yield from self.render_samples(modulated, sample_residue, chunksize, stopcondition=stopcondition) class WalkDirt(Sample): @@ -301,7 +295,8 @@ def chunked_frame_data(self, chunksize: int, repeat: bool=False, freq |= 0b1000011000000000 osc = FastTriangle(freq * _sidfreq, amplitude=0.7) filtered = EnvelopeFilter(osc, 0.002, 0.006, 0.0, 0.7, 0.6, stop_at_end=True) - yield from self.render_samples(filtered, b"", chunksize, stopcondition=stopcondition) + sample_residue = Sample(nchannels=2) + yield from self.render_samples(filtered, sample_residue, chunksize, stopcondition=stopcondition) class Timeout(Sample):