In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc=SparkContext()
spark = SparkSession(sparkContext=sc)

## Map functions

These functions are probably the most commonly used functions when dealing with an RDD object.

* `map()`
* `mapValues()`
* `flatMap()`
* `flatMapValues()`

### `map`

The `map()` method applies a function to each elements of the RDD. Each element has to be a valid input to the function. The returned RDD has the function outputs as its new elements.

Elements in the RDD object `map_exp_rdd` below are rows of the `mtcars` in string format. We are going to apply the `map()` function multiple times to convert each string elements as a list elements. Each list element has two values: the first value will be the auto model in string format; the second value will be a list of numeric values.

In [None]:
from google.colab import files
files.upload()

Saving mtcars.csv to mtcars.csv


{'mtcars.csv': b',mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb\r\nMazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4\r\nMazda RX4 Wag,21,6,160,110,3.9,2.875,17.02,0,1,4,4\r\nDatsun 710,22.8,4,108,93,3.85,2.32,18.61,1,1,4,1\r\nHornet 4 Drive,21.4,6,258,110,3.08,3.215,19.44,1,0,3,1\r\nHornet Sportabout,18.7,8,360,175,3.15,3.44,17.02,0,0,3,2\r\nValiant,18.1,6,225,105,2.76,3.46,20.22,1,0,3,1\r\nDuster 360,14.3,8,360,245,3.21,3.57,15.84,0,0,3,4\r\nMerc 240D,24.4,4,146.7,62,3.69,3.19,20,1,0,4,2\r\nMerc 230,22.8,4,140.8,95,3.92,3.15,22.9,1,0,4,2\r\nMerc 280,19.2,6,167.6,123,3.92,3.44,18.3,1,0,4,4\r\nMerc 280C,17.8,6,167.6,123,3.92,3.44,18.9,1,0,4,4\r\nMerc 450SE,16.4,8,275.8,180,3.07,4.07,17.4,0,0,3,3\r\nMerc 450SL,17.3,8,275.8,180,3.07,3.73,17.6,0,0,3,3\r\nMerc 450SLC,15.2,8,275.8,180,3.07,3.78,18,0,0,3,3\r\nCadillac Fleetwood,10.4,8,472,205,2.93,5.25,17.98,0,0,3,4\r\nLincoln Continental,10.4,8,460,215,3,5.424,17.82,0,0,3,4\r\nChrysler Imperial,14.7,8,440,230,3.23,5.345,17.42,0,0,3,4\r\nFiat 128,

In [None]:
import pandas as pd

mtcar_data = pd.read_csv('./mtcars.csv')
mtcar_data.head()

Unnamed: 0.1,Unnamed: 0,mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb
0,Mazda RX4,21.0,6,160.0,110,3.9,2.62,16.46,0,1,4,4
1,Mazda RX4 Wag,21.0,6,160.0,110,3.9,2.875,17.02,0,1,4,4
2,Datsun 710,22.8,4,108.0,93,3.85,2.32,18.61,1,1,4,1
3,Hornet 4 Drive,21.4,6,258.0,110,3.08,3.215,19.44,1,0,3,1
4,Hornet Sportabout,18.7,8,360.0,175,3.15,3.44,17.02,0,0,3,2


In [None]:
# create an example RDD
map_exp_rdd = sc.textFile('./mtcars.csv')
map_exp_rdd.take(4)

[',mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb',
 'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4',
 'Mazda RX4 Wag,21,6,160,110,3.9,2.875,17.02,0,1,4,4',
 'Datsun 710,22.8,4,108,93,3.85,2.32,18.61,1,1,4,1']

In [None]:
# split auto model from other feature values
map_exp_rdd_1 = map_exp_rdd.map(lambda x: x.split(',')).map(lambda x: (x[0], x[1:]))
map_exp_rdd_1.take(4)

[('',
  ['mpg',
   'cyl',
   'disp',
   'hp',
   'drat',
   'wt',
   'qsec',
   'vs',
   'am',
   'gear',
   'carb']),
 ('Mazda RX4',
  ['21', '6', '160', '110', '3.9', '2.62', '16.46', '0', '1', '4', '4']),
 ('Mazda RX4 Wag',
  ['21', '6', '160', '110', '3.9', '2.875', '17.02', '0', '1', '4', '4']),
 ('Datsun 710',
  ['22.8', '4', '108', '93', '3.85', '2.32', '18.61', '1', '1', '4', '1'])]

In [None]:
# remove the header row
header = map_exp_rdd_1.first()
# the filter method apply a function to each elemnts. The function output is a boolean value (TRUE or FALSE)
# elements that have output TRUE will be kept.
map_exp_rdd_2 = map_exp_rdd_1.filter(lambda x: x != header)
map_exp_rdd_2.take(4)

[('Mazda RX4',
  ['21', '6', '160', '110', '3.9', '2.62', '16.46', '0', '1', '4', '4']),
 ('Mazda RX4 Wag',
  ['21', '6', '160', '110', '3.9', '2.875', '17.02', '0', '1', '4', '4']),
 ('Datsun 710',
  ['22.8', '4', '108', '93', '3.85', '2.32', '18.61', '1', '1', '4', '1']),
 ('Hornet 4 Drive',
  ['21.4', '6', '258', '110', '3.08', '3.215', '19.44', '1', '0', '3', '1'])]

In [None]:
# convert string values to numeric values
map_exp_rdd_3 = map_exp_rdd_2.map(lambda x: (x[0], list(map(float, x[1]))))
map_exp_rdd_3.take(4)

[('Mazda RX4',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0]),
 ('Mazda RX4 Wag',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.875, 17.02, 0.0, 1.0, 4.0, 4.0]),
 ('Datsun 710',
  [22.8, 4.0, 108.0, 93.0, 3.85, 2.32, 18.61, 1.0, 1.0, 4.0, 1.0]),
 ('Hornet 4 Drive',
  [21.4, 6.0, 258.0, 110.0, 3.08, 3.215, 19.44, 1.0, 0.0, 3.0, 1.0])]

### `mapValues`

The `mapValues` function requires that each element in the RDD has a **key/value** pair structure, for example, a tuple of 2 items, or a list of 2 items. The `mapValues` function applies a function to each of the element values. The element key will remain unchanged.

We can apply the `mapValues` function to the RDD object `mapValues_exp_rdd` below.


In [None]:
mapValues_exp_rdd = map_exp_rdd_3
mapValues_exp_rdd.take(4)

[('Mazda RX4',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0]),
 ('Mazda RX4 Wag',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.875, 17.02, 0.0, 1.0, 4.0, 4.0]),
 ('Datsun 710',
  [22.8, 4.0, 108.0, 93.0, 3.85, 2.32, 18.61, 1.0, 1.0, 4.0, 1.0]),
 ('Hornet 4 Drive',
  [21.4, 6.0, 258.0, 110.0, 3.08, 3.215, 19.44, 1.0, 0.0, 3.0, 1.0])]

In [None]:
import numpy as np
mapValues_exp_rdd_1 = mapValues_exp_rdd.mapValues(lambda x: np.mean(x))
mapValues_exp_rdd_1.take(4)

[('Mazda RX4', 29.90727272727273),
 ('Mazda RX4 Wag', 29.98136363636364),
 ('Datsun 710', 23.59818181818182),
 ('Hornet 4 Drive', 38.73954545454546)]

When using `mapValues()`, the x in the above lambda function refers to the element value, not including the element key.

### `flatMap`

This function first applies a function to each elements of an RDD and then flatten the results. We can simply use this function to flatten elements of an RDD without extra operation on each elements.


In [None]:
x = [('a', 'b', 'c'), ('a', 'a'), ('c', 'c', 'c', 'd')]
flatMap_exp_rdd = sc.parallelize(x)
flatMap_exp_rdd.collect()

[('a', 'b', 'c'), ('a', 'a'), ('c', 'c', 'c', 'd')]

In [None]:
flatMap_exp_rdd_1 = flatMap_exp_rdd.flatMap(lambda x: x)
flatMap_exp_rdd_1.collect()

['a', 'b', 'c', 'a', 'a', 'c', 'c', 'c', 'd']

### `flatMapValues`

The `flatMapValues` function requires that each element in the RDD has a **key/value** pair structure. It applies a function to each **element value** of the RDD object and then flatten the results.

For example, my raw data looks like below. But I would like to transform the data so that it has three columns: the first column is the **sample id**; the second the column is the three **types (A,B or C)**; the third column is the **values**.

| sample id |  A |  B |  C |
|:---------:|:--:|:--:|:--:|
|     1     | 23 | 18 | 32 |
|     2     | 18 | 29 | 31 |
|     3     | 34 | 21 | 18 |

In [None]:
# example data
my_data = [
    [1, (23, 28, 32)],
    [2, (18, 29, 31)],
    [3, (34, 21, 18)]
]
flatMapValues_exp_rdd = sc.parallelize(my_data)
flatMapValues_exp_rdd.collect()

[[1, (23, 28, 32)], [2, (18, 29, 31)], [3, (34, 21, 18)]]

In [None]:
# merge A,B,and C columns into on column and add the type column
flatMapValues_exp_rdd_1 = flatMapValues_exp_rdd.flatMapValues(lambda x: list(zip(list('ABC'), x)))
flatMapValues_exp_rdd_1.collect()

[(1, ('A', 23)),
 (1, ('B', 28)),
 (1, ('C', 32)),
 (2, ('A', 18)),
 (2, ('B', 29)),
 (2, ('C', 31)),
 (3, ('A', 34)),
 (3, ('B', 21)),
 (3, ('C', 18))]

In [None]:
# unpack the element values
flatMapValues_exp_rdd_2 = flatMapValues_exp_rdd_1.map(lambda x: [x[0]] + list(x[1]) )
flatMapValues_exp_rdd_2.collect()

[[1, 'A', 23],
 [1, 'B', 28],
 [1, 'C', 32],
 [2, 'A', 18],
 [2, 'B', 29],
 [2, 'C', 31],
 [3, 'A', 34],
 [3, 'B', 21],
 [3, 'C', 18]]

## Aggregate functions
Two aggregate functions:

* `aggregate()`
* `aggregateByKey()`

### `aggregate(zeroValue, seqOp, combOp)`

* **zeroValue** is like a data container. Its structure should match with the data structure of the returned values from the seqOp function.
* **seqOp** is a function that takes two arguments: the first argument is the zeroValue and the second argument is an element from the RDD. The zeroValue gets updated with the returned value after every run.
* **combOp** is a function that takes two arguments: the first argument is the final zeroValue from one partition and the other is another final zeroValue from another partition.

The code below calculates the total sum of squares for **mpg** and **disp** in data set **mtcars**.

Step 1: get some data.

In [None]:
mtcars_df = spark.read.csv('./mtcars.csv', inferSchema=True, header=True).select(['mpg', 'disp'])
mtcars_df.take(5)

[Row(mpg=21.0, disp=160.0),
 Row(mpg=21.0, disp=160.0),
 Row(mpg=22.8, disp=108.0),
 Row(mpg=21.4, disp=258.0),
 Row(mpg=18.7, disp=360.0)]

Step 2: calculate averages of mgp and disp

In [None]:
mpg_mean = mtcars_df.select('mpg').rdd.map(lambda x: x[0]).mean()
disp_mean = mtcars_df.select('disp').rdd.map(lambda x: x[0]).mean()
print('mpg mean = ', mpg_mean, '; ' 
      'disp mean = ', disp_mean)

mpg mean =  20.090625000000003 ; disp mean =  230.721875


Step 3: build **zeroValue, seqOp** and **combOp**

We are calculating two TSS. We create a tuple to store two values.

In [None]:
zeroValue = (0, 0) 

The **z** below refers to `zeroValue`. Its values get updated after every run. The **x** refers to an element in an RDD partition. In this case, both **z** and **x** have two values.

In [None]:
seqOp = lambda z, x: (z[0] + (x[0] - mpg_mean)**2, z[1] + (x[1] - disp_mean)**2)

The `combOp` function simply aggrate all `zeroValues` into one. 

In [None]:
combOp = lambda px, py: ( px[0] + py[0], px[1] + py[1] )

Implement `aggregate()` function.

## `aggregateByKey(zeroValue, seqOp, combOp)`

This function does similar things as `aggregate()`. The `aggregate()` aggregate all results to the very end, but aggregateByKey() merge results by key.

In [None]:
from google.colab import files
files.upload()

Saving iris.csv to iris.csv


{'iris.csv': b'sepal_length,sepal_width,petal_length,petal_width,species\n5.1,3.5,1.4,0.2,setosa\n4.9,3,1.4,0.2,setosa\n4.7,3.2,1.3,0.2,setosa\n4.6,3.1,1.5,0.2,setosa\n5,3.6,1.4,0.2,setosa\n5.4,3.9,1.7,0.4,setosa\n4.6,3.4,1.4,0.3,setosa\n5,3.4,1.5,0.2,setosa\n4.4,2.9,1.4,0.2,setosa\n4.9,3.1,1.5,0.1,setosa\n5.4,3.7,1.5,0.2,setosa\n4.8,3.4,1.6,0.2,setosa\n4.8,3,1.4,0.1,setosa\n4.3,3,1.1,0.1,setosa\n5.8,4,1.2,0.2,setosa\n5.7,4.4,1.5,0.4,setosa\n5.4,3.9,1.3,0.4,setosa\n5.1,3.5,1.4,0.3,setosa\n5.7,3.8,1.7,0.3,setosa\n5.1,3.8,1.5,0.3,setosa\n5.4,3.4,1.7,0.2,setosa\n5.1,3.7,1.5,0.4,setosa\n4.6,3.6,1,0.2,setosa\n5.1,3.3,1.7,0.5,setosa\n4.8,3.4,1.9,0.2,setosa\n5,3,1.6,0.2,setosa\n5,3.4,1.6,0.4,setosa\n5.2,3.5,1.5,0.2,setosa\n5.2,3.4,1.4,0.2,setosa\n4.7,3.2,1.6,0.2,setosa\n4.8,3.1,1.6,0.2,setosa\n5.4,3.4,1.5,0.4,setosa\n5.2,4.1,1.5,0.1,setosa\n5.5,4.2,1.4,0.2,setosa\n4.9,3.1,1.5,0.1,setosa\n5,3.2,1.2,0.2,setosa\n5.5,3.5,1.3,0.2,setosa\n4.9,3.1,1.5,0.1,setosa\n4.4,3,1.3,0.2,setosa\n5.1,3.4,1.5,0.

In [None]:
iris_rdd = sc.textFile('./iris.csv', use_unicode=True)
iris_rdd.take(2)

['sepal_length,sepal_width,petal_length,petal_width,species',
 '5.1,3.5,1.4,0.2,setosa']

In [None]:
iris_rdd_2 = iris_rdd.map(lambda x: x.split(',')).\
    filter(lambda x: x[0] != 'sepal_length').\
    map(lambda x: (x[-1], [*map(float, x[:-1])]))
iris_rdd_2.take(5)

[('setosa', [5.1, 3.5, 1.4, 0.2]),
 ('setosa', [4.9, 3.0, 1.4, 0.2]),
 ('setosa', [4.7, 3.2, 1.3, 0.2]),
 ('setosa', [4.6, 3.1, 1.5, 0.2]),
 ('setosa', [5.0, 3.6, 1.4, 0.2])]

### Define initial values, seqOp and combOp

In [None]:
zero_value = (0, 0)
seqOp = (lambda x, y: (x[0] + (y[0])**2, x[1] + (y[1])**2))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

### Implement `aggregateByKey()`

In [None]:
list_aggregated_iris = iris_rdd_2.aggregateByKey(zero_value, seqOp, combOp).collect()
list_aggregated_iris

[('setosa', (1259.0899999999997, 591.2500000000002)),
 ('versicolor', (1774.8600000000001, 388.47)),
 ('virginica', (2189.9000000000005, 447.33))]

# Convert continuous variables to categorical variables

There are two functions we can use to split a continuous variable into categories:

* `pyspark.ml.feature.Binarizer`: split a column of continuous features given a threshold
* `pyspark.ml.feature.Bucktizer`: split a column of continuous features into categories given several breaking points.
    + with n+1 split points, there are n categories (buckets).


In [None]:
import numpy as np
import pandas as pd
np.random.seed(seed=1234)
pdf = pd.DataFrame({
        'x1': np.random.randn(10),
        'x2': np.random.rand(10)*10
    })
np.random.seed(seed=None)
df = spark.createDataFrame(pdf)
df.show()

+--------------------+------------------+
|                  x1|                x2|
+--------------------+------------------+
| 0.47143516373249306| 6.834629351721363|
| -1.1909756947064645| 7.127020269829002|
|  1.4327069684260973|3.7025075479039495|
| -0.3126518960917129| 5.611961860656249|
| -0.7205887333650116| 5.030831653078097|
|  0.8871629403077386|0.1376844959068224|
|  0.8595884137174165| 7.728266216123741|
| -0.6365235044173491| 8.826411906361166|
|0.015696372114428918| 3.648859839013723|
| -2.2426849541854055| 6.153961784334937|
+--------------------+------------------+



## Binarize the column x1 and Bucketize the column x2

In [None]:
from pyspark.ml.feature import Binarizer, Bucketizer
# threshold = 0 for binarizer
binarizer = Binarizer(threshold=0, inputCol='x1', outputCol='x1_new')
# provide 5 split points to generate 4 buckets
bucketizer = Bucketizer(splits=[0, 2.5, 5, 7.5, 10], inputCol='x2', outputCol='x2_new')

# pipeline stages
from pyspark.ml import Pipeline
stages = [binarizer, bucketizer]
pipeline = Pipeline(stages=stages)

# fit the pipeline model and transform the data
pipeline.fit(df).transform(df).show()

+--------------------+------------------+------+------+
|                  x1|                x2|x1_new|x2_new|
+--------------------+------------------+------+------+
| 0.47143516373249306| 6.834629351721363|   1.0|   2.0|
| -1.1909756947064645| 7.127020269829002|   0.0|   2.0|
|  1.4327069684260973|3.7025075479039495|   1.0|   1.0|
| -0.3126518960917129| 5.611961860656249|   0.0|   2.0|
| -0.7205887333650116| 5.030831653078097|   0.0|   2.0|
|  0.8871629403077386|0.1376844959068224|   1.0|   0.0|
|  0.8595884137174165| 7.728266216123741|   1.0|   3.0|
| -0.6365235044173491| 8.826411906361166|   0.0|   3.0|
|0.015696372114428918| 3.648859839013723|   1.0|   1.0|
| -2.2426849541854055| 6.153961784334937|   0.0|   2.0|
+--------------------+------------------+------+------+



# Data Check

In [None]:
!ls

iris.csv    sample_data		       spark-3.1.1-bin-hadoop3.2.tgz
mtcars.csv  spark-3.1.1-bin-hadoop3.2


In [None]:
from google.colab import files
files.upload()

Saving kaggle-titanic-test.csv to kaggle-titanic-test.csv
Saving kaggle-titanic-train.csv to kaggle-titanic-train.csv


{'kaggle-titanic-test.csv': b'PassengerId,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked\r\n892,3,"Kelly, Mr. James",male,34.5,0,0,330911,7.8292,,Q\r\n893,3,"Wilkes, Mrs. James (Ellen Needs)",female,47,1,0,363272,7,,S\r\n894,2,"Myles, Mr. Thomas Francis",male,62,0,0,240276,9.6875,,Q\r\n895,3,"Wirz, Mr. Albert",male,27,0,0,315154,8.6625,,S\r\n896,3,"Hirvonen, Mrs. Alexander (Helga E Lindqvist)",female,22,1,1,3101298,12.2875,,S\r\n897,3,"Svensson, Mr. Johan Cervin",male,14,0,0,7538,9.225,,S\r\n898,3,"Connolly, Miss. Kate",female,30,0,0,330972,7.6292,,Q\r\n899,2,"Caldwell, Mr. Albert Francis",male,26,1,1,248738,29,,S\r\n900,3,"Abrahim, Mrs. Joseph (Sophie Halaut Easu)",female,18,0,0,2657,7.2292,,C\r\n901,3,"Davies, Mr. John Samuel",male,21,2,0,A/4 48871,24.15,,S\r\n902,3,"Ilieff, Mr. Ylio",male,,0,0,349220,7.8958,,S\r\n903,1,"Jones, Mr. Charles Cresson",male,46,0,0,694,26,,S\r\n904,1,"Snyder, Mrs. John Pillsbury (Nelle Stevenson)",female,23,1,0,21228,82.2667,B45,S\r\n905,2,"Ho

In [None]:
titanic = spark.read.csv('./kaggle-titanic-train.csv', header=True, inferSchema=True)
titanic.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

## Data type

First, we want to check if string and numeric variables are imported as we expect.

In [None]:
titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



## Data summary

In [None]:
len(titanic.columns)

12

In [None]:
titanic.count()

891

### Summarize *columns*

In [None]:
def describe_columns(df):
    for i in df.columns:
        print('Column: ' + i)
        titanic.select(i).describe().show()

In [None]:
describe_columns(titanic)

Column: PassengerId
+-------+-----------------+
|summary|      PassengerId|
+-------+-----------------+
|  count|              891|
|   mean|            446.0|
| stddev|257.3538420152301|
|    min|                1|
|    max|              891|
+-------+-----------------+

Column: Survived
+-------+-------------------+
|summary|           Survived|
+-------+-------------------+
|  count|                891|
|   mean| 0.3838383838383838|
| stddev|0.48659245426485753|
|    min|                  0|
|    max|                  1|
+-------+-------------------+

Column: Pclass
+-------+------------------+
|summary|            Pclass|
+-------+------------------+
|  count|               891|
|   mean| 2.308641975308642|
| stddev|0.8360712409770491|
|    min|                 1|
|    max|                 3|
+-------+------------------+

Column: Name
+-------+--------------------+
|summary|                Name|
+-------+--------------------+
|  count|                 891|
|   mean|                

### Find columns with missing values

In [None]:
def find_missing_values_columns(df):
    nrow = df.count()
    for v in df.columns:
        summary_df = df.select(v).describe()
        v_count = int(summary_df.collect()[0][v])
        if v_count < nrow:
            missing_percentage = (1 - v_count/nrow) * 100
            print("Total observations: " + str(nrow) + "\n"
                 "Total observations of " + v + ": " + str(v_count) + "\n"
                 "Percentage of missing values: " + str(missing_percentage) + "%" + "\n"
                 "----------------------------")

In [None]:
find_missing_values_columns(titanic)

Total observations: 891
Total observations of Age: 714
Percentage of missing values: 19.865319865319865%
----------------------------
Total observations: 891
Total observations of Cabin: 204
Percentage of missing values: 77.1043771043771%
----------------------------
Total observations: 891
Total observations of Embarked: 889
Percentage of missing values: 0.22446689113355678%
----------------------------


# Subset selection

## Select Rows by index

First, we need to add index to each rows. The **zipWithIndex** function zips the RDD elements with their corresponding index and returns the result as a new element.

In [None]:
mtcars = spark.read.csv('./mtcars.csv', inferSchema=True, header=True)
# correct first column name
mtcars = mtcars.withColumnRenamed('_c0', 'model')
mtcars.show(5)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|            model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



In [None]:
mtcars.rdd.zipWithIndex().take(5)

[(Row(model='Mazda RX4', mpg=21.0, cyl=6, disp=160.0, hp=110, drat=3.9, wt=2.62, qsec=16.46, vs=0, am=1, gear=4, carb=4),
  0),
 (Row(model='Mazda RX4 Wag', mpg=21.0, cyl=6, disp=160.0, hp=110, drat=3.9, wt=2.875, qsec=17.02, vs=0, am=1, gear=4, carb=4),
  1),
 (Row(model='Datsun 710', mpg=22.8, cyl=4, disp=108.0, hp=93, drat=3.85, wt=2.32, qsec=18.61, vs=1, am=1, gear=4, carb=1),
  2),
 (Row(model='Hornet 4 Drive', mpg=21.4, cyl=6, disp=258.0, hp=110, drat=3.08, wt=3.215, qsec=19.44, vs=1, am=0, gear=3, carb=1),
  3),
 (Row(model='Hornet Sportabout', mpg=18.7, cyl=8, disp=360.0, hp=175, drat=3.15, wt=3.44, qsec=17.02, vs=0, am=0, gear=3, carb=2),
  4)]

Now we can apply the **map** function to modify the structure of each element. Assume **x** is an element from the above RDD object, **x** has two elements: x[0] and x[1]. x[0] is an **Row** object, and x[1] is the index, which is an integer. We want to merge these two values to create a list. And we also want the first element in the list is the index.

In [None]:
mtcars.rdd.zipWithIndex().map(lambda x: [x[1]] + list(x[0])).take(5)

[[0, 'Mazda RX4', 21.0, 6, 160.0, 110, 3.9, 2.62, 16.46, 0, 1, 4, 4],
 [1, 'Mazda RX4 Wag', 21.0, 6, 160.0, 110, 3.9, 2.875, 17.02, 0, 1, 4, 4],
 [2, 'Datsun 710', 22.8, 4, 108.0, 93, 3.85, 2.32, 18.61, 1, 1, 4, 1],
 [3, 'Hornet 4 Drive', 21.4, 6, 258.0, 110, 3.08, 3.215, 19.44, 1, 0, 3, 1],
 [4, 'Hornet Sportabout', 18.7, 8, 360.0, 175, 3.15, 3.44, 17.02, 0, 0, 3, 2]]

Let's add column names and save the result.

In [None]:
header = ['index'] + mtcars.columns
mtcars_df = mtcars.rdd.zipWithIndex().map(lambda x: [x[1]] + list(x[0])).toDF(header)

In [None]:
mtcars_df.show(5)

+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|index|            model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    0|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    1|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|    2|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|    3|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|    4|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



After we obtain the **index column**, we can apply the **pyspark.sql.DataFrame.filter** function to select rows of the DataFrame. The **filter** function takes a **column** of **types.BooleanType** as input.

### Select specific rows

In [None]:
mtcars_df.filter(mtcars_df.index.isin([1,2,4,6,9])).show()

+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|index|            model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    1|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|    2|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|    4|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|    6|       Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|
|    9|         Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|
+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+



### Select rows between a range

In [None]:
mtcars_df.filter(mtcars_df.index.between(5, 10)).show()

+-----+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|index|     model| mpg|cyl| disp| hp|drat|  wt| qsec| vs| am|gear|carb|
+-----+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|    5|   Valiant|18.1|  6|225.0|105|2.76|3.46|20.22|  1|  0|   3|   1|
|    6|Duster 360|14.3|  8|360.0|245|3.21|3.57|15.84|  0|  0|   3|   4|
|    7| Merc 240D|24.4|  4|146.7| 62|3.69|3.19| 20.0|  1|  0|   4|   2|
|    8|  Merc 230|22.8|  4|140.8| 95|3.92|3.15| 22.9|  1|  0|   4|   2|
|    9|  Merc 280|19.2|  6|167.6|123|3.92|3.44| 18.3|  1|  0|   4|   4|
|   10| Merc 280C|17.8|  6|167.6|123|3.92|3.44| 18.9|  1|  0|   4|   4|
+-----+----------+----+---+-----+---+----+----+-----+---+---+----+----+



### Select rows by a cutoff index

In [None]:
mtcars_df.filter(mtcars_df.index < 9).show()

+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|index|            model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    0|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    1|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|    2|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|    3|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|    4|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|    5|          Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|
|    6|       Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|
|    7|        Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|    8|         Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
+-----+-----------------+----+---+-----+

In [None]:
mtcars_df.filter(mtcars_df.index >= 14).show()

+-----+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|index|              model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|   14| Cadillac Fleetwood|10.4|  8|472.0|205|2.93| 5.25|17.98|  0|  0|   3|   4|
|   15|Lincoln Continental|10.4|  8|460.0|215| 3.0|5.424|17.82|  0|  0|   3|   4|
|   16|  Chrysler Imperial|14.7|  8|440.0|230|3.23|5.345|17.42|  0|  0|   3|   4|
|   17|           Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|
|   18|        Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|
|   19|     Toyota Corolla|33.9|  4| 71.1| 65|4.22|1.835| 19.9|  1|  1|   4|   1|
|   20|      Toyota Corona|21.5|  4|120.1| 97| 3.7|2.465|20.01|  1|  0|   3|   1|
|   21|   Dodge Challenger|15.5|  8|318.0|150|2.76| 3.52|16.87|  0|  0|   3|   2|
|   22|        AMC Javelin|15.2|  8|304.0|150|3.15|3.435| 17.3|  0|  0|   3|   2|
|   23|         

## Select rows by logical criteria

Example 1: select rows when **cyl = 4**

In [None]:
mtcars_df.filter(mtcars_df.cyl == 4).show()

+-----+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|index|         model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    2|    Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|    7|     Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|    8|      Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
|   17|      Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|
|   18|   Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|
|   19|Toyota Corolla|33.9|  4| 71.1| 65|4.22|1.835| 19.9|  1|  1|   4|   1|
|   20| Toyota Corona|21.5|  4|120.1| 97| 3.7|2.465|20.01|  1|  0|   3|   1|
|   25|     Fiat X1-9|27.3|  4| 79.0| 66|4.08|1.935| 18.9|  1|  1|   4|   1|
|   26| Porsche 914-2|26.0|  4|120.3| 91|4.43| 2.14| 16.7|  0|  1|   5|   2|
|   27|  Lotus Europa|30.4|  4| 95.1|113|3.77|1.513| 16.9|  1|  1|   5|   2|

In [None]:
mtcars_df.filter(mtcars_df.cyl == 2).show()

+-----+-----+---+---+----+---+----+---+----+---+---+----+----+
|index|model|mpg|cyl|disp| hp|drat| wt|qsec| vs| am|gear|carb|
+-----+-----+---+---+----+---+----+---+----+---+---+----+----+
+-----+-----+---+---+----+---+----+---+----+---+---+----+----+



Example 2: select rows when **vs = 1 and am = 1**

When the filtering is based on multiple **conditions** (e.g., **vs = 1** and **am = 1**), we use the conditions to build a new **boolean type column**. And we filter the DataFrame by the new column.

<span style="color:red">Warning: when passing multiple conditions to the **`when()`** function, each condition has to be within a pair of parentheses</span>

In [None]:
from pyspark.sql import functions as F

In [None]:
filtering_column = F.when((mtcars_df.vs == 1) & (mtcars_df.am == 1), 1).name('filter_col')
filtering_column

Column<'CASE WHEN ((vs = 1) AND (am = 1)) THEN 1 END AS `filter_col`'>

Now we need to add the new column to the original DataFrame. **This can be done by applying the `select()` function to select all original columns as well as the new filtering columns.**

In [None]:
all_original_columns = [eval('mtcars_df.' + c) for c in mtcars_df.columns]
all_original_columns

[Column<'index'>,
 Column<'model'>,
 Column<'mpg'>,
 Column<'cyl'>,
 Column<'disp'>,
 Column<'hp'>,
 Column<'drat'>,
 Column<'wt'>,
 Column<'qsec'>,
 Column<'vs'>,
 Column<'am'>,
 Column<'gear'>,
 Column<'carb'>]

In [None]:
all_columns = all_original_columns + [filtering_column]
all_columns

[Column<'index'>,
 Column<'model'>,
 Column<'mpg'>,
 Column<'cyl'>,
 Column<'disp'>,
 Column<'hp'>,
 Column<'drat'>,
 Column<'wt'>,
 Column<'qsec'>,
 Column<'vs'>,
 Column<'am'>,
 Column<'gear'>,
 Column<'carb'>,
 Column<'CASE WHEN ((vs = 1) AND (am = 1)) THEN 1 END AS `filter_col`'>]

In [None]:
new_mtcars_df = mtcars_df.select(all_columns)
new_mtcars_df.show()

+-----+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----------+
|index|              model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|filter_col|
+-----+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----------+
|    0|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|      null|
|    1|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|      null|
|    2|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|         1|
|    3|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|      null|
|    4|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|      null|
|    5|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|      null|
|    6|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|      null|
|    7|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0

Now we can filter the DataFrame by the requested conditions. After we filter the DataFrame, we can drop the filtering column.

In [None]:
new_mtcars_df.filter(new_mtcars_df.filter_col == 1).drop('filter_col').show()

+-----+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|index|         model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    2|    Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   17|      Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|
|   18|   Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|
|   19|Toyota Corolla|33.9|  4| 71.1| 65|4.22|1.835| 19.9|  1|  1|   4|   1|
|   25|     Fiat X1-9|27.3|  4| 79.0| 66|4.08|1.935| 18.9|  1|  1|   4|   1|
|   27|  Lotus Europa|30.4|  4| 95.1|113|3.77|1.513| 16.9|  1|  1|   5|   2|
|   31|    Volvo 142E|21.4|  4|121.0|109|4.11| 2.78| 18.6|  1|  1|   4|   2|
+-----+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+



## Select columns by name

We can simply use the **select()** function to select columns by name.

In [None]:
mtcars.select(['hp', 'disp']).show(5)

+---+-----+
| hp| disp|
+---+-----+
|110|160.0|
|110|160.0|
| 93|108.0|
|110|258.0|
|175|360.0|
+---+-----+
only showing top 5 rows



## Select columns by index

We can convert indices to corresponding column names and then select columns by name.

In [None]:
indices = [0,3,4,7]
selected_columns =  [mtcars.columns[index] for index in indices]
selected_columns

['model', 'disp', 'hp', 'qsec']

In [None]:
mtcars.select(selected_columns).show(5)

+-----------------+-----+---+-----+
|            model| disp| hp| qsec|
+-----------------+-----+---+-----+
|        Mazda RX4|160.0|110|16.46|
|    Mazda RX4 Wag|160.0|110|17.02|
|       Datsun 710|108.0| 93|18.61|
|   Hornet 4 Drive|258.0|110|19.44|
|Hornet Sportabout|360.0|175|17.02|
+-----------------+-----+---+-----+
only showing top 5 rows



## Select columns by pattern

Example: columns start with 'd'.

In [None]:
import re
selected_columns = [x for x in mtcars.columns if re.compile('^d').match(x) is not None]
selected_columns

['disp', 'drat']

In [None]:
mtcars.select(selected_columns).show(5)

+-----+----+
| disp|drat|
+-----+----+
|160.0| 3.9|
|160.0| 3.9|
|108.0|3.85|
|258.0|3.08|
|360.0|3.15|
+-----+----+
only showing top 5 rows



# Column expression

A Spark **column instance** is **NOT a column of values** from the **DataFrame**: when you crate a column instance, it does not give you the actual values of that column in the DataFrame. I found it makes more sense to me if I consider a **column instance as a column of expressions**. These expressions are evaluated by other methods (e.g., the **select()**, **groupby()**, and **orderby()** from **pyspark.sql.DataFrame**)

## Use dot (.) to select column from DataFrame

In [None]:
mpg_col = mtcars.mpg
mpg_col

Column<'mpg'>

## Modify a column to generate a new column

In [None]:
mpg_col + 1

Column<'(mpg + 1)'>

In [None]:
mtcars.select(mpg_col * 100).show(5)

+-----------+
|(mpg * 100)|
+-----------+
|     2100.0|
|     2100.0|
|     2280.0|
|     2140.0|
|     1870.0|
+-----------+
only showing top 5 rows



The `pyspark.sql.Column` has many methods that acts on a column and returns a column instance.

In [None]:
mtcars.select(mtcars.gear.isin([2,3])).show(5)

+----------------+
|(gear IN (2, 3))|
+----------------+
|           false|
|           false|
|           false|
|            true|
|            true|
+----------------+
only showing top 5 rows



In [None]:
mtcars.mpg.asc()

Column<'mpg ASC NULLS FIRST'>

## Dot (.) column expression

Create a column expression that will return the original column values.

In [None]:
mpg_col_exp = mtcars.mpg
mpg_col_exp

Column<'mpg'>

In [None]:
mtcars.select(mpg_col_exp).show(5)

+----+
| mpg|
+----+
|21.0|
|21.0|
|22.8|
|21.4|
|18.7|
+----+
only showing top 5 rows



## Boolean column expression

Create a column expression that will return **boolean values**. 

## `between()`: true/false if the column value is between a given range

In [None]:
mpg_between = mtcars.cyl.between(4,6)
mpg_between

Column<'((cyl >= 4) AND (cyl <= 6))'>

In [None]:
mtcars.select(mtcars.cyl, mpg_between).show(5)

+---+---------------------------+
|cyl|((cyl >= 4) AND (cyl <= 6))|
+---+---------------------------+
|  6|                       true|
|  6|                       true|
|  4|                       true|
|  6|                       true|
|  8|                      false|
+---+---------------------------+
only showing top 5 rows



## `contains()`: true/false if the column value contains a string

In [None]:
model_contains = mtcars.model.contains('Ho')
model_contains

Column<'contains(model, Ho)'>

In [None]:
mtcars.select(mtcars.model, model_contains).show(5)

+-----------------+-------------------+
|            model|contains(model, Ho)|
+-----------------+-------------------+
|        Mazda RX4|              false|
|    Mazda RX4 Wag|              false|
|       Datsun 710|              false|
|   Hornet 4 Drive|               true|
|Hornet Sportabout|               true|
+-----------------+-------------------+
only showing top 5 rows



## `endswith()`: true/false if the column value ends with a string

In [None]:
model_endswith = mtcars.model.endswith('t')
model_endswith

Column<'endswith(model, t)'>

In [None]:
mtcars.select(mtcars.model, model_endswith).show(6)

+-----------------+------------------+
|            model|endswith(model, t)|
+-----------------+------------------+
|        Mazda RX4|             false|
|    Mazda RX4 Wag|             false|
|       Datsun 710|             false|
|   Hornet 4 Drive|             false|
|Hornet Sportabout|              true|
|          Valiant|              true|
+-----------------+------------------+
only showing top 6 rows



## `isNotNull()`: true/false if the column value is not Null

In [None]:
from pyspark.sql import Row
df = spark.createDataFrame([Row(name='Tom', height=80), Row(name='Alice', height=None)])
df.show()

+-----+------+
| name|height|
+-----+------+
|  Tom|    80|
|Alice|  null|
+-----+------+



In [None]:
height_isNotNull = df.height.isNotNull()
height_isNotNull

Column<'(height IS NOT NULL)'>

In [None]:
df.select(df.height, height_isNotNull).show()

+------+--------------------+
|height|(height IS NOT NULL)|
+------+--------------------+
|    80|                true|
|  null|               false|
+------+--------------------+



## `isNull()`: true/false if the column value is Null

In [None]:
height_isNull = df.height.isNull()
height_isNull

Column<'(height IS NULL)'>

In [None]:
df.select(df.height, height_isNull).show()

+------+----------------+
|height|(height IS NULL)|
+------+----------------+
|    80|           false|
|  null|            true|
+------+----------------+



## `isin()`: true/false if the column value is contained by the evaluated argument

In [None]:
carb_isin = mtcars.carb.isin([2, 3])
carb_isin

Column<'(carb IN (2, 3))'>

In [None]:
mtcars.select(mtcars.carb, carb_isin).show(10)

+----+----------------+
|carb|(carb IN (2, 3))|
+----+----------------+
|   4|           false|
|   4|           false|
|   1|           false|
|   1|           false|
|   2|            true|
|   1|           false|
|   4|           false|
|   2|            true|
|   2|            true|
|   4|           false|
+----+----------------+
only showing top 10 rows



## `like()`: true/false if the column value matches a pattern based on a _SQL LIKE_ 

In [None]:
model_like = mtcars.model.like('Ho%')
model_like

Column<'model LIKE Ho%'>

In [None]:
mtcars.select(mtcars.model, model_like).show(10)

+-----------------+--------------+
|            model|model LIKE Ho%|
+-----------------+--------------+
|        Mazda RX4|         false|
|    Mazda RX4 Wag|         false|
|       Datsun 710|         false|
|   Hornet 4 Drive|          true|
|Hornet Sportabout|          true|
|          Valiant|         false|
|       Duster 360|         false|
|        Merc 240D|         false|
|         Merc 230|         false|
|         Merc 280|         false|
+-----------------+--------------+
only showing top 10 rows



## `rlike()`: true/false if the column value matches a pattern based on a _SQL RLIKE_ (LIKE with Regex)

In [None]:
model_rlike = mtcars.model.rlike('t$')
model_rlike

Column<'model RLIKE t$'>

In [None]:
mtcars.select(mtcars.model, model_rlike).show()

+-------------------+--------------+
|              model|model RLIKE t$|
+-------------------+--------------+
|          Mazda RX4|         false|
|      Mazda RX4 Wag|         false|
|         Datsun 710|         false|
|     Hornet 4 Drive|         false|
|  Hornet Sportabout|          true|
|            Valiant|          true|
|         Duster 360|         false|
|          Merc 240D|         false|
|           Merc 230|         false|
|           Merc 280|         false|
|          Merc 280C|         false|
|         Merc 450SE|         false|
|         Merc 450SL|         false|
|        Merc 450SLC|         false|
| Cadillac Fleetwood|         false|
|Lincoln Continental|         false|
|  Chrysler Imperial|         false|
|           Fiat 128|         false|
|        Honda Civic|         false|
|     Toyota Corolla|         false|
+-------------------+--------------+
only showing top 20 rows



## `startswith()`: true/false if the column value starts with a string

In [None]:
model_startswith = mtcars.model.startswith('Merc')
model_startswith

Column<'startswith(model, Merc)'>

In [None]:
mtcars.select(mtcars.model, model_startswith).show()

+-------------------+-----------------------+
|              model|startswith(model, Merc)|
+-------------------+-----------------------+
|          Mazda RX4|                  false|
|      Mazda RX4 Wag|                  false|
|         Datsun 710|                  false|
|     Hornet 4 Drive|                  false|
|  Hornet Sportabout|                  false|
|            Valiant|                  false|
|         Duster 360|                  false|
|          Merc 240D|                   true|
|           Merc 230|                   true|
|           Merc 280|                   true|
|          Merc 280C|                   true|
|         Merc 450SE|                   true|
|         Merc 450SL|                   true|
|        Merc 450SLC|                   true|
| Cadillac Fleetwood|                  false|
|Lincoln Continental|                  false|
|  Chrysler Imperial|                  false|
|           Fiat 128|                  false|
|        Honda Civic|             

# `pyspark.sql.functions` functions

`pyspark.sql.functions` is collection of built-in functions for **creating column expressions**. These functions largely increase methods that we can use to manipulate DataFrame and DataFrame columns.

There are many sql functions from the `pyspark.sql.functions` module. Here I only choose a few to show how these functions extend the ability to create column expressions.

In [None]:
from pyspark.sql import functions as F

## `abs()`: create column expression that returns absolute values of a column

In [None]:
from pyspark.sql import Row
df = sc.parallelize([Row(x=1), Row(x=-1), Row(x=-2)]).toDF()
df.show()

+---+
|  x|
+---+
|  1|
| -1|
| -2|
+---+



In [None]:
x_abs = F.abs(df.x)
x_abs

Column<'abs(x)'>

In [None]:
df.select(df.x, x_abs).show()

+---+------+
|  x|abs(x)|
+---+------+
|  1|     1|
| -1|     1|
| -2|     2|
+---+------+



## `concat()`: create column expression that concatenates multiple column values into one 

In [None]:
df = sc.parallelize([Row(a='apple', b='tree'), Row(a='orange', b='flowers')]).toDF()
df.show()

+------+-------+
|     a|      b|
+------+-------+
| apple|   tree|
|orange|flowers|
+------+-------+



In [None]:
ab_concat = F.concat(df.a, df.b)
ab_concat

Column<'concat(a, b)'>

In [None]:
df.select(df.a, df.b, ab_concat).show()

+------+-------+-------------+
|     a|      b| concat(a, b)|
+------+-------+-------------+
| apple|   tree|    appletree|
|orange|flowers|orangeflowers|
+------+-------+-------------+



## `corr()`: create column expression that returns pearson correlation coefficient between two columns

In [None]:
# Reload the mtcars data
mtcars = spark.read.csv('./mtcars.csv', inferSchema=True, header=True)
mtcars.show(5)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|              _c0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



In [None]:
drat_wt_corr = F.corr(mtcars.drat, mtcars.wt)
drat_wt_corr

Column<'corr(drat, wt)'>

In [None]:
mtcars.select(drat_wt_corr).show()

+-------------------+
|     corr(drat, wt)|
+-------------------+
|-0.7124406466973717|
+-------------------+



## `array()`: create column expression that merge multiple column values into an array

This function can be used to build **feature column** in machine learning models.

In [None]:
cols = [eval('mtcars.' + col) for col in mtcars.columns[1:]]
cols

[Column<'mpg'>,
 Column<'cyl'>,
 Column<'disp'>,
 Column<'hp'>,
 Column<'drat'>,
 Column<'wt'>,
 Column<'qsec'>,
 Column<'vs'>,
 Column<'am'>,
 Column<'gear'>,
 Column<'carb'>]

In [None]:
cols_array = F.array(cols)
cols_array

Column<'array(mpg, cyl, disp, hp, drat, wt, qsec, vs, am, gear, carb)'>

In [None]:
mtcars.select(cols_array).show(truncate=False)

+-----------------------------------------------------------------+
|array(mpg, cyl, disp, hp, drat, wt, qsec, vs, am, gear, carb)    |
+-----------------------------------------------------------------+
|[21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0]  |
|[21.0, 6.0, 160.0, 110.0, 3.9, 2.875, 17.02, 0.0, 1.0, 4.0, 4.0] |
|[22.8, 4.0, 108.0, 93.0, 3.85, 2.32, 18.61, 1.0, 1.0, 4.0, 1.0]  |
|[21.4, 6.0, 258.0, 110.0, 3.08, 3.215, 19.44, 1.0, 0.0, 3.0, 1.0]|
|[18.7, 8.0, 360.0, 175.0, 3.15, 3.44, 17.02, 0.0, 0.0, 3.0, 2.0] |
|[18.1, 6.0, 225.0, 105.0, 2.76, 3.46, 20.22, 1.0, 0.0, 3.0, 1.0] |
|[14.3, 8.0, 360.0, 245.0, 3.21, 3.57, 15.84, 0.0, 0.0, 3.0, 4.0] |
|[24.4, 4.0, 146.7, 62.0, 3.69, 3.19, 20.0, 1.0, 0.0, 4.0, 2.0]   |
|[22.8, 4.0, 140.8, 95.0, 3.92, 3.15, 22.9, 1.0, 0.0, 4.0, 2.0]   |
|[19.2, 6.0, 167.6, 123.0, 3.92, 3.44, 18.3, 1.0, 0.0, 4.0, 4.0]  |
|[17.8, 6.0, 167.6, 123.0, 3.92, 3.44, 18.9, 1.0, 0.0, 4.0, 4.0]  |
|[16.4, 8.0, 275.8, 180.0, 3.07, 4.07, 17.4, 0.0

# `udf()` function and sql types


The `pyspark.sql.functions.udf()` function is a very important function. It allows us to transfer a **user defined function** to a **`pyspark.sql.functions`** function which can act on columns of a DataFrame. It makes data framsformation much more flexible.

Using `udf()` could be tricky. The key is to understand how to define the `returnType` parameter.

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf

In [None]:
# Reload mtcars data
mtcars = spark.read.csv('./mtcars.csv', inferSchema=True, header=True)
mtcars = mtcars.withColumnRenamed('_c0', 'model')
mtcars.show(5)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|            model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



**The structure of the schema passed to `returnType` has to match the data structure of the return value from the user defined function**.

**Case 1**: divide **disp** by **hp** and put the result to a new column

The user defined function returns a float value.

In [None]:
def disp_by_hp(disp, hp):
    return(disp/hp)

In [None]:
disp_by_hp_udf = udf(disp_by_hp, returnType=FloatType())

In [None]:
all_original_cols = [eval('mtcars.' + x) for x in mtcars.columns]
all_original_cols

[Column<'model'>,
 Column<'mpg'>,
 Column<'cyl'>,
 Column<'disp'>,
 Column<'hp'>,
 Column<'drat'>,
 Column<'wt'>,
 Column<'qsec'>,
 Column<'vs'>,
 Column<'am'>,
 Column<'gear'>,
 Column<'carb'>]

In [None]:
disp_by_hp_col = disp_by_hp_udf(mtcars.disp, mtcars.hp)
disp_by_hp_col

Column<'disp_by_hp(disp, hp)'>

In [None]:
all_new_cols = all_original_cols + [disp_by_hp_col]
all_new_cols

[Column<'model'>,
 Column<'mpg'>,
 Column<'cyl'>,
 Column<'disp'>,
 Column<'hp'>,
 Column<'drat'>,
 Column<'wt'>,
 Column<'qsec'>,
 Column<'vs'>,
 Column<'am'>,
 Column<'gear'>,
 Column<'carb'>,
 Column<'disp_by_hp(disp, hp)'>]

In [None]:
mtcars.select(all_new_cols).show()

+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+--------------------+
|              model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|disp_by_hp(disp, hp)|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+--------------------+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|           1.4545455|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|           1.4545455|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|           1.1612903|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|           2.3454545|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|            2.057143|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|            2.142857|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|           1.4693878|
|          Merc 240D|24.4|  4|

**case 2**: create an array column that contain **disp** and **hp** values

In [None]:
# define function
def merge_two_columns(col1, col2):
    return([float(col1), float(col2)])

# convert user defined function into an udf function (sql function)
array_merge_two_columns_udf = udf(merge_two_columns, returnType=ArrayType(FloatType()))

In [None]:
array_col = array_merge_two_columns_udf(mtcars.disp, mtcars.hp)
array_col

Column<'merge_two_columns(disp, hp)'>

In [None]:
all_new_cols = all_original_cols + [array_col]
all_new_cols

[Column<'model'>,
 Column<'mpg'>,
 Column<'cyl'>,
 Column<'disp'>,
 Column<'hp'>,
 Column<'drat'>,
 Column<'wt'>,
 Column<'qsec'>,
 Column<'vs'>,
 Column<'am'>,
 Column<'gear'>,
 Column<'carb'>,
 Column<'merge_two_columns(disp, hp)'>]

In [None]:
mtcars.select(all_new_cols).show(5, truncate=False)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------------------+
|model            |mpg |cyl|disp |hp |drat|wt   |qsec |vs |am |gear|carb|merge_two_columns(disp, hp)|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------------------+
|Mazda RX4        |21.0|6  |160.0|110|3.9 |2.62 |16.46|0  |1  |4   |4   |[160.0, 110.0]             |
|Mazda RX4 Wag    |21.0|6  |160.0|110|3.9 |2.875|17.02|0  |1  |4   |4   |[160.0, 110.0]             |
|Datsun 710       |22.8|4  |108.0|93 |3.85|2.32 |18.61|1  |1  |4   |1   |[108.0, 93.0]              |
|Hornet 4 Drive   |21.4|6  |258.0|110|3.08|3.215|19.44|1  |0  |3   |1   |[258.0, 110.0]             |
|Hornet Sportabout|18.7|8  |360.0|175|3.15|3.44 |17.02|0  |0  |3   |2   |[360.0, 175.0]             |
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------------------+
only showing top 5 rows



## `ArrayType` vs. `StructType`

Both `ArrayType` and `StructType` can be used to build `returnType` for a list. The difference is: 

1. `ArrayType` requires all elements in the list have the same `elementType`, while `StructType` can have different `elementTypes`.
2. `StructType` represents a `Row` object.


**Define an `ArrayType` with elementType being `FloatType`.**

In [None]:
# define function
def merge_two_columns(col1, col2):
    return([float(col1), float(col2)])
array_type = ArrayType(FloatType())
array_merge_two_columns_udf = udf(merge_two_columns, returnType=array_type)

**Define a `StructType` with one elementType being `StringType` and the other being `FloatType`.**

In [None]:
# define function
def merge_two_columns(col1, col2):
    return([str(col1), float(col2)])
struct_type = StructType([
    StructField('f1', StringType()),
    StructField('f2', FloatType())
])
struct_merge_two_columns_udf = udf(merge_two_columns, returnType=struct_type)

**array column** expression: both values are float type values

In [None]:
array_col = array_merge_two_columns_udf(mtcars.hp, mtcars.disp)
array_col

Column<'merge_two_columns(hp, disp)'>

**struct column** expression: first value is a string and the second value is a float type value.

In [None]:
struct_col = struct_merge_two_columns_udf(mtcars.model, mtcars.disp)
struct_col

Column<'merge_two_columns(model, disp)'>

**Results**

In [None]:
mtcars.select(array_col, struct_col).show(truncate=False)

+---------------------------+------------------------------+
|merge_two_columns(hp, disp)|merge_two_columns(model, disp)|
+---------------------------+------------------------------+
|[110.0, 160.0]             |{Mazda RX4, 160.0}            |
|[110.0, 160.0]             |{Mazda RX4 Wag, 160.0}        |
|[93.0, 108.0]              |{Datsun 710, 108.0}           |
|[110.0, 258.0]             |{Hornet 4 Drive, 258.0}       |
|[175.0, 360.0]             |{Hornet Sportabout, 360.0}    |
|[105.0, 225.0]             |{Valiant, 225.0}              |
|[245.0, 360.0]             |{Duster 360, 360.0}           |
|[62.0, 146.7]              |{Merc 240D, 146.7}            |
|[95.0, 140.8]              |{Merc 230, 140.8}             |
|[123.0, 167.6]             |{Merc 280, 167.6}             |
|[123.0, 167.6]             |{Merc 280C, 167.6}            |
|[180.0, 275.8]             |{Merc 450SE, 275.8}           |
|[180.0, 275.8]             |{Merc 450SL, 275.8}           |
|[180.0, 275.8]         