-
-
Notifications
You must be signed in to change notification settings - Fork 2.2k
/
container.py
1256 lines (1114 loc) · 52.9 KB
/
container.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:fdm=marker:ai
from __future__ import (unicode_literals, division, absolute_import,
print_function)
__license__ = 'GPL v3'
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import os, logging, sys, hashlib, uuid, re, shutil, unicodedata
from collections import defaultdict
from io import BytesIO
from urlparse import urlparse
from future_builtins import zip
from lxml import etree
from cssutils import replaceUrls, getUrls
from calibre import CurrentDir
from calibre.customize.ui import (plugin_for_input_format, plugin_for_output_format)
from calibre.ebooks.chardet import xml_to_unicode
from calibre.ebooks.conversion.plugins.epub_input import (
ADOBE_OBFUSCATION, IDPF_OBFUSCATION, decrypt_font_data)
from calibre.ebooks.conversion.preprocess import HTMLPreProcessor, CSSPreProcessor as cssp
from calibre.ebooks.mobi import MobiError
from calibre.ebooks.mobi.reader.headers import MetadataHeader
from calibre.ebooks.mobi.tweak import set_cover
from calibre.ebooks.oeb.base import (
serialize, OEB_DOCS, OEB_STYLES, OPF2_NS, DC11_NS, OPF, Manifest,
rewrite_links, iterlinks, itercsslinks, urlquote, urlunquote)
from calibre.ebooks.oeb.polish.errors import InvalidBook, DRMError
from calibre.ebooks.oeb.polish.parsing import parse as parse_html_tweak
from calibre.ebooks.oeb.polish.utils import PositionFinder, CommentFinder, guess_type, parse_css
from calibre.ebooks.oeb.parse_utils import NotHTML, parse_html, RECOVER_PARSER
from calibre.ptempfile import PersistentTemporaryDirectory, PersistentTemporaryFile
from calibre.utils.filenames import nlinks_file, hardlink_file
from calibre.utils.ipc.simple_worker import fork_job, WorkerError
from calibre.utils.logging import default_log
from calibre.utils.zipfile import ZipFile
exists, join, relpath = os.path.exists, os.path.join, os.path.relpath
OEB_FONTS = {guess_type('a.ttf'), guess_type('b.otf'), guess_type('a.woff'), 'application/x-font-ttf', 'application/x-font-otf'}
OPF_NAMESPACES = {'opf':OPF2_NS, 'dc':DC11_NS}
class CSSPreProcessor(cssp):
    ''' CSS pre-processor that only applies the ``MS_PAT`` substitution
    (``MS_PAT`` and ``ms_sub`` are provided by the base class). '''

    def __call__(self, data):
        # Run just the MS_PAT substitution over the stylesheet text
        cleaned = self.MS_PAT.sub(self.ms_sub, data)
        return cleaned
def clone_dir(src, dest):
    ''' Clone a directory using hard links for the files, dest must already exist.

    Sub-directories of src are created in dest and cloned recursively. If a
    file cannot be hard linked, for whatever reason, it is copied (along with
    its metadata) instead.

    :param src: The directory to clone
    :param dest: An existing directory that receives the clone
    '''
    for x in os.listdir(src):
        dpath = os.path.join(dest, x)
        spath = os.path.join(src, x)
        if os.path.isdir(spath):
            os.mkdir(dpath)
            clone_dir(spath, dpath)
        else:
            try:
                hardlink_file(spath, dpath)
            except Exception:
                # Hard linking is best effort, fall back to a real copy.
                # Exception rather than a bare except, so that
                # KeyboardInterrupt and SystemExit are not swallowed.
                shutil.copy2(spath, dpath)
def clone_container(container, dest_dir):
    ''' Create a clone of container rooted at dest_dir, using hard links
    for the book's files. The clone is an instance of the same class as container. '''
    target = os.path.abspath(os.path.realpath(dest_dir))
    data = container.clone_data(target)
    klass = type(container)
    # The base Container is called with (rootpath, opfpath, log), every other
    # class is called with a single leading positional argument before log
    if klass is Container:
        positional = (None, None, container.log)
    else:
        positional = (None, container.log)
    return klass(*positional, clone_data=data)
class Container(object): # {{{
'''
A container represents an Open EBook as a directory full of files and an
opf file. There are two important concepts:
* The root directory. This is the base of the ebook. All the ebook's
files are inside this directory or in its sub-directories.
* Names: These are paths to the books' files relative to the root
directory. They always contain POSIX separators and are unquoted. They
can be thought of as canonical identifiers for files in the book.
Most methods on the container object work with names. Names are always
in the NFC unicode normal form.
* Clones: the container object supports efficient on-disk cloning, which is used to
implement checkpoints in the ebook editor. In order to make this work, you should
never access files on the filesystem directly. Instead, use :meth:`raw_data` or
:meth:`open` to read/write to component files in the book.
When converting between hrefs and names use the methods provided by this
class, they assume all hrefs are quoted.
'''
#: The type of book (epub for EPUB files and azw3 for AZW3 files)
book_type = 'oeb'
SUPPORTS_TITLEPAGES = True
SUPPORTS_FILENAMES = True
def __init__(self, rootpath, opfpath, log, clone_data=None):
    '''
    Build the container, either by scanning a directory tree or from the
    data of a previously made clone.

    :param rootpath: The directory containing the book's files. Not used when clone_data is supplied.
    :param opfpath: Path to the book's OPF file, must be one of the files found under rootpath. Not used when clone_data is supplied.
    :param log: Stored as ``self.log`` and passed on to the parsers.
    :param clone_data: A dictionary as returned by :meth:`clone_data` or None.
    :raises InvalidBook: If no file under the root directory corresponds to opfpath.
    '''
    from_clone = clone_data is not None
    self.root = clone_data['root'] if from_clone else os.path.abspath(rootpath)
    self.log = log
    self.html_preprocessor = HTMLPreProcessor()
    self.css_preprocessor = CSSPreProcessor()
    self.tweak_mode = False

    # Per-name state, cache_names lists the attributes that are keyed by name
    self.parsed_cache = {}
    self.mime_map = {}
    self.name_path_map = {}
    self.dirtied = set()
    self.encoding_map = {}
    self.pretty_print = set()
    self.cache_names = ('parsed_cache', 'mime_map', 'name_path_map', 'encoding_map', 'dirtied', 'pretty_print')
    self.cloned = from_clone

    if from_clone:
        # Everything comes from the clone data, the filesystem is not scanned
        for attr in ('name_path_map', 'opf_name', 'mime_map', 'pretty_print', 'encoding_map', 'tweak_mode'):
            setattr(self, attr, clone_data[attr])
        self.opf_dir = os.path.dirname(self.name_path_map[self.opf_name])
        return

    # Map of relative paths with '/' separators from root of unzipped ePub
    # to absolute paths on filesystem with os-specific separators
    opfpath = os.path.abspath(os.path.realpath(opfpath))
    for dirpath, _dirnames, filenames in os.walk(self.root):
        for fname in filenames:
            path = join(dirpath, fname)
            # OS X silently changes all file names to NFD form. The EPUB
            # spec requires all text including filenames to be in NFC form.
            # The proper fix is to implement a VFS that maps between
            # canonical names and their file system representation, however,
            # that has not been done yet, so the names are simply
            # normalized to NFC here.
            name = unicodedata.normalize('NFC', self.abspath_to_name(path))
            self.name_path_map[name] = path
            self.mime_map[name] = guess_type(path)
            if path != opfpath:
                continue
            # We have stumbled onto the opf, remember it and force its mimetype
            self.opf_name = name
            self.opf_dir = os.path.dirname(path)
            self.mime_map[name] = guess_type('a.opf')

    if not hasattr(self, 'opf_name'):
        raise InvalidBook('Could not locate opf file: %r'%opfpath)

    # Update mime map with data from the OPF
    self.refresh_mime_map()
def refresh_mime_map(self):
    ''' Overwrite the entries in mime_map with the media-type declared in
    the OPF manifest, for every manifest item that refers to a name already
    present in mime_map. The entry for the OPF itself is never changed. '''
    declared = self.opf_xpath('//opf:manifest/opf:item[@href and @media-type]')
    for entry in declared:
        name = self.href_to_name(entry.get('href'), self.opf_name)
        # some epubs include the opf in the manifest with an incorrect mime type
        if name == self.opf_name or name not in self.mime_map:
            continue
        self.mime_map[name] = entry.get('media-type')
def clone_data(self, dest_dir):
    ''' Clone the files of this book into dest_dir and return the dictionary
    that is accepted as the clone_data argument of the constructor. Calls
    ``Container.commit`` (keeping parsed objects) first and marks this
    container as cloned. '''
    Container.commit(self, keep_parsed=True)
    self.cloned = True
    clone_dir(self.root, dest_dir)
    # Re-root every known path onto the destination directory
    cloned_paths = {}
    for name, path in self.name_path_map.iteritems():
        cloned_paths[name] = os.path.join(dest_dir, os.path.relpath(path, self.root))
    return {
        'root': dest_dir,
        'opf_name': self.opf_name,
        'mime_map': self.mime_map.copy(),
        'pretty_print': set(self.pretty_print),
        'encoding_map': self.encoding_map.copy(),
        'tweak_mode': self.tweak_mode,
        'name_path_map': cloned_paths,
    }
def guess_type(self, name):
    ' Return the mimetype expected for name, based on its extension, with text/html mapped to application/xhtml+xml. '
    mt = guess_type(name)
    # epubcheck complains if the mimetype for text documents is set to
    # text/html in EPUB 2 books. Sigh.
    return 'application/xhtml+xml' if mt == 'text/html' else mt
def add_name_to_manifest(self, name):
    ''' Create a manifest entry in the OPF for the file with the specified
    name, using the mimetype recorded for it in mime_map. Returns the id of
    the new manifest item, which is chosen to not clash with any id in the OPF. '''
    used_ids = {el.get('id') for el in self.opf_xpath('//*[@id]')}
    item_id, counter = 'id', 0
    while item_id in used_ids:
        counter += 1
        item_id = 'id%d' % counter
    manifest = self.opf_xpath('//opf:manifest')[0]
    href = self.name_to_href(name, self.opf_name)
    entry = manifest.makeelement(OPF('item'), id=item_id, href=href)
    entry.set('media-type', self.mime_map[name])
    self.insert_into_xml(manifest, entry)
    self.dirty(self.opf_name)
    return item_id
def add_file(self, name, data, media_type=None, spine_index=None):
    ''' Write data to a new file in this container. Unless the name is
    allowed to be unmanifested, an entry is created for it in the OPF
    manifest and, if the file is a text document, in the spine as well.

    :param media_type: The mimetype of the file, guessed from name if not specified
    :param spine_index: Position in the spine at which to insert the file, None means append
    :raises ValueError: If the name or its href is already in use or if the name contains ..
    '''
    if self.has_name(name):
        raise ValueError('A file with the name %s already exists' % name)
    if '..' in name:
        raise ValueError('Names are not allowed to have .. in them')
    href = self.name_to_href(name, self.opf_name)
    manifest_hrefs = {item.get('href') for item in self.opf_xpath('//opf:manifest/opf:item[@href]')}
    if href in manifest_hrefs:
        raise ValueError('An item with the href %s already exists in the manifest' % href)

    # Create the file on disk, along with any missing parent directories
    path = self.name_to_abspath(name)
    parent = os.path.dirname(path)
    if not os.path.exists(parent):
        os.makedirs(parent)
    with open(path, 'wb') as dest:
        dest.write(data)

    mt = media_type or self.guess_type(name)
    self.name_path_map[name] = path
    self.mime_map[name] = mt

    if not self.ok_to_be_unmanifested(name):
        item_id = self.add_name_to_manifest(name)
        if mt in OEB_DOCS:
            manifest = self.opf_xpath('//opf:manifest')[0]
            spine = self.opf_xpath('//opf:spine')[0]
            itemref = manifest.makeelement(OPF('itemref'), idref=item_id)
            self.insert_into_xml(spine, itemref, index=spine_index)
def rename(self, current_name, new_name):
    ''' Renames a file from current_name to new_name. It automatically
    rebases all links inside the file if the directory the file is in
    changes. Note however, that links are not updated in the other files
    that could reference this file. This is for performance, such updates
    should be done once, in bulk.

    :raises ValueError: If current_name must not be changed, if new_name
        already exists (unless it differs from current_name only by case) or
        if the parent of the new location is a file.
    '''
    if current_name in self.names_that_must_not_be_changed:
        raise ValueError('Renaming of %s is not allowed' % current_name)
    if self.exists(new_name) and (new_name == current_name or new_name.lower() != current_name.lower()):
        # The destination exists and does not differ from the current name only by case
        raise ValueError('Cannot rename %s to %s as %s already exists' % (current_name, new_name, new_name))
    new_path = self.name_to_abspath(new_name)
    base = os.path.dirname(new_path)
    if os.path.isfile(base):
        raise ValueError('Cannot rename %s to %s as %s is a file' % (current_name, new_name, base))
    if not os.path.exists(base):
        os.makedirs(base)
    old_path = parent_dir = self.name_to_abspath(current_name)
    # Flush any parsed representation to disk before moving the file
    self.commit_item(current_name)
    os.rename(old_path, new_path)
    # Remove empty directories
    # Walks upwards from the old location, the first directory that cannot
    # be removed (os.rmdir fails on non-empty directories) ends the walk
    while parent_dir:
        parent_dir = os.path.dirname(parent_dir)
        try:
            os.rmdir(parent_dir)
        except EnvironmentError:
            break

    # Carry the per-name data over to the new name
    for x in ('mime_map', 'encoding_map'):
        x = getattr(self, x)
        if current_name in x:
            x[new_name] = x[current_name]
    self.name_path_map[new_name] = new_path
    # Forget the old name everywhere. cache_names refers to both dicts and
    # sets, set.pop() accepts no arguments and raises TypeError, in which
    # case discard() is used instead
    for x in self.cache_names:
        x = getattr(self, x)
        try:
            x.pop(current_name, None)
        except TypeError:
            x.discard(current_name)
    if current_name == self.opf_name:
        self.opf_name = new_name
    if os.path.dirname(old_path) != os.path.dirname(new_path):
        # The file moved to a different directory, so the links inside it
        # have to be rebased
        from calibre.ebooks.oeb.polish.replace import LinkRebaser
        repl = LinkRebaser(self, current_name, new_name)
        self.replace_links(new_name, repl)
    self.dirty(new_name)
def replace_links(self, name, replace_func):
    ''' Pass every link in the file name through replace_func, a callable
    that takes a URL and returns its replacement. replace_func must have a
    'replaced' attribute that is True if any actual replacement was done,
    the value of that attribute is returned and, if true, name is marked as
    dirty. The :class:`LinkReplacer` and :class:`LinkRebaser` classes are
    convenient ways of creating such callables. '''
    media_type = self.mime_map.get(name, guess_type(name))
    if name == self.opf_name:
        for elem in self.opf_xpath('//*[@href]'):
            elem.set('href', replace_func(elem.get('href')))
    else:
        mt = media_type.lower()
        if mt in OEB_DOCS:
            rewrite_links(self.parsed(name), replace_func)
        elif mt in OEB_STYLES:
            replaceUrls(self.parsed(name), replace_func)
        elif mt == guess_type('toc.ncx'):
            for elem in self.parsed(name).xpath('//*[@src]'):
                elem.set('src', replace_func(elem.get('src')))

    if replace_func.replaced:
        self.dirty(name)
    return replace_func.replaced
def iterlinks(self, name, get_line_numbers=True):
    ''' Iterate over all links in name. If get_line_numbers is True then
    results of the form (link, line_number, offset) are yielded, where
    line_number is the line at which the link occurs and offset is the
    number of characters from the start of the line. Note that offset could
    actually encompass several lines if not zero. If get_line_numbers is
    False only the links themselves are yielded. '''
    media_type = self.mime_map.get(name, guess_type(name))
    if name == self.opf_name:
        for elem in self.opf_xpath('//*[@href]'):
            href = elem.get('href')
            yield (href, elem.sourceline, 0) if get_line_numbers else href
        return

    mt = media_type.lower()
    if mt in OEB_DOCS:
        for el, attr, link, pos in iterlinks(self.parsed(name)):
            yield (link, el.sourceline, pos) if get_line_numbers else link
    elif mt in OEB_STYLES:
        if not get_line_numbers:
            for link in getUrls(self.parsed(name)):
                yield link
            return
        # Positions are computed against the decoded text of the file on
        # disk, with line endings normalized
        with self.open(name, 'rb') as f:
            raw = self.decode(f.read()).replace('\r\n', '\n').replace('\r', '\n')
            position = PositionFinder(raw)
            is_in_comment = CommentFinder(raw)
            for link, offset in itercsslinks(raw):
                if is_in_comment(offset):
                    continue
                lnum, col = position(offset)
                yield link, lnum, col
    elif mt == guess_type('toc.ncx'):
        for elem in self.parsed(name).xpath('//*[@src]'):
            src = elem.get('src')
            yield (src, elem.sourceline, 0) if get_line_numbers else src
def abspath_to_name(self, fullpath, root=None):
    '''
    Return the canonical name (a path relative to :attr:`root`, with / as
    the separator) for the specified absolute path.

    :param root: The base directory. By default the root for this container object is used.
    '''
    relative = self.relpath(os.path.abspath(fullpath), base=root)
    return relative.replace(os.sep, '/')
def name_to_abspath(self, name):
    ' Return the absolute, OS dependent, path inside the root directory that corresponds to the canonical name '
    components = name.split('/')
    return os.path.abspath(join(self.root, *components))
def exists(self, name):
    ''' True iff a file corresponding to the canonical name exists on the
    filesystem. This check is subject to the limitations of the underlying
    OS filesystem, in particular case (in)sensitivity: on a case insensitive
    filesystem True is returned even if the case of name differs from the
    case of the file on disk. See also :meth:`has_name`'''
    path = self.name_to_abspath(name)
    return os.path.exists(path)
def href_to_name(self, href, base=None):
    '''
    Return the canonical name that the (quoted) href points to, or None if
    the href has a URL scheme, has no path or has an absolute path.

    :param base: The name of the file the href is relative to. If None, the href is resolved against self.root.
    '''
    base_dir = self.root if base is None else os.path.dirname(self.name_to_abspath(base))
    purl = urlparse(href)
    if purl.scheme or not purl.path or purl.path.startswith('/'):
        return None
    unquoted = urlunquote(purl.path)
    return self.abspath_to_name(os.path.join(base_dir, *unquoted.split('/')))
def name_to_href(self, name, base=None):
    '''Return a quoted href for name, relative to base. base must be
    either a name or None, in which case the href is relative to self.root'''
    target = self.name_to_abspath(name)
    if base is None:
        start = self.root
    else:
        start = os.path.dirname(self.name_to_abspath(base))
    return urlquote(relpath(target, start).replace(os.sep, '/'))
def opf_xpath(self, expr):
    ' Evaluate the XPath expression expr against the parsed OPF, the opf: and dc: namespace prefixes are pre-defined. '
    root = self.opf
    return root.xpath(expr, namespaces=OPF_NAMESPACES)
def has_name(self, name):
    ''' Return True iff a file with the same canonical name as that specified exists. Unlike :meth:`exists` this method is always case-sensitive.
    Always returns a bool, empty or None names are never present. '''
    # bool() so that a falsy name results in False rather than the name itself
    return bool(name) and name in self.name_path_map
def relpath(self, path, base=None):
    '''Return path (an absolute path with os separators) relative to
    base, which defaults to self.root. The result is *not* a name, use
    :meth:`abspath_to_name` to get names.'''
    start = base or self.root
    return relpath(path, start)
def decode(self, data, normalize_to_nfc=True):
    """
    Automatically decode ``data`` into a ``unicode`` object.

    A byte order mark, if present, decides the encoding. Otherwise UTF-8 is
    tried and, if that fails, the encoding is detected by
    ``xml_to_unicode``. The encoding that was used is stored in
    ``used_encoding``. All line endings are converted to a single newline.
    If ``data`` is already unicode, only its line endings are converted.

    :param normalize_to_nfc: Normalize returned unicode to the NFC normal form as is required by both the EPUB and AZW3 formats.
    """
    def fix_data(d):
        return d.replace('\r\n', '\n').replace('\r', '\n')

    def finalize(text):
        # Apply the requested normalization on every decoding path, not just
        # the xml_to_unicode fallback
        if normalize_to_nfc:
            text = unicodedata.normalize('NFC', text)
        return fix_data(text)

    if isinstance(data, unicode):
        return fix_data(data)
    bom_enc = None
    # The four byte UTF-32 marks have to be checked before the two byte
    # UTF-16 ones, as the UTF-32 LE mark starts with the UTF-16 LE mark
    if data[:4] in {b'\0\0\xfe\xff', b'\xff\xfe\0\0'}:
        bom_enc = {b'\0\0\xfe\xff':'utf-32-be',
                b'\xff\xfe\0\0':'utf-32-le'}[data[:4]]
        data = data[4:]
    elif data[:2] in {b'\xff\xfe', b'\xfe\xff'}:
        bom_enc = {b'\xff\xfe':'utf-16-le', b'\xfe\xff':'utf-16-be'}[data[:2]]
        data = data[2:]
    elif data[:3] == b'\xef\xbb\xbf':
        bom_enc = 'utf-8'
        data = data[3:]
    if bom_enc is not None:
        try:
            self.used_encoding = bom_enc
            return finalize(data.decode(bom_enc))
        except UnicodeDecodeError:
            # The mark lied, fall through to the other strategies
            pass
    try:
        self.used_encoding = 'utf-8'
        return finalize(data.decode('utf-8'))
    except UnicodeDecodeError:
        pass
    data, self.used_encoding = xml_to_unicode(data)
    return finalize(data)
def ok_to_be_unmanifested(self, name):
    ' True iff name is one of the :attr:`names_that_need_not_be_manifested` '
    exempt = self.names_that_need_not_be_manifested
    return name in exempt
@property
def names_that_need_not_be_manifested(self):
    ' The set of names that may be absent from the manifest, here only the OPF itself. Depends on the ebook file format. '
    return set((self.opf_name,))
@property
def names_that_must_not_be_removed(self):
    ' The set of names that must never be deleted from the container, here only the OPF itself. Depends on the ebook file format. '
    return set((self.opf_name,))
@property
def names_that_must_not_be_changed(self):
    ' The set of names that must never be renamed, empty here. Depends on the ebook file format. '
    protected = set()
    return protected
def parse_xml(self, data):
    ''' Convert data to NFC normalized unicode and parse it as XML with the
    recovering parser. The detected encoding is stored in used_encoding. '''
    text, self.used_encoding = xml_to_unicode(
        data, strip_encoding_pats=True, assume_utf8=True, resolve_entities=True)
    normalized = unicodedata.normalize('NFC', text)
    return etree.fromstring(normalized, parser=RECOVER_PARSER)
def parse_xhtml(self, data, fname='<string>', force_html5_parse=False):
    ''' Parse data as (X)HTML. In tweak mode the tweak parser is used,
    otherwise the normal HTML parser, with a fallback to :meth:`parse_xml`
    for data that turns out not to be HTML. '''
    if not self.tweak_mode:
        try:
            return parse_html(
                data, log=self.log, decoder=self.decode,
                preprocessor=self.html_preprocessor, filename=fname,
                non_html_file_tags={'ncx'})
        except NotHTML:
            return self.parse_xml(data)
    return parse_html_tweak(data, log=self.log, decoder=self.decode, force_html5_parse=force_html5_parse)
def parse(self, path, mime):
    ''' Read the file at path and parse it according to mime: as XHTML, as
    XML or as CSS. Files of any other type are returned as raw bytes. '''
    with open(path, 'rb') as src:
        raw = src.read()
    if mime in OEB_DOCS:
        return self.parse_xhtml(raw, self.relpath(path))
    if mime[-4:] in {'+xml', '/xml'}:
        return self.parse_xml(raw)
    if mime in OEB_STYLES:
        return self.parse_css(raw, self.relpath(path))
    return raw
def raw_data(self, name, decode=True, normalize_to_nfc=True):
    '''
    Return the raw data corresponding to the file specified by name

    :param decode: If True and the file has a text based mimetype, decode it and return a unicode object instead of raw bytes.
    :param normalize_to_nfc: If True the returned unicode object is normalized to the NFC normal form as is required for the EPUB and AZW3 file formats.
    '''
    # Close the file deterministically instead of leaving it to the garbage collector
    with self.open(name) as f:
        ans = f.read()
    mime = self.mime_map.get(name, guess_type(name))
    if decode and (mime in OEB_STYLES or mime in OEB_DOCS or mime == 'text/plain' or mime[-4:] in {'+xml', '/xml'}):
        ans = self.decode(ans, normalize_to_nfc=normalize_to_nfc)
    return ans
def parse_css(self, data, fname='<string>', is_declaration=False):
    ''' Parse data as CSS. The CSS pre-processor of this container is
    applied, except in tweak mode. '''
    preprocessor = None if self.tweak_mode else self.css_preprocessor
    return parse_css(
        data, fname=fname, is_declaration=is_declaration, decode=self.decode,
        log_level=logging.WARNING, css_preprocessor=preprocessor)
def parsed(self, name):
    ''' Return the parsed representation of the file specified by name, as
    produced by :meth:`parse`. Parsed objects are cached for performance,
    so if you make any changes to the parsed object, you must call
    :meth:`dirty` so that the container knows to update the cache. See also
    :meth:`replace`.'''
    cached = self.parsed_cache.get(name, None)
    if cached is not None:
        return cached
    # The parsers record the encoding they used in used_encoding
    self.used_encoding = None
    mime = self.mime_map.get(name, guess_type(name))
    tree = self.parse(self.name_path_map[name], mime)
    self.parsed_cache[name] = tree
    self.encoding_map[name] = self.used_encoding
    return tree
def replace(self, name, obj):
    '''
    Use obj as the parsed object for name from now on and mark name as
    dirty. obj must be of the same kind as the object it replaces, i.e. an
    lxml tree for HTML/XML or a cssutils stylesheet for a CSS file.
    '''
    cache = self.parsed_cache
    cache[name] = obj
    self.dirty(name)
@property
def opf(self):
    ' The OPF file, in parsed form (see :meth:`parsed`) '
    name = self.opf_name
    return self.parsed(name)
@property
def mi(self):
    ''' The metadata of this book as a Metadata object. The object is
    built from the serialized OPF every time this property is accessed, so
    use it sparingly. '''
    from calibre.ebooks.metadata.opf2 import OPF as O
    raw = self.serialize_item(self.opf_name)
    opf = O(BytesIO(raw), basedir=self.opf_dir, unquote_urls=False,
            populate_spine=False)
    return opf.to_book_metadata()
@property
def opf_version(self):
    ' The value of the version attribute of the <package> element in the OPF, or the empty string if there is none '
    try:
        version = self.opf_xpath('//opf:package/@version')[0]
    except IndexError:
        version = ''
    return version
@property
def manifest_id_map(self):
    ' A dictionary mapping the id of every manifest item (that has both an id and an href) to its canonical name '
    id_map = {}
    for entry in self.opf_xpath('//opf:manifest/opf:item[@href and @id]'):
        id_map[entry.get('id')] = self.href_to_name(entry.get('href'), self.opf_name)
    return id_map
@property
def manifest_type_map(self):
    ' A dictionary mapping every (lower cased) media-type in the manifest to a tuple of the canonical names of the items of that media-type '
    grouped = {}
    for entry in self.opf_xpath('//opf:manifest/opf:item[@href and @media-type]'):
        name = self.href_to_name(entry.get('href'), self.opf_name)
        grouped.setdefault(entry.get('media-type').lower(), []).append(name)
    return {mt:tuple(grouped[mt]) for mt in grouped}
@property
def guide_type_map(self):
    ' A dictionary mapping the type of every guide reference (that has both a type and an href) to the canonical name it points to '
    type_map = {}
    for ref in self.opf_xpath('//opf:guide/opf:reference[@href and @type]'):
        type_map[ref.get('type', '')] = self.href_to_name(ref.get('href'), self.opf_name)
    return type_map
@property
def spine_iter(self):
    ''' An iterator that yields (item, name, is_linear) for every item in
    the book's spine that refers to a file known to this container. item is
    the lxml element, name is the canonical file name and is_linear is True
    if the item is linear. All linear items are yielded first, followed by
    the non-linear ones. See also: :attr:`spine_names` and :attr:`spine_items`. '''
    id_to_name = self.manifest_id_map
    deferred = []
    for itemref in self.opf_xpath('//opf:spine/opf:itemref[@idref]'):
        name = id_to_name.get(itemref.get('idref'), None)
        if not self.name_path_map.get(name, None):
            # Refers to something that is not a file in this container
            continue
        if itemref.get('linear', 'yes') != 'yes':
            deferred.append((itemref, name))
            continue
        yield itemref, name, True
    for itemref, name in deferred:
        yield itemref, name, False
@property
def spine_names(self):
    ''' An iterator yielding (name, is_linear) for every item in the
    book's spine, in the order of :attr:`spine_iter`. See also:
    :attr:`spine_iter` and :attr:`spine_items`. '''
    for entry in self.spine_iter:
        _item, name, linear = entry
        yield name, linear
@property
def spine_items(self):
    ''' An iterator yielding the path (as stored in name_path_map) for
    every item in the book's spine. See also: :attr:`spine_iter` and
    :attr:`spine_names`. '''
    paths = self.name_path_map
    for entry in self.spine_names:
        name, _linear = entry
        yield paths[name]
def remove_from_spine(self, spine_items, remove_if_no_longer_in_spine=True):
    '''
    Remove the specified items (by canonical name) from the spine. If ``remove_if_no_longer_in_spine``
    is True, the items are also deleted from the book, not just from the spine.

    :param spine_items: An iterable of (name, remove) pairs. It is matched
        up, position by position, with :attr:`spine_iter`, an entry is
        removed only if remove is true and name is the name at that position.
    '''
    nixed = set()
    for (name, remove), (item, xname, linear) in zip(spine_items, self.spine_iter):
        if remove and name == xname:
            self.remove_from_xml(item)
            nixed.add(name)
    if nixed:
        # The parsed OPF was modified, without this the change would be lost
        # whenever no file ends up being removed from the book below
        self.dirty(self.opf_name)
    if remove_if_no_longer_in_spine:
        # Remove from the book if no longer in spine
        nixed -= {name for name, linear in self.spine_names}
        for name in nixed:
            self.remove_item(name)
def set_spine(self, spine_items):
    ''' Set the spine to be spine_items where spine_items is an iterable of
    the form (name, linear). Will raise an error (KeyError) if one of the
    names is not present in the manifest, in which case the spine is left
    untouched. '''
    imap = self.manifest_id_map
    imap = {name:item_id for item_id, name in imap.iteritems()}
    # Consume spine_items and resolve all the ids before touching the XML:
    # an unknown name must not leave an emptied spine behind, and a lazy
    # iterable built from the current spine must see it as it is now
    new_spine = [(imap[name], linear) for name, linear in spine_items]
    items = [item for item, name, linear in self.spine_iter]
    # Preserve the existing indentation of the spine
    tail, last_tail = (items[0].tail, items[-1].tail) if items else ('\n ', '\n ')
    for item in items:
        self.remove_from_xml(item)
    spine = self.opf_xpath('//opf:spine')[0]
    spine.text = tail
    for idref, linear in new_spine:
        i = spine.makeelement('{%s}itemref' % OPF_NAMESPACES['opf'], nsmap={'opf':OPF_NAMESPACES['opf']})
        i.tail = tail
        i.set('idref', idref)
        spine.append(i)
        if not linear:
            i.set('linear', 'no')
    if len(spine) > 0:
        spine[-1].tail = last_tail
    self.dirty(self.opf_name)
def remove_item(self, name, remove_from_guide=True):
    '''
    Remove the item identified by name from this container. The references
    to the item in the OPF manifest and spine (including the toc attribute
    of the spine and the cover meta tag) are removed, the file is deleted
    from disk and the name is dropped from the internal caches.

    :param remove_from_guide: If True, references to the item in the OPF guide are removed as well.
    '''
    def points_here(elem):
        return self.href_to_name(elem.get('href'), self.opf_name) == name

    # Manifest entries, remembering their ids
    removed_ids = set()
    for entry in self.opf_xpath('//opf:manifest/opf:item[@href]'):
        if not points_here(entry):
            continue
        entry_id = entry.get('id', None)
        if entry_id is not None:
            removed_ids.add(entry_id)
        self.remove_from_xml(entry)
        self.dirty(self.opf_name)

    # Everything that refers to the removed ids
    if removed_ids:
        for spine in self.opf_xpath('//opf:spine'):
            tocref = spine.attrib.get('toc', None)
            if tocref and tocref in removed_ids:
                spine.attrib.pop('toc', None)
                self.dirty(self.opf_name)

        for itemref in self.opf_xpath('//opf:spine/opf:itemref[@idref]'):
            if itemref.get('idref') in removed_ids:
                self.remove_from_xml(itemref)
                self.dirty(self.opf_name)

        for meta in self.opf_xpath('//opf:meta[@name="cover" and @content]'):
            if meta.get('content') in removed_ids:
                self.remove_from_xml(meta)
                self.dirty(self.opf_name)

    if remove_from_guide:
        for ref in self.opf_xpath('//opf:guide/opf:reference[@href]'):
            if points_here(ref):
                self.remove_from_xml(ref)
                self.dirty(self.opf_name)

    # The file itself and the data cached for it
    path = self.name_path_map.pop(name, None)
    if path and os.path.exists(path):
        os.remove(path)
    self.mime_map.pop(name, None)
    self.parsed_cache.pop(name, None)
    self.dirtied.discard(name)
def dirty(self, name):
    ''' Record that the parsed object corresponding to name has been changed. See also: :meth:`parsed`. '''
    dirtied = self.dirtied
    dirtied.add(name)
def remove_from_xml(self, item):
    'Detach item from its parent, preserving indentation (works only with self closing items). Returns item.'
    container = item.getparent()
    position = container.index(item)
    if position > 0:
        # The preceding sibling inherits the tail of the removed item
        container[position - 1].tail = item.tail
    elif len(container) == 1:
        # Removing the only child, its tail becomes the text of the parent.
        # When removing the first of several children nothing needs fixing.
        container.text = item.tail
    container.remove(item)
    return item
def insert_into_xml(self, parent, item, index=None):
    '''Insert item into parent (or append if index is None), fixing
    indentation. Only works with self closing items.

    :param parent: The element that receives item
    :param item: The element to insert
    :param index: The position at which to insert item, None means at the end
    '''
    if index is None:
        parent.append(item)
    else:
        parent.insert(index, item)
    # Where the item actually ended up
    idx = parent.index(item)
    if idx == 0:
        # First child: the whitespace that used to follow the opening tag
        # of parent now follows item
        item.tail = parent.text
        # If this is the only child of this parent element, we need a
        # little extra work as we have gone from a self-closing <foo />
        # element to <foo><item /></foo>
        if len(parent) == 1:
            sibling = parent.getprevious()
            if sibling is None:
                # Give up!
                return
            # Copy the whitespace used by the element preceding parent
            parent.text = sibling.text
            item.tail = sibling.tail
    else:
        # Take over the tail of the element that now precedes item
        item.tail = parent[idx-1].tail
        if idx == len(parent)-1:
            # item is now the last child, so the preceding element is
            # followed by the same whitespace as the opening tag of parent
            parent[idx-1].tail = parent.text
def opf_get_or_create(self, name):
    ''' Return the first element named name (in the opf: namespace) in
    the OPF. If there is no such element, one is appended to the
    opf:package element, the OPF is marked as dirty and the new element is
    returned. '''
    existing = self.opf_xpath('//opf:'+name)
    if existing:
        return existing[0]
    self.dirty(self.opf_name)
    package = self.opf_xpath('//opf:package')[0]
    created = package.makeelement(OPF(name))
    created.tail = '\n'
    package.append(created)
    return created
def generate_item(self, name, id_prefix=None, media_type=None, unique_href=True):
    '''Add an item to the manifest with href derived from the given
    name. Ensures uniqueness of href and id automatically. Returns
    generated item.

    :param id_prefix: The prefix for the id of the new item, defaults to ``id``
    :param media_type: The mimetype of the new item, guessed from name if not specified
    :param unique_href: If True and the href is already used in the manifest
        or by an existing file, ``_N`` is inserted before the extension,
        for the first number N that gives an unused href
    '''
    # hrefs always use / as the separator, regardless of the OS
    import posixpath
    id_prefix = id_prefix or 'id'
    media_type = media_type or guess_type(name)
    href = self.name_to_href(name, self.opf_name)
    # splitext only looks at the last path component and keeps the dot with
    # the extension, so names without an extension and directories with
    # dots in them are handled correctly
    base, ext = posixpath.splitext(href)
    all_ids = {x.get('id') for x in self.opf_xpath('//*[@id]')}
    c = 0
    item_id = id_prefix
    while item_id in all_ids:
        c += 1
        item_id = id_prefix + '%d'%c
    all_names = {x.get('href') for x in self.opf_xpath(
            '//opf:manifest/opf:item[@href]')}

    def exists(h):
        return self.exists(self.href_to_name(h, self.opf_name))

    if unique_href:
        c = 0
        while href in all_names or exists(href):
            c += 1
            href = '%s_%d%s'%(base, c, ext)
    manifest = self.opf_xpath('//opf:manifest')[0]
    item = manifest.makeelement(OPF('item'),
                                id=item_id, href=href)
    item.set('media-type', media_type)
    self.insert_into_xml(manifest, item)
    self.dirty(self.opf_name)
    # The href may have been changed above, so derive the final name from it
    name = self.href_to_name(href, self.opf_name)
    self.name_path_map[name] = path = self.name_to_abspath(name)
    self.mime_map[name] = media_type
    # Ensure that the file corresponding to the newly created item exists
    # otherwise cloned containers will fail when they try to get the number
    # of links to the file
    base = os.path.dirname(path)
    if not os.path.exists(base):
        os.makedirs(base)
    open(path, 'wb').close()
    return item
def format_opf(self):
    ''' Tidy up the OPF before it is serialized.

    Re-flows the whitespace between the children of the first opf:metadata
    element, drops calibre: meta elements whose content is empty or ``{}``
    and re-adds the content attribute of cover meta elements so that it
    comes after the name attribute. '''
    metadata_nodes = self.opf_xpath('//opf:metadata')
    if metadata_nodes:
        mdata = metadata_nodes[0]
        mdata.text = '\n '
        stale = set()
        for node in mdata:
            node.tail = '\n '
            try:
                is_calibre_meta = node.get('name', '').startswith('calibre:')
                if is_calibre_meta and node.get('content', '').strip() in {'{}', ''}:
                    stale.add(node)
            except AttributeError:
                # Happens for XML comments
                pass
        for node in stale:
            mdata.remove(node)
        if len(mdata):
            mdata[-1].tail = '\n '
    # Ensure name comes before content, needed for Nooks
    for cover_meta in self.opf_xpath('//opf:meta[@name="cover"]'):
        attrs = cover_meta.attrib
        if 'content' in attrs:
            cover_meta.set('content', attrs.pop('content'))
def serialize_item(self, name):
    ''' Convert a parsed object (identified by canonical name) into a bytestring. See :meth:`parsed`.

    When ``name`` is the OPF, the OPF is first tidied up with
    :meth:`format_opf` and the ``opf:`` prefix is stripped from the tags in
    the serialized output.

    :param name: Canonical name of the item to serialize.
    :return: The serialized data. '''
    data = self.parsed(name)
    is_opf = name == self.opf_name
    if is_opf:
        self.format_opf()
    data = serialize(data, self.mime_map[name], pretty_print=name in
                     self.pretty_print)
    if is_opf:
        # Needed as I can't get lxml to output opf:role and
        # not output <opf:metadata> as well
        # The replacement is a bytes literal to match the bytes pattern:
        # mixing bytes and str in re.sub() is a TypeError on Python 3.
        data = re.sub(br'(<[/]{0,1})opf:', br'\1', data)
    return data
def commit_item(self, name, keep_parsed=False):
    ''' Serialize the parsed object for ``name`` and write it to the
    underlying file. Does nothing if ``name`` is not in the parse cache.

    :param name: Canonical name of the item to commit.
    :param keep_parsed: If True the parsed representation stays in the
        cache, otherwise it is removed. See also: :meth:`parsed` '''
    if name in self.parsed_cache:
        raw = self.serialize_item(name)
        self.dirtied.discard(name)
        if not keep_parsed:
            self.parsed_cache.pop(name)
        target = self.name_path_map[name]
        if self.cloned and nlinks_file(target) > 1:
            # Decouple this file from its links
            os.unlink(target)
        with open(target, 'wb') as out:
            out.write(raw)
def filesize(self, name):
    ''' Return the size, in bytes, of the file that the canonical name
    ``name`` refers to.

    If the item is dirtied it is committed first (its parsed form is kept
    in the cache), so the size reflects pending changes. See also:
    :meth:`parsed` '''
    has_pending_changes = name in self.dirtied
    if has_pending_changes:
        self.commit_item(name, keep_parsed=True)
    return os.path.getsize(self.name_to_abspath(name))
def open(self, name, mode='rb'):
    ''' Return a file object for direct read/write access to the file that
    ``name`` points to.

    A dirtied item is committed first and the item is dropped from the
    parse cache. You must finish with this file before accessing the
    parsed version of it again, or bad things will happen.

    :param name: Name of the file to open.
    :param mode: Mode passed on to the builtin ``open``. '''
    if name in self.dirtied:
        self.commit_item(name)
    self.parsed_cache.pop(name, False)
    path = self.name_to_abspath(name)
    parent_dir = os.path.dirname(path)
    if os.path.exists(parent_dir):
        if self.cloned and mode not in {'r', 'rb'} and os.path.exists(path) and nlinks_file(path) > 1:
            # Decouple this file from its links by replacing it with a
            # private copy before it is opened for writing
            scratch = path + 'xxx'
            shutil.copyfile(path, scratch)
            os.unlink(path)
            os.rename(scratch, path)
    else:
        os.makedirs(parent_dir)
    return open(path, mode)
def commit(self, outpath=None, keep_parsed=False):
    '''
    Commit all dirtied parsed objects to the filesystem.

    :param outpath: Not used by the code in this method.
        NOTE(review): presumably subclasses override this method to write the
        ebook file to this path (falling back to the original book path when
        it is None) -- confirm.
    :param keep_parsed: If True the parsed representations of committed items are kept in the cache.
    '''
    # Iterate over a snapshot, since committing an item changes self.dirtied
    pending = tuple(self.dirtied)
    for item_name in pending:
        self.commit_item(item_name, keep_parsed=keep_parsed)
def compare_to(self, other):
    ''' Compare the files of this container with those of ``other``.

    :param other: An object with a ``name_path_map`` mapping names to file
        paths, like this container.
    :return: ``'Set of files is not the same'`` if the two containers do
        not have the same set of names. Otherwise a newline separated list
        of messages, one per file whose contents differ, which is the
        empty string when all files are identical. '''
    if set(self.name_path_map) != set(other.name_path_map):
        return 'Set of files is not the same'
    mismatches = []
    # items() rather than iteritems() so that this works on both Python 2
    # and Python 3
    for name, path in self.name_path_map.items():
        opath = other.name_path_map[name]
        with open(path, 'rb') as f1, open(opath, 'rb') as f2:
            if f1.read() != f2.read():
                mismatches.append('The file %s is not the same'%name)
    return '\n'.join(mismatches)
# }}}
# EPUB {{{
class InvalidEpub(InvalidBook):
    ''' Raised when an EPUB file cannot be used, for example when
    META-INF/container.xml is missing or does not point to an existing OPF
    file. '''
class ObfuscationKeyMissing(InvalidEpub):
    ''' A more specific :class:`InvalidEpub` error.

    NOTE(review): presumably raised when the key needed to de-obfuscate
    embedded fonts cannot be determined -- confirm against the code that
    raises it. '''
# XML namespace bound to the ``ocf`` prefix in the XPath queries that are run
# against META-INF/container.xml
OCF_NS = 'urn:oasis:names:tc:opendocument:xmlns:container'
class EpubContainer(Container):
    ''' A :class:`Container` for books in the EPUB format.

    The EPUB file is extracted into a directory and the OPF is located via
    META-INF/container.xml. '''

    book_type = 'epub'

    # File names inside the META-INF directory that this class treats
    # specially; the keys are used to build the sets of names that need not be
    # manifested and must not be changed.
    # NOTE(review): the boolean values presumably flag whether the file is
    # mandatory (only container.xml is True) -- confirm.
    META_INF = {
            'container.xml': True,
            'manifest.xml': False,
            'encryption.xml': False,
            'metadata.xml': False,
            'signatures.xml': False,
            'rights.xml': False,
    }
def __init__(self, pathtoepub, log, clone_data=None, tdir=None):
    ''' Extract the EPUB at ``pathtoepub`` and set up the container from
    the OPF that META-INF/container.xml points to.

    :param pathtoepub: Path to the EPUB file.
    :param log: Logger, passed on to the base class and used to report a
        failure of the normal ZIP extraction.
    :param clone_data: If not None, the container is initialised from this
        data (see :meth:`clone_data`) and nothing is extracted.
    :param tdir: Directory to extract the EPUB into. A new persistent
        temporary directory is created when None.
    :raises InvalidEpub: If META-INF/container.xml is missing, contains no
        link to an OPF file, or links to an OPF file that does not exist.
    '''
    if clone_data is not None:
        super(EpubContainer, self).__init__(None, None, log, clone_data=clone_data)
        for x in ('pathtoepub', 'obfuscated_fonts'):
            setattr(self, x, clone_data[x])
        return

    self.pathtoepub = pathtoepub
    if tdir is None:
        tdir = PersistentTemporaryDirectory('_epub_container')
    tdir = os.path.abspath(os.path.realpath(tdir))
    self.root = tdir
    with open(self.pathtoepub, 'rb') as stream:
        try:
            zf = ZipFile(stream)
            zf.extractall(tdir)
        except Exception:
            # Not a bare except: KeyboardInterrupt and SystemExit must not
            # trigger the fallback extraction
            log.exception('EPUB appears to be invalid ZIP file, trying a'
                          ' more forgiving ZIP parser')
            from calibre.utils.localunzip import extractall
            stream.seek(0)
            extractall(stream, path=tdir)
    try:
        os.remove(join(tdir, 'mimetype'))
    except EnvironmentError:
        # Best effort: the extracted mimetype file may not exist
        pass

    container_path = join(self.root, 'META-INF', 'container.xml')
    if not exists(container_path):
        raise InvalidEpub('No META-INF/container.xml in epub')
    # Use a context manager so that the file handle is closed promptly
    with open(container_path, 'rb') as f:
        container = etree.fromstring(f.read())
    opf_files = container.xpath((
        r'child::ocf:rootfiles/ocf:rootfile'
        '[@media-type="%s" and @full-path]'%guess_type('a.opf')
        ), namespaces={'ocf':OCF_NS}
    )
    if not opf_files:
        raise InvalidEpub('META-INF/container.xml contains no link to OPF file')
    # full-path is unquoted and split on / to build a native path
    opf_path = os.path.join(self.root, *(urlunquote(opf_files[0].get('full-path')).split('/')))
    if not exists(opf_path):
        raise InvalidEpub('OPF file does not exist at location pointed to'
                          ' by META-INF/container.xml')

    super(EpubContainer, self).__init__(tdir, opf_path, log)

    self.obfuscated_fonts = {}
    if 'META-INF/encryption.xml' in self.name_path_map:
        self.process_encryption()
    # Reuse the already parsed container.xml
    self.parsed_cache['META-INF/container.xml'] = container
def clone_data(self, dest_dir):
    ''' Extend the clone data produced by the base class with the EPUB
    specific state: the path to the EPUB file and a copy of the
    obfuscated fonts map. '''
    data = super(EpubContainer, self).clone_data(dest_dir)
    data.update(
        pathtoepub=self.pathtoepub,
        obfuscated_fonts=self.obfuscated_fonts.copy(),
    )
    return data
def rename(self, old_name, new_name):
    ''' Rename a file, keeping the EPUB specific metadata in step.

    In addition to the rename done by the base class, the full-path
    attributes in META-INF/container.xml are updated when the OPF itself is
    renamed, and the entry in ``obfuscated_fonts`` as well as the matching
    CipherReference URIs in META-INF/encryption.xml are updated when the
    renamed file is an obfuscated font.

    :param old_name: Current name of the file.
    :param new_name: Name the file is renamed to. '''
    # Must be evaluated before the base class rename, self.opf_name is read
    # again below to obtain the new location (presumably it is updated by
    # the base class rename -- confirm)
    is_opf = old_name == self.opf_name
    super(EpubContainer, self).rename(old_name, new_name)
    if is_opf:
        for elem in self.parsed('META-INF/container.xml').xpath((
            r'child::ocf:rootfiles/ocf:rootfile'
            '[@media-type="%s" and @full-path]'%guess_type('a.opf')
            ), namespaces={'ocf':OCF_NS}
        ):
            # The asinine epubcheck cannot handle quoted filenames in
            # container.xml
            elem.set('full-path', self.opf_name)
        self.dirty('META-INF/container.xml')
    if old_name in self.obfuscated_fonts:
        # Move the entry for this font over to the new name
        self.obfuscated_fonts[new_name] = self.obfuscated_fonts.pop(old_name)
        enc = self.parsed('META-INF/encryption.xml')
        for cr in enc.xpath('//*[local-name()="CipherReference" and @URI]'):
            # Only references that resolve to the renamed file are rewritten
            if self.href_to_name(cr.get('URI')) == old_name:
                cr.set('URI', self.name_to_href(new_name))
                self.dirty('META-INF/encryption.xml')
@property
def names_that_need_not_be_manifested(self):
    ''' The names from the base class plus the META-INF/ names built from
    the keys of ``META_INF``. '''
    inherited = super(EpubContainer, self).names_that_need_not_be_manifested
    meta_inf_names = {'META-INF/' + fname for fname in self.META_INF}
    return inherited | meta_inf_names
def ok_to_be_unmanifested(self, name):
    ''' Return True if ``name`` is one of the names that need not be
    manifested or if it starts with ``META-INF/``. '''
    if name in self.names_that_need_not_be_manifested:
        return True
    return name.startswith('META-INF/')
@property
def names_that_must_not_be_removed(self):
    ''' The names from the base class plus META-INF/container.xml. '''
    protected = super(EpubContainer, self).names_that_must_not_be_removed
    return protected | {'META-INF/container.xml'}
@property
def names_that_must_not_be_changed(self):
    ''' The names from the base class plus the META-INF/ names built from
    the keys of ``META_INF``. '''
    inherited = super(EpubContainer, self).names_that_must_not_be_changed
    meta_inf_names = {'META-INF/' + fname for fname in self.META_INF}
    return inherited | meta_inf_names