In [None]:
import numpy as np
import os
import signalflow as sf
from PIL import Image
from IPython.display import Audio, display
from pixasonics.core import App, Mapper
from pixasonics.features import *
from pixasonics.synths import Theremin, Oscillator, FilteredNoise, SimpleFM

In [None]:
# Create app
app = App(image_size=(800, 800))

In [None]:
# read all red-ch images into arrays and concatenate them in the channel dimension
img_folder = "images/cellular_dataset/single_channel_16bit/"
img_files = os.listdir(img_folder)
img_files = [f for f in img_files if f.endswith("w2.TIF")] # only red channel images
imgs = []
for img_file in img_files:
    img_path = os.path.join(img_folder, img_file)
    img = Image.open(img_path)
    img = np.array(img)
    imgs.append(img)
img = np.stack(imgs, axis=-1) # now the last dimension is the channel dimension
print(img.shape)
app.load_image_data(img) # load as numpy array

In [None]:
# combine red and green channels and all layers
img_folder = "images/cellular_dataset/single_channel_16bit/"
img_files = os.listdir(img_folder)
imgs_red = [f for f in img_files if f.endswith("w2.TIF")] # only red channel images
imgs_green = [f for f in img_files if f.endswith("w1.TIF")] # only green channel images
imgs = []
for img_red, img_green in zip(imgs_red, imgs_green):
    img_path_red = os.path.join(img_folder, img_red)
    img_path_green = os.path.join(img_folder, img_green)
    img_red = Image.open(img_path_red)
    img_green = Image.open(img_path_green)
    img_red = np.array(img_red)
    img_green = np.array(img_green)
    img = np.stack([img_red, img_green], axis=-1) # now the last dimension is the channel dimension
    imgs.append(img)
img = np.stack(imgs, axis=-1) # now the last dimension is the layer dimension
print(img.shape)
app.load_image_data(img) # load as numpy array

In [None]:
# now use all images in the folder
img_folder = "images/cellular_dataset/single_channel_16bit/"
img_files = os.listdir(img_folder)
imgs_red = [f for f in img_files if f.endswith("w2.TIF")] # only red channel images
imgs_green = [f for f in img_files if f.endswith("w1.TIF")] # only green channel images
imgs_blue = [f for f in img_files if f.endswith("w3.TIF")] # only blue channel images
imgs = []
for img_red, img_green, img_blue in zip(imgs_red, imgs_green, imgs_blue):
    img_path_red = os.path.join(img_folder, img_red)
    img_path_green = os.path.join(img_folder, img_green)
    img_path_blue = os.path.join(img_folder, img_blue)
    img_red = Image.open(img_path_red)
    img_green = Image.open(img_path_green)
    img_blue = Image.open(img_path_blue)
    img_red = np.array(img_red)
    img_green = np.array(img_green)
    img_blue = np.array(img_blue)
    img = np.stack([img_red, img_green, img_blue], axis=-1) # now the last dimension is the channel dimension
    imgs.append(img)
img = np.stack(imgs, axis=-1) # now the last dimension is the layer dimension
print(img.shape)
app.load_image_data(img) # load as numpy array

In [None]:
from pixasonics.core import AppRegistry

app_registry = AppRegistry()

app_registry._apps

In [None]:
app2 = App(image_size=(500, 500))
img_path = "images/test.jpg"
app2.load_image_file(img_path)
mean_red2 = MeanChannelValue(filter_channels=0, name="MeanRed")
# attach the feature to the app
app2.attach_feature(mean_red2)

# create a Theremin, a simple sine wave synth that we will use to sonify the mean pixel value
theremin2 = Theremin()
# attach the Theremin to the app
app2.attach_synth(theremin2)

# create a Mapper that will map the mean red pixel value (within the Probe) to the frequency of the Theremin
red2freq2 = Mapper(
    mean_red2, 
    theremin2["frequency"], 
    exponent=2, name="Red2Freq") # cubic mapping curve for a more "linear" feel of frequency changes
# attach the Mapper to the app
app2.attach_mapper(red2freq2)

In [None]:
app.graph.status

In [None]:
import signalflow as sf
graph = sf.AudioGraph.get_shared_graph()
print(graph is None)

In [None]:
import signalflow as sf

graph = None
buf = sf.Buffer(1, 48000)

class TestPatch(sf.Patch):
    def __init__(self):
        super().__init__()
        param = self.add_input("param")

        out = param * sf.SineOscillator(440)
        self.set_output(out)

def create_audio_graph(nrt=False):
    graph = sf.AudioGraph.get_shared_graph()
    output_device = sf.AudioOut_Dummy(2) if nrt else None
    if graph is not None:
        graph.destroy()
    graph = sf.AudioGraph(
        start=True,
        output_device=output_device)
    my_patch = TestPatch()
    my_patch.set_input("param", 0.5)
    graph.play(my_patch)
    if nrt:
        graph.render_to_buffer(buf)
    graph.stop(my_patch)
    return graph

print(graph) # should be None

graph = create_audio_graph(nrt=False)

print("RT", graph.status)

graph = create_audio_graph(nrt=True)

print("NRT", graph.status)

graph = create_audio_graph(nrt=False)

print("RT2", graph.status)

In [None]:
app = App(image_size=(800, 800))
app.load_image_file("images/cellular_dataset/merged_8bit/Timepoint_001_220518-ST_C03_s1.jpg")

mean_red = MeanChannelValue(filter_channels=0, name="MeanRed")
app.attach(mean_red)

num_instances = 5

for i in range(num_instances):
    theremin = Theremin()
    app.attach(theremin)

    red2freq = Mapper(mean_red, theremin["frequency"], exponent=2, name=f"Red2Freq{i}")
    app.attach(red2freq)

    red2amp = Mapper(mean_red, theremin["amplitude"], exponent=1, name=f"Red2Amp{i}")
    app.attach(red2amp)

    red2pan = Mapper(mean_red, theremin["panning"], exponent=1, name=f"Red2Pan{i}")
    app.attach(red2pan)

# app.interaction_mode = "toggle"
# app.audio = True

In [None]:
app.output_buffer_size

In [None]:
app.output_buffer_size = 1024

In [None]:
osc = Oscillator()
app.attach(osc)

fnoise = FilteredNoise()
app.attach(fnoise)

fm = SimpleFM()
app.attach(fm)

In [None]:
len(app.mappers)

# Test audio settings

In [None]:
app.audio = True

In [None]:
app.master_volume = -6

In [None]:
app.recording = True

In [None]:
app.recording_path = "hey"

In [None]:
app.master_envelope.attack = 0.1

# Test display settings

In [None]:
app.normalize_display = True

In [None]:
app.normalize_display_global = False

In [None]:
app.display_channel_offset = 0

In [None]:
app.display_layer_offset = 2

# Test probe settings

In [None]:
app.probe_width = 10

In [None]:
app.probe_height = 10

In [None]:
app.probe_x = 200

In [None]:
app.probe_y = 25

In [None]:
app.interaction_mode = "hold"

In [None]:
app.probe_follows_idle_mouse = True

In [None]:
import signalflow as sf
graph = sf.AudioGraph.get_shared_graph()
if graph is not None:
    graph.destroy()
graph = sf.AudioGraph(output_device=sf.AudioOut_Dummy(2))

In [None]:
import signalflow as sf
graph = sf.AudioGraph()

class Synth(sf.Patch):
    def __init__(self):
        super().__init__()

class TestPatch(Synth):
    def __init__(self):
        super().__init__()
        param = self.add_input("param")

        out = param * sf.SineOscillator(440)
        self.set_output(out)

patch = TestPatch()
patch.set_input("param", 0.5)

In [None]:
patch.set_input("param", 1)

In [None]:
patch.play()

In [None]:
patch.stop()

In [None]:
print(graph.structure)

In [None]:
graph.status

In [None]:
# print the type of the patch object
print(type(patch))

In [None]:
isinstance(patch, Synth)

In [None]:
isinstance(patch, sf.Patch)

In [None]:
import signalflow as sf
config = sf.AudioGraphConfig()
config.output_buffer_size = 480
graph = sf.AudioGraph(config)

class TestPatch(sf.Patch):
    def __init__(self):
        super().__init__()
        freq = self.add_input("freq", 440)
        out = sf.SineOscillator(freq)
        self.set_output(out)

patch = TestPatch()

graph.play(patch)

In [None]:
graph.destroy()
config = sf.AudioGraphConfig()
config.output_buffer_size = 1024
graph = sf.AudioGraph(config)
print("About to play") # still prints
#graph.play(patch) # will crash Kernel here...

In [None]:
patch.set_input("freq", 880)

In [None]:
patch.stop()

In [None]:
spec = patch.to_spec()

In [None]:
print(spec.to_json())

In [None]:
patch2 = sf.Patch(spec)

In [None]:
graph.play(patch)

In [None]:
graph.output_buffer_size

In [None]:
graph.stop(patch)
graph.clear()
graph.destroy()
config = sf.AudioGraphConfig()
config.output_buffer_size = 1024
graph = sf.AudioGraph(config)
print("About to play") # still prints
graph.play(patch2) # will crash Kernel here...

In [None]:
spec = patch.to_spec()
graph.destroy()
config = sf.AudioGraphConfig()
config.output_buffer_size = 1024
graph = sf.AudioGraph(config)
patch2 = sf.Patch(spec)
graph.play(patch2) # this will work

In [None]:
import signalflow as sf
from pixasonics.synths import Theremin

In [None]:
config = sf.AudioGraphConfig()
config.output_buffer_size = 480
graph = sf.AudioGraph(config)

theremin = Theremin()

graph.play(theremin)

In [None]:
theremin.set_input_buf("frequency", 880)

In [None]:
theremin_spec = theremin.to_spec()
print(theremin_spec.to_json())

In [None]:
theremin.stop()

In [None]:
graph.output_buffer_size

In [None]:
graph.destroy()
config = sf.AudioGraphConfig()
config.output_buffer_size = 1024
graph = sf.AudioGraph(config)
theremin2 = sf.Patch(theremin_spec)
graph.play(theremin2) # this will work

In [None]:
graph.destroy()

# Exponent Canvas proto

In [None]:
import ipywidgets as widgets
from IPython.display import display
from ipycanvas import Canvas, hold_canvas
from pixasonics.utils import scale_array_exp
import numpy as np
from math import log10

In [None]:
class ExponentCanvas():
    def __init__(self, width=200, height=200, exponent=1):
        self.width = width
        self.height = height
        self._exponent = exponent
        self.canvas = Canvas(width=width, height=height)
        self.draw()

    def __call__(self):
        return self.canvas
    
    @property
    def exponent(self):
        return self._exponent
    
    @exponent.setter
    def exponent(self, value):
        self._exponent = value
        self.draw()

    def draw(self):
        with hold_canvas(self.canvas):
            self.canvas.clear()
            x = np.linspace(0, 1, self.width)
            y = scale_array_exp(x, 0, 1, 0, 1, self._exponent)
            y = 1 - y
            y = y * self.height
            self.canvas.fill_style = "black"
            self.canvas.fill_rects(x * self.width, y, 1, self.height)

In [None]:
c = ExponentCanvas(600)
display(c())

In [None]:
c.exponent = 2

In [None]:
exp_slider = widgets.FloatLogSlider(
    value=1,
    base=10,
    min=log10(0.01), # max exponent
    max=log10(100), # min exponent
    step=0.0001,
    description='Exponent:',
    continuous_update=True,
    readout_format='.4f',
)
exp_slider.observe(lambda change: setattr(c, "exponent", change.new), names="value")
display(exp_slider)

In [None]:
class ExponentPlot():
    def __init__(self, width=1000, height=200, exponent=1):
        self.width = width
        self.height = height
        self._exponent = exponent
        
        self.create_ui()

    def __call__(self):
        return self.card
    
    @property
    def exponent(self):
        return self._exponent
    
    def create_ui(self):
        canvas = ExponentCanvas(self.width, self.height, self.exponent)
        exp_slider = widgets.FloatLogSlider(
            value=self.exponent,
            base=10,
            min=log10(0.01),
            max=log10(100),
            step=0.0001,
            description='Exponent:',
            continuous_update=True,
            readout_format='.4f',
        )
        exp_slider.observe(lambda change: setattr(canvas, "exponent", change.new), names="value")
        self.card = widgets.VBox([canvas(), exp_slider])

        

In [None]:
plot = ExponentPlot(1000, 200, 1)
plot()

# Test headless mode

In [None]:
import numpy as np
import os
import signalflow as sf
from PIL import Image
from IPython.display import Audio, display
from pixasonics.core import App, Mapper
from pixasonics.features import *
from pixasonics.synths import Theremin, Oscillator, FilteredNoise, SimpleFM

In [None]:
app = App(headless=True)

In [None]:
app.load_image_file("images/test.jpg")

In [None]:
mean_red = MeanChannelValue(filter_channels=0, name="MeanRed")
app.attach(mean_red)
app.features

In [None]:
theremin = Theremin(name="MySine")
app.attach(theremin)
app.synths

In [None]:
red2freq = Mapper(mean_red, theremin["frequency"], exponent=2, name="Red2Freq")
app.attach(red2freq)
app.mappers

In [None]:
app.probe_width, app.probe_height, app.probe_x, app.probe_y

In [None]:
app.probe_x, app.probe_y = 200, 200

In [None]:
app.audio = True

In [None]:
app.graph.status

In [None]:
app.unmuted = True

In [None]:
app.probe_x, app.probe_y = 50, 400

In [None]:
app.unmuted = False

In [None]:
# a little loop to unmute the probe, then move it around then turn it off
import time
app.unmuted = True
app.probe_x, app.probe_y = 250, 0
while app.probe_y < 350:
    app.probe_y += 1
    time.sleep(0.01)
app.unmuted = False

In [None]:
app.load_image_file("images/cellular_dataset/merged_8bit/Timepoint_001_220518-ST_C03_s1.jpg")

In [None]:
# a for loop where for each image in the folder we load a headless app and render a timeline in nrt mode
img_folder = "images/cellular_dataset/merged_8bit/"
img_files = os.listdir(img_folder)

# example: horizontal scan
duration = 5
my_timeline = [
    (0, {
        "probe_width": 1,
        "probe_height": 500,
        "probe_x": 0,
        "probe_y": 0
    }),
    (duration, {
        "probe_x": 499
    })
]
app = App(headless=True, nrt=True) # create a global graph object, necessary for the Theremin
app.cleanup() # clean up the app to avoid hanging
# only need to create processor objects once and attach them to the apps
mean_red = MeanChannelValue(filter_channels=0, name="MeanRed")
theremin = Theremin(name="MySine")
red2freq = Mapper(mean_red, theremin["frequency"], exponent=2, name="Red2Freq")
# loop over all images in the folder, create a headless app, load the image, attach the processors and render the timeline
for img_file in img_files:
    print(f"Processing {img_file}")
    img_path = os.path.join(img_folder, img_file)
    with App(headless=True, nrt=True) as app:
        app.load_image_file(img_path)
        app.attach(mean_red)
        app.attach(theremin)
        app.attach(red2freq)
        target_filename = img_file.replace(".jpg", ".wav")
        app.render_timeline_to_file(my_timeline, target_filename)
        print(f"Saved {target_filename}")
    display(Audio(target_filename))

In [None]:
# same thing but render np.arrays instead
img_folder = "images/cellular_dataset/merged_8bit/"
img_files = os.listdir(img_folder)

# example: horizontal scan
duration = 5
my_timeline = [
    (0, {
        "probe_width": 1,
        "probe_height": 500,
        "probe_x": 0,
        "probe_y": 0
    }),
    (duration, {
        "probe_x": 499
    })
]
app = App(headless=True, nrt=True) # create a global graph object, necessary for the Theremin
app.cleanup()
# only need to create processor objects once and attach them to the apps
mean_red = MeanChannelValue(filter_channels=0, name="MeanRed")
theremin = Theremin(name="MySine")
red2freq = Mapper(mean_red, theremin["frequency"], exponent=2, name="Red2Freq")
# loop over all images in the folder, create a headless app, load the image, attach the processors and render the timeline
for img_file in img_files:
    print(f"Processing {img_file}")
    img_path = os.path.join(img_folder, img_file)
    buf = None
    with App(headless=True, nrt=True) as app:
        app.load_image_file(img_path)
        app.attach(mean_red)
        app.attach(theremin)
        app.attach(red2freq)
        buf = app.render_timeline_to_array(my_timeline)
        print(f"Saved {target_filename}")
    display(Audio(buf, rate=app.sample_rate, normalize=False))

In [None]:
import signalflow as sf
nrt = True
_output_buffer_size = 480
_sample_rate = 48000
# Get or create the shared audio graph
graph = sf.AudioGraph.get_shared_graph()
if graph is not None and nrt:
    graph.destroy()
    graph = None
if graph is None:
    output_device = sf.AudioOut_Dummy(2) if nrt else None
    config = sf.AudioGraphConfig()
    print(f"Setting AudioGraphConfig output_buffer_size to {_output_buffer_size}")
    config.output_buffer_size = _output_buffer_size
    print(f"Setting AudioGraphConfig sample_rate to {_sample_rate}")
    config.sample_rate = _sample_rate
    graph = sf.AudioGraph(config=config, start=True, output_device=output_device)
print(f"Graph sample_rate: {graph.sample_rate}")
print(f"Graph output_buffer_size: {graph.output_buffer_size}")

In [None]:
import signalflow as sf
nrt = True
_output_buffer_size = 480
_sample_rate = 48000
# Get or create the shared audio graph
graph = sf.AudioGraph.get_shared_graph()
if graph is not None and nrt:
    graph.destroy()
    graph = None
if graph is None:
    print(f"Setting AudioOut_Dummy buffer_size to {_output_buffer_size}")
    output_device = sf.AudioOut_Dummy(2, buffer_size=_output_buffer_size) if nrt else None
    config = sf.AudioGraphConfig()
    # print(f"Setting output buffer size to {_output_buffer_size}")
    # config.output_buffer_size = _output_buffer_size
    print(f"Setting AudioGraphConfig sample_rate to {_sample_rate}")
    config.sample_rate = _sample_rate
    graph = sf.AudioGraph(config=config, start=True, output_device=output_device)
print(f"Graph sample_rate: {graph.sample_rate}")
print(f"Graph output_buffer_size: {graph.output_buffer_size}")

# Test custom Feature

In [None]:
import numpy as np
import os
import signalflow as sf
from PIL import Image
from IPython.display import Audio, display
from pixasonics.core import App, Mapper
from pixasonics.features import Feature
from pixasonics.synths import Theremin, Oscillator, FilteredNoise, SimpleFM

In [None]:
app = App()

In [None]:
app.load_image_file("images/cellular_dataset/merged_8bit/Timepoint_001_220518-ST_C03_s1.jpg")

In [None]:
# combine red and green channels and all layers
img_folder = "images/cellular_dataset/single_channel_16bit/"
img_files = os.listdir(img_folder)
imgs_red = [f for f in img_files if f.endswith("w2.TIF")] # only red channel images
imgs_green = [f for f in img_files if f.endswith("w1.TIF")] # only green channel images
imgs = []
for img_red, img_green in zip(imgs_red, imgs_green):
    img_path_red = os.path.join(img_folder, img_red)
    img_path_green = os.path.join(img_folder, img_green)
    img_red = Image.open(img_path_red)
    img_green = Image.open(img_path_green)
    img_red = np.array(img_red)
    img_green = np.array(img_green)
    img = np.stack([img_red, img_green], axis=-1) # now the last dimension is the channel dimension
    imgs.append(img)
img = np.stack(imgs, axis=-1) # now the last dimension is the layer dimension
print(img.shape)
app.load_image_data(img) # load as numpy array

In [None]:
class MyFeature(Feature):
    def __init__(self, name="MyFeature"):
        super().__init__(name=name)

    def process_image(self, mat):
        return np.random.rand(*mat.shape)
    
    def compute(self, mat):
        num_features = mat.shape[self.target_dim]
        return np.random.rand(num_features)
    
my_feature = MyFeature()
app.attach(my_feature)

In [None]:
app.detach(my_feature)
my_feature = MyFeature()
my_feature.target_dim = 0
app.attach(my_feature)

In [None]:
from sklearn.cluster import KMeans

class KMeansFeature(Feature):
    def __init__(self, n_clusters=3, name="KMeansFeature"):
        super().__init__(name=name)
        self.n_clusters = n_clusters
        self.kmeans = None
        self._original_shape = None

    def _reshape_for_kmeans(self, mat):
        """Helper to reshape 4D matrix to 2D for KMeans"""
        mat_reshaped = np.moveaxis(mat, self.target_dim, 0)
        return mat_reshaped.reshape(mat_reshaped.shape[0], -1)

    def process_image(self, mat):
        self._original_shape = mat.shape
        features = self._reshape_for_kmeans(mat)
        self.kmeans = KMeans(n_clusters=self.n_clusters).fit(features.T)

        # Get cluster assignments and reshape back to original dimensions
        labels = self.kmeans.predict(features.T)
        other_dims = [s for i, s in enumerate(mat.shape) if i != self.target_dim]
        self.transformed_image = np.expand_dims(
            labels.reshape(*other_dims), 
            axis=self.target_dim
        )
        return self.transformed_image
    
    def compute(self, mat):
        if self.kmeans is None:
            raise ValueError("KMeans model has not been fitted. Call process_image first.")
        features = self._reshape_for_kmeans(mat)
        labels = self.kmeans.predict(features.T)
        # Compute histogram of cluster assignments
        hist, _ = np.histogram(labels, bins=range(self.n_clusters + 1))
        return hist.astype(float) / hist.sum() # normalize to sum to 1

# Example usage
kmeans_feature = KMeansFeature(n_clusters=10)
app.attach(kmeans_feature)

In [None]:
# create a multichannel Theremin that has its frequencies in a harmonic series
fundamental_freq = 110
num_harmonics = kmeans_feature.n_clusters
freqs = fundamental_freq * np.arange(1, num_harmonics + 1)
print("Frequencies:",freqs)
osc = Theremin(frequency=freqs, name="KMeansOsc")
app.attach(osc)

# create a Mapper that will map the KMeans cluster histogram to the amplitude of the Theremin 
k2amp = Mapper(kmeans_feature, osc["amplitude"], exponent=1, name="K2Amp")
app.attach(k2amp)

In [None]:
app.attach(osc)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Create a colormap with distinct colors for each cluster
n_clusters = kmeans_feature.n_clusters
colors = sns.color_palette("husl", n_colors=n_clusters)
colormap = {i: colors[i] for i in range(n_clusters)}

# Get the cluster assignments from transformed_image
cluster_image = kmeans_feature.transformed_image[:, :, 0, 0]
print(cluster_image.shape)

# Create RGB image where each cluster gets a unique color
rgb_image = np.zeros((*cluster_image.shape, 3))
for cluster_id, color in colormap.items():
    mask = cluster_image == cluster_id
    rgb_image[mask] = color

# Plot the results
plt.figure(figsize=(10, 5))

plt.subplot(121)
plt.title('Original Image')
plt.imshow(app.image_displayed)  # Show first layer of original image
plt.axis('off')

plt.subplot(122)
plt.title('K-means Clusters')
plt.imshow(rgb_image)
plt.axis('off')

plt.tight_layout()
plt.show()

In [None]:
import json
synthmaps_pca_mel_json = "/Volumes/T7RITMO/synthmaps_code/data/pca_mels_mean.json"
synthmaps_pca_mel_json = "/Volumes/T7RITMO/synthmaps_code/data/pca_perceptual.json"
synthmaps_pca_mel_json = "/Volumes/T7RITMO/synthmaps_code/data/pca_encodec.json"
synthmaps_pca_mel_json = "/Volumes/T7RITMO/synthmaps_code/data/pca_clap.json"
with open(synthmaps_pca_mel_json, "r") as f:
    pca_mel_data = json.load(f)
print(pca_mel_data.keys())

In [None]:
def fluid_dataset2array(
        dataset: dict,
) -> np.ndarray:
    """
    Convert a json dataset to a numpy array.

    Args:
        dataset (dict): The json dataset to convert.

    Returns:
        np.ndarray: The numpy array.
    """
    num_cols = dataset["cols"]
    num_rows = len(dataset["data"])
    out_array = np.zeros((num_rows, num_cols))
    for i in range(num_rows):
        out_array[i] = np.array(dataset["data"][str(i)])
    return out_array

In [None]:
pca_mel_data_array = fluid_dataset2array(pca_mel_data)
print(pca_mel_data_array.shape)

In [None]:
from sklearn.preprocessing import MinMaxScaler

synthmaps_scaler = MinMaxScaler()
pca_mel_data_scaled = synthmaps_scaler.fit_transform(pca_mel_data_array)
print(pca_mel_data_scaled.shape)
print(pca_mel_data_scaled.min(), pca_mel_data_scaled.max())

In [None]:
fm_params_json = "/Volumes/T7RITMO/synthmaps_code/data/fm_params.json"
with open(fm_params_json, "r") as f:
    fm_params_data = json.load(f)
print(fm_params_data.keys())
fm_params_data_array = fluid_dataset2array(fm_params_data)
print(fm_params_data_array.shape)

In [None]:
from sklearn.neighbors import KDTree
from sklearn.decomposition import IncrementalPCA

class PCA2FMTimbreSpace(Feature):
    def __init__(self, name="PCA2FMTimbreSpace"):
        super().__init__(name=name)
        self.pca = None
        self.pca_scaler = None
        self.kdtree = KDTree(pca_mel_data_scaled)
        self._original_shape = None
        self._transformed_points = None

    def _reshape_for_pca(self, mat):
        """Helper to reshape 4D matrix (H, W, Ch, L) to 2D by concatenating the Channel and Layer dimensions"""
        mat_reshaped = mat.reshape(mat.shape[0], mat.shape[1], -1)
        return mat_reshaped.reshape(-1, mat_reshaped.shape[-1])

    def process_image(self, mat):
        self._original_shape = mat.shape
        features = self._reshape_for_pca(mat)
        print(features.shape)
        self.pca = IncrementalPCA(n_components=2)
        self.pca.fit(features)
        self.pca_scaler = MinMaxScaler(feature_range=(0.1, 0.9))
        self._transformed_points = self.pca.transform(features)
        self.pca_scaler.fit(self._transformed_points)
        return mat
    
    def compute(self, mat):
        if self.pca is None:
            raise ValueError("PCA model has not been fitted. Call process_image first.")
        features = self._reshape_for_pca(mat)
        projected = self.pca.transform(features)
        projected_scaled = self.pca_scaler.transform(projected)
        projected_scaled_mean = projected_scaled.mean(axis=0, keepdims=True)
        nearest_idx = self.kdtree.query(projected_scaled_mean, return_distance=False)[0][0]
        fm_params = fm_params_data_array[nearest_idx]
        return fm_params


# Example usage
pca2FM_feature = PCA2FMTimbreSpace()
app.attach(pca2FM_feature)

In [None]:
import matplotlib.pyplot as plt
#plot the pca space
plt.figure(figsize=(10, 5))

plt.subplot(121)

points = pca2FM_feature._transformed_points
plt.scatter(points[:, 0], points[:, 1], alpha=0.5)
plt.title('PCA Space')
plt.xlabel('PCA 1')
plt.ylabel('PCA 2')


In [None]:
from sklearn.neighbors import KDTree
from sklearn.manifold import TSNE

class TSNE2FMTimbreSpace(Feature):
    def __init__(self, name="TSNE2FMTimbreSpace"):
        super().__init__(name=name)
        self.tsne = None
        self.tsne_scaler = None
        self.kdtree = KDTree(pca_mel_data_scaled)
        self._original_shape = None
        self._transformed_points = None

    def _reshape_for_tsne(self, mat):
        """Helper to reshape 4D matrix (H, W, Ch, L) to 2D by concatenating the Channel and Layer dimensions"""
        mat_reshaped = mat.reshape(mat.shape[0], mat.shape[1], -1)
        return mat_reshaped.reshape(-1, mat_reshaped.shape[-1])

    def process_image(self, mat):
        self._original_shape = mat.shape
        features = self._reshape_for_tsne(mat)
        print(features.shape)
        self.tsne = TSNE(n_components=2)
        self._transformed_points = self.tsne.fit_transform(features)
        self.tsne_scaler = MinMaxScaler(feature_range=(0.1, 0.9))
        self.tsne_scaler.fit(self._transformed_points)
        return mat
    
    def compute(self, mat):
        if self.tsne is None:
            raise ValueError("TSNE model has not been fitted. Call process_image first.")
        features = self._reshape_for_tsne(mat)
        projected = self.tsne.transform(features)
        projected_scaled = self.tsne_scaler.transform(projected)
        projected_scaled_mean = projected_scaled.mean(axis=0, keepdims=True)
        nearest_idx = self.kdtree.query(projected_scaled_mean, return_distance=False)[0][0]
        fm_params = fm_params_data_array[nearest_idx]
        return fm_params


# Example usage
tsne2FM_feature = TSNE2FMTimbreSpace()
app.attach(tsne2FM_feature)

In [None]:
import matplotlib.pyplot as plt
#plot the tsne space
plt.figure(figsize=(10, 5))

plt.subplot(121)

points = tsne2FM_feature._transformed_points
plt.scatter(points[:, 0], points[:, 1], alpha=0.5)
plt.title('TSNE Space')
plt.xlabel('TSNE 1')
plt.ylabel('TSNE 2')

In [None]:
fm = SimpleFM()
app.attach(fm)

In [None]:
class FMParamSetter(Mapper):
    def __init__(self, feature, synth, name="FMParamSetter"):
        super().__init__(feature, synth, name=name)

    def map(self, frame=None):
        fm_params = self.buf_in.data
        if fm_params.shape[0] == 3:
            self.obj_out_owner.set_input_buf("carrier_freq", fm_params[0], from_slider=False)
            self.obj_out_owner.set_input_buf("harm_ratio", fm_params[1], from_slider=False)
            self.obj_out_owner.set_input_buf("mod_index", fm_params[2], from_slider=False)

fm_param_setter = FMParamSetter(pca2FM_feature, fm["carrier_freq"])
app.attach(fm_param_setter)

In [None]:
fm2 = SimpleFM(name="FM2")
app.attach(fm2)
fm2_param_setter = FMParamSetter(tsne2FM_feature, fm2["carrier_freq"])
app.attach(fm2_param_setter)

In [None]:
app.probe_height, app.probe_width = 20, 20

In [None]:
app.detach(pca2FM_feature)
app.detach(fm_param_setter)

In [None]:
app.detach(pca2FM_feature)
app.detach(fm_param_setter)
pca2FM_feature = PCA2FMTimbreSpace()
pca2FM_feature.filter_channels = None # try also 0
app.attach(pca2FM_feature)
fm_param_setter = FMParamSetter(pca2FM_feature, fm["carrier_freq"])
app.attach(fm_param_setter)