# LPC Video creation

This notebook bundles all the functionality needed to create a video of the LPC process. You do not need to look at this notebook.

## If you are running this Notebook on Google Colab

If you are running this Notebook on Google Colab, you should first store the files from the ZIP file downloaded from Blackboard in your Google Drive and then give this Notebook access to these files.
 
If you are running the code locally (e.g. in Jupyter Notebook) you don't need to adapt (or execute) the following cell.

The following code assumes that you stored the LPC folder directly in your main Google Drive folder. Please adapt the paths accordingly if you chose a different storage location.

In [None]:
# Set this to True if you intend to work on Google Drive and with Google Colab.
# When it is False (the default, for local runs) the rest of this cell does nothing.
I_AM_USING_GOOGLE_DRIVE = False 

# Mount your Google Drive so that it is accessible from Google Colab
if I_AM_USING_GOOGLE_DRIVE:
    # Imported only on this branch: google.colab is not needed for local runs
    from google.colab import drive
    drive.mount('/content/drive/')
    # Copy the notebook importer and the synthesis notebook into the current
    # working directory (presumably so the imports further down can find them).
    # NOTE: the relative source paths only resolve if the working directory is
    # /content and the LPC folder sits directly in "MyDrive" - adapt otherwise.
    !cp drive/MyDrive/LPC/nb_importer.py .
    !cp drive/MyDrive/LPC/lpc_synthesis.ipynb .

In [None]:
# if you don't have the ffmpeg-python package installed, uncomment the next line
# (the ffmpeg executable itself must be installed separately)
#!pip install ffmpeg-python

# ffmpeg for combining video and audio
import ffmpeg
# plotting
import matplotlib.pyplot as plt
# for writing wave files
import soundfile as sf
import numpy as np
import math

from IPython.display import Video, display
from scipy.signal import lfilter

# Import from other notebook
import nb_importer
from lpc_synthesis import rms

In [None]:
def _add_spectrum_axes(fig, subplot_spec, sample_rate_hz, title, line_specs):
    """Create one dB-spectrum axes and return its empty lines in ``line_specs`` order."""
    ax = fig.add_subplot(subplot_spec)
    # Sampling theorem: the maximal frequency is half the sampling frequency
    ax.set_xlim((0, sample_rate_hz // 2))
    ax.set_ylim((-75, 75))
    # Lines are created in the given order, which also fixes legend and draw order
    lines = [ax.plot([], [], **spec)[0] for spec in line_specs]
    ax.legend()
    ax.set_title(title)
    ax.set_xlabel("Frequency (Hz)")
    ax.set_ylabel("Energy (dB)")
    return lines


def _add_waveform_axes(
    fig, subplot_spec, samples, x_max, amplitude, line_color, highlight_color, title
):
    """Create one waveform axes; return ``(waveform_line, block_highlight_span)``."""
    ax = fig.add_subplot(subplot_spec)
    ax.set_xlim((0, x_max))
    ax.set_ylim((-amplitude, amplitude))
    # The x values are derived from the plotted samples themselves so that x and y
    # always have matching lengths.
    (line,) = ax.plot(np.arange(len(samples)), samples, color=line_color)
    # Zero-width span for now; it is moved to the current block for every frame
    span = ax.axvspan(0, 0, color=highlight_color, alpha=0.2)
    ax.set_title(title)
    ax.set_xlabel("Time (samples)")
    return line, span


def video_plotting_setup(video, sample_rate_hz, signal, signal_reconstructed, half_window_len_samples, p):
    """Create the figure and all static plot elements for the LPC video.

    Parameters
    ----------
    video : truthy/falsy
        If falsy, nothing is created and ``None`` is returned.
    sample_rate_hz : number
        Sampling rate; the spectrum plots span ``0 .. sample_rate_hz // 2``.
    signal : array-like
        Original signal; drawn in the bottom-left waveform plot. Its length and
        maximum absolute value define the limits of both waveform plots.
    signal_reconstructed : array-like
        Synthesized signal; drawn in the bottom-right waveform plot.
    half_window_len_samples : int
        Half the block length; the excitation plot spans two half windows.
    p : int
        Upper x limit of the vocal-tract plot (the tube is later drawn with
        ``p`` sections by ``video_plotting_frame``).

    Returns
    -------
    tuple or None
        ``(fig, spectrum, transfer, error_spectrum, spectrum2, transfer2,
        error_spectrum2, bar, plt_synth, plt_glottis_top, plt_glottis_bot,
        plt_glottis_activity, plt_waveform, plt_progress, plt_waveform2,
        plt_progress2)`` - the figure followed by the artists that are updated
        for every frame.
    """
    if not video:
        return

    color_original = "C0"
    color_transfer = "C1"
    color_original_secondary = "C2"
    color_synthesized = "C4"
    color_synthesized_secondary = "C6"
    color_highlight = "C3"

    # Prepare the plotting, i.e., set all static elements
    fig = plt.figure(figsize=(12, 7), constrained_layout=True)
    fig.suptitle("Linear Predictive Coding")

    # Create grid for the different plots (3 rows, 8 columns)
    gs = fig.add_gridspec(3, 8)

    # Top left: spectra belonging to the analysis of the original signal
    spectrum, transfer, error_spectrum = _add_spectrum_axes(
        fig,
        gs[0, :4],
        sample_rate_hz,
        "Original Signal Spectrum vs. Transfer Function vs. Error Spectrum",
        [
            dict(color=color_original, label="Original Signal Spectrum"),
            dict(color=color_transfer, linestyle="--", label="Transfer Function"),
            dict(color=color_original_secondary, label="Error Spectrum"),
        ],
    )

    # Top right: spectra belonging to the synthesis
    error_spectrum2, transfer2, spectrum2 = _add_spectrum_axes(
        fig,
        gs[0, 4:],
        sample_rate_hz,
        "Excitation Signal Spectrum vs. Transfer Function vs. Synthesized Signal Spectrum",
        [
            dict(color=color_synthesized_secondary, label="Excitation Signal Spectrum"),
            dict(color=color_transfer, linestyle="--", label="Transfer Function"),
            dict(color=color_synthesized, label="Synthesized Signal Spectrum"),
        ],
    )

    # Middle left: a single bar showing the fundamental frequency of the excitation
    ax_bar = fig.add_subplot(gs[1, 0])
    bar = ax_bar.bar([0], [0], color=color_synthesized_secondary)
    ax_bar.set_ylim((80, 250))
    ax_bar.tick_params("x", bottom=False, labelbottom=False)
    ax_bar.set_ylabel("Fundamental Frequency (Hz)")

    # Middle: waveform of the synthesized excitation of the current block
    ax_synth = fig.add_subplot(gs[1, 1:5])
    (plt_synth,) = ax_synth.plot([], [], color=color_synthesized_secondary)
    ax_synth.set_xlim((0, 2 * half_window_len_samples))
    # NOTE(review): fixed limits that are not derived from the signal amplitude -
    # presumably tuned for 16-bit sample values; confirm for other value ranges.
    ax_synth.set_ylim((-3277, 6554))
    ax_synth.set_xlabel("Time (samples)")
    ax_synth.set_title("Synthesised Excitation Signal Waveform")

    # Middle right: schematic vocal tract (upper/lower outline) and glottis marker
    ax_glottis = fig.add_subplot(gs[1, 5:])
    plt_glottis_top = ax_glottis.stairs([], color=color_synthesized, linewidth=2)
    plt_glottis_bot = ax_glottis.stairs([], color=color_synthesized, linewidth=2)
    ax_glottis.set_xlim((-1, p))
    ax_glottis.set_ylim((-1.05, 1.05))
    ax_glottis.set_axis_off()
    ax_glottis.set_title("Vocal Tract and Glottis")
    (plt_glottis_activity,) = ax_glottis.plot([0], [0], "ko", markersize=20)

    # Bottom row: original and synthesized waveform on identical axis limits
    signal_max = np.max(np.abs(signal))
    plt_waveform, plt_progress = _add_waveform_axes(
        fig,
        gs[2, :4],
        signal,
        len(signal),
        signal_max,
        color_original,
        color_highlight,
        "Original Signal Waveform",
    )
    plt_waveform2, plt_progress2 = _add_waveform_axes(
        fig,
        gs[2, 4:],
        signal_reconstructed,
        len(signal),
        signal_max,
        color_synthesized,
        color_highlight,
        "Synthesized Signal Waveform",
    )

    return (
        fig,
        spectrum,
        transfer,
        error_spectrum,
        spectrum2,
        transfer2,
        error_spectrum2,
        bar[0],
        plt_synth,
        plt_glottis_top,
        plt_glottis_bot,
        plt_glottis_activity,
        plt_waveform,
        plt_progress,
        plt_waveform2,
        plt_progress2,
    )

In [None]:
def _magnitude_db(samples, nfft, n, floor=1e-10):
    """Return the single-sided magnitude spectrum of ``samples`` in dB.

    The spectrum is normalised by ``n``, limited to the first ``nfft // 2`` bins
    and clamped to ``floor`` so that the logarithm never sees a zero.
    """
    bins = np.fft.rfft(samples, nfft) / n
    magnitude = 2 * np.abs(bins[: nfft // 2])
    return 20 * np.log10(np.maximum(magnitude, floor))


def _move_block_marker(span, start, stop):
    """Move a vertical highlight span so that it covers ``start .. stop`` on the x axis."""
    if hasattr(span, "set_width"):
        # Matplotlib >= 3.9: ``axvspan`` returns a Rectangle (x in data coordinates)
        span.set_x(start)
        span.set_width(stop - start)
    else:
        # Older Matplotlib: ``axvspan`` returns a Polygon described by its corners
        span.set_xy([[start, 0], [start, 1], [stop, 1], [stop, 0]])


def video_plotting_frame(
    video,
    plotting,
    block,
    sample_rate_hz,
    residual_signal,
    block_synthesized,
    excitation_signal_synthesized,
    excitaton_frequency_hz,
    noise_excitation_percentage,
    k,
    signal_energy,
    index,
    half_window_len_samples,
    signal_reconstructed,
    p,
    a,
):
    """Update all dynamic plot elements for one processed block (one video frame).

    Parameters
    ----------
    video : truthy/falsy
        If falsy, nothing is updated and ``None`` is returned.
    plotting : tuple
        The 16-tuple returned by ``video_plotting_setup`` (figure + artists).
    block : array-like
        Current block of the original signal; its length ``N`` determines the
        FFT size (next power of two) and the spectrum normalisation.
    sample_rate_hz : number
        Sampling rate used to compute the frequency axis.
    residual_signal : array-like
        Prediction error of the block; shown as error spectrum, and its RMS
        scales the transfer function and the glottis marker shade.
    block_synthesized, excitation_signal_synthesized : array-like
        Synthesized block and the excitation it was generated from.
    excitaton_frequency_hz : number
        Height of the fundamental-frequency bar.
    noise_excitation_percentage : number
        Noise share of the excitation; the bar alpha is set to one minus this
        value (clamped to ``[0, 1]``). NaN leaves the alpha unchanged.
    k : numpy.ndarray
        Reflection coefficients; the first ``p - 1`` values shape the tube.
    signal_energy : number
        Divisor that normalises the residual RMS for the glottis marker.
    index, half_window_len_samples : int
        The highlighted block spans ``index * half_window_len_samples`` to
        ``(index + 2) * half_window_len_samples``.
    signal_reconstructed : array-like
        Synthesized signal so far; redrawn in the synthesized waveform plot.
    p : int
        Number of tube sections to draw.
    a : numpy.ndarray
        Denominator coefficients of the all-pole filter passed to ``lfilter``.

    Returns
    -------
    tuple or None
        ``(spectrum, transfer, bar, plt_synth, plt_glottis_top,
        plt_glottis_bot, plt_glottis_activity, plt_progress)``.
    """
    if not video:
        return

    (
        _,
        spectrum,
        transfer,
        error_spectrum,
        spectrum2,
        transfer2,
        error_spectrum2,
        bar,
        plt_synth,
        plt_glottis_top,
        plt_glottis_bot,
        plt_glottis_activity,
        plt_waveform,
        plt_progress,
        plt_waveform2,
        plt_progress2,
    ) = plotting

    # --------
    # PLOTTING
    # --------
    N = len(block)
    # Get smallest power of 2 at least as large as N
    NFFT = 2 ** math.ceil(math.log2(N))
    # Centre frequencies of exactly those FFT bins that are plotted (k * fs / NFFT);
    # the Nyquist bin is dropped together with the last rfft coefficient.
    freq_vec = np.fft.rfftfreq(NFFT, d=1.0 / sample_rate_hz)[: NFFT // 2]

    # SPECTRUM 1: analysis of the original block
    spectrum.set_data(freq_vec, _magnitude_db(block, NFFT, N))
    error_spectrum.set_data(freq_vec, _magnitude_db(residual_signal, NFFT, N))

    # Transfer function: impulse response of the all-pole filter 1/A(z), with the
    # impulse scaled by the residual RMS so that it is comparable to the spectra
    dirac_impulse = np.zeros(N)
    dirac_impulse[0] = np.sqrt(N) * rms(residual_signal)
    h = lfilter(np.array([1], dtype=a.dtype), a, dirac_impulse)
    transfer_db = _magnitude_db(h, NFFT, N)
    transfer.set_data(freq_vec, transfer_db)

    # SPECTRUM 2: synthesis
    spectrum2.set_data(freq_vec, _magnitude_db(block_synthesized, NFFT, N))
    error_spectrum2.set_data(
        freq_vec, _magnitude_db(excitation_signal_synthesized, NFFT, N)
    )
    transfer2.set_data(freq_vec, transfer_db)

    # Set excitation frequency; the more noise in the excitation, the fainter the bar
    bar.set_height(excitaton_frequency_hz)
    if not np.isnan(noise_excitation_percentage):
        # Matplotlib rejects alpha values outside [0, 1]
        bar.set_alpha(float(np.clip(1 - noise_excitation_percentage, 0.0, 1.0)))

    plt_synth.set_data(
        np.arange(len(excitation_signal_synthesized)), excitation_signal_synthesized
    )

    # The reflection coefficients k are used to simulate an acoustic tube
    # Compute the impedance ratios ([Mak], formula (45)) between consecutive sections of acoustic tube
    g = (1 + k) / (1 - k)
    # "In the case of an acoustic tube with p sections of equal thickness, the impedance ratios reduce to the inverse ratio of the consecutive cross-sectional areas." [Mak], p. 566
    # => computes ratios between cross-sections
    ratios = 1 / g
    # Cross-sectional areas of the p sections: the first one is 1, every further
    # one is the previous area times the corresponding ratio (running product)
    areas = np.ones(p)
    areas[1:] = np.cumprod(ratios[: p - 1], dtype=float)
    # Normalize
    areas = areas / np.max(areas)
    # Obtain diameter from area (due to normalization we don't care about the $\pi$ factor here)
    D = np.sqrt(areas)
    plt_glottis_top.set_data(values=D, edges=np.arange(p + 1))
    plt_glottis_bot.set_data(values=-D, edges=np.arange(p + 1))

    # Glottis activity in [0, 1]: residual RMS relative to the signal energy.
    # Without signal energy the ratio is undefined, so show no activity.
    if signal_energy > 0:
        glottis_activity = float(
            np.clip(20 * rms(residual_signal) / signal_energy, 0.0, 1.0)
        )
    else:
        glottis_activity = 0.0
    # Grey level: white for no activity, black for full activity
    shade = 1 - glottis_activity
    plt_glottis_activity.set_markerfacecolor((shade, shade, shade))

    # Indicator in the waveforms which block we are processing
    block_start = index * half_window_len_samples
    block_stop = (index + 2) * half_window_len_samples
    _move_block_marker(plt_progress, block_start, block_stop)

    plt_waveform2.set_data(np.arange(len(signal_reconstructed)), signal_reconstructed)
    _move_block_marker(plt_progress2, block_start, block_stop)

    # NOTE(review): error_spectrum, spectrum2, transfer2, error_spectrum2,
    # plt_waveform2 and plt_progress2 are modified above but are not part of the
    # returned tuple. The tuple is kept as is for callers; with a blitting
    # animation those artists would not be redrawn - confirm against the caller.
    return (
        spectrum,
        transfer,
        bar,
        plt_synth,
        plt_glottis_top,
        plt_glottis_bot,
        plt_glottis_activity,
        plt_progress,
    )

In [None]:
def video_plotting_finish(video, plotting, anim, signal, signal_reconstructed, sample_rate_hz):
    """Render the animation, add the synthesized audio and show the final video.

    Writes ``lpc-result-video.mp4`` (silent animation), ``lpc-result-audio.wav``
    (synthesized signal) and ``lpc-result.mp4`` (both combined via FFmpeg) into
    the current working directory and displays the combined video.

    Parameters
    ----------
    video : truthy/falsy
        If falsy, nothing is written and ``None`` is returned (same convention
        as ``video_plotting_setup`` and ``video_plotting_frame``).
    plotting : tuple
        Tuple whose first element is the figure to close after rendering.
    anim : object with a ``save(path)`` method
        The animation to render (presumably a Matplotlib animation - confirm
        against the caller).
    signal : numpy.ndarray
        Original signal; only its ``dtype`` is used, as the sample format of
        the written audio.
    signal_reconstructed : array-like
        Synthesized signal that becomes the audio track.
    sample_rate_hz : int
        Sampling rate of the written audio file.
    """
    if not video:
        return

    # Write animation to disk
    anim.save("lpc-result-video.mp4")

    plt.close(plotting[0])

    # If we want a video we also need to write the sound file to disk
    target_dtype = signal.dtype
    samples = np.asarray(signal_reconstructed)
    if np.issubdtype(target_dtype, np.integer):
        # Saturate instead of letting out-of-range values wrap around when they
        # are cast to the integer sample format (wrap-around is audible as clicks)
        limits = np.iinfo(target_dtype)
        samples = np.clip(samples, limits.min, limits.max)
    sf.write("lpc-result-audio.wav", samples.astype(target_dtype), sample_rate_hz)

    # Now use FFmpeg to combine the video (without sound) and the sound to a video with sound
    video_stream = ffmpeg.input("lpc-result-video.mp4")
    audio_stream = ffmpeg.input("lpc-result-audio.wav")
    ffmpeg.concat(video_stream, audio_stream, v=1, a=1).output("lpc-result.mp4").run(
        overwrite_output=True, quiet=True
    )

    # Display the video inside the notebook
    display(Video("lpc-result.mp4"))