/
fingerprint-store
executable file
·215 lines (197 loc) · 8.04 KB
/
fingerprint-store
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2012 Johannes Weißl
#
# License GPLv2+:
# GNU GPL version 2 or later <http://gnu.org/licenses/gpl.html>.
# This is free software: you are free to change and redistribute it.
# There is NO WARRANTY, to the extent permitted by law
import sys
import os
import mutagen
import mutagen.easyid3
import mutagen.easymp4
import mutagen.asf
try:
import musicdns
musicdns.initialize()
except ImportError, e:
musicdns = None
musicdns_import_error = e
import acoustid
import threading
import time
from multiprocessing import Pool, cpu_count
from optparse import OptionParser
musicdns_api_key = None
acoustid_api_key = 'EO7pQCg7'
MUSICDNS_REQUEST_INTERVAL = 1.5
mutagen.easyid3.EasyID3.RegisterTXXXKey('acoustid_id', 'Acoustid Id')
mutagen.easymp4.EasyMP4Tags.RegisterFreeformKey('acoustid_id', 'Acoustid Id')
tag_map = {
mutagen.asf.ASF: {
'musicbrainz_trackid': 'MusicBrainz/Track Id',
'acoustid_id': 'Acoustid/Id',
'musicip_puid': 'MusicIP/PUID',
},
}
def map_tag(m, tag):
return tag_map[type(m)].get(tag, tag) if type(m) in tag_map else tag
def get_tag(m, tag):
for val in m.get(map_tag(m, tag), []):
return unicode(val)
return None
def set_tag(m, tag, val):
m[map_tag(m, tag)] = val
def remove_tag(m, tag):
tag = map_tag(m, tag)
if tag in m:
del m[tag]
def get_acoustid_from_server(path):
def match_and_parse(path):
data = acoustid.match(acoustid_api_key, path, meta='0', parse=False)
if data['status'] != 'ok':
raise acoustid.WebServiceError('status: %s' % data['status'])
if 'results' not in data or len(data['results']) == 0:
raise acoustid.WebServiceError('results not included')
return data['results'][0]['id']
try:
track_id = match_and_parse(path)
except acoustid.AcoustidError, e:
print >> sys.stderr, 'skipping file %s: %s' % (path, e)
track_id = None
return track_id
# from pyacoustid-0.3
class _rate_limit(object):
def __init__(self, fun):
self.fun = fun
self.last_call = 0.0
self.lock = threading.Lock()
def __call__(self, *args, **kwargs):
with self.lock:
since_last_call = time.time() - self.last_call
if since_last_call < MUSICDNS_REQUEST_INTERVAL:
time.sleep(MUSICDNS_REQUEST_INTERVAL - since_last_call)
self.last_call = time.time()
return self.fun(*args, **kwargs)
@_rate_limit
def lookup_musicdns_fingerprint(fingerprint, duration):
return musicdns.lookup_fingerprint(fingerprint, duration, musicdns_api_key)
def get_musicip_from_server(path):
track_id = None
fingerprint, duration = musicdns.create_fingerprint(path)
if fingerprint and duration:
track_id = lookup_musicdns_fingerprint(fingerprint, duration)
if not track_id:
print >> sys.stderr, 'skipping file %s: %s' % (path, 'error fetching PUID from MusicDNS server')
else:
print >> sys.stderr, 'skipping file %s: %s' % (path, 'error calculating MusicDNS fingerprint')
return track_id
def handle_file2(path, m, tag, get_track_id):
modified = False
track_id = get_tag(m, tag)
if options.list:
print '%-36s %-12s %s' % (str(track_id), tag, path)
elif options.remove:
if track_id:
print '%s: remove %s %s' % (path, tag, str(track_id))
if not options.simulate:
remove_tag(m, tag)
modified = True
elif not track_id or options.force:
real_track_id = get_track_id(path)
if real_track_id and track_id != real_track_id:
if track_id:
print '%s: overwrite %s %s with %s' % (path, tag, str(track_id), real_track_id)
else:
print '%s: set %s %s' % (path, tag, str(real_track_id))
if not options.simulate and (options.overwrite or not track_id):
set_tag(m, tag, real_track_id)
modified = True
return modified
def handle_file(path):
m = mutagen.File(path, easy=True)
if not m:
print >> sys.stderr, 'could not read tags of file: %s' % path
return
mbid = get_tag(m, 'musicbrainz_trackid')
if not mbid and not options.no_mbid:
print >> sys.stderr, 'skipping file, has no MBID stored: %s' % path
return
modified = False
if 'a' in options.fprtype:
modified |= handle_file2(path, m, 'acoustid_id', get_acoustid_from_server)
if 'm' in options.fprtype:
modified |= handle_file2(path, m, 'musicip_puid', get_musicip_from_server)
if modified:
m.save()
def get_files(paths, recursive=False):
for path in paths:
if os.path.isdir(path):
if not recursive:
print >> sys.stderr, 'skipping directory %s, use -R to scan recursively' % path
else:
for root, dirs, files in os.walk(path):
for f in files:
yield os.path.join(root, f)
else:
yield path
def main(argv=None):
global options, musicdns_api_key
if not argv:
argv = sys.argv
usage = 'usage: %prog [options] [-l|-w|-r] files...'
version = '%prog 1.0'
description = 'List, remove or store fingerprinting track ids in your music files.'
parser = OptionParser(usage=usage, version=version, description=description)
parser.add_option('-l', '--list', action='store_true', default=False,
help='only output stored fingerprints (default)')
parser.add_option('-w', '--write', action='store_true', default=False,
help='calculate, lookup and write fingerprints to files')
parser.add_option('-r', '--remove', action='store_true', default=False,
help='remove fingerprints from files')
parser.add_option('-t', action='store', dest='fprtype', default='am',
help='type of fingerprinting system: [a]coustid and/or [m]usicdns (default: both)')
parser.add_option('-R', '--recursive', action='store_true',
help='recursively scan directories for files')
parser.add_option('-n', '--simulate', action='store_true',
help='only simulate, no file is modified')
parser.add_option('-f', '--force', action='store_true',
help='calculate and lookup fingerprint even if already stored in file')
parser.add_option('--no-mbid', action='store_true',
help='also process files without MusicBrainz Track Id')
parser.add_option('-F', '--overwrite', action='store_true',
help='overwrite fingerprint if different from stored one')
parser.add_option('-j', '--jobs', type='int', default=cpu_count(), metavar='NUM',
help='number of scanners to run simultaneously (default: %default)')
(options, paths) = parser.parse_args(argv[1:])
s = options.list + options.write + options.remove
if s == 0:
options.list = True
elif s > 1:
parser.error('you can specify only one of -l, -w or -r')
musicdns_api_key_file = '~/.musicdns_key'
try:
with open(os.path.expanduser(musicdns_api_key_file)) as f:
musicdns_api_key = f.read().strip()
except IOError, e:
musicdns_api_key_error = str(e)
if options.write and 'm' in options.fprtype and (not musicdns or not musicdns_api_key):
if not musicdns:
print >> sys.stderr, 'error importing musicdns module, will only use acoustid: %s' % musicdns_import_error
elif not musicdns_api_key:
print >> sys.stderr, 'error using MusicDNS, no API key specified in file %s: %s' % (musicdns_api_key_file, musicdns_api_key_error)
options.fprtype = 'a'
paths = list(get_files(paths, options.recursive))
if not paths:
parser.error('no files specified')
if options.jobs > 1 and options.write and len(paths) > 1:
p = Pool(processes=options.jobs)
p.map(handle_file, paths, min(len(paths)/options.jobs, 25))
else:
for path in paths:
handle_file(path)
if musicdns:
musicdns.finalize()
if __name__ == '__main__':
sys.exit(main())