-
-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathdigester.py
187 lines (136 loc) · 5.52 KB
/
digester.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import functools
import glob
import gzip
import hashlib
import json
import os.path
import re
import shutil
# Filename fragment marking an already-digested asset: a "-" followed by a
# 32 character lowercase hex md5 digest (e.g. "app-0cc175b9c0f1b6a831c399e269772661.css").
DIGESTED_FILE_REGEX = r"-[a-f\d]{32}"
# 1 MiB read size used when streaming file contents in chunks.
CHUNK_SIZE = 1024 * 1024
def compile(
    input_path, output_path, digest_blacklist_filter, gzip_files, brotli_files
):
    """
    Generate md5 tagged static files compressed with gzip and brotli.

    :param input_path: The source path of your static files
    :type input_path: str
    :param output_path: The destination path of your static files
    :type output_path: str
    :param digest_blacklist_filter: Ignore compiling these file types
    :type digest_blacklist_filter: list
    :param gzip_files: Whether or not gzipped files will be generated
    :type gzip_files: bool
    :param brotli_files: Whether or not brotli files will be generated
    :type brotli_files: bool
    :return: None
    """
    # CLI-facing entry point: report a missing path and bail out early
    # instead of raising. Input is checked before output, as before.
    for label, path in (("input", input_path), ("output", output_path)):
        if not os.path.exists(path):
            print(f"The {label} path '{path}' does not exist")
            return None

    eligible_files = _filter_files(input_path, digest_blacklist_filter)
    manifest = _generate_manifest(
        eligible_files, gzip_files, brotli_files, output_path
    )
    _save_manifest(manifest, output_path)

    print(f"Check your digested files at '{output_path}'")

    return None
def clean(output_path, digest_blacklist_filter, gzip_files, brotli_files):
    """
    Delete the generated md5 tagged and compressed static files.

    :param output_path: The destination path of your static files
    :type output_path: str
    :param digest_blacklist_filter: Ignore cleaning these file types
    :type digest_blacklist_filter: list
    :param gzip_files: Whether or not gzipped files will be cleaned
    :type gzip_files: bool
    :param brotli_files: Whether or not brotli files will be cleaned
    :type brotli_files: bool
    :return: None
    """
    # NOTE(review): the pattern assumes output_path ends with a path
    # separator; "path**/**" would also match sibling "path*" dirs -- confirm
    # callers always pass a trailing separator.
    for item in glob.iglob(output_path + "**/**", recursive=True):
        if not os.path.isfile(item):
            continue

        _, file_extension = os.path.splitext(item)
        basename = os.path.basename(item)

        # A file is deleted when it is an md5-digested copy (and its
        # extension is not blacklisted) or a generated compressed variant.
        is_digested = (
            re.search(DIGESTED_FILE_REGEX, basename)
            and file_extension not in digest_blacklist_filter
        )

        if (
            is_digested
            or (gzip_files and file_extension == ".gz")
            or (brotli_files and file_extension == ".br")
        ):
            # The recursive "**/**" pattern can yield the same path more
            # than once, so re-check existence before removing.
            if os.path.exists(item):
                os.remove(item)

    # The manifest itself is always cleaned up.
    manifest_path = os.path.join(output_path, "cache_manifest.json")

    if os.path.exists(manifest_path):
        os.remove(manifest_path)

    print(f"Check your cleaned files at '{output_path}'")

    return None
def _filter_files(input_path, digest_blacklist_filter):
filtered_files = []
for item in glob.iglob(input_path + "**/**", recursive=True):
if os.path.isfile(item):
if not _is_compiled_file(item, digest_blacklist_filter):
filtered_files.append(item)
return filtered_files
def _is_compiled_file(file_path, digest_blacklist_filter):
    """
    Return a truthy value when *file_path* should be skipped by the digester.

    A file counts as already compiled when its basename carries an md5
    digest tag, its extension is blacklisted, it is a generated compressed
    variant (.gz / .br), or it is the cache manifest itself.

    :param file_path: Path of the candidate file
    :type file_path: str
    :param digest_blacklist_filter: Extensions that must not be digested
    :type digest_blacklist_filter: list
    :return: Truthy when the file must be ignored (a Match object or bool,
        callers only use it in boolean context)
    """
    # The stem from splitext was never used; keep only the extension.
    _, file_extension = os.path.splitext(file_path)
    basename = os.path.basename(file_path)

    return (
        re.search(DIGESTED_FILE_REGEX, basename)
        or file_extension in digest_blacklist_filter
        or file_extension == ".gz"
        or file_extension == ".br"
        or basename == "cache_manifest.json"
    )
def _generate_manifest(files, gzip_files, brotli_files, output_path):
manifest = {}
for file in files:
rel_file_path = os.path.relpath(file, output_path).replace("\\", "/")
file_name, file_extension = os.path.splitext(rel_file_path)
digest = _generate_digest(file)
digested_file_path = f"{file_name}-{digest}{file_extension}"
manifest[rel_file_path] = digested_file_path
_write_to_disk(
file, digested_file_path, gzip_files, brotli_files, output_path
)
return manifest
def _generate_digest(file):
    """
    Return the md5 hex digest of *file*'s contents.

    The file is hashed incrementally in CHUNK_SIZE reads (matching the
    chunked brotli loop in _write_to_disk) so large assets are never
    loaded into memory all at once.

    :param file: Path of the file to hash
    :type file: str
    :return: 32 character hexadecimal md5 digest
    :rtype: str
    """
    hasher = hashlib.md5()

    with open(file, "rb") as f:
        for chunk in iter(functools.partial(f.read, CHUNK_SIZE), b""):
            hasher.update(chunk)

    return hasher.hexdigest()
def _save_manifest(manifest, output_path):
manifest_content = json.dumps(manifest)
manifest_path = os.path.join(output_path, "cache_manifest.json")
with open(manifest_path, "w") as f:
f.write(manifest_content)
return None
def _write_to_disk(
file, digested_file_path, gzip_files, brotli_files, input_path
):
full_digested_file_path = os.path.join(input_path, digested_file_path)
# Copy file while preserving permissions and meta data if supported.
shutil.copy2(file, full_digested_file_path)
if gzip_files:
with open(file, "rb") as f_in:
with gzip.open(f"{file}.gz", "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
shutil.copy2(f"{file}.gz", f"{full_digested_file_path}.gz")
if brotli_files:
import brotli
compressor = brotli.Compressor(quality=11)
with open(file, "rb") as f_in:
with open(f"{file}.br", "wb") as f_out:
read_chunk = functools.partial(f_in.read, CHUNK_SIZE)
for data in iter(read_chunk, b""):
f_out.write(compressor.process(data))
f_out.write(compressor.finish())
shutil.copy2(f"{file}.br", f"{full_digested_file_path}.br")
return None