# Set operations on RDDs

**NOTE: This notebook is worth 10% of the grade of project 2.**

[Introduction to Spark with Python, by Jose A. Dianes](https://github.com/jadianes/spark-py-notebooks)

Spark supports many of the operations we have in mathematical sets, such as union and intersection, even when the RDDs themselves are not properly sets. Note that these operations require both RDDs to hold elements of the same type.  

Set operations are quite straightforward to understand, as they work as expected. The only caveat is that RDDs are not real sets, so operations such as the union of RDDs don't remove duplicates. In this notebook we will have a brief look at `subtract`, `distinct`, and `cartesian`.
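
A plain-Python sketch of the duplicate behaviour described above, using lists as stand-ins for RDDs (the variable names and values are illustrative). List concatenation mirrors what `union` does in Spark, and deduplicating afterwards mirrors `distinct`. The real RDD methods make no ordering guarantee, which this sketch does not model.

```python
# Lists standing in for two RDDs of the same element type
a = ["tcp", "udp", "tcp"]
b = ["udp", "icmp"]

# Like RDD.union: every element of both inputs is kept, duplicates included
union = a + b

# Like RDD.distinct: one copy of each value (dict.fromkeys keeps first-seen order)
distinct_union = list(dict.fromkeys(union))

print(len(union), len(distinct_union))
```

The union has as many elements as both inputs combined, so a `distinct` step is needed whenever true set semantics are wanted.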

## Getting the data and creating the RDD

In this notebook we will use the reduced dataset (1 percent) provided for the KDD Cup 1999, containing nearly fifty thousand network interactions. The file is provided as a *Gzip* file, one level above the working directory.  

In [1]:
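import os
import gzip

# Path to the gzipped dataset, one directory above the working directory
data_file = os.path.join(os.getcwd(), "..", "kddcup.data_1_percent.gz")
# TODO: read the textFile from 'data_file' into 'raw_data'. Hint: use sc.textFile(<local path>)
# Plain-Python stand-in: read the file into a list of stripped lines
with gzip.open(data_file, "rt") as f:
    raw_data = [row.strip() for row in f]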
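
For reference, here is a small self-contained sketch of the same reading pattern, run against a throwaway Gzip file so that it does not depend on the dataset being present. The file name and the two sample lines are made up for illustration. In Spark itself, `sc.textFile` can read a `.gz` path directly and returns an RDD of lines.

```python
import gzip
import os
import tempfile

# Hypothetical sample lines, not taken from the KDD Cup file
sample_lines = ["0,tcp,http,normal.", "0,icmp,ecr_i,smurf."]

with tempfile.TemporaryDirectory() as tmp_dir:
    sample_path = os.path.join(tmp_dir, "sample.gz")

    # Write the sample as gzip-compressed text
    with gzip.open(sample_path, "wt") as f:
        f.write("\n".join(sample_lines) + "\n")

    # Read it back in text mode, one stripped line per element
    with gzip.open(sample_path, "rt") as f:
        lines = [row.strip() for row in f]

print(lines)
```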

## Getting attack interactions using `subtract`

For illustrative purposes, imagine we already have our RDD with non-attack (normal) interactions from some previous analysis.   

In [2]:
normal_raw_data = [row for row in raw_data if "normal." in row]



We can obtain attack interactions by subtracting normal ones from the original unfiltered RDD as follows.  

In [3]:
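# TODO: generate an RDD `attack_raw_data` as `raw_data` subtracted by `normal_raw_data`
# Build a set once so each membership test is O(1) instead of scanning the list
normal_raw_set = set(normal_raw_data)
attack_raw_data = [row for row in raw_data if row not in normal_raw_set]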
print(type(attack_raw_data))
print("The correct answer should be a RDD object")

<class 'list'>
The correct answer should be a RDD object
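
A minimal sketch of the subtraction semantics on toy data, again with lists standing in for RDDs and made-up values. It assumes the behaviour of `subtract` that matters here: an element of the left collection is kept only if it does not occur at all in the right one, and repeated elements on the left are not collapsed.

```python
# Toy stand-ins for raw_data and normal_raw_data (values are made up)
all_rows = ["a,normal.", "b,smurf.", "c,neptune.", "b,smurf."]
normal_rows = ["a,normal."]

# Build a set once so each membership test is O(1)
normal_set = set(normal_rows)
attack_rows = [row for row in all_rows if row not in normal_set]

print(attack_rows)
```

The repeated `"b,smurf."` row survives twice, which is why the counts checked next can add up to the full dataset size.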


Let's do some counts to check our results.  

In [4]:
from time import time

# count all
t0 = time()
raw_data_count = len(raw_data)
tt = time() - t0
print("All count in {} secs".format(round(tt,3)))
print("Len raw_data: {}".format(raw_data_count))

All count in 0.0 secs
Len raw_data: 49402


In [5]:
# count normal
t0 = time()
normal_raw_data_count = len(normal_raw_data)
tt = time() - t0
print("Normal count in {} secs".format(round(tt,3)))

Normal count in 0.0 secs


In [6]:
# count attacks
t0 = time()
attack_raw_data_count = len(attack_raw_data)
tt = time() - t0
print("Attack count in {} secs".format(round(tt,3)))

Attack count in 0.0 secs


In [7]:
print("There are {} normal interactions and {} attacks, from a total of {} interactions".format(normal_raw_data_count,attack_raw_data_count,raw_data_count))


There are 9641 normal interactions and 39761 attacks, from a total of 49402 interactions
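
As a quick sanity check, the normal and attack counts should add up to the total, since every row falls in exactly one of the two groups. A short sketch using the numbers printed above (the variable names are illustrative):

```python
# Counts copied from the output above
normal_count = 9641
attack_count = 39761
total_count = 49402

# The two groups partition the data, so their sizes must sum to the total
assert normal_count + attack_count == total_count

# Fraction of interactions that are attacks
attack_share = attack_count / total_count
print("Attack share: {:.1%}".format(attack_share))
```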


So now we have two RDDs, one with normal interactions and another one with attacks.  

## Protocol and service combinations using `cartesian`

We can compute the Cartesian product between two RDDs by using the `cartesian` transformation. It returns all possible pairs of elements between two RDDs. In our case we will use it to generate all the possible combinations between service and protocol in our network interactions.  
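
To make the definition concrete, here is a small local sketch using `itertools.product` from the standard library on two short lists (the values are only examples). The number of pairs is always the product of the two input sizes, which is why the result grows quickly.

```python
from itertools import product

left = ["tcp", "udp"]
right = ["http", "smtp", "ftp"]

# Every element of `left` paired with every element of `right`
pairs = list(product(left, right))

print(len(pairs))
print(pairs[:3])
```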

First of all we need to isolate each collection of values in two separate RDDs. For that we will use `distinct` on the CSV-parsed dataset. From the [dataset description](http://kdd.ics.uci.edu/databases/kddcup99/kddcup.names) we know that protocol is the second column and service is the third (the tag is the last column, not the first as it appears on that page).   
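
The column positions can be sketched on a few shortened, made-up rows (real rows have many more fields). Index 1 holds the protocol, index 2 the service, and the last field the tag. All names below are illustrative.

```python
# Shortened, made-up rows in the same layout: duration, protocol, service, ..., tag
sample_rows = [
    "0,tcp,http,SF,normal.",
    "0,icmp,ecr_i,SF,smurf.",
    "0,tcp,smtp,SF,normal.",
]

parsed = [row.split(",") for row in sample_rows]

# A set comprehension keeps one copy of each value, like `distinct`
sample_protocols = {fields[1] for fields in parsed}
sample_services = {fields[2] for fields in parsed}
sample_tags = {fields[-1] for fields in parsed}

print(sorted(sample_protocols), sorted(sample_services), sorted(sample_tags))
```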

So first, let's get the protocols.  

In [8]:
csv_data = [row.split(",") for row in raw_data]
# TODO: generate a new RDD 'protocols' as a collection of *distinct* protocol names
# HINT: protocol is the 2nd value of each row

protocols = list(set([row[1] for row in csv_data]))
protocols

['icmp', 'tcp', 'udp']

Now we do the same for services.  

In [9]:
# TODO: generate a new RDD 'services' as a collection of *distinct* service names
# HINT: service is the 3rd value of each row
services = list(set([row[2] for row in csv_data]))
print(len(services))
services


61


['urh_i',
 'hostnames',
 'vmnet',
 'ftp_data',
 'gopher',
 'IRC',
 'daytime',
 'smtp',
 'printer',
 'remote_job',
 'sunrpc',
 'link',
 'bgp',
 'nnsp',
 'private',
 'courier',
 'rje',
 'mtp',
 'klogin',
 'urp_i',
 'http',
 'ldap',
 'csnet_ns',
 'sql_net',
 'systat',
 'auth',
 'iso_tsap',
 'eco_i',
 'finger',
 'pop_3',
 'uucp',
 'pop_2',
 'ecr_i',
 'ftp',
 'netbios_ssn',
 'ntp_u',
 'echo',
 'domain_u',
 'netbios_ns',
 'login',
 'netbios_dgm',
 'time',
 'discard',
 'kshell',
 'netstat',
 'Z39_50',
 'ssh',
 'supdup',
 'other',
 'name',
 'nntp',
 'http_443',
 'telnet',
 'efs',
 'uucp_path',
 'exec',
 'whois',
 'imap4',
 'ctf',
 'domain',
 'shell']

A longer list in this case.

Now we can compute the Cartesian product.  

In [11]:
product = [(protocol, service) for protocol in protocols for service in services]
print("There are {} combinations of protocol X service".format(len(product)))

There are 183 combinations of protocol X service


Obviously, for such small RDDs it doesn't really make sense to use Spark's Cartesian product. We could perfectly well have collected the values after using `distinct` and done the Cartesian product locally. Moreover, `distinct` and `cartesian` are expensive operations, so they must be used with care when the datasets involved are large.    