In [1]:
# coding=utf8
# 手写逻辑回归，并实现垃圾短信分类
#  基于mini-batch GD，不考虑正则
#  Copyright 木豆
#  2017-11-6
import os
import sys
import jieba
from sklearn import metrics
import numpy as np
from random import randrange
import time

class logistic_regression():
    """
       手写逻辑回归，用于二分类。
       注意事项：
           1）sklearn自带的lr可以做多分类，这里我们实现的版本只能做二分类
           2）这里我们支持的label是1,0。如果您的label是1,-1，请先通过预处理进行转换
    """
    def fit(self, train_X_in, train_Y, learning_rate=0.5, batch_size=20, eps=1e-3):   
 
        case_cnt, feature_cnt = np.array(train_X_in).shape
        self.theta = np.zeros([feature_cnt + 1,1])
        train_X = np.c_[train_X_in, np.ones(case_cnt,)]
        
        step = 0
        max_iteration_times = sys.maxint
        past_best_likelihood = -sys.maxint - 1
        past_step = 0
        stay_times = 0
        X = train_X.T
        while step < max_iteration_times:
            for b in range(0, case_cnt, batch_size):  
                pred = 1.0/(1 + np.exp(-self.theta.T.dot(X[:,b : b + batch_size])))   
                self.theta = self.theta + learning_rate * 1.0/case_cnt*\
                         (train_Y[b : b + batch_size] - pred).dot(X[:, b : b + batch_size].T).T         
                      
            pred = 1.0/(1 + np.exp(-self.theta.T.dot(X)))       
            likelihood = 1.0/case_cnt*sum((train_Y * np.log(pred) + \
                       (1 - train_Y)* np.log(1 - pred)).flatten()) 
            if   likelihood > past_best_likelihood + eps:
                past_best_likelihood = likelihood
                past_step = step   
            elif  step - past_step >= 20:
                sys.stderr.write("training finished. total step %s: %.6f\n" % (step, likelihood))
                break   
            if step % 1000 == 0:
                sys.stderr.write("step %s: %.6f\n" % (step, likelihood))             
            step += 1    
        return  1
     
    def predict_proba(self, X):
        case_cnt = X.shape[0]
        X = np.c_[X, np.ones(case_cnt,)]
        return 1. / (1 + np.exp(-self.theta.T.dot(X.T)))
    
    def predict(self, X):
        case_cnt = X.shape[0]
        X = np.c_[X, np.ones(case_cnt,)]
        prob = 1. / (1 + np.exp(-self.theta.T.dot(X.T)))
        return (prob >= 0.5).astype(np.int32).flatten()
    
    def score(self, X, label):
       """ Returns the mean accuracy on the given test data and labels """
       pred = self.predict(X)
       return sum((pred == label).astype(np.int32).flatten()) * 1.0 / pred.shape[0]
    
def create_vocab_dict(dataSet):
    vocab_dict = {}    
    for document in dataSet:
        for term in document:
           if term in vocab_dict:
              vocab_dict[term] += 1
           else:
              vocab_dict[term] = 1
    return vocab_dict

def BOW_feature(vocabList, inputSet):
    returnVec = [0]*len(vocabList)
    for word in inputSet:
        if word in vocabList:
            returnVec[vocabList.index(word)] += 1
    return returnVec

def get_dataset(data_path):
    text_list = []
    labels = []
    for line in open(data_path, "r"):     
        arr = line.rstrip().split('\t')
        if len(arr) < 3:
            continue
        
        # 把标签从(1,-1)改为(1,0)    
        if int(arr[0]) == 1:
            label = arr[0]
        elif int(arr[0]) == -1:
            label = 0
        else:   # illegal label
            continue        
        text = arr[2]
        text_list.append(list(text.split()))
        labels.append(float(label))
    return text_list, labels    

if __name__ == "__main__":
    
    # 读取数据
    train_file_path = 'mudou_spam.train'
    test_file_path = 'mudou_spam.test'
    train_data, train_label = get_dataset(train_file_path)
    test_data, test_label = get_dataset(test_file_path)
    
    # 构造词典
    min_freq = 5
    vocab_dict = create_vocab_dict(train_data)
    sorted_vocab_list= sorted(vocab_dict.iteritems(), key=lambda d:d[1], reverse = True)    
    vocab_list = [ v[0]  for v  in sorted_vocab_list if int(v[1]) > min_freq ]
 
    # 生成文本的词袋（BOW）特征
    train_X = []
    for one_msg  in train_data:
        train_X.append(BOW_feature(vocab_list, one_msg))
        
    test_X = []
    for one_msg  in test_data:
        test_X.append(BOW_feature(vocab_list, one_msg))
        
    test_label = np.array(test_label)
    train_label = np.array(train_label)
    train_X = np.array(train_X)    
    test_X = np.array(test_X)    
    
    # 训练模型
    model = logistic_regression() 
    model.fit(train_X, train_label, 0.6, 100, 1e-4)
     
    # 模型评估
    accuracy_train = model.score(train_X, train_label)
    print '训练集accuracy:',accuracy_train
    accuracy_test = model.score(test_X, test_label)
    print '测试集accuracy: ',accuracy_test
    
    # pred = model.predict(test_X)     
    predict_prob_y = model.predict_proba(test_X)
    test_auc = metrics.roc_auc_score(test_label.flatten(), predict_prob_y.flatten())
    print '测试集AUC:',test_auc

SyntaxError: Missing parentheses in call to 'print'. Did you mean print('训练集accuracy:',accuracy_train)? (<ipython-input-1-97979daff33f>, line 140)

In [2]:
import numpy as np
np.zeros([10, 1])

array([[0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.]])