In [69]:
from pyspark.sql.types import StructType, StructField, FloatType, BooleanType
from pyspark.sql.types import DoubleType, IntegerType, StringType
import pyspark
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf
 
from pyspark import SQLContext
 
from pyspark.sql import SparkSession

# Absolute path of the tab-separated connection log loaded into `df` below.
CONN_LOG_PATH = "/home/ubuntu/Documents/forensics/tail.log"

conf = pyspark.SparkConf()

sc = pyspark.SparkContext.getOrCreate(conf=conf)
# `sqlcontext` is kept so existing references to that name keep working.
sqlcontext = SQLContext(sc)
# Later cells call spark.sql, spark.udf.register and spark.createDataFrame, but
# no visible cell creates `spark`; bind it here so a fresh kernel can run them.
spark = SparkSession.builder.getOrCreate()

# Column layout of the log, in file order. Every field is declared nullable.
schema = StructType([
    StructField("ts", StringType(), True),
    StructField("uid", StringType(), True),
    StructField("id.orig_h", StringType(), True),
    StructField("id.orig_p", StringType(), True),
    StructField("id.resp_h", StringType(), True),
    StructField("id.resp_p", StringType(), True),
    StructField("proto", StringType(), True),
    StructField("service", StringType(), True),
    StructField("duration", FloatType(), True),
    StructField("orig_bytes", StringType(), True),
    StructField("resp_bytes", StringType(), True),
    StructField("conn_state", StringType(), True),
    StructField("local_orig", BooleanType(), True),
    StructField("missed_bytes", StringType(), True),
    StructField("history", StringType(), True),
    StructField("orig_pkts", IntegerType(), True),
    StructField("orig_ip_bytes", IntegerType(), True),
    StructField("resp_pkts", IntegerType(), True),
    StructField("resp_ip_bytes", IntegerType(), True),
    StructField("tunnel_parents", StringType(), True),
])

# Read through `spark` so the temp view below lives in the same session that
# the later spark.sql(...) queries use.
df = spark.read.csv(path=CONN_LOG_PATH, sep="\t", schema=schema)

# Derived from the schema instead of a second hand-maintained list, so the two
# cannot drift apart; the resulting names and order are the same as before.
columns = [field.name for field in schema.fields]

# Make `df` queryable from SQL as "dftab".
df.createOrReplaceTempView("dftab")


In [41]:
# Wrap toInt as a Spark UDF declared to return IntegerType. The lambda resolves
# `toInt` when it is called, so the definition further down this cell suffices.
colsInt = udf(lambda value: toInt(value), returnType=IntegerType())
# Make the same UDF callable from Spark SQL under the name "colsInt".
spark.udf.register(name="colsInt", f=colsInt)

def toInt(s):
    """Encode a string as one integer built from its characters' code points.

    Each character is replaced by the decimal digits of ``ord(char)`` and the
    digit groups are concatenated, e.g. ``"S0"`` -> ``"83" + "48"`` -> ``8348``.

    Args:
        s: Value to encode. Only non-empty ``str`` values are encoded.

    Returns:
        The concatenated-code-point integer, or ``None`` when ``s`` is not a
        string or is the empty string.

    NOTE(review): ASCII letters contribute two to three digits each, so the
    result for longer strings will not fit the IntegerType declared for the
    colsInt UDF -- presumably why colsInt(uid) shows null; confirm.
    """
    # Non-strings previously returned the type object itself, which is not an
    # integer; the empty string previously raised ValueError from int('').
    # None is the value a Spark UDF uses for SQL null.
    if not isinstance(s, str) or not s:
        return None
    return int(''.join(str(ord(ch)) for ch in s))


 

In [65]:

# Query the "dftab" temp table and print conn_state next to the value the
# registered colsInt UDF produces for it.
spark.sql("select dftab.conn_state, colsInt(dftab.conn_state) from dftab").show()
 
    
    

+----------+-------------------+
|conn_state|colsInt(conn_state)|
+----------+-------------------+
|        S0|               8348|
|        S0|               8348|
|        S0|               8348|
|        S0|               8348|
|        S0|               8348|
|        S0|               8348|
|       OTH|             798472|
|       OTH|             798472|
|       OTH|             798472|
|       OTH|             798472|
+----------+-------------------+



In [31]:
 def toInt(s):
     """Encode a string as one integer built from its characters' code points.

     Each character is replaced by the decimal digits of ``ord(char)`` and the
     digit groups are concatenated, e.g. ``"S0"`` -> ``8348``.

     Args:
         s: Value to encode. Only non-empty ``str`` values are encoded.

     Returns:
         The concatenated-code-point integer, or ``None`` when ``s`` is not a
         string or is the empty string.
     """
     # Every column in the log schema is nullable, and iterating None raises
     # TypeError; int('') raises ValueError. Return None (SQL null) for both.
     # colsInt resolves `toInt` at call time, so once this cell runs this
     # definition is the one that UDF uses as well.
     if not isinstance(s, str) or not s:
         return None
     return int(''.join(str(ord(ch)) for ch in s))

 # Register the plain function for Spark SQL under the name "toIntdebug".
 spark.udf.register("toIntdebug", toInt, IntegerType())

<function __main__.toInt(s)>

In [70]:
# Print df with an extra column i_uid holding colsInt applied to uid.
# NOTE(review): colsInt is declared IntegerType and the saved y.show(2) output
# in this notebook shows colsInt(uid) as null -- presumably the encoded value
# is too large for that type; confirm before relying on i_uid.
df.withColumn('i_uid', colsInt('uid')).show()

+-----------------+------------------+--------------------+---------+---------------+---------+-----+-------+--------+----------+----------+----------+----------+------------+-------+---------+-------------+---------+-------------+--------------+-----+
|               ts|               uid|           id.orig_h|id.orig_p|      id.resp_h|id.resp_p|proto|service|duration|orig_bytes|resp_bytes|conn_state|local_orig|missed_bytes|history|orig_pkts|orig_ip_bytes|resp_pkts|resp_ip_bytes|tunnel_parents|i_uid|
+-----------------+------------------+--------------------+---------+---------------+---------+-----+-------+--------+----------+----------+----------+----------+------------+-------+---------+-------------+---------+-------------+--------------+-----+
|1332017951.670000|CZf32d2DTHeWpAwB53|fe80::4c9b:aad8:8...|    60563|        ff02::c|     1900|  udp|      -|    6.01|       708|         0|        S0|      null|           0|      D|        6|          996|        0|            0|       (em

In [52]:
# Display the column names of df as the cell result.
df.columns

['ts',
 'uid',
 'id.orig_h',
 'id.orig_p',
 'id.resp_h',
 'id.resp_p',
 'proto',
 'service',
 'duration',
 'orig_bytes',
 'resp_bytes',
 'conn_state',
 'local_orig',
 'missed_bytes',
 'history',
 'orig_pkts',
 'orig_ip_bytes',
 'resp_pkts',
 'resp_ip_bytes',
 'tunnel_parents']

In [None]:

 
# Assemble every name in `columns` into one "features" vector column and
# display the feature vector of the first row.
# NOTE(review): `router` is not assigned in any cell visible in this notebook;
# confirm which DataFrame is meant.
# NOTE(review): most of the fields listed in `columns` are declared StringType
# in the schema; check that VectorAssembler accepts them before running this.
vecAssembler = VectorAssembler(inputCols=columns, outputCol="features")
vecAssembler.transform(router).head().features


 
 

In [48]:
# Print the first 2 rows of `y`.
# NOTE(review): no cell visible in this notebook assigns `y`; going by the
# saved output header it was presumably a select of uid and colsInt(uid) --
# confirm and restore the defining cell.
y.show(2)

+------------------+------------+
|               uid|colsInt(uid)|
+------------------+------------+
|CZf32d2DTHeWpAwB53|        null|
| Cv1oPeSUMWsg8Q2Tj|        null|
+------------------+------------+
only showing top 2 rows



In [49]:
# Print the first 2 rows of df.
df.show(2)

+-----------------+------------------+--------------------+---------+---------------+---------+-----+-------+--------+----------+----------+----------+----------+------------+-------+---------+-------------+---------+-------------+--------------+
|               ts|               uid|           id.orig_h|id.orig_p|      id.resp_h|id.resp_p|proto|service|duration|orig_bytes|resp_bytes|conn_state|local_orig|missed_bytes|history|orig_pkts|orig_ip_bytes|resp_pkts|resp_ip_bytes|tunnel_parents|
+-----------------+------------------+--------------------+---------+---------------+---------+-----+-------+--------+----------+----------+----------+----------+------------+-------+---------+-------------+---------+-------------+--------------+
|1332017951.670000|CZf32d2DTHeWpAwB53|fe80::4c9b:aad8:8...|    60563|        ff02::c|     1900|  udp|      -|    6.01|       708|         0|        S0|      null|           0|      D|        6|          996|        0|            0|       (empty)|
|1332017952.

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

 

# Fit k-means with 7 clusters and a fixed seed on `router`.
kmeans = KMeans(k=7, seed=1)
model = kmeans.fit(router)

# Assign every input row to one of the fitted clusters.
predictions = model.transform(router)

# Score the clustering with a default-configured evaluator and report it.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Print each fitted cluster center on its own line.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)


In [None]:
# One-row example frame that mixes scalar columns with a vector column.
dataset = spark.createDataFrame(
    data=[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    schema=["id", "hour", "mobile", "userFeatures", "clicked"],
)

# Combine hour, mobile and userFeatures into a single "features" column.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=["hour", "mobile", "userFeatures"],
)

output = assembler.transform(dataset)

In [None]:
# Print each column of df as "<name> <type>", one pair per line.
for name, dtype in df.dtypes:
     print(name, dtype)

In [None]:
# Display the column names of `router`.
# NOTE(review): `router` is not assigned in any cell visible in this notebook.
router.columns

In [None]:
# Fetch the first 2 rows of `router` through its underlying RDD.
# NOTE(review): `router` is not assigned in any cell visible in this notebook.
router.rdd.take(2)

In [None]:
# Fetch up to 2 rows of the example `dataset` frame through its underlying RDD.
dataset.rdd.take(2)

In [44]:
# Minimal VectorAssembler check on a one-row frame. The names are specific to
# this demo so the cell no longer overwrites `df` (the connection log loaded in
# the first cell) or `vecAssembler` (the assembler built over `columns`).
abc_df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
abc_assembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features")
# Displays the assembled vector of the single row as the cell result.
abc_assembler.transform(abc_df).head().features

 

DenseVector([1.0, 0.0, 3.0])