In [1]:
import sys
from time import time
import pandas as pd
import matplotlib.pyplot as plt
from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
import numpy as np
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.feature import StandardScaler

In [2]:


def SetLogger( sc ):
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
    logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
    logger.LogManager.getRootLogger().setLevel(logger.Level.ERROR)    

def SetPath(sc):
    global Path
    if sc.master[0:5]=="local" :
        Path="file:/home/hduser/pythonwork/PythonProject/"
    else:
        Path="hdfs://hdnn:8020/user/hduser/"
#如果您要在cluster模式執行(hadoop yarn 或Spark Stand alone)，請依照書上說明，先上傳檔案至HDFS目錄
        
def get_mapping(rdd, idx):
    return rdd.map(lambda fields: fields[idx]).distinct().zipWithIndex().collectAsMap()

def extract_label(record):
    label=(record[-1])
    return float(label)

def extract_features(field,categoriesMap,featureEnd):
    categoryIdx = categoriesMap[field[3]]
    categoryFeatures = np.zeros(len(categoriesMap))
    categoryFeatures[categoryIdx] = 1
    numericalFeatures=[convert_float(field)  for  field in field[4: featureEnd]]    
    return  np.concatenate(( categoryFeatures, numericalFeatures))

def convert_float(x):
    return (0 if x=="?" else float(x))   


In [3]:
def PrepareData(sc): 
    #----------------------1.匯入並轉換資料-------------
    print("開始匯入資料...")
    rawDataWithHeader = sc.textFile(Path+"data/train.tsv")
    header = rawDataWithHeader.first() 
    rawData = rawDataWithHeader.filter(lambda x:x !=header)    
    rData=rawData.map(lambda x: x.replace("\"", ""))    
    lines = rData.map(lambda x: x.split("\t"))
    print("共計：" + str(lines.count()) + "筆")
    #----------------------2.建立訓練評估所需資料 RDD[LabeledPoint]-------------
    print ("標準化之前：")
    categoriesMap = lines.map(lambda fields: fields[3]). \
                                        distinct().zipWithIndex().collectAsMap()
    labelRDD = lines.map(lambda r:  extract_label(r))
    featureRDD = lines.map(lambda r:  extract_features(r,categoriesMap,len(r) - 1))

    for i in featureRDD.first():
        print((str(i)+","))
    print("")
    
    stdScaler = StandardScaler(withMean=True, withStd=True).fit(featureRDD)
    ScalerFeatureRDD=stdScaler.transform(featureRDD)
    print("標準化之後：")
    for i in ScalerFeatureRDD.first():
        print (str(i)+","),        
    labelpoint=labelRDD.zip(ScalerFeatureRDD)
    labelpointRDD=labelpoint.map(lambda r: LabeledPoint(r[0], r[1]))

    
    #----------------------3.以隨機方式將資料分為3部份並且回傳-------------
    (trainData, validationData, testData) = labelpointRDD.randomSplit([8, 1, 1])
    print("將資料分trainData:" + str(trainData.count()) + 
              "   validationData:" + str(validationData.count()) +
              "   testData:" + str(testData.count()))
    return (trainData, validationData, testData, categoriesMap) #回傳資料

    
def PredictData(sc,model,categoriesMap): 
    print("開始匯入資料...")
    rawDataWithHeader = sc.textFile(Path+"data/test.tsv")
    header = rawDataWithHeader.first() 
    rawData = rawDataWithHeader.filter(lambda x:x !=header)    
    rData=rawData.map(lambda x: x.replace("\"", ""))    
    lines = rData.map(lambda x: x.split("\t"))
    print("共計：" + str(lines.count()) + "筆")
    dataRDD = lines.map(lambda r:  ( r[0]  ,
                            extract_features(r,categoriesMap,len(r) )))
    DescDict = {
           0: "暫時性網頁(ephemeral)",
           1: "長青網頁(evergreen)"
     }
    for data in dataRDD.take(10):
        predictResult = model.predict(data[1])
        print(" 網址：  " +str(data[0])+"\n" + "             ==>預測:"+ str(predictResult)+ " 說明:"+DescDict[predictResult] +"\n")

def evaluateModel(model, validationData):
    score = model.predict(validationData.map(lambda p: p.features))
    scoreAndLabels=score.zip(validationData \
                                   .map(lambda p: p.label))  \
                                   .map(lambda t: (float(t[0]),float(t[1])) )
    metrics = BinaryClassificationMetrics(scoreAndLabels)
    AUC=metrics.areaUnderROC
    return( AUC)


def trainEvaluateModel(trainData,validationData,
                                        numIterations, stepSize, regParam):
    startTime = time()
    model = SVMWithSGD.train(trainData, numIterations, stepSize, regParam)
    AUC = evaluateModel(model, validationData)
    duration = time() - startTime
    print("訓練評估：使用參數" + " numIterations="+str(numIterations) + " stepSize="+str(stepSize) + " regParam="+str(regParam) + " 所需時間="+str(duration) + " 結果AUC = " + str(AUC))
    return (AUC,duration, numIterations, stepSize, regParam,model)


def evalParameter(trainData, validationData, evalparm,
                  numIterationsList, stepSizeList, regParamList):
    
    metrics = [trainEvaluateModel(trainData, validationData,  
                                numIterations,stepSize,  regParam  ) 
                       for numIterations in numIterationsList
                       for stepSize in stepSizeList  
                       for regParam in regParamList ]
    
    if evalparm=="numIterations":
        IndexList=numIterationsList[:]
    elif evalparm=="stepSize":
        IndexList=stepSizeList[:]
    elif evalparm=="regParam":
        IndexList=regParamList[:]
    
    df = pd.DataFrame(metrics,index=IndexList,
            columns=['AUC', 'duration','numIterations', 'stepSize', 'regParam','model'])
    showchart(df,evalparm,'AUC','duration',0.5,0.7 )
    
def showchart(df,evalparm ,barData,lineData,yMin,yMax):
    ax = df[barData].plot(kind='bar', title =evalparm,figsize=(10,6),legend=True, fontsize=12)
    ax.set_xlabel(evalparm,fontsize=12)
    ax.set_ylim([yMin,yMax])
    ax.set_ylabel(barData,fontsize=12)
    ax2 = ax.twinx()
    ax2.plot(df[[lineData ]].values, linestyle='-', marker='o', linewidth=2.0,color='r')
    plt.show()
    
def evalAllParameter(trainData, validationData, 
                     numIterationsList, stepSizeList, regParamList):    
    metrics = [trainEvaluateModel(trainData, validationData,  
                            numIterations,stepSize,  regParam  ) 
                      for numIterations in numIterationsList 
                      for stepSize in stepSizeList  
                      for  regParam in regParamList ]
    
    Smetrics = sorted(metrics, key=lambda k: k[0], reverse=True)
    bestParameter=Smetrics[0]
    
    print("調校後最佳參數：numIterations:" + str(bestParameter[2]) + 
                                      "  ,stepSize:" + str(bestParameter[3]) + 
                                     "  ,regParam:" + str(bestParameter[4])   + 
                                      "  ,結果AUC = " + str(bestParameter[0]))
    
    return bestParameter[5]

def  parametersEval(trainData, validationData):
    print("----- 評估numIterations參數使用 ---------")
    evalParameter(trainData, validationData,"numIterations", 
                              numIterationsList= [1, 3, 5, 15, 25],   
                              stepSizeList=[100],  
                              regParamList=[1 ])  
    print("----- 評估stepSize參數使用 ---------")
    evalParameter(trainData, validationData,"stepSize", 
                              numIterationsList=[25],                    
                              stepSizeList= [10, 50, 100, 200],    
                              regParamList=[1])   
    print("----- 評估regParam參數使用 ---------")
    evalParameter(trainData, validationData,"regParam", 
                              numIterationsList=[25],      
                              stepSizeList =[100],        
                              regParamList=[0.01, 0.1, 1 ])

def CreateSparkContext():
    #sparkConf = SparkConf()                                                       \
    #                     .setAppName("LogisticRegressionWithSGD")                         \
    #                     .set("spark.ui.showConsoleProgress", "false") 
    #sc = SparkContext(conf = sparkConf)
    print ("master="+sc.master)    
    SetLogger(sc)
    SetPath(sc)
    return (sc)

In [4]:
if __name__ == "__main__":
    print("RunSVMWithSGDBinary")
    sc=CreateSparkContext()
    print("==========資料準備階段===============")
    (trainData, validationData, testData, categoriesMap) =PrepareData(sc)
    trainData.persist(); validationData.persist(); testData.persist()
    print("==========訓練評估階段===============")
    (AUC,duration, numIterations, stepSize, regParam,model)= \
          trainEvaluateModel(trainData, validationData, 3, 50, 1)
    if (len(sys.argv) == 2) and (sys.argv[1]=="-e"):
        parametersEval(trainData, validationData)
    elif   (len(sys.argv) == 2) and (sys.argv[1]=="-a"): 
        print("-----所有參數訓練評估找出最好的參數組合---------")  
        model=evalAllParameter(trainData, validationData,
                        [1, 3, 5, 15, 25], 
                        [10, 50, 100, 200],
                        [0.01, 0.1, 1 ])
    print("==========測試階段===============")
    auc = evaluateModel(model, testData)
    print("使用test Data測試最佳模型,結果 AUC:" + str(auc))
    print("==========預測資料===============")
    PredictData(sc, model, categoriesMap)

RunSVMWithSGDBinary
master=spark://spkma:7077
開始匯入資料...
共計：7395筆
標準化之前：
0.0,
0.0,
0.0,
1.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.789131,
2.055555556,
0.676470588,
0.205882353,
0.047058824,
0.023529412,
0.443783175,
0.0,
0.0,
0.09077381,
0.0,
0.245831182,
0.003883495,
1.0,
1.0,
24.0,
0.0,
5424.0,
170.0,
8.0,
0.152941176,
0.079129575,

標準化之後：
-0.680752790425,
-0.381813223243,
-0.204182210579,
2.72073665645,
-0.220526884579,
-0.0991499193088,
-0.232727977095,
-0.101894690972,
-0.0648775723926,
-0.0232621058984,
-0.028494000387,
-0.446421204794,
-0.270999069693,
-0.201654052319,
1.1376473365,
-0.0819355716929,
1.02513981289,
-0.0558635644254,
-0.468893253129,
-0.354305326308,
-0.317535217236,
0.33845079824,
0.0,
0.828822173315,
-0.147268943346,
0.229639823578,
-0.141625969099,
0.790238049918,
0.717194729453,
-0.297996816496,
-0.20346257793,
-0.0329672096969,
-0.0487811297558,
0.940069975117,
-0.108698488525,
-0.278820782314,
將資料分trainData:5926   validationData:702   testData