In [1]:
from __future__ import print_function

# $example on$
from pyspark.ml.feature import StringIndexer
from pyspark import SparkContext
from pyspark.sql import SQLContext


# Obtain the SparkContext explicitly instead of relying on a kernel-injected
# `sc`: getOrCreate() returns the already-running context when there is one
# (e.g. the one the pyspark shell provides) and starts a new one otherwise, so
# this cell also runs on a fresh kernel where `sc` was never defined.
sc = SparkContext.getOrCreate()
sql_sc = SQLContext(sc)

In [2]:
# Begin with a single one-hour capture file; the same steps can later be
# applied to several hours of data.
fname = '/tigress/arpitg/dns_data/tshark_20170207_20170212_consolidated/split_hour_00118_20170211215900.csv'
#fname = 'data/tshark_20170207_20170212_consolidated/split_hour_00118_20170211215900.csv'

# Every input line becomes one tuple holding its comma-separated fields; the
# text from the first newline character onwards is dropped before splitting.
dns_data = []
with open(fname, 'r') as capture_file:
    dns_data.extend(
        tuple(raw_line.split('\n')[0].split(','))
        for raw_line in capture_file
    )

In [3]:
# Sanity check: the first two parsed rows followed by the total row count.
first_rows = dns_data[:2]
print(first_rows, len(dns_data))

[('1486868340.890438000', '0.001918000', '10.9.245.26', '10.8.0.5', 'api.smoot.apple.com', '0x00000001', '', 'vapornet100', 'host-dynamic', 'nat-oitwireless-inside-vapornet100-d-30235', '', '', ''), ('1486868340.891983000', '0.003463000', '10.8.241.29', '10.8.0.5', 'r20---sn-hgn7zn7e.googlevideo.com', '0x00000001', 'a4:02:b9:7e:a0:48', 'vapornet100', 'host-dynamic', 'nat-oitwireless-inside-vapornet100-b-28947', '', '', 'IntelCor')] 1707513


In [None]:
# Column names, in file order, for the fields of each parsed row.
dns_columns = [
    "ts", "xx", "srcIP", "dstIP", "qname", "qtype", "macAddr", "subnet",
    "host_type", "host_name", "system_type", "OS", "nic_manuf",
]

# Turn the in-memory list of row tuples into a Spark DataFrame and report
# how many rows it holds.
dns_data_df = sql_sc.createDataFrame(dns_data, dns_columns)
dns_data_df.count()

In [None]:
# Apply basic filters:
#   1. drop rows whose query name is the '<Unknown extended label>' placeholder,
#   2. drop rows whose system_type is one of the excluded values below.
excluded_system_types = ['virtual-machine', 'dec-pc', 'other-pc', 'dell-pc']

filtered_dns_data_df = dns_data_df.filter(
    dns_data_df["qname"] != '<Unknown extended label>'
)
for excluded_type in excluded_system_types:
    filtered_dns_data_df = filtered_dns_data_df.filter(
        dns_data_df["system_type"] != excluded_type
    )
# Disabled filter, kept for reference:
#filtered_dns_data_df = filtered_dns_data_df.filter(dns_data_df["nic_manuf"] != 'Apple')

filtered_dns_data_df.show()
filtered_dns_data_df.count()

In [124]:
# Base of the per-IP feature table: the distinct (srcIP, nic_manuf) pairs seen
# in the filtered data. The per-IP features computed later are joined onto it.
ip_identity_columns = [filtered_dns_data_df.srcIP, filtered_dns_data_df.nic_manuf]
ip_feature_df = filtered_dns_data_df.select(*ip_identity_columns).distinct()

In [131]:
# TODO make these counts bounded by some window interval

# This cell no longer uses these names itself; the import is retained in case
# other cells of the notebook rely on it.
from collections import defaultdict, Counter


def _count_distinct_per_ip(pairs_df, count_col):
    """Count distinct rows per srcIP.

    pairs_df  -- DataFrame with a `srcIP` column plus one value column.
    count_col -- name given to the resulting count column.
    Returns a DataFrame with columns (srcIP, count_col).
    """
    return (pairs_df
            .distinct()
            .groupBy('srcIP')
            .count()
            .withColumnRenamed('count', count_col))


def _qname_suffix(qname, levels):
    """Return the last `levels` dot-separated labels of `qname`, re-joined with dots.

    Works on the text value directly (no encode()/str() round trip), so it
    behaves the same on Python 2 and Python 3.
    """
    return ".".join(qname.strip().split('.')[-levels:])


def _unique_qname_suffix_count(df, levels, count_col):
    """Per srcIP, count the distinct `levels`-label suffixes of the queried names."""
    suffix_pairs_df = (df
                       .select(df.srcIP, df.qname)
                       # DataFrame.map is not available on Spark >= 2.0; going
                       # through .rdd works on both 1.x and 2.x.
                       .rdd
                       .map(lambda row: (row.srcIP, _qname_suffix(row.qname, levels)))
                       .toDF(["srcIP", "qname"]))
    return _count_distinct_per_ip(suffix_pairs_df, count_col)


def _popularity_picker(most_popular):
    """Build a reducer that keeps one of two (value, count) pairs.

    With most_popular=True the pair with the higher count wins, otherwise the
    one with the lower count. Equal counts are resolved by taking the smaller
    value, so the outcome does not depend on partitioning or iteration order.
    """
    sign = -1 if most_popular else 1

    def pick(left, right):
        left_key = (sign * left[1], left[0])
        right_key = (sign * right[1], right[0])
        return left if left_key <= right_key else right

    return pick


def _extreme_value_per_ip(df, value_col, out_col, most_popular=True):
    """Per srcIP, find the most (or least) frequently seen value of `value_col`.

    Returns a DataFrame with columns (srcIP, out_col). Both aggregation steps
    use reduceByKey, so the full list of values of an IP is never collected
    in one place.
    """
    return (df
            .select(df.srcIP, df[value_col])
            .rdd
            .map(lambda row: ((row[0], row[1]), 1))
            .reduceByKey(lambda x, y: x + y)                      # occurrences per (srcIP, value)
            .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))        # -> (srcIP, (value, count))
            .reduceByKey(_popularity_picker(most_popular))
            .map(lambda kv: (kv[0], kv[1][0].strip()))
            .toDF(["srcIP", out_col]))


# Unique domain counts (full name, then last 1 / 2 / 3 labels)
ip_2_unique_qname = _count_distinct_per_ip(
    filtered_dns_data_df.select(filtered_dns_data_df.srcIP, filtered_dns_data_df.qname),
    'unique_qname_count')

ip_2_unique_qname_1l = _unique_qname_suffix_count(filtered_dns_data_df, 1, 'unique_qname_l1_count')
ip_2_unique_qname_12 = _unique_qname_suffix_count(filtered_dns_data_df, 2, 'unique_qname_l2_count')
ip_2_unique_qname_13 = _unique_qname_suffix_count(filtered_dns_data_df, 3, 'unique_qname_l3_count')

# unique dstIPs count
ip_2_unique_dstIPs = _count_distinct_per_ip(
    filtered_dns_data_df.select(filtered_dns_data_df.srcIP, filtered_dns_data_df.dstIP),
    'unique_dstIPs_count')

# most / least popular qname and most popular dstIP
ip_2_most_pop_qname = _extreme_value_per_ip(
    filtered_dns_data_df, 'qname', 'qname_most_popular', most_popular=True)
ip_2_least_pop_qname = _extreme_value_per_ip(
    filtered_dns_data_df, 'qname', 'qname_least_popular', most_popular=False)
ip_2_most_pop_dstIP = _extreme_value_per_ip(
    filtered_dns_data_df, 'dstIP', 'dstIP_most_popular', most_popular=True)

# Join every per-IP feature onto the base table, in column order.
ip_feature_df_updated = ip_feature_df
for per_ip_feature_df in [ip_2_unique_qname,
                          ip_2_unique_qname_1l,
                          ip_2_unique_qname_12,
                          ip_2_unique_qname_13,
                          ip_2_unique_dstIPs,
                          ip_2_most_pop_qname,
                          ip_2_least_pop_qname,
                          ip_2_most_pop_dstIP]:
    ip_feature_df_updated = ip_feature_df_updated.join(per_ip_feature_df, 'srcIP')


ip_feature_df_updated.show()

+------------+---------+------------------+---------------------+---------------------+---------------------+-------------------+--------------------+--------------------+------------------+
|       srcIP|nic_manuf|unique_qname_count|unique_qname_l1_count|unique_qname_l2_count|unique_qname_l3_count|unique_dstIPs_count|  qname_most_popular| qname_least_popular|dstIP_most_popular|
+------------+---------+------------------+---------------------+---------------------+---------------------+-------------------+--------------------+--------------------+------------------+
|10.8.110.155|    Apple|                11|                    4|                    5|                    7|                  1|_aaplcache1._tcp....|p02-ckdatabase.fe...|          10.8.0.5|
| 10.8.118.41|         |                14|                    2|                    8|                   14|                  1|     plus.google.com|time-ios.g.aaplim...|          10.8.0.5|
|10.8.127.157|         |               103|  

In [None]:
# StringIndexer transformations: each categorical column <name> gets an
# indexer that writes its numeric index to <name>_idx.
from pyspark.ml.feature import StringIndexer


def _make_idx_indexer(column_name):
    """Build a StringIndexer reading `column_name` and writing `<column_name>_idx`."""
    return StringIndexer(inputCol=column_name, outputCol=column_name + "_idx")


indexer1 = _make_idx_indexer("qname_most_popular")
indexer2 = _make_idx_indexer("qname_least_popular")
indexer3 = _make_idx_indexer("dstIP_most_popular")
indexer4 = _make_idx_indexer("nic_manuf")

# Only the first indexer is fitted and applied; the other three steps are
# left disabled below.
qname_most_popular_model = indexer1.fit(ip_feature_df_updated)
ip_feature_df_indexed = qname_most_popular_model.transform(ip_feature_df_updated)
#ip_feature_df_indexed = indexer2.fit(ip_feature_df_indexed).transform(ip_feature_df_indexed)
#ip_feature_df_indexed = indexer3.fit(ip_feature_df_indexed).transform(ip_feature_df_indexed)
#ip_feature_df_indexed = indexer4.fit(ip_feature_df_indexed).transform(ip_feature_df_indexed)
ip_feature_df_indexed.show()

root
 |-- srcIP: string (nullable = true)
 |-- nic_manuf: string (nullable = true)
 |-- unique_qname_count: long (nullable = false)
 |-- unique_qname_l1_count: long (nullable = false)
 |-- unique_qname_l2_count: long (nullable = false)
 |-- unique_qname_l3_count: long (nullable = false)
 |-- unique_dstIPs_count: long (nullable = false)
 |-- qname_most_popular: string (nullable = true)
 |-- qname_least_popular: string (nullable = true)
 |-- dstIP_most_popular: string (nullable = true)



KeyboardInterrupt: 

In [None]:
# How many trailing labels of the query name the disabled attempt below keeps.
domain_level = 2
#test_df = dns_data_df.select(".".join([str(x) for x in dns_data_df['qname'].split('.')[-domain_level:]]),1)

# Keep only the query-name column.
test_df = dns_data_df.select(dns_data_df["qname"])


In [23]:
from pyspark.sql.functions import lit
#test_df = dns_data_df.withColumn("qname_count", lit(1))


def _trailing_labels(name, levels):
    """Return the last `levels` dot-separated labels of `name`, joined with dots.

    The text value is used as-is (no str() conversion), so non-ASCII names do
    not raise on Python 2; a missing (None) name is passed through as None.
    """
    if name is None:
        return None
    return ".".join(name.split('.')[-levels:])


test_df = dns_data_df

# Add one copy of qname per level (qname_1_ld, qname_2_ld); the copies are
# trimmed to their trailing labels in the map step below.
for level in range(1, 3):
    col_name = 'qname_' + str(level) + '_ld'
    test_df = test_df.withColumn(col_name, test_df['qname'])

test_df.printSchema()

# Produce (full name, last label, last two labels) per row. DataFrame.map is
# not available on Spark >= 2.0, so the mapping goes through .rdd, which works
# on both 1.x and 2.x. toDF() without names yields columns _1, _2, _3.
test_df = (test_df
           .rdd
           .map(lambda p: (p.qname,
                           _trailing_labels(p.qname_1_ld, 1),
                           _trailing_labels(p.qname_2_ld, 2)))
           .toDF())

root
 |-- ts: string (nullable = true)
 |-- xx: string (nullable = true)
 |-- srcIP: string (nullable = true)
 |-- dstIP: string (nullable = true)
 |-- qname: string (nullable = true)
 |-- qtype: string (nullable = true)
 |-- macAddr: string (nullable = true)
 |-- subnet: string (nullable = true)
 |-- host_type: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- system_type: string (nullable = true)
 |-- OS: string (nullable = true)
 |-- nic_manuf: string (nullable = true)
 |-- qname_1_ld: string (nullable = true)
 |-- qname_2_ld: string (nullable = true)



In [24]:
# Print the leading rows of the derived frame (20 is show()'s default count).
test_df.show(n=20)

+--------------------+---+---------------+
|                  _1| _2|             _3|
+--------------------+---+---------------+
| api.smoot.apple.com|com|      apple.com|
|r20---sn-hgn7zn7e...|com|googlevideo.com|
|     rtax.criteo.com|com|     criteo.com|
|user.auth.xboxliv...|com|   xboxlive.com|
|user.auth.xboxliv...|com|   xboxlive.com|
|             poal.me| me|        poal.me|
|deazs14tb5j7o.clo...|net| cloudfront.net|
|widgets.outbrain.com|com|   outbrain.com|
|     mail.google.com|com|     google.com|
|r20---sn-hgn7zn7e...|com|googlevideo.com|
|as-sec.casalemedi...|com|casalemedia.com|
|stats.l.doublecli...|net|doubleclick.net|
|streamingaudio.g....|com|    aaplimg.com|
|streamingaudio.g....|com|    aaplimg.com|
|        sr.symcd.com|com|      symcd.com|
|PDOM07.pu.win.pri...|edu|  princeton.edu|
|ocos-office365-s2...|net|     msedge.net|
|config.edge.skype...|com|      skype.com|
|init-p01md.apple.com|com|      apple.com|
|p20-keyvalueservi...|com|     icloud.com|
+----------