-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjack_of_hearts
executable file
·135 lines (105 loc) · 4.31 KB
/
jack_of_hearts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/python3
# Example JACK midi event generator
#
# Play a drum pattern over JACK
import argparse
import asyncio
import json
import statistics
import sys
from collections import deque
from typing import NamedTuple, Type
import pyeep.app
from pyeep.generative import (DRUM_CHANNEL, DRUM_CLOSED_HIHAT, DRUM_CRASH1,
DRUM_LOW_TOM, DRUM_SIDE_STICK, GenerativeScore)
from pyeep.jackmidi import MidiPlayer
# See:
# https://soundprogramming.net/file-formats/general-midi-instrument-list/
# https://www.pgmusic.com/tutorial_gm.htm
class HeartSample(NamedTuple):
    """One reading received from the heart-rate monitor socket.

    Instances are built positionally from decoded JSON arrays, so the field
    order here must match the order of values on the wire.
    """

    # UNIX timestamp in nanoseconds
    time: int
    # Heart rate of this reading; it is copied verbatim into the score's bpm,
    # so it is presumably expressed in beats per minute (TODO confirm)
    rate: float
    # Optional extra values attached to the reading (presumably RR intervals —
    # TODO confirm). Any number of floats, hence the variadic tuple type.
    rr: tuple[float, ...] = ()
class DrumLoop(GenerativeScore):
    """Drum score whose tempo and pattern follow the heart monitor readings."""

    def __init__(self, *, player: MidiPlayer, heart: "App", bpm: int = 60, **kw):
        """Set up the score on the drum channel.

        Args:
            player: MIDI player handed to the base score.
            heart: application object exposing ``last_sample`` and
                ``improvised_delta``.
            bpm: initial tempo handed to the base score.
            **kw: accepted for call compatibility; not forwarded to the base
                class.
        """
        super().__init__(player=player, bpm=bpm)
        self.channel = DRUM_CHANNEL
        self.heart = heart

    def beat(self):
        """Emit the drum hits for one beat, chosen from the heart's delta."""
        super().beat()

        heart = self.heart
        # Track the most recent measured heart rate as the score tempo
        self.bpm = heart.last_sample.rate
        delta = heart.improvised_delta

        if delta > 2:
            # Rate well above the recent band: tom plus crash
            accent = DRUM_CRASH1
        elif delta > 0.5:
            # Rate somewhat above the recent band: tom plus closed hi-hat
            accent = DRUM_CLOSED_HIHAT
        elif delta < -0.5:
            # Rate below the recent band: a single softer side stick
            self.drum(DRUM_SIDE_STICK, 0, 1/4, velocity=64)
            return
        else:
            # Within the band: plain tom only
            accent = None

        self.drum(DRUM_LOW_TOM, 0, 1/4)
        if accent is not None:
            self.drum(accent, 1/2, 1/4)
class App(pyeep.app.JackApp):
    """JACK application that drives a generative score from a heart-rate feed.

    Heart samples are read from a UNIX socket; each new sample updates
    ``last_sample`` and ``improvised_delta``, which the active score reads
    when it generates a beat.
    """

    def __init__(self, args: argparse.Namespace):
        """Create the MIDI player component and the empty heart-rate state."""
        super().__init__(args)
        self.player = self.add_jack_component(MidiPlayer)
        # Score currently being played, if any
        self.score: GenerativeScore | None = None
        # Most recent sample read from the socket, None until the first arrives
        self.last_sample: HeartSample | None = None
        # Sliding window of the most recent heart rates (at most 10)
        self.last_window: deque[float] = deque(maxlen=10)
        # Signed distance of the latest rate from the recent band; 0 inside it
        self.improvised_delta: float = 0

    async def read_socket(self):
        """Read heart samples from the UNIX socket until EOF or shutdown.

        The first line is a JSON object whose ``"last"`` entry holds recent
        samples; it seeds the sliding window. Every following line is one
        JSON-encoded sample. The connection is closed on exit, however the
        method exits.
        """
        # Read heart beats from https://www.enricozini.org/blog/2023/debian/monitoring-a-heart-rate-monitor/
        reader, writer = await asyncio.open_unix_connection(self.args.socket)
        try:
            # The initial line carries the recent heartbeat history: use it to
            # pre-fill the window so deltas are available right away
            initial = json.loads(await reader.readline())
            for sample in (HeartSample(*s) for s in initial["last"]):
                self.last_window.append(sample.rate)

            # An empty read means EOF and ends the loop
            while not self.shutting_down and (line := await reader.readline()):
                self.last_sample = HeartSample(*json.loads(line))
                self.on_sample()
        finally:
            # Release the socket even on error or cancellation
            writer.close()
            await writer.wait_closed()

    def on_sample(self):
        """Update ``improvised_delta`` from ``last_sample`` and the window.

        With more than 5 rates in the window, the delta is how far the new
        rate lies outside ``mean ± variance`` of the window (positive above,
        negative below, 0.0 inside). The new rate is then appended to the
        window.
        """
        rate = self.last_sample.rate
        if len(self.last_window) > 5:
            mean = statistics.mean(self.last_window)
            # NOTE(review): the band uses the variance, not the standard
            # deviation, so its width is in squared units — confirm intended
            variance = statistics.variance(self.last_window)
            if rate > mean + variance:
                self.improvised_delta = rate - mean - variance
            elif rate < mean - variance:
                self.improvised_delta = -(mean - variance - rate)
            else:
                self.improvised_delta = 0.0
            print("Improvised delta:", self.improvised_delta)
        self.last_window.append(rate)

    def set_score(self, score_cls: Type[GenerativeScore]):
        """Stop the current score, if any, and start a new ``score_cls``."""
        if self.score is not None:
            self.score.stop()
        self.score = score_cls(player=self.player, heart=self)

    def _beat_seconds(self) -> float:
        """Return the duration of one beat at the current score tempo."""
        fallback_seconds = 1.0  # one beat at 60 bpm
        bpm = self.score.bpm
        # The tempo is copied from monitor data: a zero or negative reading
        # must not crash the loop or turn it into a busy loop
        if bpm > 0:
            return 60 / bpm
        return fallback_seconds

    async def composer(self):
        """Generate one beat per beat duration until shutdown.

        While no score or no sample is available yet, poll every 0.2 seconds.
        """
        while not self.shutting_down:
            if self.score is None or self.last_sample is None:
                await asyncio.sleep(0.2)
                continue
            self.score.beat()
            await asyncio.sleep(self._beat_seconds())

        # On shutdown stop the score; a truthy result presumably means there
        # is still output to flush, so wait one more beat (TODO confirm)
        if self.score is not None and self.score.stop():
            await asyncio.sleep(self._beat_seconds())

    async def aio_main(self):
        """Run the socket reader and the composer concurrently."""
        await asyncio.gather(self.read_socket(), self.composer())

    def ui_main(self):
        """Read commands from stdin until one starts with "q" or EOF."""
        try:
            while not self.shutting_down:
                cmd = input("> ")
                if cmd.startswith("q"):
                    break
        except EOFError:
            # End of input is treated like a quit command
            pass

    def main(self, score_cls: Type[GenerativeScore]):
        """Install ``score_cls`` as the active score and run the application."""
        self.set_score(score_cls)
        super().main()
def main():
    """Parse the command line and run the application with the drum loop."""
    cli = App.argparser("JACK of Hearts", "Generate a drum pattern based on heart beat")
    cli.add_argument("socket", action="store", help="path to the socket to use to read heart beats")

    # The app is used as a context manager so it is torn down on exit
    with App(cli.parse_args()) as app:
        app.main(DrumLoop)
if __name__ == "__main__":
    # Script entry point: exit with whatever main() returns
    raise SystemExit(main())