In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# Build the Spark configuration: standalone master URL, application name and
# the driver/executor resource settings, all in one builder chain.
conf = (
    SparkConf()
    .setMaster("spark://192.168.206.101:7077")
    .setAppName("KMeans")
    .setAll([
        ('spark.driver.memory', '700m'),
        ('spark.executor.memory', '3000m'),
        ('spark.executor.cores', 2),
    ])
)
# Create the SparkContext from that configuration.
sc = SparkContext(conf=conf)
# Create (or fetch) the SparkSession, pointed at the same master.
spark = SparkSession.builder.master('spark://192.168.206.101:7077').getOrCreate()


In [3]:
# Read the header-less CSV from HDFS (schema inferred) and attach column names.
columnNames = [
    "duration", "protocol_type", "service", "flag",
    "src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
    "hot", "num_failed_logins", "logged_in", "num_compromised",
    "root_shell", "su_attempted", "num_root", "num_file_creations",
    "num_shells", "num_access_files", "num_outbound_cmds",
    "is_host_login", "is_guest_login", "count", "srv_count",
    "serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
    "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
    "dst_host_count", "dst_host_srv_count",
    "dst_host_same_srv_rate", "dst_host_diff_srv_rate",
    "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
    "dst_host_serror_rate", "dst_host_srv_serror_rate",
    "dst_host_rerror_rate", "dst_host_srv_rerror_rate",
    "label",
]
dataWithoutHeader = (
    spark.read
    .option("inferSchema", True)
    .option("header", False)
    .csv("hdfs:///user/ds/kddcup.data_10_percent_corrected")
)
data = dataWithoutHeader.toDF(*columnNames)


In [4]:
# Inspect the data: count the samples per label, then show the labels from
# most to least frequent (up to 25 rows).
labelCounts = data.select("label").groupBy("label").count()
labelCounts.orderBy("count", ascending=False).show(25)

+----------------+------+
|           label| count|
+----------------+------+
|          smurf.|280790|
|        neptune.|107201|
|         normal.| 97278|
|           back.|  2203|
|          satan.|  1589|
|        ipsweep.|  1247|
|      portsweep.|  1040|
|    warezclient.|  1020|
|       teardrop.|   979|
|            pod.|   264|
|           nmap.|   231|
|   guess_passwd.|    53|
|buffer_overflow.|    30|
|           land.|    21|
|    warezmaster.|    20|
|           imap.|    12|
|        rootkit.|    10|
|     loadmodule.|     9|
|      ftp_write.|     8|
|       multihop.|     7|
|            phf.|     4|
|           perl.|     3|
|            spy.|     2|
+----------------+------+



In [None]:
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# Drop the three string-typed columns and cache the result, which is reused
# by the fit and transform below.
numericOnly = data.drop("protocol_type", "service", "flag").cache()

# Every remaining column except the label becomes a feature.
featureColumns = [column for column in numericOnly.columns if column != 'label']
assembler = VectorAssembler().setInputCols(featureColumns).setOutputCol("featureVector")
# K-means with library defaults for everything but the column names.
kmeans = KMeans().setPredictionCol("cluster").setFeaturesCol("featureVector")

pipeline = Pipeline().setStages([assembler, kmeans])
pipelineModel = pipeline.fit(numericOnly)

# The last stage of the fitted pipeline is the k-means model; print its centroids.
kmeansModel = pipelineModel.stages[-1]
for center in kmeansModel.clusterCenters():
    print(center)

# Assign each row to a cluster and tabulate label counts per cluster
# (cluster ascending, label descending).
withCluster = pipelineModel.transform(numericOnly)
(withCluster
    .select("cluster", "label")
    .groupBy('cluster', 'label')
    .count()
    .orderBy(['cluster', 'label'], ascending=[1, 0])
    .show(25))


In [None]:
import random

def clusteringScore0(data, k):
    """Fit k-means with ``k`` clusters and return the cost per row.

    Every column of ``data`` except ``label`` is assembled into the feature
    vector. The result is the fitted model's ``computeCost`` over the
    assembled data divided by ``data.count()``.
    """
    # Assemble the feature vector
    featureColumns = [name for name in data.columns if name != 'label']
    assembler = VectorAssembler()
    assembler.setInputCols(featureColumns)
    assembler.setOutputCol('featureVector')

    # Configure k-means with a freshly drawn random seed
    kmeans = KMeans()
    kmeans.setSeed(random.randint(1, 2**63-1))
    kmeans.setK(k)
    kmeans.setPredictionCol("cluster")
    kmeans.setFeaturesCol("featureVector")

    # Fit the two-stage pipeline; its last stage is the k-means model
    fitted = Pipeline().setStages([assembler, kmeans]).fit(data)
    kmeansModel = fitted.stages[-1]
    totalCost = kmeansModel.computeCost(assembler.transform(data))
    return totalCost / data.count()

# Score k = 20, 40, ..., 100
for k in range(20, 101, 20):
    score = clusteringScore0(numericOnly, k)
    print(k, score)

In [None]:
def clusteringScore1(data, k):
    """Fit k-means with ``k`` clusters, 70 max iterations and tolerance 1e-7.

    Every column of ``data`` except ``label`` is used as a feature. Returns
    the fitted model's ``computeCost`` over the assembled data divided by
    ``data.count()``.
    """
    # Assemble the feature vector
    inputColumns = [name for name in data.columns if name != 'label']
    assembler = VectorAssembler().setInputCols(inputColumns).setOutputCol('featureVector')

    # Raise the iteration cap (the original note gives the default as 20) and
    # tighten the tolerance, i.e. the minimum centroid movement that still
    # counts as significant.
    kmeans = (
        KMeans()
        .setSeed(random.randint(1, 2**63-1))
        .setK(k)
        .setPredictionCol("cluster")
        .setFeaturesCol("featureVector")
        .setMaxIter(70)
        .setTol(1.0e-7)
    )

    # Fit the pipeline and pull out the k-means model (last stage)
    stages = [assembler, kmeans]
    kmeansModel = Pipeline().setStages(stages).fit(data).stages[-1]
    assembled = assembler.transform(data)
    return kmeansModel.computeCost(assembled) / data.count()

# Score k = 20, 40, ..., 200
for k in range(20, 201, 20):
    score = clusteringScore1(numericOnly, k)
    print(k, score)

In [None]:
#特征的规范化
from pyspark.ml.feature import StandardScaler

def clusteringScore2(data, k):
    """Score k-means with ``k`` clusters on standardized features.

    Every column of ``data`` except ``label`` is assembled into a vector and
    scaled by its standard deviation (no mean-centering) before clustering.
    Returns the fitted model's ``computeCost`` over the transformed data
    divided by ``data.count()``.
    """
    featureColumns = [e for e in data.columns if e != 'label']
    assembler = VectorAssembler().setInputCols(featureColumns).setOutputCol('featureVector')
    # StandardScaler normalizes each feature
    scaler = (
        StandardScaler()
        .setInputCol("featureVector")
        .setOutputCol("scaledFeatureVector")
        .setWithStd(True)
        .setWithMean(False)
    )
    kmeans = (
        KMeans()
        .setSeed(random.randint(1, 2**63-1))
        .setK(k)
        .setPredictionCol("cluster")
        .setFeaturesCol("scaledFeatureVector")
        .setMaxIter(40)
        .setTol(1.0e-5)
    )
    pipeline = Pipeline().setStages([assembler, scaler, kmeans])
    # Fit once and take the k-means model from that same fitted pipeline, so
    # the model being scored is the one that belongs to the transform below
    # (a second fit would only repeat the training).
    pipelineModel = pipeline.fit(data)
    kmeansModel = pipelineModel.stages[-1]
    return kmeansModel.computeCost(pipelineModel.transform(data)) / data.count()

for k in range(60, 400, 30):
    print((k, clusteringScore2(numericOnly, k)))

In [None]:
#类别型变量
from pyspark.ml.feature import OneHotEncoder, StringIndexer

def oneHotPipeline(inputCol):
    """Build an (unfitted) pipeline that one-hot encodes a categorical column.

    Returns a ``(pipeline, outputColumnName)`` pair, where the output column
    is ``inputCol + "_vec"``.
    """
    indexedCol = inputCol + "_indexed"
    vectorCol = inputCol + "_vec"
    # StringIndexer maps each string value to an integer index
    indexer = StringIndexer()
    indexer.setInputCol(inputCol)
    indexer.setOutputCol(indexedCol)
    # OneHotEncoder turns that index into a vector
    encoder = OneHotEncoder()
    encoder.setInputCol(indexedCol)
    encoder.setOutputCol(vectorCol)
    return Pipeline().setStages((indexer, encoder)), vectorCol

def clusteringScore3(data, k):
    """Score k-means with ``k`` clusters, including the categorical columns.

    ``protocol_type``, ``service`` and ``flag`` are one-hot encoded via
    ``oneHotPipeline``; all other columns except ``label`` are used as-is.
    The assembled vector is scaled by standard deviation (no mean-centering)
    before clustering. Returns the fitted model's ``computeCost`` over the
    transformed data divided by ``data.count()``.
    """
    protoTypeEncoder, protoTypeVecCol = oneHotPipeline("protocol_type")
    serviceEncoder, serviceVecCol = oneHotPipeline("service")
    flagEncoder, flagVecCol = oneHotPipeline("flag")

    # Build the feature list in the DataFrame's own column order rather than
    # via a set, so the layout of the feature vector is the same on every run.
    excluded = {"label", "protocol_type", "service", "flag"}
    assemblerCols = [name for name in data.columns if name not in excluded]
    assemblerCols += [protoTypeVecCol, serviceVecCol, flagVecCol]
    assembler = VectorAssembler().setInputCols(assemblerCols).setOutputCol("featureVector")

    scaler = (
        StandardScaler()
        .setInputCol("featureVector")
        .setOutputCol("scaledFeatureVector")
        .setWithStd(True)
        .setWithMean(False)
    )
    kmeans = (
        KMeans()
        .setSeed(random.randint(1, 2**63-1))
        .setK(k)
        .setPredictionCol("cluster")
        .setFeaturesCol("scaledFeatureVector")
        .setMaxIter(40)
        .setTol(1.0e-5)
    )

    pipeline = Pipeline().setStages(
        [protoTypeEncoder, serviceEncoder, flagEncoder, assembler, scaler, kmeans])
    # Fit once; the k-means model is the last stage of that same fitted
    # pipeline (a second fit would only repeat the training).
    pipelineModel = pipeline.fit(data)
    kmeansModel = pipelineModel.stages[-1]
    # Cost of the clustering, averaged over the rows
    return kmeansModel.computeCost(pipelineModel.transform(data)) / data.count()

for k in range(60, 400, 30):
    print((k, clusteringScore3(data, k)))


In [None]:
#利用标号的熵信息
import math

def fitPipeline4(data, k):
    """Fit the encode -> assemble -> scale -> k-means pipeline on ``data``.

    ``protocol_type``, ``service`` and ``flag`` are one-hot encoded via
    ``oneHotPipeline``; all other columns except ``label`` are used as-is.
    The assembled vector is scaled by standard deviation (no mean-centering)
    and clustered into ``k`` clusters (prediction column ``cluster``).

    Returns the fitted pipeline model; its last stage is the k-means model.
    """
    protoTypeEncoder, protoTypeVecCol = oneHotPipeline("protocol_type")
    serviceEncoder, serviceVecCol = oneHotPipeline("service")
    flagEncoder, flagVecCol = oneHotPipeline("flag")

    # Build the feature list in the DataFrame's own column order rather than
    # via a set: set iteration order is not stable between interpreter runs,
    # which would shuffle the meaning of each centroid coordinate.
    excluded = {"label", "protocol_type", "service", "flag"}
    assemblerCols = [name for name in data.columns if name not in excluded]
    assemblerCols += [protoTypeVecCol, serviceVecCol, flagVecCol]
    print(assemblerCols)
    assembler = VectorAssembler().setInputCols(assemblerCols).setOutputCol("featureVector")

    scaler = (
        StandardScaler()
        .setInputCol("featureVector")
        .setOutputCol("scaledFeatureVector")
        .setWithStd(True)
        .setWithMean(False)
    )
    kmeans = (
        KMeans()
        .setSeed(random.randint(1, 2**63-1))
        .setK(k)
        .setPredictionCol("cluster")
        .setFeaturesCol("scaledFeatureVector")
        .setMaxIter(40)
        .setTol(1.0e-5)
    )

    pipeline = Pipeline().setStages(
        [protoTypeEncoder, serviceEncoder, flagEncoder, assembler, scaler, kmeans])
    return pipeline.fit(data)


In [None]:
def entropy(counts):
    """Return the entropy (natural logarithm) of a collection of counts.

    ``counts`` must expose ``filter``, ``map`` and ``sum`` (the code uses
    exactly those three methods; presumably a Spark RDD of numbers).
    Non-positive counts are ignored. With no positive counts the result is
    whatever ``sum()`` of the empty collection returns.
    """
    values = counts.filter(lambda a: a > 0)
    # Total as a float so the proportions below use true division.
    n = values.map(lambda e: float(e)).sum()

    def map_f(v):
        # Contribution of one count: -p * ln(p), with p its share of the total.
        p = v / n
        return -p * math.log(p)

    # The original referenced an undefined name here (`mapf`).
    return values.map(map_f).sum()
    
def clusteringScore4(data, k):
    """Score a ``k``-cluster model by the label entropy inside its clusters.

    Fits the pipeline with ``fitPipeline4``, assigns every row to a cluster,
    and returns the average over all rows of the entropy (natural log) of
    the label distribution in the row's cluster, i.e.
    ``sum(clusterSize * clusterEntropy) / totalRows``.
    Returns 0.0 when ``data`` has no rows.
    """
    pipelineModel = fitPipeline4(data, k)

    # Predict the cluster of every row
    clusterLabel = pipelineModel.transform(data).select('cluster', 'label')
    clusterLabel.show()

    # One row per (cluster, label) pair: at most k * (number of labels) rows,
    # so the counts are small enough to aggregate on the driver.
    countsByCluster = {}
    for row in clusterLabel.groupBy('cluster', 'label').count().collect():
        countsByCluster.setdefault(row['cluster'], []).append(row['count'])

    totalRows = 0
    weightedClusterEntropy = 0.0
    for labelCounts in countsByCluster.values():
        clusterSize = float(sum(labelCounts))
        clusterEntropy = -sum(
            (c / clusterSize) * math.log(c / clusterSize) for c in labelCounts)
        weightedClusterEntropy += clusterSize * clusterEntropy
        totalRows += clusterSize

    return weightedClusterEntropy / totalRows if totalRows else 0.0

print(clusteringScore4(data, 180))

In [None]:
from pyspark.ml.linalg import Vectors, Vector


def buildAnomalyDetector(data):
    """Train a k-means anomaly detector on ``data`` and score it on new data.

    Steps:
      1. Fit the clustering pipeline on ``data`` with 240 clusters
         (``fitPipeline4``) and show the label counts per cluster.
      2. Take as threshold the 100th-largest squared distance between a
         training row and the centroid of its assigned cluster.
      3. Read ``hdfs:///user/ds/corrected`` through the global ``spark``
         session, give it the column names of ``data``, and run it through
         the fitted feature stages (everything except k-means).
      4. A new row is flagged as an anomaly when its squared distance to the
         nearest centroid is at or above the threshold. The flag counts as
         correct when it disagrees with "label was seen in training":
         flagged with an unseen label, or not flagged with a seen label.

    Returns the fraction of new rows judged correct.
    """
    # Train the model and assign every training row to a cluster
    pipelineModel = fitPipeline4(data, 240)
    clustered = pipelineModel.transform(data)

    # Inspect the data: number of rows per (cluster, label)
    countByClusterLabel = clustered.select("cluster", "label").\
        groupby("cluster", "label").count().orderBy("cluster", "label")
    countByClusterLabel.show()

    # Cluster centres of the fitted k-means model (last pipeline stage)
    kMeansModel = pipelineModel.stages[-1]
    centroids = kMeansModel.clusterCenters()

    # Threshold: 100th-largest squared distance to the assigned centroid
    threshold = clustered.select("cluster", "scaledFeatureVector").\
        rdd.\
        map(lambda e: Vectors.squared_distance(centroids[e[0]], e[1])).\
        sortBy(lambda e: e, ascending=False).\
        take(100)[-1]

    # Labels present in the training data
    known = [row[0] for row in data.select("label").distinct().collect()]
    print(known)

    # Read the new data; it has no header, so reuse the training column names
    # (the fitted stages look their input columns up by these names).
    dataWithoutHeader = spark.read.option("inferSchema", True).\
        option("header", False).csv("hdfs:///user/ds/corrected")
    new_data = dataWithoutHeader.toDF(*data.columns)

    # Feature-only pipeline: every stage before k-means. Per the PySpark
    # Pipeline docs these already-fitted stages are Transformers, so fit()
    # here should wrap them unchanged rather than re-learn from new_data.
    featurePipeline = Pipeline().setStages(pipelineModel.stages[:-1])
    new_processed_data = featurePipeline.fit(new_data).transform(new_data)

    def isCorrect(row):
        # row is (scaledFeatureVector, label)
        features, label = row[0], row[1]
        # Squared distance to the nearest centroid
        min_dis = min(Vectors.squared_distance(center, features) for center in centroids)
        flagged = min_dis >= threshold
        # The original tested row[0] (the vector) against `known`; the label
        # is row[1].
        return flagged != (label in known)

    # Keep the rows whose anomaly flag is judged correct
    correct_result = new_processed_data.select("scaledFeatureVector", "label").rdd.filter(isCorrect)
    # Output the rows. NOTE(review): foreach runs on the executors, so on a
    # cluster this output presumably lands in executor logs, not the notebook.
    correct_result.foreach(print)
    return correct_result.count() / new_processed_data.count()

accuracy = buildAnomalyDetector(data)
print(accuracy)

In [None]:
from pyspark.sql.functions import pandas_udf,PandasUDFType

# pandas is needed inside the grouped-map UDF below; nothing earlier in the
# notebook imports it.
import pandas as pd
from pyspark.sql.types import *

# Small demo frame: two keys, two integer value columns.
df3 = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    ("key", "value1", "value2"),
)

df3.show()

# Output schema of the grouped-map UDF: one row per key.
schema = StructType([
    StructField("key", StringType()),
    StructField("avg_value1", DoubleType()),
    StructField("avg_value2", DoubleType()),
    StructField("sum_avg", DoubleType()),
    StructField("sub_avg", DoubleType()),
])


@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    """Summarize one key group as a single row.

    ``df`` is the pandas DataFrame holding all rows of one ``key``. The
    returned one-row frame holds, in schema order: the key, the mean of
    ``value1``, the mean of ``value2``, their sum and their difference.
    """
    key = df['key'].iloc[0]
    avg1 = df.value1.mean()
    avg2 = df.value2.mean()
    return pd.DataFrame([[key, avg1, avg2, avg1 + avg2, avg1 - avg2]])

df3.groupby("key").apply(g).show()

In [None]:
# Test section: repeat the threshold computation at top level so the
# intermediate values can be inspected.
pipelineModel = fitPipeline4(data, 240)

# Assign every row to a cluster
clustered = pipelineModel.transform(data)

# Number of rows per (cluster, label)
countByClusterLabel = (
    clustered.select("cluster", "label")
    .groupby("cluster", "label")
    .count()
    .orderBy("cluster", "label")
)
countByClusterLabel.show()

# Cluster centres of the fitted k-means model (last pipeline stage)
kMeansModel = pipelineModel.stages[-1]
centroids = kMeansModel.clusterCenters()


def _distanceToCentroid(e):
    """Squared distance between a (cluster, vector) row and its cluster centre."""
    return Vectors.squared_distance(centroids[e[0]], e[1])


distances = clustered.select("cluster", "scaledFeatureVector").rdd.map(_distanceToCentroid)

# Threshold: the 100th-largest distance
threshold = distances.sortBy(lambda d: d, ascending=False).take(100)[-1]

# First 100 distances, unsorted, for a quick look
L100 = distances.take(100)

print(L100)
