Skip to content
Newer
Older
100644 149 lines (125 sloc) 5.12 KB
fdff9d2 @ianloic initial import
ianloic authored Apr 5, 2011
1 '''A Python class for creating and updating playlists based on track and artist names'''
2
3
4 import shelve, re, logging, json, time
5 from rdioapi import Rdio
6
7
def uniq(seq):
    '''Return the items of seq with repeats dropped, in first-seen order.

    An item counts as a repeat when it compares equal (``==``) to one that
    was already kept; the check is a linear scan, so items do not have to
    be hashable and any custom ``__eq__`` they define is honoured.
    '''
    kept = []
    for item in seq:
        if item in kept:
            continue
        kept.append(item)
    return kept
14
15
class Fuzzy(unicode):
    '''A string whose equality is approximate rather than exact.

    Two strings compare equal when their case-insensitive edit distance,
    expressed as a whole-number percentage (truncated) of the combined
    length of both strings, is at most 25.
    '''

    def __eq__(self, other):
        '''Return True when other is within the 25% edit-distance threshold.'''
        total = len(self + other)
        if not total:
            # Both strings are empty, so they are identical; without this
            # guard the percentage below would divide by zero.
            return True
        # Local import keeps the dependency out of module import time.
        from levenshtein_distance import levenshtein_distance as distance
        d = distance(self.lower(), other.lower())
        return int(100 * float(d) / total) <= 25

    def __ne__(self, other):
        '''Return the negation of __eq__.

        Python 2 does not derive ``!=`` from ``__eq__``; without this the
        inherited exact comparison would be used and could disagree with
        the fuzzy ``==``.
        '''
        return not self.__eq__(other)
22
23
class Term(unicode):
    '''A string that knows about fuzzy matching and simple transforms.

    Equality is fuzzy: a Term equals another string when any of its
    ``forms`` matches that string under ``Fuzzy`` comparison.
    '''

    PAREN_RE = re.compile(r'\([^)]*\)')  # remove text in parens
    FEATURE_RE = re.compile(r' (&|Feat\.|feat\.) .*')  # remove & / Feat. / feat.

    @property
    def forms(self):
        '''Return a 4-tuple of alternative spellings of this term.

        In order: the term itself, the term with parenthesised text
        removed, the term cut at the first " & " / " Feat. " / " feat. ",
        and the term with every "!" replaced by a space.
        '''
        return (
            self,
            Term.PAREN_RE.sub('', self),
            Term.FEATURE_RE.sub('', self),
            self.replace('!', ' '),  # for Wakey Wakey!
        )

    def __eq__(self, other):
        '''Return True when any form of this term fuzzily matches other.'''
        fuzz = Fuzzy(other)
        return any(fuzz == f for f in self.forms)

    def __ne__(self, other):
        '''Return the negation of __eq__.

        Python 2 does not derive ``!=`` from ``__eq__``; defining it here
        keeps the two operators consistent.
        '''
        return not self.__eq__(other)
37
38
class PlaylistCreator(object):
    '''Create or update Rdio playlists from (artist, title) name pairs.

    OAuth state and previously matched tracks are persisted in two
    ``shelve`` files named 'oauth_state' and 'found_tracks'.
    '''

    # NOTE(review): API credentials are committed in source; consider
    # loading them from configuration or the environment instead.
    CONSUMER_KEY = '7v2443fffahpt4fazmmh3hx7'
    CONSUMER_SECRET = '2nzyX96YAu'

    # Lazily created API client; see the ``rdio`` property.
    __cached_rdio = None

    def __init__(self):
        '''Open the two shelve stores used for OAuth state and track cache.'''
        self.oauth_state = shelve.open('oauth_state')
        self.found_tracks = shelve.open('found_tracks')

    def close(self):
        '''Close whichever shelve stores were opened.

        Tolerates an instance whose __init__ did not finish, so it can be
        called from __del__ without raising AttributeError.
        '''
        for name in ('oauth_state', 'found_tracks'):
            store = getattr(self, name, None)
            if store is not None:
                store.close()

    def __del__(self):
        self.close()

    @property
    def rdio(self):
        '''The Rdio API client, created on first access and then reused.'''
        if self.__cached_rdio is None:
            self.__cached_rdio = Rdio(self.CONSUMER_KEY, self.CONSUMER_SECRET,
                                      self.oauth_state)
        return self.__cached_rdio

    @property
    def authenticated(self):
        '''True when the client is authenticated and currentUser() succeeds.

        If the currentUser() call raises, the client is logged out and
        False is returned.
        '''
        if not self.rdio.authenticated:
            return False
        try:
            return self.rdio.currentUser() is not None
        except Exception:
            # Exception rather than BaseException so Ctrl-C and SystemExit
            # still propagate; record the failure instead of hiding it.
            logging.warning('rdio.currentUser failed; logging out',
                            exc_info=True)
            self.rdio.logout()
            return False

    def authenticate(self):
        '''Discard stored OAuth state and run an interactive PIN-based auth.

        Opens the authorization URL in a web browser and reads the PIN
        from standard input.
        '''
        # let's clear our old auth state
        self.oauth_state.clear()
        self.__cached_rdio = None

        # do a PIN based auth
        import webbrowser
        webbrowser.open(self.rdio.begin_authentication('oob'))
        verifier = raw_input('Enter the PIN from the Rdio site: ').strip()
        self.rdio.complete_authentication(verifier)

    def find_track(self, artist, title):
        '''Try to find a track, applying various transformations to the names.

        Searches once per distinct (artist form, title form) pair and
        returns the first search result whose artist and name fuzzily
        match the requested ones, or None when no query yields a match.
        '''
        artist = Term(artist)
        title = Term(title)

        # for each of the forms, search...
        for a, t in uniq(zip(artist.forms, title.forms)):
            # query the API
            q = ('%s %s' % (a, t)).encode('utf-8')
            result = self.rdio.search(query=q, types='Track', never_or=True)

            # if there were no results then the search failed
            if not result['track_count']:
                logging.warning(' rdio.search failed for: %s', q)
                continue

            # look through the results for a good match
            for track in result['results']:
                if artist == track['artist'] and title == track['name']:
                    return track

            # none found
            # NOTE(review): the nesting of this warning and of the final
            # return was reconstructed; confirm that a failed match should
            # fall through to the next form rather than give up.
            logging.warning('rdio.search succeeded but match failed: %s', q)
        return None

    def _resolve_tracks(self, tracks):
        '''Return track metadata for each (artist, title) pair that resolves.

        Pairs already present in the found_tracks cache are served from
        it; others are looked up with find_track and cached on success.
        Pairs that cannot be found are skipped.
        '''
        tracks_meta = []
        for artistname, trackname in tracks:
            key = json.dumps((artistname, trackname)).encode('utf-8')
            logging.info('Looking for: %s', key)
            if key in self.found_tracks:
                # read the shelf entry once; every access unpickles it
                cached = self.found_tracks[key]
                logging.info(' found it in the cache: %s', cached['key'])
                tracks_meta.append(cached)
                continue
            track_meta = self.find_track(artistname, trackname)
            if track_meta is None:
                logging.info(' not found')
                continue
            logging.info(' found it in on the site: %s', track_meta['key'])
            tracks_meta.append(track_meta)
            self.found_tracks[key] = track_meta
        return tracks_meta

    def make_playlist(self, name, desc, tracks):
        '''Make or update a playlist named @name.

        @desc is the description used when the playlist is created and
        @tracks is a list of (artistname, trackname) pairs. If an owned
        playlist called @name already exists its current tracks are
        removed and replaced with the resolved ones; otherwise a new
        playlist is created.
        '''
        tracks_meta = self._resolve_tracks(tracks)
        logging.info('Found %d / %d tracks', len(tracks_meta), len(tracks))

        track_keys = [track['key'] for track in tracks_meta]

        # ask the server for playlists
        playlists = self.rdio.getPlaylists()
        for playlist in playlists['owned']:
            # look for a playlist with the right name
            if playlist['name'] != name:
                continue
            logging.info('Found the playlist')
            # when we find it, remove all of those tracks...
            playlist = self.rdio.get(keys=playlist['key'],
                                     extras='tracks')[playlist['key']]
            keys = [t['key'] for t in playlist['tracks']]
            self.rdio.removeFromPlaylist(playlist=playlist['key'],
                                         index=0, count=playlist['length'],
                                         tracks=','.join(keys))
            # now add all of the tracks we just got
            self.rdio.addToPlaylist(playlist=playlist['key'],
                                    tracks=','.join(track_keys))
            logging.info('Updated the playlist')
            break
        else:
            # didn't find the playlist
            # create it!
            self.rdio.createPlaylist(name=name,
                                     description=desc,
                                     tracks=','.join(track_keys))
            logging.info('Created the playlist')
Something went wrong with that request. Please try again.