forked from pytube/pytube
/
__init__.py
361 lines (308 loc) · 12.3 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
from os.path import normpath
from urllib import urlencode
from urllib2 import urlopen
from urlparse import urlparse, parse_qs
import re
# Base URL of YouTube's get_video_info endpoint.
YT_BASE_URL = 'http://www.youtube.com/get_video_info'

# Quality/codec description for each YouTube format id (itag). The values
# of every row line up positionally with YT_ENCODING_KEYS.
# Source: http://en.wikipedia.org/wiki/YouTube#Quality_and_codecs
YT_ENCODING = {
    # Flash Video
    5: ["flv", "240p", "Sorenson H.263", "N/A", "0.25", "MP3", "64"],
    6: ["flv", "270p", "Sorenson H.263", "N/A", "0.8", "MP3", "64"],
    34: ["flv", "360p", "H.264", "Main", "0.5", "AAC", "128"],
    35: ["flv", "480p", "H.264", "Main", "0.8-1", "AAC", "128"],

    # 3GP
    36: ["3gp", "240p", "MPEG-4 Visual", "Simple", "0.17", "AAC", "38"],
    13: ["3gp", "N/A", "MPEG-4 Visual", "N/A", "0.5", "AAC", "N/A"],
    17: ["3gp", "144p", "MPEG-4 Visual", "Simple", "0.05", "AAC", "24"],

    # MPEG-4
    18: ["mp4", "360p", "H.264", "Baseline", "0.5", "AAC", "96"],
    22: ["mp4", "720p", "H.264", "High", "2-2.9", "AAC", "192"],
    37: ["mp4", "1080p", "H.264", "High", "3-4.3", "AAC", "192"],
    38: ["mp4", "3072p", "H.264", "High", "3.5-5", "AAC", "192"],
    82: ["mp4", "360p", "H.264", "3D", "0.5", "AAC", "96"],
    83: ["mp4", "240p", "H.264", "3D", "0.5", "AAC", "96"],
    84: ["mp4", "720p", "H.264", "3D", "2-2.9", "AAC", "152"],
    85: ["mp4", "520p", "H.264", "3D", "2-2.9", "AAC", "152"],

    # WebM
    43: ["webm", "360p", "VP8", "N/A", "0.5", "Vorbis", "128"],
    44: ["webm", "480p", "VP8", "N/A", "1", "Vorbis", "128"],
    45: ["webm", "720p", "VP8", "N/A", "2", "Vorbis", "192"],
    46: ["webm", "1080p", "VP8", "N/A", "N/A", "Vorbis", "192"],
    100: ["webm", "360p", "VP8", "3D", "N/A", "Vorbis", "128"],
    101: ["webm", "360p", "VP8", "3D", "N/A", "Vorbis", "192"],
    102: ["webm", "720p", "VP8", "3D", "N/A", "Vorbis", "192"],
}

# Field names for the positional values of each YT_ENCODING row.
YT_ENCODING_KEYS = (
    'extension',
    'resolution',
    'video_codec',
    'profile',
    'video_bitrate',
    'audio_codec',
    'audio_bitrate',
)
class MultipleObjectsReturned(Exception):
    """Raised when a query matches several objects but one was expected."""
class YouTubeError(Exception):
    """Raised when the REST interface reports an error."""
class Video(object):
    """
    Class representation of a single instance of a YouTube video.
    """

    def __init__(self, url, filename, **attributes):
        """
        Define the variables required to declare a new video.

        Keyword arguments:
        url -- The url the video file is downloaded from.
        filename -- The filename the video is saved as by download().
        attributes -- Extra attributes copied onto the instance as-is.
                      __repr__ and __cmp__ read `extension`,
                      `resolution` and `video_codec`.
        """
        self.url = url
        self.filename = filename
        self.__dict__.update(**attributes)

    def download(self, path=None):
        """
        Downloads the file of the URL defined within the class
        instance, reporting progress on stdout.

        Keyword arguments:
        path -- Destination directory. When omitted, the file is
                written relative to the current working directory.
        """
        # Imported here so the method does not rely on module-level
        # imports beyond the ones the file already has.
        import sys
        from os.path import join

        if path:
            target = join(normpath(path), self.filename)
        else:
            target = self.filename

        response = urlopen(self.url)
        try:
            # The header may be absent; the size is then unknown and
            # only the running byte count is reported.
            lengths = response.info().getheaders("Content-Length")
            file_size = int(lengths[0]) if lengths else None
            size_label = "unknown" if file_size is None else file_size
            print("Downloading: %s Bytes: %s" % (self.filename, size_label))

            bytes_received = 0
            chunk_size = 8192
            with open(target, 'wb') as dst_file:
                while True:
                    chunk = response.read(chunk_size)
                    if not chunk:
                        break
                    bytes_received += len(chunk)
                    dst_file.write(chunk)
                    if file_size:
                        percent = bytes_received * 100. / file_size
                        status = "%10d [%3.2f%%]" % (bytes_received,
                                                     percent)
                    else:
                        # Unknown or zero size: no percentage to compute.
                        status = "%10d" % bytes_received
                    # Backspaces move the cursor back so the next status
                    # overwrites this one in place.
                    sys.stdout.write(status + chr(8) * len(status))
                    sys.stdout.flush()
        finally:
            # Always release the connection, even if the transfer fails.
            response.close()

    def __repr__(self):
        """A cleaner representation of the class instance."""
        return "<Video: %s (.%s) - %s>" % (self.video_codec, self.extension,
                                           self.resolution)

    def __cmp__(self, other):
        """
        Compare two videos by the string "<extension> <resolution>".

        The comparison is a plain string comparison, so resolutions are
        ordered lexicographically rather than numerically. Returns
        NotImplemented when `other` is not a Video.
        """
        if not isinstance(other, Video):
            return NotImplemented
        v1 = "%s %s" % (self.extension, self.resolution)
        v2 = "%s %s" % (other.extension, other.resolution)
        # Same result as cmp(v1, v2): -1, 0 or 1.
        return (v1 > v2) - (v1 < v2)
class YouTube(object):
    """
    Looks up a YouTube video by its URL and exposes the available
    encodings as Video instances.

    Assigning to `url` fetches the video details and fills `title` and
    `videos`.
    """
    # Class-level defaults only: _get_video_info() rebinds both lists on
    # the instance, so no list is shared between YouTube objects.
    _filename = None
    _fmt_values = []
    _video_url = None
    title = None
    videos = []
    # fmt was an undocumented URL parameter that allowed selecting
    # YouTube quality mode without using player user interface.

    @property
    def url(self):
        """Exposes the video url."""
        return self._video_url

    @url.setter
    def url(self, url):
        """Defines the URL of the YouTube video and fetches its details."""
        self._video_url = url
        # Reset the filename.
        self._filename = None
        # Get the video details.
        self._get_video_info()

    @property
    def filename(self):
        """
        Exposes the title of the video. If this is not set, one is
        generated based on the name of the video.
        """
        if not self._filename:
            self._filename = safe_filename(self.title)
        return self._filename

    @filename.setter
    def filename(self, filename):
        """Defines the filename."""
        self._filename = filename

    @property
    def video_id(self):
        """
        Gets the video ID extracted from the `v` query parameter of the
        URL, or None when the URL has no such parameter.
        """
        parts = urlparse(self._video_url)
        qs = getattr(parts, 'query', None)
        if qs:
            video_id = parse_qs(qs).get('v', None)
            if video_id:
                return video_id.pop()
        return None

    def get(self, extension=None, res=None):
        """
        Return a single video given an extension and resolution.

        Returns None when nothing matches and raises
        MultipleObjectsReturned when more than one video matches.

        Keyword arguments:
        extension -- The desired file extension (e.g.: mp4).
        res -- The desired broadcasting standard of the video
               (e.g.: 1080p).
        """
        result = self.filter(extension, res)
        if not result:
            return None
        if len(result) == 1:
            return result[0]
        raise MultipleObjectsReturned("get() returned more than one "
                                      "object -- it returned %d!"
                                      % len(result))

    def get_highest_res(self, preferred='mp4'):
        """
        Return the highest quality resolution available that matches the
        preferred filetype, if available. Will favour the highest quality
        over the preferred filetype. If not available, the highest
        quality that matches any filetype is returned.

        Videos whose resolution is not of the form "<number>p"
        (e.g.: "N/A") are ignored. Returns None when no video qualifies.

        Keyword arguments:
        preferred -- the preferred extension (e.g.: mp4)
        """
        highest, result = 0, None
        for v in self.videos:
            try:
                # Strip the trailing "p" (e.g.: "720p" -> 720).
                current = int(v.resolution[:-1])
            except (TypeError, ValueError):
                continue
            if ((current > highest) or
                    (current == highest and v.extension == preferred)):
                highest = current
                result = v
        return result

    def filter(self, extension=None, res=None):
        """
        Return a filtered list of videos given an extension and
        resolution criteria. A criterion left as None is not applied.

        Keyword arguments:
        extension -- The desired file extension (e.g.: mp4).
        res -- The desired broadcasting standard of the video
               (e.g.: 1080p).
        """
        return [v for v in self.videos
                if (not extension or v.extension == extension)
                and (not res or v.resolution == res)]

    @staticmethod
    def _as_list(value):
        """Normalise a _fetch() result (None, scalar or list) to a list."""
        if value is None:
            return []
        if isinstance(value, list):
            return value
        return [value]

    def _fetch(self, path, data):
        """
        Given a path, traverse the response for the desired data. (A
        modified ver. of my dictionary traverse method:
        https://gist.github.com/2009119)

        Each level of `data` is parsed as a URL-encoded query string.
        Returns None when an element of the path is missing; a result
        list holding a single value is returned as that value.

        Keyword arguments:
        path -- A tuple representing a path to a node within a tree.
        data -- The data containing the tree.
        """
        elem = path[0]
        # A list is descended through its last item.
        if isinstance(data, list):
            return self._fetch(path, data[-1])
        # Parse the url encoded data and get the element in our path.
        data = parse_qs(data).get(elem, None)
        # Offset the tuple by 1.
        path = path[1:]
        # Stop when the path has reached the end OR the element
        # returned nothing.
        if len(path) == 0 or data is None:
            if isinstance(data, list) and len(data) == 1:
                data = data[0]
            return data
        # Nope, let's keep diggin'
        return self._fetch(path, data)

    def _get_video_info(self):
        """
        This is responsible for executing the request, extracting the
        necessary details, and populating the different video
        resolutions and formats into a list.

        Raises YouTubeError when the response carries an error code.
        """
        querystring = urlencode({'asv': 3, 'el': 'detailpage', 'hl': 'en_US',
                                 'video_id': self.video_id})
        self.title = None
        self.videos = []
        # Rebind instead of appending to the class-level list, which
        # would otherwise be shared by (and grow across) all instances.
        self._fmt_values = []

        response = urlopen(YT_BASE_URL + '?' + querystring)
        try:
            content = response.read()
        finally:
            response.close()

        info = parse_qs(content)
        if 'errorcode' in info:
            error = info.get('reason', 'An unknown error has occurred')
            if isinstance(error, list):
                error = error.pop()
            raise YouTubeError(error)

        # _fetch() returns None for a missing node and a bare value for
        # a single result, so normalise both to lists before iterating.
        path = ('url_encoded_fmt_stream_map', 'url')
        video_urls = self._as_list(self._fetch(path, content))
        # Get the video signatures, YouTube require them as an url
        # component.
        path = ('url_encoded_fmt_stream_map', 'sig')
        video_signatures = self._as_list(self._fetch(path, content))
        self.title = self._fetch(('title',), content)

        for idx, url in enumerate(video_urls):
            try:
                fmt, fmt_data = self._extract_fmt(url)
                filename = "%s.%s" % (self.filename, fmt_data['extension'])
            except (TypeError, KeyError):
                # No usable itag in the url, or an itag missing from
                # YT_ENCODING: skip this stream (best effort).
                continue
            if idx < len(video_signatures):
                # Add video signature to url
                url = "%s&signature=%s" % (url, video_signatures[idx])
            # NOTE(review): a stream without a matching signature is
            # kept unsigned instead of aborting; confirm this is wanted.
            self.videos.append(Video(url, filename, **fmt_data))
            self._fmt_values.append(fmt)
        self.videos.sort()

    def _extract_fmt(self, text):
        """
        YouTube does not pass you a completely valid URLencoded form,
        I suspect this is suppose to act as a deterrent.. Nothing some
        regular expressions couldn't handle.

        Returns a tuple (itag, data) where data maps YT_ENCODING_KEYS to
        the values of the matching YT_ENCODING row, or (itag, None) for
        an unknown itag. Returns None unless exactly one itag is found.

        Keyword arguments:
        text -- The malformed data contained within each url node.
        """
        itag = re.findall(r'itag=(\d+)', text)
        if len(itag) != 1:
            return None
        itag = int(itag[0])
        attr = YT_ENCODING.get(itag, None)
        if not attr:
            return itag, None
        return itag, dict(zip(YT_ENCODING_KEYS, attr))
def safe_filename(text, max_length=200):
    """
    Sanitizes filenames for many operating systems.

    Underscores become spaces and colons become " -"; control
    characters and a set of punctuation characters are then removed,
    and the result is cut to at most `max_length` characters.

    Keyword arguments:
    text -- The unsanitized pending filename.
    max_length -- The maximum length of the returned filename.
    """
    # Tidy up ugly formatted filenames.
    text = text.replace('_', ' ')
    text = text.replace(':', ' -')

    # NTFS forbids filenames containing characters in range 0-31
    # (0x00-0x1F).
    ntfs = [chr(i) for i in range(0, 32)]

    # Removing these SHOULD make most filename safe for a wide range
    # of operating systems.
    paranoid = ['"', '#', '$', '%', "'", '*', ',', '.', '/', ':', ';',
                '<', '>', '?', '\\', '^', '|', '~']

    # A character class of escaped characters matches each one
    # literally; joining raw characters with '|' let a backslash escape
    # the separator that followed it.
    blacklist = re.compile('[%s]' % re.escape(''.join(ntfs + paranoid)),
                           re.UNICODE)
    filename = blacklist.sub('', text)

    # Quickly truncates long filenames.
    return filename[:max_length]