<a href="https://colab.research.google.com/github/maecyntha/ai-classical-music-detector/blob/main/01_feature_extraction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pretty_midi

Collecting pretty_midi
  Downloading pretty_midi-0.2.10.tar.gz (5.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.6/5.6 MB[0m [31m21.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting mido>=1.1.16 (from pretty_midi)
  Downloading mido-1.3.3-py3-none-any.whl.metadata (6.4 kB)
Downloading mido-1.3.3-py3-none-any.whl (54 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.6/54.6 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pretty_midi
  Building wheel for pretty_midi (setup.py) ... [?25l[?25hdone
  Created wheel for pretty_midi: filename=pretty_midi-0.2.10-py3-none-any.whl size=5592286 sha256=4dd23a77cf948b4acf54caa9e00672a992b10877256faeccab65ca2940ece4d3
  Stored in directory: /root/.cache/pip/wheels/e6/95/ac/15ceaeb2823b04d8e638fd1495357adb8d26c00ccac9d7782e
Successfully built pretty_midi
Installing collected packages: mido, pretty_midi
Successf

In [2]:
import zipfile
import random
import pretty_midi
import numpy as np
import pandas as pd
import io
import requests
import gzip
import json

## Additional Processes

In [3]:
def get_target_instrument(midi_data):
  """Choose the instrument whose notes will be analysed.

  Drum tracks are ignored. Among the remaining instruments:
  1. the first one with program 0 (piano) is returned if there is any;
  2. otherwise the instrument with the highest number of notes is returned.

  Returns None when the MIDI data has no non-drum instrument.
  """
  melodic_tracks = []
  for track in midi_data.instruments:
    if not track.is_drum:
      melodic_tracks.append(track)

  # A piano track always wins, in file order.
  for track in melodic_tracks:
    if track.program == 0:
      return track

  if not melodic_tracks:
    return None

  # No piano: fall back to the busiest track.
  return max(melodic_tracks, key=lambda track: len(track.notes))

In [7]:
def pad_features(features, max_segments, feature_size_per_segment=9):
  """Right-pad a flat feature vector with NaN up to a fixed length.

  The target length is `feature_size_per_segment * max_segments`. The input is
  converted to a float32 array; a vector that is already at least that long is
  returned with no padding added (it is never truncated).
  """
  values = np.array(features, dtype=np.float32)
  target_length = max_segments * feature_size_per_segment

  missing = target_length - len(values)
  if missing < 0:
    missing = 0

  return np.pad(values, (0, missing), mode='constant', constant_values=np.nan)

In [9]:
def repeat_segments_from_start(features, max_segments):
  """Loop a song's valid segments until exactly `max_segments` segments exist.

  `features` is a flat array read as consecutive rows of 9 values. Rows that are
  entirely NaN are discarded, and the remaining rows are repeated from the start
  and cut to `max_segments` rows, then flattened again.

  The input is returned unchanged when it holds less than one full row or when
  every row is entirely NaN.
  """
  row_width = 9
  row_count = len(features) // row_width
  if row_count == 0:
    return features

  # View the usable prefix as a (row_count, row_width) table.
  table = features[:row_count * row_width].reshape(row_count, row_width)

  # Keep only rows that carry at least one real value.
  all_nan_rows = np.isnan(table).all(axis=1)
  usable_rows = table[~all_nan_rows]
  if len(usable_rows) == 0:
    return features  # nothing to repeat

  # One extra copy guarantees there are enough rows before cutting.
  copies = max_segments // len(usable_rows) + 1
  looped = np.tile(usable_rows, (copies, 1))

  return looped[:max_segments].flatten()

In [8]:
def process_and_combine_datasets(zip_paths=None, jsonl_paths=None, target_segments=None):
  """Extract features from every dataset and stack them into one matrix.

  Args:
    zip_paths: optional iterable of (zip_path, label) pairs passed to
      extract_features_from_zip.
    jsonl_paths: optional iterable of (jsonl_path, label) pairs passed to
      extract_features_from_jsonl.
    target_segments: number of segments every row is brought to. When None,
      the largest segment count found in the data is used.

  Returns:
    (combined_features, combined_labels): a float32 array of shape
    (n_samples, 9 * n_segments) and a flat array of labels. With no samples the
    feature array has shape (0, 9 * n_segments).
  """
  feature_size_per_segment = 9
  all_features, all_labels = [], []

  # Extract from zip
  for zip_path, label in (zip_paths or []):
    features, labels = extract_features_from_zip(zip_path, label)
    all_features.extend(features)
    all_labels.extend(labels)

  # Extract from jsonl
  for jsonl_path, label in (jsonl_paths or []):
    features, labels = extract_features_from_jsonl(jsonl_path, label)
    all_features.extend(features)
    all_labels.extend(labels)

  if target_segments is not None:
    max_segments = target_segments
  else:
    # default=0 keeps an empty dataset from raising inside max().
    max_segments = max(
      (len(features) // feature_size_per_segment for features in all_features),
      default=0,
    )
  row_length = feature_size_per_segment * max_segments

  # Pad and repeat row by row. The datasets were padded independently, so rows
  # can differ in length and may exceed row_length; building an intermediate
  # array from them would fail, hence the per-row processing and final cut.
  rows = []
  for features in all_features:
    padded = pad_features(features, max_segments)
    repeated = repeat_segments_from_start(padded, max_segments)
    rows.append(np.asarray(repeated, dtype=np.float32)[:row_length])

  combined_features = np.array(rows, dtype=np.float32).reshape(len(rows), row_length)
  combined_labels = np.array(all_labels).flatten()

  return combined_features, combined_labels

## Feature Extraction

In [4]:
def _segment_stats(values):
  """Return (mean, median, std) of `values`, or (0, 0, 0) when there is nothing to summarise."""
  arr = np.asarray(values, dtype=float)
  if arr.size == 0 or np.all(np.isnan(arr)):
    return 0, 0, 0
  return np.nanmean(arr), np.nanmedian(arr), np.nanstd(arr)


def extract_features_beat_based(midi_data=None, beats_per_segment=4, isMidi=True, sequence=None, max_duration=30):
  """Summarise the first `max_duration` seconds of a piece in beat-based segments.

  Segment boundaries come from `midi_data.get_beats()` when `isMidi` is true,
  otherwise from the sorted unique note start/end times of `sequence["notes"]`.
  Every `beats_per_segment` consecutive boundaries form one segment; boundaries
  left over at the end do not form a segment.

  Args:
    midi_data: object providing `get_beats()` and instruments (used when isMidi).
    beats_per_segment: number of boundaries per segment.
    isMidi: read from `midi_data` when true, from `sequence` otherwise.
    sequence: dict with a "notes" list of dicts holding "endTime" and optionally
      "startTime" (default 0), "pitch" and "velocity" (default 0).
    max_duration: analysis window in seconds.

  Returns:
    (features, num_segments). `features` is a flat float array laid out segment
    by segment, 9 values each: pitch mean/median/std, velocity mean/median/std,
    duration mean/median/std. A segment without notes contributes zeros.
    Returns (None, 0) when there are no boundaries at all.

  Raises:
    ValueError: if `isMidi` is true and the MIDI data has no non-drum instrument.
    KeyError: if a note in `sequence` has no "endTime".
  """
  notes = []  # (start, pitch, velocity, duration) for every candidate note

  if isMidi:
    beat_times = [b for b in midi_data.get_beats() if b <= max_duration]
  else:
    boundary_times = set()
    for note in sequence.get("notes", []):
      start_time = note.get("startTime", 0)
      end_time = note["endTime"]
      # Both ends of a note act as segment boundaries.
      if start_time <= max_duration:
        boundary_times.add(start_time)
      if end_time <= max_duration:
        boundary_times.add(end_time)
      # Read into a tuple instead of writing defaults back into the caller's dict.
      notes.append((start_time, note.get("pitch", 0), note.get("velocity", 0), end_time - start_time))

    # Sort and drop duplicate times
    beat_times = sorted(boundary_times)

  if not beat_times:
    return None, 0

  num_segments = len(beat_times) // beats_per_segment  # Calculate beat-based segment
  if num_segments == 0:
    return np.array([], dtype=float), 0

  if isMidi:
    # Resolve the instrument once; it does not depend on the segment.
    instrument = get_target_instrument(midi_data)
    if instrument is None:
      raise ValueError("MIDI data contains no non-drum instrument to extract notes from")
    notes = [
      (
        note.start,
        note.pitch if note.pitch is not None else 0,
        note.velocity if note.velocity is not None else 0,
        note.end - note.start,
      )
      for note in instrument.notes
    ]

  segment_rows = []
  for i in range(num_segments):
    first = i * beats_per_segment
    next_first = first + beats_per_segment
    segment_start = beat_times[first]
    # A segment runs up to the first boundary of the next segment so that notes
    # in its last beat are counted; the final segment may run to max_duration.
    segment_end = beat_times[next_first] if next_first < len(beat_times) else max_duration

    in_segment = [note for note in notes if segment_start <= note[0] < segment_end]
    pitches = [note[1] for note in in_segment]
    velocities = [note[2] for note in in_segment]
    durations = [note[3] for note in in_segment]

    # One row of 9 values per segment, matching the 9-per-segment layout that
    # the padding, repeating and column-naming code expects.
    segment_rows.append([
      *_segment_stats(pitches),
      *_segment_stats(velocities),
      *_segment_stats(durations),
    ])

  # Combine all features
  features = np.array(segment_rows, dtype=float).flatten()

  return features, num_segments

### Zipped MIDI

In [5]:
def extract_features_from_zip(zip_path, label, sample_size=500):
  """Extract beat-based features from a random sample of MIDI files in a zip.

  Args:
    zip_path: path (or file-like object) of the zip archive.
    label: label attached to every sample taken from this archive.
    sample_size: maximum number of MIDI files drawn at random from the archive.

  Returns:
    (features, labels): an array with one NaN-padded row per processed file
    (all rows padded to the largest segment count in this archive) and an array
    of labels. Files shorter than 10 seconds, files that fail to load or
    process (reported with a printed message), and files that yield no features
    are skipped.
  """
  all_features, all_labels, segments_list = [], [], []

  with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    # Match the extension case-insensitively so ".MID" files are not missed.
    midi_files = [f for f in zip_ref.namelist() if f.lower().endswith((".midi", ".mid"))]
    selected_files = random.sample(midi_files, min(len(midi_files), sample_size))

    for midi_file in selected_files:
      try:
        with zip_ref.open(midi_file) as file:
          midi_data = pretty_midi.PrettyMIDI(io.BytesIO(file.read()))

        if midi_data.get_end_time() < 10:
          continue  # Skip file with the duration under 10s

        features, num_segments = extract_features_beat_based(midi_data=midi_data, isMidi=True)
      except Exception as e:
        print(f"Error processing {midi_file}: {e}")
        continue

      # The extractor returns None when it finds no beats; such a file cannot
      # be padded, so skip it.
      if features is None:
        continue

      all_features.append(features)
      all_labels.append(label)
      segments_list.append(num_segments)

  max_segments = max(segments_list, default=0)
  padded_features_list = [pad_features(features, max_segments) for features in all_features]

  return np.array(padded_features_list), np.array(all_labels)

### JSONL

In [6]:
def extract_features_from_jsonl(url, label, sample_size=1000, timeout=60):
  """Download a gzipped JSONL file and extract features from its note sequences.

  Each line is parsed as JSON; the sequences under "input_sequence" are used
  when `label` is 0 and those under "output_sequence" otherwise. Lines are
  visited in random order until `sample_size` samples have been collected.

  Args:
    url: address of the .gz file.
    label: label attached to every sample; also selects which sequences are read.
    sample_size: maximum number of samples to collect.
    timeout: seconds to wait for the server before giving up.

  Returns:
    (features, labels): an array with one NaN-padded row per sample (all rows
    padded to the largest segment count in this file) and an array of labels.
    Sequences with "totalTime" under 5 (or missing) and sequences that yield no
    features are skipped.

  Raises:
    requests.HTTPError: if the download returns an error status.
  """
  features_list, labels, segments_list = [], [], []

  # Download file .gz from URL; without a timeout a stalled server blocks forever.
  response = requests.get(url, timeout=timeout)
  response.raise_for_status()

  with gzip.GzipFile(fileobj=io.BytesIO(response.content), mode="rb") as gz_file:
    with io.TextIOWrapper(gz_file, encoding="utf-8") as f:  # Decode gzip as texts
      # Blank lines are not valid JSON documents, so leave them out.
      lines = [line for line in f if line.strip()]

  random.shuffle(lines)

  sequence_key = "input_sequence" if label == 0 else "output_sequence"

  for line in lines:
    if len(labels) >= sample_size:
      break

    for song in json.loads(line)[sequence_key]:
      if len(labels) >= sample_size:
        break

      # A missing "totalTime" is treated as 0 and therefore skipped.
      if song.get("totalTime", 0) < 5:
        continue

      features, num_segments = extract_features_beat_based(sequence=song, isMidi=False)

      if features is not None:
        features_list.append(features)
        labels.append(label)
        segments_list.append(num_segments)

  max_segments = max(segments_list, default=0)
  padded_features_list = [pad_features(features, max_segments) for features in features_list]

  return np.array(padded_features_list), np.array(labels)

## Main Code

In [10]:
# Fixed seed so the file sampling, line shuffling and final row shuffle are
# repeatable from a fresh kernel.
SEED = 42
FEATURES_PER_SEGMENT = 9  # 3 statistics each for pitch, velocity and duration
TARGET_SEGMENTS = 25

random.seed(SEED)

human_zip = "bachs.zip"
ai_zip = "js_fakes_midi.zip"
jsonl_paths_1 = "https://storage.googleapis.com/magentadata/datasets/bach-doodle/bach-doodle.jsonl-00006-of-00192.gz"
jsonl_paths_2 = "https://storage.googleapis.com/magentadata/datasets/bach-doodle/bach-doodle.jsonl-00054-of-00192.gz"

# Process data with the predefined label
combined_features, combined_labels = process_and_combine_datasets(
  zip_paths=[[human_zip, 0], [ai_zip, 1]],
  jsonl_paths=[[jsonl_paths_1, 0], [jsonl_paths_1, 1], [jsonl_paths_2, 0], [jsonl_paths_2, 1]],
  target_segments=TARGET_SEGMENTS
)

num_segments = combined_features.shape[1] // FEATURES_PER_SEGMENT

# Column names follow the per-segment order of the feature rows.
feature_names = [
  "pitch_mean", "pitch_median", "pitch_std",
  "velocity_mean", "velocity_median", "velocity_std",
  "duration_mean", "duration_median", "duration_std",
]
columns = [f"segment_{i}_{name}" for i in range(num_segments) for name in feature_names]

# Save data to CSV
df = pd.DataFrame(combined_features, columns=columns)
df['is_ai'] = combined_labels

# Shuffle the rows so the labels are not grouped by source dataset.
df = df.sample(frac=1, random_state=SEED).reset_index(drop=True)
df.to_csv("repeated.csv", index=False)

