![Cloud-First](https://github.com/tulip-lab/sit742/blob/develop/Jupyter/image/CloudFirst.png?raw=1)


# SIT742: Modern Data Science
**(Module: Big Data Manipulation)**

---
- Materials in this module include resources collected from various open-source online repositories.
- You are free to use, change and distribute this package.
- If you found any issue/bug for this document, please submit an issue at [tulip-lab/sit742](https://github.com/tulip-lab/sit742/issues)


Prepared by **SIT742 Teaching Team**

---


## Session 4G: SparkSQL and Data Understanding
---

### Table of Content

Part A: SparkSQL

1. Loading in a DataFrame

2. Creating SQLContext

3. Creating DataFrame

4. Grouping Aggregation

5. Running SQL Queries

Part B: SparkSQL Application

6.  Getting the data and creating the RDD

7. Getting a Data Frame

8. Queries as DataFrame operations


---

## Introduction ##

This notebook will introduce Spark capabilities to deal with data in a structured way. Basically, everything turns around the concept of *Data Frame* and using *SQL language* to query them. We will see how the data frame abstraction, very popular in other data analytics ecosystems (e.g. R and Python/Pandas), it is very powerful when performing exploratory data analysis.

In fact, it is very easy to express data queries when used together with the SQL language. Moreover, Spark distributes this column-based data structure transparently, in order to make the querying process as efficient as possible.      

This lab session will assume that you have uploaded two data files into cloud, and note down the address:
- mtcars.csv
- kddcup_data_10_percent-d8e1d.gz


# Part A: SparkSQL

### 1.Loading in a DataFrame
To create a Spark DataFrame we load an external DataFrame, called `mtcars`. This DataFrame includes 32 observations on 11 variables.

[, 1]	mpg	Miles/(US) --> gallon  
[, 2]	cyl	--> Number of cylinders  
[, 3]	disp	--> Displacement (cu.in.)  
[, 4]	hp -->	Gross horsepower  
[, 5]	drat -->	Rear axle ratio  
[, 6]	wt -->	Weight (lb/1000)  
[, 7]	qsec -->	1/4 mile time  
[, 8]	vs -->	V/S  
[, 9]	am -->	Transmission (0 = automatic, 1 = manual)  
[,10]	gear -->	Number of forward gears  
[,11]	carb -->	Number of carburetors  

In [1]:
!pip install wget

Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: wget
  Building wheel for wget (setup.py) ... [?25l[?25hdone
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9655 sha256=e6483974e783f51fbe37f6bf6e097f0d5d9fd99b93020bdcdd5617657e11641f
  Stored in directory: /root/.cache/pip/wheels/01/46/3b/e29ffbe4ebe614ff224bad40fc6a5773a67a163251585a13a9
Successfully built wget
Installing collected packages: wget
Successfully installed wget-3.2


In [2]:
import wget

link_to_data = 'https://raw.githubusercontent.com/tulip-lab/sit742/refs/heads/develop/Jupyter/data/mtcars.csv'
DataSet = wget.download(link_to_data)

In [3]:
import pandas as pd
mtcars = pd.read_csv('mtcars.csv')

In [4]:
mtcars.head()

Unnamed: 0,car,mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb
0,Mazda RX4,21.0,6,160.0,110,3.9,2.62,16.46,0,1,4,4
1,Mazda RX4 Wag,21.0,6,160.0,110,3.9,2.875,17.02,0,1,4,4
2,Datsun 710,22.8,4,108.0,93,3.85,2.32,18.61,1,1,4,1
3,Hornet 4 Drive,21.4,6,258.0,110,3.08,3.215,19.44,1,0,3,1
4,Hornet Sportabout,18.7,8,360.0,175,3.15,3.44,17.02,0,0,3,2


### 2.Initialize SQLContext
To work with dataframes we need a SQLContext which is created using `SQLContext(sc)`. SQLContext uses SparkContext which was the main entry point for Spark functionality, named `sc`.

A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one.

In [5]:
#update local version of the package catalog
!apt-get update
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark 3.3.3
# !wget -q https://archive.apache.org/dist/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz
# # unzip it
# !tar xf spark-3.3.3-bin-hadoop3.tgz
# install findspark
!pip install -q findspark

import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.3.3-bin-hadoop3"

import findspark
findspark.init()

0% [Working]            Hit:1 https://cli.github.com/packages stable InRelease
0% [Connecting to archive.ubuntu.com] [Waiting for headers] [Connected to cloud                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,628 B]
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Hit:6 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Get:10 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,940 kB]
Hit:11 https://ppa.launchpadcontent.n

In [6]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

#The getOrCreate function for the SparkContext may be used to get or instantiate a SparkContext and register it as a singleton object.
sc = SparkContext.getOrCreate()

#SQLContext is the entry point for working with structured data (rows and columns) in Spark, in Spark 1.x.
#As of Spark 2.0, this is replaced by SparkSession.
#However, we are keeping the class here for backward compatibility.
#A SQLContext can be used create DataFrame, register DataFrame as tables, execute SQL over tables, cache tables, and read parquet files
sqlContext = SQLContext(sc)



### 3.Creating Spark DataFrames
With SQLContext and a loaded local DataFrame, we create a Spark DataFrame:

In [7]:
#Creates a DataFrame from an RDD, a list or a pandas.DataFrame
sdf = sqlContext.createDataFrame(mtcars.to_dict('records'))
#Prints out the schema in the tree format.
sdf.printSchema()

root
 |-- am: long (nullable = true)
 |-- car: string (nullable = true)
 |-- carb: long (nullable = true)
 |-- cyl: long (nullable = true)
 |-- disp: double (nullable = true)
 |-- drat: double (nullable = true)
 |-- gear: long (nullable = true)
 |-- hp: long (nullable = true)
 |-- mpg: double (nullable = true)
 |-- qsec: double (nullable = true)
 |-- vs: long (nullable = true)
 |-- wt: double (nullable = true)



You can also directly load this csv file into a Spark DataFrame.

In [8]:
#Sample 1
#Returns a DataFrameReader that can be used to read data in as a DataFrame.
sdf2 = sqlContext.read.csv("mtcars.csv")
print(sdf2)


#Sample 2 to define a specific format
#Specifies the input data source format.
sdf3=sqlContext.read.format('csv').load('mtcars.csv')
print(sdf3)

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string]
DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string]


#### (3a) Displays the content of the DataFrame


In [9]:
sdf.show(5)

+---+-----------------+----+---+-----+----+----+---+----+-----+---+-----+
| am|              car|carb|cyl| disp|drat|gear| hp| mpg| qsec| vs|   wt|
+---+-----------------+----+---+-----+----+----+---+----+-----+---+-----+
|  1|        Mazda RX4|   4|  6|160.0| 3.9|   4|110|21.0|16.46|  0| 2.62|
|  1|    Mazda RX4 Wag|   4|  6|160.0| 3.9|   4|110|21.0|17.02|  0|2.875|
|  1|       Datsun 710|   1|  4|108.0|3.85|   4| 93|22.8|18.61|  1| 2.32|
|  0|   Hornet 4 Drive|   1|  6|258.0|3.08|   3|110|21.4|19.44|  1|3.215|
|  0|Hornet Sportabout|   2|  8|360.0|3.15|   3|175|18.7|17.02|  0| 3.44|
+---+-----------------+----+---+-----+----+----+---+----+-----+---+-----+
only showing top 5 rows



#### (3b) Selecting columns

In [10]:
sdf.select('mpg').show(5)

+----+
| mpg|
+----+
|21.0|
|21.0|
|22.8|
|21.4|
|18.7|
+----+
only showing top 5 rows



#### (3c)  Filtering Data
Filter the DataFrame to only retain rows with `mpg` less than 18

In [11]:
sdf.filter(sdf['mpg'] < 18).show(5)

+---+-----------+----+---+-----+----+----+---+----+-----+---+----+
| am|        car|carb|cyl| disp|drat|gear| hp| mpg| qsec| vs|  wt|
+---+-----------+----+---+-----+----+----+---+----+-----+---+----+
|  0| Duster 360|   4|  8|360.0|3.21|   3|245|14.3|15.84|  0|3.57|
|  0|  Merc 280C|   4|  6|167.6|3.92|   4|123|17.8| 18.9|  1|3.44|
|  0| Merc 450SE|   3|  8|275.8|3.07|   3|180|16.4| 17.4|  0|4.07|
|  0| Merc 450SL|   3|  8|275.8|3.07|   3|180|17.3| 17.6|  0|3.73|
|  0|Merc 450SLC|   3|  8|275.8|3.07|   3|180|15.2| 18.0|  0|3.78|
+---+-----------+----+---+-----+----+----+---+----+-----+---+----+
only showing top 5 rows



#### (3d)  Operating on Columns
Spark also provides a number of functions that can directly applied to columns for data processing and aggregation. The example below shows the use of basic arithmetic functions to convert lb to metric ton.

In [12]:
# A new clomun name 'wtTon' is created and its value equals 'wt' * 0.45
sdf.withColumn('wtTon', sdf['wt'] * 0.45).show(6)

+---+-----------------+----+---+-----+----+----+---+----+-----+---+-----+-------+
| am|              car|carb|cyl| disp|drat|gear| hp| mpg| qsec| vs|   wt|  wtTon|
+---+-----------------+----+---+-----+----+----+---+----+-----+---+-----+-------+
|  1|        Mazda RX4|   4|  6|160.0| 3.9|   4|110|21.0|16.46|  0| 2.62|  1.179|
|  1|    Mazda RX4 Wag|   4|  6|160.0| 3.9|   4|110|21.0|17.02|  0|2.875|1.29375|
|  1|       Datsun 710|   1|  4|108.0|3.85|   4| 93|22.8|18.61|  1| 2.32|  1.044|
|  0|   Hornet 4 Drive|   1|  6|258.0|3.08|   3|110|21.4|19.44|  1|3.215|1.44675|
|  0|Hornet Sportabout|   2|  8|360.0|3.15|   3|175|18.7|17.02|  0| 3.44|  1.548|
|  0|          Valiant|   1|  6|225.0|2.76|   3|105|18.1|20.22|  1| 3.46|  1.557|
+---+-----------------+----+---+-----+----+----+---+----+-----+---+-----+-------+
only showing top 6 rows



In [13]:
sdf.show(6)

+---+-----------------+----+---+-----+----+----+---+----+-----+---+-----+
| am|              car|carb|cyl| disp|drat|gear| hp| mpg| qsec| vs|   wt|
+---+-----------------+----+---+-----+----+----+---+----+-----+---+-----+
|  1|        Mazda RX4|   4|  6|160.0| 3.9|   4|110|21.0|16.46|  0| 2.62|
|  1|    Mazda RX4 Wag|   4|  6|160.0| 3.9|   4|110|21.0|17.02|  0|2.875|
|  1|       Datsun 710|   1|  4|108.0|3.85|   4| 93|22.8|18.61|  1| 2.32|
|  0|   Hornet 4 Drive|   1|  6|258.0|3.08|   3|110|21.4|19.44|  1|3.215|
|  0|Hornet Sportabout|   2|  8|360.0|3.15|   3|175|18.7|17.02|  0| 3.44|
|  0|          Valiant|   1|  6|225.0|2.76|   3|105|18.1|20.22|  1| 3.46|
+---+-----------------+----+---+-----+----+----+---+----+-----+---+-----+
only showing top 6 rows



###4.Grouping, Aggregation
Spark DataFrames support a number of commonly used functions to aggregate data after grouping. For example we can compute the average weight of cars by their cylinders as shown below:

In [14]:
sdf.groupby(['cyl'])\
.agg({"wt": "AVG"})\
.show(5)

#It also equals the below line:
sdf.groupby(['cyl']).agg({"wt": "AVG"}).show(5)

+---+-----------------+
|cyl|          avg(wt)|
+---+-----------------+
|  6|3.117142857142857|
|  8|3.999214285714286|
|  4|2.285727272727273|
+---+-----------------+

+---+-----------------+
|cyl|          avg(wt)|
+---+-----------------+
|  6|3.117142857142857|
|  8|3.999214285714286|
|  4|2.285727272727273|
+---+-----------------+



In [15]:
# We can also sort the output from the aggregation to get the most common cars
car_counts = sdf.groupby(['cyl'])\
.agg({"wt": "count"})\
.sort("count(wt)", ascending=False)\
.show(5)

+---+---------+
|cyl|count(wt)|
+---+---------+
|  8|       14|
|  4|       11|
|  6|        7|
+---+---------+



### 5.Running SQL Queries from Spark DataFrames
A Spark DataFrame can also be registered as a temporary table in Spark SQL and registering a DataFrame as a table allows you to run SQL queries over its data. The `sql` function enables applications to run SQL queries programmatically and returns the result as a DataFrame.



In [16]:
# Register this DataFrame as a table.
sdf.registerTempTable("cars")

# SQL statements can be run by using the sql method
highgearcars = sqlContext.sql("SELECT gear FROM cars WHERE cyl >= 4 AND cyl <= 9")
highgearcars.show(6)




+----+
|gear|
+----+
|   4|
|   4|
|   4|
|   3|
|   3|
|   3|
+----+
only showing top 6 rows



## Part B: SparkSQL Application

### 6.Getting the data and creating the RDD

As we did in previous notebooks, we will use the reduced dataset (10 percent) provided for the [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html), containing nearly half million network interactions. The file is provided as a Gzip file that we will download from the GitHub.

You can also use the urllib to download the orginal Gzip file as below codes:
```
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
```

In [17]:
import wget
link_to_data = 'https://github.com/tulip-lab/sit742/raw/develop/Jupyter/data/kddcup.gz'
DataSet = wget.download(link_to_data, out='kdd.gz')

In [18]:
!ls -l
!pwd

total 2104
-rw-r--r-- 1 root root 2144903 Aug 27 11:53 kdd.gz
-rw-r--r-- 1 root root    1697 Aug 27 11:51 mtcars.csv
drwxr-xr-x 1 root root    4096 Aug 22 13:37 sample_data
/content


In [19]:
data_file = "kdd.gz"

#textFile is used to read a text file from HDFS,
#a local file system (available on all nodes),
#or any Hadoop-supported file system URI, and return it as an RDD of Strings.
#In here, we assume that the file "kdd.gz" haved been in the HDFS
raw_data = sc.textFile(data_file).cache()

### 7.Getting a Data Frame

A Spark `DataFrame` is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R or Pandas. They can be constructed from a wide array of sources such as a existing RDD in our case.

The entry point into all SQL functionality in Spark is the `SQLContext` class. To create a basic instance, all we need is a `SparkContext` reference. Since we are running Spark in shell mode (using pySpark) we can use the global context object `sc` for this purpose.    

In [20]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)



#### (7a) Inferring the schema

With a `SQLContext`, we are ready to create a `DataFrame` from our existing RDD. But first we need to tell Spark SQL the schema in our data.   

Spark SQL can convert an RDD of `Row` objects to a `DataFrame`. Rows are constructed by passing a list of key/value pairs as *kwargs* to the `Row` class. The keys define the column names, and the types are inferred by looking at the first row. Therefore, it is important that there is no missing data in the first row of the RDD in order to properly infer the schema.

In our case, we first need to split the comma separated data, and then use the information in KDD's 1999 task description to obtain the [column names](http://kdd.ics.uci.edu/databases/kddcup99/kddcup.names).  

In [21]:
from pyspark.sql import Row


#Map is used to return an iterator that applies function to every item of iterable, yielding the results.
#If additional iterable arguments are passed, function must take that many arguments and is applied to the items from all iterables in parallel.
#With multiple iterables, the iterator stops when the shortest iterable is exhausted.
csv_data = raw_data.map(lambda l: l.split(","))
row_data = csv_data.map(lambda p: Row(
    duration=int(p[0]),
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5])
    )
)

In [22]:
help(sqlContext)

Help on SQLContext in module pyspark.sql.context object:

class SQLContext(builtins.object)
 |  SQLContext(sparkContext: pyspark.context.SparkContext, sparkSession: Optional[pyspark.sql.session.SparkSession] = None, jsqlContext: Optional[py4j.java_gateway.JavaObject] = None)
 |
 |  The entry point for working with structured data (rows and columns) in Spark, in Spark 1.x.
 |
 |  As of Spark 2.0, this is replaced by :class:`SparkSession`. However, we are keeping the class
 |  here for backward compatibility.
 |
 |  A SQLContext can be used to create :class:`DataFrame`, register :class:`DataFrame` as
 |  tables, execute SQL over tables, cache tables, and read parquet files.
 |
 |  .. deprecated:: 3.0.0
 |      Use :func:`SparkSession.builder.getOrCreate()` instead.
 |
 |  Parameters
 |  ----------
 |  sparkContext : :class:`SparkContext`
 |      The :class:`SparkContext` backing this SQLContext.
 |  sparkSession : :class:`SparkSession`
 |      The :class:`SparkSession` around which this 

Once we have our RDD of `Row` we can infer and register the schema.  

In [23]:
#Creates a DataFrame from an RDD, a list or a pandas.DataFrame.
interactions_df = sqlContext.createDataFrame(row_data)

In [24]:
#Registers this RDD as a temporary table using the given name.
interactions_df.registerTempTable("interactions")

Now we can run SQL queries over our data frame that has been registered as a table.  

In [25]:
# Select tcp network interactions with more than 1 second duration and no transfer from destination
# The sqlContext is uesd to returns a DataFrame representing the result of the given query.
tcp_interactions = sqlContext.sql("""
    SELECT duration, dst_bytes FROM interactions WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")
tcp_interactions.show()

+--------+---------+
|duration|dst_bytes|
+--------+---------+
|    5057|        0|
|    5059|        0|
|    5051|        0|
|    5056|        0|
|    5051|        0|
|    5039|        0|
|    5062|        0|
|    5041|        0|
|    5056|        0|
|    5064|        0|
|    5043|        0|
|    5061|        0|
|    5049|        0|
|    5061|        0|
|    5048|        0|
|    5047|        0|
|    5044|        0|
|    5063|        0|
|    5068|        0|
|    5062|        0|
+--------+---------+
only showing top 20 rows



The results of SQL queries are RDDs and support all the normal RDD operations.  

In [26]:
# Output duration together with dst_bytes
# Return a new RDD by applying a function to each element of this RDD.
tcp_interactions_out = tcp_interactions.rdd.map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes))
for ti_out in tcp_interactions_out.collect():
    print(ti_out)

Duration: 5057, Dest. bytes: 0
Duration: 5059, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5056, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5039, Dest. bytes: 0
Duration: 5062, Dest. bytes: 0
Duration: 5041, Dest. bytes: 0
Duration: 5056, Dest. bytes: 0
Duration: 5064, Dest. bytes: 0
Duration: 5043, Dest. bytes: 0
Duration: 5061, Dest. bytes: 0
Duration: 5049, Dest. bytes: 0
Duration: 5061, Dest. bytes: 0
Duration: 5048, Dest. bytes: 0
Duration: 5047, Dest. bytes: 0
Duration: 5044, Dest. bytes: 0
Duration: 5063, Dest. bytes: 0
Duration: 5068, Dest. bytes: 0
Duration: 5062, Dest. bytes: 0
Duration: 5046, Dest. bytes: 0
Duration: 5052, Dest. bytes: 0
Duration: 5044, Dest. bytes: 0
Duration: 5054, Dest. bytes: 0
Duration: 5039, Dest. bytes: 0
Duration: 5058, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5032, Dest. bytes: 0
Duration: 5063, Dest. bytes: 0
Duration: 5040, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5066, Dest. bytes: 0
Duration

We can easily have a look at our data frame schema using `printSchema`.  

In [27]:
interactions_df.printSchema()

root
 |-- duration: long (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- dst_bytes: long (nullable = true)



### 8.Queries as `DataFrame` operations

Spark `DataFrame` provides a domain-specific language for structured data manipulation. This language includes methods we can concatenate in order to do selection, filtering, grouping, etc. For example, let's say we want to count how many interactions are there for each protocol type. We can proceed as follows.  

In [28]:
from time import time

t0 = time()
interactions_df.select("protocol_type", "duration", "dst_bytes").groupBy("protocol_type").count().show()
tt = time() - t0

print("Query performed in {} seconds".format(round(tt,3)))

+-------------+------+
|protocol_type| count|
+-------------+------+
|          tcp|190065|
|          udp| 20354|
|         icmp|283602|
+-------------+------+

Query performed in 6.575 seconds


Now imagine that we want to count how many interactions last more than 1 second, with no data transfer from destination, grouped by protocol type. We can just add to filter calls to the previous.   

In [29]:
t0 = time()
interactions_df.select("protocol_type", "duration", "dst_bytes").filter(interactions_df.duration>1000).filter(interactions_df.dst_bytes==0).groupBy("protocol_type").count().show()
tt = time() - t0

print("Query performed in {} seconds".format(round(tt,3)))

+-------------+-----+
|protocol_type|count|
+-------------+-----+
|          tcp|  139|
+-------------+-----+

Query performed in 5.57 seconds


We can use this to perform some [exploratory data analysis](http://en.wikipedia.org/wiki/Exploratory_data_analysis). Let's count how many attack and normal interactions we have. First we need to add the label column to our data.    

In [30]:
def get_label_type(label):
    if label!="normal.":
        return "attack"
    else:
        return "normal"

row_labeled_data = csv_data.map(lambda p: Row(
    duration=int(p[0]),
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5]),
    label=get_label_type(p[41])
    )
)
interactions_labeled_df = sqlContext.createDataFrame(row_labeled_data)

This time we don't need to register the schema since we are going to use the OO query interface.  

Let's check the previous actually works by counting attack and normal data in our data frame.  

In [31]:
t0 = time()
interactions_labeled_df.select("label").groupBy("label").count().show()
tt = time() - t0

print("Query performed in {} seconds".format(round(tt,3)))

+------+------+
| label| count|
+------+------+
|normal| 97278|
|attack|396743|
+------+------+

Query performed in 5.003 seconds


Now we want to count them by label and protocol type, in order to see how important the protocol type is to detect when an interaction is or not an attack.  

In [32]:
t0 = time()
interactions_labeled_df.select("label", "protocol_type").groupBy("label", "protocol_type").count().show()
tt = time() - t0

print("Query performed in {} seconds".format(round(tt,3)))

+------+-------------+------+
| label|protocol_type| count|
+------+-------------+------+
|normal|          udp| 19177|
|normal|         icmp|  1288|
|normal|          tcp| 76813|
|attack|         icmp|282314|
|attack|          tcp|113252|
|attack|          udp|  1177|
+------+-------------+------+

Query performed in 5.633 seconds


At first sight it seems that *udp* interactions are in lower proportion between network attacks versus other protocol types.  

And we can do much more sofisticated groupings. For example, add to the previous a "split" based on data transfer from target.   

In [33]:
t0 = time()
interactions_labeled_df.select("label", "protocol_type", "dst_bytes").groupBy("label", "protocol_type", interactions_labeled_df.dst_bytes==0).count().show()
tt = time() - t0

print("Query performed in {} seconds".format(round(tt,3)))

+------+-------------+---------------+------+
| label|protocol_type|(dst_bytes = 0)| count|
+------+-------------+---------------+------+
|normal|          udp|          false| 15583|
|attack|          udp|          false|    11|
|attack|          tcp|           true|110583|
|normal|          tcp|          false| 67500|
|attack|         icmp|           true|282314|
|attack|          tcp|          false|  2669|
|normal|          tcp|           true|  9313|
|normal|          udp|           true|  3594|
|normal|         icmp|           true|  1288|
|attack|          udp|           true|  1166|
+------+-------------+---------------+------+

Query performed in 5.725 seconds


We see how relevant is this new split to determine if a network interaction is an attack.  

We will stop here, but we can see how powerfull this type of queries are in order to explore our data. Actually we can replicate all the splits we saw in previous notebooks, when introducing classification trees, just by selecting, groping, and filtering our dataframe. For a more detailed (but less real-world) list of Spark's `DataFrame` operations and data sources, have a look at the oficial documentation [here](https://spark.apache.org/docs/latest/sql-programming-guide.html#dataframe-operations).    