In [None]:
# Data loading, displaying, and preprocessing 
import os
import random
import numpy as np
import torch
from skimage import io, transform
from torch.utils.data import Dataset, DataLoader
import soundfile as sf
import fnmatch
from IPython.display import Audio, display
from matplotlib import pyplot as plt
import string

In [None]:
# IMAGE data - read a single picture from the corpus and render it
image_example = io.imread('data/image_corpus/0001.jpg')

# Draw on an explicitly created axes instead of the implicit pyplot state
fig, ax = plt.subplots()
ax.imshow(image_example)
plt.show()

In [None]:
# We start by creating a class object for representing our image data
# This is a subclass of torch.utils.data.dataset.Dataset that will serve as input to the DataLoader

class ImageDataset(Dataset):
    """Dataset of the ``*.jpg`` images found (recursively) under a directory.

    Subclass of ``torch.utils.data.Dataset`` meant to be fed to a ``DataLoader``.
    """

    def __init__(self, image_dir):
        """Store the corpus directory and the sorted list of image paths.

        Args:
            image_dir: root directory searched recursively for ``*.jpg`` files.
        """
        self.image_dir = image_dir
        # Sorting makes the index -> file mapping independent of os.walk order.
        self.image_files = sorted(self._find_files(image_dir))

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.image_files)

    def __getitem__(self, index):
        """Return the image at position ``index`` as a PyTorch tensor.

        Args:
            index: integer position in ``self.image_files``.

        Returns:
            A tensor holding the pixel array exactly as returned by
            ``io.imread`` (no resizing or normalisation is applied).

        Raises:
            IndexError: if ``index`` is out of range.
        """
        image = io.imread(self.image_files[index])
        # np.asarray strips any ndarray subclass the image reader may return.
        # NOTE(review): batching with the default collate function assumes
        # every image in the corpus has the same shape -- confirm for the data.
        x = torch.as_tensor(np.asarray(image))
        return x

    def _find_files(self, directory, pattern='*.jpg'):
        """Recursively find all files under ``directory`` matching ``pattern``.

        Returns:
            A list of file paths, in ``os.walk`` order (not sorted).
        """
        files = []
        for root, dirnames, filenames in os.walk(directory):
            for filename in fnmatch.filter(filenames, pattern):
                files.append(os.path.join(root, filename))
        return files


In [None]:
# Build the dataset by pointing it at the image corpus directory
image_dir = 'data/image_corpus/'
dataset = ImageDataset(image_dir=image_dir)

# The '_find_files' method returns the list of image paths; print each path,
# then how many there are
images_list = dataset._find_files(image_dir)
for image_path in images_list:
    print(image_path)
print(len(images_list))

# The dataset length is available through '__len__' or, equivalently, 'len()'
print(dataset.__len__())
print(len(dataset))

In [None]:
# TO DO: load the image with index '0' from the dataset, print its shape and display it
# hint: you can use either '__getitem__(index)', or more simply '[index]'


In [None]:
# Now we can create a dataloader, which samples over the dataset and generates small batches of images
batch_size = 2
image_dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Iterate over the dataset to access each batch, and display the images
for batch in image_dataloader:
    print(batch.shape)
    images = batch.int().numpy()

    # The last batch holds fewer than `batch_size` images when the dataset size
    # is not a multiple of it, so size the subplot grid from the batch itself
    # rather than indexing a fixed number of images.
    fig, axes = plt.subplots(1, len(images), squeeze=False)
    for ax, image in zip(axes[0], images):
        ax.imshow(image)

    plt.show()

In [None]:
# TO DO: create a dataloader with a batch size of 5 and iterate over the dataloader to display the images


In [None]:
# Audio data: a dataset for storing the speech data.
# TO DO: implement the __init__, __len__ and __getitem__ methods for the SpeechDataset class

class SpeechDataset(Dataset):
    """Dataset of the ``*.wav`` files found (recursively) under a directory.

    Each item is the signal cropped to at most ``max_sig_length`` samples and
    reshaped to ``(num_samples, 1)``.
    """

    def __init__(self, audio_dir, sample_rate, max_sig_length):
        """Store the attributes of the dataset.

        Args:
            audio_dir: root directory searched recursively for ``*.wav`` files.
            sample_rate: sample rate every file is expected to have.
            max_sig_length: maximum number of samples kept per signal.
        """
        self.audio_dir = audio_dir
        self.sample_rate = sample_rate
        self.max_sig_length = max_sig_length
        # Sorting makes the index -> file mapping independent of os.walk order.
        self.audio_files = sorted(self._find_files(audio_dir))

    def __len__(self):
        """Return the length of the list of audio files."""
        return len(self.audio_files)

    def __getitem__(self, index):
        """Return the ``index``-th signal as a float tensor of shape ``(num_samples, 1)``.

        Raises:
            IndexError: if ``index`` is out of range.
            AssertionError: if the file's sample rate differs from ``self.sample_rate``.
        """
        # path of the index-th audio file
        filename = self.audio_files[index]

        # read the file and store the signal in a tensor
        signal, original_sample_rate = sf.read(filename)
        x = torch.as_tensor(signal, dtype=torch.float32)

        # crop x so it has a length of at most 'max_sig_length'
        # NOTE(review): signals shorter than max_sig_length are not padded, so
        # batching assumes every file has at least max_sig_length samples.
        x = x[:self.max_sig_length]

        # check if the sample rates match
        assert original_sample_rate == self.sample_rate, (
            'expected sample rate {}, got {} for {}'.format(
                self.sample_rate, original_sample_rate, filename))

        # reshape so that each audio has shape (num_samples, 1)
        # NOTE(review): assumes mono recordings -- a multi-channel file would
        # have its channels interleaved by this reshape; confirm for the corpus.
        x = x.reshape(-1, 1)

        return x

    def _find_files(self, directory, pattern='*.wav'):
        """Recursively find all files under ``directory`` matching ``pattern``.

        Returns:
            A list of file paths, in ``os.walk`` order (not sorted).
        """
        files = []
        for root, dirnames, filenames in os.walk(directory):
            for filename in fnmatch.filter(filenames, pattern):
                files.append(os.path.join(root, filename))
        return files

In [None]:
# We can initialize the dataset by providing the directory to the audio corpus data, the target sample rate
# and the maximum signal length (to avoid signals of different lengths)
audio_dir = 'data/audio_corpus/'
sample_rate = 16000
max_sig_length = 16000
# Pass the variable (not a repeated literal) so changing it above takes effect
dataset = SpeechDataset(audio_dir=audio_dir, sample_rate=sample_rate, max_sig_length=max_sig_length)

In [None]:
# Load an audio excerpt from the dataset
x = dataset[0]
print(x.shape)

# Flatten the signal to 1-D before playback: IPython's Audio interprets a 2-D
# array as (channels, samples), so a (num_samples, 1) array would be read as
# num_samples channels of one sample each.
waveform = x.numpy().reshape(-1)

# To display audio files, we can use 'display(Audio(data, sample_rate))' to have a graphical audio reader
plt.plot(waveform)
plt.show()
display(Audio(data=waveform, rate=sample_rate))

# Get the list of files and print each path
audio_list = dataset._find_files(audio_dir)
for audio_path in audio_list:
    print(audio_path)

# Get the length of the dataset
print('Dataset length :', len(dataset))

In [None]:
# Wrap the speech dataset in a dataloader that draws shuffled mini-batches of audio
batch_size = 3
audio_dataloader = DataLoader(
    dataset,
    shuffle=True,
    batch_size=batch_size,
)

# TO DO: as for images, iterate over the dataloader to plot the audio


In [None]:
# Concatenate all the audio files into one single tensor with a dynamic compression

# define a dataloader with batch_size=1 (not shuffled, so the output order is reproducible);
# a separate name keeps the batched 'audio_dataloader' defined earlier intact
single_audio_dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

# target energy of each signal, i.e. sqrt(sum x^2) after normalization
energy_target = 10

# collect the normalized signals and concatenate once at the end
# (avoids growing a tensor inside the loop and needs no placeholder element)
normalized_signals = []

# iterate over the dataloader
for x in single_audio_dataloader:
    # get the signal (only 1 per batch)
    signal = x[0]

    # normalize it such that sqrt(sum x^2) = energy_target
    # (an all-zero signal is left untouched to avoid dividing by zero)
    energy = torch.sqrt(torch.sum(signal ** 2))
    if energy > 0:
        signal = signal * (energy_target / energy)

    normalized_signals.append(signal)

# concatenate along the time axis
if normalized_signals:
    x_tot = torch.cat(normalized_signals, dim=0)
else:
    x_tot = torch.empty((0, 1))
print(x_tot.shape)

# save it as a .pt file and as .wav audio
# NOTE(review): output file names are placeholders -- adjust to the project layout
torch.save(x_tot, 'audio_concatenated.pt')
sf.write('audio_concatenated.wav', x_tot.numpy(), sample_rate)


In [None]:
# Text data - An example for reading, loading, and displaying text data.
# Each non-empty line is expected to be '<text><TAB><label>'.
filepath = 'data/text_corpus/file_1.txt'
# The 'with' block closes the file even if a line is malformed
with open(filepath, 'r') as text_file:
    for line in text_file:
        line = line.strip()
        if not line:
            # skip blank lines instead of failing on the unpacking below
            continue
        text, label = line.split('\t')
        # print inside the loop so every line is displayed, not only the last one
        print("text: {sentence}\t Label: {label}".format(sentence=text, label=label))

In [None]:
# Dataset for storing the text data.
# TO DO: implement the __init__, __len__ and __getitem__ methods for the TextDataset class

class TextDataset(Dataset):
    """Dataset of labelled texts stored one per ``*.txt`` file.

    Only the first line of each file is read; it is expected to be the text and
    an integer label separated by a single tab character.
    """

    # special token used for padding texts to a fixed length (see _tokenize_text)
    PAD_TOKEN = '[PAD]'

    def __init__(self, text_dir, max_seq_length=128):
        """Index the corpus, read the labels and build the vocabulary.

        Args:
            text_dir: root directory searched recursively for ``*.txt`` files.
            max_seq_length: number of tokens every item is padded/truncated to.
        """
        self.text_dir = text_dir
        self.max_seq_length = max_seq_length
        # Sorting makes the index -> file mapping independent of os.walk order.
        self.text_files = sorted(self._find_files(text_dir))

        # labels, in the same order as self.text_files
        self.labels = self._get_labels()

        # vocabulary of unique words found in the text files
        self.vocab = self._create_vocabulary()

        # word-to-index mapping: each word gets a unique integer (0, 1, 2, ...)
        self.word_to_index = {word: idx for idx, word in enumerate(self.vocab)}

        # the padding token gets the next free index
        self.word_to_index.setdefault(self.PAD_TOKEN, len(self.word_to_index))

    def __len__(self):
        """Return the length of the list of text files."""
        return len(self.text_files)

    def __getitem__(self, index):
        """Return the token indices and label of the ``index``-th text.

        Returns:
            A dict with keys ``'token_ids'`` (IntTensor of length
            ``max_seq_length``) and ``'label'`` (FloatTensor of length 1).

        Raises:
            IndexError: if ``index`` is out of range.
        """
        text_file = self.text_files[index]

        # list of tokens, padded/truncated to max_seq_length
        tokens = self._tokenize_text(text_file)

        # map the tokens to their vocabulary indices
        token_ids = torch.IntTensor([self.word_to_index[token] for token in tokens])

        # get the index-th label and store it into a FloatTensor
        y = torch.FloatTensor([self.labels[index]])

        features = {'token_ids': token_ids, 'label': y}
        return features

    def _find_files(self, directory, pattern='*.txt'):
        """Recursively find all files under ``directory`` matching ``pattern``.

        Returns:
            A list of file paths, in ``os.walk`` order (not sorted).
        """
        files = []
        for root, dirnames, filenames in os.walk(directory):
            for filename in fnmatch.filter(filenames, pattern):
                files.append(os.path.join(root, filename))
        return files

    @staticmethod
    def _read_first_line(filepath):
        """Return the first line of ``filepath``, closing the file afterwards."""
        with open(filepath, 'r') as handle:
            return handle.readline()

    @staticmethod
    def _split_words(line):
        """Lower-case ``line``, blank out every non a-z character and split on whitespace."""
        letters = string.ascii_lowercase
        cleaned = ''.join(char if char in letters else ' ' for char in line.strip().lower())
        return cleaned.split()

    def _get_labels(self):
        """Extract the integer label from the first line of each text file.

        Raises:
            ValueError: if a first line is not exactly ``<text><TAB><label>``
                or the label is not an integer.
        """
        labels = []
        for filepath in self.text_files:
            text, label = self._read_first_line(filepath).split('\t')
            labels.append(int(label))
        return labels

    def _create_vocabulary(self):
        """Create the vocabulary of unique words from the given text files.

        Returns:
            A sorted list of words, so that the word indices derived from it
            are identical from one run to the next.
        """
        vocab = set()
        for filepath in self.text_files:
            vocab.update(self._split_words(self._read_first_line(filepath)))
        return sorted(vocab)

    def _tokenize_text(self, text_file):
        """
        Removes non-characters from the text, then truncates or pads it to max_seq_length.
        *!* A fixed length is necessary for ensuring that all items have the same size
        *!* This is required since DataLoader cannot handle tensors of variable length

        Returns a list of exactly max_seq_length tokens
        """
        tokens = self._split_words(self._read_first_line(text_file))
        # truncate over-long texts so every item has the same length
        tokens = tokens[:self.max_seq_length]
        tokens.extend([self.PAD_TOKEN] * (self.max_seq_length - len(tokens)))
        return tokens


In [None]:
# Build the text dataset from the corpus directory (default maximum sequence length)
text_dir = 'data/text_corpus/'
dataset = TextDataset(text_dir=text_dir)

# TO DO: create a dataloader for the text data and print the contents of each batch

# define a dataloader with batch_size=2 and shuffle=True

# iterate over the dataloader and store/print each feature (token_ids, label) separately
# hint: check the output of __getitem__
