In [None]:
import os
import shutil
import numpy as np
import math
import matplotlib
import matplotlib.pyplot as plt
import librosa
import soundfile
import json
import textgrid
import pyroomacoustics as pra
from pyroomacoustics.directivities \
    import (CardioidFamily, DirectionVector, DirectivityPattern)

In [None]:
"""Script parameters"""
# Intended number of samples to generate.
target_amount_samples = 250
# generate() numbers its samples starting after this value.
skipSamples = 0
# exportSample() writes every sample into a numbered sub-folder of this directory.
target_dir = '/workspace/training/KEC'


"Persons"
# Number of speaker voices requested per sample.
speakers_in_room = 1

"ROOM"
# Size of the listener's head; get_pos_mics() offsets each microphone by half
# of this value from the head position (presumably metres - TODO confirm).
head_size = 0.2

# When False, the fixed normal_* values below are used instead of random draws.
randomize_room = False
# Room dimensions as [width, length, height]. room_dim_ranges holds the
# [min, max] interval per dimension that is sampled when randomize_room is True.
normal_room_dim = [15, 20, 4]
room_dim_ranges = [[3, 30], [3, 30], [2.5, 5]]

# The amount of sound the walls absorb
normal_absorption = 1.2
absorption_range = [1, 2]

# The time it takes until the signal drops by 60 dB
normal_rt60 = 0.25
rt60_range = [0.05, 0.75]

"AUdio"
# Sample rate used for loading, simulating and writing audio (presumably Hz).
sampleRate = 16000
#total_length = 30
# split_sentence() only keeps sentences whose duration is below this value.
max_audio_length = 30
#min_audio_length = 5
# split_sentence() only keeps sentences whose text is longer than this.
min_characters_per_sentence = 20


# 'nice to have' for data generation: stored as 'source' in every description.json
source_dataset = 'KEC - https://clarin.phonetik.uni-muenchen.de/BASRepository/index.php?target=Public/Corpora/KEC/KEC.1.php'


In [None]:
"""Visualization"""
def scatterplot3d(points, dims):
    """Show the given 3-D points inside a box whose size is ``dims``.

    Args:
        points: Iterable of ``[x, y, z]`` coordinates; every point is drawn
            with its own ``scatter`` call.
        dims: ``[x_max, y_max, z_max]`` used as the upper axis limits; the
            lower limit of every axis is 0.
    """
    fig = plt.figure()
    ax = fig.add_subplot(projection='3d')
    ax.set_xlim(0, dims[0])
    ax.set_ylim(0, dims[1])
    ax.set_zlim(0, dims[2])

    for p in points:
        ax.scatter(p[0], p[1], p[2])
    plt.show()
    # Close this figure explicitly. This function is called once per generated
    # sample, and clearing the current axes (the previous behaviour) neither
    # frees the figure nor is guaranteed to target it after show().
    plt.close(fig)


In [None]:
"""Util"""
def rec_filter(x):
    """Return True when 'rec' is contained in ``x``, otherwise False."""
    contains_rec = 'rec' in x
    return contains_rec


def wav_filter(x):
    """Return True when '.wav' is contained in ``x``, otherwise False."""
    if '.wav' in x:
        return True
    return False


def txt_filter(x):
    """Return True when '.TextGrid' is contained in ``x``, otherwise False."""
    is_textgrid = '.TextGrid' in x
    return is_textgrid


def as_filter(x):
    """Return True when 'AS' is contained in ``x``, otherwise False."""
    if 'AS' not in x:
        return False
    return True


def idx_filter_txt(x, y):
    """Return True when the element at index ``len(x) - 10`` of ``x`` equals ``y``."""
    # For a name ending in '.TextGrid' (9 characters) this index is the
    # character directly in front of the extension.
    probe = x[len(x) - 10]
    if probe == y:
        return True
    return False


def rec_filter(x):
    """Return True when 'rec' is contained in ``x``, otherwise False."""
    # NOTE(review): a function with this name and behaviour is already defined
    # earlier in this cell; this later definition is the one that stays bound.
    if 'rec' in x:
        return True
    return False


def norm2(x):
    """Return the squared Euclidean norm of ``x`` (sum of squared components).

    No square root is taken, so the result is the squared length of the
    vector. The function no longer prints the value on every call; it is
    invoked inside nested loops and flooded the notebook output.

    Args:
        x: Iterable of numbers.

    Returns:
        The sum of the squared components as a NumPy scalar.
    """
    return np.sum([component ** 2 for component in x])


def distance(x, y):
    """Return the Euclidean distance between the points ``x`` and ``y``.

    The square root is applied so the result is a real distance rather than
    the squared one. Comparisons against a threshold of 1 give the same
    answer either way; any other threshold is only meaningful with the true
    distance. The computation is self-contained and produces no output.

    Args:
        x: Sequence of coordinates; its length decides how many components
            are compared.
        y: Sequence of coordinates with at least ``len(x)`` entries.

    Returns:
        The distance as a float.
    """
    squared = sum((x[i] - y[i]) ** 2 for i in range(len(x)))
    return math.sqrt(squared)


def avg(x):
    """Return the arithmetic mean of the values in ``x``."""
    total = sum(x)
    count = len(x)
    return total / count


In [None]:
"Dataset specific Wav Generation"
def FileGeneratorsKEC(src='/workspace/data/KEC'):
    """Collect the wav and TextGrid files of the KEC recording folders.

    Every directory directly below ``src`` whose name contains 'rec' is
    scanned for entries containing '.wav' and '.TextGrid'.

    Folder names and file names are sorted before use. ``os.listdir`` returns
    entries in arbitrary order, while the consumer of these lists addresses a
    wav file and its transcript through the same list index, so both lists
    need a deterministic order.
    NOTE(review): index pairing additionally assumes every folder holds the
    same number of wav and TextGrid files with matching sort order - confirm
    against the corpus layout.

    Args:
        src: Root directory of the corpus.

    Returns:
        Tuple ``(wavGen, textGen)`` of generator functions yielding the full
        paths of the wav files and of the TextGrid files respectively.
    """
    allWavfiles = []
    alltxtFiles = []

    for rec in sorted(name for name in os.listdir(src) if 'rec' in name):
        currentDir = os.path.join(src, rec)
        # A stray file whose name happens to contain 'rec' cannot be listed.
        if not os.path.isdir(currentDir):
            continue

        entries = sorted(os.listdir(currentDir))
        allWavfiles.extend(
            os.path.join(currentDir, name) for name in entries if '.wav' in name)
        alltxtFiles.extend(
            os.path.join(currentDir, name) for name in entries if '.TextGrid' in name)

    def wavGen():
        yield from allWavfiles

    def textGen():
        yield from alltxtFiles

    return wavGen, textGen

# Splits a TextGrid tier into sentences that are delimited by '<P>' marks


def split_sentence(textGrid, max_length=None, min_length=5, min_characters=None):
    """Split the first tier of a TextGrid into sentences ending at '<P>' marks.

    The marks of consecutive intervals are concatenated until an interval
    whose mark contains '<P>' is reached. The collected sentence is kept when
    its duration lies in ``[min_length, max_length)`` and its raw text is
    longer than ``min_characters``.

    A sentence starts at the ``minTime`` of its first interval, so the audio
    range described by the timestamp contains the first token. Using that
    interval's ``maxTime`` cut the first token out of the clip although it is
    part of the sentence text.

    Args:
        textGrid: Object whose first element is an iterable of intervals
            with ``minTime``, ``maxTime`` and ``mark`` attributes.
        max_length: Exclusive upper duration bound; defaults to the
            module-level ``max_audio_length``.
        min_length: Inclusive lower duration bound.
        min_characters: Minimum raw sentence length (exclusive); defaults to
            the module-level ``min_characters_per_sentence``.

    Returns:
        Tuple ``(sentences, timestamp)``: the sentence texts and, per
        sentence, ``[start, end, duration]``.
    """
    if max_length is None:
        max_length = max_audio_length
    if min_characters is None:
        min_characters = min_characters_per_sentence

    currentTime = 0
    sentence_start = 0
    sentences, timestamp = [], []
    sentence = ''

    for t in textGrid[0]:
        # Stop as soon as the interval end times no longer increase.
        if currentTime > t.maxTime:
            break

        currentTime = t.maxTime
        if sentence == '':
            sentence_start = t.minTime

        sentence += ' ' + t.mark

        if '<P>' in t.mark:
            duration = np.round(t.maxTime - sentence_start, 5)
            if min_length <= duration < max_length and len(sentence) > min_characters:
                # Drop the trailing three characters (the '<P>' marker).
                sentences.append(sentence[:-3].strip())
                timestamp.append([sentence_start, t.maxTime, duration])
            sentence = ''

    return sentences, timestamp


def VoiceLineGeneratorKEC(count, voicesPerSample):
    """Yield ``count`` groups of randomly chosen voice lines from the corpus.

    For every voice a random file index is drawn, the TextGrid at that index
    is split into sentences, and one sentence is picked at random. The
    matching range of the wav file at the same index is then loaded.

    Files that yield no usable sentence are skipped by drawing again. Before,
    such a file made ``np.random.randint(0, 0)`` raise and aborted the whole
    generation.

    Args:
        count: Number of groups to yield.
        voicesPerSample: Number of voice lines per group.

    Yields:
        Tuple ``(w, t)``: the list of loaded waveforms and the list of their
        ``[start, end, duration]`` timestamps.

    Raises:
        ValueError: If no wav files were found.
        RuntimeError: If no usable sentence is found within the retry limit.
    """
    wavGen, textGen = FileGeneratorsKEC()
    allWavPaths = list(wavGen())
    allTxtPaths = list(textGen())
    if not allWavPaths:
        raise ValueError('No wav files found to draw voice lines from.')

    # Bounded retries so a corpus without any usable sentence cannot spin forever.
    max_attempts = 10 * len(allWavPaths)

    def pick_voice_line():
        """Return ``(waveform, timestamp)`` of one randomly chosen sentence."""
        for _ in range(max_attempts):
            random_file = np.random.randint(0, len(allWavPaths))
            _, timestamps = split_sentence(
                textgrid.TextGrid.fromFile(allTxtPaths[random_file]))
            if not timestamps:
                continue
            ts = timestamps[np.random.randint(0, len(timestamps))]
            return loadWavFile(allWavPaths[random_file], ts[0], ts[2]), ts
        raise RuntimeError(
            f'No usable sentence found after {max_attempts} attempts.')

    for _ in range(count):
        w = []
        t = []
        for _ in range(voicesPerSample):
            wav, ts = pick_voice_line()
            w.append(wav)
            t.append(ts)

        yield w, t


In [None]:
"""Room Functions"""
def generate_room_characteristics():
    """Return the room dimensions, rt60 and absorption for one sample.

    With ``randomize_room`` set, every dimension is drawn uniformly from its
    ``[min, max]`` entry in ``room_dim_ranges``, and rt60 and absorption are
    drawn uniformly from ``rt60_range`` and ``absorption_range``. Otherwise
    the fixed ``normal_*`` values are returned.

    The ranges are plain nested lists, so they are unpacked pairwise.
    Indexing them with a tuple (``ranges[i, 0]``) raised ``TypeError`` and
    made the randomized branch unusable. Dimensions are drawn as floats
    because the height range has a non-integer bound.

    Returns:
        Tuple ``(dims, rt60, absorption)`` where ``dims`` is a new list
        ``[width, length, height]``.
    """
    if randomize_room:
        dims = [float(np.random.uniform(low, high))
                for low, high in room_dim_ranges]

        rt60 = rt60_range[0] + \
            (rt60_range[1] - rt60_range[0]) * np.random.random()

        absorption = absorption_range[0] + \
            (absorption_range[1] - absorption_range[0]) * np.random.random()
    else:
        # Copy so callers cannot modify the configured default by accident.
        dims = list(normal_room_dim)
        rt60 = normal_rt60
        absorption = normal_absorption

    return dims, rt60, absorption

def random_position_in_room(roomDims):
    """Draw a random ``[x, y, z]`` position inside the room.

    x and y are drawn so that they keep a distance of 0.5 from the walls of
    their axis; z is always 1.73.
    """
    usable_width = roomDims[0] - 1
    usable_length = roomDims[1] - 1
    x_coord = np.random.random() * usable_width + 0.5
    y_coord = np.random.random() * usable_length + 0.5
    return [x_coord, y_coord, 1.73]


def positions_too_close(positions):
    """Return True when two differing positions are closer than the limit.

    The first entry of ``positions`` is not checked. Pairs that compare equal
    are skipped; every other ordered pair is passed to ``distance`` and the
    result is compared against 1.
    """
    candidates = positions[1:]
    return any(
        first != second and distance(first, second) < 1
        for first in candidates
        for second in candidates
    )


# returns angle between two points in degrees
def get_angle(a, b):
    """Return the direction from point ``a`` to point ``b`` in degrees.

    Only the first two coordinates are used. The result is measured
    counter-clockwise from the positive x-axis and lies in ``[0, 360]``.
    """
    delta_x = b[0] - a[0]
    delta_y = b[1] - a[1]
    radians = math.atan2(delta_y, delta_x)
    # atan2 never returns less than -pi, so one full turn lifts every
    # negative angle into the non-negative range.
    if radians < 0.0:
        radians += math.pi * 2
    return math.degrees(radians)


def transform_to_directivities(positions):
    """Derive an ``[azimuth, colatitude]`` direction for every position.

    The first two results belong to the listener's left and right ear. They
    point 90 and 270 degrees away from the listener's viewing direction,
    which is the direction from ``positions[0]`` to the mean of all
    positions. Every position from index 2 on gets the direction from itself
    to ``positions[0]``. The colatitude is always 90.
    """
    print('pos to trans')
    print(positions)

    columns = np.transpose(positions)
    middle = [avg(columns[axis]) for axis in range(len(positions[0]))]

    listener = positions[0]
    # Viewing direction of the listener as seen from above.
    baseAngle = get_angle(listener[:2], middle[:2])

    # Left and right ear of the listener.
    # NOTE(review): the original author flagged a suspected bug around these
    # two directions.
    left_ear = [(baseAngle + 90) % 360, 90]
    right_ear = [(baseAngle + 270) % 360, 90]

    speaker_dirs = [[get_angle(speaker, listener), 90]
                    for speaker in positions[2:]]

    return [left_ear, right_ear] + speaker_dirs


def random_persons_in_room(roomDims, count):
    """Draw ``count`` random positions and derive their directions.

    Complete sets of positions are redrawn until ``positions_too_close``
    accepts one. Afterwards the first entry is replaced by the second one,
    the result is plotted, and the positions are returned together with the
    output of ``transform_to_directivities``.
    """
    def draw_positions():
        return [random_position_in_room(roomDims) for _ in range(count)]

    # Keep drawing until a valid set of positions is found.
    while True:
        positions = draw_positions()
        if not positions_too_close(positions):
            break

    positions[0] = positions[1]
    scatterplot3d(positions, roomDims)
    return positions, transform_to_directivities(positions)

# Calculates both microphone positions relative to the middle point


def get_pos_mics(position, dir):
    """Return the two microphone positions of the listener.

    Both microphones lie half a ``head_size`` away from ``position[0]``, the
    first in the direction ``dir[0][0]`` and the second in ``dir[1][0]``.
    """
    centre = position[0]
    radius = head_size / 2
    return [point_pos(centre, radius, dir[side][0]) for side in (0, 1)]

# Calculates a single microphone position relative to the middle point


def point_pos(x, d, theta):
    """Return the point at distance ``d`` from ``x`` in direction ``theta``.

    ``theta`` is an azimuth in degrees measured counter-clockwise from the
    positive x-axis. This is the convention of the angles ``get_angle``
    produces and of the azimuth handed to the microphone directivities.
    The former ``pi/2 - theta`` conversion treated the angle as a compass
    bearing, which put the microphones in a different direction than the one
    their directivity pattern points to.

    Args:
        x: ``[x, y, z]`` start point.
        d: Distance to move within the horizontal plane.
        theta: Direction in degrees.

    Returns:
        ``[x, y, z]`` of the new point; z is taken over unchanged.
    """
    theta_rad = math.radians(theta)
    return [x[0] + d * math.cos(theta_rad), x[1] + d * math.sin(theta_rad), x[2]]


In [None]:
"""WAV and audio Mixing"""
def loadWavFile(path, offset=0, duration=None):
    """Load a mono audio excerpt at ``sampleRate`` and normalize it.

    Args:
        path: Audio file to read.
        offset: Passed to ``librosa.load`` as ``offset``.
        duration: Passed to ``librosa.load`` as ``duration``.

    Returns:
        The loaded signal after ``librosa.util.normalize``.
    """
    signal, _ = librosa.load(
        path,
        sr=sampleRate,
        offset=offset,
        duration=duration,
        mono=True,
    )
    return librosa.util.normalize(signal)


def makeCardioid(direction):
    """Build a cardioid directivity pointing in ``direction``.

    Args:
        direction: ``[azimuth, colatitude]`` in degrees.
    """
    azimuth, colatitude = direction[0], direction[1]
    orientation = DirectionVector(
        azimuth=azimuth, colatitude=colatitude, degrees=True)
    return CardioidFamily(
        orientation=orientation,
        pattern_enum=DirectivityPattern.CARDIOID,
    )


def createRoom(room_dim, rt60,absorption):
    """Create a shoebox room whose wall material is derived from ``rt60``.

    ``pra.inverse_sabine`` yields the energy absorption and the maximum
    reflection order for the requested reverberation time; the absorption is
    applied to the room as its material.

    Args:
        room_dim: Room dimensions.
        rt60: Target reverberation time handed to ``pra.inverse_sabine``.
        absorption: Kept for interface compatibility but not forwarded.
            ``ShoeBox`` marks its ``absorption`` argument as deprecated and,
            per the pyroomacoustics source, ignores it with a warning when
            ``materials`` is given, so passing it on only produced that
            warning.

    Returns:
        The configured ``pra.ShoeBox``.
    """
    e_absorption, max_order = pra.inverse_sabine(rt60, room_dim)
    return pra.ShoeBox(
        room_dim,
        fs=sampleRate,
        materials=pra.Material(e_absorption),
        max_order=max_order,
    )

def mixRoom(room,listenerPositions, listenerDirs, speakerPositions, speakerDirs, wavs):
    """Place all speakers and the two listener microphones in ``room``.

    Every direction is turned into a cardioid directivity. One source per
    speaker position is added with its directivity and signal, followed by a
    microphone array built from the first two listener positions.

    Returns:
        The same ``room`` object.
    """
    listener_patterns = [makeCardioid(direction) for direction in listenerDirs]
    speaker_patterns = [makeCardioid(direction) for direction in speakerDirs]

    # One column per microphone.
    mic_locations = np.c_[listenerPositions[0], listenerPositions[1]]
    mic_array = pra.MicrophoneArray(
        mic_locations, directivity=listener_patterns, fs=sampleRate)

    for index, source_position in enumerate(speakerPositions):
        room.add_source(
            position=source_position,
            directivity=speaker_patterns[index],
            signal=wavs[index],
        )
    room.add_microphone_array(mic_array)

    return room

def exportRoom(room:pra.ShoeBox, filepath):
    """Write the signals of the room's microphone array to ``filepath``.

    The output is normalized and stored with a bit depth of ``np.float32``.
    """
    microphones = room.mic_array
    microphones.to_wav(filepath, norm=True, bitdepth=np.float32)
    

In [None]:
"""Exporting training data"""

# sampleNr: int, speakerIdsList:int[n], positionList:float[n+2], directions:float[n+2], timeframes:floats[n][2]
def createJsonData(sampleNr, speakerIdsList, positionList, directions, timeframes):
    """Build the description dictionary of one sample.

    The first two entries of ``positionList`` and ``directions`` belong to
    the listener, so speaker ``i`` uses the entries at index ``i + 2``.
    ``timeframes[i]`` holds ``[startTime, endTime, duration]`` of speaker ``i``.

    Returns:
        A nested dictionary with the sample id, all speakers, the listener
        and the source dataset.
    """
    def describe_speaker(index):
        frame = timeframes[index]
        return {
            'id': speakerIdsList[index],
            'position': positionList[index + 2],
            'direction': directions[index + 2],
            'startTime': frame[0],
            'endTime': frame[1],
            'duration': frame[2]
        }

    speakers = [describe_speaker(index)
                for index in range(len(speakerIdsList))]

    listener = {
        'position': positionList[0],
        'directionLeft': directions[0],
        'directionRight': directions[1]
    }
    sample = {
        'id': sampleNr,
        'speakers': speakers,
        'listener': listener,
        'source': source_dataset
    }
    return {
        'comment': 'direction: [azimuth, colatitude]',
        'sample': sample
    }

def createFolder(targetFolder):
    """Create ``targetFolder`` as an empty directory.

    An existing directory of that name is removed together with its content
    first. Missing parent directories are created as well. Previously any
    failure of ``os.mkdir`` triggered the removal, so a missing parent ended
    in a second, misleading error from ``shutil.rmtree``.

    Args:
        targetFolder: Path of the directory to (re)create.
    """
    if os.path.isdir(targetFolder):
        shutil.rmtree(targetFolder)
    os.makedirs(targetFolder)

def exportSample(sampleNr,room,wavs,json_data):
    """Write all files of one sample into its own folder below ``target_dir``.

    The folder is named after ``sampleNr`` and receives the simulated room
    recording, one wav file per speaker signal, and the description as JSON.
    """
    folder = target_dir + '/' + str(sampleNr)
    createFolder(folder)

    exportRoom(room, folder + '/room.wav')

    for i, signal in enumerate(wavs):
        soundfile.write(folder + f'/speaker{i}.wav', signal, sampleRate)

    description_path = folder + '/description.json'
    with open(description_path, 'w') as file:
        json.dump(json_data, file, indent=4)

In [None]:
"""MAIN"""

def generate(sample_count=None):
    """Generate and export simulated room samples.

    For every group of voice lines a room is created, the listener and the
    speakers are placed, the room is simulated, and the result is exported
    together with its description.

    Args:
        sample_count: Number of samples to generate. Defaults to the
            configured ``target_amount_samples``. The count used to be fixed
            to 1, which left that setting without any effect.
    """
    if sample_count is None:
        sample_count = target_amount_samples

    sampleNr = skipSamples
    gen = VoiceLineGeneratorKEC(sample_count, speakers_in_room)
    for (wavs, timestamps) in gen:
        # creating parameters
        sampleNr += 1
        dims, rt60, absorption = generate_room_characteristics()
        room = createRoom(dims, rt60, absorption)
        # Two extra positions: the first two entries describe the listener.
        pos, dirs = random_persons_in_room(dims, speakers_in_room+2)
        listener_pos = get_pos_mics(pos[:2], dirs[:2])
        print(listener_pos)

        # creating data
        room = mixRoom(room, listener_pos, dirs[:2], pos[2:], dirs[2:], wavs)
        room.simulate()
        json_data = createJsonData(sampleNr, range(
            len(wavs)), pos, dirs, timestamps)

        exportSample(sampleNr,room,wavs,json_data)

        msg = f'Generated Room Nr.{sampleNr}.'
        print(msg)


# Entry point: start the sample generation when this code runs as the main module.
if __name__ == '__main__':
    generate()
