In [3]:
from pyspark import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
import socket

Apache Spark achieves high performance for both batch and streaming data, using a state-of-the-art DAG scheduler, a query optimizer, and a physical execution engine. 

# Spark Installation
1. Download and install Spark from https://spark.apache.org/downloads.html (on a Mac it can also be installed with ```brew install apache-spark```)
2. ```pip install pyspark```

Note: These installation steps assume you already have Jupyter Notebook with a Python 3 kernel installed.

# Spark Context
> Spark context sets up internal services and establishes a connection to a Spark execution environment. Once a SparkContext is created you can use it to create RDDs, accumulators and broadcast variables, access Spark services and run jobs (until SparkContext is stopped). A Spark context is essentially a client of Spark’s execution environment and acts as the master of your Spark application (don’t get confused with the other meaning of Master in Spark, though).

![](../images/sparkcontext-services.png)

Source: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-SparkContext.html

# Spark DAG
> DAG stands for Directed Acyclic Graph. Every time you call an action such as `.collect()` or `.count()` (or, in this demo's examples, `.show()`), the DAG is executed: certain steps are parallelized automatically and completed by separate workers. The Spark driver then combines the individual tasks' results into your final computation.

![](../images/spark_dag.png)


In [4]:
# Obtain the Spark entry points in a way that is safe to re-run.
# Calling SparkContext() directly fails with "Cannot run multiple SparkContexts
# at once" when this cell is executed a second time (or when a context already
# exists), so reuse the active session if there is one and create it otherwise.
spark = SparkSession.builder.getOrCreate()
# Keep `sc` bound as before for code that needs the underlying SparkContext.
sc = spark.sparkContext

In [5]:
# Load the DNS log into a Spark DataFrame.
# Point the path below at your own CSV file if it lives somewhere else.
df = spark.read.csv(
    "lab2-SIEMS/dns.log.csv",
    header=True,       # first row holds the column names
    inferSchema=True,  # let Spark infer column types from the data
)

In [4]:
# Print the DataFrame's schema: each column's name, type and nullability.
df.printSchema()

root
 |-- ts: double (nullable = true)
 |-- uid: string (nullable = true)
 |-- id_orig_h: string (nullable = true)
 |-- id_orig_p: integer (nullable = true)
 |-- id_resp_h: string (nullable = true)
 |-- id_resp_p: integer (nullable = true)
 |-- proto: string (nullable = true)
 |-- trans_id: integer (nullable = true)
 |-- rtt: string (nullable = true)
 |-- query: string (nullable = true)
 |-- qclass: string (nullable = true)
 |-- qclass_name: string (nullable = true)
 |-- qtype: string (nullable = true)
 |-- qtype_name: string (nullable = true)
 |-- rcode: string (nullable = true)
 |-- rcode_name: string (nullable = true)
 |-- AA: string (nullable = true)
 |-- TC: string (nullable = true)
 |-- RD: string (nullable = true)
 |-- RA: string (nullable = true)
 |-- Z: integer (nullable = true)
 |-- answers: string (nullable = true)
 |-- TTLs: string (nullable = true)
 |-- rejected: string (nullable = true)



In [5]:
# Display the first rows of the DataFrame as a text table
# (long cell values are truncated with "..." in the rendered output).
df.show()

+-------------+------------------+--------------------+---------+-----------------+---------+-----+--------+--------+----------------+------+-----------+-----+----------+-----+----------+---+---+---+---+---+---------------+-----------+--------+
|           ts|               uid|           id_orig_h|id_orig_p|        id_resp_h|id_resp_p|proto|trans_id|     rtt|           query|qclass|qclass_name|qtype|qtype_name|rcode|rcode_name| AA| TC| RD| RA|  Z|        answers|       TTLs|rejected|
+-------------+------------------+--------------------+---------+-----------------+---------+-----+--------+--------+----------------+------+-----------+-----+----------+-----+----------+---+---+---+---+---+---------------+-----------+--------+
|100026.067944|C9i83h2JapmUOWew67|fd77:48a1:c0bb:0:...|    65060|fd77:48a1:c0bb::1|       53|  udp|   18926|0.000480|am.super1024.com|     1| C_INTERNET|    1|         A|    0|   NOERROR|  F|  F|  T|  T|  0|107.170.193.108| 705.000000|    null|
|100146.080567|CYx4P

In [6]:
# Count the rows in the DataFrame.
df.count()

8655

In [7]:
# Summary statistics (count, mean, stddev, min, max) for the `ts` column.
df.describe('ts').show()

+-------+------------------+
|summary|                ts|
+-------+------------------+
|  count|              8655|
|   mean|139138.87804777155|
| stddev|   98652.144442041|
|    min|         22.382279|
|    max|     435918.340205|
+-------+------------------+



## Filtering by a condition

In [8]:
# Keep only the rows whose `trans_id` column equals 0 and display them.
df.filter(df["trans_id"] == 0).show()

+-------------+------------------+--------------------+---------+-----------+---------+-----+--------+--------+--------------------+------+-----------+-----+----------+-----+----------+---+---+---+---+---+--------------------+--------------------+--------+
|           ts|               uid|           id_orig_h|id_orig_p|  id_resp_h|id_resp_p|proto|trans_id|     rtt|               query|qclass|qclass_name|qtype|qtype_name|rcode|rcode_name| AA| TC| RD| RA|  Z|             answers|                TTLs|rejected|
+-------------+------------------+--------------------+---------+-----------+---------+-----+--------+--------+--------------------+------+-----------+-----+----------+-----+----------+---+---+---+---+---+--------------------+--------------------+--------+
| 10073.942632|C6u43D1XRozgEZwEj9|       192.168.1.215|     5353|224.0.0.251|     5353|  udp|       0|       -|_googlecast._tcp....|     -|          -|    -|         -|    0|   NOERROR|  T|  F|  F|  F|  0|chromecast-11ddab...|0.0

## Perform groupby and Count

In [9]:
# Count how many rows exist for each distinct value of `query`.
# The aggregation and the display are separate statements on purpose:
# DataFrame.show() returns None, so chaining it onto the assignment would
# bind `country_df` to None instead of the aggregated DataFrame.
# (The variable name is kept as-is; it holds per-query counts.)
country_df = df.groupby("query").count()
country_df.show()

+---------------+-----+
|          query|count|
+---------------+-----+
|     NEFACT.COM|    3|
|    AIRASHII.NL|    3|
|TOMOMI-ASKA.COM|    3|
|  RETROAPPS.COM|    3|
|     RMGPLC.COM|    3|
|  BAWALOUNGE.AT|    3|
|   ASTURDOG.COM|    3|
|ROYTHURESON.COM|    3|
|   VIRTULENZ.DE|    3|
|    MTN-INC.COM|    3|
|AAVANTINDIA.COM|    3|
|ONESTOPCREW.COM|    3|
|OURBESTLIFE.ORG|    3|
|      ZXC.CO.KR|    3|
|MAURINESAPP.COM|    3|
|  SPORTIVPIT.RU|    3|
|RADIOCALACA.COM|    3|
| MERCURYMAX.COM|    3|
|      CHSPB.ORG|    3|
|      WENICE.IT|    3|
+---------------+-----+
only showing top 20 rows



## Mapping over rows

We can define a function, and then apply that function to every element in a column. An example is shown below applying the function `valid_ip` to the column `answers` and naming the resulting column `valid_ip`. We then display the new column `valid_ip` as the output.

In [13]:
def valid_ip(address):
    """Return ``address`` if it is a dotted-quad IPv4 address, else ``'INVALID_IP'``.

    Args:
        address: The value to check, normally a string. ``None`` and other
            non-string values are treated as invalid.

    Returns:
        ``address`` unchanged when it parses as an IPv4 address, otherwise
        the sentinel string ``'INVALID_IP'``. Only the IPv4 family is
        checked, so IPv6 addresses are reported as ``'INVALID_IP'``.
    """
    try:
        # inet_pton is strict about the dotted-quad form. inet_aton also
        # accepts shorthand such as "1" or "127.1", which would let
        # non-addresses through as "valid".
        socket.inet_pton(socket.AF_INET, address)
    except (OSError, TypeError, ValueError):
        # OSError: not a valid IPv4 string.
        # TypeError: non-string input (e.g. None).
        # ValueError: string with an embedded NUL character.
        # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
        return 'INVALID_IP'
    return address

# Wrap valid_ip as a Spark UDF, apply it to every value of `answers` to build
# a new `valid_ip` column, and collect that column back as a list of Rows.
(
    df
    .withColumn("valid_ip", udf(valid_ip)("answers"))
    .select('valid_ip')
    .collect()
)

[Row(valid_ip='107.170.193.108'),
 Row(valid_ip='107.170.193.108'),
 Row(valid_ip='107.170.193.108'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='107.170.193.108'),
 Row(valid_ip='107.170.193.108'),
 Row(valid_ip='107.170.193.108'),
 Row(valid_ip='131.107.255.255'),
 Row(valid_ip='107.170.193.108'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='107.170.193.108'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='107.170.193.108'),
 Row(valid_ip='107.170.193.108'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='INVALID_IP'),
 Row(valid_ip='IN

## Registering as a Table

In [11]:
# Expose the DataFrame to Spark SQL under the name `demo_table`.
# registerTempTable() is deprecated since Spark 2.0; createOrReplaceTempView()
# is its replacement and is safe to re-run because it replaces an existing view.
df.createOrReplaceTempView('demo_table')

In [12]:
# Query the `demo_table` temp table with Spark SQL: show the distinct
# values found in the `answers` column.
spark.sql('select distinct(answers) from demo_table').show()

+--------------------+
|             answers|
+--------------------+
|prga-005096._comp...|
|     107.170.193.108|
|prga-005096._rfb....|
|www.msftncsi.com....|
|TXT 35 id=11ddab1...|
|11ddab1c-3b74-5e4...|
|         147.75.40.2|
|www.msftncsi.com....|
|TXT 50 LKDC:SHA1....|
|TXT 50 LKDC:SHA1....|
|TXT 50 LKDC:SHA1....|
|TXT 0 ,_rfb._tcp....|
|chromecast-11ddab...|
|   fd3e:4f5a:5b81::1|
|chromecast-11ddab...|
|TXT 50 LKDC:SHA1....|
|chromecast-11ddab...|
|                   -|
|TXT 50 LKDC:SHA1....|
|TXT 0 ,_rfb._tcp....|
+--------------------+
only showing top 20 rows

