In [1]:
print("Hello World")

Hello World


In [2]:
readFile="Data/parquet/9.Malware"

In [3]:
from pyspark.sql import SparkSession
import seaborn as sns
import matplotlib.pyplot as plt 
from pyspark.sql.functions import col

In [4]:
spark = SparkSession.builder \
    .master("local") \
    .appName("WiFi") \
    .config("spark.driver.memory", "4g")\
    .getOrCreate()

In [5]:
df = spark.read.parquet(readFile)

In [6]:
df.count()

2312760

https://docs.microsoft.com/en-us/azure/databricks/_static/notebooks/getting-started/get-started-with-mllib-dbr7.html as a guide

In [7]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [8]:
categoricalCols = [
    #"frame_time",
    "radiotap_present_tsft",
    "radiotap_rxflags",
    "wlan_analysis_kck",
    "wlan_analysis_kek",
    "wlan_bssid",
    "wlan_country_info_fnm",
    "wlan_country_info_code",
    "wlan_da",
    "wlan_fc_ds",
    "wlan_ra",
    "wlan_sa",
    "wlan_ssid",
    "wlan_ta",
    "wlan_tag_length",
    "wlan_rsna_eapol_keydes_data",
    "wlan_rsna_eapol_keydes_nonce",
    "llc",
    "arp",
    "arp_proto_type",
    "arp_src_hw_mac",
    "arp_src_proto_ipv4",
    "arp_dst_hw_mac",
    "arp_dst_proto_ipv4",
    "ip_dst",
    "ip_src",
    "data_data",
    "tcp_checksum",
    #"tcp_payload",
    #"udp_payload",
    "nbns",
    "nbss_continuation_data",
    "nbss_type",
    "ldap",
    "smb_server_component",
    "smb2_acct",
    "smb2_auth_frame",
    "smb2_buffer_code",
    "smb2_cmd",
    "smb2_data_offset",
    "smb2_domain",
    "smb2_fid",
    "smb2_filename",
    "smb2_header_len",
    "smb2_host",
    "smb2_msg_id",
    "smb2_pid",
    "smb2_previous_sesid",
    "smb2_protocol_id",
    "smb2_sesid",
    "smb2_session_flags",
    "smb2_tid",
    "dhcp",
    "dhcp_client_id_duid_ll_hw_type",
    "dhcp_cookie",
    "dhcp_hw_mac_addr",
    "dhcp_id",
    "dhcp_ip_client",
    "dhcp_ip_relay",
    "dhcp_ip_server",
    "dhcp_option_broadcast_address",
    "dhcp_option_dhcp_server_id",
    "dhcp_option_router",
    "dhcp_option_vendor_bsdp_message_type",
    "mdns",
    "dns",
    "dns_a",
    "dns_flags_authoritative",
    "dns_flags_checkdisable",
    "dns_flags_opcode",
    "dns_flags_response",
    "dns_id",
    "dns_ptr_domain_name",
    "dns_qry_name",
    "dns_resp_len",
    "dns_resp_name",
    "dns_resp_ttl",
    "dns_resp_len_1",
    "ssdp",
    "http_connection",
    "http_content_type",
    "http_date",
    "http_file_data",
    "http_host",
    "http_last_modified",
    "http_location",
    "http_next_request_in",
    "http_next_response_in",
    "http_request_full_uri",
    "http_request_line",
    "http_request_method",
    "http_request_uri_path",
    "http_request_uri_query",
    "http_request_uri_query_parameter",
    "http_request_version",
    "http_response_code_desc",
    "http_response_line",
    "http_response_phrase",
    "http_response_version",
    "http_response_for_uri",
    "http_response_in",
    "http_referer",
    "http_server",
    "json_value_string",
    "json_key",
    "ssh_cookie",
    "ssh_message_code",
    "ssh_mpint_length",
    #"ssh_packet_length_encrypted",
    "ssh_protocol",
    "tls_app_data_proto",
    "tls_connection_id",
    "tls_handshake_extension_type",
    "tls_handshake_extensions_key_share_group",
    "tls_handshake_version",
    "tls_record_version" 
]


In [9]:
# The following two lines are estimators. They return functions that we will later apply to transform the dataset.
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + "index" for x in categoricalCols]).setHandleInvalid("keep") 
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categoricalCols]) 

In [10]:
numericCols=[
    "frame_time_delta",
    "frame_time_delta_displayed",
    "frame_time_epoch",
    "frame_time_relative",
    "radiotap_datarate",
    "radiotap_mactime",
    "wlan_fixed_timestamp",
    "wlan_seq",
    "wlan_radio_data_rate",
    "wlan_radio_end_tsf",
    "wlan_radio_start_tsf",
    "wlan_radio_timestamp",
    "wlan_rsn_capabilities_mfpc",
    "wlan_rsna_eapol_keydes_msgnr",
    "wlan_rsna_eapol_keydes_data_len",
    "wlan_rsna_eapol_keydes_key_info_key_mic",
    "eapol_keydes_key_len",
    "eapol_keydes_replay_counter",
    "eapol_len",
    "eapol_type",
    "arp_hw_type",
    "arp_hw_size",
    "arp_proto_size",
    "arp_opcode",
    "icmpv6_mldr_nb_mcast_records",
    "tcp_time_delta",
    "tcp_time_relative",
    "udp_time_relative",
    "udp_time_delta",
    "nbss_length",
    "smb_access_generic_execute",
    "smb_access_generic_read",
    "smb_access_generic_write",
    "smb_flags_notify",
    "smb_flags_response",
    "smb_flags2_nt_error",
    "smb_flags2_sec_sig",
    "smb_mid",
    "smb_nt_status",
    "smb_pid_high",
    "smb_tid",
    "dhcp_hw_addr_padding",
    "dns_time",
    "http_content_length",
    "http_request_in",
    "http_response_code",
    "http_time",
    "tls_compress_certificate_compressed_certificate_message_length",
    "frame_encap_type",
    #"frame_len",
    #"frame_number",
    "radiotap_channel_flags_cck",
    "radiotap_channel_flags_ofdm",
    "radiotap_channel_freq",
    "radiotap_dbm_antsignal",
    "radiotap_length",
    "radiotap_vendor_oui",
    "wlan_duration",
    "wlan_fc_frag",
    "wlan_fc_order",
    "wlan_fc_moredata",
    "wlan_fc_protected",
    "wlan_fc_pwrmgt",
    "wlan_fc_type",
    "wlan_fc_retry",
    "wlan_fc_subtype",
    "wlan_fcs_bad_checksum",
    "wlan_fixed_beacon",
    "wlan_fixed_capabilities_ess",
    "wlan_fixed_capabilities_ibss",
    "wlan_fixed_reason_code",
    "wlan_radio_duration",
    "wlan_rsn_ie_gtk_key",
    "wlan_rsn_ie_igtk_key",
    "wlan_rsn_ie_pmkid",
    "wlan_tag",
    "wlan_radio_channel",
    "wlan_radio_frequency",
    "wlan_radio_signal_dbm",
    "wlan_radio_phy",
    "ip_proto",
    "ip_ttl",
    "ip_version",
    "data_len",
    "icmpv6_ni_nonce",
    "tcp_ack",
    "tcp_ack_raw",
    "tcp_analysis",
    "tcp_analysis_flags",
    "tcp_analysis_retransmission",
    "tcp_analysis_reused_ports",
    "tcp_analysis_rto_frame",
    "tcp_checksum_status",
    "tcp_flags_syn",
    "tcp_dstport",
    "tcp_flags_ack",
    "tcp_flags_fin",
    "tcp_flags_push",
    "tcp_flags_reset",
    "tcp_option_len",
    "tcp_seq",
    "tcp_seq_raw",
    "tcp_srcport",
    "udp_dstport",
    "udp_srcport",
    "udp_length",
    "smb2_write_length",
    "dns_count_add_rr",
    "dns_count_answers",
    "dns_count_auth_rr",
    "dns_count_labels",
    "dns_count_queries",
    "dns_qry_name_len",
    "dns_retransmit_request",
    "dns_retransmit_response",
    "ssh_compression_algorithms_client_to_server_length",
    "ssh_compression_algorithms_server_to_client_length",
    "ssh_direction",
    "ssh_dh_gex_max",
    "ssh_dh_gex_min",
    "ssh_dh_gex_nbits",
    "ssh_encryption_algorithms_client_to_server_length",
    "ssh_encryption_algorithms_server_to_client_length",
    "ssh_host_key_length",
    "ssh_host_key_type_length",
    "ssh_kex_algorithms_length",
    "ssh_mac_algorithms_client_to_server_length",
    "ssh_mac_algorithms_server_to_client_length",
    "ssh_packet_length",
    "ssh_padding_length",
    "ssh_padding_string",
    "ssh_server_host_key_algorithms_length",
    "tls_alert_message_desc",
    "tls_alert_message_level",
    "tls_handshake_session_ticket_length",
    "tls_record_content_type",
    "radiotap_timestamp_ts"
]

In [11]:
from pyspark.ml.feature import VectorAssembler
 
# This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.
assemblerInputs = [c + "OHE" for c in categoricalCols] + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

In [12]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

In [13]:
subSample = df.sample(fraction=1.0/100, withReplacement=False)
print('We will use',subSample.count(),' events for the grid search.')

We will use 23256  events for the grid search.


In [14]:
subSample.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0|21933|
|    1| 1323|
+-----+-----+



In [15]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol='label', featuresCol='features', 
                            maxBins=20, subsamplingRate=0.7,
                            minInfoGain= 1e-05,
                            featureSubsetStrategy='all',
                            minInstancesPerNode=2)

evaluator=BinaryClassificationEvaluator(metricName='areaUnderPR')
rf_pipe = Pipeline(stages=[stringIndexer, encoder, vecAssembler,rf])





paramGrid = ParamGridBuilder()\
  .addGrid(rf.maxDepth, [10,20])\
  .addGrid(rf.numTrees, [50])\
  .build()

cv = CrossValidator(estimator=rf_pipe, 
                    evaluator=evaluator,
                    estimatorParamMaps=paramGrid,
                    numFolds=2, parallelism=2)

In [16]:

bestModel = cv.fit(subSample)
 


In [17]:
print('BestModel:\n\t-maxDepth =',bestModel.bestModel.stages[-1]._java_obj.getMaxDepth())
print('\t-Feature Importance =',bestModel.bestModel.stages[-1]._java_obj.featureImportances())

BestModel:
	-maxDepth = 10
	-Feature Importance = (11582,[23,26,43,49,51,61,64,100,112,113,119,208,1033,1111,1274,1339,1412,1482,1552,1602,1607,1634,1638,1650,1700,1715,1751,1774,1775,1903,1957,2371,2541,2623,3530,3550,3797,3842,4036,4258,4287,4335,4512,4966,5674,5752,5922,5979,6478,6513,6543,6769,6903,6997,7181,7467,7536,7540,8007,8034,8147,8386,8945,8961,9351,9527,9801,11447,11449,11454,11472,11499,11530,11545,11581],[0.16393184298732014,0.10871492960936333,2.7316241387685886E-4,2.882999829125571E-4,9.858893114122325E-5,6.841744734386446E-5,5.897853008102251E-6,0.48417576824363523,0.008766637454545012,8.949721272316395E-5,0.004846827464417092,0.11818792086739371,3.028535215779918E-4,1.3373336418792128E-4,3.108000189701424E-4,4.502952494898455E-5,1.1286899419533014E-4,8.995728764312048E-6,1.9816040799179414E-4,1.6609027602291655E-4,6.355499694915865E-5,5.629175967632235E-6,2.496958109311559E-5,6.770063139228394E-6,1.5768757247151794E-4,7.635370240006153E-5,5.747295220594953E-5,3.27413

In [18]:
transformedBestModel=bestModel.transform(subSample)

In [19]:
import pandas as pd

p = pd.DataFrame(transformedBestModel.schema["features"].metadata["ml_attr"]["attrs"]["binary"]+transformedBestModel.schema["features"].metadata["ml_attr"]["attrs"]["numeric"]).sort_values("idx")

In [20]:
p.name

0             radiotap_present_tsftOHE_1-0-0
1             radiotap_present_tsftOHE_0-0-0
2             radiotap_rxflagsOHE_0x00000000
3                    wlan_analysis_kckOHE_NA
4                    wlan_analysis_kekOHE_NA
                        ...                 
11577                 tls_alert_message_desc
11578                tls_alert_message_level
11579    tls_handshake_session_ticket_length
11580                tls_record_content_type
11581                  radiotap_timestamp_ts
Name: name, Length: 11582, dtype: object

In [21]:

from itertools import chain
attrs = sorted(
    (attr["idx"], attr["name"])
    for attr in (
        chain(*transformedBestModel.schema["features"].metadata["ml_attr"]["attrs"].values())
    )
) 

In [22]:
[
    (name,  bestModel.bestModel.stages[-1].featureImportances[idx])
    for idx, name in attrs
    if bestModel.bestModel.stages[-1].featureImportances[idx]
]

[('wlan_daOHE_24:f5:a2:ea:86:c3', 0.16393184298732014),
 ('wlan_daOHE_00:0c:29:cf:08:aa', 0.10871492960936333),
 ('wlan_daOHE_00:0c:29:cf:08:aa-00:0c:29:cf:08:aa', 0.00027316241387685886),
 ('wlan_daOHE_50:3e:aa:e3:1f:be-50:3e:aa:e3:1f:be-50:3e:aa:e3:1f:be',
  0.0002882999829125571),
 ('wlan_daOHE_a4:b1:c1:91:4c:72-a4:b1:c1:91:4c:72-a4:b1:c1:91:4c:72',
  9.858893114122325e-05),
 ('wlan_daOHE_24:f5:a2:ea:86:c3-24:f5:a2:ea:86:c3-24:f5:a2:ea:86:c3',
  6.841744734386446e-05),
 ('wlan_daOHE_50:3e:aa:e4:01:93-50:3e:aa:e4:01:93-50:3e:aa:e4:01:93',
  5.897853008102251e-06),
 ('wlan_saOHE_00:0c:29:cf:08:aa', 0.48417576824363523),
 ('wlan_saOHE_24:f5:a2:ea:86:c3', 0.008766637454545012),
 ('wlan_saOHE_94:e9:79:82:c5:77-94:e9:79:82:c5:77-94:e9:79:82:c5:77',
  8.949721272316395e-05),
 ('wlan_saOHE_00:0c:29:cf:08:aa-00:0c:29:cf:08:aa', 0.004846827464417092),
 ('ip_dstOHE_192.168.2.130', 0.11818792086739371),
 ('data_dataOHE_NA', 0.0003028535215779918),
 ('data_dataOHE_862dd6f326b9c3c0406a7b95a91d376

In [23]:
len(bestModel.bestModel.stages[-1].featureImportances)

11582

The test on the subsample tells us which are important dimensions that we can then use for out model. The lengh output shows that even with the subsample the one hot encoder makes adds an order of magnitude of dimensions

In [13]:
train, test = df.randomSplit([0.8,0.2])

In [14]:
print('Events for the training {}'.format(train.count()))
print('Events for the validation {}'.format(test.count()))

Events for the training 1851077
Events for the validation 461683


In [15]:
importantNumerical=[
   
]

In [16]:
importantCategorical=[
    'wlan_da',
    'ip_dst',
    'wlan_sa'
]

In [17]:
ImportantstringIndexer = StringIndexer(inputCols=importantCategorical, outputCols=[x + "index" for x in importantCategorical]).setHandleInvalid("keep") 
Importantencoder = OneHotEncoder(inputCols=ImportantstringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in importantCategorical]) 

In [18]:
# This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.
ImportantAssemblerInputs = [c + "OHE" for c in importantCategorical] + importantNumerical
ImportantvecAssembler = VectorAssembler(inputCols=ImportantAssemblerInputs, outputCol="importantfeatures")

In [19]:


rf = RandomForestClassifier(labelCol='label', featuresCol='importantfeatures', numTrees=50)

In [20]:

 
# Define the pipeline based on the stages created in previous steps.
pipeline = Pipeline(stages=[ImportantstringIndexer, Importantencoder, ImportantvecAssembler,rf])
 

 


In [21]:
# Define the pipeline model.
pipelineModel = pipeline.fit(train)

In [22]:
# Apply the pipeline model to the test dataset.
pred = pipelineModel.transform(test)

In [23]:
pred.show(1)

+----------------+---------+------------+--------------------+----------------+--------------------------+------------------+-------------------+--------------------------+---------------------------+---------------------+-----------------+----------------------+---------------+----------------+---------------------+----------------+---------------------+-------------------+-------------+-----------------+-----------------+----------+---------------------+----------------------+-------+----------+------------+-------------+----------------+-----------------+--------------+------------+-------------+---------------+---------------------+-----------------+---------------------------+----------------------------+----------------------+--------------------+-----------------+-------------------+-------------------+--------------------+-----------------+-------+--------+---------+-------+--------+---------------+------------------+--------------------+------------------+--------------------+

In [24]:
pred_pd = pred.select(['label', 'prediction', 'probability']).toPandas()
pred_pd.head()

Unnamed: 0,label,prediction,probability
0,0,0.0,"[0.9497419472602844, 0.05025805273971564]"
1,0,0.0,"[0.9497419472602844, 0.05025805273971564]"
2,0,0.0,"[0.9497419472602844, 0.05025805273971564]"
3,0,0.0,"[0.9497419472602844, 0.05025805273971564]"
4,0,0.0,"[0.9497419472602844, 0.05025805273971564]"


In [25]:
from sklearn.metrics import confusion_matrix

confusion_matrix(pred_pd['label'], pred_pd['prediction'])
tn, fp, fn, tp = confusion_matrix(pred_pd['label'], pred_pd['prediction']).ravel()

In [26]:
print("There are "+str(tn)+ " true negatives")
print("There are "+str(tp)+ " true positives")
print("There are "+str(fn)+ " false negatives")
print("There are "+str(fp)+ " false positives")

There are 435428 true negatives
There are 0 true positives
There are 26255 false negatives
There are 0 false positives


In [27]:
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import matthews_corrcoef

precision,recall,fscore,support = precision_recall_fscore_support(pred_pd['label'], pred_pd['prediction'], average='binary')
matthews_corrcoef = matthews_corrcoef(pred_pd['label'], pred_pd['prediction'])
print("The precision is "+str(precision))
print("The recall is "+str(recall))
print("The fscore is "+str(fscore))
print("The Matthews correlation coefficient is "+str(matthews_corrcoef))

The precision is 0.0
The recall is 0.0
The fscore is 0.0
The Matthews correlation coefficient is 0.0


  _warn_prf(average, modifier, msg_start, len(result))


In [28]:
pred_pd.loc[pred_pd['label'] == 1]

Unnamed: 0,label,prediction,probability
32870,1,0.0,"[0.8477088182382644, 0.15229118176173556]"
34112,1,0.0,"[0.9047288384737716, 0.09527116152622837]"
34425,1,0.0,"[0.8477088182382644, 0.15229118176173556]"
34441,1,0.0,"[0.8477088182382644, 0.15229118176173556]"
34516,1,0.0,"[0.8477088182382644, 0.15229118176173556]"
...,...,...,...
461673,1,0.0,"[0.9047288384737716, 0.09527116152622837]"
461674,1,0.0,"[0.9047288384737716, 0.09527116152622837]"
461678,1,0.0,"[0.8723497726574259, 0.1276502273425741]"
461681,1,0.0,"[0.8573030642383795, 0.1426969357616204]"


The model has classified everything as negative so needs to be adjusted