---
<center><u><h1>Apache Spark. Part 2</h1></u></center>
---

---

# DataFrame

Apache Spark provides an additional data structure, the DataFrame, which closely resembles the pandas DataFrame, supports SQL syntax, and typically works much faster than an RDD. It is part of [Spark SQL](http://spark.apache.org/sql/), Spark's module for structured data processing.

A DataFrame is like a table in a traditional relational database, but with additional internal optimizations. A DataFrame can be constructed from many sources, such as structured data files, Hive tables, external databases, or existing RDDs.

Unlike the basic Spark RDD API, the interfaces of Spark SQL provide more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. 

Let's create a simple `DataFrame` from a list of tuples with the names and ages of people. To create a DataFrame we use the [`createDataFrame`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession.createDataFrame) method of a SparkSession (by default it is available as `spark`, but it is possible to create another one).

In [None]:
# Create a list with People: (name, age)
data = [
    ("John",32),
    ("Alice",23),
    ("Hannah",19),
    ("Nick", 27)
]

# Create a DataFrame from this list, we have to specify names of columns
df = spark.createDataFrame(data, ["Name", "Age"])

To display a DataFrame in table-like form, use the `show()` method.

In [None]:
df.show()

It is also possible to create a DataFrame from an RDD. Let's create an RDD from our list of tuples and then create a DataFrame from it.

In [None]:
# Make an RDD from a list of tuples `data` and print it using `collect` method
rdd = sc.parallelize(data)
rdd.collect()

In [None]:
# Create a DataFrame from this RDD, specifying names of columns
df = spark.createDataFrame(rdd, ["Name","Age"])

Now the DataFrame `df` is created, and you can use the `show(n)` method to print its first `n` rows. Since `df` has only 4 rows, we omit `n`; by default `show()` prints the first 20 rows.

In [None]:
df.show()

To see the schema of a DataFrame, use the `printSchema` method.

In [None]:
df.printSchema()

Conversely, you can convert a DataFrame back to an RDD through its `rdd` attribute. This returns an RDD of [`Row`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Row) objects.

In [None]:
# Convert DataFrame to RDD
dfrdd = df.rdd
# Look at contents of RDD
dfrdd.collect()

In [None]:
# And vice versa: create a DataFrame from the RDD of Row objects
df = spark.createDataFrame(dfrdd)
df.show()

You can access data in row using attributes or indexing like:
`row.Name` or `row["Name"]`

In [None]:
# Retrieve first element
row = dfrdd.collect()[0]
print(row)
# Access data by attribute
print(row.Name)
# Access data by indexing
print(row["Age"])

Now let's see the performance difference between an RDD and a DataFrame. We will use the [Amazon Access Samples Data Set](https://archive.ics.uci.edu/ml/datasets/Amazon+Access+Samples), or rather a small part of it containing 716063 rows. Open the file on your computer to see its structure.

In [None]:
# Read file with data
amazon_rdd = sc.textFile("amazon_access_samples.csv")
# Find its header
first = amazon_rdd.first()
header = first.split(",")
print("Column names: ", header)
# Remove the header row and split each remaining line into columns
amazon_rdd = amazon_rdd.filter(lambda x: x != first).map( lambda x: x.split(",") )
amazon_df = spark.createDataFrame(amazon_rdd, header)

In [None]:
amazon_rdd.take(5)

In [None]:
amazon_df.show(5)

Now we will measure the time needed to order the newly created RDD `amazon_rdd` and DataFrame `amazon_df` by three columns: `"TARGET_NAME"`, `"LOGIN"`, `"REQUEST_DATE"`.

In [None]:
from time import time

t_df_start = time()  # Start time
print(amazon_df.orderBy("TARGET_NAME", "LOGIN", "REQUEST_DATE").take(5))
t_df_end = time() - t_df_start  # Elapsed time

In [None]:
t_rdd_start = time()
print(amazon_rdd.sortBy(lambda a: (a[1], a[2], a[3])).take(5))
t_rdd_end = time() - t_rdd_start
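
The two timing cells above repeat the same start/stop pattern. As a minimal sketch, the pattern can be wrapped in a small helper; the name `timed` and the stand-in workload are assumptions made for illustration, and `perf_counter` is used here instead of `time` because it is designed for measuring short intervals.

```python
from time import perf_counter

def timed(action):
    """Run a zero-argument callable and return (result, elapsed_seconds)."""
    start = perf_counter()
    result = action()
    return result, perf_counter() - start

# Stand-in workload so the sketch runs without Spark; with Spark it could be
# lambda: amazon_df.orderBy("TARGET_NAME", "LOGIN", "REQUEST_DATE").take(5)
rows = [("b", 2), ("a", 3), ("a", 1)]
first_rows, elapsed = timed(lambda: sorted(rows)[:2])
print(first_rows, "took {:.6f} s".format(elapsed))
```

Passing the action as a callable keeps the measured region explicit, which matters with Spark because nothing is computed until an action such as `take()` runs.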

Build a simple bar chart using the [matplotlib](http://matplotlib.org/) Python library.

In [None]:
import numpy as np
import matplotlib.pyplot as plt; plt.rcdefaults()

%matplotlib inline

objects = ("DataFrame", "RDD")   # bars' names
x_pos = np.arange(len(objects))  # positions along X axis
performance = [t_df_end, t_rdd_end]  # height of bars 
bar_width = 0.5
plt.bar(x_pos, performance, bar_width, align='center', alpha=0.5)
plt.xticks(x_pos, objects)
plt.ylabel('Time (s)')
plt.title('Performance difference between DataFrame and RDD')

plt.show()

As you can see, the DataFrame handles this sort much faster. In addition, DataFrames require much less memory when cached.

# Operations with DataFrame
A DataFrame has many methods; the full list is [here](http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame).

The table below lists the most commonly used ones:

|Name|Description|
|:---|------------|
|`agg(*exprs)`|Aggregate on the entire DataFrame without groups.|
|`collect()`|Returns all the records as a list of Row.|
|`columns`|Returns all column names as a list.|
|`count()`|Returns the number of rows in this DataFrame.|
|`distinct()`|Returns a new DataFrame containing the distinct rows in this DataFrame.|
|`drop(col)`|Returns a new DataFrame that drops the specified column.|
|`filter(condition)`|Filters rows using the given condition.|
|`first()`|Returns the first row as a Row.|
|`foreach(f)`|Applies the `f` function to each Row of this DataFrame.|
|`groupBy(*cols)`|Groups the DataFrame using the specified columns, so we can run aggregation on them.|
|`orderBy(*cols, ascending=True)`|Returns a new DataFrame sorted by the specified column(s).|
|`select(*cols)`|Projects a set of columns or expressions and returns a new DataFrame.|
|`show(n=20)`|Prints the first n rows to the console.|
|`take(num)`|Returns the first num rows as a list of Row.|
|`toPandas()`|Returns the contents of this DataFrame as a `pandas.DataFrame`.|
|`withColumn(colName, col)`|Returns a new DataFrame by adding a column or replacing the existing column that has the same name.|
|`write`|Interface for saving the content of the non-streaming DataFrame out into external storage.|

Below are a few examples of using the methods listed above.

Let's display the first row of only one column `"LOGIN"` in the DataFrame `amazon_df`

In [None]:
amazon_df.select("LOGIN").first()

You can also select several columns at once

In [None]:
amazon_df.select("REQUEST_DATE", "AUTHORIZATION_DATE", "LOGIN").show(3)

To display all DataFrame column names, use the `columns` attribute

In [None]:
amazon_df.columns

Let's select only unique records in the `"ACTION"` column using `distinct()` method

In [None]:
amazon_df.select("ACTION").distinct().show()

You can drop a column if you don't need it

In [None]:
amazon_df.drop("ACTION").show(3)

The `orderBy()` method sorts rows by the values in one or more columns. Let's order the table first by `"TARGET_NAME"`, then by `"LOGIN"`, and then by `"REQUEST_DATE"`, all in descending order.

We also use `toPandas()` to display the table in the nicer pandas DataFrame format

In [None]:
# This time we sort in descending order
amazon_df.orderBy("TARGET_NAME", "LOGIN", "REQUEST_DATE", ascending=False).toPandas().head(5)

Note that the previously dropped column `"ACTION"` still exists in the `amazon_df` DataFrame. This is because `drop()` returns a new DataFrame without the column and leaves the original unchanged; we only displayed that copy with `show()`. To remove a column from `amazon_df` itself, reassign it:

    amazon_df = amazon_df.drop("ACTION")

You can also group rows by one or several columns and apply an aggregation to each group. For example, let's count the rows for each pair of `"LOGIN"` and `"TARGET_NAME"`. The resulting column is named after the aggregation (here `count`) and can be used right away in further methods (below we use it for ordering).

In [None]:
amazon_df.groupBy("LOGIN", "TARGET_NAME").count().orderBy("count", ascending=False).show(5)

What if we want to transform a column? For example, let's convert the data type of `"REQUEST_DATE"`.

In [None]:
# First, check the data types already in the DataFrame
# Note the type of the "REQUEST_DATE" column
amazon_df.printSchema()

To convert the data type we will use the `withColumn()` method.

In [None]:
# Import Spark's timestamp type
from pyspark.sql.types import TimestampType
# Spark cannot call a plain Python function on a column, so we wrap the function with `udf`
# and declare the data type of its output. In our case it is `TimestampType`
from pyspark.sql.functions import udf
from datetime import datetime

def convert_dttm(x):
    try:
        # `datetime.strptime(date_string, format)` returns a datetime parsed from date_string according to format.
        # A "REQUEST_DATE" value such as 2010-04-20 09:33:25 has the format '%Y-%m-%d %H:%M:%S'
        # The format codes are described at https://docs.python.org/3/library/datetime.html#strftime-and-strptime-behavior
        return datetime.strptime(x, '%Y-%m-%d %H:%M:%S')
    except (ValueError, TypeError):
        # Malformed or missing value: store null instead of failing the job
        return None
# Convert the Python function to a Spark function
convert_dttm_udf = udf(convert_dttm, TimestampType())

amazon_df = amazon_df.withColumn("REQUEST_DATE", convert_dttm_udf(amazon_df.REQUEST_DATE))
# Check whether data type of "REQUEST_DATE" changed
amazon_df.printSchema()
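
Before wrapping a function in `udf`, it can be checked in plain Python. The sketch below repeats the parsing function and calls it on a well-formed value, a malformed one, and a missing one; the malformed sample string is made up for illustration. It catches `ValueError` (malformed string) and `TypeError` (non-string input such as `None`).

```python
from datetime import datetime

def convert_dttm(x):
    """Parse 'YYYY-MM-DD HH:MM:SS'; return None when parsing is impossible."""
    try:
        return datetime.strptime(x, "%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError):
        return None

parsed = convert_dttm("2010-04-20 09:33:25")
print(parsed, parsed.day)          # a datetime object and its day of month
print(convert_dttm("20/04/2010"))  # wrong format
print(convert_dttm(None))          # missing value
```

Returning `None` for bad input means such rows end up as nulls in the resulting column, so any later function applied to that column has to tolerate `None` as well.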

Now we can treat this column as datetime data and extract the day from the date using the `day` attribute of the `datetime.datetime` object.

In [None]:
from pyspark.sql.types import IntegerType

# Dates that failed to parse are null (None), so guard against them
get_day = udf(lambda x: x.day if x is not None else None, IntegerType())
amazon_df.withColumn("REQUEST_DAY", get_day(amazon_df["REQUEST_DATE"])).limit(10).toPandas()

Note how we truncate the displayed table using the `limit()` method.

Let's find aggregate values, such as the maximum or minimum, of the `"LOGIN"` column using the `agg(params)` method. We can define `params` as a dictionary of the form

    { "column name 1": "aggregation function 1", "column name 2": "aggregation function 2", ... }
   
The full list of aggregation functions is [here](http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html#module-pyspark.sql.functions). In particular:

* `"min"` returns the minimum value
* `"max"` returns the maximum value
* `"avg"` calculates the average value
* `"count"` returns the number of records
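
The dictionary form can be imitated in plain Python to see what it does and where it is limited. This is only a sketch: the helper `agg`, the `AGGREGATIONS` lookup table and the sample rows are hypothetical and are not Spark's implementation.

```python
# Hypothetical lookup table from aggregation name to a Python function
AGGREGATIONS = {"min": min, "max": max, "count": len}

def agg(rows, params):
    """Apply one named aggregation per column, following a {column: function} spec."""
    return {
        "{}({})".format(func, col): AGGREGATIONS[func]([row[col] for row in rows])
        for col, func in params.items()
    }

rows = [{"LOGIN": "b", "ACTION": "add"}, {"LOGIN": "a", "ACTION": "remove"}]
result = agg(rows, {"LOGIN": "max", "ACTION": "count"})
print(result)

# A dictionary cannot hold the same key twice: only the last entry survives
params = {"LOGIN": "min", "LOGIN": "max"}
print(params)
```

Because of the repeated-key limitation, two aggregations of the same column cannot be requested through one dictionary. In PySpark, column expressions from `pyspark.sql.functions` can be passed instead, for example `amazon_df.agg(F.min("LOGIN"), F.max("LOGIN"))` with `import pyspark.sql.functions as F`.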

Let's find the maximal value in `"LOGIN"` column

In [None]:
amazon_df.agg({"LOGIN": "max"}).show()

Find how many rows are unique using `distinct()` and `count()`

In [None]:
amazon_df.distinct().count()

You already know how to read data; now let's write it. Let's drop a column and write the data to CSV. More about writing can be found in the [`DataFrameWriter` documentation](http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter).

In [None]:
amazon_df_write = amazon_df.drop("AUTHORIZATION_DATE")
amazon_df_write.write.csv("my_first_csv_file.csv", mode="overwrite", header=True)

`mode="overwrite"` means that any existing content at `"my_first_csv_file.csv"` is replaced by the new content. To add new content to what is already there, use `mode="append"`.

But when you open the folder you will see something like this:
![alt text](images/1.png)
Spark writes the output as a directory of part files rather than as a single file. How do we read it back? Here we use a [**HiveContext**](http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html#pyspark.sql.HiveContext); the same `read` interface is also available on the `spark` session.

In [None]:
from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)

If you want to set your own schema, you can do so as follows.

In [None]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("ACTION", StringType()),
    StructField("TARGET_NAME", StringType()),
    StructField("LOGIN", StringType()),
    StructField("REQUEST_DATE", TimestampType()),    
])

amazon_df_read = sqlContext.read.csv("my_first_csv_file.csv", header=True, schema=schema)
amazon_df_read.show()

In [None]:
amazon_df_read.printSchema()

## SQL in Spark
Spark supports SQL queries. Let's see how it works.

In [None]:
# First register the DataFrame (`amazon_df` in our case) as a table
# under a table name (`"amazon_table"` in our case)
sqlContext.registerDataFrameAsTable(amazon_df, "amazon_table")

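Let's write a query that selects all rows with `"TARGET_NAME"` between `'7315'` and `'9521'` (`BETWEEN` includes both bounds, and since the column holds strings, the values are compared as strings), sorted by `"REQUEST_DATE"` in descending order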

In [None]:
query = """
    SELECT *
    FROM amazon_table
    WHERE TARGET_NAME BETWEEN '7315' AND '9521'
    ORDER BY REQUEST_DATE DESC
"""
target_df = sqlContext.sql(query)
target_df.show()

Similar filtering can also be done with the `filter()` method (note the strict inequalities, which exclude the bounds)

In [None]:
amazon_df.filter( (amazon_df["TARGET_NAME"] > '7315') & (amazon_df["TARGET_NAME"] < '9521') ).show()

Note that if you need several filter conditions, as above, you should wrap each one in parentheses and join them with `&` (logical `AND`) when all conditions must hold, or `|` (logical `OR`) when any of them suffices. If you need only one condition, for example `amazon_df["TARGET_NAME"] > '7315'`, simply write

    amazon_df.filter(amazon_df["TARGET_NAME"] > '7315')
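
The bounds in these filters are strings, so the comparison is lexicographic rather than numeric. A small plain-Python sketch shows the difference; the sample values are made up for illustration, and it assumes that string ordering on digit-only values behaves the same way in Spark as in Python.

```python
values = ["7315", "800", "8000", "10000", "9521", "95"]

# String comparison goes character by character, so "800" and "95" fall in range
as_strings = [v for v in values if "7315" < v < "9521"]

# Numeric comparison after converting each value to int
as_numbers = [v for v in values if 7315 < int(v) < 9521]

print(as_strings)
print(as_numbers)
```

If the column is meant to hold numeric identifiers, casting it first, for example `amazon_df["TARGET_NAME"].cast("int")`, makes the filter compare numbers instead of strings.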

---
# Classification with Apache Spark

In this part of the lesson you will learn how to use the machine learning algorithms provided by Apache Spark (we will use only [logistic regression](https://en.wikipedia.org/wiki/Logistic_regression) as an example) to solve classification problems.

Recall that classification is the problem of identifying to which of a set of categories a new observation belongs, on the basis of a training set of data containing observations whose category membership is known. In other words, it is the process of organizing data into categories for its most effective and efficient use.

# Diagnosis prediction


In this example we will use the Wisconsin Breast Cancer Database (the original dataset, not the later Diagnostic WDBC one), which categorizes breast tumor cases as either benign or malignant, and use its 9 features to predict the diagnosis. You need to save [this file](https://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/breast-cancer-wisconsin.data) from the [UC Irvine Machine Learning Repository](https://archive.ics.uci.edu/ml/index.html) into the folder containing the current IPython notebook.

For each cancer observation, we have the following information:

1. Sample code number: id number 
2. Clump Thickness: 1 - 10 
3. Uniformity of Cell Size: 1 - 10 
4. Uniformity of Cell Shape: 1 - 10 
5. Marginal Adhesion: 1 - 10 
6. Single Epithelial Cell Size: 1 - 10 
7. Bare Nuclei: 1 - 10 
8. Bland Chromatin: 1 - 10 
9. Normal Nucleoli: 1 - 10 
10. Mitoses: 1 - 10 
11. Class: (2 for benign, 4 for malignant)

The cancer observation CSV file has the following format:

    1000025,5,1,1,1,2,1,3,1,1,2
    1002945,5,4,4,5,7,10,3,2,1,2
    1015425,3,1,1,1,2,2,3,1,1,2
    1016277,6,8,8,1,3,4,3,7,1,2
    1017023,4,1,1,3,2,1,3,1,1,2

In this scenario, we will build a logistic regression model to predict whether a tumor is malignant, using the following label and features:

    Label → malignant or benign (1 or 0)
    Features → columns 2-10 from the above list

Let's read the downloaded file

In [None]:
# Adjust the name if you saved the file as "breast-cancer-wisconsin.data" (without ".txt")
raw_data = sc.textFile("breast-cancer-wisconsin.data.txt")

print("File contains {} rows".format(raw_data.count()))
raw_data.take(5)

and transform RDD to DataFrame (we define column names as follows):

In [None]:
col_names = [
    "id",            # Sample code number
    "thickness",     # Clump Thickness
    "size",          # Uniformity of Cell Size
    "shape",         # Uniformity of Cell Shape
    "adhesion",      # Marginal Adhesion
    "epithelial",    # Single Epithelial Cell Size
    "nuclei",        # Bare Nuclei
    "chromatin",     # Bland Chromatin
    "nucleoli",      # Normal Nucleoli
    "mitoses",       # Mitoses
    "class",         # Class
]

data = spark.createDataFrame(raw_data.map(lambda x: x.split(",")), col_names)
print("DataFrame schema:")
data.printSchema()
data.limit(5).toPandas()

As you can see, all columns have string type although they hold numbers. Let's convert them to a numeric type using the `cast()` method. Also, the `"id"` column is of no use for classification, so we will remove it.

There is also the problem of missing data. If you look attentively at the dataset values, you may notice `?` signs there. Let's first check which columns have such values and how many.

In [None]:
import pyspark.sql.functions as F

for col_name in col_names:
    c = data.filter(data[col_name] == "?").count()
    if c > 0:
        print("The column '{}' has {} missing ('?') values".format(col_name, c))

Thus we have about 2% of records with missing values. We can remove them without losing essential information and then convert all values to a floating-point type

In [None]:
from pyspark.sql.types import DoubleType

# Select rows without '?' in "nuclei" column
data = data.filter(data["nuclei"] != "?")

# Remove "id" column
data = data.drop("id")
del col_names[0]

# Change type from string to double (a double-precision float)
for i in col_names:
    data = data.withColumn(i, data[i].cast(DoubleType()))
    
print("DataFrame schema:")
data.printSchema()
data.limit(5).toPandas()

Above we noted that we will use class labels 1 and 0 for malignant and benign (instead of 4 and 2). This is the common notation for binary classification. The [StringIndexer](https://spark.apache.org/docs/latest/ml-features.html#stringindexer) class makes this very easy. It encodes a column of labels into a column of label indices, and conveniently works with both numeric and string values.

Let's see how it works

In [None]:
from pyspark.ml.feature import StringIndexer

# Create a StringIndexer and define two parameters
# inputCol = name of the column whose values should be converted to label indices
# outputCol = name of the column that will contain these indices
indexer = StringIndexer(
    inputCol="class", 
    outputCol="label"
)

indexed_data = indexer.fit(data).transform(data)
indexed_data.limit(10).toPandas()

Thus, class `2.0` (benign) was replaced by `0.0` and class `4.0` (malignant) by `1.0`. StringIndexer orders labels by frequency, so index `0.0` goes to the most frequent class, which here is benign.
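
The indexing rule can be sketched in plain Python. This is an illustration of frequency-based ordering only, not StringIndexer's code; the helper name `fit_string_indexer` and the short list of class values are assumptions.

```python
from collections import Counter

def fit_string_indexer(labels):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(labels)
    ordered = [label for label, _ in counts.most_common()]
    return {label: float(i) for i, label in enumerate(ordered)}

classes = [2.0, 2.0, 4.0, 2.0, 4.0]   # benign appears more often than malignant
mapping = fit_string_indexer(classes)
indexed = [mapping[c] for c in classes]
print(mapping)
print(indexed)
```

The mapping therefore depends on the data: on a dataset where malignant cases were the majority, the same procedure would give malignant the index `0.0`, so it is worth checking the result before interpreting predictions.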

In order for the features to be used by a machine learning algorithm, the features are transformed and put into [Feature Vectors](https://spark.apache.org/docs/latest/mllib-data-types.html#local-vector), which are vectors of numbers representing the value for each feature.

Below a [VectorAssembler](https://spark.apache.org/docs/latest/ml-features.html#vectorassembler) is used to transform and return a new DataFrame with all of the feature columns in a vector column

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=col_names[:-1],   # all feature columns: everything in col_names except "class"
    outputCol="features"
)

feature_data = assembler.transform(indexed_data)
feature_data.limit(5).toPandas()

Thus, the `"features"` column contains the values of the `"thickness"`, `"size"`, `"shape"`, `"adhesion"`, `"epithelial"`, `"nuclei"`, `"chromatin"`, `"nucleoli"` and `"mitoses"` columns in a single vector, preserving their order.
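
What the assembler does to a single row can be sketched in plain Python, with a dictionary standing in for a row and a list standing in for the vector. The helper `assemble` and the shortened column list are assumptions made for this sketch.

```python
def assemble(row, input_cols):
    """Collect the values of input_cols from a row into one list, in the given order."""
    return [row[col] for col in input_cols]

feature_cols = ["thickness", "size", "shape"]   # shortened list for the sketch
row = {"thickness": 5.0, "size": 1.0, "shape": 1.0, "class": 2.0}
row["features"] = assemble(row, feature_cols)
print(row["features"])
```

The order of `input_cols` fixes the position of each feature in the vector, which is why the same column list has to be used for training and for prediction.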

<img src="images/vector_assembler.png" width=90%>

Now we can move on to building the classification model. This scheme shows the steps to be implemented.

<img src="images/classification_flow.png">

First of all, we need to split the data into a training set and a test set using the [`randomSplit()`](http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit) method: 70% of the data is used to train the model and 30% for testing (these proportions are passed as the list of weights in `randomSplit([train_ratio, test_ratio])`). We also need only the `"label"` and `"features"` columns to fit a classification model.

In [None]:
trainingData, testData = feature_data.select("label", "features").randomSplit([0.7, 0.3])
print("trainingData has {} rows".format(trainingData.count()))
print("testData has {} rows".format(testData.count()))
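
The row counts printed above are usually close to, but not exactly, 70% and 30%. A minimal plain-Python sketch of weighted random splitting shows why, assuming each row is assigned to a split independently at random; the helper `random_split` is hypothetical and only imitates the idea.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    bounds, cumulative = [], 0.0
    for w in weights:
        cumulative += w / total
        bounds.append(cumulative)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding at the upper end
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(1000), [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Fixing the seed makes the split repeatable between runs. `randomSplit()` accepts a `seed` argument for the same purpose, for example `randomSplit([0.7, 0.3], seed=42)`.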

Next, we train the [logistic regression model with elastic net regularization](https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression).

The model is trained by making associations between the input features and the labeled output associated with those features

In [None]:
from pyspark.ml.classification import LogisticRegression

# Create a LogisticRegression object 
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(trainingData)

Next we use the test data to get predictions

In [None]:
predictions = lrModel.transform(testData) 
predictions.limit(5).toPandas()

As you can see, the model's transform produced new columns: `"rawPrediction"` (the raw confidence score for each class), `"probability"` (the probability of each class, computed from the raw prediction; in the current example the first element in brackets corresponds to label `0` and the second to `1`; the largest wins) and `"prediction"` (the predicted class).
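
How a raw score turns into probabilities and a predicted class can be sketched in plain Python. The sketch assumes the usual binary logistic regression setup, where the raw score for label `1` is a margin and the logistic function maps it to a probability; the function names and the 0.5 threshold are assumptions of this illustration.

```python
import math

def probabilities_from_margin(margin):
    """Return [P(label=0), P(label=1)] for a binary logistic model margin."""
    p1 = 1.0 / (1.0 + math.exp(-margin))
    return [1.0 - p1, p1]

def predict(margin, threshold=0.5):
    """Pick label 1.0 when its probability exceeds the threshold, else 0.0."""
    return 1.0 if probabilities_from_margin(margin)[1] > threshold else 0.0

for margin in (-2.0, 0.0, 1.5):
    print(margin, probabilities_from_margin(margin), predict(margin))
```

A negative margin gives a probability below 0.5 for label `1`, so label `0` wins; a positive margin does the opposite.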

Let's calculate the accuracy: the ratio of correctly classified objects in the test data to the total number of objects in the test data

In [None]:
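correct = predictions.filter(predictions["prediction"] == predictions["label"]).count()
# Multiply by 100.0 first so the division is floating-point in Python 2 as well
accuracy = 100.0 * correct / testData.count()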
print("Accuracy = {0:.2f}%".format(accuracy))

To evaluate the predictions we may also use a [`MulticlassClassificationEvaluator`](http://spark.apache.org/docs/2.0.0/api/python/_modules/pyspark/ml/evaluation.html), which computes the chosen metric (accuracy here) by comparing the test label column with the test prediction column.

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="label", 
    predictionCol="prediction", 
    metricName="accuracy"
)
accuracy2 = evaluator.evaluate(predictions)
print("Accuracy = {0:.2f}%".format(accuracy2 * 100))

Thus, we achieve fairly high accuracy on this small dataset without any advanced machine learning tricks, and can now classify tumor cases automatically.

---

> # Exercise 1
> Convert **AUTHORIZATION_DATE** to Timestamp type. Then create a column **TIME_DIFF** holding the difference between **AUTHORIZATION_DATE** and **REQUEST_DATE**.

In [None]:
# type your code here 

> # Exercise 2
> Using the HiveContext, group **amazon_df** by **TARGET_NAME** and find the maximum, minimum and average values of **LOGIN**.
> Don't forget to register the table in the HiveContext.

In [None]:
# type your code here 

> # Exercise 3
> Calculate [precision and recall metrics](https://en.wikipedia.org/wiki/Precision_and_recall) for the provided classification example. Use both direct calculations and [`MulticlassClassificationEvaluator`](http://spark.apache.org/docs/2.0.0/api/python/_modules/pyspark/ml/evaluation.html). Try to interpret obtained results.

In [None]:
# type your code here 