In [None]:
import pickle
import os
import math
import pywt
import numpy as np
import pandas as pd
import datetime as dt
import mysql.connector as c
from scipy.interpolate import interp1d
from scipy.stats import variation, linregress

# Load the previously crawled feeds. The file is opened in a `with` block so the
# handle is closed deterministically instead of being left to the garbage collector.
# NOTE(review): pickle.load executes arbitrary code from the file - only load
# 'feed.pickle' if it was produced by a trusted source.
with open('feed.pickle', 'rb') as feed_file:
    feed_dict = pickle.load(feed_file)

# Timestamp layout used by the feed entries (the trailing 'Z' is matched literally).
time_format = '%Y-%m-%dT%H:%M:%SZ'
# Start of the observation window, as a POSIX timestamp in seconds.
# NOTE(review): strptime returns a naive datetime here, so .timestamp() interprets
# it in the machine's local timezone; sample timestamps parsed with the same format
# share that interpretation.
epoch_time = dt.datetime.strptime('2017-04-01T00:00:00Z', time_format).timestamp()

def magic(x, threshold=200):
    """Compress values whose magnitude exceeds ``threshold`` logarithmically.

    Values with ``abs(x) <= threshold`` are returned unchanged. Larger
    magnitudes are mapped to ``threshold + log(abs(x) - threshold + 1)`` and
    keep the sign of ``x``, so the mapping is odd-symmetric
    (``magic(-x) == -magic(x)``) and continuous at ``+/-threshold``.

    The previous implementation used ``abs(x - threshold + 1)`` inside the
    logarithm, which for negative ``x`` evaluates to
    ``abs(x) + threshold - 1`` and therefore compressed negative outliers
    differently from positive ones (with a jump at ``-threshold``).

    Args:
        x: Scalar value (a Python number or a 0-d NumPy value).
        threshold: Magnitude above which compression is applied.

    Returns:
        ``x`` itself when it is within the threshold (or is NaN), otherwise
        the sign-preserving compressed value.
    """
    magnitude = abs(x)
    # `not >` (rather than `<=`) keeps NaN on the pass-through path, as before.
    if not magnitude > threshold:
        return x
    compressed = threshold + math.log(magnitude - threshold + 1)
    return compressed if x > 0 else -compressed



In [None]:
def feature_single(series):
    """Compute a fixed-length feature vector for one data stream.

    The stream is linearly interpolated onto a regular grid (one sample every
    10 minutes over a two-day window that starts at the module-level
    ``epoch_time``), each sample is passed through ``magic``, and summary
    features are computed from the resampled signal.

    Args:
        series: Sequence of ``(timestamp, value)`` pairs, where ``timestamp``
            is a ``datetime`` and ``value`` is numeric.

    Returns:
        A list of 15 numbers, in this order:
        mean, coefficient of variation, min, max,
        the L2 norms of the first five coefficient arrays returned by
        ``pywt.wavedec(..., 'haar', level=5)``,
        the number of crossings of the mean,
        and slope, intercept, r-value, p-value and standard error of a linear
        regression of the samples against their index.
        ``None`` is returned when the series is empty, when interpolation
        fails with ``ValueError``, or when any feature is NaN or infinite.
    """
    len_of_2days = 1440 * 2  # length of the window in minutes

    # An empty series has no value to pad the window edges with.
    if len(series) == 0:
        return None

    # Interpolate the data. Times are minutes elapsed since epoch_time; the
    # window edges are padded with the first/last observed value so that every
    # grid point lies inside the interpolation range.
    tlist = [(point[0].timestamp() - epoch_time) / 60 for point in series]
    vlist = [point[1] for point in series]
    tlist = [0.0] + tlist + [len_of_2days]
    vlist = [vlist[0]] + vlist + [vlist[-1]]

    samples = []
    try:
        interp_f = interp1d(tlist, vlist)
        for cur_time in range(0, len_of_2days, 10):
            # Sample once every ten minutes
            samples.append(magic(interp_f(cur_time)))
    except ValueError:
        print('Detect error!')
        return None

    feature = []

    # Basic statistics
    sample_np = np.array(samples)
    s_ave = np.average(sample_np)
    feature.append(s_ave)
    feature.append(variation(sample_np))
    feature.append(np.min(sample_np))
    feature.append(np.max(sample_np))

    # Wavelet coefficients: norm of each of the first five coefficient arrays
    w_coeff = pywt.wavedec(sample_np, 'haar', level=5)
    for coeff in w_coeff[:5]:
        feature.append(np.linalg.norm(coeff))

    # Crossings of the mean: adjacent samples lie on opposite sides of the
    # mean exactly when the product of their deviations is negative. (The
    # previous `> 0` test counted the pairs that did NOT cross, and the loop
    # bound skipped the final sample.)
    centered = sample_np - s_ave
    feature.append(int(np.count_nonzero(centered[1:] * centered[:-1] < 0)))

    # Overall trend from a first-order regression of value against sample
    # index. linregress takes (x, y); the index is the independent variable.
    slope, intercept, r_value, p_value, std_err = linregress(
        np.arange(sample_np.size), sample_np
    )
    feature.append(slope)
    feature.append(intercept)
    feature.append(r_value)
    feature.append(p_value)
    feature.append(std_err)

    # Idea kept from the original author (not implemented): split the data at
    # the maximum value and use the minimum of each part as the basis for
    # truncating it into single-day data.
    # maxv_idx = np.argmax(sample_np)
    # minv_idx = np.argmax(-sample_np)
    # feature.append()

    # Validate: reject the whole vector if any feature is NaN or infinite
    for f in feature:
        if math.isnan(f) or math.isinf(f):
            return None  # invalid
    return feature

In [None]:
import datetime as dt
import re
import pickle

# Extract the series data of every device and compute its feature vector.
# Result: feature_dict maps (feedid, field name) -> list returned by feature_single.
feature_dict = {}

# First number in a string. The alternatives are grouped so that the optional
# sign applies to integers as well as decimals; without the group the pattern
# parses as `([-+]?\d*\.\d+)|(\d+)` and "-5" would be read as 5.0.
_NUMBER_RE = re.compile(r"[-+]?(?:\d*\.\d+|\d+)")

for counter, (feedid, value) in enumerate(feed_dict.items()):
    content, field_name_mapping = value
    series_dict = {}

    for entry in content:
        time_stamp = dt.datetime.strptime(entry['created_at'], time_format)
        for k, v in entry.items():
            # Only 'field*' keys carry measurements; skip the rest before
            # doing any parsing work.
            if not k.startswith('field'):
                continue
            re_result = _NUMBER_RE.search(str(v)) if v is not None else None
            # Missing or non-numeric readings are recorded as 0.0.
            numerical_v = float(re_result.group()) if re_result is not None else 0.0
            series_dict.setdefault((feedid, k), []).append((time_stamp, numerical_v))

    for (feedid, fieldid), vlist in series_dict.items():
        feature_result = feature_single(vlist)
        if feature_result is None:
            # Series for which no valid feature vector could be computed are dropped.
            continue
        feature_dict[(feedid, fieldid)] = feature_result

    # Progress indicator: print every 100th feed index.
    if counter % 100 == 0:
        print(counter)

# Persist the features; the `with` block guarantees the file is flushed and closed.
with open('feature.pickle', 'wb') as feature_file:
    pickle.dump(feature_dict, feature_file)

In [None]:
# Entry point: read the labelled streams from the database, load (or fetch and
# cache) their raw series, compute features, and pickle labels + features.
# NOTE(review): fetch_raw_datapoints and compute_feature are not defined in the
# visible part of this notebook; they must be defined before this cell runs.
np.random.seed(0)

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
START_TIMESTAMP = dt.datetime.strptime("2016-12-04 00:00:00", TIME_FORMAT).timestamp()
PICKLE_LOADED_FROM_DB = 'p_raw_series_dict.pickle'
PICKLE_FEATURES = 'p_label_feature.pickle'

# NOTE(review): database credentials and host are hard-coded here; move them to
# environment variables or a secrets store before sharing this notebook.
db_conn = c.connect(user='root', password='ictwsn', host='10.22.0.77', database='curiosity_thingspeak')
print('Fetch labeled streams ...')
label_df = pd.read_sql("select * from manual_label_t where label!=''", db_conn)
# (feed_id, stream_id) -> label, for every row with a non-empty label
label_dict = {(val['feed_id'], val['stream_id']): val['label'] for _, val in label_df.iterrows()}

if os.path.isfile(PICKLE_LOADED_FROM_DB):
    # Reuse the cached raw series. The handle is closed by the `with` block.
    # NOTE(review): pickle.load executes arbitrary code - only load trusted files.
    with open(PICKLE_LOADED_FROM_DB, 'rb') as cache_file:
        raw_series_dict = pickle.load(cache_file)
else:
    # Whenever the data set changes, delete both pickle files in this directory
    raw_series_dict = fetch_raw_datapoints(db_conn, label_dict)
    with open(PICKLE_LOADED_FROM_DB, 'wb') as cache_file:
        pickle.dump(raw_series_dict, cache_file)

print("Compute features for each datastream...")
feature_dict = compute_feature(raw_series_dict)

# Keep only the labels of streams that actually produced a feature vector.
# A stream present in feature_dict but absent from label_dict raises KeyError.
l_swap_dict = {fs_tuple: label_dict[fs_tuple] for fs_tuple in feature_dict}

result = {'label_dict': l_swap_dict, 'feature_dict': feature_dict}
with open(PICKLE_FEATURES, 'wb') as result_file:
    pickle.dump(result, result_file)
print('Length of label_dict = ' + str(len(l_swap_dict)))
print('Length of feature_dict = ' + str(len(feature_dict)))
