# Initial experiments with embeddings model

This workbook was used for the initial setup of the models. It downloads the model, converts it, evaluates it, checks whether it matches the current implementation, and finally saves it as a gzipped JSON file that will be pushed to the repository.

In [None]:
import sys
import subprocess
import re
from pathlib import Path

def ensure_expected_venv():
    """Verify the interpreter is the one from ``../.venv`` with all requirements installed.

    The check has two stages. First, ``sys.prefix`` must resolve to
    ``../.venv``; otherwise the venv is created when it does not exist yet
    and the function raises so the user can activate it. Second, a pip dry
    run against ``../requirements.txt`` decides whether anything is missing;
    if so the packages are installed and the function raises to force a
    re-run.

    Raises:
        EnvironmentError: If the active interpreter is not the expected venv,
            or right after missing packages have been installed.
        subprocess.CalledProcessError: If a ``venv`` or ``pip`` subprocess
            exits with a non-zero status.
    """
    expected_venv = Path("../.venv").resolve()
    active_prefix = Path(sys.prefix).resolve()
    requirements = Path("../requirements.txt").resolve()

    if active_prefix != expected_venv:
        print("❌ Not running inside ../.venv")
        if not expected_venv.exists():
            print(f"Creating virtual environment at {expected_venv}...")
            subprocess.check_call([sys.executable, "-m", "venv", str(expected_venv)])
        raise EnvironmentError("Please activate the virtual environment located at ../.venv")
    print("✅ Running inside ../.venv")

    # A dry run makes pip report what it would fetch without installing anything.
    pip_install = [sys.executable, "-m", "pip", "install"]
    dry_run = subprocess.run(
        [*pip_install, "--dry-run", "-r", str(requirements)],
        capture_output=True,
        text=True,
        check=True,
    )
    if re.search(r'Collecting|Using cached|Would install', dry_run.stdout) is None:
        print("✅ Modules available")
        return

    print("❌ Some required packages are missing. Installing...")
    subprocess.check_call([*pip_install, "-r", str(requirements)])
    raise EnvironmentError("Modules installed. Re-run the script.")
    
# Stop early unless the notebook runs in ../.venv with the requirements installed.
ensure_expected_venv()

# Put ../train first on the module search path so later cells can import from it
# (presumably where `mel` and `scripts.model_desc` live — verify).
sys.path.insert(0, '../train')

### Download from hub (or load cached file)

In [None]:
import tensorflow as tf
import tensorflow_hub as hub
from pathlib import Path

# Local SavedModel copy of the TF Hub model; reused on later runs.
SAVED_MODEL_DIR = Path('../data/models/saved_speech_embedding_model')

if not SAVED_MODEL_DIR.exists():
    # No local copy yet: fetch from TF Hub and store it together with its signatures.
    loaded_model = hub.load('https://tfhub.dev/google/speech_embedding/1')
    tf.saved_model.save(loaded_model, SAVED_MODEL_DIR, signatures=loaded_model.signatures)
else:
    loaded_model = tf.saved_model.load(SAVED_MODEL_DIR)

# The "default" signature is the callable the following cells work with.
loaded_model_default = loaded_model.signatures["default"]

print('Signatures:', list(loaded_model.signatures.keys()))

### Convert to tflite

In [None]:
TFLITE_MODEL_FILE = Path('../data/models/embedding_model.tflite')

# Conversion is skipped when a converted file is already on disk.
if not TFLITE_MODEL_FILE.exists():
    converter = tf.lite.TFLiteConverter.from_concrete_functions([loaded_model_default])
    # Permit selected TensorFlow ops in addition to the TFLite built-in op set.
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS]
    tflite_bytes = converter.convert()
    TFLITE_MODEL_FILE.write_bytes(tflite_bytes)

print(f'\nYou can inspect {TFLITE_MODEL_FILE.resolve()} with https://netron.app/')

### Inspect in tensorboard

In [None]:
# Export the graph of the "default" signature so it can be browsed in TensorBoard.
writer = tf.summary.create_file_writer("../logdir")
try:
    with writer.as_default():
        tf.summary.graph(loaded_model_default.graph)
finally:
    # Close even when the export raises, so the event file is flushed and
    # the writer is not left open across cell re-runs.
    writer.close()
print('RUN IN TERMINAL:\n', 'tensorboard --logdir=./logdir')

### Detect windowing and sliding sizes

In [None]:
def binary_search(start, end, callback):
    """Find the smallest integer in ``[start, end)`` accepted by ``callback``.

    The search assumes ``callback`` is monotonic: false for every candidate
    below some threshold and true from that threshold on. A call that raises
    an ``Exception`` is counted as a rejection.

    Args:
        start: First candidate (inclusive).
        end: Upper bound of the search (exclusive).
        callback: Predicate taking a single integer candidate.

    Returns:
        The smallest accepted candidate, or ``None`` when no candidate below
        ``end`` is accepted (including when ``start >= end``).
    """
    low = start
    high = end
    while low < high:
        mid = (low + high) // 2
        try:
            ok = callback(mid)
        except Exception:
            # Ordinary errors mean "rejected". KeyboardInterrupt and SystemExit
            # are not caught, so a long-running search can still be aborted.
            ok = False
        if ok:
            high = mid
        else:
            low = mid + 1
    return low if low < end else None

def _output_exceeds(threshold):
    """Build a predicate: does a random clip of ``num_samples`` yield more than ``threshold`` output values?"""
    def predicate(num_samples):
        clip = tf.random.uniform(shape=(1, num_samples), dtype=tf.float32)
        return int(tf.size(loaded_model_default(clip)['default'])) > threshold
    return predicate


_SEARCH_END = 200000

# Smallest input producing any output, then the smallest inputs producing more
# than one, two and three blocks of 96 output values.
min_samples = binary_search(12400, _SEARCH_END, _output_exceeds(0))
after_step1_samples = binary_search(min_samples, _SEARCH_END, _output_exceeds(96))
after_step2_samples = binary_search(min_samples, _SEARCH_END, _output_exceeds(2 * 96))
after_step3_samples = binary_search(min_samples, _SEARCH_END, _output_exceeds(3 * 96))
step_samples = after_step1_samples - min_samples

# Dividing by 16 converts samples to milliseconds (i.e. assumes 16 samples per ms).
print(f'Minimum samples: {min_samples} samples = {min_samples / 16} ms')
print(f'Sliding step: {step_samples} samples = {step_samples / 16} ms')
print(f'Sliding step: {after_step2_samples - after_step1_samples} samples = {(after_step2_samples - after_step1_samples) / 16} ms')
print(f'Sliding step: {after_step3_samples - after_step2_samples} samples = {(after_step3_samples - after_step2_samples) / 16} ms')

### Compare my mel spectrogram with model

In [None]:
import numpy as np
import tensorflow as tf
from mel import calc_mel

# Hard-coded clip length for this cell; random samples in [-1, 1) as float32.
min_samples = 12400
TFLITE_MODEL_FILE = Path('../data/models/embedding_model.tflite')
data = np.random.uniform(low=-1.0, high=1.0, size=min_samples).astype(np.float32)
my_mel = calc_mel(data)

# Keep every intermediate tensor so it can be read back after invoke().
model_tflite = tf.lite.Interpreter(
    model_path=str(TFLITE_MODEL_FILE),
    experimental_preserve_all_tensors=True,
)
model_tflite.resize_tensor_input(model_tflite.get_input_details()[0]['index'], [1, min_samples], strict=True)
model_tflite.allocate_tensors()
model_tflite_in = model_tflite.get_input_details()[0]['index']
model_tflite_out = model_tflite.get_output_details()[0]['index']
model_tflite.set_tensor(model_tflite_in, data.reshape(1, -1))
model_tflite.invoke()

tensor_details = model_tflite.get_tensor_details()

# The tensor named "Squeeze" is treated here as the model's own mel spectrogram.
squeeze_detail = next((detail for detail in tensor_details if detail['name'] == 'Squeeze'), None)
if squeeze_detail is None:
    raise ValueError('"Squeeze" tensor not found in TFLite model')
model_mel = model_tflite.get_tensor(squeeze_detail['index'])

print(f'Mel shape (my): {my_mel.shape}')
print(f'Mel shape (TFLite): {model_mel.shape}')

# Compare element-wise after bringing the model tensor to the local shape.
diff = my_mel - model_mel.reshape(my_mel.shape)
print(f'Max diff: {np.max(np.abs(diff))}')


### Write JSON model and check it

In [None]:
import scripts.model_desc as desc
import gzip

# Run the model description self-test 10 times before exporting
# (presumably each run uses different random input — verify in model_desc).
for _ in range(10):
    desc.self_test()

JSON_MODEL_FILE = Path('../models/embedding_model.json.gz')

# Serialize before opening the output: opening in "wt" mode truncates the
# archive, so a failure in as_json() must not leave an empty file behind.
model_json = desc.as_json()

JSON_MODEL_FILE.parent.mkdir(parents=True, exist_ok=True)
with gzip.open(JSON_MODEL_FILE, "wt", encoding="utf-8", compresslevel=9) as f:
    f.write(model_json)
