In [464]:
import numpy
from overrides import override
import math
import random
from abc import ABC, abstractmethod
from typing import List, Tuple, Set, Any, Union, TypeVar, Dict, Callable
from dataclasses import dataclass


def rotate_left(items: List[Any], n: int) -> List[Any]:
  return items[n % len(items):] + items[:n % len(items)]

_accidentals = {'♮': 0, 'b': -1, '#': 1}

def parse_note(index: Union[int, str]) -> Tuple[int, int]:
    # TODO: this does not currently respect octave
    if isinstance(index, str):
      offset = _accidentals[index[-1]]
      index = int(index[:-1])
    else:
      offset = 0
    return index, offset


class Key:
  def __init__(self, pitches: List[int]):
    self._pitches = pitches
    
  @classmethod
  def major(cls):
    return Key([0, 2, 4, 5, 7, 9, 11])
  
  @classmethod
  def minor(cls):
    return Key([0, 2, 3, 5, 7, 8, 10])
  
  @classmethod
  def diminished(cls):
    return Key([0, 2, 3, 5, 6, 8, 9, 11])
  
  @classmethod
  def augmented(cls):
    return Key([0, 3, 4, 7, 8, 11])
    
  @property
  def pitches(self) -> List[int]:
    return self._pitches.copy()
  
  def __getitem__(self, index: int | str) -> 'Key':
    # TODO: this does not currently respect octave
    index, offset = parse_note(index)
    index -= 1
    return Key([pitch + offset for pitch in rotate_left(self.pitches, index)])
  
  def pitch(self, index: int | str) -> int:
    if isinstance(index, str):
      offset = Key._accidentals[index[-1]]
      index = int(index[:-1])
    else:
      offset = 0
    index -= 1
    relative_index = index % len(self.pitches)
    octave = index // len(self.pitches)
    return self.pitches[relative_index] + octave * 12 + offset
  
  def chord(self, notes: List[int], required: List[int] = None, cardinality: int = None, inversion: int = 1, exclusion_preference: List[int] = None):
    if required is None:
      required = notes
    if cardinality is None:
      cardinality = len(notes)
    if exclusion_preference is None:
      exclusion_preference = []
      
    included_notes = set(required)
    remaining_notes = set(notes) - included_notes
    remaining_preferred_notes = remaining_notes - set(exclusion_preference)
    remaining_un_preferred_notes = remaining_notes - remaining_preferred_notes
    
    while len(included_notes) < cardinality and (remaining_preferred_notes or remaining_un_preferred_notes):
      if remaining_preferred_notes:
        included_notes.add(remaining_preferred_notes.pop())
      else:
        included_notes.add(remaining_un_preferred_notes.pop())
    # TODO: this does not properly do inversions respecting octave
    return rotate_left([self.pitch(note) for note in notes], inversion - 1)
  
  def as_root(self):
    return self[-min(self.pitches)]
  
  def is_major_like(self):
    # check that note 3 is a major third from the base note
    return self.as_root().pitch(3) == 4
  
  def is_minor_like(self):
    # check that note 3 is flatted
    return self.as_root().pitch(3) == 3
  
  def is_diminished_like(self):
    # check that notes 3 and 5 are flatted
    as_root = self.as_root()
    return as_root.pitch(3) == 3 and as_root.pitch(5) == 6
  
  def is_augmented_like(self):
    # check that note 5 is sharped
    return self.as_root().pitch(5) == 8
  
  def pitch_as_scale_note(self, pitch: int) -> Union[int, str]:
    if pitch in self.pitches:
      return self.pitches.index(pitch)
    if pitch - 1 in self.pitches:
      return f'{self.pitches.index(pitch-1)}#'
    if pitch + 1 in self.pitches:
      return f'{self.pitches.index(pitch+1)}b'
    raise ValueError(f'Unable to identify pitch {pitch} in relation to a scale note of {self}.')
  
  def __str__(self) -> str:
    return ' '.join([str(pitch) for pitch in self.pitches])
  

@dataclass
class Note:
  pitches: Set[int]
  duration: float
  
  
@dataclass
class Chord:
  key: Key
  notes: List[int]
  duration: float
  required: List[int] = None
  cardinality: int = None
  inversion: int = 1
  exclusion_preference: List[int] = None
  
  def copy(self) -> 'Chord':
    return Chord(self.key, self.notes, self.duration, self.required, self.cardinality, self.inversion, self.exclusion_preference)
  
  
T = TypeVar('T')
def item_at_time(sequence: List[T], t: float) -> T:
  current_time = 0
  for element in sequence:
    if current_time + element.duration >= t:
      return element
    current_time += element.duration
  

def change_key(notes: List[Note], src_chord_progression: List[Chord], tgt_chord_progression: List[Chord]) -> List[Note]:
  new_notes = []
  current_time = 0
  for note in notes:
    src_chord = item_at_time(src_chord_progression, current_time)
    tgt_chord = item_at_time(tgt_chord_progression, current_time)
    scale_indices = {src_chord.key.pitch_as_scale_note(pitch) for pitch in note.pitches}
    adapted_pitches = {tgt_chord.key.pitch(index) for index in scale_indices}
    new_notes.append(Note(adapted_pitches, note.duration))
  return new_notes


# def adapt_accidentals(notes: List[Note], src_chord_progression: List[Chord], tgt_chord_progression: List[Chord]) -> List[Note]:
#   new_notes = []
#   current_time = 0
#   for note in notes:
#     src_chord = item_at_time(src_chord_progression, current_time)
#     tgt_chord = item_at_time(tgt_chord_progression, current_time)
#     relative_difference = src_chord.key.pitch_as_scale_note(tgt_chord.key.pitch(0))
#     scale_indices = {src_chord.key.pitch_as_scale_note(pitch) for pitch in note.pitches}
#     adapted_pitches = {tgt_chord.key.pitch(index) for index in scale_indices}
#     new_notes.append(Note(adapted_pitches, note.duration))
#   return new_notes


@dataclass
class CompatibleElements:
  blocks: List['CompatibleElements'] = None
  block_type: str = None
  start_pitch: int = None
  end_pitch: int = None
  spec: Dict = None


@dataclass
class TrackCharacteristics:
  # beats in a measure
  measure_length: int
  # approximate number of notes per measure
  melody_velocity_bounds: Tuple[float, float]
  # how many measures it should take to ramp from min to max velocity
  velocity_inertia: float
  # approximate number of syncopations per measure
  melody_syncopation_bounds: Tuple[float, float]
  # average numbers of notes harmonized
  harmonization_bounds: Tuple[float, float]
  beat_harmonization_rate_bounds: Tuple[float, float]
  # how likely a block is to try and borrow from other blocks
  borrow_likelihood_bounds: Tuple[float, float]
  # length of phrases
  phrase_length_bounds: Tuple[int, int]
  max_pitch_interval: int
  home_octave: int
  octave_bounds: Tuple[int, int]
  substructures_per_block_bounds: Tuple[float, float]
  substructure_size_bounds: Tuple[float, float]
  note_duration_bounds: Tuple[float, float]
  
  
@dataclass
class SectionCharacteristics:
  measure_length: int
  velocity: float
  velocity_tolerance: float
  syncopation: float
  melody_syncopation_bounds: Tuple[float, float]
  harmonization_bounds: Tuple[float, float]
  beat_harmonization_rate: float
  borrow_likelihood: float
  max_pitch_interval: int
  pitch_bounds: Tuple[int, int]
  substructures_per_block: float
  substructure_size_bounds: Tuple[float, float]
  note_duration_bounds: Tuple[float, float]
  

# @dataclass
# class Phrase(ABC):
#   type: str
#   start_pitch: int
#   end_pitch: int
#   chord_progression: List[Chord]
#   
# @dataclass
# class SequencePhrase(Phrase):
#   type: 'sequence'

def calculate_interval(note1: Union[int, str], note2: Union[int, str]) -> float:
  index1, offset1 = parse_note(note1)
  index2, offset2 = parse_note(note2)
  return (index2 + offset2 / 2) - (index1 + offset1 / 2)


class NoteBlock(ABC):
  def __init__(self, block_type: str, start_pitch: int, end_pitch: int, chord_progression: List[Chord], obscured: bool = False):
    self.block_type = block_type
    self.start_pitch = start_pitch
    self.end_pitch = end_pitch
    self.chord_progression = chord_progression
    self.obscured = obscured
    self.root_key = Key.major()
    self._notes = None
    self.spec = {}
  
  @property
  def duration(self) -> float:
    return sum(c.duration for c in self.chord_progression)
  
  def find_duration_matches(self, duration: float) -> List['NoteBlock']:
    if sum([chord.duration for chord in self.chord_progression]) == duration:
      return [self]
  
  def find_harmonic_matches(self, block: 'Noteblock') -> List['NoteBlock']:
    """
    Extracts note blocks that fit the harmonic and duration requirements provided.
    These can be found exactly, or be derived from blocks which match in duration and when transposed
    remain compatible with the required chord progression.
    :param block: 
    :return: 
    """
    if self.start_pitch == block.start_pitch and self.end_pitch == block.end_pitch:
      if self.chord_progression == block.chord_progression:
        return [self]
    return []
  
  @abstractmethod
  def find_compatible_elements(self, block: 'NoteBlock') -> List[CompatibleElements]:
    pass
  
  @abstractmethod
  def generate_notes(self, motif_bank: 'NoteBlock', section_specs: SectionCharacteristics):
    pass
  
  def get_notes(self) -> List[Note]:
    if self._notes is None:
      raise RuntimeError('Notes have not been generated yet. Call the generate function.')
    return self._notes
  

class SequentialNoteBlock(NoteBlock, ABC):
  def __init__(self, block_type: str, start_pitch: int, end_pitch: int, chord_progression: List[Chord]):
    super().__init__(block_type, start_pitch, end_pitch, chord_progression)
    self.subtype = None
    self.blocks: Union[List[NoteBlock] | None] = None
    
  def generate_notes(self, motif_bank: NoteBlock, section_specs: SectionCharacteristics):
    self.blocks = self._generate_blocks(motif_bank, section_specs)
    for block in self.blocks:
      block.generate_notes(motif_bank, section_specs)
      motif_bank = ListNoteBlock(motif_bank, block)
      
  @abstractmethod
  def _generate_blocks(self, motif_bank: NoteBlock, section_specs: SectionCharacteristics) -> List[NoteBlock]:
    pass
        
  @override
  def get_notes(self) -> List[Note]:
    return sum([block.get_notes() for block in self.blocks], [])

  def find_duration_matches(self, duration: float) -> List['NoteBlock']:
    matches = []
    if sum([chord.duration for chord in self.chord_progression]) == duration:
      matches.append(self)
    for block in self.blocks:
      matches.extend(block.find_duration_matches(duration))
    return matches

  def find_harmonic_matches(self, start_pitch: int, end_pitch: int, chord_progression: List[Chord]) -> List[
    'NoteBlock']:
    matches = []
    if self.start_pitch == start_pitch and self.end_pitch == end_pitch:
      # TODO: check broader harmonic match, not just exact chord match
      if self.chord_progression == chord_progression:
        matches.append(self)
    for block in self.blocks:
      # TODO: check subsequences, not just each sub-block
      matches.extend(block.find_harmonic_matches(start_pitch, end_pitch, chord_progression))
    return matches


class ListNoteBlock(SequentialNoteBlock):
  def __init__(self, *note_blocks: NoteBlock):
    super().__init__('list', note_blocks[0].start_pitch, note_blocks[-1].end_pitch, sum([n.chord_progression for n in note_blocks], []))
    self.blocks = list(note_blocks)

  def _generate_blocks(self, motif_bank: NoteBlock, section_specs: SectionCharacteristics) -> List[NoteBlock]:
    return self.blocks

  def find_compatible_elements(self, block: 'NoteBlock') -> List[CompatibleElements]:
    compatible_elements = []
    if block.block_type == 'list':
      # find subsequence block matches for block
      for i in range(len(self.blocks) - len(block.blocks)):
        subsequence = self.blocks[i:i+len(block.blocks)]
        sub_duration = sum([b.duration for b in subsequence])
        if sub_duration > block.duration:
          break
        # check that each corresponding block has the same relative pitch progression
        matches = []
        p1, p2 = None, None
        for (b1, b2) in zip(subsequence, block.blocks):
          if b1.duration != b2.duration or b1.block_type != b2.block_type:
            break
          # matching block types and sizes
          existing_interval = calculate_interval(
            b1.root_key.pitch_as_scale_note(b1.start_pitch),
            b1.root_key.pitch_as_scale_note(b1.end_pitch))
          proposed_interval = calculate_interval(
            b2.root_key.pitch_as_scale_note(b2.start_pitch),
            b2.root_key.pitch_as_scale_note(b2.end_pitch))
          if existing_interval != proposed_interval:
            break
          if p1 is not None:
            existing_interval = calculate_interval(
              b1.root_key.pitch_as_scale_note(p1.end_pitch),
              b1.root_key.pitch_as_scale_note(b1.start_pitch))
            proposed_interval = calculate_interval(
              b2.root_key.pitch_as_scale_note(p2.end_pitch),
              b2.root_key.pitch_as_scale_note(b2.start_pitch))
            if existing_interval != proposed_interval:
              break
          p1, p2 = b1, b2
          # record the details of the compatible block and include the specification
          # from which aspects can be derived instead of generated anew
          matches.append(CompatibleElements(
            block_type=b1.block_type,
            start_pitch=b1.start_pitch,
            end_pitch=b1.end_pitch,
            spec=b1.spec
          ))
        if len(matches) == len(subsequence):
          subsequence_match = CompatibleElements(
            block_type='list',
            start_pitch=subsequence[0].start_pitch,
            end_pitch=subsequence[-1].end_pitch
          )
          compatible_elements.append(subsequence_match)
    for sub_block in self.blocks:
      if sub_block.duration >= block:
        compatible_elements.extend(sub_block.find_compatible_elements(block))
    return compatible_elements
        

class NBlock(NoteBlock):
  def find_compatible_elements(self, block: 'NoteBlock') -> List[CompatibleElements]:
    # TODO: implement matching
    return []

  def generate_notes(self, motif_bank: 'NoteBlock', section_specs: SectionCharacteristics):
    self._notes = [Note({self.start_pitch}, self.duration)]

  def __init__(self, pitch: int, note: Note, chord: Chord):
    super().__init__('note', pitch, pitch, chord_excerpt([chord], 0, note.duration))
    self.chord = chord
    self.note = note

  def find_duration_matches(self, duration: float) -> List['NoteBlock']:
    if self.duration == duration:
      return [self]


class RhythmBlock(NoteBlock):
  def __init__(self, pitch: int, chord_progression: List[Chord]):
    super().__init__('rhythm', pitch, pitch, chord_progression)

  def find_compatible_elements(self, block: 'NoteBlock') -> List[CompatibleElements]:
    return []

  def generate_notes(self, motif_bank: 'NoteBlock', section_specs: SectionCharacteristics):
    min_d, max_d = section_specs.note_duration_bounds
    rhythm = generate_rhythm(
      self.duration,
      min_d,
      max_d,
      section_specs.syncopation,
      section_specs.velocity,
      velocity_tolerance=section_specs.velocity_tolerance * 2,
      measure_duration=self.duration)
    self._notes = [Note({self.start_pitch}, duration) for duration in rhythm]


class TrillBlock(NoteBlock):
  def __init__(self, chord_progression: List[Chord], start_pitch: int, end_pitch: int = None):
    if end_pitch is None:
      end_pitch = start_pitch
      # trill can be with an adjacent note or chord notes
      trill_type = random.choice(['adjacent', 'chord'])
      super().__init__('trill', start_pitch, end_pitch, chord_progression)
      self.spec['trill_type'] = trill_type
    else:
      super().__init__('trill', start_pitch, end_pitch, chord_progression)  
      self.spec['trill_type'] = 'predefined'

  def find_compatible_elements(self, block: 'NoteBlock') -> List[CompatibleElements]:
    return []

  def generate_notes(self, motif_bank: 'NoteBlock', section_specs: SectionCharacteristics):
    min_note_duration, max_note_duration = section_specs.note_duration_bounds
    rhythm = generate_rhythm(
      self.duration,
      min_note_duration,
      max_note_duration,
      section_specs.syncopation,
      section_specs.velocity * 2,
      velocity_tolerance=1)
    # TODO: make this selection better and also add support for ternary trills
    cycle_type = random.choice([[(0, 1), (1, 0)], [(0, 0, 1), (0, 1, 0), (1, 0, 0), (0, 1, 1), (1, 0, 1), (1, 1, 0)]])
    cycle = random.choice(cycle_type)
    self.spec['rhythm'] = rhythm
    self.spec['cycle'] = cycle
    
    trill_type, rhythm, cycle = self.spec['trill_type'], self.spec['rhythm'], self.spec['cycle']
    num_unique_nodes = len(set(cycle))
    nodes = [lambda d, __: Note({self.start_pitch}, d)]
    start_pitch = self.start_pitch
    for i in range(1, num_unique_nodes):
      select_node = None
      if trill_type == 'predefined':
        select_node = lambda d, __: Note({self.end_pitch}, d)
      elif trill_type == 'adjacent':
        direction = random.choice([-1, 1])
        select_node = lambda d, chord: Note({next_scale_pitch(start_pitch, chord, direction)}, d)
      elif trill_type == 'chord':
        direction = random.choice([-1, 1])
        # TODO: this should depend on a section-wide verbosity parameter
        cardinality = random.choice([1, 2, 3])
        def get_next_chord_pitches(pitch: int, chord: Chord) -> Set[int]:
          pitches = set([])
          chord_pitch = next_chord_pitch(pitch, chord, direction)
          for _ in range(cardinality):
            pitches.add(chord_pitch)
            chord_pitch = next_chord_pitch(pitch, chord, direction)
          return pitches
        select_node = lambda d, chord: Note(get_next_chord_pitches(start_pitch, chord), d)
      nodes.append(select_node)
    self._notes = []
    cycle_index = 0
    current_time = 0
    for duration in rhythm:
      self._notes.append(nodes[cycle_index](duration, item_at_time(self.chord_progression, current_time)))
      cycle_index = (cycle_index + 1) % len(nodes)
      current_time += duration
      

class StagnateBlock(SequentialNoteBlock):
  """
  Stagnation structures include:
  1. Pitch repetition
  2. Trills
  3. Arpeggios
  4. Holds
  5. Scales
  """

  def __init__(self, pitch: int, chord_progression: List[Chord]):
    super().__init__('stagnate', pitch, pitch, chord_progression)

  def _generate_blocks(self, motif_bank: NoteBlock, section_specs: SectionCharacteristics) -> List[NoteBlock]:
    min_substructure_size, max_substructure_size = section_specs.substructure_size_bounds
    subdivisions = generate_rhythm(
      self.duration,
      min_substructure_size,
      max_substructure_size,
      section_specs.syncopation,
      section_specs.substructures_per_block * self.duration / section_specs.measure_length,
      measure_duration=self.duration,
      possible_durations=list(
        numpy.arange(min_substructure_size, max_substructure_size, section_specs.note_duration_bounds[0])))
    block_types = {
      'rhythm': 0,
      'trill': 1,
      'arpeggio': 1,
      'run': 0.5
    }
    self.blocks: List[NoteBlock] = []
    current_time = 0
    for subdivision in subdivisions:
      legal_block_types = {block_type: min_duration for block_type, min_duration in block_types.items() if subdivision >= min_duration}
      # TODO: more sophisticated probability selection system
      block_type = random.choice(list(legal_block_types.keys()))
      block_chords = chord_excerpt(self.chord_progression, current_time, current_time + subdivision)
      if block_type == 'rhythm':
        block = RhythmBlock(self.start_pitch, block_chords)
      elif block_type == 'trill':
        block = TrillBlock(block_chords, self.start_pitch)
      elif block_type == 'arpeggio':
        block = ArpeggioBlock(self.start_pitch, self.start_pitch, block_chords, random.choice([True, False]))
      elif block_type == 'run':
        block = RunBlock(self.start_pitch, self.start_pitch, block_chords, random.choice([True, False]))
      else:
        raise RuntimeError('Unknown block type:', block_type)
      self.blocks.append(block)
      current_time += subdivision
    # pitch = self.start_pitch
    # chord_progression = self.chord_progression
    # duration = sum([chord.duration for chord in chord_progression])
    # return [NBlock(pitch, Note({pitch}, duration), chord_progression[0])]
    return self.blocks

  def find_compatible_elements(self, block: 'NoteBlock') -> List[CompatibleElements]:
    # TODO: check if block matches subsequences
    compatible_elements = []
    if self.duration < block.duration:
      return compatible_elements
    if self.block_type == block.block_type and self.duration == block.duration:
      existing_interval = calculate_interval(
        self.root_key.pitch_as_scale_note(self.start_pitch),
        self.root_key.pitch_as_scale_note(self.end_pitch))
      proposed_interval = calculate_interval(
        self.root_key.pitch_as_scale_note(block.start_pitch),
        self.root_key.pitch_as_scale_note(block.end_pitch))
      if existing_interval == proposed_interval:
        # TODO: check that chords line up or something
        # including start and end pitches indicate the notes themselves may be transferable
        compatible_elements.append(CompatibleElements(
          blocks=[CompatibleElements(
            block_type=b.block_type,
            start_pitch=b.start_pitch,
            end_pitch=b.end_pitch,
            spec=b.spec
          ) for b in self.blocks],
          block_type=self.block_type,
          start_pitch=self.start_pitch,
          end_pitch=self.end_pitch,
          spec=self.spec
        ))
      else:
        # if nothing else, at least the rhythm
        compatible_elements.append(CompatibleElements(
          blocks=[CompatibleElements(
            block_type=b.block_type,
            spec=b.spec
          ) for b in self.blocks],
          block_type=self.block_type,
          spec=self.spec
        ))
    for sub_block in self.blocks:
      compatible_elements.extend(sub_block.get_compatible_elements(block))
    return compatible_elements


class TransitionBlock(SequentialNoteBlock):
  def __init__(self, start_pitch: int, end_pitch: int, chord_progression: List[Chord]):
    super().__init__('transition', start_pitch, end_pitch, chord_progression)
  
  def _generate_blocks(self, motif_bank: NoteBlock, section_specs: SectionCharacteristics) -> List[NoteBlock]:
    min_substructure_size, max_substructure_size = section_specs.substructure_size_bounds
    subdivisions = generate_rhythm(
      self.duration,
      min_substructure_size,
      max_substructure_size,
      section_specs.syncopation,
      section_specs.substructures_per_block * self.duration / section_specs.measure_length,
      measure_duration=self.duration,
      possible_durations=list(
        numpy.arange(min_substructure_size, max_substructure_size, section_specs.note_duration_bounds[0])))
    block_types = {
      'arpeggio': 1,
      'run': 0.5
    }
    self.blocks: List[NoteBlock] = []
    current_time = 0
    for subdivision in subdivisions:
      legal_block_types = {block_type: min_duration for block_type, min_duration in block_types.items() if subdivision >= min_duration}
      # TODO: more sophisticated probability selection system
      block_type = random.choice(list(legal_block_types.keys()))
      block_chords = chord_excerpt(self.chord_progression, current_time, current_time + subdivision)
      if block_type == 'arpeggio':
        block = ArpeggioBlock(self.start_pitch, self.end_pitch, block_chords, random.choice([True, False]))
      elif block_type == 'run':
        block = RunBlock(self.start_pitch, self.end_pitch, block_chords, random.choice([True, False]))
      else:
        raise RuntimeError('Unknown block type:', block_type)
      self.blocks.append(block)
      current_time += subdivision
    # pitch = self.start_pitch
    # chord_progression = self.chord_progression
    # duration = sum([chord.duration for chord in chord_progression])
    # return [NBlock(pitch, Note({pitch}, duration), chord_progression[0])]
    return self.blocks
    # TODO: cleanup
    # duration = self.duration
    # if duration > 0.5:
    #   notes = [Note({self.start_pitch}, 0.5), Note({self.end_pitch}, duration - 0.5)]
    # else:
    #   notes = [Note({self.start_pitch}, duration)]
    # return [NBlock(list(note.pitches)[0], note, self.chord_progression[0]) for note in notes]

  def find_compatible_elements(self, block: 'NoteBlock') -> List[CompatibleElements]:
    # TODO: implement matching
    return []


class PickupBlock(TransitionBlock):
  pass

# make an arpeggio block generating function that tries a bunch of options
# then evaluates the arpeggios for:
# 1. consistency of intervals
#

def chord_excerpt(chord_progression: List[Chord], start_time: float, end_time: float) -> List[Chord]:
  chord_progression = [chord.copy() for chord in chord_progression]
  start_index, start_clip = None, None
  end_index, end_clip = None, None
  cumulative_time = 0
  for index in range(len(chord_progression)):
    chord_duration = chord_progression[index].duration
    if start_index is None and cumulative_time + chord_duration > start_time:
      start_index = index
      start_clip = start_time - cumulative_time
    if end_index is None and cumulative_time >= end_time:
      end_index = index
      end_clip = cumulative_time + chord_duration - end_time
      break
    cumulative_time += chord_duration
  if start_index is None:
    return []
  if end_index is None:
    end_index = len(chord_progression) - 1
    end_clip = 0
  chord_subsequence = chord_progression[start_index:end_index + 1]
  chord_subsequence[0].duration -= start_clip
  chord_subsequence[-1].duration -= end_clip
  if chord_subsequence[0].duration <= 0:
    chord_subsequence = chord_subsequence[1:]
  return chord_subsequence
    
# 
def generate_arpeggio_possibilities(start_pitch: int, end_pitch: int, chord_progression: List[Chord], include_end_pitch: bool):
  return generate_oscillating_sequences(start_pitch, end_pitch, chord_progression, include_end_pitch, next_chord_pitch)
    
    
def generate_run_possibilities(start_pitch: int, end_pitch: int, chord_progression: List[Chord], include_end_pitch: bool):
  return generate_oscillating_sequences(start_pitch, end_pitch, chord_progression, include_end_pitch, next_scale_pitch)


def next_chord_pitch(pitch: int, chord: Chord, direction: int) -> int:
  chord_pitches = [chord.key.pitch(index) for index in chord.notes]
  chord_pitches = [chord_pitches[-1] - 12] + chord_pitches + [chord_pitches[0] + 12]
  chord_pitches.sort(reverse=direction < 0)
  
  pitch_octave = pitch // 12
  normalized_pitch = pitch - pitch_octave * 12
  for chord_pitch in chord_pitches:
    if (direction * chord_pitch) > (direction * normalized_pitch):
      return chord_pitch + pitch_octave * 12


def next_scale_pitch(pitch: int, chord: Chord, direction: int) -> int:
  chord_pitches = chord.key.pitches
  chord_pitches = [chord_pitches[-1] - 12] + chord_pitches + [chord_pitches[0] + 12]
  chord_pitches.sort(reverse=direction < 0)
  
  pitch_octave = pitch // 12
  normalized_pitch = pitch - pitch_octave * 12
  for chord_pitch in chord_pitches:
    if (direction * chord_pitch) > (direction * normalized_pitch):
      return chord_pitch + pitch_octave * 12


def generate_oscillating_sequences(
        start_pitch: int,
        end_pitch: int,
        chord_progression: List[Chord],
        include_end_pitch: bool,
        interval_navigator: Callable[[int, Chord, int], int]
) -> List[List[Tuple[List[int], List[float]]]]:
  """
  Generates rhythmic sequences of notes split into monotonically pitch increasing
  or decreasing subsequences that match the given chord progression.
  
  :param start_pitch: The pitch of the note that the sequence will start with.
  :param end_pitch: The pitch of the note that the sequence aims to end with.
  :param chord_progression: The chord progression delineating allowed notes and sequence duration.
  :param include_end_pitch: Whether *end_pitch* should be included at the end.
  :param interval_navigator: (pitch, chord, direction) -> next_pitch; used to find the next allowed
  pitch in a given direction.
  :return: A list of possible sequences split into subsequences represented as (pitches, rhythms).
  """
  # TODO: rename variables to reflect abstraction away from just arpeggios
  duration = sum(c.duration for c in chord_progression)
  # TODO: make min note duration a config
  max_num_notes = duration // 0.5
  min_num_notes = min(3, max_num_notes)
  velocity = (max_num_notes + min_num_notes) / 2
  velocity_tolerance = (max_num_notes - min_num_notes) / 2
  rhythm = generate_rhythm(duration, 0.5, 4, 0.2, velocity, velocity_tolerance, measure_duration=duration)
  if len(rhythm) == 2 and start_pitch == end_pitch:
    return [[([start_pitch, end_pitch], rhythm)]]
  # fit_chord_notes = calculate_num_chord_notes_between(start_pitch, end_pitch, chord_progression, rhythm)
  possible_arpeggiations = []
  num_extra_peaks = 0
  while True:
    if num_extra_peaks >= len(rhythm):
      print(f'rhythm: {rhythm}, start: {start_pitch}, end: {end_pitch}')
      raise RuntimeError('Could not generate arpeggios')
    # TODO: should probably disallow directly consecutive peaks; ones right after another with no notes in-between
    for peaks in combinations(range(1, len(rhythm) - 1), num_extra_peaks):
      # subdivide into different arpeggios and try to fit
      subdivisions = [(rhythm, chord_progression)]
      index_offset = 0
      time_offset = 0
      for peak in peaks:
        section, section_chords = subdivisions[-1]
        front = section[:peak - index_offset]
        front_chords = chord_excerpt(section_chords, 0, sum(front))
        back = section[peak - index_offset:]
        back_chords = chord_excerpt(section_chords, sum(front), sum(c.duration for c in section_chords))
        subdivisions = subdivisions[:-1] + [(front, front_chords), (back, back_chords)]
        index_offset += len(front)
        time_offset += sum(front)
      print('subd', subdivisions)
      # basically do another search to find any viable pitches for intermediate peaks
      heads = [([], -1, start_pitch)]
      while heads:
        arpeggios, last_subdivision, last_pitch = heads.pop()
        if last_subdivision >= len(subdivisions) - 1:
          continue
        section, section_chords = subdivisions[last_subdivision + 1]
        peak_chord = item_at_time(section_chords, sum(section[:-1]))
        include_section_end_pitch = (last_subdivision + 1 == len(subdivisions) - 1) and include_end_pitch
        if last_subdivision == len(subdivisions) - 2:
          possible_arpeggios = generate_monotonically_evolving_pitches(
            last_pitch, end_pitch, section_chords, include_section_end_pitch, section, interval_navigator)
          if len(possible_arpeggios) > 0:
            possible_arpeggiations.append(list(zip(
              arpeggios + [random.choice(possible_arpeggios)], [subdivision[0] for subdivision in subdivisions])))
        
        potential_peaks = []
        for direction in [-1, 1]:
          next_candidate = interval_navigator(last_pitch, peak_chord, direction)
          # 7 is the number of notes in a typical two-octave arpeggio
          # comfortably reached by a piano player with 1 crossover
          for _ in range(7 - 1):
            potential_peaks.append(next_candidate)
            # TODO: disqualify a candidate if it is beyond the specified note range
            next_candidate = interval_navigator(next_candidate, peak_chord, direction)
        for peak_pitch in potential_peaks:
          possible_arpeggios = generate_monotonically_evolving_pitches(
            last_pitch, peak_pitch, section_chords, include_section_end_pitch, section, interval_navigator)
          if len(possible_arpeggios) > 0:
            heads.append((arpeggios + [random.choice(possible_arpeggios)], last_subdivision + 1, peak_pitch))
    # generally, break with the minimum number of extra peaks
    if possible_arpeggiations:
      break
    num_extra_peaks += 1
  # TODO: possibly do a more sophisticated ranking and selection of arpeggios
  return possible_arpeggiations#random.choice(possible_arpeggiations)


def generate_monotonically_evolving_pitches(
        start_pitch: int,
        end_pitch: int,
        chord_progression: List[Chord],
        include_end_pitch: bool,
        rhythm: List[float],
        interval_navigator: Callable[[int, Chord, int], int]
) -> List[List[int]]:
  """
  Generates all possible monotonically evolving pitch sequences given a function
  that informs monotonic steps.
  
  :param start_pitch: The pitch of the note that the sequence will start with.
  :param end_pitch: The pitch of the note that the sequence aims to end with.
  :param chord_progression: The chord progression delineating allowed notes and sequence duration.
  :param include_end_pitch: Whether *end_pitch* should be included at the end.
  :param rhythm: The rhythm that the sequence should follow.
  :param interval_navigator: (pitch, chord, direction) -> next_pitch; used to find the next allowed
  pitch in a given direction.
  :return: A list of lists of monotonically evolving pitches.
  """
  # we will assume pianists cannot do jumps greater than a tenth
  max_interval = 14
  
  if len(rhythm) < 2:
    return [[start_pitch] * len(rhythm)] if abs(end_pitch - start_pitch) <= max_interval else []
  
  arpeggios = []
  direction = math.copysign(1, end_pitch - start_pitch)
  # last_note_chord = item_at_time(chord_progression, sum(rhythm[:-2 if include_end_pitch else -1]))
  # last_chord_pitch = next_chord_pitch(end_pitch, last_note_chord, -direction)
  heads = [([start_pitch], 0)]
  while heads:
    pitches, rhythm_index = heads.pop()
    if include_end_pitch and rhythm_index == len(rhythm) - 1 and pitches[-1] == end_pitch:
      arpeggios.append(pitches)
    if not include_end_pitch and rhythm_index == len(rhythm) and pitches[-1] == end_pitch:
      arpeggios.append(pitches[:-1])
    if rhythm_index >= len(rhythm):
      continue
    
    current_time = sum(rhythm[:rhythm_index + 1])
    chord = item_at_time(chord_progression, current_time)
    next_pitch = interval_navigator(pitches[-1], chord, direction)
    while (direction * next_pitch) < (direction * end_pitch) and abs(next_pitch - pitches[-1]) <= max_interval:
      heads.append((pitches + [next_pitch], rhythm_index + 1))
      next_pitch = interval_navigator(next_pitch, chord, direction)
    if pitches[-1] != end_pitch and abs(pitches[-1] - end_pitch) <= max_interval:
      heads.append((pitches + [end_pitch], rhythm_index + 1))
  return arpeggios


class ArpeggioBlock(NoteBlock):
  def __init__(self, start_pitch: int, end_pitch: int, chord_progression: List[Chord], include_end_pitch: bool):
    super().__init__('arpeggio', start_pitch, end_pitch, chord_progression)
    self.spec['include_end_pitch'] = include_end_pitch

  def generate_notes(self, motif_bank: 'NoteBlock', section_specs: SectionCharacteristics):
    # TODO: look in the motif bank. if an applicable rhythm is found, use that
    #       if an applicable number of peaks is found, use that. intervals, peak values, etc...
    #       just take an entire pre-generated arpeggio if it works
    #       maybe implement some 'nudge-arpeggio' or 'adjust-arpeggio' that redistributes some rhythms
    possible_arpeggiations = generate_arpeggio_possibilities(
      self.start_pitch, self.end_pitch, self.chord_progression, self.spec['include_end_pitch'])
    selected_arpeggios = random.choice(possible_arpeggiations)
    self.spec['arpeggios'] = selected_arpeggios
    self._notes = []
    print(selected_arpeggios)
    for pitches, rhythm in selected_arpeggios:
      print('rhy', rhythm)
      self._notes.extend(Note({pitch}, duration) for pitch, duration in zip(pitches, rhythm))

  def find_compatible_elements(self, block: 'NoteBlock') -> List[CompatibleElements]:
    # TODO: implement matching
    return []


class RunBlock(NoteBlock):
  def __init__(self, start_pitch: int, end_pitch: int, chord_progression: List[Chord], include_last_pitch: bool):
    super().__init__('run', start_pitch, end_pitch, chord_progression)
    self.spec['include_last_pitch'] = include_last_pitch

  def find_compatible_elements(self, block: 'NoteBlock') -> List[CompatibleElements]:
    # TODO: implement matching
    return []

  def generate_notes(self, motif_bank: 'NoteBlock', section_specs: SectionCharacteristics):
    possible_runs = generate_run_possibilities(self.start_pitch, self.end_pitch, self.chord_progression, self.spec['include_last_pitch'])
    selected_runs = random.choice(possible_runs)
    self.spec['runs'] = selected_runs
    self._notes = []
    print(selected_runs)
    for pitches, rhythm in selected_runs:
      self._notes.extend(Note({pitch}, duration) for pitch, duration in zip(pitches, rhythm))
    for note in self._notes:
      if note.duration > 4:
        print(f'Look what I did: {note}')
    
    
class EmptyNoteBlock(NoteBlock):
  def __init__(self):
    super().__init__('empty', 0, 0, [])

  def find_compatible_elements(self, block: 'NoteBlock') -> List[CompatibleElements]:
    return []

  def generate_notes(self, motif_bank: 'NoteBlock', section_specs: SectionCharacteristics):
    self._notes = []


from itertools import combinations_with_replacement


def find_combinations(nums: List[float], target: float, lo: int, hi: int) -> List[List[float]]:
  result = []
  nums = list(nums)  # Convert to list if it's not already
  for length in range(max(lo, 1), hi + 1):
    for combination in combinations_with_replacement(nums, length):
      if sum(combination) == target:
        result.append(list(combination))
  return result


def count_syncopation(rhythm: List[float]) -> int:
  syncopation = 0
  current_time = 0
  for note in rhythm:
    start = current_time
    current_time += note
    end = current_time
    # could be syncopated if start or end is off-beat
    if math.floor(start) != start or math.floor(end) != end:
      # but is only syncopated if the off-beat start or end are in different beats from one another
      if not(math.ceil(start) == math.ceil(end) or math.floor(start) == math.floor(end)):
        syncopation += 1
  return syncopation
    

# TODO: this does not often generate triplets grouped correctly
def generate_rhythm(
        duration: float,
        min_note_length: float,
        max_note_length: float,
        syncopation_rate: float,
        velocity_per_measure: float,
        velocity_tolerance: float = 1,
        measure_duration: int = 4,
        possible_durations: List[float] = None
) -> List[float]:
  if possible_durations is None:
    # in fractions of a beat
    on_beat_notes = [n for n in [1, 2, 3, 4] if min_note_length <= n <= max_note_length]
    # off beat notes with the number required to make them on-beat
    off_beat_notes = [(d, n) for d, n in [(1/4, 4), (1/3, 3), (1/2, 2), (2/3, 3), (1.5, 2)] if min_note_length <= d <= max_note_length]
    possible_durations = on_beat_notes + [d for d, n in off_beat_notes]
  
  rhythm = []
  duration_left = duration
  while duration_left > 0:
    if duration_left > measure_duration:
      current_measure = measure_duration
    else:
      current_measure = duration_left
    duration_left -= current_measure
    lo = math.ceil(velocity_per_measure - velocity_tolerance)
    hi = math.floor(velocity_per_measure + velocity_tolerance)
    possible_rhythms = []
    while not possible_rhythms:
      possible_rhythms = find_combinations(possible_durations, current_measure, lo, hi)
      lo -= 1
      hi -= 1
    for r in possible_rhythms:
      random.shuffle(r)
    random.shuffle(possible_rhythms)
    syncopation_counts = [count_syncopation(r) for r in possible_rhythms]
    ranked_by_syncopation = sorted(
      range(len(possible_rhythms)),
      key=lambda i: abs(syncopation_counts[i] - syncopation_rate))
    rhythm += possible_rhythms[ranked_by_syncopation[0]]
  return rhythm


def generate_section():#(start_pitch: int, end_pitch: int, velocity_per_measure: float, velocity_tolerance: float, motif_bank: NoteBlock):
  key = Key.major()
  chord_progression = [
    Chord(key, [1, 3, 5], 2),
    Chord(key[6], [1, 3, 5], 2),
    Chord(key[4], [1, 3, 5], 2),
    Chord(key[5], [1, 3, 5], 2),
    Chord(key, [1, 3, 5], 2),
    Chord(key[6], [1, 3, 5], 2),
  ]
  section = ListNoteBlock(
    StagnateBlock(0, chord_excerpt(chord_progression, 0, 2)),
    TransitionBlock(0, -7, chord_excerpt(chord_progression, 2, 4)),
    TransitionBlock(-7, -3, chord_excerpt(chord_progression, 4, 6)),
    TransitionBlock(-3, 2, chord_excerpt(chord_progression, 6, 8)),
    StagnateBlock(4, chord_excerpt(chord_progression, 8, 12)),
  )
  
# class SectionCharacteristics:
#   measure_length: int
#   velocity: float
#   velocity_tolerance: float
#   syncopation: float
#   melody_syncopation_bounds: Tuple[float, float]
#   harmonization_bounds: Tuple[float, float]
#   beat_harmonization_rate: float
#   borrow_likelihood: float
#   max_pitch_interval: int
#   pitch_bounds: Tuple[int, int]
#   substructures_per_block: float
#   substructure_size_bounds: Tuple[float, float]
#   note_duration_bounds: Tuple[float, float]
  section_specs = SectionCharacteristics(
    measure_length=4,
    velocity=1,
    velocity_tolerance=3,
    syncopation=2,
    melody_syncopation_bounds=(0.0, 1.0),
    harmonization_bounds=(0.0, 2.0),
    beat_harmonization_rate=0.25,
    borrow_likelihood=0.3,
    max_pitch_interval=14,
    pitch_bounds=(0, 1000),
    substructures_per_block=1.3,
    substructure_size_bounds=(0.5, 8),
    note_duration_bounds=(0.5, 4)
  )
  section.generate_notes(EmptyNoteBlock(), section_specs)
  print(section.blocks[0].blocks)
  print(section.blocks[-1].blocks)
  return section.get_notes()


def generate_motif(motif_bank: NoteBlock, start_pitch: int, end_pitch: int, chord_progression: List[Chord]) -> NoteBlock:
  duration = sum([chord.duration for chord in chord_progression])
  piece_split = generate_rhythm(duration, 1, 4, syncopation_rate=0.4, velocity_per_measure=1.5)
  

In [278]:
p = set([])
for i in range(100):
  r = generate_rhythm(8, 1/2, 4, velocity_per_measure=2.1, syncopation_rate=0.5)
  p.add(tuple(r))
print(p, len(p))

{(3, 0.5, 0.5, 2, 2), (1, 2, 1, 0.5, 3, 0.5), (3, 1, 0.5, 3, 0.5), (2, 0.5, 1.5, 3, 1), (0.5, 0.5, 3, 2, 0.5, 1.5), (1, 1, 2, 0.5, 0.5, 3), (2, 2, 1, 1, 2), (3, 1, 3, 0.5, 0.5), (3, 1, 1, 1, 2), (3, 1, 1, 3), (1, 1, 2, 1.5, 0.5, 2), (1.5, 0.5, 2, 0.5, 3, 0.5), (1, 2, 1, 0.5, 0.5, 3), (3, 1, 0.5, 0.5, 3), (1, 2, 1, 1, 3), (0.5, 1.5, 2, 2, 2), (1, 2, 1, 2, 2), (3, 0.5, 0.5, 2, 1, 1), (2, 2, 0.5, 1.5, 2), (1, 3, 0.5, 1.5, 2), (0.5, 3, 0.5, 3, 1), (1, 3, 0.5, 3, 0.5), (1, 3, 1, 1, 2), (2, 2, 3, 1), (1, 3, 3, 1), (0.5, 0.5, 3, 3, 1), (3, 0.5, 0.5, 1.5, 0.5, 2), (2, 2, 2, 0.5, 1.5), (2, 2, 2, 1, 1), (2, 2, 2, 1.5, 0.5), (1, 3, 2, 2), (2, 2, 0.5, 3, 0.5), (3, 1, 2, 1.5, 0.5), (1.5, 0.5, 2, 1, 2, 1), (1.5, 0.5, 2, 2, 2), (1.5, 0.5, 2, 3, 1), (1, 3, 0.5, 0.5, 3), (3, 0.5, 0.5, 3, 0.5, 0.5), (0.5, 0.5, 3, 2, 2), (0.5, 3, 0.5, 0.5, 0.5, 3), (0.5, 3, 0.5, 2, 2), (2, 2, 1, 3), (2, 2, 1.5, 0.5, 2), (2, 2, 2, 2), (2, 1.5, 0.5, 0.5, 3, 0.5), (2, 2, 3, 0.5, 0.5), (1.5, 0.5, 2, 2, 1, 1), (2, 1.5, 0.5, 0

In [279]:
from itertools import combinations

def subsequences_with_length(lst, n):
    """
    Generate all subsequences of length n from a list.
    """
    return [list(comb) for comb in combinations(lst, n)]

lst = [1, 2, 3, 4]
n = 2
result = subsequences_with_length(lst, 3)
print(result)

[[1, 2, 3], [1, 2, 4], [1, 3, 4], [2, 3, 4]]


In [373]:
key = Key.major()
chords = [Chord(key, [1, 3, 5], 2)]
generate_run_possibilities(0, 8, chords, True)

[([[0, 2, 5, 8]], [[0.5, 0.5, 0.5, 0.5]])]

In [457]:
generate_section()

subd [([1, 0.5, 0.5], [Chord(key=<__main__.Key object at 0x0000022B4360B450>, notes=[1, 3, 5], duration=2, required=None, cardinality=None, inversion=1, exclusion_preference=None), Chord(key=<__main__.Key object at 0x0000022B43609A90>, notes=[1, 3, 5], duration=0.0, required=None, cardinality=None, inversion=1, exclusion_preference=None)])]
subd [([1], [Chord(key=<__main__.Key object at 0x0000022B4360B450>, notes=[1, 3, 5], duration=2, required=None, cardinality=None, inversion=1, exclusion_preference=None), Chord(key=<__main__.Key object at 0x0000022B43609A90>, notes=[1, 3, 5], duration=-1.0, required=None, cardinality=None, inversion=1, exclusion_preference=None)]), ([0.5, 0.5], [Chord(key=<__main__.Key object at 0x0000022B4360B450>, notes=[1, 3, 5], duration=1, required=None, cardinality=None, inversion=1, exclusion_preference=None), Chord(key=<__main__.Key object at 0x0000022B43609A90>, notes=[1, 3, 5], duration=0.0, required=None, cardinality=None, inversion=1, exclusion_preferenc

[Note(pitches={0}, duration=0.5),
 Note(pitches={2}, duration=0.5),
 Note(pitches={0}, duration=0.5),
 Note(pitches={2}, duration=0.5),
 Note(pitches={0}, duration=1),
 Note(pitches={4}, duration=0.5),
 Note(pitches={-7}, duration=0.5),
 Note(pitches={-7}, duration=0.5),
 Note(pitches={-3}, duration=0.5),
 Note(pitches={9}, duration=0.5),
 Note(pitches={5}, duration=0.5),
 Note(pitches={-3}, duration=0.5),
 Note(pitches={-5}, duration=0.5),
 Note(pitches={-12}, duration=0.5),
 Note(pitches={2}, duration=0.5),
 Note(pitches={4}, duration=0.5),
 Note(pitches={-5}, duration=0.5),
 Note(pitches={0}, duration=0.5),
 Note(pitches={4}, duration=0.5),
 Note(pitches={4}, duration=0.5),
 Note(pitches={5}, duration=0.5),
 Note(pitches={4}, duration=0.5),
 Note(pitches={5}, duration=0.5)]

In [458]:
from midiutil import MIDIFile

def convert_note_group_sequence_to_midi(left_hand: List[Note], right_hand: List[Note], save_path):
  right_track = 0
  left_track = 1
  channel = 0
  tempo = 100
  volume = 100
  midi_file = MIDIFile(2)
  midi_file.addTempo(right_track, 0, tempo)
  
  for hand in [right_hand, left_hand]:
    if hand == right_hand:
      track, octave, volume = right_track, 4, 100
    else:
      track, octave, volume = left_track, 3, 75
    current_time = 0
    for note in hand:
      for pitch in note.pitches:
        midi_file.addNote(track, channel, pitch + 12 * octave, current_time, note.duration, volume)
      current_time += note.duration
  
  with open(save_path, 'wb') as output_file:
    midi_file.writeFile(output_file)

In [469]:
convert_note_group_sequence_to_midi([], generate_section(), 'output.mid')
import time
from pygame import mixer

mixer.init()
mixer.set_num_channels(80)
mixer.music.load('output.mid')
mixer.music.play()
while mixer.music.get_busy():
  time.sleep(1)

subd [([0.5, 0.5, 1], [Chord(key=<__main__.Key object at 0x0000022B41C13AD0>, notes=[1, 3, 5], duration=2, required=None, cardinality=None, inversion=1, exclusion_preference=None), Chord(key=<__main__.Key object at 0x0000022B41C134D0>, notes=[1, 3, 5], duration=0.0, required=None, cardinality=None, inversion=1, exclusion_preference=None)])]
subd [([0.5], [Chord(key=<__main__.Key object at 0x0000022B41C13AD0>, notes=[1, 3, 5], duration=2, required=None, cardinality=None, inversion=1, exclusion_preference=None), Chord(key=<__main__.Key object at 0x0000022B41C134D0>, notes=[1, 3, 5], duration=-1.5, required=None, cardinality=None, inversion=1, exclusion_preference=None)]), ([0.5, 1], [Chord(key=<__main__.Key object at 0x0000022B41C13AD0>, notes=[1, 3, 5], duration=1.5, required=None, cardinality=None, inversion=1, exclusion_preference=None), Chord(key=<__main__.Key object at 0x0000022B41C134D0>, notes=[1, 3, 5], duration=0.0, required=None, cardinality=None, inversion=1, exclusion_prefere