-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
255 lines (211 loc) · 7.25 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# -*- coding: utf-8 -*-
# @Time : 2019/7/27 9:51
# @Author : Esbiya
# @Email : 18829040039@163.com
# @File : utils.py
# @Software: PyCharm
import os
import ast
import time
import json
import traceback
import redis
import logging
import functools
def get_logger(name=__file__, level=logging.INFO, logfile=None):
    """
    Build (or fetch) a logger that writes to the console and to a file under ``log/``.

    Calling this repeatedly for the same name reuses the handlers already
    attached to that logger instead of stacking new ones, so each record is
    emitted once per destination rather than once per call.

    :param name: logger name; when a path is given only its last component is used
    :param level: level applied to the logger and to both handlers
    :param logfile: file name inside the ``log`` directory; defaults to ``<YYYY-MM-DD>.log``
    :return: the configured ``logging.Logger``
    """
    # basename understands the platform's path separators, unlike split('/').
    name = os.path.basename(name)
    logger = logging.getLogger(name)
    logger.setLevel(level)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s: - %(message)s',
                                  datefmt='%Y-%m-%d %H:%M:%S')

    # FileHandler subclasses StreamHandler, so it must be excluded explicitly
    # when looking for an existing console handler.
    console = next(
        (h for h in logger.handlers
         if isinstance(h, logging.StreamHandler) and not isinstance(h, logging.FileHandler)),
        None,
    )
    if console is None:
        console = logging.StreamHandler()
        logger.addHandler(console)
    console.setLevel(level)
    console.setFormatter(formatter)

    root_path = 'log'
    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs(root_path, exist_ok=True)
    if logfile is None:
        logfile = '{}/{}.log'.format(root_path, time.strftime('%Y-%m-%d'))
    else:
        logfile = '{}/{}'.format(root_path, logfile)

    # FileHandler stores the absolute path in baseFilename; compare like with like.
    target = os.path.abspath(logfile)
    file_handler = next(
        (h for h in logger.handlers
         if isinstance(h, logging.FileHandler) and h.baseFilename == target),
        None,
    )
    if file_handler is None:
        file_handler = logging.FileHandler(logfile)
        logger.addHandler(file_handler)
    file_handler.setLevel(level)
    file_handler.setFormatter(formatter)
    return logger
def loopUnlessSeccessOrMaxTry(max_times, sleep_time=1.5):
    """
    Retry decorator for methods: keep calling the wrapped method until it
    returns without raising or the retry budget is used up.

    Two independent counters are kept. When the instance's ``reset_flag`` is
    set at the time of a failure, the user is asked for a new username and
    password and the flag is cleared; otherwise the failure is treated as an
    ordinary error. Once either counter reaches ``max_times`` an error is
    logged and the wrapper returns ``None``.

    :param max_times: maximum number of attempts for each kind of failure
    :param sleep_time: seconds to pause after a failure
    :return: a decorator for methods whose instance provides ``logger`` and ``reset_flag``
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            failures = 1
            credential_prompts = 1
            while True:
                try:
                    return func(self, *args, **kwargs)
                except Exception as exc:
                    if not self.reset_flag:
                        # Ordinary failure: log, pause, then give up or try again.
                        self.logger.warning('在执行函数: {} 时出错 -> {}'.format(func.__name__, exc))
                        time.sleep(sleep_time)
                        if failures >= max_times:
                            self.logger.error('在执行函数: {} 时出错次数过多, 请重新运行程序! '.format(func.__name__))
                            return None
                        failures += 1
                        continue

                    # Credential failure: out of prompts means we stop here.
                    if credential_prompts >= max_times:
                        self.logger.error('密码错误次数过多, 请确认密码后重新登录! ')
                        return None
                    remaining = max_times - credential_prompts
                    self.logger.warning('账号或密码错误, 你还有{}次机会! '.format(remaining))
                    self.username = input('账号 >> \n')
                    self.password = input('密码 >> \n')
                    credential_prompts += 1
                    self.reset_flag = False
                    time.sleep(sleep_time)
        return wrapper
    return decorator
def seleniumLoopUnlessSeccessOrMaxTry(max_times, sleep_time=1):
    """
    Retry decorator for methods that drive a browser through ``self.browser``.

    After each failure the error is logged, page loading is stopped via
    ``window.stop()``, the page is refreshed, and the call is attempted again.
    When the number of failures reaches ``max_times`` an error is logged and
    the wrapper returns ``None``.

    :param max_times: maximum number of attempts
    :param sleep_time: seconds to pause after the refresh (a fixed one-second
        pause always precedes the refresh)
    :return: a decorator for methods whose instance provides ``logger`` and ``browser``
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            attempt = 1
            while True:
                try:
                    outcome = func(self, *args, **kwargs)
                except Exception as exc:
                    self.logger.error(u'在执行函数{}时出错 {}'.format(func.__name__, exc))
                    # Halt whatever is still loading, then reload the page.
                    self.browser.execute_script('window.stop()')
                    time.sleep(1)
                    self.browser.refresh()
                    time.sleep(sleep_time)
                    if attempt >= max_times:
                        self.logger.error(u'出错次数过多,该函数{}已跳过执行'.format(func.__name__))
                        return None
                    attempt += 1
                else:
                    return outcome
        return wrapper
    return decorator
def pyppeteerLoopUnlessSeccessOrMaxTry(max_times, sleep_time=1):
    """
    Retry decorator for coroutine methods: await the wrapped method until it
    completes without raising or ``max_times`` failures have occurred.

    After each failure the error is logged, ``self.page_close()`` is awaited,
    and the coroutine sleeps for ``sleep_time`` seconds before the next
    attempt. When the failure count reaches ``max_times`` an error is logged
    and the wrapper returns ``None``.

    The factory and the inner decorator are ordinary functions; only the
    wrapper is a coroutine. An ``async def`` factory would hand a coroutine
    object to the ``@`` syntax, which cannot be called as a decorator.

    :param max_times: maximum number of attempts
    :param sleep_time: seconds to wait between attempts
    :return: a decorator for methods whose instance provides ``logger`` and
        an awaitable ``page_close()``
    """
    # Imported here so the block does not rely on imports the module lacks.
    import asyncio
    import inspect

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(self, *args, **kwargs):
            cnt = 1
            while True:
                try:
                    result = func(self, *args, **kwargs)
                    # Await inside the try so failures raised by the coroutine
                    # are caught here and trigger a retry.
                    if inspect.isawaitable(result):
                        result = await result
                    return result
                except Exception as e:
                    self.logger.error(u'在执行函数{}时出错 {}'.format(func.__name__, e))
                    await self.page_close()
                    # Non-blocking pause: time.sleep would stall the event loop.
                    await asyncio.sleep(sleep_time)
                    if cnt >= max_times:
                        self.logger.error(u'出错次数过多,该函数{}已跳过执行'.format(func.__name__))
                        break
                    cnt += 1
        return wrapper
    return decorator
def check_login():
    """
    Decorator that verifies the login state before running the wrapped method.

    The wrapper loads cookies, refreshes the login state, and then either
    runs the wrapped method (when ``self.is_login`` is truthy) or calls
    ``self.login()``. In the latter case the wrapped method is not invoked
    and the wrapper returns ``None``.

    :return: a decorator for methods whose instance provides ``load_cookies``,
        ``check_islogin``, ``is_login``, ``login`` and ``logger``
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            # Load stored cookies, then refresh the login state.
            self.load_cookies()
            self.check_islogin()

            if not self.is_login:
                # Not logged in: log in again; the wrapped call is skipped.
                self.login()
                return None

            self.logger.info('登录状态正常!')
            return func(self, *args, **kwargs)
        return wrapper
    return decorator
def check_user():
    """
    Decorator that makes sure credentials are present before the wrapped
    method runs.

    If ``self.username`` or ``self.password`` is falsy, the missing value is
    read from standard input (username first, then password) and stored on
    the instance.

    :return: a decorator for methods whose instance has ``username`` and ``password``
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            prompts = (
                ('username', '账号 >> \n'),
                ('password', '密码 >> \n'),
            )
            for attr, prompt in prompts:
                if not getattr(self, attr):
                    setattr(self, attr, input(prompt))
            return func(self, *args, **kwargs)
        return wrapper
    return decorator
def reset_user():
    """
    Decorator that optionally re-prompts for credentials before the wrapped
    method runs.

    When ``self.reset_flag`` is truthy and ``self.reset_num`` is still below
    ``self.max_reset_num``, a new username and password are read from
    standard input. When the limit has been reached a warning is logged
    instead. The wrapped method is called in every case.

    :return: a decorator for methods whose instance provides ``reset_flag``,
        ``reset_num``, ``max_reset_num`` and ``logger``
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            if self.reset_flag:
                limit_reached = not (self.reset_num < self.max_reset_num)
                if limit_reached:
                    self.logger.warning('密码输入次数达到最大限制, 请确认密码重新运行程序! ')
                else:
                    self.username = input('账号 >> \n')
                    self.password = input('密码 >> \n')
            return func(self, *args, **kwargs)
        return wrapper
    return decorator
def load_cookies():
    """
    Decorator that loads cookies into ``self.session`` before the wrapped
    method runs.

    ``self.session.cookies.load(ignore_discard=True)`` is called first. If a
    ``FileNotFoundError`` is raised, either by the load or by the wrapped
    method itself, the wrapper returns ``False``; otherwise it returns the
    wrapped method's result.

    :return: a decorator for methods whose instance provides ``session.cookies``
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            try:
                self.session.cookies.load(ignore_discard=True)
                outcome = func(self, *args, **kwargs)
            except FileNotFoundError:
                outcome = False
            return outcome
        return wrapper
    return decorator
def get_redis_client(host='127.0.0.1', port=6379, db=0, password=None):
    """
    Create a ``redis.StrictRedis`` client.

    Called without arguments it targets ``127.0.0.1:6379``, database 0, with
    no password, exactly as before; the keyword arguments allow a different
    server to be used without editing this module.

    :param host: Redis server host
    :param port: Redis server port
    :param db: database index
    :param password: password, or ``None`` for no authentication
    :return: a new ``redis.StrictRedis`` instance
    """
    return redis.StrictRedis(host=host, port=port, db=db, password=password)
def parse_json(s):
    """
    Decode the JSON object embedded in ``s``.

    Everything from the first ``{`` to the last ``}`` (inclusive) is passed
    to ``json.loads``, so wrappers around the object, such as a JSONP
    callback, are ignored.

    :param s: text containing a JSON object
    :return: the decoded value
    """
    window = slice(s.find('{'), s.rfind('}') + 1)
    return json.loads(s[window])