forked from canonical/snapcraft
/
file_utils.py
248 lines (200 loc) · 9.08 KB
/
file_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*-
#
# Copyright (C) 2016 Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from contextlib import contextmanager, suppress
import hashlib
import logging
import os
import shutil
import subprocess
import sys
from snapcraft.internal.errors import (
RequiredCommandFailure,
RequiredCommandNotFound,
RequiredPathDoesNotExist,
)
# NOTE(review): presumably the third-party sha3 module is imported purely for
# its side effect of making the sha3_* algorithms reachable through hashlib
# on interpreters older than 3.6 (calculate_sha3_384 below looks up
# 'sha3_384' on hashlib) -- confirm against the sha3 package documentation.
if sys.version_info < (3, 6):
    import sha3  # noqa

# Module-level logger used by the helpers in this file.
logger = logging.getLogger(__name__)
def replace_in_file(directory, file_pattern, search_pattern, replacement):
    """Rewrite the contents of every matching file below a directory.

    The tree rooted at ``directory`` is walked and, for each regular
    (non-symlink) file whose name matches ``file_pattern``, the work is
    delegated to search_and_replace_contents().

    :param str directory: The directory to look for files.
    :param file_pattern: An object with a ``match`` method (e.g. a
                         re.compile'd pattern) that is applied to each
                         file name found inside directory.
    :param search_pattern: A re.compile'd pattern to search for within
                           matching files.
    :param str replacement: The string to replace the matching search_pattern
                            with.
    """
    for current_dir, _, file_names in os.walk(directory):
        for name in file_names:
            if not file_pattern.match(name):
                continue

            candidate = os.path.join(current_dir, name)
            # Symlinks are skipped: either the link is broken, or its target
            # is a real file that gets rewritten when it is visited itself.
            if os.path.islink(candidate):
                continue

            search_and_replace_contents(
                candidate, search_pattern, replacement)
def search_and_replace_contents(file_path, search_pattern, replacement):
    """Substitute every match of a pattern inside one file, in place.

    The file is left untouched when its contents cannot be decoded as text
    or when the substitution would not change anything. A PermissionError
    raised while working on the file is logged as a warning and swallowed.

    :param str file_path: Path of file to be searched.
    :param re.RegexObject search_pattern: Pattern for which to search.
    :param str replacement: The string to replace pattern.
    """
    try:
        with open(file_path, 'r+') as handle:
            try:
                contents = handle.read()
            except UnicodeDecodeError:
                # Most likely binary data; there is nothing to rewrite.
                return

            updated = search_pattern.sub(replacement, contents)
            if updated == contents:
                return

            # Rewind and empty the file so that shorter replacement text
            # does not leave stale bytes behind.
            handle.seek(0)
            handle.truncate()
            handle.write(updated)
    except PermissionError as error:
        logger.warning('Unable to open {path} for writing: {error}'.format(
            path=file_path, error=error))
def link_or_copy(source, destination, follow_symlinks=False):
    """Hard-link source and destination files. Copy if it fails to link.

    Hard-linking may fail (e.g. a cross-device link, or permission denied), so
    as a backup plan we just copy it.

    An already existing destination file is replaced: it is removed and the
    hard link is attempted again, so overwriting does not needlessly degrade
    to a full copy.

    :param str source: The source to which destination will be linked.
    :param str destination: The destination to be linked to source.
    :param bool follow_symlinks: Whether or not symlinks should be followed.
    :raises OSError: if both linking and the fallback copy fail (e.g. the
                     source does not exist).
    """
    try:
        # Note that follow_symlinks doesn't seem to work for os.link, so we'll
        # implement this logic ourselves using realpath.
        source_path = source
        if follow_symlinks:
            source_path = os.path.realpath(source)

        destination_dir = os.path.dirname(destination)
        # An empty dirname means "the current directory", which necessarily
        # exists; trying to create '' would only fail and force a copy.
        if destination_dir and not os.path.exists(destination_dir):
            create_similar_directory(
                os.path.dirname(source_path), destination_dir)

        try:
            # Setting follow_symlinks=False in case this bug is ever fixed
            # upstream-- we want this function to continue supporting NOT
            # following symlinks.
            os.link(source_path, destination, follow_symlinks=False)
        except FileExistsError:
            # os.link refuses to overwrite. Remove the stale destination and
            # retry the link. If the destination is a directory the unlink
            # raises an OSError and we fall through to the copy below.
            os.unlink(destination)
            os.link(source_path, destination, follow_symlinks=False)
    except OSError:
        # If os.link raised an I/O error, it may have left a file behind.
        # Skip on OSError in case it doesn't exist or is a directory.
        with suppress(OSError):
            os.unlink(destination)

        shutil.copy2(source, destination, follow_symlinks=follow_symlinks)

        # Stat once so that uid and gid come from the same snapshot.
        source_stat = os.stat(source, follow_symlinks=follow_symlinks)
        try:
            os.chown(destination, source_stat.st_uid, source_stat.st_gid,
                     follow_symlinks=follow_symlinks)
        except PermissionError as e:
            logger.debug('Unable to chown {destination}: {error}'.format(
                destination=destination, error=e))
def link_or_copy_tree(source_tree, destination_tree,
                      copy_function=link_or_copy):
    """Copy a source tree into a destination, hard-linking if possible.

    Directories are recreated with create_similar_directory(); every file
    (and every symlink, including symlinks to directories) is handed to
    ``copy_function`` as ``copy_function(source, destination)``.

    :param str source_tree: Source directory to be copied.
    :param str destination_tree: Destination directory. If this directory
                                 already exists, the files in `source_tree`
                                 will take precedence.
    :param callable copy_function: Callable that actually copies.
    :raises NotADirectoryError: if `source_tree` is not a directory, or if
                                `destination_tree` exists but is not a
                                directory.
    """
    if not os.path.isdir(source_tree):
        raise NotADirectoryError('{!r} is not a directory'.format(source_tree))

    if (os.path.exists(destination_tree) and
            not os.path.isdir(destination_tree)):
        raise NotADirectoryError(
            'Cannot overwrite non-directory {!r} with directory '
            '{!r}'.format(destination_tree, source_tree))

    def mirrored(path):
        # Map a path below source_tree onto the same relative location
        # below destination_tree.
        return os.path.join(
            destination_tree, os.path.relpath(path, source_tree))

    create_similar_directory(source_tree, destination_tree)

    for current_dir, dir_names, file_names in os.walk(source_tree):
        for dir_name in dir_names:
            dir_path = os.path.join(current_dir, dir_name)
            if os.path.islink(dir_path):
                # os.walk doesn't follow symlinks by default (which is
                # good), but it still lists symlinks to directories among
                # the directories. Queue them to be handled as files.
                file_names.append(dir_name)
            else:
                create_similar_directory(dir_path, mirrored(dir_path))

        for file_name in file_names:
            file_path = os.path.join(current_dir, file_name)
            copy_function(file_path, mirrored(file_path))
def create_similar_directory(source, destination, follow_symlinks=False):
    """Create a directory mirroring another's owner and permission bits.

    ``destination`` is created if it does not exist yet (an existing
    directory is reused), then the owner and group of ``source`` are applied
    to it, followed by the stat information copied by shutil.copystat. Not
    being permitted to change the ownership is only logged at debug level.

    :param str source: Directory from which to copy permission bits and
                       owner information.
    :param str destination: Directory to create and to which the `source`
                            information will be copied.
    :param bool follow_symlinks: Whether or not symlinks should be followed.
    """
    source_info = os.stat(source, follow_symlinks=follow_symlinks)

    os.makedirs(destination, exist_ok=True)

    try:
        os.chown(destination, source_info.st_uid, source_info.st_gid,
                 follow_symlinks=follow_symlinks)
    except PermissionError as error:
        logger.debug('Unable to chown {}: {}'.format(destination, error))

    shutil.copystat(source, destination, follow_symlinks=follow_symlinks)
def executable_exists(path):
    """Tell whether 'path' exists and is both readable and executable.

    :param str path: Path to check.
    :returns: True if the path exists and the current process has read and
              execute access to it, False otherwise.
    :rtype: bool
    """
    if not os.path.exists(path):
        return False

    required_access = os.R_OK | os.X_OK
    return os.access(path, required_access)
@contextmanager
def requires_command_success(command, not_found_fmt=None, failure_fmt=None):
    """Context manager whose body only runs if a command succeeds.

    ``command`` is split on whitespace and executed once, before the ``with``
    body is entered. Its standard output and standard error are discarded.

    :param str command: The command line to run.
    :param not_found_fmt: Optional value passed as ``fmt`` to
                          RequiredCommandNotFound.
    :param failure_fmt: Optional value passed as ``fmt`` to
                        RequiredCommandFailure.
    :raises TypeError: if `command` is not a string.
    :raises RequiredCommandNotFound: if the executable could not be found.
    :raises RequiredCommandFailure: if the command exits with a non-zero
                                    status.
    """
    if not isinstance(command, str):
        raise TypeError('command must be a string.')

    cmd_list = command.split()
    kwargs = dict(command=command, cmd_list=cmd_list)
    try:
        # Discard the output with DEVNULL instead of PIPE: check_call never
        # reads from the pipes, so a command producing more output than the
        # OS pipe buffer holds would block forever.
        subprocess.check_call(
            cmd_list, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except FileNotFoundError:
        if not_found_fmt is not None:
            kwargs['fmt'] = not_found_fmt
        raise RequiredCommandNotFound(**kwargs)
    except subprocess.CalledProcessError:
        if failure_fmt is not None:
            kwargs['fmt'] = failure_fmt
        raise RequiredCommandFailure(**kwargs)
    yield
@contextmanager
def requires_path_exists(path, error_fmt=None):
    """Context manager whose body only runs if a path exists.

    The check happens once, before the ``with`` body is entered.

    :param str path: The path that must exist.
    :param error_fmt: Optional value passed as ``fmt`` to
                      RequiredPathDoesNotExist.
    :raises RequiredPathDoesNotExist: if `path` does not exist.
    """
    path_is_missing = not os.path.exists(path)
    if path_is_missing:
        error_kwargs = {'path': path}
        if error_fmt is not None:
            error_kwargs['fmt'] = error_fmt
        raise RequiredPathDoesNotExist(**error_kwargs)

    yield
def calculate_sha3_384(path):
    """Return the hex digest of the sha3 384 hash of the file at path.

    Thin convenience wrapper around calculate_hash(), which reads the file
    in 1MB chunks.

    :param str path: Path of the file to hash.
    """
    digest = calculate_hash(path, algorithm='sha3_384')
    return digest
def calculate_hash(path, *, algorithm):
    """Return the hex digest of the file at path for the given algorithm.

    The file is read in binary mode, 1MB at a time, so large files are
    never loaded into memory in one piece.

    :param str path: Path of the file to hash.
    :param str algorithm: Name of a hash constructor exposed as an attribute
                          of hashlib (e.g. 'sha256'). Keyword-only.
    :raises AttributeError: if hashlib has no attribute named `algorithm`.
    """
    # Looking the constructor up on hashlib is what raises AttributeError
    # for an unsupported algorithm.
    digest = getattr(hashlib, algorithm)()
    chunk_size = 1 << 20

    with open(path, 'rb') as stream:
        # Keep reading until read() returns the empty bytes object at EOF.
        for chunk in iter(lambda: stream.read(chunk_size), b''):
            digest.update(chunk)

    return digest.hexdigest()