-
Notifications
You must be signed in to change notification settings - Fork 0
/
smart_compress.py
285 lines (231 loc) · 11.3 KB
/
smart_compress.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
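r'''
Created on 2016-10-21
@author: caifh
Compress PNG and JPG files (selected by file extension) with the smart compression service provided by "https://tinypng.com/".
Both whole-directory compression and single-file compression are supported.
Every key has a compression quota (a free key covers 500 images), so for large batches you have to sort out the key supply yourself first.
Functional verification checklist:
1. Verify that both "copy everything" and "copy only processed files" work; (E:\temp\cloud_res\mini_src_01)
2. Verify that online compression works; (E:\temp\cloud_res\mini_src_01)
3. Verify that copy + tag-only works; (E:\temp\cloud_res\mini_src_02)
4. Verify that in-place update works; (E:\temp\cloud_res\mini_src_03)
5. Verify that in-place tag-only works; (E:\temp\cloud_res\mini_src_04)
'''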
import tinify
import argparse
import os
import creditutils.file_util as myfile
import creditutils.trivial_util as utility
import shutil
import piexif
import traceback
import sys
import imghdr
import tempfile
import creditutils.png_util as png_util
class ImageType:
    """Image type names that are compared against the result of ``imghdr.what``."""

    # Both constants are plain strings so they can be compared with ``==``.
    png, jpeg = 'png', 'jpeg'
class Compression:
    """Thin wrapper around the ``tinify`` client used to shrink a single image."""

    def __init__(self, cer_key):
        """Register the API key on the ``tinify`` module.

        Args:
            cer_key: key assigned to ``tinify.key``.
        """
        # The key is stored on the tinify module itself, so an instance has to
        # be created before compress_file() is used.
        tinify.key = cer_key

    @staticmethod
    def compress_file(src, dst):
        """Compress ``src`` through tinify and write the result to ``dst``.

        Args:
            src: path of the image to compress; must exist.
            dst: path of the compressed output. Missing parent directories
                are created.

        Raises:
            FileNotFoundError: ``src`` does not exist.
            SystemExit: any error raised while compressing or writing is
                printed with its traceback and the process exits with
                status 1.
        """
        if not os.path.exists(src):
            raise FileNotFoundError('{} not exists'.format(src))

        try:
            source = tinify.from_file(src)
            dir_name = os.path.dirname(dst)
            # dirname() is '' when dst is a bare file name; os.makedirs('')
            # would raise, so only create a directory when there is one.
            if dir_name:
                os.makedirs(dir_name, exist_ok=True)
            source.to_file(dst)
        except Exception:
            print('error.message: ', traceback.format_exc())
            sys.exit(1)
class ProcessManager:
    """Compress and/or tag PNG and JPEG files for one command-line run.

    The parsed options are copied onto the instance; ``process`` reads
    ``src``, ``dst``, ``identification``, ``old_key``, ``to_copy`` and
    ``tag_only`` from there. A file counts as already compressed when it
    carries the identification marker (a PNG text chunk or the JPEG EXIF
    ``UserComment`` tag).
    """

    # Index handed to png_util when the marker text chunk is inserted or read.
    COMPRESSED_FLAG_CHUNK_INDEX = 1
    # Lower-case file extensions that are treated as images to process.
    SUPPORT_IMAGE_SUFFIX = ['.png', '.jpg', '.jpeg']
    COMPRESSED_KEY = 'UserComment'

    def __init__(self, args):
        """Store the parsed options and set up the compression client.

        Args:
            args: namespace whose attributes are copied onto the instance;
                it must provide at least ``src`` and ``cer_key``.
        """
        # Keep every control option as an instance attribute of the same name.
        for name, value in vars(args).items():
            setattr(self, name, value)

        self.src = os.path.abspath(self.src)
        self.comp = Compression(self.cer_key)
        self.temp_dir = None

    def process(self):
        """Process ``self.src`` (a file or a directory).

        With ``self.dst`` set the results are written to the destination;
        otherwise the source files are rewritten in place, using a scratch
        directory that is removed afterwards.

        Raises:
            Exception: ``self.src`` is neither a file nor a directory.
        """
        in_place = not self.dst
        try:
            if in_place:
                self.temp_dir = tempfile.mkdtemp()

            if os.path.isfile(self.src):
                if in_place:
                    self.process_modify_func(self.src, self.identification, self.old_key, self.tag_only)
                else:
                    self.process_common_func(self.src, self._resolve_file_target(), self.identification,
                                             self.old_key, self.to_copy, self.tag_only)
            elif os.path.isdir(self.src):
                # NOTE(review): the creditutils helpers presumably walk the tree
                # and invoke the callback per file with the extra arguments
                # appended - confirm against creditutils.file_util.
                if in_place:
                    myfile.process_dir(self.src, self.process_modify_func, self.identification,
                                       self.old_key, self.tag_only)
                else:
                    myfile.process_dir_src_to_dst(self.src, self.dst, self.process_common_func,
                                                  self.identification, self.old_key, self.to_copy,
                                                  self.tag_only)
            else:
                raise Exception('{} not exists'.format(self.src))
        finally:
            # temp_dir stays None when mkdtemp() itself failed; guarding on it
            # keeps the original error from being masked by isdir(None).
            if self.temp_dir and os.path.isdir(self.temp_dir):
                shutil.rmtree(self.temp_dir)
            self.temp_dir = None

    def _resolve_file_target(self):
        """Return the absolute output path for a single-file source."""
        target = os.path.abspath(myfile.normalpath(self.dst))
        separators = tuple(sep for sep in (os.sep, os.altsep) if sep)
        # abspath() strips a trailing separator, so the "dst is a directory"
        # hint has to be read from the raw argument (or from the file system).
        if self.dst.endswith(separators) or os.path.isdir(target):
            return os.path.join(target, os.path.basename(self.src))
        return target

    @staticmethod
    def _is_candidate(file_path):
        """Return True for supported image files that are not Android .9.png images."""
        lowered = os.path.basename(file_path).lower()
        if os.path.splitext(lowered)[1] not in ProcessManager.SUPPORT_IMAGE_SUFFIX:
            return False
        return not lowered.endswith('.9.png')

    @staticmethod
    def _copy_file(src_file, dst_file):
        """Copy ``src_file`` to ``dst_file``, creating the parent directory if needed."""
        parent = os.path.dirname(dst_file)
        if parent:
            os.makedirs(parent, exist_ok=True)
        shutil.copyfile(src_file, dst_file)

    def process_common_func(self, src_file, dst_file, compressed_identify, old_compressed_identify,
                            to_copy=False, tag_only=False):
        """Process one source file into ``dst_file``.

        Args:
            src_file: path of the file to examine.
            dst_file: path of the output file.
            compressed_identify: marker written to processed images.
            old_compressed_identify: previous marker that is also accepted
                (and replaced by the new one), or None.
            to_copy: copy unsupported files, .9.png files and already
                compressed images to ``dst_file`` unchanged.
            tag_only: do not upload; copy the image and only add the marker.
        """
        # Unsupported extensions and .9.png images are never compressed.
        if not ProcessManager._is_candidate(src_file):
            if to_copy:
                ProcessManager._copy_file(src_file, dst_file)
                print("copy {} to {} success".format(src_file, dst_file))
            return

        if ProcessManager.check_if_compressed(src_file, compressed_identify, old_compressed_identify):
            if to_copy:
                ProcessManager._copy_file(src_file, dst_file)
                print("copy {} to {} success".format(src_file, dst_file))
            return

        if tag_only:
            # Used for files that are already compressed but not yet marked:
            # no upload, just carry the file over and tag it below.
            ProcessManager._copy_file(src_file, dst_file)
        else:
            print(src_file, dst_file)
            self.comp.compress_file(src_file, dst_file)
            print('smart compress "{}" to "{}" success.'.format(src_file, dst_file))

        if os.path.isfile(dst_file):
            ProcessManager.add_compressed_flag(dst_file, compressed_identify)
            print('add compressed flag to "{}" success.'.format(dst_file))
        else:
            print('process "{}" to "{}" failed!'.format(src_file, dst_file))

    def process_modify_func(self, src_file, compressed_identify, old_compressed_identify, tag_only=False):
        """Process one file in place.

        Args:
            src_file: path of the file to examine and overwrite.
            compressed_identify: marker written to processed images.
            old_compressed_identify: previous marker that is also accepted,
                or None.
            tag_only: skip the upload and only add the marker.
        """
        # Unsupported extensions and Android .9.png images are left alone.
        if not ProcessManager._is_candidate(src_file):
            return

        if ProcessManager.check_if_compressed(src_file, compressed_identify, old_compressed_identify):
            return

        if not tag_only:
            # Compress from a scratch copy so the original path can be the
            # output; mkstemp() replaces the deprecated, race-prone mktemp().
            handle, temp_file = tempfile.mkstemp(suffix=os.path.basename(src_file), dir=self.temp_dir)
            os.close(handle)
            try:
                shutil.copyfile(src_file, temp_file)
                self.comp.compress_file(temp_file, src_file)
            finally:
                if os.path.exists(temp_file):
                    os.remove(temp_file)
            print('smart compress "{}" success.'.format(src_file))

        ProcessManager.add_compressed_flag(src_file, compressed_identify)
        print('add compressed flag to "{}" success.'.format(src_file))

    @staticmethod
    def add_compressed_flag_to_png_file(file_path, compressed_identify):
        """Insert the marker as a text chunk into the PNG at ``file_path``."""
        png_util.insert_text_chunk(file_path, compressed_identify,
                                   ProcessManager.COMPRESSED_FLAG_CHUNK_INDEX)

    @staticmethod
    def add_compressed_flag_to_jpeg_file(file_path, compressed_identify):
        """Store the encoded marker in the EXIF UserComment tag of the JPEG."""
        exif_dict = piexif.load(file_path)
        exif_dict['Exif'][piexif.ExifIFD.UserComment] = compressed_identify.encode()
        piexif.insert(piexif.dump(exif_dict), file_path)

    @staticmethod
    def add_compressed_flag(src_path, compressed_identify):
        """Add the marker to ``src_path`` according to its detected image type.

        Files that ``imghdr`` does not report as JPEG or PNG are left untouched.
        """
        img_type = imghdr.what(src_path)
        if ImageType.jpeg == img_type:
            ProcessManager.add_compressed_flag_to_jpeg_file(src_path, compressed_identify)
        elif ImageType.png == img_type:
            ProcessManager.add_compressed_flag_to_png_file(src_path, compressed_identify)

    @staticmethod
    def check_if_png_compressed(file_path, compressed_identify, old_compressed_identify):
        """Return True when the PNG carries the current or the old marker.

        A file carrying the old marker is re-tagged with the current one.
        """
        data = png_util.get_text_chunk_data(file_path, ProcessManager.COMPRESSED_FLAG_CHUNK_INDEX)
        if not data:
            return False
        if data == compressed_identify:
            return True
        # An old marker still counts as compressed; add the new marker.
        if old_compressed_identify is not None and data == old_compressed_identify:
            ProcessManager.add_compressed_flag(file_path, compressed_identify)
            return True
        return False

    @staticmethod
    def check_if_jpeg_compressed(file_path, compressed_identify, old_compressed_identify):
        """Return True when the JPEG EXIF UserComment holds the current or old marker.

        A file carrying the old marker is re-tagged with the current one.
        """
        exif_info = piexif.load(file_path)['Exif']
        comment_tag = piexif.ExifIFD.UserComment
        if not exif_info or comment_tag not in exif_info:
            return False

        stored = exif_info[comment_tag]
        if stored == compressed_identify.encode():
            return True
        # The marker is written as bytes, so the old marker must be encoded
        # before comparing; a str never equals bytes.
        if old_compressed_identify is not None and stored == old_compressed_identify.encode():
            ProcessManager.add_compressed_flag(file_path, compressed_identify)
            return True
        return False

    @staticmethod
    def check_if_compressed(file_path, compressed_identify, old_compressed_identify):
        """Return True when ``file_path`` is a PNG/JPEG that already carries a marker."""
        img_type = imghdr.what(file_path)
        if ImageType.png == img_type:
            return ProcessManager.check_if_png_compressed(file_path, compressed_identify, old_compressed_identify)
        elif ImageType.jpeg == img_type:
            return ProcessManager.check_if_jpeg_compressed(file_path, compressed_identify, old_compressed_identify)
        return False
def main(args):
    """Run one compression pass configured by the parsed arguments ``args``."""
    ProcessManager(args).process()
# Parse the input arguments and set the corresponding options
def get_args(src_args=None):
    """Build the command-line parser and parse ``src_args``.

    Args:
        src_args: list of argument strings handed to ``parse_args``; the
            default None is passed through unchanged.

    Returns:
        The namespace with ``src``, ``cer_key``, ``dst``, ``to_copy``,
        ``tag_only``, ``identification`` and ``old_key``.
    """
    # One (flags, keyword options) entry per argument, in declaration order.
    option_table = (
        (('src',), dict(metavar='src',
                        help='source file or directory')),
        (('-k',), dict(dest='cer_key', default=None, required=True,
                       help='indicate the certificate key.')),
        (('-o',), dict(dest='dst', metavar='dst', default=None,
                       help='target file or directory')),
        (('-a',), dict(dest='to_copy', action='store_true', default=False,
                       help='specify if to copy all files!')),
        (('-t',), dict(dest='tag_only', action='store_true', default=False,
                       help='specify only to tag compressed!')),
        (('-i',), dict(dest='identification', default='ImageCompressed',
                       help='the label of the compressed picture')),
        (('--old',), dict(dest='old_key', default=None,
                          help='the old label, and will be replaced with new default label.')),
    )

    cli = argparse.ArgumentParser(description='smart compress image(PNG or JPEG).')
    for flags, options in option_table:
        cli.add_argument(*flags, **options)

    return cli.parse_args(src_args)
if __name__ == '__main__':
    # None is handed straight to parse_args(); swap in a list of strings here
    # to run with canned arguments while debugging.
    test_args = None
    utility.measure_time(main, get_args(test_args))