In [1]:
# -*- coding: UTF-8 -*-
import sys
from time import time
import pandas as pd
import matplotlib.pyplot as plt
from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
import numpy as np
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.feature import StandardScaler


In [2]:
def SetPath(sc):
    global Path
    if sc.master[0:5]=="local" :
        Path="file:/home/hduser/pythonwork/PythonProject/"
    else:   
        Path="hdfs://master:9000/user/hduser/"


In [3]:
def extract_label(record):
    label=(record[-1])
    return float(label)


In [4]:
def extract_features(field,categoriesMap,featureEnd):
    categoryIdx = categoriesMap[field[3]]
    categoryFeatures = np.zeros(len(categoriesMap))
    categoryFeatures[categoryIdx] = 1
    numericalFeatures=[convert_float(field)  for  field in field[4: featureEnd]]    
    return  np.concatenate(( categoryFeatures, numericalFeatures))


In [5]:
def convert_float(x):
    return (0 if x=="?" else float(x))


In [6]:
def PrepareData(sc): 
    #----------------------1.导入并转换数据-------------
    print("开始导入数据...")
    rawDataWithHeader = sc.textFile(Path+"data/train.tsv")
    header = rawDataWithHeader.first() 
    rawData = rawDataWithHeader.filter(lambda x:x !=header)    
    rData=rawData.map(lambda x: x.replace("\"", ""))    
    lines = rData.map(lambda x: x.split("\t"))
    print("共计：" + str(lines.count()) + "项")
    #----------------------2.建立训练评估所需数据 RDD[LabeledPoint]-------------
    print "标准化之前：",
    categoriesMap = lines.map(lambda fields: fields[3]).distinct().zipWithIndex().collectAsMap()
    labelRDD = lines.map(lambda r:  extract_label(r))
    featureRDD = lines.map(lambda r:  extract_features(r,categoriesMap,len(r) - 1))
    for i in featureRDD.first():
        print (str(i)+","),
    print ""       
    print "标准化之后：",       
    stdScaler = StandardScaler(withMean=True, withStd=True).fit(featureRDD)
    ScalerFeatureRDD=stdScaler.transform(featureRDD)
    for i in ScalerFeatureRDD.first():
        print (str(i)+","),
    labelpoint=labelRDD.zip(ScalerFeatureRDD)
    labelpointRDD=labelpoint.map(lambda r: LabeledPoint(r[0], r[1]))
    #----------------------3.以随机方式将数据分为3个部分并且返回-------------
    (trainData, validationData, testData) = labelpointRDD.randomSplit([8, 1, 1])
    print("将数据分trainData:" + str(trainData.count()) + 
              "   validationData:" + str(validationData.count()) +
              "   testData:" + str(testData.count()))
    return (trainData, validationData, testData, categoriesMap) #返回数据


In [7]:
def PredictData(sc,model,categoriesMap): 
    print("开始导入数据...")
    rawDataWithHeader = sc.textFile(Path+"data/test.tsv")
    header = rawDataWithHeader.first() 
    rawData = rawDataWithHeader.filter(lambda x:x !=header)    
    rData=rawData.map(lambda x: x.replace("\"", ""))    
    lines = rData.map(lambda x: x.split("\t"))
    print("共计：" + str(lines.count()) + "项")
    dataRDD = lines.map(lambda r:  ( r[0]  ,
                            extract_features(r,categoriesMap,len(r) )))
    DescDict = {
           0: "暂时性网页(ephemeral)",
           1: "长青网页(evergreen)"
     }
    for data in dataRDD.take(10):
        predictResult = model.predict(data[1])
        print " 网址：  " +str(data[0])+"\n" +\
                  "             ==>预测:"+ str(predictResult)+ \
                  " 说明:"+DescDict[predictResult] +"\n"


In [8]:
def evaluateModel(model, validationData):
    score = model.predict(validationData.map(lambda p: p.features))
    scoreAndLabels=score.zip(validationData \
                                   .map(lambda p: p.label))  \
                                   .map(lambda (x,y): (float(x),float(y)) )
    metrics = BinaryClassificationMetrics(scoreAndLabels)
    AUC=metrics.areaUnderROC
    return( AUC)


In [9]:
def trainEvaluateModel(trainData,validationData,
                                        numIterations, stepSize, miniBatchFraction):
    startTime = time()
    model = LogisticRegressionWithSGD.train(trainData, 
                                        numIterations, stepSize, miniBatchFraction)
    AUC = evaluateModel(model, validationData)
    duration = time() - startTime
    print    "训练评估：使用参数" + \
                " numIterations="+str(numIterations) +\
                " stepSize="+str(stepSize) + \
                " miniBatchFraction="+str(miniBatchFraction) +\
                 " 所需时间="+str(duration) + \
                 " 结果AUC = " + str(AUC) 
    return (AUC,duration, numIterations, stepSize, miniBatchFraction,model)


In [10]:
def evalParameter(trainData, validationData, evalparm,
                  numIterationsList, stepSizeList, miniBatchFractionList):
    
    metrics = [trainEvaluateModel(trainData, validationData,  
                                numIterations,stepSize,  miniBatchFraction  ) 
                       for numIterations in numIterationsList
                       for stepSize in stepSizeList  
                       for miniBatchFraction in miniBatchFractionList ]
    
    if evalparm=="numIterations":
        IndexList=numIterationsList[:]
    elif evalparm=="stepSize":
        IndexList=stepSizeList[:]
    elif evalparm=="miniBatchFraction":
        IndexList=miniBatchFractionList[:]
    
    df = pd.DataFrame(metrics,index=IndexList,
            columns=['AUC', 'duration','numIterations', 'stepSize', 'miniBatchFraction','model'])
    
    showchart(df,evalparm,'AUC','duration',0.5,0.7 )
 


In [11]:
def showchart(df,evalparm ,barData,lineData,yMin,yMax):
    ax = df[barData].plot(kind='bar', title =evalparm,figsize=(10,6),legend=True, fontsize=12)
    ax.set_xlabel(evalparm,fontsize=12)
    ax.set_ylim([yMin,yMax])
    ax.set_ylabel(barData,fontsize=12)
    ax2 = ax.twinx()
    ax2.plot(df[[lineData ]].values, linestyle='-', marker='o', linewidth=2.0,color='r')
    plt.show()


In [12]:
def evalAllParameter(trainData, validationData, 
                     numIterationsList, stepSizeList, miniBatchFractionList):    
    metrics = [trainEvaluateModel(trainData, validationData,  
                            numIterations,stepSize,  miniBatchFraction  ) 
                      for numIterations in numIterationsList 
                      for stepSize in stepSizeList  
                      for  miniBatchFraction in miniBatchFractionList ]
    
    Smetrics = sorted(metrics, key=lambda k: k[0], reverse=True)
    bestParameter=Smetrics[0]
    
    print("调校后最佳参数：numIterations:" + str(bestParameter[2]) + 
                                      "  ,stepSize:" + str(bestParameter[3]) + 
                                     "  ,miniBatchFraction:" + str(bestParameter[4])   + 
                                      "  ,结果AUC = " + str(bestParameter[0]))
    
    return bestParameter[5]


In [13]:
def  parametersEval(trainData, validationData):
    print("----- 评估numIterations参数使用 ---------")
    evalParameter(trainData, validationData,"numIterations", 
                              numIterationsList=[5, 15, 20, 60, 100],   
                              stepSizeList=[10],  
                              miniBatchFractionList=[1 ])  
    print("----- 评估stepSize参数使用 ---------")
    evalParameter(trainData, validationData,"stepSize", 
                              numIterationsList=[100],                    
                              stepSizeList=[10, 50, 100, 200],    
                              miniBatchFractionList=[1])   
    print("----- 评估miniBatchFraction参数使用 ---------")
    evalParameter(trainData, validationData,"miniBatchFraction", 
                              numIterationsList=[100],      
                              stepSizeList =[100],        
                              miniBatchFractionList=[0.5, 0.8, 1 ])


In [14]:
def CreateSparkContext():
    sparkConf = SparkConf().setAppName("LogisticRegressionWithSGD").set("spark.ui.showConsoleProgress", "false") 
    sc = SparkContext(conf = sparkConf)
    print ("master="+sc.master)    
    SetPath(sc)
    return (sc)


In [15]:
if __name__ == "__main__":
    print("RunLogisticRegressionWithSGDBinary")
    sc.stop()
    sc=CreateSparkContext()
    print("==========数据准备阶段===============")
    (trainData, validationData, testData, categoriesMap) =PrepareData(sc)
    trainData.persist(); validationData.persist(); testData.persist()
    print("==========训练评估阶段===============")
    (AUC,duration, numIterationsParm, stepSizeParm, miniBatchFractionParm,model)= \
          trainEvaluateModel(trainData, validationData, 15, 10, 0.5)
    
    flag_mark = 2
    
    if flag_mark == 1:
        parametersEval(trainData, validationData)
    elif flag_mark !=1:
        print("-----所有参数训练评估找出最好的参数组合---------")  
        model=evalAllParameter(trainData, validationData,
                         [3, 5, 10,15], 
                         [10, 50, 100],
                          [0.5, 0.8, 1 ])
    print("==========测试阶段===============")
    auc = evaluateModel(model, testData)
    print("使用test Data测试最佳模型,结果 AUC:" + str(auc))
    print("==========预测数据===============")
    PredictData(sc, model, categoriesMap)


RunLogisticRegressionWithSGDBinary
master=local[*]
开始导入数据...
共计：7395项
标准化之前： 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.789131, 2.055555556, 0.676470588, 0.205882353, 0.047058824, 0.023529412, 0.443783175, 0.0, 0.0, 0.09077381, 0.0, 0.245831182, 0.003883495, 1.0, 1.0, 24.0, 0.0, 5424.0, 170.0, 8.0, 0.152941176, 0.079129575, 
标准化之后： -0.4464212047941535, 2.7207366564548514, -0.20418221057887365, -0.22052688457880879, -0.06487757239262681, -0.2709990696925828, -0.6807527904251456, -0.10189469097220732, -0.028494000387023734, -0.2016540523193296, -0.23272797709480803, -0.09914991930875496, -0.02326210589837061, -0.38181322324318134, 1.137647336497678, -0.08193557169294771, 1.0251398128933331, -0.05586356442541689, -0.4688932531289357, -0.3543053263079386, -0.3175352172363148, 0.3384507982396541, 0.0, 0.828822173315322, -0.14726894334628504, 0.22963982357813484, -0.14162596909880876, 0.7902380499177364, 0.7171947294529865, -0.29799681649642257, -0.20346257792994

  "Deprecated in 2.0.0. Use ml.classification.LogisticRegression or "


训练评估：使用参数 numIterations=15 stepSize=10 miniBatchFraction=0.5 所需时间=9.27591490746 结果AUC = 0.652138607308
-----所有参数训练评估找出最好的参数组合---------
训练评估：使用参数 numIterations=3 stepSize=10 miniBatchFraction=0.5 所需时间=0.796790122986 结果AUC = 0.582900377922
训练评估：使用参数 numIterations=3 stepSize=10 miniBatchFraction=0.8 所需时间=0.592526912689 结果AUC = 0.582900377922
训练评估：使用参数 numIterations=3 stepSize=10 miniBatchFraction=1 所需时间=0.851670026779 结果AUC = 0.584520593284
训练评估：使用参数 numIterations=3 stepSize=50 miniBatchFraction=0.5 所需时间=0.885801076889 结果AUC = 0.569161259535
训练评估：使用参数 numIterations=3 stepSize=50 miniBatchFraction=0.8 所需时间=0.63846206665 结果AUC = 0.57471848277
训练评估：使用参数 numIterations=3 stepSize=50 miniBatchFraction=1 所需时间=0.638918161392 结果AUC = 0.575722939325
训练评估：使用参数 numIterations=3 stepSize=100 miniBatchFraction=0.5 所需时间=0.60569190979 结果AUC = 0.571785931451
训练评估：使用参数 numIterations=3 stepSize=100 miniBatchFraction=0.8 所需时间=0.571465969086 结果AUC = 0.574564543068
训练评估：使用参数 numIterations=3 stepSize=100 miniBat