# DataFrame API
The DataFrame API is another Spark API, and it is very convenient for working with structured data.

To use it, we need to instantiate a SparkSession, which is essentially an enhanced SparkContext.
It is created in a similar way:

In [None]:
import os, sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2.1"
sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], 'python'))
sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], 'python/lib/py4j-0.10.4-src.zip'))

In [None]:
import pyspark
import pyspark.sql

In [None]:
sparkConf = pyspark.SparkConf() \
    .set("spark.executor.memory", "2560m")\
    .set("spark.driver.memory", "2560m")\
    .set("spark.yarn.executor.memoryOverhead", 3584)\
    .set("spark.yarn.driver.memoryOverhead", 3584)\
    .set("spark.python.worker.memory", "1536m")\
    .set("spark.executor.instances", 11)\
    .set("spark.default.parallelism", 300)

In [None]:
ss = pyspark.sql.SparkSession.builder.config(conf=sparkConf).appName('seminar3-df').master('yarn').getOrCreate()
ss

Web UI (aka Application UI, webUI or Spark UI) is the web interface of a running Spark application, used to monitor and inspect Spark job executions in a web browser.

In [None]:
# uiWebUrl looks like http://<driver-host>:<port>; keep only the port
port = ss.sparkContext.uiWebUrl.split(':')[-1]
print('http://cluster1:{}'.format(port))

# Getting the Data Files

Download the files if you did not do so in the previous seminar.

The KDD Cup 1999 competition dataset is described in detail 
[here](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99).

In [None]:
# ! wget "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz" -O "/data/kddcup.data_10_percent.gz"

Put the data into HDFS:

In [None]:
# ! hdfs dfs -put /data/kddcup.data_10_percent.gz ./

A DataFrame is a Dataset organized into named columns.
It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources, such as CSV and other structured data files, tables in Hive, external databases, or existing RDDs.
To create one, we use the DataFrameReader available in SparkSession as ss.read.

In [None]:
data_path = 'kddcup.data_10_percent.gz'
data = ss.read.csv(data_path)
data.show(5)

Sometimes it's more convenient to use the pandas dataframe representation in notebooks like this one:

In [None]:
data.limit(5).toPandas()

We don't have column names in our data, but they are available separately. Let's fetch them and rename the columns.

In [None]:
import requests
header = requests.get('http://kdd.ics.uci.edu/databases/kddcup99/kddcup.names').text.split('\n')[1:-1]

types = [h.split(':')[1].strip(' .') for h in header]
header = [h.split(':')[0] for h in header]
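The parsing above relies on the layout of kddcup.names. The sketch below replays it offline on a few hard-coded lines. The sample text is an assumption about that layout (a first line listing the attack types, then one "name: type." line per feature, and an empty string left over after the final newline), so check it against the real file. The names sample_names and sample_types are chosen so they do not overwrite header and types in the notebook.

```python
# Hypothetical sample mimicking kddcup.names: a first line with attack types,
# then "name: type." lines, ending with a newline.
sample_text = (
    "back,buffer_overflow,teardrop.\n"
    "duration: continuous.\n"
    "protocol_type: symbolic.\n"
    "src_bytes: continuous.\n"
)

# [1:-1] drops the attack-type line and the empty string after the final '\n'
sample_lines = sample_text.split('\n')[1:-1]

# Type: the part after ':' with surrounding spaces and the trailing '.' stripped
sample_types = [line.split(':')[1].strip(' .') for line in sample_lines]
# Name: the part before ':'
sample_names = [line.split(':')[0] for line in sample_lines]

print(sample_names)
print(sample_types)
```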

In [None]:
data_with_header = data
for i, h in enumerate(header + ['tag']):
    data_with_header = data_with_header.withColumnRenamed('_c{}'.format(i), h)
data_with_header.limit(5).toPandas()

Every DataFrame has a schema, which describes its columns (their names and types). You can print it like this:

In [None]:
data_with_header.printSchema()

All of the columns have string type. That's because we read them from CSV without the inferSchema option. Let's cast the continuous columns ourselves.

To do this, we use Spark column functions.
A Column represents a column in a Dataset that holds a Catalyst expression, which produces a value per row.
You can construct a Column instance from its name using pyspark.sql.functions.col and then call different methods on it, including cast.

In [None]:
import pyspark.sql.functions as sf

In [None]:
def cast_if_continuous(col_name, t):
    if t == u'continuous':
        return sf.col(col_name).cast('float').alias(col_name)
    else:
        return sf.col(col_name)

data_with_types = data_with_header.select([cast_if_continuous(h, t) for h, t in zip(header, types)] + ['tag'])
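Reading CSV without schema inference yields strings in plain Python too. As a rough illustration of what cast_if_continuous does, the sketch below applies the same rule to one made-up row with Python's csv module. The column names, types and values are illustrative, not taken from the dataset.

```python
import csv

# Made-up CSV line and per-column types, in the style of the KDD data
line = '0,tcp,181,normal.\n'
col_names = ['duration', 'protocol_type', 'src_bytes', 'tag']
col_types = ['continuous', 'symbolic', 'continuous', 'symbolic']

# csv.reader, like ss.read.csv without inferSchema, returns every field as a string
raw_row = next(csv.reader([line]))

# Cast only the continuous columns to float, leave the rest as strings
typed_row = {
    name: float(value) if t == 'continuous' else value
    for name, t, value in zip(col_names, col_types, raw_row)
}

print(raw_row)
print(typed_row)
```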

Now we have appropriate types in our dataframe:

In [None]:
data_with_types.printSchema()

You can also apply various transformations to columns. For example, let's calculate the mean error rate for each row (connection).

There are several ways to add a new column to a dataframe.
One of them is .withColumn, which accepts a column name and a column expression.

Another is .select with different column expressions as arguments.
Arguments can also be plain strings, which are interpreted as column names (the same as sf.col). A constant on its own has to be wrapped with sf.lit (literal value), while inside arithmetic such as "/ 4" it is converted automatically.
To name a new column, call .alias on it.

You can use the '\*' wildcard to select all columns of a dataframe.

In [None]:
# Parenthesize the sum so that the whole sum, not just the last term, is divided by 4
mean_er_df = data_with_types.select('tag', sf.col('protocol_type'),
                                    ((sf.col('dst_host_serror_rate') +
                                      sf.col('dst_host_srv_serror_rate') +
                                      sf.col('dst_host_rerror_rate') +
                                      sf.col('dst_host_srv_rerror_rate')) / 4).alias('mean_err_rate'))
mean_er_df.orderBy('mean_err_rate', ascending=False).show(5)
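Column arithmetic follows ordinary Python operator precedence, so / binds tighter than +, and the sum of the four rates must be parenthesized before dividing by 4. A plain-float sketch with made-up rates shows the difference between the two groupings:

```python
# Made-up error rates for a single row
rates = [0.0, 0.2, 0.4, 0.6]
a, b, c, d = rates

# Without parentheses only the last term is divided: a + b + c + (d / 4)
wrong = a + b + c + d / 4
# With parentheses the whole sum is divided, which gives the mean
right = (a + b + c + d) / 4

print(wrong)
print(right)
```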

It's a lot easier to aggregate data using the DataFrame API, because the sf module also provides so-called aggregate functions, which can be used with .groupBy.

Let's calculate the same statistic as we did with the RDD API.

First, group the data by two columns:

In [None]:
grouped_df = data_with_types.groupBy('tag', 'protocol_type')
grouped_df

Now aggregate it with the corresponding function:

In [None]:
pt_df = grouped_df.agg(sf.count('protocol_type').alias('count'))
pt_df.show(5)
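Conceptually, the groupBy/agg pair above counts the rows that share each (tag, protocol_type) pair. As a plain-Python analogue only (not how Spark executes it), collections.Counter over a few made-up rows gives the same kind of result:

```python
from collections import Counter

# Made-up (tag, protocol_type) rows standing in for the DataFrame
rows = [
    ('normal.', 'tcp'),
    ('normal.', 'udp'),
    ('normal.', 'tcp'),
    ('smurf.', 'icmp'),
]

# Group by the (tag, protocol_type) pair and count the rows in each group
counts = Counter((tag, proto) for tag, proto in rows)

for (tag, proto), n in sorted(counts.items()):
    print(tag, proto, n)
```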

Let's compare the result with the values we got in the previous seminar:

In [None]:
protocols_by_type = {
    'back._tcp': 2203, 'buffer_overflow._tcp': 30, 'ftp_write._tcp': 8,
    'guess_passwd._tcp': 53, 'imap._tcp': 12, 'ipsweep._icmp': 1153,
    'ipsweep._tcp': 94, 'land._tcp': 21, 'loadmodule._tcp': 9,
    'multihop._tcp': 7, 'neptune._tcp': 107201, 'nmap._icmp': 103,
    'nmap._tcp': 103, 'nmap._udp': 25, 'normal._icmp': 1288,
    'normal._tcp': 76813, 'normal._udp': 19177, 'perl._tcp': 3,
    'phf._tcp': 4, 'pod._icmp': 264, 'portsweep._icmp': 1,
    'portsweep._tcp': 1039, 'rootkit._tcp': 7, 'rootkit._udp': 3,
    'satan._icmp': 3, 'satan._tcp': 1416, 'satan._udp': 170,
    'smurf._icmp': 280790, 'spy._tcp': 2, 'teardrop._udp': 979,
    'warezclient._tcp': 1020, 'warezmaster._tcp': 20,
}

In [None]:
protocols_by_type3 = {'{}_{}'.format(r['tag'], r['protocol_type']):r['count'] for r in pt_df.collect()}
assert protocols_by_type3 == protocols_by_type

As an exercise, calculate the mean payload size (src_bytes column) for each tag. The list of aggregate functions can be found [here](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$)

In [None]:
# Task 1
mean_src_bytes_by_tag_df = ...
mean_src_bytes_by_tag = ...
assert mean_src_bytes_by_tag['teardrop.'] == 28

# Spark SQL

It's also possible to run SQL queries against dataframes. First you have to create a temporary view, i.e. a virtual table to run your queries against.

In [None]:
data_with_types.createOrReplaceTempView('data')

Now you can use SparkSession to query your temporary view. You get another DataFrame back as the result, so you can run further computations using either SQL or the regular API.

In [None]:
sql_df = ss.sql('select * from data')
sql_df.limit(5).toPandas()

Let's count the same statistic for the fourth time, now using the power of SQL.

In [None]:
protocols_by_type_sql = ss.sql('''select tag, protocol_type, count(protocol_type) as count 
                                  from data group by tag, protocol_type''')
protocols_by_type_sql.limit(5).toPandas()
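The query itself is ordinary SQL. Purely as an illustration outside Spark, the sketch below runs the same statement against an in-memory sqlite3 table that is also named data; sqlite3 is swapped in for demonstration only, and the rows are made up.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('create table data (tag text, protocol_type text)')
# Made-up rows, not the KDD data
conn.executemany('insert into data values (?, ?)', [
    ('normal.', 'tcp'),
    ('normal.', 'tcp'),
    ('normal.', 'udp'),
    ('smurf.', 'icmp'),
])

# Same statement as in the Spark SQL cell
query = '''select tag, protocol_type, count(protocol_type) as count
           from data group by tag, protocol_type'''
result = sorted(conn.execute(query).fetchall())
print(result)
conn.close()
```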

We can even compare the execution plans of both dataframes and confirm that they are the same (up to internal expression IDs):

In [None]:
protocols_by_type_sql.explain()

In [None]:
pt_df.explain()

# User Defined Functions
With RDDs you can simply pass your function to the .map method, but with DataFrames Spark needs type information to be able to use your function in expressions. Luckily, this is usually easy.

You just need to wrap your function with functions.udf (which stands for user-defined function) and Spark will do the rest. By default the UDF is assumed to return a string; for other result types, pass the returnType argument, e.g. sf.udf(f, pyspark.sql.types.IntegerType()).

For example, let's say you want to reverse the protocol name for whatever reason.

In [None]:
def reverse_string(value):
    return value[::-1]

reverse_string_udf = sf.udf(reverse_string)
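One thing the example does not handle: for a null value the Python function receives None, and value[::-1] then raises a TypeError when the job runs. A null-safe variant, sketched in plain Python here, could be wrapped with sf.udf in exactly the same way as reverse_string:

```python
def reverse_string_safe(value):
    """Reverse a string, passing None (SQL null) through unchanged."""
    if value is None:
        return None
    return value[::-1]

print(reverse_string_safe('tcp'))
print(reverse_string_safe(None))
```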

Now you can use your UDF in expressions: apply it to a column and you get another column in return.

In [None]:
reversed_protocol_column = reverse_string_udf(sf.col('protocol_type'))
reversed_protocol_column

In [None]:
sql_df.withColumn('protocol_type', reversed_protocol_column).limit(5).toPandas()

UDFs can accept multiple columns as arguments. Now write a UDF that builds the key from a row's columns, to complete our verification of the SQL dataframe.

(Note how you can use functions.udf as a decorator)
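The decorator syntax is just shorthand: writing @sf.udf above def f is equivalent to f = sf.udf(f) after the definition. The sketch below shows this equivalence in plain Python with a hypothetical stand-in decorator, wrap_marker, which is not a Spark function:

```python
def wrap_marker(func):
    """Hypothetical stand-in for sf.udf: returns a new callable wrapping func."""
    def wrapper(*args):
        return ('wrapped', func(*args))
    return wrapper


@wrap_marker
def decorated(x):
    return x * 2


def plain(x):
    return x * 2

plain = wrap_marker(plain)  # explicit form of the decorator syntax

print(decorated(3))
print(plain(3))
```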

In [None]:
@sf.udf
def key(...):
    # task 2
    ...

    
protocols_by_key = protocols_by_type_sql.withColumn('key', key(...))

In [None]:
protocols_by_type4 = {r['key']: r['count'] for r in protocols_by_key.collect()}
assert protocols_by_type4 == protocols_by_type