In [0]:
import org.apache.spark.sql.types._

// Explicit schema for the input CSV (column names suggest the UNSW-NB15 training
// split — NOTE(review): confirm). Every column is declared as a nullable string;
// numeric types are produced later in the notebook by explicit casts.
val schema = StructType(
  Seq(
    "id", "dur", "proto", "service", "state", "spkts", "dpkts", "sbytes", "dbytes", "rate",
    "sttl", "dttl", "sload", "dload", "sloss", "dloss", "sinpkt", "dinpkt", "sjit", "djit",
    "swin", "stcpb", "dtcpb", "dwin", "tcprtt", "synack", "ackdat", "smean", "dmean",
    "trans_depth", "response_body_len", "ct_srv_src", "ct_state_ttl", "ct_dst_ltm",
    "ct_src_dport_ltm", "ct_dst_sport_ltm", "ct_dst_src_ltm", "is_ftp_login", "ct_ftp_cmd",
    "ct_flw_http_mthd", "ct_src_ltm", "ct_srv_dst", "is_sm_ips_ports", "attack_cat", "label"
  ).map(fieldName => StructField(fieldName, StringType, nullable = true))
)

// Comma-separated file with a header row; the header is skipped and the
// explicit schema above is applied instead of inferring types.
var df = spark.read
  .option("sep", ",")
  .option("header", "true")
  .schema(schema)
  .csv("/unsw_tra.csv")


In [1]:
// Feature subset used by the models (the label is deliberately left out).
// Names are spelled exactly as declared in `schema` (all lower-case) so the
// select does not depend on spark.sql.caseSensitive being false.
var df_num = df.select(
  "state", "dttl", "synack", "swin", "dwin", "ct_state_ttl",
  "ct_src_ltm", "ct_srv_dst", "sttl", "ct_dst_sport_ltm", "djit"
)

In [2]:
// Preview the first ten rows of the selected feature columns.
df_num.show(numRows = 10)

In [3]:
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Encode the categorical `state` string as a numeric index in `indexedState`
// (StringIndexer orders labels by descending frequency by default), then
// drop the raw string column so only numeric-convertible columns remain.
val strIndexer = new StringIndexer().setInputCol("state").setOutputCol("indexedState").fit(df_num)

df_num = strIndexer.transform(df_num).drop("state")

In [4]:
// Names of the columns that are still string-typed (everything read through the
// all-StringType schema). These are cast to double so VectorAssembler can
// consume them. Compare the DataType value itself rather than its toString
// form, which is not a stable API.
val num_categories = df_num.schema.fields.collect {
  case field if field.dataType == StringType => field.name
}

// Thread the DataFrame through one cast per column instead of mutating `df_num`
// inside a foreach. Under non-ANSI cast semantics, strings that do not parse as
// numbers become null. NOTE(review): ANSI mode is the default from Spark 4.0,
// where such values raise an error instead — confirm for your Spark version.
df_num = num_categories.foldLeft(df_num) { (acc, colName) =>
  acc.withColumn(colName, acc(colName).cast("double"))
}

In [5]:
// Target column on its own; still string-typed here, as declared in `schema`.
var df_label = df.select(col("label"))

In [6]:
// Replace the string label with its integer value (same column name).
df_label = df_label.withColumn("label", col("label").cast(IntegerType))

In [7]:
// Confirm the label column now carries an integer type.
df_label.printSchema()

In [8]:
// Re-attach the label to the feature rows. df_num and df_label were derived
// separately from `df` and share no key column, so rows are paired by position.
//
// RDD.zipWithIndex assigns consecutive indices 0..n-1 in partition order, so the
// pairing depends only on row order, not on how each side is partitioned.
// monotonically_increasing_id() puts the partition id in the upper bits, so any
// partitioning difference between the two sides would make the inner join below
// silently drop rows.
// NOTE(review): this still assumes both sides see the source rows in the same
// order (both come from the same CSV read via select/cast/StringIndexer).
def withRowIndex(data: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame = {
  val indexedRows = data.rdd.zipWithIndex.map { case (row, idx) =>
    org.apache.spark.sql.Row.fromSeq(row.toSeq :+ idx)
  }
  spark.createDataFrame(indexedRows, data.schema.add("row_id", LongType, nullable = false))
}

val df1 = withRowIndex(df_num)
val df2 = withRowIndex(df_label)

// Features + label, with the helper key removed again. df_num and df_label are
// left untouched (they never carried `row_id`).
var df_final = df1.join(df2, "row_id").drop("row_id")

In [9]:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

// Pack every column of df_num (the numeric features; the label is not among
// them) into a single vector column `features` on df_final.
// NOTE(review): VectorAssembler's default handleInvalid is "error", so a null
// left by the earlier casts would fail here — confirm the data has none.
val assembler = new VectorAssembler().setOutputCol("features").setInputCols(df_num.columns)

df_final = assembler.transform(df_final)

In [10]:
import org.apache.spark.ml.feature.StandardScaler

// Scale `features` to unit standard deviation (no mean centering) and append
// the result as `scaledFeatures`.
// NOTE(review): the scaler is fit on all of df_final before the train/test
// split, and the classifiers in this notebook read `features`, not
// `scaledFeatures` — confirm this is intended.
val scaler = new StandardScaler()
  .setWithMean(false)
  .setWithStd(true)
  .setOutputCol("scaledFeatures")
  .setInputCol("features")
  .fit(df_final)

df_final = scaler.transform(df_final)

In [11]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

// 70% / 30% train/test split; the fixed seed keeps the split reproducible.
val Array(training, test) = df_final.randomSplit(weights = Array(0.7, 0.3), seed = 12345L)

In [12]:
import org.apache.spark.ml.classification.LogisticRegression

// Multinomial logistic regression with L2-style regularisation strength 0.4,
// limited to 10 optimiser iterations, trained on the `features` vector.
val lr = new LogisticRegression()
  .setFamily("multinomial")
  .setRegParam(0.4)
  .setMaxIter(10)
  .setFeaturesCol("features")
  .setLabelCol("label")

val model = lr.fit(training)

// Score both splits with the fitted model.
val predict_train = model.transform(training)
val predict_test = model.transform(test)

In [13]:
// Mark each logistic-regression test prediction as correct (1) or wrong (0).
val check = predict_test.withColumn(
  "correct",
  when(col("label") === col("prediction"), 1).otherwise(0)
)

In [14]:
// Tally correct vs. incorrect predictions.
check.groupBy(col("correct")).count().show()

In [15]:
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier

// Decision tree with Spark's default hyper-parameters on the `features` vector.
val dt = new DecisionTreeClassifier().setFeaturesCol("features").setLabelCol("label")

// Fit on the training split, then score the held-out test split.
// (No feature indexers are part of this model; it is a plain classifier fit.)
val model_dt = dt.fit(training)
val predictions = model_dt.transform(test)

In [16]:
// Mark each decision-tree test prediction as correct (1) or wrong (0), then tally.
val check2 = predictions.withColumn(
  "correct",
  when(col("label") === col("prediction"), 1).otherwise(0)
)
check2.groupBy(col("correct")).count().show()

In [17]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Random forest of 10 trees (all other hyper-parameters at Spark defaults),
// fit on the training split.
val rf = new RandomForestClassifier()
  .setNumTrees(10)
  .setFeaturesCol("features")
  .setLabelCol("label")

val model_rf = rf.fit(training)

In [18]:
// Score the held-out test split with the random forest.
val predictions_rf = model_rf.transform(dataset = test)

In [19]:
// Mark each random-forest test prediction as correct (1) or wrong (0), then tally.
val check3 = predictions_rf.withColumn(
  "correct",
  when(col("label") === col("prediction"), 1).otherwise(0)
)
check3.groupBy(col("correct")).count().show()

In [20]:
// Export random-forest (label, prediction) pairs as header-less CSV part files.
// The default save mode fails if "rf.csv" already exists.
check3.select(col("label"), col("prediction")).write.csv("rf.csv")

In [21]:
// Export decision-tree (label, prediction) pairs as header-less CSV part files.
// The default save mode fails if "dt.csv" already exists.
check2.select(col("label"), col("prediction")).write.csv("dt.csv")

In [22]:
// Export logistic-regression (label, prediction) pairs as header-less CSV part files.
// The default save mode fails if "lr.csv" already exists.
check.select(col("label"), col("prediction")).write.csv("lr.csv")