<center>
<a href="http://www.insa-toulouse.fr/" ><img src="http://www.math.univ-toulouse.fr/~besse/Wikistat/Images/logo-insa.jpg" style="float:left; max-width: 120px; display: inline" alt="INSA"/></a> 
<a href="http://wikistat.fr/" ><img src="http://www.math.univ-toulouse.fr/~besse/Wikistat/Images/wikistat.jpg" style="max-width: 250px; display: inline"  alt="Wikistat"/></a>
<a href="http://www.math.univ-toulouse.fr/" ><img src="http://www.math.univ-toulouse.fr/~besse/Wikistat/Images/logo_imt.jpg" style="float:right; max-width: 200px; display: inline" alt="IMT"/> </a>
</center>

# IA Framework.
## Lab 1  - Introduction to Pyspark.
#### Part 3: *DataFrame*   <a href="http://spark.apache.org/"><img src="http://spark.apache.org/images/spark-logo-trademark.png" style="max-width: 100px; display: inline" alt="Spark"/> </a> [SQL](http://spark.apache.org/sql/)

**Résume**: This notebook continue the introduction to [Spark](https://spark.apache.org/) trough  [`PySpark`](http://spark.apache.org/docs/latest/api/python/) API. In this notebook we will learn to manipulate ***Spark's Dataframe***, which will be the standard class and will replace RDD for Spark 3.0 release. 

## Dataset

We will continue to used [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html) dataset in this TP. 

In this TP we will also used the names of the columns as the `Dataframe` class will be able to handle it.

In [1]:
DATA_PATH="" 
import urllib.request
urllib.request.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.names",DATA_PATH+"kddcup.names")

('kddcup.names', <http.client.HTTPMessage at 0x7f0126f459d0>)

We read the file of column's names and create a list of those names.

In [2]:
file_names = open(DATA_PATH+"kddcup.names","r").readlines()
col_names = [k.split(":")[0] for k in file_names[1:]]+["interactions"]


##  *Spark SQL*


*Spark SQL* is a Spark library which allows to manipulate structured data, while RDD does not. Indeed Spark will see no differences between an RDD of list or an RDD of dictionaries. 

With *Spark SQL* the data are organised according to a schema. A schema will define the type of variable that will enable faster computation.

They are various way to build and manipulate those structured object :SQL request, `Dataset` API  (only for java and scala) or `pyspark`' `Dataframe` that will be used all along this TP. 

## Context

We have seen in the first notebook that Spark needs a context to be used :

In [3]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In order to manipulate `Dataframe` we will need to define a new context : the `SQLContext`.

In [4]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
sqlContext

<pyspark.sql.context.SQLContext at 0x7f0124805110>

## *DataFrame* construction

A Spark's `Dataframe` is a collection of distributed data organised by column.

It is similar to R's *Dataframe* or Python pandas' *Dataframe*.They can be build from various sources such as  : *Hive*, *json*, *xml*, *parquet*, *cassandra*... 

We will see how to build DataFrame from `RDD`, `.csv` and  `pandas`' *DataFrame*.


### From  a RDD

We first create the RDD.

In [5]:
data_file = DATA_PATH+"kddcup.data_10_percent.gz"
string_rdd = sc.textFile(data_file)
list_rdd = string_rdd.map(lambda l: l.split(","))

#### Specifying the Schema

We will first specify the schema in the RDD before we convert it into a `DataFrame`.  The schema is defined as a `StructType` object composed of `StructField` which will defined the type of the different field.

We will create a DataFrame that contains the following columns : *duration,rotocol_type, service, flag, src_bytes, dst_bytes, interactions.*

In [6]:
from pyspark.sql.types import *
fields = [StructField("duration", IntegerType(), True),
          StructField("protocol_type", StringType(), True),
          StructField("service", StringType(), True),
          StructField("flag", StringType(), True),
          StructField("src_bytes", IntegerType(), True),
          StructField("dst_bytes", IntegerType(), True),
          StructField("interactions", StringType(), True)]
schema = StructType(fields)

We can then easily create the `DataFrame` with the `createDataFrame` function. 

In [7]:
subList_rdd = list_rdd.map(lambda p: (int(p[0]), p[1], p[2], p[3], int(p[4]), int(p[5]), p[-1]))
df_rdd = sqlContext.createDataFrame(subList_rdd, schema)
df_rdd

DataFrame[duration: int, protocol_type: string, service: string, flag: string, src_bytes: int, dst_bytes: int, interactions: string]

In [8]:
df_rdd.take(2)

[Row(duration=0, protocol_type='tcp', service='http', flag='SF', src_bytes=181, dst_bytes=5450, interactions='normal.'),
 Row(duration=0, protocol_type='tcp', service='http', flag='SF', src_bytes=239, dst_bytes=486, interactions='normal.')]

#### Inferring the Schema

You can also create a `DataFrame` from a RDD by inferring the schema. 
It requires that the RDD is composed of `Row` objects that are dictionary-like object in Spark.
The type of value from each entry of the Row is deducted from the first row which need to be correctly defined.

In [9]:
from pyspark.sql import Row
row_rdd = list_rdd.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5]),
    interactions = p[-1],
    )
)

Once the RDD is created you can call the `createDataFrame` function without specifying any schema.

In [10]:
df_rdd_1 = sqlContext.createDataFrame(row_rdd)
df_rdd_1.take(2)

[Row(dst_bytes=5450, duration=0, flag='SF', interactions='normal.', protocol_type='tcp', service='http', src_bytes=181),
 Row(dst_bytes=486, duration=0, flag='SF', interactions='normal.', protocol_type='tcp', service='http', src_bytes=239)]

A schema can be displayed with the `printSchema` function

In [11]:
df_rdd_1.printSchema()

root
 |-- dst_bytes: long (nullable = true)
 |-- duration: long (nullable = true)
 |-- flag: string (nullable = true)
 |-- interactions: string (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- src_bytes: long (nullable = true)



### From a `.csv`

When the source file is structured in a given format (*parquet*, *json*, *csv*),the `spark.read.load` function allow to directly infer the schema. 

The *kddcup.data_10_percent.gz* file is organised as a`.csv`. The *dataframe* can then be build directly.

In [13]:
df_csv = sqlContext.read.load(data_file, format="csv", sep=",", inferSchema="true", header="False")
df_csv

DataFrame[_c0: int, _c1: string, _c2: string, _c3: string, _c4: int, _c5: int, _c6: int, _c7: int, _c8: int, _c9: int, _c10: int, _c11: int, _c12: int, _c13: int, _c14: int, _c15: int, _c16: int, _c17: int, _c18: int, _c19: int, _c20: int, _c21: int, _c22: int, _c23: int, _c24: double, _c25: double, _c26: double, _c27: double, _c28: double, _c29: double, _c30: double, _c31: int, _c32: int, _c33: double, _c34: double, _c35: double, _c36: double, _c37: double, _c38: double, _c39: double, _c40: double, _c41: string]

We now specify the column names.

In [14]:
df_csv=df_csv.toDF(*col_names)

In [15]:
df_csv.take(2)

[Row(duration=0, protocol_type='tcp', service='http', flag='SF', src_bytes=181, dst_bytes=5450, land=0, wrong_fragment=0, urgent=0, hot=0, num_failed_logins=0, logged_in=1, num_compromised=0, root_shell=0, su_attempted=0, num_root=0, num_file_creations=0, num_shells=0, num_access_files=0, num_outbound_cmds=0, is_host_login=0, is_guest_login=0, count=8, srv_count=8, serror_rate=0.0, srv_serror_rate=0.0, rerror_rate=0.0, srv_rerror_rate=0.0, same_srv_rate=1.0, diff_srv_rate=0.0, srv_diff_host_rate=0.0, dst_host_count=9, dst_host_srv_count=9, dst_host_same_srv_rate=1.0, dst_host_diff_srv_rate=0.0, dst_host_same_src_port_rate=0.11, dst_host_srv_diff_host_rate=0.0, dst_host_serror_rate=0.0, dst_host_srv_serror_rate=0.0, dst_host_rerror_rate=0.0, dst_host_srv_rerror_rate=0.0, interactions='normal.'),
 Row(duration=0, protocol_type='tcp', service='http', flag='SF', src_bytes=239, dst_bytes=486, land=0, wrong_fragment=0, urgent=0, hot=0, num_failed_logins=0, logged_in=1, num_compromised=0, roo

### From a `pandas`' *DataFrame* 


Convert `pandas`' *DataFrame*  to a `PysSpark`'s `DataFrame` require the use of *pyarrow* library that enable to move object from *JVM* to *python*.

To use it, you will need to enable its execution :

In [23]:
# sc.conf.set("spark.sql.execution.arrow.enabled", "true")

In [21]:
import pandas as pd
pandas_df = pd.read_csv(DATA_PATH+"kddcup.data_10_percent.gz", sep=",", names=col_names)
df = sqlContext.createDataFrame(pandas_df)
df

DataFrame[duration: bigint, protocol_type: string, service: string, flag: string, src_bytes: bigint, dst_bytes: bigint, land: bigint, wrong_fragment: bigint, urgent: bigint, hot: bigint, num_failed_logins: bigint, logged_in: bigint, num_compromised: bigint, root_shell: bigint, su_attempted: bigint, num_root: bigint, num_file_creations: bigint, num_shells: bigint, num_access_files: bigint, num_outbound_cmds: bigint, is_host_login: bigint, is_guest_login: bigint, count: bigint, srv_count: bigint, serror_rate: double, srv_serror_rate: double, rerror_rate: double, srv_rerror_rate: double, same_srv_rate: double, diff_srv_rate: double, srv_diff_host_rate: double, dst_host_count: bigint, dst_host_srv_count: bigint, dst_host_same_srv_rate: double, dst_host_diff_srv_rate: double, dst_host_same_src_port_rate: double, dst_host_srv_diff_host_rate: double, dst_host_serror_rate: double, dst_host_srv_serror_rate: double, dst_host_rerror_rate: double, dst_host_srv_rerror_rate: double, interactions: st

In [22]:
df.take(2)

[Row(duration=0, protocol_type='tcp', service='http', flag='SF', src_bytes=181, dst_bytes=5450, land=0, wrong_fragment=0, urgent=0, hot=0, num_failed_logins=0, logged_in=1, num_compromised=0, root_shell=0, su_attempted=0, num_root=0, num_file_creations=0, num_shells=0, num_access_files=0, num_outbound_cmds=0, is_host_login=0, is_guest_login=0, count=8, srv_count=8, serror_rate=0.0, srv_serror_rate=0.0, rerror_rate=0.0, srv_rerror_rate=0.0, same_srv_rate=1.0, diff_srv_rate=0.0, srv_diff_host_rate=0.0, dst_host_count=9, dst_host_srv_count=9, dst_host_same_srv_rate=1.0, dst_host_diff_srv_rate=0.0, dst_host_same_src_port_rate=0.11, dst_host_srv_diff_host_rate=0.0, dst_host_serror_rate=0.0, dst_host_srv_serror_rate=0.0, dst_host_rerror_rate=0.0, dst_host_srv_rerror_rate=0.0, interactions='normal.'),
 Row(duration=0, protocol_type='tcp', service='http', flag='SF', src_bytes=239, dst_bytes=486, land=0, wrong_fragment=0, urgent=0, hot=0, num_failed_logins=0, logged_in=1, num_compromised=0, roo

## Requête SQL


*SparkSQL* enables to apply SQL request and return the result to a `DataFrame`.

We do not have a SQL database available here. But the sql context to apply sql request on existing `DataFrame`.
We need for that to register it on a  *SQL temporary view* format.

In [24]:
df_rdd.createOrReplaceTempView("interactions")

### Request example

Select *tcp* interaction without transfer (dst_bytes = 0) and with duration above 1second.


In [28]:
tcp_interactions = sqlContext.sql("""
    SELECT duration, dst_bytes FROM interactions WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes 
    = 0""")
tcp_interactions

DataFrame[duration: int, dst_bytes: int]

The request is a *DataFrames* object

In [27]:
tcp_interactions

DataFrame[duration: int, dst_bytes: int]

Note that we are still using Spark! The above execution are lazy. You need to call action's function to actually apply the request.

In [29]:
tcp_interactions.count()

139

In [30]:
# Sortie des durées avec les dst_bytes
tcp_interactions_out = tcp_interactions.rdd.map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes))
for ti_out in tcp_interactions_out.collect():
  print (ti_out)

Duration: 5057, Dest. bytes: 0
Duration: 5059, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5056, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5039, Dest. bytes: 0
Duration: 5062, Dest. bytes: 0
Duration: 5041, Dest. bytes: 0
Duration: 5056, Dest. bytes: 0
Duration: 5064, Dest. bytes: 0
Duration: 5043, Dest. bytes: 0
Duration: 5061, Dest. bytes: 0
Duration: 5049, Dest. bytes: 0
Duration: 5061, Dest. bytes: 0
Duration: 5048, Dest. bytes: 0
Duration: 5047, Dest. bytes: 0
Duration: 5044, Dest. bytes: 0
Duration: 5063, Dest. bytes: 0
Duration: 5068, Dest. bytes: 0
Duration: 5062, Dest. bytes: 0
Duration: 5046, Dest. bytes: 0
Duration: 5052, Dest. bytes: 0
Duration: 5044, Dest. bytes: 0
Duration: 5054, Dest. bytes: 0
Duration: 5039, Dest. bytes: 0
Duration: 5058, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5032, Dest. bytes: 0
Duration: 5063, Dest. bytes: 0
Duration: 5040, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5066, Dest. bytes: 0
Duration

##  Operations on *DataFrame* 
### Elementary operation.

*select* allow to extract columns of a `DataFrame`.

In [31]:
df_rdd.select("interactions").take(5)

[Row(interactions='normal.'),
 Row(interactions='normal.'),
 Row(interactions='normal.'),
 Row(interactions='normal.'),
 Row(interactions='normal.')]

In [32]:
df_rdd.select("interactions","duration").take(5)

[Row(interactions='normal.', duration=0),
 Row(interactions='normal.', duration=0),
 Row(interactions='normal.', duration=0),
 Row(interactions='normal.', duration=0),
 Row(interactions='normal.', duration=0)]

`groupBy` works like pandas' `groupby`.

The example below enables to count number of interactions according to protocol types.

In [33]:
from time import time

t0 = time()
df_rdd.groupBy("protocol_type").count().show()
tt = time() - t0

print ("Times to execute the request : {}".format(round(tt,3)))

+-------------+------+
|protocol_type| count|
+-------------+------+
|          tcp|190065|
|          udp| 20354|
|         icmp|283602|
+-------------+------+

Times to execute the request : 5.895


`filter` a  *DataFrame* 

We count the interactions, according to protocol types , for interaction that last less than 1 second and without data transferred.

In [36]:
t0 = time()
df_rdd.filter(df_rdd.duration>1000).filter(df_rdd.dst_bytes==0).groupBy("protocol_type").count().show()
tt = time() - t0

print ("Requete executee en {} secondes".format(round(tt,3)))

+-------------+-----+
|protocol_type|count|
+-------------+-----+
|          tcp|  139|
+-------------+-----+

Requete executee en 6.154 secondes


You can see that the 'DataFrame' syntax allows much more flexible syntax that RDD's one.

### `map` and *custom function*

The  `map` function is not available on *DataFrame* object. 
You will have to convert the  `DataFrame` to `RDD` to use it. 

In [37]:
df_rdd.rdd.map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes)).take(5)

['Duration: 0, Dest. bytes: 5450',
 'Duration: 0, Dest. bytes: 486',
 'Duration: 0, Dest. bytes: 1337',
 'Duration: 0, Dest. bytes: 1337',
 'Duration: 0, Dest. bytes: 2032']

Another way is to use the `udf` (*user defined function*).

You first define the function you want to apply on each row, and convert it as an udf object

In [38]:
from pyspark.sql.functions import udf

function = udf(lambda x,y: "Duration: {}, Dest. bytes: {}".format(x,y))
function

<function __main__.<lambda>(x, y)>

You can the apply this function to the wanted columns of the *DataFrame*. The `alias` function allow you to name the output columns.

In [39]:
output_dataframe = df_rdd.select(function("duration","dst_bytes").alias("string_output"))
output_dataframe.select("string_output").take(5)

[Row(string_output='Duration: 0, Dest. bytes: 5450'),
 Row(string_output='Duration: 0, Dest. bytes: 486'),
 Row(string_output='Duration: 0, Dest. bytes: 1337'),
 Row(string_output='Duration: 0, Dest. bytes: 1337'),
 Row(string_output='Duration: 0, Dest. bytes: 2032')]

### Column creation


We have seen how to apply `udf` function on a *DataFrame*. The result is a one column *DataFrame*. You can add it to an existing *DataFrame* with the `withColumn` function.

We will add a `label` columns to the `df` *DataFrame* which will have two possible values: `attack` and `normal`. 

We first create the function.

In [40]:
def attack_or_normal_func(s):
    return "normal" if s == "normal." else "attack"

which is converted in a *user defined function*.

In [41]:
attack_or_normal = udf(attack_or_normal_func)

We then create the `label` column from the `df` *DataFrame* 

In [42]:
df_with_label = df.withColumn("label", attack_or_normal(df.interactions) )

You can know check that the column has been created

In [43]:
df_with_label.printSchema()

root
 |-- duration: long (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- land: long (nullable = true)
 |-- wrong_fragment: long (nullable = true)
 |-- urgent: long (nullable = true)
 |-- hot: long (nullable = true)
 |-- num_failed_logins: long (nullable = true)
 |-- logged_in: long (nullable = true)
 |-- num_compromised: long (nullable = true)
 |-- root_shell: long (nullable = true)
 |-- su_attempted: long (nullable = true)
 |-- num_root: long (nullable = true)
 |-- num_file_creations: long (nullable = true)
 |-- num_shells: long (nullable = true)
 |-- num_access_files: long (nullable = true)
 |-- num_outbound_cmds: long (nullable = true)
 |-- is_host_login: long (nullable = true)
 |-- is_guest_login: long (nullable = true)
 |-- count: long (nullable = true)
 |-- srv_count: long (nullable = true)
 |-- serror_rate: d

**Exercise**  Display number of attack and normal interaction.

In [45]:
df_with_label.groupBy("label").count().show()

+------+------+
| label| count|
+------+------+
|normal| 97278|
|attack|396743|
+------+------+



In [46]:
# %load solutions/exercise3_1.py 

**Exercise**  Display number of attack and normal interaction for each protocol_type.

In [48]:
df_with_label.groupBy("protocol_type", "label").count().show()

+-------------+------+------+
|protocol_type| label| count|
+-------------+------+------+
|          tcp|normal| 76813|
|          udp|normal| 19177|
|         icmp|normal|  1288|
|          udp|attack|  1177|
|          tcp|attack|113252|
|         icmp|attack|282314|
+-------------+------+------+



In [51]:
# %load solutions/exercise3_2.py 

You can check all the functionality on the online [documentation](http://spark.apache.org/docs/latest/sql-programming-guide.html).

### Pandas `udf` function

A `pandas udf` function is similar to `udf` function previously defined. As the latest, it allow to apply transformation to a *DataFrame* column but this columns will be treated as a *pandas*' *series*.

This will allow better [performances](https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html) and also to apply native function of *pandas*. 

They are two types of `pandas udf`: `Scalar` and `Grouped Map`.

#### `Scalar`

*Scalar Pandas UDFs* are used to easily vectorized scalar operation. It will handle p*andas* *Series* as an argument and return *Series* de *pandas* of same size.

Here we will apply *pandas* `cumsum` function

In [54]:
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, PandasUDFType
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def cum_sum(x):
    return x.cumsum()

cum_sum_udf = pandas_udf(cum_sum, returnType=IntegerType())

In [55]:
cum_sum_duration = df.select(cum_sum_udf(col("duration")))

### Fonction *Grouped Map*

 *Grouped map Pandas UDFs* need to be used along with `groupBy().apply()` function allowing to apply `split-apply-combine` pattern. 
 This one is a three steps job:
 

 * *Split* data with `DataFrame.groupBy`.
 * *Apply* function on each group. Input and output will be *pandas*' *DataFrames* output. L
 * *Combine* results in a new *DataFrame*.

Before using `groupBy().apply()`, you have to define

* A python function which will be applied on each group
* A `StructType`  or a  `string` which will specify the schema of the output.

In the following example we will subtract mean `duration` according to the label `attack` or `normal`.

In [None]:
@pandas_udf("label string, duration int", PandasUDFType.GROUPED_MAP)
def substract_mean(pdf):
    # pdf is a pandas.DataFrame
    duration = pdf.duration
    return pdf.assign(duration=duration - duration.mean())

df_with_label.select("label","duration").groupby("label").apply(substract_mean).show()