-
Notifications
You must be signed in to change notification settings - Fork 7
/
data_helper.py
404 lines (355 loc) · 13.4 KB
/
data_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Time : 2018/3/31 22:54
# @Author : liujiantao
# @Site :
# @File : data_helper.py
# @Software: PyCharm
import datetime
import numpy as np
import json
import pandas as pd
import time
from sklearn.model_selection import train_test_split
from data_util import DataUtil
# Column dtypes for the CSV read by DataHelper.get_test_data: the terminal id
# is an unsigned 32-bit integer and every other column is read as float64.
d_test_types = {'TERMINALNO': 'uint32'}
d_test_types.update(dict.fromkeys(
    (
        'USER_TRIP_ALL_CNT',
        'TRIP_ID',
        'LATITUDE',
        'LONGITUDE',
        'DIRECTION',
        'HEIGHT',
        'SPEED',
        'CALLSTATE',
    ),
    'float64',
))
# Column dtypes for the CSV read by DataHelper.get_data: the same columns
# plus the float64 'Y' column. Built as an independent dict (not an alias).
dtypes = dict(d_test_types, Y='float64')
class DataHelper(object):
    """Collection of helpers: CSV loading, ranking / Gini metrics,
    scikit-learn evaluation printers, and small string / time utilities.
    """

    print_str = ""

    # ------------------------------------------------------------------
    # Data loading
    # ------------------------------------------------------------------
    def get_data(self, path_train):
        """Read the CSV at ``path_train`` using the module-level ``dtypes``.

        :param path_train: path passed straight to ``pandas.read_csv``
        :return: the loaded ``DataFrame``
        """
        return pd.read_csv(path_train, dtype=dtypes)

    def get_test_data(self, path_test):
        """Read the CSV at ``path_test`` using the module-level ``d_test_types``.

        :param path_test: path passed straight to ``pandas.read_csv``
        :return: the loaded ``DataFrame``
        """
        return pd.read_csv(path_test, dtype=d_test_types)

    def get_userlist(self, data):
        """Return the distinct ``TERMINALNO`` values of ``data`` as a list.

        The values go through a ``set``, so the order of the result is not
        guaranteed.
        """
        return list(set(data['TERMINALNO'].tolist()))

    def get_user_Y_list(self, data):
        """Return the ``Y`` value of every distinct ``(TERMINALNO, Y)`` pair.

        The pairs are ordered by ``TERMINALNO`` before the ``Y`` component is
        extracted.
        """
        pairs = sorted(
            data.groupby(['TERMINALNO', 'Y']).indices,
            key=lambda item: item[0],
        )
        return [pair[1] for pair in pairs]

    @staticmethod
    def loadDataSet(path):
        """Read the CSV at ``path`` and return two (currently empty) lists.

        NOTE(review): the loaded frame is never copied into ``dataMat`` /
        ``labelMat``, so this always prints a length of 0 and returns
        ``([], [])``. The intended population logic is not visible here;
        behavior is left unchanged.

        :return: ``(dataMat, labelMat)``
        """
        dataMat = []
        labelMat = []
        # Kept for its side effect: raises if the file cannot be read.
        data = pd.read_csv(path, dtype=dtypes)
        print("data len : " + str(len(dataMat)))
        return dataMat, labelMat

    # ------------------------------------------------------------------
    # Printing / test helpers
    # ------------------------------------------------------------------
    @staticmethod
    def my_print(*lists):
        """Print the tuple of positional arguments after ``DataUtil.decode``.

        NOTE(review): an earlier docstring said a trailing ``False`` argument
        suppresses printing; the code has never implemented that, and every
        call prints.
        """
        print(DataUtil.decode(lists))

    @staticmethod
    def print_domain(obj):
        """Print ``obj`` as indented JSON without escaping non-ASCII text.

        Values that ``json`` cannot serialize natively are replaced by their
        ``__dict__``.
        """
        print(json.dumps(obj, ensure_ascii=False, indent=4, default=lambda x: x.__dict__))

    @staticmethod
    def equal(var1, var2):
        """Return True when two numbers differ by less than ``0.00001``.

        The absolute difference is used so the check is symmetric; without
        it any ``var1`` smaller than ``var2`` would compare as equal.
        """
        return abs(var1 - var2) < 0.00001

    @staticmethod
    def assert_equal(expected, actual, message=''):
        """Raise ``AssertionError`` when ``expected != actual``.

        The error is raised explicitly rather than through an ``assert``
        statement so the check still runs under ``python -O``.

        :param expected: expected value
        :param actual: actual value
        :param message: prefix for the error text
        :raises AssertionError: if the two values differ
        """
        if expected != actual:
            raise AssertionError(
                '{} 期待值:{} 实际值{}'.format(message, expected, actual)
            )

    @staticmethod
    def side_effect(reason_dict, default_value):
        """Build a lookup callable, e.g. for use as a mock ``side_effect``.

        :param reason_dict: maps the first call argument to the value to return
        :param default_value: returned when the first argument is not a key
        :return: a function taking ``*arg`` that looks up ``arg[0]``
        """
        def inner_effect(*arg):
            return reason_dict.get(arg[0], default_value)
        return inner_effect

    # ------------------------------------------------------------------
    # Ranking / Gini metrics
    # ------------------------------------------------------------------
    def __gini(self, y_true, y_pred):
        """Return the Gini of ``y_pred`` normalized by the Gini of ``y_true``.

        Both inputs must expose ``.shape`` and have equal shapes. Returns
        ``1.0`` when either Gini sum is exactly zero.
        """
        # check and get number of samples
        assert y_true.shape == y_pred.shape
        n_samples = y_true.shape[0]

        # sort the true values by each column, largest first
        arr = np.array([y_true, y_pred]).transpose()
        true_order = arr[arr[:, 0].argsort()][::-1, 0]
        pred_order = arr[arr[:, 1].argsort()][::-1, 0]

        # Lorenz curves
        L_true = np.cumsum(true_order) * 1. / np.sum(true_order)
        L_pred = np.cumsum(pred_order) * 1. / np.sum(pred_order)
        L_ones = np.linspace(1 / n_samples, 1, n_samples)

        # Gini sums (area between curves)
        G_true = np.sum(L_ones - L_true)
        G_pred = np.sum(L_ones - L_pred)
        if G_pred == 0.0 or G_true == 0.0:
            return 1.0

        # normalize to the true Gini sum
        return G_pred * 1.0 / G_true

    @staticmethod
    def dcg_score(pred, label, k=5):
        """Discounted cumulative gain of the ``k`` items ranked highest by ``pred``.

        Gain is ``2 ** label - 1`` and the discount at 0-based position ``i``
        is ``log2(i + 2)``.
        """
        order = np.argsort(pred)[::-1]
        y_true = np.take(label, order[:k])
        gain = 2 ** y_true - 1
        discounts = np.log2(np.arange(len(y_true)) + 2)
        return np.sum(gain / discounts)

    @staticmethod
    def idcg_score(pred, label):
        """Same as ``dcg_score`` but over every item (no top-``k`` cut)."""
        order = np.argsort(pred)[::-1]
        y_true = np.take(label, order)
        gain = 2 ** y_true - 1
        discounts = np.log2(np.arange(len(y_true)) + 2)
        return np.sum(gain / discounts)

    def ndcg_score(self, pred, label, k=None):
        """Rank score: DCG of ``pred`` rescaled between two reference DCGs.

        ``label`` must support unary minus. With ``k`` left as ``None`` all
        items are scored (``idcg_score``); otherwise only the top ``k``
        (``dcg_score``).

        :return: ``(dcg - dcg_min) / (dcg_max - dcg_min)``, or ``0.`` when
            ``dcg_max`` is zero
        """
        if k is None:
            def score(ranking, gains):
                return self.idcg_score(ranking, gains)
        else:
            def score(ranking, gains):
                return self.dcg_score(ranking, gains, k)

        dcg_max = score(label, label)
        dcg_min = score(label, -label)
        assert dcg_max > dcg_min
        if not dcg_max:
            return 0.
        dcg = score(pred, label)
        return (dcg - dcg_min) / (dcg_max - dcg_min)

    @staticmethod
    def gini(actual, pred):
        """Return the (un-normalized) Gini of ``actual`` when ranked by ``pred``.

        Rows are sorted by descending ``pred``, with ties broken by original
        position.
        """
        assert (len(actual) == len(pred))
        # The builtin ``float`` is used because the ``np.float`` alias was
        # removed from NumPy.
        table = np.asarray(np.c_[actual, pred, np.arange(len(actual))], dtype=float)
        table = table[np.lexsort((table[:, 2], -1 * table[:, 1]))]
        total_losses = table[:, 0].sum()
        gini_sum = table[:, 0].cumsum().sum() / total_losses
        gini_sum -= (len(actual) + 1) / 2.
        return gini_sum / len(actual)

    def gini_normalized(self, actual, pred):
        """Return ``gini(actual, pred)`` divided by ``gini(actual, actual)``."""
        return self.gini(actual, pred) / self.gini(actual, actual)

    @staticmethod
    def inc_eprem(sorted_value, premium, reparations):
        """Cumulative shares of premium and reparations in ascending score order.

        :param sorted_value: scores used to order the rows (ascending)
        :return: ``(x, y)`` with the cumulative premium share and the
            cumulative reparations share
        """
        order = np.argsort(sorted_value)
        premium_cumsum = np.cumsum(np.take(premium, order))
        reparations_cumsum = np.cumsum(np.take(reparations, order))
        x = premium_cumsum / np.sum(premium)
        y = reparations_cumsum / np.sum(reparations)
        return x, y

    def pingan_gini(self, sorted_value, premium, reparations):
        """Apply ``gini`` to the two cumulative-share curves from ``inc_eprem``."""
        x, y = self.inc_eprem(sorted_value, premium, reparations)
        return self.gini(x, y)

    @staticmethod
    def idcg_mse(sorted_value, f):
        """Root-mean-square gap between two descending orderings of ``f``.

        Compares ``f`` ordered by descending ``sorted_value`` with ``f``
        sorted descending by its own values.
        """
        order = np.argsort(sorted_value)[::-1]
        f_order = np.take(f, order)
        f_rank = np.sort(f)[::-1]
        mean_sq = np.sum((f_order - f_rank) ** 2 / len(f_order))
        return np.sqrt(mean_sq)

    # ------------------------------------------------------------------
    # scikit-learn evaluation printers
    # ------------------------------------------------------------------
    def eval_accuracy_score(self, y_test, ytestPre):
        """Print accuracy, precision, recall and F1 for the predictions.

        NOTE(review): every printed label says "micro average", but only
        precision uses ``average='micro'``; recall uses ``'macro'`` and F1
        uses ``'weighted'``. Labels and averaging are left as they were.

        :param y_test: true labels
        :param ytestPre: predicted labels
        :return: None
        """
        from sklearn.metrics import accuracy_score
        accuracy = accuracy_score(y_test, ytestPre)
        print(u'准确率:' + str(100 * accuracy))
        from sklearn import metrics
        precision = metrics.precision_score(y_test, ytestPre, average='micro')
        print(u'微平均,精确率: ' + str(100 * precision))
        recall = metrics.recall_score(y_test, ytestPre, average='macro')
        print(u'微平均,召回率: ' + str(100 * recall))
        f1_score = metrics.f1_score(y_test, ytestPre, average='weighted')
        print(u'微平均,调和平均数: ' + str(100 * f1_score))

    def eval_classification_report(self, y_test, ytestPre, target_names):
        """Build scikit-learn's classification report and return it.

        The report used to be computed and discarded; it is now returned so
        the call has an observable result.

        :param y_test: true labels
        :param ytestPre: predicted labels
        :param target_names: display names of the classes
        :return: the value produced by ``classification_report``
        """
        from sklearn.metrics import classification_report
        return classification_report(y_test, ytestPre, target_names=target_names)

    def evaluate_function(self, clf, X_test, y_test, target_names):
        """Predict on ``X_test`` with ``clf`` and print the classification report.

        :param clf: object exposing ``predict(X_test)``
        :param X_test: features passed to ``clf.predict``
        :param y_test: true labels
        :param target_names: display names of the classes
        :return: None
        """
        ytestPre = clf.predict(X_test)
        from sklearn.metrics import classification_report
        print(classification_report(y_test, ytestPre, target_names=target_names))

    def eval_cohen_kappa_score(self, y_test, y_Pre):
        """Compute Cohen's kappa for the two label sequences and print it x100.

        :param y_test: first label sequence
        :param y_Pre: second label sequence
        :return: None
        """
        from sklearn.metrics import cohen_kappa_score
        kappa_score = cohen_kappa_score(y_test, y_Pre)
        print(u'kappa score是一个介于(-1, 1)之间的数. score>0.8意味着好的分类;0或更低意味着不好(实际是随机标签): ' + str(100 * kappa_score))

    # ------------------------------------------------------------------
    # Data processing utilities
    # ------------------------------------------------------------------
    @staticmethod
    def split_data(X, y, test_size=0.33):
        """Split ``X`` and ``y`` into train and test parts.

        Delegates to ``train_test_split`` with ``random_state=0``, i.e. the
        same fixed seed on every call.

        :param X: sample features
        :param y: sample targets
        :param test_size: forwarded to ``train_test_split`` (default 0.33)
        :return: ``(X_train, X_test, y_train, y_test)``
        """
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=0
        )
        return X_train, X_test, y_train, y_test

    @staticmethod
    def timeStampToStrTime(timeStamp):
        """Format a POSIX timestamp as a UTC time string.

        NOTE(review): the date part uses double dashes (``2018--03--31``),
        which ``StrTimeToTimeStamp`` cannot parse back; the format is kept
        as-is because callers may depend on it.

        :param timeStamp: seconds since the epoch
        :return: string formatted as ``"%Y--%m--%d %H:%M:%S"``
        """
        dateArray = datetime.datetime.utcfromtimestamp(timeStamp)
        return dateArray.strftime("%Y--%m--%d %H:%M:%S")

    @staticmethod
    def StrTimeToTimeStamp(tss1):
        """Parse a ``"%Y-%m-%d %H:%M:%S"`` string.

        The timestamp comes from ``time.mktime``, which interprets the
        parsed fields as local time.

        :param tss1: time string
        :return: ``(timeStamp, timeArray)`` -- integer seconds and the
            ``struct_time`` it was derived from
        """
        timeArray = time.strptime(tss1, "%Y-%m-%d %H:%M:%S")
        timeStamp = int(time.mktime(timeArray))
        return timeStamp, timeArray

    @staticmethod
    def is_none(d):
        """Return True for ``None``, empty ``''`` / ``{}`` / ``[]``, or one of
        the strings ``'None'``, ``'NULL'``, ``'null'``.
        """
        return (d is None or d == 'None' or
                d == '' or
                d == {} or
                d == [] or
                d == 'NULL' or
                d == 'null')

    @staticmethod
    def last_word_cut(text):
        """Return ``text`` without its last character."""
        return text[:-1]

    @staticmethod
    def is_in_str(str_list, trg_str):
        """Return True if any item of ``str_list`` occurs in ``trg_str``.

        A falsy ``trg_str`` (``None``, ``''``) always yields False.
        """
        return bool(trg_str) and any(s in trg_str for s in str_list)

    @staticmethod
    def right_cut_by_word(text, cut_word):
        """Drop everything from the first occurrence of ``cut_word`` onward.

        input: text = 'good/bye/oo', cut_word = 'bye'
        output: 'good/'

        ``text`` is returned unchanged when ``cut_word`` does not occur.
        """
        i = text.find(cut_word)
        if i != -1:
            text = text[0: i]
        return text

    @staticmethod
    def last_word_cut_num(text, cut_num):
        """Return ``text`` without its last ``cut_num`` characters.

        The slice bound is clamped at 0 so that asking to cut more
        characters than ``text`` holds yields an empty result instead of
        wrapping around to a negative index.
        """
        return text[:max(len(text) - cut_num, 0)]

    @staticmethod
    def decode(input_str):
        """Serialize ``input_str`` to indented JSON, keeping non-ASCII text.

        Values that ``json`` cannot serialize natively are passed through
        ``str``.
        """
        return json.dumps(input_str, ensure_ascii=False, indent=4, default=str)

    @staticmethod
    def contain_var_in_string(containVar, stringVar):
        """Return True when ``stringVar`` is a ``str`` containing ``containVar``.

        :param containVar: substring to look for
        :param stringVar: string to search; any non-``str`` value yields False
        :return: bool
        """
        return isinstance(stringVar, str) and containVar in stringVar