In [None]:
from google.colab import drive

# Mount Google Drive into the Colab filesystem; the project files used by the
# following cells are read from below this mount point.
drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

Mounted at /content/drive


In [None]:
import shutil
import os

# Local working directory that receives the files copied out of Drive.
os.makedirs('/content/acoustography_model', exist_ok=True)

# Copy the checkpoint, the k-means pickle and metadata.json, one at a time,
# into the working directory.
for drive_file in (
    '/content/drive/MyDrive/satere_project/hifigan_model/checkpoint_epoch50.pt',
    '/content/drive/MyDrive/satere_project/satere_units/satere_kmeans.pkl',
    '/content/drive/MyDrive/satere_project/hifigan_training/metadata.json',
):
    shutil.copy(drive_file, '/content/acoustography_model/')

# The bare last expression shows the directory listing as the cell output.
print("Files copied:")
os.listdir('/content/acoustography_model')

Files copied:


['metadata.json', 'satere_kmeans.pkl', 'checkpoint_epoch50.pt']

In [None]:
%%writefile /content/acoustography_model/generator.py

import torch
import torch.nn as nn
import torch.nn.functional as F

class Generator(nn.Module):
    """Turn a sequence of discrete unit ids into a waveform.

    Pipeline: unit embedding (``num_units`` -> 256 channels), a width-7
    convolution to 512 channels, five transposed-convolution upsampling
    stages (strides 5, 5, 4, 4, 2), and a final width-7 convolution to a
    single channel followed by ``tanh``.

    With the kernel/stride/padding values used here, ``T`` input units
    produce ``800 * T + 192`` output samples (e.g. 250 units -> 200192).

    NOTE(review): a state dict stores the transposed-convolution kernels but
    not their strides, so successfully loading a checkpoint does not validate
    the strides chosen below -- confirm them against the training code.

    Args:
        num_units: size of the unit vocabulary (rows of the embedding table).
    """

    def __init__(self, num_units=100):
        super().__init__()

        self.unit_embed = nn.Embedding(num_units, 256)
        self.pre_conv = nn.Conv1d(256, 512, 7, padding=3)

        # One row per upsampling stage:
        # (in_channels, out_channels, transposed-conv kernel, stride, padding)
        stage_specs = [
            (512, 256, 10, 5, 2),
            (256, 128, 10, 5, 2),
            (128, 64, 8, 4, 2),
            (64, 32, 8, 4, 2),
            (32, 64, 4, 2, 1),
        ]

        stages = []
        for in_ch, out_ch, kernel, stride, pad in stage_specs:
            # Each stage: upsample, activate, then smooth with a width-7 conv
            # that keeps the channel count.
            stages.append(nn.Sequential(
                nn.ConvTranspose1d(in_ch, out_ch, kernel, stride=stride,
                                   padding=pad, output_padding=0),
                nn.LeakyReLU(0.1),
                nn.Conv1d(out_ch, out_ch, 7, padding=3),
            ))
        self.upsamples = nn.ModuleList(stages)

        self.post_conv = nn.Conv1d(64, 1, 7, padding=3)

    def forward(self, x):
        """Synthesize audio for a batch of unit-id sequences.

        Args:
            x: integer tensor of unit ids, shape ``[B, T]``.

        Returns:
            Tensor of shape ``[B, 800 * T + 192]`` with values in ``[-1, 1]``.
        """
        embedded = self.unit_embed(x)  # [B, T, 256]
        hidden = F.leaky_relu(self.pre_conv(embedded.transpose(1, 2)), 0.1)  # [B, 512, T]

        for stage in self.upsamples:
            hidden = F.leaky_relu(stage(hidden), 0.1)

        waveform = torch.tanh(self.post_conv(hidden))  # [B, 1, 800*T + 192]
        return waveform.squeeze(1)

Overwriting /content/acoustography_model/generator.py


In [None]:
import sys
sys.path.insert(0, '/content/acoustography_model')

# Force reload the module so edits written by the %%writefile cell are picked up
import importlib
import generator
importlib.reload(generator)

import torch
import pickle
import numpy as np
from scipy.io.wavfile import write
from generator import Generator
from IPython.display import Audio

# Load generator
print("Loading model...")
gen = Generator(num_units=100)

# NOTE: torch.load unpickles the file, so only load checkpoints from a
# trusted source.
checkpoint = torch.load('/content/acoustography_model/checkpoint_epoch50.pt', map_location='cpu')
gen.load_state_dict(checkpoint['generator_state_dict'])

# Inference mode for the synthesis cells that follow.
gen.eval()
print("Model loaded successfully!")

# Load kmeans
# pickle.load on this file raised "UnpicklingError: invalid load key" and
# aborted the cell. The failure is now caught and reported, and `kmeans` is
# set to None so the cell runs to completion.
# NOTE(review): this error usually means the file was not written by
# pickle.dump (a joblib.dump file is one common cause) -- confirm how the
# codebook was saved and load it with the matching loader.
kmeans_path = '/content/acoustography_model/satere_kmeans.pkl'
try:
    with open(kmeans_path, 'rb') as f:
        kmeans = pickle.load(f)
except pickle.UnpicklingError as err:
    kmeans = None
    print(f"WARNING: could not unpickle {kmeans_path}: {err}")
    print("kmeans is set to None; the codebook was NOT loaded.")
else:
    print(f"Codebook: {kmeans.n_clusters} acoustemes")

Loading model...
Model loaded successfully!


UnpicklingError: invalid load key, '\x01'.

In [None]:
import pickle

kmeans_path = '/content/acoustography_model/satere_kmeans.pkl'

# Try a plain pickle load. If the stream is not a valid pickle, report the
# error together with the first bytes of the file (useful for identifying the
# real on-disk format) instead of aborting the cell with a traceback.
# NOTE(review): "invalid load key" usually means the file was not written by
# pickle.dump (a joblib.dump file is one common cause) -- confirm how it was
# saved.
try:
    with open(kmeans_path, 'rb') as f:
        kmeans = pickle.load(f)
except pickle.UnpicklingError as err:
    kmeans = None
    with open(kmeans_path, 'rb') as f:
        file_head = f.read(16)
    print(f"Could not unpickle {kmeans_path}: {err}")
    print(f"First 16 bytes of the file: {file_head!r}")
else:
    print("Loaded successfully!")
    print("Type:", type(kmeans))

    # Check what's inside
    if hasattr(kmeans, 'cluster_centers_'):
        print(f"Cluster centers shape: {kmeans.cluster_centers_.shape}")
    elif hasattr(kmeans, 'shape'):
        print(f"Array shape: {kmeans.shape}")
    else:
        print("Contents:", kmeans)

UnpicklingError: invalid load key, '\x01'.

In [None]:
import torch

checkpoint = torch.load('/content/acoustography_model/checkpoint_epoch50.pt', map_location='cpu')

# Print every entry stored under 'generator_state_dict' together with its
# tensor shape, one per line.
generator_state = checkpoint['generator_state_dict']
print("Generator state dict keys:")
for key, tensor in generator_state.items():
    print(f"  {key}: {tensor.shape}")

Generator state dict keys:
  unit_embed.weight: torch.Size([100, 256])
  pre_conv.weight: torch.Size([512, 256, 7])
  pre_conv.bias: torch.Size([512])
  upsamples.0.0.weight: torch.Size([512, 256, 10])
  upsamples.0.0.bias: torch.Size([256])
  upsamples.0.2.weight: torch.Size([256, 256, 7])
  upsamples.0.2.bias: torch.Size([256])
  upsamples.1.0.weight: torch.Size([256, 128, 10])
  upsamples.1.0.bias: torch.Size([128])
  upsamples.1.2.weight: torch.Size([128, 128, 7])
  upsamples.1.2.bias: torch.Size([128])
  upsamples.2.0.weight: torch.Size([128, 64, 8])
  upsamples.2.0.bias: torch.Size([64])
  upsamples.2.2.weight: torch.Size([64, 64, 7])
  upsamples.2.2.bias: torch.Size([64])
  upsamples.3.0.weight: torch.Size([64, 32, 8])
  upsamples.3.0.bias: torch.Size([32])
  upsamples.3.2.weight: torch.Size([32, 32, 7])
  upsamples.3.2.bias: torch.Size([32])
  upsamples.4.0.weight: torch.Size([32, 64, 4])
  upsamples.4.0.bias: torch.Size([64])
  upsamples.4.2.weight: torch.Size([64, 64, 7])
  u

In [None]:
# Smoke test: synthesize a short, hard-coded sequence of acousteme ids
test_sequence = [45, 23, 67, 12, 89, 34, 56, 78, 90, 11, 45, 23, 67, 12, 89, 34]

print(f"Synthesizing {len(test_sequence)} acoustemes...")

# Add a leading batch dimension: [T] -> [1, T]
units = torch.LongTensor(test_sequence).unsqueeze(0)

# Inference only, so no gradients are tracked
with torch.no_grad():
    audio = gen(units).squeeze().numpy()

# Peak-normalize so the largest absolute sample becomes 0.95
peak = np.max(np.abs(audio))
audio = audio / peak * 0.95

# The Audio widget is the cell's last expression, so Colab renders a player
print("Playing audio:")
Audio(audio, rate=16000)

Synthesizing 16 acoustemes...
Playing audio:


In [None]:
# Synthesize from a unit sequence stored in the corpus on Drive
unit_file = '/content/drive/MyDrive/satere_project/satere_units/MAVWYIN1DA_B27_REV_022.units.txt'

with open(unit_file, 'r') as f:
    content = f.read().strip()

# Whitespace-separated integer ids; keep only the first 500 of them
units_list = content.split()
real_sequence = list(map(int, units_list[:500]))

print(f"Loaded {len(real_sequence)} acoustemes from real data")
print(f"First 20 acoustemes: {real_sequence[:20]}")

# Add a leading batch dimension: [T] -> [1, T]
units = torch.LongTensor(real_sequence).unsqueeze(0)

with torch.no_grad():
    audio = gen(units).squeeze().numpy()

# Peak-normalize so the largest absolute sample becomes 0.95
peak = np.max(np.abs(audio))
audio = audio / peak * 0.95

# Duration is computed for playback at 16000 samples per second
print(f"Duration: {len(audio) / 16000:.2f} seconds")
Audio(audio, rate=16000)

Loaded 500 acoustemes from real data
First 20 acoustemes: [26, 26, 26, 26, 26, 21, 21, 26, 26, 26, 26, 26, 26, 26, 26, 26, 26, 26, 26, 26]
Duration: 25.01 seconds


In [None]:
# Play the epoch-50 sample WAV stored next to the checkpoint, if it exists
from IPython.display import Audio
import os

sample_path = '/content/drive/MyDrive/satere_project/hifigan_model/sample_epoch50.wav'

if not os.path.exists(sample_path):
    print("Sample file not found")
    # Fall back to listing whatever .wav files the model folder contains
    samples = [
        name
        for name in os.listdir('/content/drive/MyDrive/satere_project/hifigan_model/')
        if name.endswith('.wav')
    ]
    print("Available samples:", samples)
else:
    print("Playing sample from epoch 50:")
    display(Audio(sample_path))

Playing sample from epoch 50:


In [None]:
import os
import scipy.io.wavfile as wav

# Check the training data structure: list a few files from the wavs/ and
# units/ folders, then compare the first WAV with its unit file.
training_path = '/content/drive/MyDrive/satere_project/hifigan_training/'

# Look at the wavs and units folders
wavs_path = os.path.join(training_path, 'wavs')
units_path = os.path.join(training_path, 'units')

if os.path.exists(wavs_path):
    wav_files = sorted(os.listdir(wavs_path))[:5]
    print("Sample WAV files:", wav_files)
else:
    print("No wavs folder found")

if os.path.exists(units_path):
    unit_files = sorted(os.listdir(units_path))[:5]
    print("Sample unit files:", unit_files)
else:
    print("No units folder found")

# Check one pair
if os.path.exists(wavs_path) and os.path.exists(units_path):
    if not wav_files:
        # An existing but empty folder used to crash with IndexError on
        # wav_files[0]; report it instead.
        print("wavs folder is empty; nothing to check")
    else:
        # Load one wav file
        wav_file = os.path.join(wavs_path, wav_files[0])
        sr, audio = wav.read(wav_file)
        print(f"\nWAV: {wav_files[0]}")
        print(f"  Sample rate: {sr}")
        print(f"  Duration: {len(audio)/sr:.2f} seconds")
        print(f"  Samples: {len(audio)}")

        # Load corresponding units. Swap only the extension: str.replace would
        # also rewrite a '.wav' that appears elsewhere in the file name.
        unit_name = os.path.splitext(wav_files[0])[0] + '.txt'
        unit_file = os.path.join(units_path, unit_name)
        if os.path.exists(unit_file):
            with open(unit_file, 'r') as f:
                units = f.read().strip().split()
            print(f"\nUnits file: {len(units)} units")
            # 0.02 s per unit corresponds to 50 units per second
            print(f"  Expected duration at 50 fps: {len(units) * 0.02:.2f} seconds")
        else:
            print(f"No matching unit file for {wav_files[0]}")

Sample WAV files: ['seg_00000.wav', 'seg_00001.wav', 'seg_00002.wav', 'seg_00003.wav', 'seg_00004.wav']
Sample unit files: ['seg_00000.txt', 'seg_00001.txt', 'seg_00002.txt', 'seg_00003.txt', 'seg_00004.txt']

WAV: seg_00000.wav
  Sample rate: 16000
  Duration: 5.00 seconds
  Samples: 80000

Units file: 250 units
  Expected duration at 50 fps: 5.00 seconds


In [None]:
import os

# Performs the same training-data inspection as the preceding cell: list a
# few files from wavs/ and units/, then compare the first WAV with its units.
training_path = '/content/drive/MyDrive/satere_project/hifigan_training/'

wavs_path = os.path.join(training_path, 'wavs')
units_path = os.path.join(training_path, 'units')

# Show up to five file names (sorted) from each folder
if not os.path.exists(wavs_path):
    print("No wavs folder found")
else:
    wav_files = sorted(os.listdir(wavs_path))[:5]
    print("Sample WAV files:", wav_files)

if not os.path.exists(units_path):
    print("No units folder found")
else:
    unit_files = sorted(os.listdir(units_path))[:5]
    print("Sample unit files:", unit_files)

# Inspect the first WAV and its unit file, only when both folders are present
if os.path.exists(wavs_path) and os.path.exists(units_path):
    import scipy.io.wavfile as wav

    # Audio side: sample rate, duration and sample count
    wav_file = os.path.join(wavs_path, wav_files[0])
    sr, audio = wav.read(wav_file)
    print(f"\nWAV: {wav_files[0]}")
    print(f"  Sample rate: {sr}")
    print(f"  Duration: {len(audio)/sr:.2f} seconds")
    print(f"  Samples: {len(audio)}")

    # Unit side: same base name with '.wav' replaced by '.txt'
    unit_file = os.path.join(units_path, wav_files[0].replace('.wav', '.txt'))
    if not os.path.exists(unit_file):
        print(f"No matching unit file for {wav_files[0]}")
    else:
        with open(unit_file, 'r') as f:
            units = f.read().strip().split()
        print(f"\nUnits file: {len(units)} units")
        # 0.02 s per unit corresponds to 50 units per second
        print(f"  Expected duration at 50 fps: {len(units) * 0.02:.2f} seconds")

Sample WAV files: ['seg_00000.wav', 'seg_00001.wav', 'seg_00002.wav', 'seg_00003.wav', 'seg_00004.wav']
Sample unit files: ['seg_00000.txt', 'seg_00001.txt', 'seg_00002.txt', 'seg_00003.txt', 'seg_00004.txt']

WAV: seg_00000.wav
  Sample rate: 16000
  Duration: 5.00 seconds
  Samples: 80000

Units file: 250 units
  Expected duration at 50 fps: 5.00 seconds


In [None]:
import os

import scipy.io.wavfile as wav
from IPython.display import Audio, display

# Side-by-side check of one training segment: play the original recording,
# synthesize audio from the segment's unit file, and compare the two lengths.
training_path = '/content/drive/MyDrive/satere_project/hifigan_training/'
wavs_path = os.path.join(training_path, 'wavs')
units_path = os.path.join(training_path, 'units')

# Load original audio
wav_file = os.path.join(wavs_path, 'seg_00000.wav')
sr, original_audio = wav.read(wav_file)

print("ORIGINAL AUDIO (from training data):")
display(Audio(original_audio, rate=sr))

# Load units for same segment
unit_file = os.path.join(units_path, 'seg_00000.txt')
with open(unit_file, 'r') as f:
    units_list = [int(u) for u in f.read().strip().split()]

print(f"\nUnits: {units_list[:30]}...")

# Synthesize from units ([T] -> [1, T] adds the batch dimension)
units_tensor = torch.LongTensor(units_list).unsqueeze(0)

with torch.no_grad():
    synth_audio = gen(units_tensor)
    synth_audio = synth_audio.squeeze().numpy()

# Peak-normalize to 0.95; skip the division for an all-zero signal so the
# output is not turned into NaNs.
peak = np.max(np.abs(synth_audio))
if peak > 0:
    synth_audio = synth_audio / peak * 0.95

print("\nSYNTHESIZED AUDIO (from vocoder):")
print(f"Original length: {len(original_audio)} samples")
print(f"Synthesized length: {len(synth_audio)} samples")

# Check the lengths rather than only printing them: a mismatch means the two
# signals are not time-aligned when played at the same rate.
if units_list:
    print(f"Samples per unit: original {len(original_audio) / len(units_list):.1f}, "
          f"synthesized {len(synth_audio) / len(units_list):.1f}")
if len(original_audio) and len(synth_audio) != len(original_audio):
    print(f"WARNING: synthesized audio is {len(synth_audio) / len(original_audio):.2f}x "
          f"the length of the original; the generator's upsampling factor does not "
          f"match the unit rate of this segment.")

display(Audio(synth_audio, rate=16000))

ORIGINAL AUDIO (from training data):



Units: [26, 26, 26, 26, 26, 49, 26, 26, 26, 26, 26, 26, 26, 26, 26, 26, 26, 26, 26, 26, 26, 26, 26, 49, 30, 98, 98, 88, 88, 88]...

SYNTHESIZED AUDIO (from vocoder):
Original length: 80000 samples
Synthesized length: 200192 samples


In [None]:
# Inspect the raw text and the value range of one training unit file
unit_file = '/content/drive/MyDrive/satere_project/hifigan_training/units/seg_00000.txt'

with open(unit_file, 'r') as f:
    content = f.read().strip()

# repr() makes separators and any stray control characters visible
print("Raw file content (first 200 chars):")
print(repr(content[:200]))

print("\nParsed units:")
units_list = content.split()
print(f"Number of units: {len(units_list)}")
print(f"First 20 units: {units_list[:20]}")

# Convert the tokens once and reuse the integers for both extremes
unit_values = [int(u) for u in units_list]
print(f"Unit range: min={min(unit_values)}, max={max(unit_values)}")

# Count newlines and spaces to see which separator the file uses
print(f"\nSeparator check:")
newline_count = content.count('\n')
space_count = content.count(' ')
print(f"Newlines in content: {newline_count}")
print(f"Spaces in content: {space_count}")

Raw file content (first 200 chars):
'26 26 26 26 26 49 26 26 26 26 26 26 26 26 26 26 26 26 26 26 26 26 26 49 30 98 98 88 88 88 91 91 91 69 46 46 46 66 8 72 23 58 49 99 56 93 18 16 16 10 10 10 51 90 6 6 6 6 6 84 27 62 62 62 62 62 62 52 52'

Parsed units:
Number of units: 250
First 20 units: ['26', '26', '26', '26', '26', '49', '26', '26', '26', '26', '26', '26', '26', '26', '26', '26', '26', '26', '26', '26']
Unit range: min=1, max=99

Separator check:
Newlines in content: 0
Spaces in content: 249


In [None]:
# Print the parameter names and shapes of the instantiated model and of the
# checkpoint so the two listings can be compared by eye.
def _print_shapes(heading, named_tensors):
    """Print `heading`, then one `name: shape` line per (name, tensor) pair."""
    print(heading)
    for tensor_name, tensor in named_tensors:
        print(f"{tensor_name}: {tensor.shape}")


_print_shapes("=== Model parameters ===", gen.named_parameters())
_print_shapes("\n=== Checkpoint parameters ===", checkpoint['generator_state_dict'].items())

=== Model parameters ===
unit_embed.weight: torch.Size([100, 256])
pre_conv.weight: torch.Size([512, 256, 7])
pre_conv.bias: torch.Size([512])
upsamples.0.0.weight: torch.Size([512, 256, 10])
upsamples.0.0.bias: torch.Size([256])
upsamples.0.2.weight: torch.Size([256, 256, 7])
upsamples.0.2.bias: torch.Size([256])
upsamples.1.0.weight: torch.Size([256, 128, 10])
upsamples.1.0.bias: torch.Size([128])
upsamples.1.2.weight: torch.Size([128, 128, 7])
upsamples.1.2.bias: torch.Size([128])
upsamples.2.0.weight: torch.Size([128, 64, 8])
upsamples.2.0.bias: torch.Size([64])
upsamples.2.2.weight: torch.Size([64, 64, 7])
upsamples.2.2.bias: torch.Size([64])
upsamples.3.0.weight: torch.Size([64, 32, 8])
upsamples.3.0.bias: torch.Size([32])
upsamples.3.2.weight: torch.Size([32, 32, 7])
upsamples.3.2.bias: torch.Size([32])
upsamples.4.0.weight: torch.Size([32, 64, 4])
upsamples.4.0.bias: torch.Size([64])
upsamples.4.2.weight: torch.Size([64, 64, 7])
upsamples.4.2.bias: torch.Size([64])
post_conv.we