Skip to content
This repository has been archived by the owner on Dec 21, 2021. It is now read-only.

Commit

Permalink
updating the synthplayer library
Browse files Browse the repository at this point in the history
  • Loading branch information
irmen committed Jun 4, 2019
1 parent cff08a4 commit a3041f5
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 47 deletions.
2 changes: 1 addition & 1 deletion bouldercaves/synthplayer/playback.py
Expand Up @@ -19,7 +19,7 @@
import os
import warnings
from collections import defaultdict
from typing import Generator, Union, Dict, Tuple, Any, Type, List, Callable, Iterable, Optional, Container
from typing import Generator, Union, Dict, Tuple, Any, Type, List, Callable, Iterable, Optional
from types import TracebackType
from .import params
from .sample import Sample
Expand Down
24 changes: 13 additions & 11 deletions bouldercaves/synthplayer/sample.py
Expand Up @@ -116,29 +116,31 @@ def from_array(cls, array_or_list: Sequence[Union[int, float]], samplerate: int,
frames = audioop.byteswap(frames, samplewidth)
return Sample.from_raw_frames(frames, samplewidth, samplerate, numchannels, name=name)

@classmethod
def from_osc_block(cls, block: Iterable[float], samplerate: int, amplitude_scale: Optional[float] = None,
samplewidth: int = params.norm_samplewidth) -> 'Sample':
amplitude_scale = amplitude_scale or 2 ** (8 * samplewidth - 1)
if amplitude_scale and amplitude_scale != 1.0:
block = [amplitude_scale * v for v in block]
intblk = list(map(int, block))
return cls.from_array(intblk, samplerate, 1)

@classmethod
def from_oscillator(cls, osc: Oscillator, duration: float, amplitude_scale: Optional[float] = None,
sample_width: int = params.norm_samplewidth) -> 'Sample':
amplitude_scale = amplitude_scale or 2 ** (8 * sample_width - 1)
samplewidth: int = params.norm_samplewidth) -> 'Sample':
amplitude_scale = amplitude_scale or 2 ** (8 * samplewidth - 1)
required_samples = int(duration * osc.samplerate)
num_blocks, last_block = divmod(required_samples, params.norm_osc_blocksize)
if last_block > 0:
num_blocks += 1
block_gen = osc.blocks()
sample = cls(None, osc.__class__.__name__, samplerate=osc.samplerate, nchannels=1, samplewidth=sample_width)

def sample_from_block(b: Iterable[float]) -> 'Sample':
if amplitude_scale and amplitude_scale != 1.0:
b = [amplitude_scale * v for v in b]
intblk = list(map(int, b))
return cls.from_array(intblk, osc.samplerate, 1)

sample = cls(None, osc.__class__.__name__, samplerate=osc.samplerate, nchannels=1, samplewidth=samplewidth)
if num_blocks > 0:
for block in block_gen:
num_blocks -= 1
if num_blocks == 0:
block = block[:last_block]
sample.join(sample_from_block(block))
sample.join(Sample.from_osc_block(block, osc.samplerate, amplitude_scale, samplewidth))
if num_blocks == 0:
break
return sample
Expand Down
65 changes: 30 additions & 35 deletions bouldercaves/synthsamples.py
Expand Up @@ -101,33 +101,26 @@ def chunked_frame_data(self, chunksize: int, repeat: bool=False,
stopcondition: Callable[[], bool]=lambda: False) -> Generator[memoryview, None, None]:
raise NotImplementedError("subclass should implement this")

def render_samples(self, osc: Oscillator, samplebuffer: bytes,
sample_chunksize: int, stopcondition: Callable[[], bool]=lambda: False,
return_remaining_buffer: bool=False) -> Generator[memoryview, None, bytes]:
def render_samples(self, osc: Oscillator, sample_residue: Sample,
sample_chunksize: int, stopcondition: Callable[[], bool] = lambda: False,
return_residue: bool = False) -> Generator[memoryview, None, Sample]:
num_frames = sample_chunksize // synth_params.norm_samplewidth // synth_params.norm_nchannels
blocks = osc.blocks()
while not stopcondition():
# render this sample in chunks of the asked size
try:
while len(samplebuffer) < sample_chunksize:
# fill up the sample buffer so we have at least one full chunk
print("NUM_FRAMES", num_frames) # XXX
sample = sample_from_osc(osc)
print(sample, len(sample))
samplebuffer += sample.view_frame_data() # type: ignore
except NoteFinished:
# go to next sample
block = next(blocks)
except StopIteration:
break
if samplebuffer:
chunk = samplebuffer[:sample_chunksize]
samplebuffer = samplebuffer[sample_chunksize:]
assert len(chunk) == sample_chunksize
yield memoryview(chunk)
else:
break
if return_remaining_buffer:
return samplebuffer
yield memoryview(samplebuffer)
return samplebuffer
sample = Sample.from_osc_block(block, osc.samplerate, 2 ** (8 * synth_params.norm_samplewidth - 1)).stereo()
sample_residue.join(sample)
while len(sample_residue) >= num_frames:
yield sample_residue.view_frame_data()[:sample_chunksize]
sample_residue = Sample.from_raw_frames(sample_residue.view_frame_data()[sample_chunksize:],
sample_residue.samplewidth, sample_residue.samplerate, sample_residue.nchannels)
if return_residue:
return sample_residue
yield sample_residue.view_frame_data()
return sample_residue


class Amoeba(RealtimeSynthesizedSample):
Expand All @@ -137,12 +130,12 @@ def __init__(self) -> None:
def chunked_frame_data(self, chunksize: int, repeat: bool=False,
stopcondition: Callable[[], bool]=lambda: False) -> Generator[memoryview, None, None]:
assert repeat, "amoeba is a repeating sound"
samplebuffer = b""
sample_residue = Sample(nchannels=2)
while not stopcondition():
freq = random.randint(0x0800, 0x1200)
osc = FastTriangle(freq * _sidfreq, amplitude=0.75)
filtered = EnvelopeFilter(osc, 0.024, 0.006, 0.0, 0.5, 0.003, stop_at_end=True)
samplebuffer = yield from self.render_samples(filtered, samplebuffer, chunksize, return_remaining_buffer=True)
sample_residue = yield from self.render_samples(filtered, sample_residue, chunksize, return_residue=True)


class MagicWall(RealtimeSynthesizedSample):
Expand All @@ -152,14 +145,14 @@ def __init__(self) -> None:
def chunked_frame_data(self, chunksize: int, repeat: bool=False,
stopcondition: Callable[[], bool]=lambda: False) -> Generator[memoryview, None, None]:
assert repeat, "magic_wall is a repeating sound"
samplebuffer = b""
sample_residue = Sample(nchannels=2)
while not stopcondition():
freq = random.randint(0x8600, 0x9f00)
freq &= 0b0001100100000000
freq |= 0b1000011000000000
osc = FastTriangle(freq * _sidfreq, amplitude=0.4)
filtered = EnvelopeFilter(osc, 0.002, 0.008, 0.0, 0.6, 0.03, stop_at_end=True)
samplebuffer = yield from self.render_samples(filtered, samplebuffer, chunksize, return_remaining_buffer=True)
sample_residue = yield from self.render_samples(filtered, sample_residue, chunksize, return_residue=True)


class Cover(RealtimeSynthesizedSample):
Expand All @@ -169,12 +162,12 @@ def __init__(self) -> None:
def chunked_frame_data(self, chunksize: int, repeat: bool=False,
stopcondition: Callable[[], bool]=lambda: False) -> Generator[memoryview, None, None]:
assert repeat, "cover is a repeating sound"
samplebuffer = b""
sample_residue = Sample(nchannels=2)
while not stopcondition():
freq = random.randint(0x6000, 0xd800)
osc = FastTriangle(freq * _sidfreq, amplitude=0.7)
filtered = EnvelopeFilter(osc, 0.002, 0.02, 0.0, 0.5, 0.02, stop_at_end=True)
samplebuffer = yield from self.render_samples(filtered, samplebuffer, chunksize, return_remaining_buffer=True)
sample_residue = yield from self.render_samples(filtered, sample_residue, chunksize, return_residue=True)


class Finished(RealtimeSynthesizedSample):
Expand All @@ -184,16 +177,16 @@ def __init__(self) -> None:
def chunked_frame_data(self, chunksize: int, repeat: bool=False,
stopcondition: Callable[[], bool]=lambda: False) -> Generator[memoryview, None, None]:
assert not repeat
samplebuffer = b""
sample_residue = Sample(nchannels=2)
for n in range(0, 180):
if stopcondition():
break
freq = 0x8000 - n * 180
osc = FastTriangle(freq * _sidfreq, amplitude=0.8)
filtered = EnvelopeFilter(osc, 0.002, 0.004, 0.0, 0.6, 0.02, stop_at_end=True)
samplebuffer = yield from self.render_samples(filtered, samplebuffer, chunksize, return_remaining_buffer=True)
if samplebuffer:
yield memoryview(samplebuffer)
sample_residue = yield from self.render_samples(filtered, sample_residue, chunksize, return_residue=True)
if len(sample_residue) > 0:
yield sample_residue.view_frame_data()


class ExtraLife(Sample):
Expand All @@ -218,7 +211,8 @@ def chunked_frame_data(self, chunksize: int, repeat: bool=False,
filtered = EnvelopeFilter(osc, 0.1, 0.3, 1.5, 1.0, 0.07, stop_at_end=True)
ampmod = SquareH(10, 9, amplitude=0.5, bias=0.5)
modulated = AmpModulationFilter(filtered, ampmod)
yield from self.render_samples(modulated, b"", chunksize, stopcondition=stopcondition)
sample_residue = Sample(nchannels=2)
yield from self.render_samples(modulated, sample_residue, chunksize, stopcondition=stopcondition)


class WalkDirt(Sample):
Expand Down Expand Up @@ -301,7 +295,8 @@ def chunked_frame_data(self, chunksize: int, repeat: bool=False,
freq |= 0b1000011000000000
osc = FastTriangle(freq * _sidfreq, amplitude=0.7)
filtered = EnvelopeFilter(osc, 0.002, 0.006, 0.0, 0.7, 0.6, stop_at_end=True)
yield from self.render_samples(filtered, b"", chunksize, stopcondition=stopcondition)
sample_residue = Sample(nchannels=2)
yield from self.render_samples(filtered, sample_residue, chunksize, stopcondition=stopcondition)


class Timeout(Sample):
Expand Down

0 comments on commit a3041f5

Please sign in to comment.