In [None]:
# Functions that could be useful
# How to load audio from path
from scipy.io import wavfile
def load_audio(path):
    samplerate, data = wavfile.read(path)
    return samplerate, data

# Detection of illegal deforestation

## Acquisition

In [10]:
# installs
!pip install librosa soundfile datasets

# signing in hugging face for datasets
from huggingface_hub import login
token = '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'
login(token)

# train dataset
from datasets import load_dataset
dataset = load_dataset("rfcx/frugalai", streaming=True)
print(next(iter(dataset['train'])))

{'audio': {'path': 'pooks_6ebcaf77-aa92-4f10-984e-ecc5a919bcbb_41-44.wav', 'array': array([-0.00915527,  0.01025391, -0.01452637, ..., -0.00628662,
        0.00064087,  0.00137329]), 'sampling_rate': 12000}, 'label': 1}


In [2]:
# dataset size of audio
print('length of audio : ' + str(len(next(iter(dataset['train']))['audio']['array'])))

length of audio : 36000


In [13]:
# imports
import tensorflow
import pandas
import numpy
import librosa
import librosa.display
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch
import torch.optim as optim

In [8]:
# example of record
next(iter(dataset['train']))['audio']['array']

array([-0.00915527,  0.01025391, -0.01452637, ..., -0.00628662,
        0.00064087,  0.00137329])

In [15]:
# dataset format
dataset

IterableDatasetDict({
    train: IterableDataset({
        features: ['audio', 'label'],
        num_shards: 6
    })
    test: IterableDataset({
        features: ['audio', 'label'],
        num_shards: 3
    })
})

## Spectrogram class

In [14]:
# script for transforming audio_iterable to spectrogram
'''import librosa
import librosa.display
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path

def audio_to_spectrogram(audio_iterable, save_dir=None, n_fft=2048, hop_length=512, n_mels=128):
    """
    Converts an audio file to a Mel spectrogram and saves it as an image.

    Args:
        audio_iterable (iterable): Path to the audio file.
        save_dir (str): Directory to save the spectrogram image (optional).
        n_fft (int): Number of FFT components.
        hop_length (int): Hop length for the STFT.
        n_mels (int): Number of Mel bands.
    
    Returns:
        np.ndarray: The generated Mel spectrogram (log-scaled).
    """
    # Load the audio file
    y, sr = audio_iterable['audio']['array'], audio_iterable['audio']['sampling_rate']
    
    # Generate the Mel spectrogram
    mel_spectrogram = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels)
    
    # Convert to log scale (dB)
    log_mel_spectrogram = librosa.power_to_db(mel_spectrogram, ref=np.max)

    # Plot and save the spectrogram as an image if save_dir is specified
    if save_dir:
        Path(save_dir).mkdir(parents=True, exist_ok=True)
        save_path = Path(save_dir) / f"{Path('example').stem}_spectrogram.png" # modify the example part 
        
        plt.figure(figsize=(10, 4))
        librosa.display.specshow(log_mel_spectrogram, sr=sr, hop_length=hop_length,
                                 x_axis='time', y_axis='mel', cmap='viridis')
        plt.colorbar(format='%+2.0f dB')
        plt.title('Mel Spectrogram')
        plt.tight_layout()
        plt.savefig(save_path)
        plt.close()
        print(f"Spectrogram saved to {save_path}")
    
    return log_mel_spectrogram

# Example usage
audio_iterable = next(iter(dataset['train'])) # Replace with your audio file path
output_dir = "spectrograms"  # Replace with your desired output directory
spectrogram = audio_to_spectrogram(audio_iterable, save_dir=output_dir)'''

Spectrogram saved to spectrograms/example_spectrogram.png


In [33]:
# Spectrogram with __iter__
class SpectrogramIterableDataset(torch.utils.data.IterableDataset):
    def __init__(self, iterable_dataset, n_fft=2048, hop_length=512, n_mels=128, target_size=(128, 128)):
        """
        Wraps an IterableDataset to preprocess audio into spectrograms.
        
        Args:
            iterable_dataset (IterableDataset): The input dataset.
            n_fft (int): Number of FFT components.
            hop_length (int): Hop length for the STFT.
            n_mels (int): Number of Mel bands.
            target_size (tuple): Desired size for spectrograms (height, width).
        """
        self.dataset = iterable_dataset
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.n_mels = n_mels
        self.target_size = target_size

    def process_audio(self, audio_array, sampling_rate):
        # Generate Mel spectrogram
        mel_spectrogram = librosa.feature.melspectrogram(
            y=audio_array, sr=sampling_rate, n_fft=self.n_fft, 
            hop_length=self.hop_length, n_mels=self.n_mels
        )
        # Convert to log scale (dB)
        log_mel_spectrogram = librosa.power_to_db(mel_spectrogram, ref=np.max)
        
        # Normalize to [0, 1]
        log_mel_spectrogram = (log_mel_spectrogram - np.min(log_mel_spectrogram)) / (
            np.max(log_mel_spectrogram) - np.min(log_mel_spectrogram)
        )
        
        # Resize to target size
        log_mel_spectrogram = librosa.util.fix_length(log_mel_spectrogram, size=self.target_size[1], axis=1)
        log_mel_spectrogram = librosa.util.fix_length(log_mel_spectrogram, size=self.target_size[0], axis=0)
        
        return torch.tensor(log_mel_spectrogram, dtype=torch.float32).unsqueeze(0)  # Add channel dimension

    def __iter__(self):
        for sample in iter(self.dataset):  # Iterate over the base IterableDataset
            audio_array = sample['audio']['array']
            sampling_rate = sample['audio']['sampling_rate']
            label = sample['label']
            
            # Process audio to spectrogram
            spectrogram = self.process_audio(audio_array, sampling_rate)
            
            yield spectrogram, label
    def __len__(self):
        # Count items manually
        return sum(1 for _ in iter(self.dataset))  # Count the number of items


## Load data

In [34]:
from torch.utils.data import DataLoader
batch_size = 32  # Adjust based on your system's memory

# Wrap the train IterableDataset
wrapped_train_dataset = SpectrogramIterableDataset(dataset['train'])

# Create DataLoader
train_loader = DataLoader(
    wrapped_train_dataset,
    batch_size=batch_size, 
    shuffle=False, # Shuffling is not allowed for IterableDataset
    num_workers=0 # This could be 8 as well, performance depends on available RAM. Ensure your system has enough RAM to handle multiple workers without swapping to disk.
)

'''# Iterate through batches
for batch_idx, (spectrograms, labels) in enumerate(train_loader):
    print(f"Batch {batch_idx}")
    print("Spectrograms shape:", spectrograms.shape)  # (batch_size, 1, height, width)
    print("Labels shape:", labels.shape)
    break'''

'# Iterate through batches\nfor batch_idx, (spectrograms, labels) in enumerate(train_loader):\n    print(f"Batch {batch_idx}")\n    print("Spectrograms shape:", spectrograms.shape)  # (batch_size, 1, height, width)\n    print("Labels shape:", labels.shape)\n    break'

In [None]:
'''# Other possibility without creating the spectrogram class, 
from torch.utils.data import DataLoader

# Create DataLoader for the train set
batch_size = 32  # Adjust as needed
train_loader = DataLoader(
    dataset['train'], 
    batch_size=batch_size, 
    shuffle=False,  # Shuffling is not allowed for IterableDataset
    num_workers=4
)

# Iterate through batches
for batch_idx, batch in enumerate(train_loader):
    audio_arrays = batch['audio']['array']  # Access audio data
    labels = batch['label']  # Access labels
    print(f"Batch {batch_idx}")
    print("Audio arrays shape:", audio_arrays.shape)
    print("Labels shape:", labels.shape)
    break
'''

## CNN

In [35]:
import torch.nn as nn
import torch.nn.functional as F

class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        # Convolutional layers
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Fully connected layers
        self.fc1 = nn.Linear(64 * 32 * 32, 128)  # Adjust based on input size
        self.fc2 = nn.Linear(128, 1)  # Output 1 value for binary classification

    def forward(self, x):
        # Convolutional layers with ReLU activation and max pooling
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))

        # Flatten the output from the convolutional layers
        x = x.view(x.size(0), -1)

        # Fully connected layers with ReLU activation
        x = F.relu(self.fc1(x))

        # Final output layer with a single neuron and sigmoid activation
        x = self.fc2(x)

        # Apply sigmoid to output (to get a probability between 0 and 1)
        return torch.sigmoid(x)

# Example of using BCEWithLogitsLoss (handles the sigmoid internally)
import torch.optim as optim

# Initialize model, loss function, and optimizer
model = CNNModel()
criterion = nn.BCEWithLogitsLoss()  # For binary classification
optimizer = optim.Adam(model.parameters(), lr=0.001)


## Training

In [26]:
# See whether cuda is available for GPU
import torch
print(torch.__version__)
print(torch.cuda.is_available()) # False means not available

2.2.2
False


In [29]:
next(iter(train_loader))

[tensor([[[[0.3280, 0.4255, 0.4397,  ..., 0.0000, 0.0000, 0.0000],
           [0.1475, 0.3320, 0.3144,  ..., 0.0000, 0.0000, 0.0000],
           [0.3071, 0.3802, 0.3775,  ..., 0.0000, 0.0000, 0.0000],
           ...,
           [0.2711, 0.2894, 0.2162,  ..., 0.0000, 0.0000, 0.0000],
           [0.1515, 0.1842, 0.1277,  ..., 0.0000, 0.0000, 0.0000],
           [0.1413, 0.1480, 0.0774,  ..., 0.0000, 0.0000, 0.0000]]],
 
 
         [[[0.5005, 0.5371, 0.4653,  ..., 0.0000, 0.0000, 0.0000],
           [0.5819, 0.5546, 0.4063,  ..., 0.0000, 0.0000, 0.0000],
           [0.6304, 0.6595, 0.5979,  ..., 0.0000, 0.0000, 0.0000],
           ...,
           [0.1368, 0.1523, 0.2050,  ..., 0.0000, 0.0000, 0.0000],
           [0.1529, 0.1337, 0.1328,  ..., 0.0000, 0.0000, 0.0000],
           [0.0491, 0.0251, 0.0325,  ..., 0.0000, 0.0000, 0.0000]]],
 
 
         [[[0.3942, 0.4402, 0.4222,  ..., 0.0000, 0.0000, 0.0000],
           [0.3958, 0.3910, 0.3807,  ..., 0.0000, 0.0000, 0.0000],
           [0.3908

In [36]:
import torch
import torch.optim as optim

# Initialize model, loss function, and optimizer
model = CNNModel()
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 5

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    
    for batch_idx, (spectrograms, labels) in enumerate(train_loader):
        '''# Move data to GPU if available
        spectrograms, labels = spectrograms.to('cuda'), labels.to('cuda')
        model = model.to('cuda')'''
        
        labels = labels.unsqueeze(1).float()  # This reshapes labels to (batch_size, 1)

        # Forward pass
        outputs = model(spectrograms)
        loss = criterion(outputs, labels)
        
        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # Log loss
        running_loss += loss.item()
        if (batch_idx + 1) % 10 == 0:  # Log every 10 batches
            print(f"Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item():.4f}")
    
    print(f"Epoch [{epoch+1}/{num_epochs}], Average Loss: {running_loss/len(train_loader):.4f}")


TypeError: object of type 'IterableDataset' has no len()

In [37]:
dataset

IterableDatasetDict({
    train: IterableDataset({
        features: ['audio', 'label'],
        num_shards: 6
    })
    test: IterableDataset({
        features: ['audio', 'label'],
        num_shards: 3
    })
})