In [1]:
# import session
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
# Reuse the already-running SparkContext when this cell is executed again;
# calling the SparkContext() constructor a second time in the same kernel fails
# because only one context may be active per process.
sc = SparkContext.getOrCreate()

%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np

# build the SparkSession used by the rest of the notebook (reuses an existing one if present)
spark = (
    SparkSession.builder
    .appName('test')
    .getOrCreate()
)

In [2]:
# Load the authentication log (a .gz text file) as an RDD with one string per
# line, then pull the first 5 records back to the driver to confirm the format.
rdd = sc.textFile("./Data/GZs/auth.txt.gz")
rdd.take(5)

['1,ANONYMOUS LOGON@C586,ANONYMOUS LOGON@C586,C1250,C586,NTLM,Network,LogOn,Success',
 '1,ANONYMOUS LOGON@C586,ANONYMOUS LOGON@C586,C586,C586,?,Network,LogOff,Success',
 '1,C101$@DOM1,C101$@DOM1,C988,C988,?,Network,LogOff,Success',
 '1,C1020$@DOM1,SYSTEM@C1020,C1020,C1020,Negotiate,Service,LogOn,Success',
 '1,C1021$@DOM1,C1021$@DOM1,C1021,C625,Kerberos,Network,LogOn,Success']

In [3]:
# column names for the comma-separated fields of each log line, in file order
header = [
    "Time",
    "SourceUserDomain",
    "DestUserDomain",
    "SourceComputer",
    "DestComputer",
    "AuthType",
    "LogonType",
    "AuthOrientation",
    "SuccessFailure",
]

# helper that turns one record's values into a Row keyed by column name
def list_to_row(keys, values):
    """Build a Row whose field names come from `keys` and values from `values`.

    Names and values are paired positionally; as with zip(), pairing stops at
    the end of the shorter of the two sequences.
    """
    return Row(**{name: value for name, value in zip(keys, values)})

# break every raw text line into its list of field values (comma-delimited)
rdd_new = rdd.map(lambda line: line.split(','))

# pair each field list with the column names, giving one Row per log record
rdd_rows = rdd_new.map(lambda fields: list_to_row(header, fields))

# build the DataFrame from the Row RDD and print the first 5 rows as a sanity check
auth_df = spark.createDataFrame(rdd_rows)
auth_df.show(5)

+---------------+---------+------------+--------------------+---------+--------------+--------------------+--------------+----+
|AuthOrientation| AuthType|DestComputer|      DestUserDomain|LogonType|SourceComputer|    SourceUserDomain|SuccessFailure|Time|
+---------------+---------+------------+--------------------+---------+--------------+--------------------+--------------+----+
|          LogOn|     NTLM|        C586|ANONYMOUS LOGON@C586|  Network|         C1250|ANONYMOUS LOGON@C586|       Success|   1|
|         LogOff|        ?|        C586|ANONYMOUS LOGON@C586|  Network|          C586|ANONYMOUS LOGON@C586|       Success|   1|
|         LogOff|        ?|        C988|          C101$@DOM1|  Network|          C988|          C101$@DOM1|       Success|   1|
|          LogOn|Negotiate|       C1020|        SYSTEM@C1020|  Service|         C1020|         C1020$@DOM1|       Success|   1|
|          LogOn| Kerberos|        C625|         C1021$@DOM1|  Network|         C1021|         C1021$@DO

In [4]:
# Trim the frame down to the columns that seem most relevant for the model.
# Each dropped column is listed exactly once ("DestUserDomain" was previously
# passed twice, which was redundant).
auth_df_trim = auth_df.drop("DestUserDomain", "SourceComputer", "DestComputer", "Time", "AuthType")
auth_df_trim.show(5)
# remember the surviving column names; later cells append them to the model output columns
cols = auth_df_trim.columns

+---------------+---------+--------------------+--------------+
|AuthOrientation|LogonType|    SourceUserDomain|SuccessFailure|
+---------------+---------+--------------------+--------------+
|          LogOn|  Network|ANONYMOUS LOGON@C586|       Success|
|         LogOff|  Network|ANONYMOUS LOGON@C586|       Success|
|         LogOff|  Network|          C101$@DOM1|       Success|
|          LogOn|  Service|         C1020$@DOM1|       Success|
|          LogOn|  Network|         C1021$@DOM1|       Success|
+---------------+---------+--------------------+--------------+
only showing top 5 rows



In [None]:
# Take a small random sample of the data to work with due to system constraints:
# sampling without replacement, fraction 0.001, fixed seed for reproducibility.
auth_df_trim_sample = auth_df_trim.sample(False, 0.001, seed=42)
# Count the sample that was just created (the cell previously counted the
# unsampled frame, which says nothing about the data used downstream).
auth_df_trim_sample.count()

In [None]:
### One-Hot Encoding
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categoricalColumns = ["AuthOrientation", "LogonType", "SourceUserDomain"]
stages = []  # stages in our Pipeline
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer: string value -> numeric index column
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert the index column into a binary SparseVector column
    encoder = OneHotEncoder(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "classVec")
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

# Convert the string label into numeric label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="SuccessFailure", outputCol="SuccessFail")
stages += [label_stringIdx]

# Combine all one-hot encoded columns into a single "features" vector column
assemblerInputs = [c + "classVec" for c in categoricalColumns]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
pipelineModel = pipeline.fit(auth_df_trim_sample)
dataset = pipelineModel.transform(auth_df_trim_sample)

# Keep relevant columns.
# Select from the *transformed* frame: "SuccessFail" and "features" are created
# by the pipeline and do not exist on auth_df_trim_sample, so selecting them
# from the raw sample fails.
selectedcols = ["SuccessFail", "features"] + cols
auth_dataset = dataset.select(selectedcols)
# Print a few rows with show(), as the earlier cells do.
auth_dataset.show(5)



In [10]:
### Randomly split data into training (70%) and test (30%) sets. Set seed for reproducibility
(trainingData, testData) = auth_dataset.randomSplit([0.7, 0.3], seed=100)
# Use the function-call form of print: with a single argument it behaves the
# same on Python 2 and Python 3, whereas the bare `print x` statement is a
# SyntaxError on Python 3.
print(trainingData.count())
print(testData.count())

['AuthOrientation', 'LogonType', 'SourceUserDomain', 'SuccessFailure']

In [None]:
from pyspark.ml.classification import LogisticRegression

# Configure the logistic regression estimator: read inputs from "features",
# learn to predict the indexed label "SuccessFail", stop after 10 iterations
lr = LogisticRegression(
    featuresCol="features",
    labelCol="SuccessFail",
    maxIter=10,
)

# Fit the estimator on the training split to obtain the trained model
lrModel = lr.fit(trainingData)

In [None]:
# Make predictions on the held-out test data by calling transform() on the
# fitted model (lrModel, the result of lr.fit), not on the estimator itself.
# The estimator was configured with featuresCol="features", so that is the
# input column the model reads.
predictions = lrModel.transform(testData)

In [None]:
# Print the schema of the scored DataFrame to see which columns transform() produced.
predictions.printSchema()

In [1]:
# Stop the Spark session created in the first cell.
# NOTE: this cell depends on `spark` existing in the kernel; run on a fresh
# kernel without executing the cells above, it raises
# "NameError: name 'spark' is not defined".
spark.stop()

NameError: name 'spark' is not defined