In [5]:
import pickle
import pandas as pd
from pathlib import Path

# Load the pickled track metadata.
# NOTE: pickle.load can execute arbitrary code; only use it on a file you trust.
with Path('../data/flo_metadata.dat').open('rb') as f:
    metadata = pickle.load(f)

# Track ids listed in the test CSV; tracks with these ids are excluded later on.
df = pd.read_csv('../data/flo_test_list.csv', delimiter=',')
test_ids = list(df['track id'])

In [6]:
def _tracks_with_genre_name(genre_name):
    """Return every entry of `metadata` whose 'genre_name_basket' contains `genre_name`."""
    return [track for track in metadata if genre_name in track['genre_name_basket']]


# The ballad list is the only one selected by numeric genre id (4) instead of by name.
ballad_kr = [track for track in metadata if 4 in track['genre_id_basket']]
pop_world = _tracks_with_genre_name('해외 팝')
rnb_kr = _tracks_with_genre_name('국내 알앤비')
rock_kr = _tracks_with_genre_name('국내 락/메탈')
rock_world = _tracks_with_genre_name('해외 락')
trot = _tracks_with_genre_name('트로트')
folk_kr = _tracks_with_genre_name('국내 포크/블루스')
pop_kr = _tracks_with_genre_name('국내 팝/어쿠스틱')



In [6]:
import collections
# Take a fixed quota of tracks from each genre list. A track can appear in several
# genre lists, so the concatenation may contain the same track more than once.
genre_quota_tracks = (
    ballad_kr[:200] + pop_world[:200] + rnb_kr[:100] + rock_kr[:100]
    + rock_world[:100] + trot[:52] + folk_kr[:50] + pop_kr[:151]
)
unique_id = {x['track_id'] for x in genre_quota_tracks}
# Set for O(1) membership tests (test_ids itself is a list).
test_id_set = set(test_ids)

# Show the selected tracks that collide with the test list; they are dropped below.
print([x for x in genre_quota_tracks if x['track_id'] in test_id_set])

# Re-select from `metadata` so every chosen track id appears once, in metadata order,
# and leave out anything that is part of the test list.
total_list = [
    x for x in metadata
    if x['track_id'] in unique_id and x['track_id'] not in test_id_set
]
print(len(total_list))
print(len({x['track_name'] for x in total_list}))

# Keep only the first occurrence of each track title; later tracks with an
# already-seen title are dropped. Order of the kept tracks is preserved.
seen_titles = set()
deduplicated = []
for track in total_list:
    title = track['track_name']
    if title not in seen_titles:
        seen_titles.add(title)
        deduplicated.append(track)
total_list = deduplicated

print(len({x['track_name'] for x in total_list}))
len(total_list)

[{'song_id': 434425125, 'artist_id_basket': [80259668], 'artist_name_basket': ['영탁'], 'album_id': 404412585, 'album_name': '내일은 미스터트롯 결승전 베스트', 'track_id': 434425125, 'track_name': '찐이야', 'label_name_basket': ['(주)쇼플레이', 'TV CHOSUN'], 'genre_id_basket': [8], 'genre_name_basket': ['트로트'], 'publish_date': 20200313, 'play_count': 1448527}, {'song_id': 435279872, 'artist_id_basket': [80265501], 'artist_name_basket': ['임영웅'], 'album_id': 404550893, 'album_name': '내일은 미스터트롯 우승자 특전곡', 'track_id': 435279872, 'track_name': '이제 나만 믿어요', 'label_name_basket': ['TV CHOSUN'], 'genre_id_basket': [8], 'genre_name_basket': ['트로트'], 'publish_date': 20200403, 'play_count': 1055132}]
923
900
900


900

In [11]:
class DataMonitor:
    """Inspect pitch contours and audio of songs stored under a data directory.

    Songs are addressed by id; `song_list` holds the ids that the index-based
    methods (`get_contour`, `plot_and_play`, `sample_random_melody`, `len()`)
    operate on.
    """

    def __init__(self, data_path, song_list=None):
        """
        Args:
            data_path: root directory of the audio / pitch files.
            song_list: optional iterable of song ids. Defaults to an empty list;
                it can also be assigned to `self.song_list` after construction.
        """
        self.data_path = Path(data_path)
        self.sr = 44100
        # Always define song_list so len() and the index-based methods do not
        # fail with AttributeError on a freshly built instance.
        self.song_list = list(song_list) if song_list is not None else []

    def _pitch_path(self, song_idx):
        """Return the path of 'pitch_<song_idx>.txt' next to the song's audio file."""
        return self.song_idx_to_path(song_idx).parent / 'pitch_{}.txt'.format(song_idx)

    def get_contour(self, index):
        """Load the pitch contour of the song at position `index` in `song_list`."""
        song_idx = self.song_list[index]
        return load_melody(self._pitch_path(song_idx))

    def get_audio(self, song_id, id1, id2):
        """Decode the song's audio and return the samples in [id1, id2)."""
        song_path = self.song_idx_to_path(song_id)
        audio = self.load_audio(song_path)
        return audio[id1:id2]

    def load_audio(self, track_path):
        """Decode `track_path` into a mono array at `self.sr`.

        The file is opened through `AudioSegment.from_file` with format 'm4a'
        (presumably pydub's AudioSegment -- confirm against the import cell).
        The raw bytes are read as int16 and divided by 32768.
        """
        song = AudioSegment.from_file(track_path, 'm4a').set_frame_rate(self.sr).set_channels(1)._data
        decoded = np.frombuffer(song, dtype=np.int16) / 32768
        return decoded

    def song_idx_to_path(self, idx):
        """Map a song id to its audio file path.

        When 'qbh' occurs in the data path the file sits directly in the data
        directory; otherwise it sits in two nested folders named after
        characters 0-2 and 3-5 of the id. The '.aac' file is preferred; if it
        does not exist the '.m4a' path is returned (without checking it exists).
        """
        idx = str(idx)
        if 'qbh' in str(self.data_path):
            path = self.data_path / (idx + '.aac')
        else:
            path = self.data_path / idx[:3] / idx[3:6] / (idx + '.aac')
        if not path.exists():
            path = path.with_suffix('.m4a')
        return path

    def sample_random_melody(self):
        """Return the raw contour of a random melody segment of a random song.

        Keeps drawing songs until one with at least one melody segment is found.
        """
        while True:
            rand_index = random.randint(0, len(self) - 1)
            contour = self.get_contour(rand_index)
            q_contour = quantizing_hz(contour)
            c_contour = clearing_note(q_contour)
            melody_indices = self.find_melody_segment(c_contour)
            if len(melody_indices) > 0:
                rand_mel_idx = random.randint(0, len(melody_indices) - 1)
                a, b = melody_indices[rand_mel_idx]
                return contour[a:b]

    def plot_and_play(self, idx):
        """Plot one random melody segment of song `idx` and return audio for it.

        Plots the raw contour and the quantized / cleared / elongated contour,
        then returns the matching audio excerpt mixed with a sine rendering of
        the raw contour. Returns None when the song has no melody segment.
        """
        contour = self.get_contour(idx)
        q_contour = quantizing_hz(contour)
        c_contour = clearing_note(q_contour)
        e_contour = elongate_note(c_contour)

        melody_indices = self.find_melody_segment(c_contour)
        if len(melody_indices) == 0:
            return None

        a, b = melody_indices[random.randint(0, len(melody_indices) - 1)]
        plt.plot(contour[a:b])
        plt.plot(e_contour[a:b])
        # Contour frames are converted to sample offsets at 100 frames per second.
        audio = self.get_audio(self.song_list[idx], a * self.sr // 100, b * self.sr // 100)
        audio = self.generate_sine_wav(contour[a:b], audio)
        return audio

    def find_melody_segment(self, contour, threshold=50):
        """Return (start, end) frame pairs of melody segments in `contour`.

        `threshold` is forwarded as the zero-run threshold; segment length
        limits are fixed at 500 < length and merged length < 2000 frames.
        """
        return find_melody_seg_fast(contour, zero_threshold=threshold, max_length=2000, min_length=500)

    def get_segmented_contours(self, song_idx):
        """Return a list of {'melody', 'frame_pos'} dicts, one per melody segment of the song."""
        contour = load_melody(self._pitch_path(song_idx))
        return [{'melody': contour[a:b], 'frame_pos': (a, b)} for (a, b) in self.find_melody_segment(contour)]

    def generate_sine_wav(self, melody, audio, frame_rate=100):
        """Synthesize a sine tone following `melody` and mix it with `audio`.

        Args:
            melody: per-frame frequencies in Hz.
            audio: array of samples; the sine is cut to its length.
            frame_rate: melody frames per second.

        Returns:
            0.9 * sine + 0.3 * audio.
        """
        # Force float so the phase accumulator is not truncated to integers
        # when the melody holds integer frequencies.
        melody_resampled = np.repeat(np.asarray(melody, dtype=float), self.sr // frame_rate)
        phi = np.zeros_like(melody_resampled)
        phi[1:] = np.cumsum(2 * np.pi * melody_resampled[:-1] / self.sr, axis=0)
        sin_wav = 0.9 * np.sin(phi)
        sin_wav = sin_wav[:audio.shape[0]]
        return sin_wav + (audio * 0.3)

    def __len__(self):
        """Number of songs in `song_list`."""
        return len(self.song_list)
    

def quantizing_hz(contour):
    """Snap every positive frequency to the nearest equal-tempered semitone (A = 440 Hz).

    Values that are not greater than zero are mapped to 0.
    Returns a new list with one entry per input value.
    """
    def snap(freq):
        if freq > 0:
            semitones = round(log2(freq / 440) * 12)
            return 440 * (2 ** (semitones / 12))
        return 0

    return [snap(freq) for freq in contour]

def elongate_note(q_contour, patience=10):
    """Fill short unvoiced gaps with the most recent voiced pitch.

    A non-positive frame is replaced by the last positive pitch seen. Once more
    than `patience` consecutive non-positive frames have been counted, the held
    pitch is reset to 0 (frames that were already filled stay filled).
    Returns a new list of the same length.
    """
    held = 0
    silent_run = 0
    sustained = []
    for value in q_contour:
        if not value > 0:
            silent_run += 1
            if silent_run > patience:
                held, silent_run = 0, 0
            sustained.append(held)
            continue
        held = value
        silent_run = 0
        sustained.append(value)
    return sustained

def clearing_note(q_contour, min_pitch_len=5):
    """Zero out runs of identical pitch that last fewer than `min_pitch_len` frames.

    Args:
        q_contour: sequence of (quantized) pitch values, one per frame.
        min_pitch_len: minimum number of consecutive equal frames for a note to be kept.

    Returns:
        A new list of the same length; the input is not modified.
    """
    output = list(q_contour)
    prev_pitch = 0
    prev_pitch_start = 0
    for i, pitch in enumerate(q_contour):
        if pitch != prev_pitch:
            # The run that just ended covers [prev_pitch_start, i).
            prev_pitch_duration = i - prev_pitch_start
            if prev_pitch_duration < min_pitch_len:
                output[prev_pitch_start:i] = [0] * prev_pitch_duration
            prev_pitch = pitch
            prev_pitch_start = i
    # The last run is never followed by a pitch change, so check it explicitly;
    # otherwise a too-short note at the very end of the contour would survive.
    tail_duration = len(output) - prev_pitch_start
    if tail_duration < min_pitch_len:
        output[prev_pitch_start:] = [0] * tail_duration
    return output

def load_melody(path):
    """Read a pitch file and return one float per line.

    Each line is split on single spaces; the second field is taken, its last
    two characters are dropped, and the remainder is parsed as a float.
    """
    values = []
    with open(path, "r") as handle:
        for row in handle:
            second_field = row.split(' ')[1]
            values.append(float(second_field[:-2]))
    return values

def find_melody_seg_fast(contour, zero_threshold, max_length, min_length):
    """Locate melody segments in a pitch contour.

    Steps: find zero runs longer than `zero_threshold`, take the spans between
    consecutive runs as voiced segments, merge neighbouring segments via
    `expand_voice` (which edits the list in place), and keep only segments
    strictly longer than `min_length`.

    Returns a list of (start, end) tuples of Python ints.
    """
    silent_runs = get_zero_slice_from_contour(contour, threshold=zero_threshold)
    segments = zero_slice_to_segment(silent_runs)
    if segments:
        # Return value is ignored on purpose: the list is modified in place.
        expand_voice(segments, max_length=max_length)

    kept = []
    for seg in segments:
        if seg[1] - seg[0] > min_length:
            kept.append((int(seg[0]), int(seg[1])))
    return kept

def get_zero_slice_from_contour(contour, threshold=50):
    """Return [start, end] index pairs of spans that precede voiced stretches.

    A voiced stretch is detected wherever two consecutive zero-valued positions
    are more than one index apart. The first returned candidate always starts
    at index 0; each later one runs from the end of the previous voiced stretch
    to the start of the next. Zeros after the last detected voiced stretch are
    not reported. Only pairs with end - start > `threshold` are kept.
    Returns [] when no voiced stretch is detected.
    """
    pitch = np.asarray(contour)
    zero_idx = np.where(pitch == 0)[0]
    gaps = np.diff(zero_idx)
    jump_at = np.where(gaps > 1)[0]
    before_jump = zero_idx[jump_at]
    # Column 0: first voiced index, column 1: index of the next zero.
    voice_frame = np.stack([before_jump + 1, before_jump + gaps[jump_at]], axis=-1)

    n_frames = voice_frame.shape[0]
    if n_frames == 0:
        return []

    candidates = [[0, voice_frame[0, 0]]]
    for row in range(1, n_frames):
        candidates.append([voice_frame[row - 1, 1], voice_frame[row, 0]])
    return [pair for pair in candidates if pair[1] - pair[0] > threshold]

def zero_slice_to_segment(zeros_slice, min_voice_seg=10):
    """Turn consecutive zero spans into the voiced segments lying between them.

    For each neighbouring pair of spans, the segment runs from the end of the
    first to the start of the second; it is kept when it is at least
    `min_voice_seg` long. Returns a list of (start, end) tuples.
    """
    segments = []
    for current, following in zip(zeros_slice, zeros_slice[1:]):
        start, end = current[1], following[0]
        if end - start >= min_voice_seg:
            segments.append((start, end))
    return segments

def expand_voice(voice_slice, max_length=2000):
    """Merge neighbouring voiced segments in place, closest pairs first.

    A pair of neighbours is a merge candidate when length + gap + next length
    is below `max_length`. On every pass the smallest gap among the candidates
    is determined and every candidate pair with that gap is merged, from the
    back of the list to the front. Passes repeat until no candidate is left.

    Returns the same (mutated) `voice_slice` list.

    NOTE(review): candidates are validated before the pass, not between merges,
    so when two adjacent pairs share the minimal gap the combined segment can
    end up longer than `max_length` -- confirm whether that is acceptable.
    """
    def merged_length(stats, idx):
        # length of segment idx + gap to the next one + length of the next one
        return stats[idx][0] + stats[idx][1] + stats[idx + 1][0]

    def merge_candidates(stats):
        return [pos for pos in range(len(stats) - 1) if merged_length(stats, pos) < max_length]

    stats = get_length_and_distance_of_melody(voice_slice)
    candidates = merge_candidates(stats)
    while candidates:
        smallest_gap = min(stats[pos][1] for pos in candidates)
        targets = [pos for pos in candidates if stats[pos][1] == smallest_gap]
        # Back to front, so earlier indices stay valid while merging.
        for pos in reversed(targets):
            merge_voice_slice(voice_slice, pos)
        if voice_slice == []:
            candidates = []
        else:
            stats = get_length_and_distance_of_melody(voice_slice)
            candidates = merge_candidates(stats)
    return voice_slice

def merge_voice_slice(voice_slice, index):
    """Replace the segments at `index` and `index + 1` by one spanning segment.

    The list is edited in place: the new tuple takes the start of the first
    segment and the end of the second. Nothing is returned.
    """
    head = voice_slice.pop(index)
    # After the first pop the following segment has moved into `index`.
    tail = voice_slice.pop(index)
    voice_slice.insert(index, (head[0], tail[1]))

def get_length_and_distance_of_melody(voice_slice):
    """Return one (length, gap_to_next_segment) tuple per segment.

    The last segment has no successor, so its gap is the sentinel 10000.
    Raises IndexError when `voice_slice` is empty.
    """
    stats = []
    for seg, nxt in zip(voice_slice, voice_slice[1:]):
        stats.append((seg[1] - seg[0], nxt[0] - seg[1]))
    last = voice_slice[-1]
    stats.append((last[1] - last[0], 10000))
    return stats



In [12]:
# Machine-specific location of the audio / pitch files; adjust when running elsewhere.
FLO_DATA_DIR = '/home/svcapp/userdata/flo_data'
data_monitor = DataMonitor(FLO_DATA_DIR)