In [121]:
import itertools

import torch
import torchsig
import torchaudio

from matplotlib import pyplot as plt
from torchaudio.io import StreamReader
from tqdm.autonotebook import tqdm


In [None]:
# Open source ":2" with the "avfoundation" input format.
# NOTE(review): the index in ":2" looks machine-specific — confirm it against the device list before re-running.
stream = StreamReader( src=":2",format="avfoundation")


In [None]:
# Print what the reader reports for source stream 0.
print(stream.get_src_stream_info(0))

In [None]:
# Register one basic audio output stream, requesting 1024 frames per chunk.
stream.add_basic_audio_stream(
    frames_per_chunk=1024,
)

In [None]:
# Print the reported configuration of every registered output stream.
for i in range(stream.num_out_streams):
    print(stream.get_out_stream_info(i))

In [None]:
# Pull the first item produced by the stream generator.
chunks = next(stream.stream())

In [None]:
# Keep the first element of `chunks`; the bare expression on the last line displays its shape.
chunk = chunks[0]
chunk.shape

In [None]:
# Magnitude spectrum (in dB) of the captured chunk, one trace per column.
# The FFT is taken along dim 0 for all columns at once so the result stays
# 2-D and can be indexed per column below (a single-column FFT is 1-D, and
# indexing it with [:, 0] raises IndexError).
# NOTE(review): assumes `chunk` is (frames, channels) with at least two channels — confirm for the selected device.
X = torch.fft.fft(chunk, dim=0)
print(X.shape)

# Shift only along the frequency axis; the default would also rotate the channel axis.
mag_X = torch.abs(torch.fft.fftshift(X, dim=0))
print(mag_X.shape)

# Successive plot calls draw on the same axes by default; plt.hold no longer exists in Matplotlib.
plt.plot(20*torch.log10(mag_X[:,0]))
plt.plot(20*torch.log10(mag_X[:,1]))
plt.show()


In [None]:
# One figure per column of the chunk, then one for their sample-wise difference.
for channel in (0, 1):
    plt.figure()
    plt.plot(chunk[:,channel])

plt.figure()
plt.plot(chunk[:,0] - chunk[:,1])




In [None]:
# Parameters
# `samples` is used below both as the spectrogram row width and as frames_per_chunk;
# `frames` is the number of rows the spectrogram buffer holds.
samples = 512
frames = 256

class Spectrogram():
    """Rolling buffer of FFT magnitudes with the newest frame in row 0.

    Args:
        samples: Number of samples per frame; also the width of each row.
        frames: Number of rows (frames of history) kept in the buffer.
    """
    def __init__(self,samples,frames):
        self.spect = torch.zeros((frames,samples))
    def update(self,x):
        """Insert the spectrum of one frame at the top of the buffer.

        Args:
            x: 1-D tensor holding ``samples`` values.
        """
        # Centered spectrum. Take the magnitude: the buffer is real-valued, and
        # writing a complex tensor into it would keep only the real part.
        X = torch.abs(torch.fft.fftshift(torch.fft.fft(x)))

        # Move every row down by one. The oldest row wraps around to row 0,
        # where it is overwritten below, so rows stay ordered newest -> oldest.
        self.spect = torch.roll(self.spect, shifts=1, dims=0)

        # Set the first row to our new data
        self.spect[0,:] = X

# Form a circular buffer
spect = Spectrogram(samples,frames)

# Set up the stream so that each chunk holds exactly `samples` frames
streamer = StreamReader( src=":2",format="avfoundation")
streamer.add_basic_audio_stream(
    frames_per_chunk=samples,
)

N = 4096
# Read the first N chunks from the reader configured in this cell.
# Each item from streamer.stream() holds one chunk per output stream; only one
# output stream is registered above, hence the 1-tuple unpacking.
for (chunk,) in itertools.islice(streamer.stream(), N):
    # Feed the first column of the freshly read chunk
    spect.update(chunk[:,0])

# x = torch.tensor([1, 2, 3, 4, 5, 6, 7, 8]).view(4, 2)
# print(x)
# print(x.shape)
# torch.roll(x, shifts=(1, 0), dims=(0, 1))

In [None]:
# Display whatever torchaudio's ffmpeg utilities return as the available input devices.
torchaudio.utils.ffmpeg_utils.get_input_devices() 

In [None]:

import torch
import torchsig
import torchaudio

from matplotlib import pyplot as plt
from torchaudio.io import StreamReader
import itertools


In [None]:
# Frames requested per chunk for the reader created below.
samples = 2048
# Set up the stream: open source ":2" and register one basic audio output stream.
streamer = StreamReader( src=":2",format="avfoundation")
streamer.add_basic_audio_stream(
    frames_per_chunk=samples,
)


In [None]:

# Read the sample rate from the metadata of source stream 0; the bare name on the last line displays it.
stream_info = streamer.get_src_stream_info(0)
sample_rate = stream_info.sample_rate
sample_rate

In [73]:
class CustomWindows:
    """Namespace for hand-written window functions."""
    @staticmethod
    def rectangular_window(window_size):
        """Return a rectangular (all-ones) window.

        Args:
            window_size: Number of samples in the window.

        Returns:
            1-D tensor of ``window_size`` ones.
        """
        return torch.ones(window_size)

# Static methods can be fetched by name from the class itself
getattr(CustomWindows,'rectangular_window')

<function __main__.CustomWindows.rectangular_window(window_size)>

In [120]:
import torch
class CustomWindows(object):
    """Single lookup point for window functions.

    Names listed in the internal table are forwarded to the object stored
    for them (every entry currently points at ``torch``). Windows defined
    directly on this class, such as ``rectangular_window``, are found by
    normal attribute lookup and never reach ``__getattr__``.
    """
    def __init__(self):
        self.__window_functions = {
            "bartlett_window" : torch,
            "blackman_window" : torch,
            "hamming_window" : torch,
            "hann_window" : torch,
            "kaiser_window" : torch,
        }
    def __str__(self):
        """Return the list of forwarded window names as a string."""
        return str(list(self.__window_functions.keys()))
    def __getattr__(self,item):
        """Resolve ``item`` through the forwarding table.

        Raises:
            AttributeError: If ``item`` is not a forwarded window name.
        """
        # __getattr__ runs only after normal lookup has failed. Read the table
        # straight from the instance dict: going through self.<attr> would
        # re-enter __getattr__ and recurse without end on an instance whose
        # __init__ has not run (e.g. one created via __new__).
        table = self.__dict__.get("_CustomWindows__window_functions", {})
        if item in table:
            return getattr(table[item], item)
        raise AttributeError(item)
    @staticmethod
    def rectangular_window(window_size):
        """Return an all-ones window of ``window_size`` samples."""
        return torch.ones(window_size)
    
m = CustomWindows()
print('Hann')
print(m.hann_window(10))
print('Kaiser')
print(m.kaiser_window(10,True,7))
print("Rectangular")
print(m.rectangular_window(10))

# An unknown name is expected to fail. Catch and show the error so the cell
# runs to completion and the final expression is still displayed.
try:
    print(m.foo())
except AttributeError as err:
    print(f"AttributeError: {err}")
m.__dict__

Hann
tensor([0.0000, 0.0955, 0.3455, 0.6545, 0.9045, 1.0000, 0.9045, 0.6545, 0.3455,
        0.0955])
Kaiser
tensor([0.0059, 0.0797, 0.2772, 0.5834, 0.8774, 1.0000, 0.8774, 0.5834, 0.2772,
        0.0797])
Rectangular
tensor([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])


AttributeError: foo

In [None]:
class ContextCacher:
    """Cache the end of input data and prepend the next input data with it.

    Args:
        segment_length (int): The size of main segment.
            If the incoming segment is shorter, then the segment is
            zero-padded on the right up to this length.
        context_length (int): The size of the context that is cached from the
            end of one segment and prepended to the next. May be 0.
    """

    def __init__(self, segment_length: int, context_length: int):
        self.segment_length = segment_length
        self.context_length = context_length
        # Before the first call the context is all zeros
        self.context = torch.zeros([context_length])

    def __call__(self, chunk: torch.Tensor):
        """Return ``chunk`` prefixed with the cached context, then refresh the cache.

        Args:
            chunk: 1-D tensor of samples.

        Returns:
            1-D tensor: previous context followed by the (possibly padded) chunk.
        """
        if chunk.size(0) < self.segment_length:
            chunk = torch.nn.functional.pad(chunk, (0, self.segment_length - chunk.size(0)))
        chunk_with_context = torch.cat((self.context, chunk))
        # Use an explicit start index: with context_length == 0 the slice
        # chunk[-0:] is the whole chunk, which would grow the context instead
        # of leaving it empty. Clamp at 0 so an over-long context keeps the
        # whole chunk, as negative slicing did.
        start = max(chunk.size(0) - self.context_length, 0)
        self.context = chunk[start:]
        return chunk_with_context
    

In [None]:
class Window:
    """Window functions exposed as static methods.

    NOTE(review): the original definition stopped at a bare ``def``; the one
    method below mirrors ``rectangular_window`` defined earlier in this
    notebook — confirm this is the intended content.
    """
    @staticmethod
    def rectangular_window(window_size):
        """Return an all-ones window of ``window_size`` samples."""
        return torch.ones(window_size)

class Spectrogram:
    """Rolling magnitude spectrogram with windowing and zero-padding.

    Row 0 of ``spect`` always holds the most recent frame; older frames
    follow in order.

    Args:
        samples: Number of time-domain samples per frame.
        frames: Number of frames of history to keep.
        n_zp: Zero-padding factor. Each frame is transformed with an FFT of
            ``n_zp * samples`` points, so every row of ``spect`` has that width.
        window: Window name, with or without the ``_window`` suffix
            (``'hann'`` and ``'hann_window'`` are equivalent).
    """
    def __init__(self,samples,frames,n_zp=2,window='rectangular'):
        self.samples = samples
        self.n_zp = n_zp
        self.fft_size = n_zp * samples
        # One row per frame, one column per FFT bin
        self.spect = torch.zeros((frames,self.fft_size))
        self.set_window(window)

    def __call__(self,x):
        """Shorthand for pushing one frame ``x`` into the spectrogram."""
        self._update(x)

    def set_window(self,window):
        """Select the window applied to each frame before the FFT.

        Args:
            window: One of the names listed below; the ``_window`` suffix is optional.

        Raises:
            ValueError: If the name is not one of the supported windows.
        """
        # Options are 
        # - bartlett_window
        # - blackman_window
        # - hamming_window
        # - hann_window
        # - kaiser_window
        # - rectangular_window
        torch_windows = (
            'bartlett_window',
            'blackman_window',
            'hamming_window',
            'hann_window',
            'kaiser_window',
        )
        name = window if window.endswith('_window') else window + '_window'
        if name == 'rectangular_window':
            self.window = torch.ones(self.samples)
        elif name in torch_windows:
            # Called with the window length only, i.e. with torch's default arguments
            self.window = getattr(torch, name)(self.samples)
        else:
            raise ValueError(f"unknown window: {window!r}")
        self.window_name = name

    def _update(self,x):
        """Window, zero-pad and transform ``x``, then store it as the newest row."""
        # n=self.fft_size zero-pads the windowed frame before the transform;
        # fftshift centers the zero-frequency bin.
        X = torch.fft.fftshift(torch.fft.fft(x * self.window, n=self.fft_size))

        # Move every row down by one; the oldest row wraps to row 0 and is
        # overwritten below, keeping rows ordered newest -> oldest.
        self.spect = torch.roll(self.spect, shifts=1, dims=0)

        # Set the first row to our new data. The buffer is real-valued, so
        # store the magnitude rather than the complex spectrum.
        self.spect[0,:] = torch.abs(X)

In [None]:
# Define a circular matrix to store the spectrogram.
# To minimize latency, the FFT is applied without overlap for now.
# TODO: add STFT overlap with OLS

# Both values are passed to Spectrogram below: samples per chunk, then frames to keep.
chunk_length = 1024
frames_to_show = 4096
spect = Spectrogram(chunk_length,frames_to_show)

def _plot(seg,num_iter,unit=25):
    """Push ``seg`` into the module-level ``spect`` buffer and draw the buffer.

    Args:
        seg: Latest segment of samples; forwarded to ``spect``.
        num_iter: Currently unused.
        unit: Currently unused.
    """
    # pop the data matrix, adding our latest segment
    spect(seg)

    # A 1x1 grid yields a single Axes object, not an array of them
    fig,ax = plt.subplots(1,1)
    ax.imshow(spect.spect,aspect="auto",origin="lower")
    # ax.tick_params(which='both',left=False,labelleft=False)


In [122]:
spect

NameError: name 'spect' is not defined

In [None]:
stream_iterator = streamer.stream()
segment_length = samples
# Integer division: the value is used as a tensor size and a slice bound,
# both of which reject floats.
context_length = segment_length // 2

cacher = ContextCacher(segment_length, context_length)

# Decoder state carried across calls to run_inference
state, hypothesis = None, None

def _plot(feats, num_iter, unit=25):
    """Draw the collected features as stacked image panels, ``unit`` items per panel.

    Reads the module-level ``segment_length`` and ``sample_rate`` to label the
    time axis.

    Args:
        feats: Sequence of feature tensors, one per processed segment.
        num_iter: Number of items in ``feats`` to lay out.
        unit: Number of items shown per panel.
    """
    unit_dur = segment_length / sample_rate * unit
    num_plots = num_iter // unit + (1 if num_iter % unit else 0)
    if num_plots == 0:
        # Nothing to draw; plt.subplots rejects a grid with zero rows
        return
    # squeeze=False always returns a 2-D array of Axes, so the loop below also
    # works for a single panel (otherwise a bare Axes object is returned).
    fig, axes = plt.subplots(num_plots, 1, squeeze=False)
    t0 = 0
    for i, ax in enumerate(axes[:, 0]):
        feats_ = feats[i*unit:(i+1)*unit]
        t1 = t0 + segment_length / sample_rate * len(feats_)
        feats_ = torch.cat([f[2:-2] for f in feats_])  # remove boundary effect and overlap
        ax.imshow(feats_.T, extent=[t0, t1, 0, 1], aspect="auto", origin="lower")
        ax.tick_params(which='both', left=False, labelleft=False)
        ax.set_xlim(t0, t0 + unit_dur)
        t0 = t1
    fig.suptitle("MelSpectrogram Feature")
    plt.tight_layout()


@torch.inference_mode()
def run_inference(num_iter=100):
    """Stream up to ``num_iter`` chunks through the recognizer and plot the features.

    Prints the transcript incrementally, updates the module-level ``state`` and
    ``hypothesis``, and plots the collected features.

    NOTE(review): ``feature_extractor``, ``decoder`` and ``token_processor`` are
    not defined in any cell visible in this review — confirm they exist before
    running.

    Args:
        num_iter: Maximum number of chunks to consume from ``stream_iterator``.

    Returns:
        An ``IPython.display.Audio`` object for the consumed audio, or ``None``
        if the stream produced no chunks.
    """
    # Imported locally: no cell visible in this review binds the name `IPython`.
    import IPython.display

    global state, hypothesis
    chunks = []
    feats = []
    for i, (chunk,) in enumerate(stream_iterator, start=1):
        segment = cacher(chunk[:, 0])
        features, length = feature_extractor(segment)
        hypos, state = decoder.infer(features, length, 10, state=state, hypothesis=hypothesis)
        hypothesis = hypos[0]
        transcript = token_processor(hypothesis[0], lstrip=False)
        print(transcript, end="", flush=True)

        chunks.append(chunk)
        feats.append(features)
        if i == num_iter:
            break

    if not chunks:
        # The stream ended before yielding anything; there is nothing to plot or play
        return None

    # Plot the features. Pass the number actually collected: the stream may
    # end before num_iter chunks, which would leave trailing panels empty.
    _plot(feats, len(feats))
    return IPython.display.Audio(torch.cat(chunks).T.numpy(), rate=sample_rate)

In [None]:
# Dynamically update a plot in Jupyter 
%matplotlib inline
import time
from matplotlib import pyplot as plt
from IPython import display
import numpy as np
# Ten rounds: add a trace of random samples, then swap the cell output for the refreshed figure.
for i in range(10):
    noise = np.random.randn(100,1)
    plt.plot(noise)
    # wait=True holds the old output until the new one is ready
    display.clear_output(wait=True)
    current_figure = plt.gcf()
    display.display(current_figure)
    time.sleep(1.0)

In [None]:
import numpy as np # to generate random data
import matplotlib.pyplot as plt # to make figure

# optional (just for figure appearance)
# Newer Matplotlib releases renamed the bundled seaborn styles to
# "seaborn-v0_8-<name>"; prefer the new name and fall back to the old one so
# the cell works on either side of the rename.
for style_name in ('colorblind', 'whitegrid'):
    for candidate in (f'seaborn-v0_8-{style_name}', f'seaborn-{style_name}'):
        if candidate in plt.style.available:
            plt.style.use(candidate)
            break

print('library imported')

In [None]:
%matplotlib qt
MEASUREMENT_TIME = 50
INTERVAL_SEC = 0.1

for i in range(MEASUREMENT_TIME):
    # placeholder samples; swap in real measurements here
    data = np.random.rand(100)

    plt.plot(data)

    # fixed y-range and a running frame counter
    plt.ylim(-0.2, 1.2)
    plt.title(f'FRAME {i+1}')

    if i == MEASUREMENT_TIME-1:
        # final frame: leave it on screen instead of clearing it
        plt.show()
        continue

    # intermediate frame: render, wait, then wipe the axes for the next one
    plt.draw()
    plt.pause(INTERVAL_SEC)
    plt.cla()

In [None]:
np.random.randn(100,1).shape