<center>
<a href="http://www.insa-toulouse.fr/" ><img src="http://www.math.univ-toulouse.fr/~besse/Wikistat/Images/logo-insa.jpg" style="float:left; max-width: 120px; display: inline" alt="INSA"/></a> 
<a href="http://wikistat.fr/" ><img src="http://www.math.univ-toulouse.fr/~besse/Wikistat/Images/wikistat.jpg" style="max-width: 250px; display: inline"  alt="Wikistat"/></a>
<a href="http://www.math.univ-toulouse.fr/" ><img src="http://www.math.univ-toulouse.fr/~besse/Wikistat/Images/logo_imt.jpg" style="float:right; max-width: 200px; display: inline" alt="IMT"/> </a>
</center>

# IA Framework.
## Lab 1  - Introduction to Pyspark.
#### Part 3: The *DataFrame* class of <a href="http://spark.apache.org/"><img src="http://spark.apache.org/images/spark-logo-trademark.png" style="max-width: 100px; display: inline" alt="Spark"/> </a> [SQL](http://spark.apache.org/sql/)

**Summary**: This notebook continues the introduction to [Spark](https://spark.apache.org/) through the [`PySpark`](http://spark.apache.org/docs/latest/api/python/) API. In this notebook we will learn to manipulate ***Spark's DataFrame***, the standard class for structured data in recent Spark releases. The RDD API is still available in Spark 3.x, but the DataFrame API is the recommended one for most workloads.

## Dataset

We will continue to use the [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html) dataset in this lab.

In this lab we will also use the names of the columns, since the `DataFrame` class is able to handle them.

In [None]:
DATA_PATH="" 
import urllib.request
urllib.request.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.names",DATA_PATH+"kddcup.names")

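We read the column-description file and build a list of the column names.

In [None]:
# Read the column-description file; its first line lists the labels, so it is skipped
with open(DATA_PATH+"kddcup.names","r") as f:
    file_names = f.readlines()
col_names = [k.split(":")[0] for k in file_names[1:]]+["interactions"]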
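To make the parsing step concrete, here is a minimal sketch run on a short excerpt typed in by hand. It assumes that `kddcup.names` starts with one line listing the labels, followed by one `name: type.` line per column. The `sample` string and the `sample_col_names` variable are illustrative only and are not read from the real file.

```python
# Hand-written excerpt mimicking the assumed layout of kddcup.names.
sample = (
    "back,buffer_overflow,normal.\n"
    "duration: continuous.\n"
    "protocol_type: symbolic.\n"
    "service: symbolic.\n"
)

lines = sample.splitlines()
# Skip the first line (label list), keep the text before ":" on every other line,
# then append the name of the label column by hand.
sample_col_names = [line.split(":")[0] for line in lines[1:]] + ["interactions"]
print(sample_col_names)
```

The last name, `interactions`, is not in the file: it is the name chosen in this lab for the label column.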


##  *Spark SQL*


*Spark SQL* is a Spark library for manipulating structured data, which plain RDDs do not support. Indeed, Spark sees no difference between an RDD of lists and an RDD of dictionaries.

With *Spark SQL* the data are organized according to a schema. The schema defines the type of each variable, which enables faster computation.

There are various ways to build and manipulate these structured objects: SQL queries, the `Dataset` API (only for Java and Scala), or `pyspark`'s `DataFrame`, which will be used throughout this lab.

## Context

We have seen in the first notebook that Spark needs a context to be used:

In [None]:
sc

To manipulate `DataFrame` objects we need to define a different kind of context, the `SQLContext`:

In [None]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
sqlContext

## *DataFrame* construction

A Spark `DataFrame` is a distributed collection of data organized by column.

It is very similar to R's *data.frame* or pandas' *DataFrame*. It can be built from various sources such as *Hive*, *json*, *xml*, *parquet*, *cassandra*...

We will see how to build a DataFrame from an `RDD`, a `.csv` file and a `pandas` *DataFrame*.


### From an RDD

We start by creating the RDD.

In [None]:
data_file = DATA_PATH+"kddcup.data_10_percent.gz"
string_rdd = sc.textFile(data_file)
list_rdd = string_rdd.map(lambda l: l.split(","))

#### Specifying the Schema

We first specify the schema before converting the RDD into a `DataFrame`.
The schema is defined as a `StructType` object composed of `StructField` objects, each of which defines the name and type of one field.

We will create a DataFrame that contains the following columns: *duration, protocol_type, service, flag, src_bytes, dst_bytes, interactions.*

In [None]:
from pyspark.sql.types import *
fields = [StructField("duration", IntegerType(), True),
          StructField("protocol_type", StringType(), True),
          StructField("service", StringType(), True),
          StructField("flag", StringType(), True),
          StructField("src_bytes", IntegerType(), True),
          StructField("dst_bytes", IntegerType(), True),
          StructField("interactions", StringType(), True)]
schema = StructType(fields)

We can then easily create the `DataFrame` with the `createDataFrame` function. 

In [None]:
subList_rdd = list_rdd.map(lambda p: (int(p[0]), p[1], p[2], p[3], int(p[4]), int(p[5]), p[-1]))
df_rdd = sqlContext.createDataFrame(subList_rdd, schema)
df_rdd

In [None]:
df_rdd.take(2)
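The conversion performed by the `map` above can be sketched in plain Python, without Spark. The `raw_line` value is a made-up, shortened record (real records have many more fields), and `FIELD_CASTS` is a hypothetical helper that mirrors the `StructField` list; it is not part of PySpark.

```python
# Made-up record: the first six fields and the last one follow the layout used above.
raw_line = "0,tcp,http,SF,181,5450,0,0,normal."

# Hypothetical (name, position, caster) triples mirroring the StructField list.
FIELD_CASTS = [
    ("duration", 0, int),
    ("protocol_type", 1, str),
    ("service", 2, str),
    ("flag", 3, str),
    ("src_bytes", 4, int),
    ("dst_bytes", 5, int),
    ("interactions", -1, str),
]

parts = raw_line.split(",")
# Pick each field by position and cast it to the type declared for that column.
record = tuple(cast(parts[pos]) for _, pos, cast in FIELD_CASTS)
print(record)
```

The point of the sketch is that the values must already have the Python type matching the declared schema before `createDataFrame` is called, which is why the `map` casts the numeric fields with `int`.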

#### Inferring the Schema

You can also create a `DataFrame` from an RDD by inferring the schema.
This requires the RDD to be composed of `Row` objects, which are dictionary-like objects in Spark.
The type of each entry of the `Row` is deduced from the first row, which therefore needs to be correctly defined.

In [None]:
from pyspark.sql import Row
row_rdd = list_rdd.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5]),
    interactions = p[-1],
    )
)

Once the RDD is created, you can call the `createDataFrame` function without specifying any schema.

In [None]:
df_rdd_1 = sqlContext.createDataFrame(row_rdd)
df_rdd_1.take(2)

A schema can be displayed with the `printSchema` function.

In [None]:
df_rdd_1.printSchema()
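The idea of inferring a schema from the first row can be illustrated in plain Python. This is only a sketch of the principle, not Spark's implementation: `first_row` is a made-up record stored in a dict, and `TYPE_NAMES` is a hypothetical lookup table. The mapping of Python `int` to `long` reflects how PySpark infers integer values.

```python
# Made-up first record, as a plain dict standing in for a Row.
first_row = {
    "duration": 0,
    "protocol_type": "tcp",
    "src_bytes": 181,
    "interactions": "normal.",
}

# Hypothetical mapping from Python types to Spark SQL type names.
TYPE_NAMES = {int: "long", str: "string", float: "double", bool: "boolean"}

# Look at the Python type of each value in the first record only.
inferred = {name: TYPE_NAMES[type(value)] for name, value in first_row.items()}
for name, type_name in inferred.items():
    print("{}: {}".format(name, type_name))
```

If `duration` had been left as the string `"0"` in the first row, the column would have been inferred as `string`, which is why the casts in the `Row` construction matter.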

### From a `.csv`

When the source file is structured in a given format (*parquet*, *json*, *csv*), the `spark.read.load` function can infer the schema directly.

The *kddcup.data_10_percent.gz* file is organized as a `.csv`. The *DataFrame* can therefore be built directly.

In [None]:
df_csv = spark.read.load(data_file, format="csv", sep=",", inferSchema="true", header="False")
df_csv

We now specify the column names.

In [None]:
df_csv=df_csv.toDF(*col_names)

In [None]:
df_csv.take(2)

### From a `pandas`' *DataFrame* 


Converting a `pandas` *DataFrame* to a `PySpark` `DataFrame` can rely on the *pyarrow* library, which transfers data efficiently between the *JVM* and *Python*.

To use it, you need to enable it:

In [None]:
# In Spark 3.x this key is named "spark.sql.execution.arrow.pyspark.enabled";
# the older name below is deprecated there.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [None]:
import pandas as pd
pandas_df = pd.read_csv(DATA_PATH+"kddcup.data_10_percent.gz", sep=",", names=col_names)
df = sqlContext.createDataFrame(pandas_df)
df

In [None]:
df.take(2)

## SQL queries


*Spark SQL* lets you run SQL queries and get the result back as a `DataFrame`.

We do not have a SQL database available here, but the SQL context can run SQL queries on an existing `DataFrame`.
For that, we need to register it as a *SQL temporary view*.

In [None]:
df_rdd.createOrReplaceTempView("interactions")

### Query example

Select the *tcp* interactions with no data transfer (`dst_bytes = 0`) and a duration above 1000 seconds (the `duration` column is expressed in seconds).


In [None]:
tcp_interactions = sqlContext.sql("""
    SELECT duration, dst_bytes
    FROM interactions
    WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")
tcp_interactions

The result of the query is a *DataFrame* object.

In [None]:
tcp_interactions

Note that we are still using Spark! The operations above are lazy: you need to call an action to actually run the query.

In [None]:
tcp_interactions.count()
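Laziness can be illustrated with a plain Python generator. This is only an analogy, under the assumption that a generator expression plays the role of a transformation and that consuming it plays the role of an action. The `records` list and the `keep` helper are made up for the illustration.

```python
# Made-up (protocol_type, duration, dst_bytes) records.
records = [("tcp", 1500, 0), ("udp", 2000, 0), ("tcp", 10, 0), ("tcp", 3000, 40)]

evaluated = []  # records actually inspected by the filter


def keep(rec):
    """Record that the filter ran, then apply the same condition as the query."""
    evaluated.append(rec)
    protocol, duration, dst_bytes = rec
    return protocol == "tcp" and duration > 1000 and dst_bytes == 0


# Building the generator does no filtering yet, like defining a transformation.
pending = (rec for rec in records if keep(rec))
seen_before = len(evaluated)

# Consuming it plays the role of an action such as count().
matches = sum(1 for _ in pending)
print(seen_before, len(evaluated), matches)
```

No record is inspected until the generator is consumed, in the same way that Spark only runs the query once `count()` is called.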

In [None]:
# Print the durations together with dst_bytes
tcp_interactions_out = tcp_interactions.rdd.map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes))
for ti_out in tcp_interactions_out.collect():
    print(ti_out)
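The SQL text itself can be tried without Spark on a tiny in-memory table. The sketch below uses Python's standard `sqlite3` module as a stand-in engine, which is an assumption made for illustration only (Spark SQL and SQLite are different engines). The rows are made up.

```python
import sqlite3

# In-memory database holding a made-up "interactions" table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE interactions (duration INTEGER, protocol_type TEXT, dst_bytes INTEGER)"
)
rows = [(1500, "tcp", 0), (2000, "udp", 0), (10, "tcp", 0), (3000, "tcp", 40), (1200, "tcp", 0)]
conn.executemany("INSERT INTO interactions VALUES (?, ?, ?)", rows)

# Same SELECT / WHERE clause as the Spark SQL query above.
query = """
    SELECT duration, dst_bytes
    FROM interactions
    WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
"""
result = conn.execute(query).fetchall()
conn.close()
print(sorted(result))
```

Only the two long *tcp* interactions without transferred bytes are kept; the *udp* row, the short row and the row with `dst_bytes = 40` are filtered out.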

##  Operations on *DataFrame* 
### Elementary operations

`select` extracts columns of a `DataFrame`.

In [None]:
df_rdd.select("interactions").take(5)

In [None]:
df_rdd.select("interactions","duration").take(5)

`groupBy` works like pandas' `groupby`.

The example below counts the number of interactions for each protocol type.

In [None]:
from time import time

t0 = time()
df_rdd.groupBy("protocol_type").count().show()
tt = time() - t0

print("Query executed in {} seconds".format(round(tt,3)))

`filter` keeps the rows of a *DataFrame* that satisfy a condition.

We count the interactions by protocol type, keeping only those that last more than 1000 seconds and transfer no data.

In [None]:
t0 = time()
df_rdd.filter(df_rdd.duration>1000).filter(df_rdd.dst_bytes==0).groupBy("protocol_type").count().show()
tt = time() - t0

print("Query executed in {} seconds".format(round(tt,3)))
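What the filter-then-group-then-count chain computes can be sketched in plain Python with `collections.Counter`. The `records` list is made up, and the sketch only mirrors the logic; it says nothing about how Spark distributes the work.

```python
from collections import Counter

# Made-up (protocol_type, duration, dst_bytes) records.
records = [
    ("tcp", 1500, 0),
    ("udp", 2000, 0),
    ("tcp", 10, 0),
    ("tcp", 3000, 40),
    ("tcp", 1200, 0),
    ("icmp", 5000, 0),
]

# The two chained filters: duration > 1000, then dst_bytes == 0.
long_records = [r for r in records if r[1] > 1000]
no_transfer = [r for r in long_records if r[2] == 0]

# groupBy("protocol_type").count(): one counter entry per protocol.
counts = Counter(protocol for protocol, _, _ in no_transfer)
for protocol, n in sorted(counts.items()):
    print(protocol, n)
```

Chaining two filters is equivalent to a single filter with both conditions combined, which is why the order of the two `filter` calls does not change the result.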

You can see that the `DataFrame` syntax is much more flexible than the RDD one.

### `map` and *custom function*

The `map` function is not available on *DataFrame* objects.
You have to convert the `DataFrame` to an `RDD` to use it.

In [None]:
df_rdd.rdd.map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes)).take(5)

Another way is to use a `udf` (*user defined function*).

You first define the function you want to apply to each row, then convert it into a `udf` object.

In [None]:
from pyspark.sql.functions import udf

function = udf(lambda x,y: "Duration: {}, Dest. bytes: {}".format(x,y))
function

You can then apply this function to the desired columns of the *DataFrame*. The `alias` function lets you name the output column.

In [None]:
output_dataframe = df_rdd.select(function("duration","dst_bytes").alias("string_output"))
output_dataframe.select("string_output").take(5)

### Column creation


We have seen how to apply a `udf` to a *DataFrame*. The result of the `select` is a one-column *DataFrame*. You can also add the computed column to an existing *DataFrame* with the `withColumn` function.

We will add a `label` column to the `df` *DataFrame*, with two possible values: `attack` and `normal`.

We first create the function.

In [None]:
def attack_or_normal_func(s):
    return "normal" if s == "normal." else "attack"

It is then converted into a *user defined function*.

In [None]:
attack_or_normal = udf(attack_or_normal_func)

We then create the `label` column from the `interactions` column of the `df` *DataFrame*.

In [None]:
df_with_label = df.withColumn("label", attack_or_normal(df.interactions) )

You can now check that the column has been created.

In [None]:
df_with_label.printSchema()
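Since a `udf` wraps an ordinary Python function, the labelling rule can be checked in plain Python before it is applied to the whole *DataFrame*. The `sample_labels` list below is made up; the last entry deliberately omits the trailing dot.

```python
def attack_or_normal_func(s):
    return "normal" if s == "normal." else "attack"


# Made-up label values; note the trailing dot used in the dataset labels.
sample_labels = ["normal.", "smurf.", "neptune.", "normal"]
labelled = [attack_or_normal_func(s) for s in sample_labels]

for raw, label in zip(sample_labels, labelled):
    print("{!r} -> {}".format(raw, label))
```

The comparison is exact, so a value written `normal` without the final dot is classified as `attack`. This is worth keeping in mind if the labels are cleaned before the function is applied.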

**Exercise**  Display the number of attack and normal interactions.

In [None]:
# %load solutions/exercise3_1.py 


**Exercise**  Display the number of attack and normal interactions for each protocol_type.

In [None]:
# %load solutions/exercise3_2.py 

You can find all the available functionality in the online [documentation](http://spark.apache.org/docs/latest/sql-programming-guide.html).

### Pandas `udf` function

A `pandas udf` is similar to the `udf` defined previously. Like the latter, it applies a transformation to a *DataFrame* column, but the column is passed to the function as a *pandas* *Series*.

This gives better [performance](https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html) and lets you use native *pandas* functions.

We will look at two types of `pandas udf`: `Scalar` and `Grouped Map`.

#### `Scalar`

*Scalar Pandas UDFs* are used to vectorize scalar operations. They take *pandas* *Series* as arguments and return a *pandas* *Series* of the same size.

Here we apply the *pandas* `cumsum` function.

In [None]:
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, PandasUDFType
from pyspark.sql.types import LongType

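# Declare the function and create the UDF
def cum_sum(x):
    # x is a pandas.Series
    return x.cumsum()

# duration is a 64-bit integer column in df (built from pandas), hence LongType
cum_sum_udf = pandas_udf(cum_sum, returnType=LongType())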

In [None]:
cum_sum_duration = df.select(cum_sum_udf(col("duration")))
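One point to keep in mind: a scalar pandas UDF receives the column in batches rather than as one single *Series*, so a function such as `cumsum` restarts at the beginning of each batch. The plain Python sketch below illustrates this with `itertools.accumulate`; the `durations` values and the `batch_size` of 3 are made up and do not reflect Spark's actual batch size.

```python
from itertools import accumulate

durations = [1, 2, 3, 4, 5, 6]
batch_size = 3  # made-up batch size, for illustration only

# Split the column into consecutive batches, as a stand-in for Spark's batching.
batches = [durations[i:i + batch_size] for i in range(0, len(durations), batch_size)]

# Cumulative sum computed independently on each batch, then concatenated.
per_batch = [list(accumulate(batch)) for batch in batches]
flat = [value for batch in per_batch for value in batch]

# Cumulative sum over the whole column, for comparison.
global_cumsum = list(accumulate(durations))

print(flat)
print(global_cumsum)
```

The two results differ as soon as there is more than one batch, so the output of `cum_sum_udf` should be read as a per-batch cumulative sum rather than a cumulative sum over the whole column.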

#### `Grouped Map`

*Grouped map Pandas UDFs* are used together with `groupBy().apply()` to implement the `split-apply-combine` pattern.
This is a three-step job:

 * *Split* the data with `DataFrame.groupBy`.
 * *Apply* a function to each group. Its input and output are both *pandas* *DataFrames*.
 * *Combine* the results into a new *DataFrame*.

Before using `groupBy().apply()`, you have to define:

* A Python function which will be applied to each group.
* A `StructType` or a `string` which specifies the schema of the output.

In the following example we subtract the mean `duration` of each group, where the groups are given by the label (`attack` or `normal`).

In [None]:
# The output schema declares duration as double, since subtracting the mean yields floats
@pandas_udf("label string, duration double", PandasUDFType.GROUPED_MAP)
def substract_mean(pdf):
    # pdf is a pandas.DataFrame holding all the rows of one group
    duration = pdf.duration
    return pdf.assign(duration=duration - duration.mean())

df_with_label.select("label","duration").groupby("label").apply(substract_mean).show()
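The three steps of split-apply-combine can be written out in plain Python to make the pattern explicit. This is a sketch of the logic only, with made-up `rows`; it does not reproduce how Spark distributes the groups.

```python
from collections import defaultdict
from statistics import mean

# Made-up (label, duration) rows.
rows = [("normal", 0), ("attack", 10), ("normal", 3), ("attack", 30)]

# Split: gather the durations of each label.
groups = defaultdict(list)
for label, duration in rows:
    groups[label].append(duration)

# Apply: subtract the group mean from every duration of the group.
centered = {}
for label, values in groups.items():
    group_mean = mean(values)
    centered[label] = [d - group_mean for d in values]

# Combine: flatten the groups back into one list of rows.
combined = [(label, d) for label, values in centered.items() for d in values]
print(combined)
```

The centered durations are no longer integers in general (here the `normal` group has mean 1.5), which is why the output column of such a function needs a floating-point type.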