This repository has been archived by the owner on Sep 6, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
base.py
353 lines (292 loc) · 12.1 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
from __future__ import unicode_literals
import hashlib
import io
import json
import logging
import os
import posixpath
import subprocess
from django.conf import settings
from django.contrib.staticfiles.storage import staticfiles_storage
from django.utils.encoding import force_text
import semantic_version
logger = logging.getLogger(__name__)
JSPM_LOG_VERSION = semantic_version.Version('0.16.3')
SOURCEMAPPING_URL_COMMENT = b'//# sourceMappingURL='
NODE_ENV_VAR = 'NODE_PATH'
class BundleError(OSError):
pass
class System(object):
def __init__(self, **opts):
self.opts = opts
self.stdout = self.stdin = self.stderr = subprocess.PIPE
self.cwd = None
self.version = None # JSPM version
def _has_jspm_log(self):
return self.jspm_version and self.jspm_version >= JSPM_LOG_VERSION
def get_jspm_version(self, opts):
jspm_bin = opts['jspm']
cmd = '{} --version'.format(jspm_bin)
proc = subprocess.Popen(
cmd, shell=True, cwd=self.cwd, stdout=self.stdout,
stdin=self.stdin, stderr=self.stderr)
result, err = proc.communicate() # block until it's done
if err:
raise BundleError("Could not determine JSPM version, error: %s", err)
version_string = result.decode().split()[0]
return semantic_version.Version(version_string, partial=True)
@property
def jspm_version(self):
if not self.version:
options = self.opts.copy()
options.setdefault('jspm', settings.SYSTEMJS_JSPM_EXECUTABLE)
self.version = self.get_jspm_version(options)
return self.version
def bundle(self, app):
bundle = SystemBundle(self, app, **self.opts)
return bundle.bundle()
@staticmethod
def get_bundle_path(app):
"""
Returns the path relative to STATIC_URL for the bundle for app.
"""
bundle = SystemBundle(None, app)
return bundle.get_paths()[1]
class SystemBundle(object):
"""
Represents a single app to be bundled.
"""
def __init__(self, system, app, **options):
"""
Initialize a SystemBundle object.
:param system: a System instance that holds the non-bundle specific
meta information (such as jspm version, configuration)
:param app: string, the name of the JS package to bundle. This may be
missing the '.js' extension.
:param options: dict containing the bundle-specific options. Possible
options:
`jspm`: `jspm` executable (if it's not on $PATH, for example)
`log`: logging mode for jspm, can be ok|warn|err. Only available
for jspm >= 0.16.3
`minify`: boolean, whether go generate minified bundles or not
`sfx`: boolean, generate a self-executing bundle or not
"""
self.system = system
self.app = app
# set the bundle options
options.setdefault('jspm', settings.SYSTEMJS_JSPM_EXECUTABLE)
self.opts = options
bundle_cmd = 'bundle-sfx' if self.opts.get('sfx') else 'bundle'
self.command = '{jspm} ' + bundle_cmd + ' {app} {outfile}'
self.stdout = self.stdin = self.stderr = subprocess.PIPE
def get_outfile(self):
js_file = '{app}{ext}'.format(app=self.app, ext='.js' if self.needs_ext() else '')
outfile = os.path.join(settings.STATIC_ROOT, settings.SYSTEMJS_OUTPUT_DIR, js_file)
return outfile
def get_paths(self):
"""
Return a tuple with the absolute path and relative path (relative to STATIC_URL)
"""
outfile = self.get_outfile()
rel_path = os.path.relpath(outfile, settings.STATIC_ROOT)
return outfile, rel_path
def needs_ext(self):
"""
Check whether `self.app` is missing the '.js' extension and if it needs it.
"""
if settings.SYSTEMJS_DEFAULT_JS_EXTENSIONS:
name, ext = posixpath.splitext(self.app)
if not ext:
return True
return False
def bundle(self):
"""
Bundle the app and return the static url to the bundle.
"""
outfile, rel_path = self.get_paths()
options = self.opts
if self.system._has_jspm_log():
self.command += ' --log {log}'
options.setdefault('log', 'err')
if options.get('minify'):
self.command += ' --minify'
try:
cmd = self.command.format(app=self.app, outfile=outfile, **options)
proc = subprocess.Popen(
cmd, shell=True, cwd=self.system.cwd, stdout=self.stdout,
stdin=self.stdin, stderr=self.stderr)
result, err = proc.communicate() # block until it's done
if err and self.system._has_jspm_log():
fmt = 'Could not bundle \'%s\': \n%s'
logger.warn(fmt, self.app, err)
raise BundleError(fmt % (self.app, err))
if result.strip():
logger.info(result)
except (IOError, OSError) as e:
if isinstance(e, BundleError):
raise
raise BundleError('Unable to apply %s (%r): %s' % (
self.__class__.__name__, cmd, e))
else:
if not options.get('sfx'):
# add the import statement, which is missing for non-sfx bundles
sourcemap = find_sourcemap_comment(outfile)
with open(outfile, 'a') as of:
of.write("\nSystem.import('{app}{ext}');\n{sourcemap}".format(
app=self.app,
ext='.js' if self.needs_ext() else '',
sourcemap=sourcemap if sourcemap else '',
))
return rel_path
class SystemTracer(object):
def __init__(self, node_path=None):
node_env = os.environ.copy()
if node_path and NODE_ENV_VAR not in node_env:
node_env[NODE_ENV_VAR] = node_path
self.env = node_env
self.name = 'deps.json'
self.storage = staticfiles_storage
self._trace_cache = {}
@property
def cache_file_path(self):
if not os.path.exists(settings.SYSTEMJS_CACHE_DIR):
os.makedirs(settings.SYSTEMJS_CACHE_DIR)
return os.path.join(settings.SYSTEMJS_CACHE_DIR, self.name)
def trace(self, app):
"""
Trace the dependencies for app.
A tracer-instance is shortlived, and re-tracing the same app should
yield the same results. Since tracing is an expensive process, cache
the result on the tracer instance.
"""
if app not in self._trace_cache:
process = subprocess.Popen(
"trace-deps.js {}".format(app), shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
env=self.env, universal_newlines=True
)
out, err = process.communicate()
self._trace_cache[app] = json.loads(out)
return self._trace_cache[app]
def get_hash(self, path):
md5 = hashlib.md5()
with self.storage.open(path) as infile:
for chunk in infile.chunks():
md5.update(chunk)
return md5.hexdigest()
def write_depcache(self, app_deps, bundle_options): # TODO: use storage
all_deps = {
'version': 1,
'packages': app_deps,
'hashes': {},
'options': bundle_options,
}
for pkg_deptree in app_deps.values():
for module, info in pkg_deptree.items():
path = info['path']
if path not in all_deps['hashes']:
all_deps['hashes'][path] = self.get_hash(path)
with open(self.cache_file_path, 'w') as outfile:
json.dump(all_deps, outfile)
@property
def cached_deps(self):
if not hasattr(self, '_depcache'):
with open(self.cache_file_path, 'r') as infile:
self._depcache = json.load(infile)
return self._depcache
def get_depcache(self, app):
# cache in memory for faster lookup
if self.cached_deps.get('version') == 1:
return self.cached_deps['packages'].get(app)
else:
raise NotImplementedError # noqa
def get_hashes(self):
if self.cached_deps.get('version') == 1:
return self.cached_deps['hashes']
else:
raise NotImplementedError # noqa
def get_bundle_options(self):
if self.cached_deps.get('version') == 1:
return self.cached_deps.get('options')
else:
raise NotImplementedError # noqa
def hashes_match(self, dep_tree):
"""
Compares the app deptree file hashes with the hashes stored in the
cache.
"""
hashes = self.get_hashes()
for module, info in dep_tree.items():
md5 = self.get_hash(info['path'])
if md5 != hashes[info['path']]:
return False
return True
def check_needs_update(self, app):
cached_deps = self.get_depcache(app)
deps = self.trace(app)
# no re-bundle needed if the trees, mtimes and file hashes match
if deps == cached_deps and self.hashes_match(deps):
return False
return True
def find_sourcemap_comment(filepath, block_size=100):
"""
Seeks and removes the sourcemap comment. If found, the sourcemap line is
returned.
Bundled output files can have massive amounts of lines, and the sourceMap
comment is always at the end. So, to extract it efficiently, we read out the
lines of the file starting from the end. We look back at most 2 lines.
:param:filepath: path to output bundle file containing the sourcemap comment
:param:blocksize: integer saying how many bytes to read at once
:return:string with the sourcemap comment or None
"""
MAX_TRACKBACK = 2 # look back at most 2 lines, catching potential blank line at the end
block_number = -1
# blocks of size block_size, in reverse order starting from the end of the file
blocks = []
sourcemap = None
try:
# open file in binary read+write mode, so we can seek with negative offsets
of = io.open(filepath, 'br+')
# figure out what's the end byte
of.seek(0, os.SEEK_END)
block_end_byte = of.tell()
# track back for maximum MAX_TRACKBACK lines and while we can track back
while block_end_byte > 0 and MAX_TRACKBACK > 0:
if (block_end_byte - block_size > 0):
# read the last block we haven't yet read
of.seek(block_number*block_size, os.SEEK_END)
blocks.append(of.read(block_size))
else:
# file too small, start from begining
of.seek(0, os.SEEK_SET)
# only read what was not read
blocks = [of.read(block_end_byte)]
# update variables that control while loop
content = b''.join(reversed(blocks))
lines_found = content.count(b'\n')
MAX_TRACKBACK -= lines_found
block_end_byte -= block_size
block_number -= 1
# early check and bail out if we found the sourcemap comment
if SOURCEMAPPING_URL_COMMENT in content:
offset = 0
# splitlines eats the last \n if its followed by a blank line
lines = content.split(b'\n')
for i, line in enumerate(lines):
if line.startswith(SOURCEMAPPING_URL_COMMENT):
offset = len(line)
sourcemap = line
break
while i+1 < len(lines):
offset += 1 # for the newline char
offset += len(lines[i+1])
i += 1
# track back until the start of the comment, and truncate the comment
if sourcemap:
offset += 1 # for the newline before the sourcemap comment
of.seek(-offset, os.SEEK_END)
of.truncate()
return force_text(sourcemap)
finally:
of.close()
return sourcemap