In [None]:
import os
import shutil
from git import Repo
from PIL import Image, UnidentifiedImageError
from pydub import AudioSegment
import mimetypes
from json import JSONDecodeError

### Post-Processing of Anonymised Data
#### Copy anonymised data back into the working directory

In [None]:
# Local clone of the anonymised-dataset git repository: it is opened with
# Repo() and its sub-folders are copied out of it.
DATA_PATH = "../../assignment_dataset_anonymisation"
# Working copy that receives the copied folders and is then post-processed
# in place (images and audio are overwritten where they lie).
# NOTE(review): "annoymised" is misspelt, but this is an on-disk folder name —
# rename the folder and this string together if it is ever corrected.
TARGET_PATH = "../data/annoymised_unprocessed"


In [None]:
# Copy every <directory>/<subdir> folder of the dataset repository into
# TARGET_PATH, but only when the local `main` branch is not behind its upstream.
repo = Repo(DATA_PATH)
git = repo.git

git.checkout('main')

# Ask git for the machine-readable branch header, e.g.
#   "## main...origin/main [behind 2]"  or  "## main...origin/main [ahead 1, behind 2]"
# rather than searching the whole human-readable status text. The long form
# also lists file names (an untracked "behind.txt" would trip the check) and
# reports a branch that is both ahead and behind as "have diverged", which the
# plain substring test would miss.
# NOTE: this still relies on the most recent fetch; it does not contact the remote.
status_lines = git.status('--porcelain', '--branch').splitlines()
branch_header = status_lines[0] if status_lines else ""

if 'behind' in branch_header:
    print("You must manually pull recent changes from main!")
else:
    for directory in os.listdir(DATA_PATH):
        # Hidden entries (.git, ...) and the README are never part of the data.
        if directory.startswith(".") or directory == "README.md":
            continue

        source_dir = os.path.join(DATA_PATH, directory)
        # Any other loose file would make os.listdir() below raise
        # NotADirectoryError and abort the copy half-way through.
        if not os.path.isdir(source_dir):
            print("SKIPPING NON-DIRECTORY:", source_dir)
            continue

        for subdir in os.listdir(source_dir):
            if subdir.startswith("."):
                continue

            source_subdir = os.path.join(source_dir, subdir)
            # shutil.copytree() only accepts directories as its source.
            if not os.path.isdir(source_subdir):
                print("SKIPPING NON-DIRECTORY:", source_subdir)
                continue

            # dirs_exist_ok=True lets the cell be re-run over an existing copy.
            shutil.copytree(
                source_subdir,
                os.path.join(TARGET_PATH, directory, subdir),
                dirs_exist_ok=True)

    print("Copy complete!")

### Get file types

In [None]:
# (Re)initialise the mimetypes tables before mimetypes.types_map is read below.
# Per the stdlib docs, init() also merges in system-level type definitions
# (mime.types files / Windows registry), so the extension lists derived from
# the map may differ between machines.
mimetypes.init()

In [None]:
# Adapted from: https://stackoverflow.com/questions/4292029/how-to-get-a-list-of-file-extensions-for-a-general-file-type

def get_extensions_for_type(general_type):
    """Yield each extension in ``mimetypes.types_map`` of the given family.

    An extension is yielded when the part of its MIME type before the first
    ``/`` equals ``general_type`` (for example ``'audio'`` for ``audio/mpeg``).
    Extensions are yielded exactly as they are keyed in the map (with the
    leading dot).
    """
    for extension, mime_type in mimetypes.types_map.items():
        top_level, _, _ = mime_type.partition('/')
        if top_level == general_type:
            yield extension

# Extension tuples per media family, built in the order video, audio, image.
# They are tuples so they can be handed straight to str.endswith().
VIDEO, AUDIO, IMAGE = (
    tuple(get_extensions_for_type(family))
    for family in ('video', 'audio', 'image')
)

### Post-process
#### Set default image

In [None]:
def process_images():
    """Overwrite every image under TARGET_PATH with an all-black placeholder.

    Walks TARGET_PATH and, for each file whose lower-cased name ends with one
    of the IMAGE extensions, replaces it in place with a black greyscale
    ("L" mode) image of the same dimensions. Files with an image extension
    that Pillow cannot identify are deleted.

    Prints a notice for each deleted file and "Processing complete!" at the end.
    """
    for root, _, files in os.walk(TARGET_PATH):
        for file in files:
            if not file.lower().endswith(IMAGE):
                continue

            img_path = os.path.join(root, file)

            try:
                # Only the dimensions are needed. The context manager closes
                # the file handle before the same path is overwritten below,
                # and no pixel data has to be decoded.
                with Image.open(img_path) as image:
                    image_size = image.size
            except UnidentifiedImageError:
                print("DELETING NON IMAGE FILE:", img_path)
                os.remove(img_path)
                continue

            # A fresh "L" image filled with colour 0 is already entirely black,
            # so no per-pixel list has to be built and written with putdata().
            censored_image = Image.new("L", image_size, 0)
            # The output format is chosen by Pillow from the file extension.
            censored_image.save(img_path)

    print("Processing complete!")

process_images()

#### Set default audio

In [None]:
def process_audio():
    for root, _, files in os.walk(TARGET_PATH):
        for file in files:
            if file.lower().endswith(AUDIO):
                audio_path = os.path.join(root, file)
                _, file_ext = os.path.splitext(audio_path)

                match file_ext:
                    case ".wav":
                        audio = AudioSegment.from_wav(audio_path)
                        silenced_audio = audio - 1000
                        silenced_audio.export(audio_path, format='wav')
                    case ".mp3":
                        try:
                            audio = AudioSegment.from_file(audio_path)
                        except JSONDecodeError:
                            print("UNABLE TO PROCESS: ", audio_path)
                            continue

                        silenced_audio = audio - 1000
                        silenced_audio.export(audio_path, format='mp3')
                    case _:
                        print("UNPROCESSED AUDIO FILE TYPE", file_ext, file)

    print("Processing complete!")

process_audio()

### Check for Video

In [None]:
# Tally the files under TARGET_PATH whose lower-cased name ends with one of
# the VIDEO extensions.
count_of_video_files = sum(
    1
    for _root, _dirs, filenames in os.walk(TARGET_PATH)
    for filename in filenames
    if filename.lower().endswith(VIDEO)
)

print("Number of video files: ", count_of_video_files)

#### Renumber assignments

In [None]:
count = 0

FINAL_PATH = "../data/anonymised_assignments"

# Copy every submission folder from TARGET_PATH into FINAL_PATH under a new
# name: the "<folder>/<submission>" path cut at its first underscore, followed
# by "_<running number>". The number keeps counting across all folders.
for path in sorted(os.listdir(TARGET_PATH)):
    rel_path = os.path.join(TARGET_PATH, path)
    if not os.path.isdir(rel_path):
        continue

    # Sorted like the outer listing: os.listdir() returns entries in arbitrary
    # order, which would hand out different numbers on different runs/machines.
    for submission in sorted(os.listdir(rel_path)):
        old_path = os.path.join(rel_path, submission)
        if not os.path.isdir(old_path):
            continue

        # Build the destination from FINAL_PATH directly instead of splitting
        # the full source path on '_' and string-replacing the folder name;
        # the resulting names are the same for the current TARGET_PATH, but no
        # longer depend on how many underscores that constant contains.
        # NOTE(review): an underscore in the parent folder name cuts the stem
        # there, so those submissions land directly under FINAL_PATH - confirm
        # this is intended.
        name_stem = os.path.join(path, submission).split('_', 1)[0]
        new_path = os.path.join(FINAL_PATH, name_stem + "_" + str(count))
        print(old_path, count, new_path)
        shutil.copytree(old_path, new_path, dirs_exist_ok=True)
        count += 1