-
Notifications
You must be signed in to change notification settings - Fork 30
/
exFAT.py
1266 lines (907 loc) · 41 KB
/
exFAT.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# dfir_ntfs: an NTFS/FAT parser for digital forensics & incident response
# (c) Maxim Suhanov
#
# This additional module implements an interface to work with exFAT volumes.
# [EXFAT 1.00] is:
# exFAT file system specification
#
# URL: https://docs.microsoft.com/en-us/windows/win32/fileio/exfat-specification
# Microsoft Corporation
#
# [US10614032B2] is:
# Quick filename lookup using name hash
#
# URL: https://patents.google.com/patent/US10614032B2/
#
# [US10726147B2] is:
# File encryption support for FAT file systems
#
# URL: https://patents.google.com/patent/US10726147B2/
import struct
import uuid
import ctypes
from datetime import datetime, timedelta
from collections import namedtuple
# The separator placed between path components when full paths are built (see ExpandPath()).
PATH_SEPARATOR = '/'
# Special values of FAT entries (see FAT.chain() for how they terminate a cluster chain).
EXFAT_EOC = 0xFFFFFFFF # End of chain.
EXFAT_BAD = 0xFFFFFFF7 # Bad cluster.
# File attributes (bit flags, they can be combined in a single value):
ATTR_READ_ONLY = 0x01
ATTR_HIDDEN = 0x02
ATTR_SYSTEM = 0x04
# 0x08 was ATTR_VOLUME_ID in FAT12/16/32. According to [EXFAT 1.00], this value is reserved.
#
# The macOS exFAT driver (Big Sur and Monterey, but not Catalina) sets the 0x08 flag for directory entries. The meaning of this flag is unclear.
# The flag is set for every newly created directory, but it is not checked when read (not in the exFAT driver, not in the userspace tools like fsck).
#
# See also:
# * https://www.magiclantern.fm/forum/index.php?topic=25656.0
# * https://www.ghisler.ch/board/viewtopic.php?t=73553
#
#
# According to the Azure RTOS FileX documentation, this bit means "[e]ntry is reserved" (there, "entry" means "file").
#
# Source:
# * https://docs.microsoft.com/en-us/azure/rtos/filex/chapter3#exfat-file-directory-entry
#
# This is likely a typo.
ATTR_UNKNOWN8 = 0x08
ATTR_DIRECTORY = 0x10
ATTR_ARCHIVE = 0x20
# A mapping of each known attribute bit to the label used by ResolveFileAttributes().
FILE_ATTR_LIST = {
ATTR_READ_ONLY: 'READ_ONLY',
ATTR_HIDDEN: 'HIDDEN',
ATTR_SYSTEM: 'SYSTEM',
ATTR_UNKNOWN8: 'UNKNOWN8_MACOS',
ATTR_DIRECTORY: 'DIRECTORY',
ATTR_ARCHIVE: 'ARCHIVE'
}
# A maximum number of name entries (for a single file).
MAX_NAME_ENTRIES = 17 # 255/15... Old versions of fuse-exfat allowed 256 characters, this was a bug (now it is fixed). We do not support such invalid names.
# A list of characters forbidden in file names.
# These are character codes: the control characters (0x00-0x1F) and the following ones: " * / : < > ? \ |
FORBIDDEN_CHARACTERS = [0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, 0x22, 0x2A, 0x2F, 0x3A, 0x3C, 0x3E, 0x3F, 0x5C, 0x7C]
# Known OEM parameters GUIDs:
OEM_NULL_GUID = uuid.UUID('{00000000-0000-0000-0000-000000000000}') # No OEM parameters.
OEM_FLASH_PARAMETERS_GUID = uuid.UUID('{0A0C7E46-3399-4021-90C8-FA6D389C4BA2}')
# Known directory entries (the in-use bit is set to 0).
# Use EntryTypeWithoutInUseBit() before comparing a raw entry type against these values.
DE_ALLOCATION_BITMAP = 0x01 # A bitmap used to track cluster allocations.
DE_UPCASE_TABLE = 0x02 # An uppercase table.
DE_VOLUME_LABEL = 0x03 # A volume label.
DE_FILE = 0x05 # A file itself.
DE_VOLUME_GUID = 0x20 # A volume GUID.
DE_TEXFAT_PADDING = 0x21 # A placeholder to fill the first cluster of a TexFAT-enabled directory.
DE_STREAM_EXTENSION = 0x40 # File data.
DE_FILE_NAME = 0x41 # A file name.
DE_VENDOR_EXTENSION = 0x60 # Vendor-specific data stored in the directory entry.
DE_VENDOR_ALLOCATION = 0x61 # Vendor-specific data stored in linked clusters.
# According to [US10614032B2], the following directory entry also exists:
DE_WINDOWS_CE_ACT = 0x22 # An access control table. Not publicly defined.
# The document also lists "Windows CE Access Control" and "Allocation Bitmap Directory Entry", but no description is given (perhaps, this was a draft).
# Some additional entries:
DE_END_OF_DIRECTORY = 0x00 # No further entries are expected in a directory.
DE_INVALID = 0x80 # This entry is not allowed.
# Some values for the entry type fields.
# DecodeEntryType() extracts the importance from bit 5 and the category from bit 6 of an entry type.
TYPE_IMPORTANCE_CRITICAL = 0
TYPE_IMPORTANCE_BENIGN = 1
TYPE_CATEGORY_PRIMARY = 0
TYPE_CATEGORY_SECONDARY = 1
# Flags for the stream data.
# This bit is tested against the 16-bit field that follows the attributes in a file directory entry.
STREAM_DATA_ENCRYPTED = 0x8000 # Files: data is encrypted, directories: files are encrypted by default.
def ResolveFileAttributes(FileAttributes):
    """Describe a file attributes bit mask as a string.

    The labels of all known attribute bits that are set in the mask are
    joined with ' | ', in ascending order of the bit value. Bits without
    a known label are ignored, so the result is an empty string when no
    known bit is set.
    """

    labels = [label for bit, label in sorted(FILE_ATTR_LIST.items()) if (FileAttributes & bit) > 0]
    return ' | '.join(labels)
def BuildName(NameEntities, NameLength):
    """Parse a given name, return a string (or None, if the name is invalid).

    NameEntities is a list of raw name fragments (UTF-16LE bytes), they are
    concatenated in the given order. Everything starting from the first
    aligned null character is dropped.

    NameLength is the expected name length (in characters) or None, if it is
    unknown. A longer name is truncated to this length.

    The resulting name (after the truncation, if any) is invalid if it is
    empty, if it is a reserved name ('.' or '..'), or if it contains
    a forbidden character. Characters beyond NameLength are not validated,
    because they are not a part of the name.

    Decoding errors are not raised.
    """

    if len(NameEntities) == 0:
        return None

    buf = b''.join(NameEntities)

    # Remove everything after the first null character (including it).
    # Only aligned positions are examined, because every character takes two bytes.
    for i in range(0, len(buf), 2):
        if buf[i : i + 2] == b'\x00\x00':
            buf = buf[ : i]
            break

    name = buf.decode('utf-16le', errors = 'replace')

    if NameLength is not None and len(name) > NameLength: # This name is too long, truncate it.
        name = name[ : NameLength]

    # The checks below are applied to the truncated name too.
    # (Previously, a truncated name was returned without any validation.)
    if len(name) == 0: # This is an invalid name.
        return None

    if name in ('.', '..'): # These are reserved names.
        return None

    if any(chr(character_code) in name for character_code in FORBIDDEN_CHARACTERS): # This is an invalid name.
        return None

    return name
def BuildLabel(LabelRaw, LabelLength):
    """Parse a given volume label, return a string (or None, if the label is invalid).

    LabelRaw is the raw label (UTF-16LE bytes). Everything starting from
    the first aligned null character is dropped.

    LabelLength is the expected label length (in characters) or None, if it
    is unknown. A longer label is truncated to this length.

    The resulting label (after the truncation, if any) is invalid if it is
    empty or if it contains a forbidden character.

    Decoding errors are not raised.
    """

    buf = LabelRaw

    # Remove everything after the first null character (including it).
    # Only aligned positions are examined, because every character takes two bytes.
    for i in range(0, len(buf), 2):
        if buf[i : i + 2] == b'\x00\x00':
            buf = buf[ : i]
            break

    name = buf.decode('utf-16le', errors = 'replace')

    if LabelLength is not None and len(name) > LabelLength: # This label is too long, truncate it.
        name = name[ : LabelLength]

    # The checks below are applied to the truncated label too.
    # (Previously, a truncated label was returned without any validation.)
    if len(name) == 0: # This is an invalid label.
        return None

    if any(chr(character_code) in name for character_code in FORBIDDEN_CHARACTERS): # This is an invalid label.
        return None

    return name
def DecodeFATTimestamp(Value, Value10ms = 0):
    """Decode a packed FAT timestamp, return the datetime object (or None, if the timestamp is invalid).

    The low 16 bits of Value hold the time (seconds divided by two in bits 0-4,
    minutes in bits 5-10, hours in bits 11-15), the next 16 bits hold the date
    (day in bits 0-4, month in bits 5-8, years since 1980 in bits 9-15).

    Value10ms is an additional number of 10-millisecond intervals to add,
    values above 199 are ignored (treated as 0).
    """

    if Value10ms > 199: # Something is wrong, ignore this value.
        Value10ms = 0

    time_bits = Value & 0xFFFF
    date_bits = (Value >> 16) & 0xFFFFF

    double_seconds = time_bits & 0x1F
    minute = (time_bits >> 5) & 0x3F
    hour = (time_bits >> 11) & 0x1F

    day = date_bits & 0x1F
    month = (date_bits >> 5) & 0x0F
    year = 1980 + ((date_bits >> 9) & 0x7F) # The year field is 7 bits wide, so it cannot be out of range.

    if double_seconds > 29 or minute > 59 or hour > 23:
        return None

    if day == 0 or month == 0 or month > 12:
        return None

    try:
        # A day that does not exist in the given month is rejected by the constructor.
        return datetime(year, month, day, hour, minute, double_seconds * 2) + timedelta(milliseconds = Value10ms * 10)
    except ValueError:
        return None
def DecodeFATTimezone(Value):
    """Decode a FAT timezone byte, return a signed integer (or None, if the timezone is not given).

    The highest bit (0x80) tells whether the timezone is given. The remaining
    7 bits are a two's complement number of 15-minute intervals from UTC.
    """

    if Value & 0x80 == 0: # No timezone is recorded.
        return None

    offset = Value & 0x7F
    if offset & 0x40: # The sign bit of the 7-bit number is set, so the offset is negative.
        offset -= 0x80

    return offset
def EntryTypeWithoutInUseBit(EntryType):
    """Return the given entry type with the in-use bit (0x80) cleared."""

    in_use = (EntryType & 0x80) > 0
    return EntryType - 0x80 if in_use else EntryType
def DecodeEntryType(EntryType):
    """Split the entry type into its fields, return a tuple: (type_code, type_importance, type_category, is_in_use).

    The type code is taken from bits 0-4, the importance is bit 5,
    the category is bit 6, and the in-use flag (a boolean) is bit 7.
    """

    in_use = (EntryType & 0x80) > 0
    category = (EntryType >> 6) & 1
    importance = (EntryType >> 5) & 1
    code = EntryType & 0x1F

    return (code, importance, category, in_use)
def DecodeFlags(Flags):
    """Decode the flags byte found in an entry, return a tuple of booleans: (allocation_possible, no_fat_chain).

    Bit 0 is the allocation possible flag, bit 1 is the "no FAT chain" flag.
    Other bits are ignored.
    """

    return ((Flags & 0x01) > 0, (Flags & 0x02) > 0)
def EntrySetChecksum(EntrySetRaw):
    """Calculate and return the 16-bit checksum of a raw entry set (as an integer).

    The length of the entry set must be a non-zero multiple of 32 bytes,
    otherwise ValueError is raised.

    For every byte, the checksum is rotated right by one bit and the byte is
    added to it. The bytes at offsets 2 and 3 are excluded from the calculation.
    """

    size = len(EntrySetRaw)
    if size < 32 or size % 32 != 0:
        raise ValueError('Invalid entry set length')

    checksum = 0
    for idx, value in enumerate(EntrySetRaw):
        if idx == 2 or idx == 3:
            continue

        rotated = ((checksum & 1) << 15) | (checksum >> 1)
        checksum = (rotated + value) & 0xFFFF

    return checksum
class FileSystemException(Exception):
    """The base class of all exceptions defined in this module.

    The value given to the constructor is kept as is, its repr() is used
    as the string form of the exception.
    """

    def __init__(self, value):
        self._value = value

    def __str__(self):
        return '{!r}'.format(self._value)


class BootRegionException(FileSystemException):
    """Raised when the boot region is invalid."""


class BootRegionChecksumException(BootRegionException):
    """Raised when the boot region checksum is invalid."""


class FileAllocationTableException(FileSystemException):
    """Raised when the file allocation table (FAT) is invalid or an access to it is out of bounds."""


class DirectoryEntriesException(FileSystemException):
    """Raised when directory entries are invalid."""
class BR(object):
    """This class is used to work with a boot region (BR) containing boot sectors and other data (12 sectors).

    According to [EXFAT 1.00], the boot region consists of the following sectors:
      0     - the boot sector;
      1..8  - the extended boot sectors;
      9     - the OEM parameters;
      10    - a reserved sector;
      11    - the boot checksum.

    The buffer given to the constructor is validated immediately (using the relaxed checks),
    BootRegionException (or its subclass) is raised if the boot region is invalid.
    """

    br_buf = None

    def __init__(self, br_buf):
        self.br_buf = br_buf

        # This is a preliminary check, the sector size is not known yet (it is validated later).
        if len(self.br_buf) < 12 * 512 or len(self.br_buf) % 512 != 0:
            raise BootRegionException('Invalid boot region size')

        self.validate()

    def get_bs_jmpboot(self):
        """Get and return the first 3 bytes."""

        return self.br_buf[ : 3]

    def get_bs_fsname(self):
        """Get and return the FS name (as raw bytes)."""

        return self.br_buf[3 : 11]

    def get_mustbezero(self):
        """Get and return the null area, a former BPB (as raw bytes)."""

        return self.br_buf[11 : 64]

    def get_partitionoffset(self):
        """Get and return the partition offset."""

        return struct.unpack('<Q', self.br_buf[64 : 72])[0]

    def get_volumelength(self):
        """Get and return the volume length (in sectors)."""

        return struct.unpack('<Q', self.br_buf[72 : 80])[0]

    def get_fatoffset(self):
        """Get and return the FAT offset (in sectors)."""

        return struct.unpack('<L', self.br_buf[80 : 84])[0]

    def get_fatlength(self):
        """Get and return the FAT length (in sectors)."""

        return struct.unpack('<L', self.br_buf[84 : 88])[0]

    def get_clusterheapoffset(self):
        """Get and return the cluster heap offset (in sectors)."""

        return struct.unpack('<L', self.br_buf[88 : 92])[0]

    def get_clustercount(self):
        """Get and return the cluster count."""

        return struct.unpack('<L', self.br_buf[92 : 96])[0]

    def get_firstclusterofrootdirectory(self):
        """Get and return the first cluster of the root directory."""

        return struct.unpack('<L', self.br_buf[96 : 100])[0]

    def get_volumeserialnumber(self):
        """Get and return the volume serial number (as an integer)."""

        return struct.unpack('<L', self.br_buf[100 : 104])[0]

    def get_filesystemrevision(self):
        """Get and return the file system revision, as a tuple (major_version, minor_version)."""

        # The minor version is stored first (this is a little-endian 16-bit field).
        major_version = self.br_buf[105]
        minor_version = self.br_buf[104]

        return (major_version, minor_version)

    def get_volumeflags(self):
        """Get and return the volume flags, as a tuple of booleans: (active_fat, volume_dirty, media_failure, clear_to_zero)."""

        flags = struct.unpack('<H', self.br_buf[106 : 108])[0]

        active_fat = flags & 1 > 0
        volume_dirty = (flags >> 1) & 1 > 0
        media_failure = (flags >> 2) & 1 > 0
        clear_to_zero = (flags >> 3) & 1 > 0

        return (active_fat, volume_dirty, media_failure, clear_to_zero)

    def get_bytespersector(self):
        """Calculate and return the bytes per sector value.

        BootRegionException is raised if the stored shift is not in the 9..12 range (512..4096 bytes).
        """

        shift = self.br_buf[108]
        if shift < 9 or shift > 12:
            raise BootRegionException('Invalid bytes per sector shift')

        return 1 << shift

    def get_sectorspercluster(self):
        """Calculate and return the sectors per cluster value.

        BootRegionException is raised if the stored shift is larger than 25 minus the bytes per sector shift.
        """

        shift_sec = self.br_buf[108]
        shift_clus = self.br_buf[109]

        if shift_clus > 25 - shift_sec:
            raise BootRegionException('Invalid sectors per cluster shift (too large)')

        return 1 << shift_clus

    def get_numberoffats(self):
        """Get and return the number of FATs."""

        return self.br_buf[110]

    def get_driveselect(self):
        """Get and return the BIOS drive number."""

        return self.br_buf[111]

    def get_percentinuse(self):
        """Get and return the percent in use (or None, if not tracked or invalid)."""

        percent = self.br_buf[112]
        if percent > 100: # This also covers 0xFF, which is used when the value is not tracked.
            return None

        return percent

    def get_reserved(self):
        """Get and return the reserved area (as raw bytes)."""

        return self.br_buf[113 : 120]

    def get_bs_bootcode(self):
        """Get and return the boot code from the boot sector (as raw bytes)."""

        return self.br_buf[120 : 510]

    def get_bs_signature(self):
        """Get and return the boot signature of the boot sector (as two raw bytes)."""

        return self.br_buf[510 : 512]

    def get_ebs_signature(self, sector_number):
        """Get and return the signature of a given extended boot sector (as four raw bytes).

        The sector number must be in the 1..8 range, otherwise ValueError is raised.
        """

        if sector_number < 1 or sector_number > 8:
            raise ValueError('Invalid extended boot sector number ({})'.format(sector_number))

        # The signature occupies the last four bytes of a sector, whatever the sector size is.
        # (A fixed offset of 508 bytes is only valid for 512-byte sectors.)
        sector_end = self.get_bytespersector() * (sector_number + 1)
        return self.br_buf[sector_end - 4 : sector_end]

    def get_oem_guid(self):
        """Get, decode and return the GUID defined in the OEM parameters sector (as an UUID object).

        Only the first parameters structure of the sector is examined.
        """

        # According to [EXFAT 1.00], the OEM parameters are in the sector #9 (the sector #10 is reserved).
        offset = self.get_bytespersector() * 9
        guid_raw = self.br_buf[offset : offset + 16]

        return uuid.UUID(bytes_le = guid_raw)

    def get_oem_flash_parameters(self):
        """Get and return the OEM flash parameters, if present.
        A tuple is returned: (erase_block_size, page_size, spare_sectors, random_access_time, programming_time, read_cycle, write_cycle).
        Its members are None if the OEM flash parameters are absent.
        """

        if self.get_oem_guid() != OEM_FLASH_PARAMETERS_GUID:
            return (None, None, None, None, None, None, None)

        # Seven 32-bit integers follow the GUID (16 bytes) in the OEM parameters sector (the sector #9).
        offset = self.get_bytespersector() * 9 + 16
        parameters_buf = self.br_buf[offset : offset + 28]

        erase_block_size, page_size, spare_sectors, random_access_time, programming_time, read_cycle, write_cycle = struct.unpack('<LLLLLLL', parameters_buf)
        return (erase_block_size, page_size, spare_sectors, random_access_time, programming_time, read_cycle, write_cycle)

    def get_checksum(self):
        """Get and return the boot region checksum (as an integer).

        BootRegionException is raised if the first three copies of the checksum do not match.
        """

        offset = self.get_bytespersector() * 11

        # We validate only the first three checksums.
        checksum_1 = self.br_buf[offset : offset + 4]
        checksum_2 = self.br_buf[offset + 4 : offset + 8]
        checksum_3 = self.br_buf[offset + 8 : offset + 12]

        if checksum_1 != checksum_2 or checksum_1 != checksum_3:
            raise BootRegionException('Invalid checksums in the boot region')

        return struct.unpack('<L', checksum_1)[0]

    def calculate_checksum(self):
        """Calculate and return the boot region checksum (as an integer).

        The checksum covers the first 11 sectors, except the volume flags (offsets 106 and 107)
        and the percent in use (offset 112).
        """

        checksum = 0
        for idx in range(self.get_bytespersector() * 11):
            if idx in (106, 107, 112):
                continue

            # Rotate the 32-bit checksum right by one bit, then add the current byte.
            if checksum & 1 > 0:
                checksum = 0x80000000 | (checksum >> 1)
            else:
                checksum = checksum >> 1

            checksum = (checksum + self.br_buf[idx]) & 0xFFFFFFFF

        return checksum

    def validate(self, relaxed_checks = True):
        """Validate the boot region, return True if it is valid.

        BootRegionException (or BootRegionChecksumException) is raised if the boot region is invalid,
        NotImplementedError is raised if the file system major version is not supported.

        If 'relaxed_checks' is False, the jump code and the null area are validated too.
        """

        # The field is 8 bytes long, the name is padded with three spaces.
        if self.get_bs_fsname() != b'EXFAT   ':
            raise BootRegionException('Invalid file system name')

        if self.get_bs_signature() != b'\x55\xAA':
            raise BootRegionException('Invalid boot signature')

        if not relaxed_checks:
            if self.get_bs_jmpboot() != b'\xEB\x76\x90':
                raise BootRegionException('Invalid jump code')

            if self.get_mustbezero() != b'\x00' * 53:
                raise BootRegionException('Invalid (non-zero) null area')

        bytes_per_sector = self.get_bytespersector()

        # Now the sector size is known, make sure that all 12 sectors are present
        # (otherwise, the checksum routines read beyond the end of the buffer).
        if len(self.br_buf) < 12 * bytes_per_sector:
            raise BootRegionException('Invalid boot region size')

        if self.get_checksum() != self.calculate_checksum():
            raise BootRegionChecksumException('Invalid boot region checksum')

        volume_length = self.get_volumelength()
        if volume_length * bytes_per_sector < 1024 * 1024:
            raise BootRegionException('Invalid volume length (too small)')

        number_of_fats = self.get_numberoffats()
        cluster_heap_offset = self.get_clusterheapoffset()
        fat_length = self.get_fatlength()

        fat_offset = self.get_fatoffset()
        if fat_offset < 24:
            raise BootRegionException('Invalid FAT offset (too small)')

        if fat_offset > cluster_heap_offset - fat_length * number_of_fats:
            raise BootRegionException('Invalid FAT offset (too large)')

        cluster_count = self.get_clustercount()
        sectors_per_cluster = self.get_sectorspercluster()

        if fat_length * bytes_per_sector < (cluster_count + 2) * 4:
            raise BootRegionException('Invalid FAT length (too small)')

        if fat_length * number_of_fats > cluster_heap_offset - fat_offset:
            raise BootRegionException('Invalid FAT length (too large)')

        if cluster_heap_offset < fat_offset + fat_length * number_of_fats:
            raise BootRegionException('Invalid cluster heap offset (too small)')

        # According to [EXFAT 1.00], the upper bound is the volume length minus the cluster heap size
        # (not the cluster heap size itself, which rejects valid small volumes).
        if cluster_heap_offset > volume_length - cluster_count * sectors_per_cluster:
            raise BootRegionException('Invalid cluster heap offset (too large)')

        if cluster_count > 0xFFFFFFF5:
            raise BootRegionException('Invalid cluster count (too large)')

        # This is the same inequality as above (rearranged), it is kept to mirror the specification.
        if cluster_count * sectors_per_cluster > volume_length - cluster_heap_offset:
            raise BootRegionException('Invalid cluster count (too large)')

        first_cluster_of_root_directory = self.get_firstclusterofrootdirectory()
        if first_cluster_of_root_directory < 2:
            raise BootRegionException('Invalid first root directory cluster (too small)')

        if first_cluster_of_root_directory > cluster_count + 1:
            raise BootRegionException('Invalid first root directory cluster (too large)')

        major_version, minor_version = self.get_filesystemrevision()
        if major_version == 0 or major_version > 99 or minor_version > 99:
            raise BootRegionException('Invalid file system version')

        if major_version > 1:
            raise NotImplementedError('File system major version is not supported: {}'.format(major_version))

        if number_of_fats != 1 and number_of_fats != 2:
            raise BootRegionException('Invalid number of FATs')

        # The second FAT can be active only if there are two FATs.
        active_fat, __, __, __ = self.get_volumeflags()
        if active_fat and number_of_fats != 2:
            raise BootRegionException('Invalid number of FATs')

        return True

    def __str__(self):
        return 'BR (boot region)'
class FAT(object):
    """This class is used to work with a file allocation table.

    Arguments of the constructor:
      fat_object         - a file-like object (seek() and read() are used) containing the FAT;
      fat_offset         - an offset of the FAT within that object (in bytes, a multiple of 512);
      fat_size           - a size of the FAT (in bytes, a non-zero multiple of 512);
      last_valid_cluster - the largest cluster number allowed as an index into the FAT.

    FileAllocationTableException is raised if the offset or the size is invalid.
    """

    fat_object = None
    fat_offset = None
    fat_size = None
    last_valid_cluster = None

    def __init__(self, fat_object, fat_offset, fat_size, last_valid_cluster):
        self.fat_object = fat_object
        self.fat_offset = fat_offset
        self.fat_size = fat_size
        self.last_valid_cluster = last_valid_cluster

        if self.fat_offset > 0 and self.fat_offset % 512 != 0:
            raise FileAllocationTableException('Invalid FAT offset: {}'.format(self.fat_offset))

        if self.fat_size < 512 or self.fat_size % 512 != 0:
            raise FileAllocationTableException('Invalid FAT size: {}'.format(self.fat_size))

    def get_element(self, number):
        """Get and return the FAT entry by its number (as an integer).

        FileAllocationTableException is raised if the number is out of bounds or if the entry cannot be read in full.
        """

        fat_item_offset = number * 4

        # A negative number is rejected too, otherwise the data located before the FAT is read.
        if number < 0 or number > self.last_valid_cluster or fat_item_offset + 4 > self.fat_size:
            raise FileAllocationTableException('Out of bounds, FAT element: {}'.format(number))

        self.fat_object.seek(self.fat_offset + fat_item_offset)
        next_element_raw = self.fat_object.read(4)
        if len(next_element_raw) != 4:
            raise FileAllocationTableException('Truncated FAT entry, FAT element: {}'.format(number))

        return struct.unpack('<L', next_element_raw)[0]

    def get_media_type(self):
        """Get and return the media type value (the lowest byte of the first FAT entry)."""

        fat_0 = self.get_element(0)
        return fat_0 & 0xFF

    def chain(self, first_cluster):
        """Get and return the cluster chain for the given first cluster (as a list of cluster numbers).
        For bad clusters, None is given (as an item in the chain).

        An empty list is returned if the first cluster is 0 (an empty file) or 1 (a reserved cluster).
        The chain stops (without an exception) at the end-of-chain mark, at a bad cluster,
        at an unexpected 0 or 1 value, and when a loop is detected.

        FileAllocationTableException is raised if the first cluster is marked as bad
        or if a cluster number in the chain is out of bounds.
        """

        if first_cluster == 0:
            # This file is empty, no chain.
            return []

        if first_cluster == 1:
            # This cluster is reserved, no chain.
            return []

        if first_cluster == EXFAT_BAD:
            # The first cluster is bad.
            raise FileAllocationTableException('Bad starting cluster {}'.format(first_cluster))

        chain = [ first_cluster ]

        # Clusters seen so far are also kept in a set, so the loop check does not scan the whole list on every step.
        seen_clusters = { first_cluster }

        curr_cluster = first_cluster
        while True:
            next_cluster = self.get_element(curr_cluster)

            if next_cluster in seen_clusters: # This is a loop, the FAT is corrupted, stop (but do not raise an exception).
                break

            if next_cluster == EXFAT_EOC:
                # End of chain, stop.
                break
            elif next_cluster == EXFAT_BAD:
                # Bad cluster, use None and stop.
                chain.append(None)
                break
            elif next_cluster == 0:
                # This is a "file is empty" mark (in a wrong location), stop.
                break
            elif next_cluster == 1:
                # This cluster is reserved, stop.
                break

            chain.append(next_cluster)
            seen_clusters.add(next_cluster)
            curr_cluster = next_cluster

        return chain

    def __str__(self):
        return 'FAT'
# Here, "ctime" means "created time".
# However, in the Linux kernel, updates to this field are not always consistent (for example, "ctime" is updated for a parent directory when renaming a child file).
#
# "atz", "mtz", "ctz" are measured in 15-minute intervals (UTC+5:45 is 23, UTC-10 is -40). If no time zone is specified, the value is None.
# However, macOS records these offsets with the opposite sign (UTC+3 is stored as -12).
# [EXFAT 1.00] clearly states that UTC+00:15 is 1 and UTC-00:15 is -1.
#
# FileEntry describes a file or a directory decoded from a complete directory entry set.
FileEntry = namedtuple('FileEntry', [ 'is_deleted', 'is_directory', 'name', 'atime', 'atz', 'mtime', 'mtz', 'ctime', 'ctz', 'size', 'valid_data_length', 'attributes', 'first_cluster', 'is_encrypted', 'no_fat_chain' ])
# OrphanEntry describes file name entries found outside of a valid directory entry set.
OrphanEntry = namedtuple('OrphanEntry', [ 'name_partial' ])
# VolumeLabelEntry describes a volume label found in the root directory.
VolumeLabelEntry = namedtuple('VolumeLabelEntry', [ 'volume_label' ])
# AllocationBitmapEntry describes an allocation bitmap found in the root directory ('size' is the data length, as stored in the entry).
AllocationBitmapEntry = namedtuple('AllocationBitmapEntry', [ 'bitmap_id', 'first_cluster', 'size' ])
def ExpandPath(ParentPath, FileEntryOrOrphanEntry):
    """Prepend a parent path to the name stored in a given entry, return a new entry.

    The parent path is terminated with the path separator first (an empty
    parent path becomes the separator itself).

    For a FileEntry, the 'name' field is expanded. For an OrphanEntry, the
    'name_partial' field is expanded. Other fields are copied without changes.
    An object of any other type is returned as is.
    """

    if len(ParentPath) == 0:
        prefix = PATH_SEPARATOR
    elif ParentPath[-1] != PATH_SEPARATOR:
        prefix = ParentPath + PATH_SEPARATOR
    else:
        prefix = ParentPath

    entry = FileEntryOrOrphanEntry
    entry_type = type(entry)

    if entry_type is FileEntry:
        return entry._replace(name = prefix + entry.name)

    if entry_type is OrphanEntry:
        return entry._replace(name_partial = prefix + entry.name_partial)

    # Something is wrong, return the input entry as is.
    return entry
class DirectoryEntries(object):
"""This class is used to work with directory entries."""
clusters_buf = None
is_root = None
def __init__(self, clusters_buf, is_root = False):
self.clusters_buf = clusters_buf
self.is_root = is_root
if len(self.clusters_buf) < 512 or len(self.clusters_buf) % 512 != 0:
raise DirectoryEntriesException('Invalid buffer size: {}'.format(len(self.clusters_buf)))
def entries(self):
"""Get, decode and yield directory entries in the clusters (as named tuples: FileEntry and OrphanEntry).
If the 'is_root' argument to the constructor was True, treat the directory entries as located in the root directory.
(In this case, the following named tuples can be yielded: VolumeLabelEntry and AllocationBitmapEntry.)
"""
label_count = 0
bitmap_count = 0
pos = 0
while pos < len(self.clusters_buf):
entry_type = self.clusters_buf[pos]
if entry_type == DE_END_OF_DIRECTORY: # We will not scan further, stop.
break
if entry_type == DE_INVALID: # Something is wrong with this directory.
if self.is_root:
raise DirectoryEntriesException('Invalid directory entry type found')
else:
break
# Decode the entry type.
type_code, type_importance, type_category, is_in_use = DecodeEntryType(entry_type)
# Remove the in-use bit.
entry_type_pure = EntryTypeWithoutInUseBit(entry_type)
# We are looking for primary entries.
if type_category == TYPE_CATEGORY_PRIMARY:
if (not self.is_root) and is_in_use and entry_type_pure in [DE_ALLOCATION_BITMAP, DE_UPCASE_TABLE, DE_VOLUME_LABEL]: # This directory is invalid.
break
if (not self.is_root) and is_in_use and entry_type_pure == DE_VOLUME_GUID: # This is unexpected, but not critical, so skip the entry.
pos += 32
continue
if entry_type_pure == DE_TEXFAT_PADDING: # This is a padding entry, skip it.
pos += 32
continue
if entry_type_pure == DE_WINDOWS_CE_ACT: # This entry is not supported, skip it.
pos += 32
continue
if entry_type_pure == DE_FILE: # This is what we want (a file or a directory).
secondary_count = self.clusters_buf[pos + 1]
if secondary_count < 2 or secondary_count > 64: # This file entry is invalid, skip it.
pos += 32
continue
# According to [US10726147B2], the "Reserved1" field is used to store the encryption flag.
# This is also found in the current official implementation of the exFAT driver.
set_checksum, attributes, data_flags, ctime_int, mtime_int, atime_int, ctime_10ms, mtime_10ms, ctz_int, mtz_int, atz_int = struct.unpack('<HHHLLLBBBBB', self.clusters_buf[pos + 2 : pos + 25])
ctime = DecodeFATTimestamp(ctime_int, ctime_10ms)
mtime = DecodeFATTimestamp(mtime_int, mtime_10ms)
atime = DecodeFATTimestamp(atime_int)
ctz = DecodeFATTimezone(ctz_int)
mtz = DecodeFATTimezone(mtz_int)
atz = DecodeFATTimezone(atz_int)
is_deleted = not is_in_use
is_directory = attributes & ATTR_DIRECTORY > 0
is_encrypted = data_flags & STREAM_DATA_ENCRYPTED > 0
# Now, validate the entries. No checksum validation is performed against deleted entries.
secondary_buf = self.clusters_buf[pos + 32 : pos + 32 + secondary_count * 32]
if len(secondary_buf) != secondary_count * 32: # This entry set is truncated, skip it.
pos += 32
continue
if (not is_deleted) and set_checksum != EntrySetChecksum(self.clusters_buf[pos : pos + 32] + secondary_buf): # Entry set is allocated, but it is invalid, skip it.
pos += 32
continue
# Validate secondary entries.
is_invalid_set = False
file_name_count = 0
file_name_entities = []
found_vendor_extension = False
i = 0
while i < len(secondary_buf):
secondary_entry_type = secondary_buf[i]
if secondary_entry_type in [DE_END_OF_DIRECTORY, DE_INVALID]: # This is an invalid entry set, skip it.
is_invalid_set = True
break
secondary_type_code, secondary_type_importance, secondary_type_category, secondary_is_in_use = DecodeEntryType(secondary_entry_type)
if secondary_type_category != TYPE_CATEGORY_SECONDARY: # This is not a secondary entry, skip the set.
is_invalid_set = True
break
if (not is_deleted) and (not secondary_is_in_use): # This entry is not allocated, but the set must be allocated, so skip the set.
is_invalid_set = True
break
secondary_entry_type_pure = EntryTypeWithoutInUseBit(secondary_entry_type)
if i == 0 and secondary_entry_type_pure != DE_STREAM_EXTENSION: # The first secondary entry is wrong, skip the set.
is_invalid_set = True
break
if i == 32 and secondary_entry_type_pure != DE_FILE_NAME: # The second secondary entry is wrong, skip the set.
is_invalid_set = True
break
if i >= 64 and secondary_entry_type_pure == DE_STREAM_EXTENSION: # This secondary entry is wrong, skip the set.
is_invalid_set = True
break
if secondary_entry_type_pure == DE_STREAM_EXTENSION:
stream_flags = secondary_buf[i + 1]
stream_allocation_possible, stream_no_fat_chain = DecodeFlags(stream_flags)
if not stream_allocation_possible: # Something is wrong, skip the set.
is_invalid_set = True
break
# According to [US10726147B2], the "Reserved1" field is used to store the padding size (in bytes) for encrypted files.
# And the "Reserved2" field is split and its first byte is used to store the EFS header size (in 4096-byte increments). The second byte is still reserved.
# These are also found in the current official implementation of the exFAT driver.
#
# Also, for encrypted files with no user data: the file size is 4096 bytes (the file contains the EFS header only), but the valid data length is 0 bytes.
efs_padding_size, name_length, name_hash, efs_header_size, __, valid_data_length, __, first_cluster, data_length = struct.unpack('<BBHBBQLLQ', secondary_buf[i + 2 : i + 32])
if efs_header_size > 0:
efs_header_size = efs_header_size * 4096
if name_length == 0: # The name is invalid, skip the set. This is 0 if the name length is 256 (a bug in fuse-exfat).
is_invalid_set = True
break
if name_length <= 15:
max_file_name_count = 1
elif name_length % 15 == 0:
max_file_name_count = name_length // 15
else:
max_file_name_count = (name_length // 15) + 1
if secondary_entry_type_pure == DE_FILE_NAME:
file_name_count += 1
if file_name_count > max_file_name_count or file_name_count > MAX_NAME_ENTRIES:
is_invalid_set = True
break
file_name_flags = secondary_buf[i + 1]
# It is unclear from [EXFAT 1.00] if file name entries must have the AllocationPossible flag set to 0.
# We assume that this flag is not important here. So, do not check it.
file_name_part_raw = secondary_buf[i + 2 : i + 32]
file_name_entities.append(file_name_part_raw)
if found_vendor_extension and secondary_entry_type_pure not in [DE_VENDOR_EXTENSION, DE_VENDOR_ALLOCATION]:
is_invalid_set = True
break
if secondary_entry_type_pure in [DE_VENDOR_EXTENSION, DE_VENDOR_ALLOCATION]: # Found a vendor-specific extension, no other critical entry types can follow this one.
found_vendor_extension = True
# Vendor-specific extensions are not supported, so do not validate them.
i += 32
if is_invalid_set: # Skip this invalid set.
pos += 32
continue
size = data_length
no_fat_chain = stream_no_fat_chain
name = BuildName(file_name_entities, name_length)
if name is None: # Something is wrong with the name, skip the set.
pos += 32
continue
yield FileEntry(is_deleted, is_directory, name, atime, atz, mtime, mtz, ctime, ctz, size, valid_data_length, attributes, first_cluster, is_encrypted, no_fat_chain)
pos += 32 + secondary_count * 32
continue
if self.is_root and entry_type_pure == DE_VOLUME_LABEL and is_in_use: # This is what we want (a volume label found in the root directory).
label_count += 1
if label_count >= 2:
raise DirectoryEntriesException('More than one volume label found')
label_character_count = self.clusters_buf[pos + 1]
# According to [EXFAT 1.00], the volume label limit is 11 characters.
# However, at least one third-party implementation (exfatlabel) and one old official implementation (the Windows 7 exFAT driver) allow 15 characters.
# Such volume labels can be read (but not set) by the current Windows driver. In particular:
# - for a volume label to be read, the limit is 15 characters;
# - for a volume label to be set, the limit is 11 characters.
if label_character_count > 15:
raise DirectoryEntriesException('Invalid volume label length specified')
if label_character_count > 0:
volume_label = self.clusters_buf[pos + 2 : pos + 2 + label_character_count * 2]
volume_label = BuildLabel(volume_label, label_character_count)
if volume_label is None: # The volume label is invalid.
raise DirectoryEntriesException('Invalid volume label characters found')
yield VolumeLabelEntry(volume_label)
if self.is_root and entry_type_pure == DE_ALLOCATION_BITMAP and is_in_use: # This is what we want (an allocation bitmap found in the root directory).
bitmap_count += 1
if bitmap_count >= 3:
raise DirectoryEntriesException('More than two allocation bitmaps found')
bitmap_flags = self.clusters_buf[pos + 1]
bitmap_id = bitmap_flags & 1
bitmap_first_cluster = struct.unpack('<L', self.clusters_buf[pos + 20 : pos + 24])[0]
bitmap_data_length = struct.unpack('<Q', self.clusters_buf[pos + 24 : pos + 32])[0]
yield AllocationBitmapEntry(bitmap_id, bitmap_first_cluster, bitmap_data_length)
else: # A secondary entry found outside of a valid directory set.
if entry_type_pure == DE_FILE_NAME:
file_name_entities = []
i = 0
next_entry_type = None
while next_entry_type == entry_type or next_entry_type is None:
this_file_name_flags = self.clusters_buf[pos + i * 32 + 1]
# It is unclear from [EXFAT 1.00] if file name entries must have the AllocationPossible flag set to 0.
# Previously, we assumed that this flag is not important. Now, when dealing with orphan entries, check it.
if this_file_name_flags != 0: # This is unusual, skip.
break
file_name_part_raw = self.clusters_buf[pos + i * 32 + 2 : pos + i * 32 + 32]
file_name_entities.append(file_name_part_raw)
i += 1
if i >= MAX_NAME_ENTRIES:
break
if file_name_part_raw.endswith(b'\x00\x00'): # This was the last entry in the set.
break