<a href="https://colab.research.google.com/github/tyri0n11/distributed-system/blob/main/7_1_data_manipulation_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkContext()
spark = SparkSession(sparkContext=sc)

In [2]:
spark

## Map functions

These functions are probably the most commonly used functions when dealing with an RDD object.

* `map()`
* `mapValues()`
* `flatMap()`
* `flatMapValues()`

### `map`

The `map()` method applies a function to each element of the RDD. Each element has to be a valid input to the function. The returned RDD contains the function outputs as its new elements.

Elements in the RDD object `map_exp_rdd` below are rows of the `mtcars` data set in string format. We are going to apply the `map()` function several times to convert each string element into a tuple with two values: the first value is the car model as a string; the second value is a list of numeric values.
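The chained steps below (split each line, drop the header, convert the features to numbers) can also be written as one named function. Here is a minimal plain-Python sketch that can be tried on strings without a cluster. `parse_mtcars_line` is a hypothetical helper, not part of the original notebook.

```python
def parse_mtcars_line(line):
    """Split one CSV line of mtcars into (model, [numeric features])."""
    fields = line.split(',')
    # the first field is the car model; the remaining fields are numbers
    return (fields[0], [float(v) for v in fields[1:]])


sample = 'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4'
model, features = parse_mtcars_line(sample)
print(model)         # Mazda RX4
print(features[:3])  # [21.0, 6.0, 160.0]
```

The header line has to be dropped first, because `float('mpg')` would fail on it. In this file the header line starts with a comma, so one possible usage is `map_exp_rdd.filter(lambda line: not line.startswith(',')).map(parse_mtcars_line)`.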

In [3]:
import pandas as pd

mtcar_data = pd.read_csv('./mtcars.csv')
mtcar_data.head()

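|   | Unnamed: 0 | mpg | cyl | disp | hp | drat | wt | qsec | vs | am | gear | carb |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Mazda RX4 | 21.0 | 6 | 160.0 | 110 | 3.9 | 2.62 | 16.46 | 0 | 1 | 4 | 4 |
| 1 | Mazda RX4 Wag | 21.0 | 6 | 160.0 | 110 | 3.9 | 2.875 | 17.02 | 0 | 1 | 4 | 4 |
| 2 | Datsun 710 | 22.8 | 4 | 108.0 | 93 | 3.85 | 2.32 | 18.61 | 1 | 1 | 4 | 1 |
| 3 | Hornet 4 Drive | 21.4 | 6 | 258.0 | 110 | 3.08 | 3.215 | 19.44 | 1 | 0 | 3 | 1 |
| 4 | Hornet Sportabout | 18.7 | 8 | 360.0 | 175 | 3.15 | 3.44 | 17.02 | 0 | 0 | 3 | 2 |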


In [4]:
type(mtcar_data)

In [5]:
# create an example RDD
map_exp_rdd = sc.textFile('./mtcars.csv')
map_exp_rdd.take(4)

[',mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb',
 'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4',
 'Mazda RX4 Wag,21,6,160,110,3.9,2.875,17.02,0,1,4,4',
 'Datsun 710,22.8,4,108,93,3.85,2.32,18.61,1,1,4,1']

In [6]:
type(map_exp_rdd)

In [7]:
# split auto model from other feature values
map_exp_rdd_1 = map_exp_rdd.map(lambda x: x.split(',')).map(lambda x: (x[0], x[1:]))
map_exp_rdd_1.take(4)

[('',
  ['mpg',
   'cyl',
   'disp',
   'hp',
   'drat',
   'wt',
   'qsec',
   'vs',
   'am',
   'gear',
   'carb']),
 ('Mazda RX4',
  ['21', '6', '160', '110', '3.9', '2.62', '16.46', '0', '1', '4', '4']),
 ('Mazda RX4 Wag',
  ['21', '6', '160', '110', '3.9', '2.875', '17.02', '0', '1', '4', '4']),
 ('Datsun 710',
  ['22.8', '4', '108', '93', '3.85', '2.32', '18.61', '1', '1', '4', '1'])]

In [8]:
# remove the header row
header = map_exp_rdd_1.first()
# the filter() method applies a function to each element; the function returns a boolean (True or False),
# and only the elements for which it returns True are kept.
map_exp_rdd_2 = map_exp_rdd_1.filter(lambda x: x != header)
map_exp_rdd_2.take(4)

[('Mazda RX4',
  ['21', '6', '160', '110', '3.9', '2.62', '16.46', '0', '1', '4', '4']),
 ('Mazda RX4 Wag',
  ['21', '6', '160', '110', '3.9', '2.875', '17.02', '0', '1', '4', '4']),
 ('Datsun 710',
  ['22.8', '4', '108', '93', '3.85', '2.32', '18.61', '1', '1', '4', '1']),
 ('Hornet 4 Drive',
  ['21.4', '6', '258', '110', '3.08', '3.215', '19.44', '1', '0', '3', '1'])]

In [9]:
# convert string values to numeric values
map_exp_rdd_3 = map_exp_rdd_2.map(lambda x: (x[0], list(map(float, x[1]))))
map_exp_rdd_3.take(4)

[('Mazda RX4',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0]),
 ('Mazda RX4 Wag',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.875, 17.02, 0.0, 1.0, 4.0, 4.0]),
 ('Datsun 710',
  [22.8, 4.0, 108.0, 93.0, 3.85, 2.32, 18.61, 1.0, 1.0, 4.0, 1.0]),
 ('Hornet 4 Drive',
  [21.4, 6.0, 258.0, 110.0, 3.08, 3.215, 19.44, 1.0, 0.0, 3.0, 1.0])]

### `mapValues`

The `mapValues` function requires that each element in the RDD has a **key/value** pair structure, for example, a tuple of 2 items or a list of 2 items. The `mapValues` function applies a function to the value of each element; the element key remains unchanged.
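The semantics can be modelled on a plain Python list of `(key, value)` pairs. This is a minimal sketch: `map_values` and `map_via_map` are hypothetical names, and the three-feature lists are truncated versions of the rows shown in this notebook.

```python
def map_values(pairs, func):
    """Pure-Python model of RDD.mapValues: apply func to each value, keep each key."""
    return [(key, func(value)) for key, value in pairs]


def map_via_map(pairs, func):
    """The same result written as a plain map over (key, value) tuples."""
    return list(map(lambda kv: (kv[0], func(kv[1])), pairs))


pairs = [('Mazda RX4', [21.0, 6.0, 160.0]), ('Datsun 710', [22.8, 4.0, 108.0])]
print(map_values(pairs, max))  # [('Mazda RX4', 160.0), ('Datsun 710', 108.0)]
```

Both functions return the same pairs. In Spark, one practical reason to prefer `mapValues()` over the equivalent `map()` is that `mapValues()` keeps the parent RDD's partitioner, since the keys are guaranteed not to change.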

We can apply the `mapValues` function to the RDD object `mapValues_exp_rdd` below.


In [10]:
mapValues_exp_rdd = map_exp_rdd_3
mapValues_exp_rdd.take(4)

[('Mazda RX4',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0]),
 ('Mazda RX4 Wag',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.875, 17.02, 0.0, 1.0, 4.0, 4.0]),
 ('Datsun 710',
  [22.8, 4.0, 108.0, 93.0, 3.85, 2.32, 18.61, 1.0, 1.0, 4.0, 1.0]),
 ('Hornet 4 Drive',
  [21.4, 6.0, 258.0, 110.0, 3.08, 3.215, 19.44, 1.0, 0.0, 3.0, 1.0])]

In [11]:
import numpy as np
mapValues_exp_rdd_1 = mapValues_exp_rdd.mapValues(lambda x: np.mean(x))
mapValues_exp_rdd_1.take(4)

[('Mazda RX4', np.float64(29.90727272727273)),
 ('Mazda RX4 Wag', np.float64(29.98136363636364)),
 ('Datsun 710', np.float64(23.59818181818182)),
 ('Hornet 4 Drive', np.float64(38.73954545454546))]

In [12]:
import numpy as np
mapValues_exp_rdd_2 = mapValues_exp_rdd.mapValues(lambda x: np.min(x))
mapValues_exp_rdd_2.take(4)

[('Mazda RX4', np.float64(0.0)),
 ('Mazda RX4 Wag', np.float64(0.0)),
 ('Datsun 710', np.float64(1.0)),
 ('Hornet 4 Drive', np.float64(0.0))]

In [13]:
import numpy as np
mapValues_exp_rdd_3 = mapValues_exp_rdd.mapValues(lambda x: (np.min(x)+np.max(x))/2)
mapValues_exp_rdd_3.take(4)

[('Mazda RX4', np.float64(80.0)),
 ('Mazda RX4 Wag', np.float64(80.0)),
 ('Datsun 710', np.float64(54.5)),
 ('Hornet 4 Drive', np.float64(129.0))]

When using `mapValues()`, the `x` in the lambda functions above refers to the element value only; the element key is not passed to the function.

In [14]:
# Calculate the sum of all values for each key
mapValues_exp_rdd_4 = mapValues_exp_rdd.mapValues(lambda x: sum(x))
mapValues_exp_rdd_4.take(4)

# Find the maximum value for each key
mapValues_exp_rdd_5 = mapValues_exp_rdd.mapValues(lambda x: max(x))
mapValues_exp_rdd_5.take(4)

# Convert each value (a list of floats) to a numpy array
mapValues_exp_rdd_6 = mapValues_exp_rdd.mapValues(lambda x: np.array(x))
mapValues_exp_rdd_6.take(4)

# Apply a custom function to each value
def custom_function(values):
  """Example custom function that calculates the range of a list of numbers"""
  return max(values) - min(values)

mapValues_exp_rdd_7 = mapValues_exp_rdd.mapValues(custom_function)
mapValues_exp_rdd_7.take(4)

# Example with a conditional operation: square values if the minimum is > 5
mapValues_exp_rdd_8 = mapValues_exp_rdd.mapValues(lambda x: [val**2 for val in x] if min(x) > 5 else x)
mapValues_exp_rdd_8.take(4)


[('Mazda RX4',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0]),
 ('Mazda RX4 Wag',
  [21.0, 6.0, 160.0, 110.0, 3.9, 2.875, 17.02, 0.0, 1.0, 4.0, 4.0]),
 ('Datsun 710',
  [22.8, 4.0, 108.0, 93.0, 3.85, 2.32, 18.61, 1.0, 1.0, 4.0, 1.0]),
 ('Hornet 4 Drive',
  [21.4, 6.0, 258.0, 110.0, 3.08, 3.215, 19.44, 1.0, 0.0, 3.0, 1.0])]

### `flatMap`

This function first applies a function to each element of an RDD and then flattens the results. If the function simply returns its input (for example `lambda x: x`), `flatMap` just flattens the elements of the RDD without any other operation on them.
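The difference between `map` and `flatMap` can be modelled on a plain Python list. In this minimal sketch, `rdd_map` and `rdd_flat_map` are hypothetical stand-ins for the RDD methods, using `itertools.chain` to do the flattening.

```python
from itertools import chain


def rdd_map(elements, func):
    """Pure-Python model of RDD.map: exactly one output per input element."""
    return [func(e) for e in elements]


def rdd_flat_map(elements, func):
    """Pure-Python model of RDD.flatMap: func returns an iterable per element,
    and all of those iterables are concatenated into one flat list."""
    return list(chain.from_iterable(func(e) for e in elements))


x = [('a', 'b', 'c'), ('a', 'a'), ('c', 'c', 'c', 'd')]
print(rdd_map(x, lambda t: t))       # still nested: one tuple per element
print(rdd_flat_map(x, lambda t: t))  # ['a', 'b', 'c', 'a', 'a', 'c', 'c', 'c', 'd']
print(rdd_flat_map(['hello world', 'spark'], str.split))  # ['hello', 'world', 'spark']
```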


In [15]:
x = [('a', 'b', 'c'), ('a', 'a'), ('c', 'c', 'c', 'd')]
flatMap_exp_rdd = sc.parallelize(x)
flatMap_exp_rdd.collect()

[('a', 'b', 'c'), ('a', 'a'), ('c', 'c', 'c', 'd')]

In [16]:
flatMap_exp_rdd_1 = flatMap_exp_rdd.flatMap(lambda x: x)
flatMap_exp_rdd_1.collect()

['a', 'b', 'c', 'a', 'a', 'c', 'c', 'c', 'd']

In [17]:
type(flatMap_exp_rdd_1)

### `flatMapValues`

The `flatMapValues` function requires that each element in the RDD has a **key/value** pair structure. It applies a function to each **element value** of the RDD object and then flattens the results, pairing every output item with the original key.

For example, my raw data looks like the table below, but I would like to transform it so that it has three columns: the first column is the **sample id**; the second column is the **type (A, B or C)**; the third column is the **value**.

| sample id |  A |  B |  C |
|:---------:|:--:|:--:|:--:|
|     1     | 23 | 28 | 32 |
|     2     | 18 | 29 | 31 |
|     3     | 34 | 21 | 18 |
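The wide-to-long reshaping can be modelled in plain Python before running it in Spark. In this minimal sketch, `flat_map_values` is a hypothetical stand-in for `RDD.flatMapValues`, applied to the same `my_data` list that the next cell parallelizes.

```python
def flat_map_values(pairs, func):
    """Pure-Python model of RDD.flatMapValues: func maps one value to an
    iterable, and every item it yields is paired with the original key."""
    return [(key, item) for key, value in pairs for item in func(value)]


my_data = [[1, (23, 28, 32)], [2, (18, 29, 31)], [3, (34, 21, 18)]]
# pair each of the three values with its type label A, B or C
long_format = flat_map_values(my_data, lambda v: zip('ABC', v))
print(long_format[:3])  # [(1, ('A', 23)), (1, ('B', 28)), (1, ('C', 32))]
```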

In [18]:
# example data
my_data = [
    [1, (23, 28, 32)],
    [2, (18, 29, 31)],
    [3, (34, 21, 18)]
]
flatMapValues_exp_rdd = sc.parallelize(my_data)
flatMapValues_exp_rdd.collect()

[[1, (23, 28, 32)], [2, (18, 29, 31)], [3, (34, 21, 18)]]

In [19]:
# merge A,B,and C columns into one column and add the type column
flatMapValues_exp_rdd_1 = flatMapValues_exp_rdd.flatMapValues(lambda x: list(zip(list('ABC'), x)))
flatMapValues_exp_rdd_1.collect()

[(1, ('A', 23)),
 (1, ('B', 28)),
 (1, ('C', 32)),
 (2, ('A', 18)),
 (2, ('B', 29)),
 (2, ('C', 31)),
 (3, ('A', 34)),
 (3, ('B', 21)),
 (3, ('C', 18))]

In [20]:
# unpack the element values
flatMapValues_exp_rdd_2 = flatMapValues_exp_rdd_1.map(lambda x: [x[0]] + list(x[1]) )
flatMapValues_exp_rdd_2.collect()

[[1, 'A', 23],
 [1, 'B', 28],
 [1, 'C', 32],
 [2, 'A', 18],
 [2, 'B', 29],
 [2, 'C', 31],
 [3, 'A', 34],
 [3, 'B', 21],
 [3, 'C', 18]]

In [21]:
# Example 1: Expanding a list of words associated with each key
data = [('doc1', ['apple', 'banana']), ('doc2', ['orange', 'grape', 'kiwi'])]
rdd = sc.parallelize(data)

# Apply flatMapValues to expand the list of words into individual elements.
expanded_rdd = rdd.flatMapValues(lambda words: words)
expanded_rdd.collect()  # Output: [('doc1', 'apple'), ('doc1', 'banana'), ('doc2', 'orange'), ('doc2', 'grape'), ('doc2', 'kiwi')]


# Example 2: Processing a list of tuples associated with each key.
data2 = [('user1', [('productA', 5), ('productB', 2)]), ('user2', [('productC', 1), ('productA', 3)])]
rdd2 = sc.parallelize(data2)

# Separate product and quantity for each user.
product_quantities = rdd2.flatMapValues(lambda purchases: purchases)
product_quantities.collect() # Output: [('user1', ('productA', 5)), ('user1', ('productB', 2)), ('user2', ('productC', 1)), ('user2', ('productA', 3))]

# Example 3: Splitting strings based on a delimiter
data3 = [('sentence1', 'this is a test sentence'), ('sentence2', 'another example')]
rdd3 = sc.parallelize(data3)

# Split each sentence into words.
words_rdd = rdd3.flatMapValues(lambda sentence: sentence.split())
words_rdd.collect() # Output: [('sentence1', 'this'), ('sentence1', 'is'), ('sentence1', 'a'), ('sentence1', 'test'), ('sentence1', 'sentence'), ('sentence2', 'another'), ('sentence2', 'example')]


# Example 4: Tagging each value with a label (each call returns a one-element list)
data4 = [('key1', 10), ('key2', 25)]
rdd4 = sc.parallelize(data4)


# Label each number as even or odd; flatMapValues flattens the one-element lists
def process_number(num):
    if num % 2 == 0:
        return [('even', num)]
    else:
        return [('odd', num)]

processed_rdd = rdd4.flatMapValues(process_number)
processed_rdd.collect() # Output: [('key1', ('even', 10)), ('key2', ('odd', 25))]


[('key1', ('even', 10)), ('key2', ('odd', 25))]

## Aggregate functions
This section covers two aggregate functions:

* `aggregate()`
* `aggregateByKey()`

### `aggregate(zeroValue, seqOp, combOp)`

* **zeroValue** is the initial value of the accumulator (like an empty data container). Its structure should match the structure of the values returned by the seqOp function, and it should be neutral for the operation (e.g. `0` for a sum), because Spark starts from it separately in every partition.
* **seqOp** is a function that takes two arguments: the first argument is the accumulator (initially the zeroValue) and the second argument is an element from the RDD. Within each partition, the accumulator is replaced by the value seqOp returns after every element.
* **combOp** is a function that takes two arguments, each being the final accumulator of one partition, and merges them into one.
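The roles of the three arguments can be seen in a simplified plain-Python model that works on partitions given as lists. This is a minimal sketch under that assumption. `simulate_aggregate` is a hypothetical name, and PySpark's real implementation uses `zeroValue` a few more times when combining, so its extra contributions do not exactly match this model.

```python
from functools import reduce


def simulate_aggregate(partitions, zero_value, seq_op, comb_op):
    """Simplified model of RDD.aggregate over pre-split partitions: each
    partition folds its elements into zero_value with seq_op, then the
    per-partition results are merged with comb_op."""
    partition_results = [reduce(seq_op, part, zero_value) for part in partitions]
    return reduce(comb_op, partition_results, zero_value)


# count and sum in one pass: the accumulator is a (count, total) tuple
seq_op = lambda acc, x: (acc[0] + 1, acc[1] + x)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

partitions = [[1, 2, 3], [4, 5], [6]]
print(simulate_aggregate(partitions, (0, 0), seq_op, comb_op))  # (6, 21)
```

With a neutral zero value, the result does not depend on how the data is split. A non-neutral one such as `(0, 10)` is added once per partition, so the result changes with the number of partitions.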

The code below calculates the total sum of squares (TSS), i.e. the sum of squared deviations from the mean, for **mpg** and **disp** in the data set **mtcars**.

Step 1: get some data.

In [22]:
mtcars_df = spark.read.csv('./mtcars.csv', inferSchema=True, header=True).select(['mpg', 'disp'])
mtcars_df.take(5)

[Row(mpg=21.0, disp=160.0),
 Row(mpg=21.0, disp=160.0),
 Row(mpg=22.8, disp=108.0),
 Row(mpg=21.4, disp=258.0),
 Row(mpg=18.7, disp=360.0)]

Step 2: calculate the averages of mpg and disp

In [23]:
mpg_mean = mtcars_df.select('mpg').rdd.map(lambda x: x[0]).mean()
disp_mean = mtcars_df.select('disp').rdd.map(lambda x: x[0]).mean()
print('mpg mean = ', mpg_mean, '; '
      'disp mean = ', disp_mean)

mpg mean =  20.090625000000003 ; disp mean =  230.721875


Step 3: build **zeroValue, seqOp** and **combOp**

We are calculating two TSS values, so we create a tuple that stores two values, both starting at 0.

In [24]:
zeroValue = (0, 0)

The **z** below is the accumulator, which starts as `zeroValue`; its values get updated after every element. The **x** refers to an element (a `Row` with `mpg` and `disp`) in an RDD partition. In this case, both **z** and **x** have two values.

In [25]:
seqOp = lambda z, x: (z[0] + (x[0] - mpg_mean)**2, z[1] + (x[1] - disp_mean)**2)

In [26]:
seqOp

<function __main__.<lambda>(z, x)>

The `combOp` function simply adds up the per-partition accumulators element-wise, merging them into one.

In [27]:
combOp = lambda px, py: ( px[0] + py[0], px[1] + py[1] )

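Now call `aggregate()` on the underlying RDD of `mtcars_df`: `mtcars_df.rdd.aggregate(zeroValue, seqOp, combOp)` returns a tuple containing the TSS of **mpg** and the TSS of **disp**.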

## `aggregateByKey(zeroValue, seqOp, combOp)`

This function does something similar to `aggregate()`. While `aggregate()` combines all elements into a single final result, `aggregateByKey()` works on a key/value RDD and merges the values separately for each key, returning one result per key.
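The per-key behaviour can be modelled in plain Python on partitions of `(key, value)` pairs. This is a minimal sketch: `simulate_aggregate_by_key` is a hypothetical name, the toy data is invented for illustration, and it assumes an immutable `zero_value` such as a tuple. Spark gives each key its own copy of `zeroValue`.

```python
def simulate_aggregate_by_key(partitions, zero_value, seq_op, comb_op):
    """Model of RDD.aggregateByKey: inside each partition, the values of a key
    are folded into zero_value with seq_op; the per-partition accumulators of
    each key are then merged with comb_op."""
    merged = {}
    for part in partitions:
        local = {}  # per-partition accumulators, one per key
        for key, value in part:
            # the first value of a key in this partition starts from zero_value
            local[key] = seq_op(local.get(key, zero_value), value)
        for key, acc in local.items():
            # merge this partition's accumulator with those of earlier partitions
            merged[key] = comb_op(merged[key], acc) if key in merged else acc
    return merged


# same shape as the iris example: sum of squares of the first two features
zero_value = (0, 0)
seq_op = lambda acc, v: (acc[0] + v[0] ** 2, acc[1] + v[1] ** 2)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

# toy data spread over two partitions
partitions = [[('x', [1, 2]), ('y', [3, 1])], [('x', [2, 0]), ('y', [1, 1])]]
print(simulate_aggregate_by_key(partitions, zero_value, seq_op, comb_op))
# {'x': (5, 4), 'y': (10, 2)}
```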

In [29]:
iris_rdd = sc.textFile('./iris.csv', use_unicode=True)
iris_rdd.take(2)

['sepal_length,sepal_width,petal_length,petal_width,species',
 '5.1,3.5,1.4,0.2,setosa']

In [30]:
iris_rdd_2 = iris_rdd.map(lambda x: x.split(',')).\
    filter(lambda x: x[0] != 'sepal_length').\
    map(lambda x: (x[-1], [*map(float, x[:-1])]))
iris_rdd_2.take(5)

[('setosa', [5.1, 3.5, 1.4, 0.2]),
 ('setosa', [4.9, 3.0, 1.4, 0.2]),
 ('setosa', [4.7, 3.2, 1.3, 0.2]),
 ('setosa', [4.6, 3.1, 1.5, 0.2]),
 ('setosa', [5.0, 3.6, 1.4, 0.2])]

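### Define initial values, seqOp and combOp

Here `seqOp` adds the squares of the first two features, `sepal_length` and `sepal_width`, to the accumulator, so the result is the per-species sum of squares of these two columns.
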
In [31]:
zero_value = (0, 0)
seqOp = (lambda x, y: (x[0] + (y[0])**2, x[1] + (y[1])**2))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

In [32]:
seqOp

<function __main__.<lambda>(x, y)>

In [33]:
combOp

<function __main__.<lambda>(x, y)>

### Implement `aggregateByKey()`

In [34]:
list_aggregated_iris = iris_rdd_2.aggregateByKey(zero_value, seqOp, combOp).collect()
list_aggregated_iris

[('setosa', (1259.0899999999997, 591.2500000000002)),
 ('versicolor', (1774.8600000000001, 388.47)),
 ('virginica', (2189.9000000000005, 447.33))]

# Convert continuous variables to categorical variables

There are two feature transformers we can use to split a continuous variable into categories:

* `pyspark.ml.feature.Binarizer`: split a column of continuous features given a threshold
* `pyspark.ml.feature.Bucketizer`: split a column of continuous features into categories given several split points.
    + with n+1 split points, there are n categories (buckets).
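The rules both transformers apply to a single value can be written out in plain Python. Binarizer maps values greater than the threshold to 1.0 and all others to 0.0. Bucketizer puts a value in bucket `i` when `splits[i] <= value < splits[i+1]`, and the last bucket also includes the upper bound. Values outside the splits are errors, which is why `-inf`/`inf` are often added as outer split points. This is a minimal sketch: `binarize` and `bucketize` are hypothetical helpers, and NaN values, which Spark handles through `handleInvalid`, are not modelled.

```python
from bisect import bisect_right


def binarize(value, threshold):
    """Model of Binarizer: 1.0 if value > threshold, otherwise 0.0."""
    return 1.0 if value > threshold else 0.0


def bucketize(value, splits):
    """Model of Bucketizer: bucket i holds splits[i] <= value < splits[i + 1],
    and the last bucket also includes splits[-1]."""
    if not splits[0] <= value <= splits[-1]:
        raise ValueError(f'{value} is outside the range of splits {splits}')
    if value == splits[-1]:
        return float(len(splits) - 2)  # n + 1 splits -> buckets 0 .. n - 1
    return float(bisect_right(splits, value) - 1)


splits = [0, 2.5, 5, 7.5, 10]  # 5 split points -> 4 buckets
print(binarize(0.47143516373249306, 0), binarize(-1.1909756947064645, 0))  # 1.0 0.0
print(bucketize(6.834629351721363, splits))  # 2.0
print(bucketize(10, splits))                 # 3.0: the upper bound is in the last bucket
```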


In [35]:
import numpy as np
import pandas as pd
np.random.seed(seed=1234)
pdf = pd.DataFrame({
        'x1': np.random.randn(10),
        'x2': np.random.rand(10)*10
    })
np.random.seed(seed=None)
df = spark.createDataFrame(pdf)
df.show()

+--------------------+------------------+
|                  x1|                x2|
+--------------------+------------------+
| 0.47143516373249306| 6.834629351721363|
| -1.1909756947064645| 7.127020269829002|
|  1.4327069684260973|3.7025075479039495|
| -0.3126518960917129| 5.611961860656249|
| -0.7205887333650116| 5.030831653078097|
|  0.8871629403077386|0.1376844959068224|
|  0.8595884137174165| 7.728266216123741|
| -0.6365235044173491| 8.826411906361166|
|0.015696372114428918| 3.648859839013723|
| -2.2426849541854055| 6.153961784334937|
+--------------------+------------------+



## Binarize the column x1 and Bucketize the column x2

In [36]:
from pyspark.ml.feature import Binarizer, Bucketizer
# threshold = 0 for binarizer
binarizer = Binarizer(threshold=0, inputCol='x1', outputCol='x1_new')
# provide 5 split points to generate 4 buckets
bucketizer = Bucketizer(splits=[0, 2.5, 5, 7.5, 10], inputCol='x2', outputCol='x2_new')

# pipeline stages
from pyspark.ml import Pipeline
stages = [binarizer, bucketizer]
pipeline = Pipeline(stages=stages)

# fit the pipeline model and transform the data
pipeline.fit(df).transform(df).show()

+--------------------+------------------+------+------+
|                  x1|                x2|x1_new|x2_new|
+--------------------+------------------+------+------+
| 0.47143516373249306| 6.834629351721363|   1.0|   2.0|
| -1.1909756947064645| 7.127020269829002|   0.0|   2.0|
|  1.4327069684260973|3.7025075479039495|   1.0|   1.0|
| -0.3126518960917129| 5.611961860656249|   0.0|   2.0|
| -0.7205887333650116| 5.030831653078097|   0.0|   2.0|
|  0.8871629403077386|0.1376844959068224|   1.0|   0.0|
|  0.8595884137174165| 7.728266216123741|   1.0|   3.0|
| -0.6365235044173491| 8.826411906361166|   0.0|   3.0|
|0.015696372114428918| 3.648859839013723|   1.0|   1.0|
| -2.2426849541854055| 6.153961784334937|   0.0|   2.0|
+--------------------+------------------+------+------+



### Exercise:
Binarize `x2` with the threshold `3`, and bucketize `x1` with the split points `[0.1, 0.3, 0.5, 0.7, 0.9]`.

In [44]:
from pyspark.ml.feature import Binarizer, Bucketizer
from pyspark.ml import Pipeline

binarizer = Binarizer(threshold=3, inputCol='x2', outputCol='x2_new')
# Bucketizer raises an error for values outside the split range, so add -inf and inf
# to cover the full range of values in x1
bucketizer = Bucketizer(splits=[float('-inf'), 0.1, 0.3, 0.5, 0.7, 0.9, float('inf')], inputCol='x1', outputCol='x1_new')

stages = [binarizer, bucketizer]
pipeline = Pipeline(stages=stages)

# fit the pipeline model and transform the data
pipeline.fit(df).transform(df).show()

+--------------------+------------------+------+------+
|                  x1|                x2|x2_new|x1_new|
+--------------------+------------------+------+------+
| 0.47143516373249306| 6.834629351721363|   1.0|   2.0|
| -1.1909756947064645| 7.127020269829002|   1.0|   0.0|
|  1.4327069684260973|3.7025075479039495|   1.0|   5.0|
| -0.3126518960917129| 5.611961860656249|   1.0|   0.0|
| -0.7205887333650116| 5.030831653078097|   1.0|   0.0|
|  0.8871629403077386|0.1376844959068224|   0.0|   4.0|
|  0.8595884137174165| 7.728266216123741|   1.0|   4.0|
| -0.6365235044173491| 8.826411906361166|   1.0|   0.0|
|0.015696372114428918| 3.648859839013723|   1.0|   0.0|
| -2.2426849541854055| 6.153961784334937|   1.0|   0.0|
+--------------------+------------------+------+------+



# Data Check

In [37]:
!ls

iris.csv  mtcars.csv  sample_data


In [39]:
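# kaggle-titanic-train.csv must be uploaded to the working directory first (it is not listed by `!ls` above)
titanic = spark.read.csv('./kaggle-titanic-train.csv', header=True, inferSchema=True)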
titanic.show(5)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/kaggle-titanic-train.csv.

## Data type

First, we want to check if string and numeric variables are imported as we expect.

In [None]:
titanic.printSchema()

## Data summary

In [None]:
len(titanic.columns)

In [None]:
titanic.count()

### Summarize *columns*

In [None]:
def describe_columns(df):
    for i in df.columns:
        print('Column: ' + i)
        df.select(i).describe().show()

In [None]:
describe_columns(titanic)

### Find columns with missing values

In [None]:
def find_missing_values_columns(df):
    nrow = df.count()
    for v in df.columns:
        # the first row of describe() is the count of non-null values in the column
        summary_df = df.select(v).describe()
        v_count = int(summary_df.collect()[0][v])
        if v_count < nrow:
            missing_percentage = (1 - v_count/nrow) * 100
            print("Total observations: " + str(nrow) + "\n"
                 "Total observations of " + v + ": " + str(v_count) + "\n"
                 "Percentage of missing values: " + str(missing_percentage) + "%" + "\n"
                 "----------------------------")

In [None]:
find_missing_values_columns(titanic)

# Subset selection

## Select Rows by index

First, we need to add an index to each row. The **zipWithIndex** function pairs each RDD element with its index and returns the `(element, index)` tuples as the elements of a new RDD.
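The indices follow partition order first, and then the order within each partition, so each partition's first index equals the total size of all earlier partitions. This is why Spark runs an extra job to count partition sizes when the RDD has more than one partition. Here is a minimal plain-Python model of that rule; `zip_with_index` and the toy partitioning are hypothetical.

```python
def zip_with_index(partitions):
    """Model of RDD.zipWithIndex on a list of partitions: each partition's
    starting offset is the number of elements in all earlier partitions."""
    result = []
    offset = 0
    for part in partitions:
        for position, element in enumerate(part):
            result.append((element, offset + position))
        offset += len(part)
    return result


parts = [['Mazda RX4', 'Mazda RX4 Wag'], ['Datsun 710'], ['Hornet 4 Drive', 'Hornet Sportabout']]
indexed = zip_with_index(parts)
print(indexed[2])  # ('Datsun 710', 2)
```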

In [None]:
mtcars = spark.read.csv('./mtcars.csv', inferSchema=True, header=True)
# correct first column name
mtcars = mtcars.withColumnRenamed('_c0', 'model')
mtcars.show(5)

In [None]:
mtcars.rdd.zipWithIndex().take(5)

Now we can apply the **map** function to modify the structure of each element. Assume **x** is an element from the above RDD object; **x** has two elements: x[0] and x[1]. x[0] is a **Row** object, and x[1] is the index, which is an integer. We want to merge these two values into one list, with the index as the first element of the list.

In [None]:
mtcars.rdd.zipWithIndex().map(lambda x: [x[1]] + list(x[0])).take(5)

Let's add column names and save the result.

In [None]:
header = ['index'] + mtcars.columns
mtcars_df = mtcars.rdd.zipWithIndex().map(lambda x: [x[1]] + list(x[0])).toDF(header)

In [None]:
mtcars_df.show(5)

After we obtain the **index column**, we can apply the **pyspark.sql.DataFrame.filter** function to select rows of the DataFrame. The **filter** function takes a **column** of **types.BooleanType** as input.

### Select specific rows

In [None]:
mtcars_df.filter(mtcars_df.index.isin([1,2,4,6,9])).show()

### Select rows between a range

In [None]:
mtcars_df.filter(mtcars_df.index.between(5, 10)).show()

### Select rows by a cutoff index

In [None]:
mtcars_df.filter(mtcars_df.index < 9).show()

In [None]:
mtcars_df.filter(mtcars_df.index >= 14).show()

## Select rows by logical criteria

Example 1: select rows when **cyl = 4**

In [None]:
mtcars_df.filter(mtcars_df.cyl == 4).show()

In [None]:
mtcars_df.filter(mtcars_df.cyl == 2).show()

Example 2: select rows when **vs = 1 and am = 1**

When the filtering is based on multiple **conditions** (e.g., **vs = 1** and **am = 1**), we combine the conditions into a new **filtering column** (here `1` where both conditions hold and null otherwise). Then we filter the DataFrame by the new column.

<span style="color:red">Warning: when combining multiple conditions with `&` or `|` (for example inside **`when()`**), each condition has to be within its own pair of parentheses, because `&` and `|` bind more tightly than `==`.</span>
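The precedence problem behind this warning can be shown with plain Python integers, no Spark needed. Without parentheses, `a == 1 & b == 1` is parsed as the chained comparison `a == (1 & b) == 1`. With Spark columns, that chained comparison ends up calling `bool()` on a column, which PySpark typically rejects with an error. This minimal sketch uses the standard `ast` module to show the parse.

```python
import ast

a, b = 1, 3

with_parens = (a == 1) & (b == 1)  # True & False -> False
without_parens = a == 1 & b == 1   # parsed as a == (1 & b) == 1 -> True

print(with_parens, without_parens)  # False True

# The parse tree confirms the grouping: one chained comparison (two ==
# operators) whose middle operand is the bitwise AND `1 & b`.
tree = ast.parse('a == 1 & b == 1', mode='eval').body
print(type(tree).__name__, len(tree.ops))     # Compare 2
print(type(tree.comparators[0].op).__name__)  # BitAnd
```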

In [None]:
from pyspark.sql import functions as F

In [None]:
filtering_column = F.when((mtcars_df.vs == 1) & (mtcars_df.am == 1), 1).name('filter_col')
filtering_column

Now we need to add the new column to the original DataFrame. **This can be done by applying the `select()` function to select all original columns as well as the new filtering column.**

In [None]:
all_original_columns = [mtcars_df[c] for c in mtcars_df.columns]
all_original_columns

In [None]:
all_columns = all_original_columns + [filtering_column]
all_columns

In [None]:
new_mtcars_df = mtcars_df.select(all_columns)
new_mtcars_df.show()

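Now we can filter the DataFrame by the requested conditions. After we filter the DataFrame, we can drop the filtering column. (The same combined condition can also be passed directly to `filter()`, e.g. `mtcars_df.filter((mtcars_df.vs == 1) & (mtcars_df.am == 1))`.)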

In [None]:
new_mtcars_df.filter(new_mtcars_df.filter_col == 1).drop('filter_col').show()

## Select columns by name

We can simply use the **select()** function to select columns by name.

In [None]:
mtcars.select(['hp', 'disp']).show(5)

## Select columns by index

We can convert indices to corresponding column names and then select columns by name.

In [None]:
indices = [0,3,4,7]
selected_columns = [mtcars.columns[index] for index in indices]
selected_columns

In [None]:
mtcars.select(selected_columns).show(5)

## Select columns by pattern

Example: columns start with `d`.

In [None]:
import re
selected_columns = [x for x in mtcars.columns if re.compile('^d').match(x) is not None]
selected_columns

In [None]:
mtcars.select(selected_columns).show(5)
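The same list comprehension generalizes to any regular expression. In this minimal sketch, `select_by_pattern` is a hypothetical helper that keeps the column names matching a pattern anywhere in the name (`re.search`). It is tested here on the `mtcars` column names, with the first column renamed to `model`.

```python
import re


def select_by_pattern(columns, pattern):
    """Hypothetical helper: keep the column names that match a regex."""
    regex = re.compile(pattern)
    return [c for c in columns if regex.search(c)]


mtcars_columns = ['model', 'mpg', 'cyl', 'disp', 'hp', 'drat', 'wt',
                  'qsec', 'vs', 'am', 'gear', 'carb']
print(select_by_pattern(mtcars_columns, '^d'))  # ['disp', 'drat']
print(select_by_pattern(mtcars_columns, 'p$'))  # ['disp', 'hp']
```

The result can be passed straight to `select()`, e.g. `mtcars.select(select_by_pattern(mtcars.columns, '^d'))`.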

# Column expression

A Spark **column instance** is **NOT a column of values** from the **DataFrame**: when you create a column instance, it does not give you the actual values of that column in the DataFrame. I find it makes more sense to think of a **column instance as a column of expressions**. These expressions are evaluated by other methods (e.g., **select()**, **groupBy()**, and **orderBy()** from **pyspark.sql.DataFrame**).
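The "column of expressions" idea can be illustrated with a toy model. Building an expression only records *how* to compute a value, and nothing is computed until some method evaluates it against rows. `Col` and `select` below are hypothetical classes for illustration only, not PySpark's `Column` or `DataFrame.select`, and the rows are toy dictionaries.

```python
import operator


class Col:
    """Toy column expression: building one computes nothing, it only records
    how to compute a value from a row when some method evaluates it later."""

    def __init__(self, evaluate, description):
        self._evaluate = evaluate
        self.description = description

    @classmethod
    def named(cls, name):
        # an expression that reads the named field of a row
        return cls(lambda row: row[name], name)

    def _combine(self, other, op, symbol):
        # wrap plain Python values (e.g. 100) as constant expressions
        if not isinstance(other, Col):
            other = Col(lambda row, value=other: value, repr(other))
        return Col(lambda row: op(self._evaluate(row), other._evaluate(row)),
                   f'({self.description} {symbol} {other.description})')

    def __add__(self, other):
        return self._combine(other, operator.add, '+')

    def __mul__(self, other):
        return self._combine(other, operator.mul, '*')

    def eval(self, row):
        return self._evaluate(row)


def select(rows, expr):
    """Stand-in for DataFrame.select(): the only place the expression runs."""
    return [expr.eval(row) for row in rows]


expr = Col.named('mpg') * 100 + 1  # nothing is computed yet
print(expr.description)            # ((mpg * 100) + 1)
rows = [{'mpg': 21.0}, {'mpg': 15.0}]
print(select(rows, expr))          # [2101.0, 1501.0]
```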

## Use dot (.) to select column from DataFrame

In [None]:
mpg_col = mtcars.mpg
mpg_col

## Modify a column to generate a new column

In [None]:
mpg_col + 1

In [None]:
mtcars.select(mpg_col * 100).show(5)

The `pyspark.sql.Column` class has many methods that act on a column and return a column instance.

In [None]:
mtcars.select(mtcars.gear.isin([2,3])).show(5)

In [None]:
mtcars.mpg.asc()

## Dot (.) column expression

Create a column expression that will return the original column values.

In [None]:
mpg_col_exp = mtcars.mpg
mpg_col_exp

In [None]:
mtcars.select(mpg_col_exp).show(5)

## Boolean column expression

Create a column expression that will return **boolean values**.

## `between()`: true/false if the column value is between a given range

In [None]:
cyl_between = mtcars.cyl.between(4,6)
cyl_between

In [None]:
mtcars.select(mtcars.cyl, cyl_between).show(5)

## `contains()`: true/false if the column value contains a string

In [None]:
model_contains = mtcars.model.contains('Ho')
model_contains

In [None]:
mtcars.select(mtcars.model, model_contains).show(5)

## `endswith()`: true/false if the column value ends with a string

In [None]:
model_endswith = mtcars.model.endswith('t')
model_endswith

In [None]:
mtcars.select(mtcars.model, model_endswith).show(6)

## `isNotNull()`: true/false if the column value is not Null

In [None]:
from pyspark.sql import Row
df = spark.createDataFrame([Row(name='Tom', height=80), Row(name='Alice', height=None)])
df.show()

In [None]:
height_isNotNull = df.height.isNotNull()
height_isNotNull

In [None]:
df.select(df.height, height_isNotNull).show()

## `isNull()`: true/false if the column value is Null

In [None]:
height_isNull = df.height.isNull()
height_isNull

In [None]:
df.select(df.height, height_isNull).show()

## `isin()`: true/false if the column value is one of the given values

In [None]:
carb_isin = mtcars.carb.isin([2, 3])
carb_isin

In [None]:
mtcars.select(mtcars.carb, carb_isin).show(10)

## `like()`: true/false if the column value matches a _SQL LIKE_ pattern (`%` matches any sequence of characters, `_` matches exactly one character)

In [None]:
model_like = mtcars.model.like('Ho%')
model_like

In [None]:
mtcars.select(mtcars.model, model_like).show(10)

## `rlike()`: true/false if the column value matches a regular expression (_SQL RLIKE_)

In [None]:
model_rlike = mtcars.model.rlike('t$')
model_rlike

In [None]:
mtcars.select(mtcars.model, model_rlike).show()
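The main practical difference between the two is that a LIKE pattern must match the *whole* string, while an RLIKE regex only needs to match *somewhere* in it (hence the `$` anchor in `'t$'`). Here is a simplified plain-Python model. `sql_like` and `sql_rlike` are hypothetical helpers, LIKE escape characters are not handled, and Spark evaluates RLIKE with Java regex syntax, which differs from Python's `re` in some details.

```python
import re


def sql_like(value, pattern):
    """Simplified model of SQL LIKE: '%' matches any run of characters, '_'
    matches exactly one, and the pattern must match the whole string."""
    regex = ''.join('.*' if ch == '%' else '.' if ch == '_' else re.escape(ch)
                    for ch in pattern)
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None


def sql_rlike(value, pattern):
    """Simplified model of RLIKE: the regex may match anywhere in the string."""
    return re.search(pattern, value) is not None


print(sql_like('Hornet 4 Drive', 'Ho%'))         # True
print(sql_like('Mazda RX4', 'Ho%'))              # False
print(sql_rlike('Hornet Sportabout', 't$'))      # True
print(sql_rlike('Hornet Sportabout', 'Sport'))   # True: no anchors needed
print(sql_like('Hornet Sportabout', 'Sport'))    # False: LIKE needs a full match
```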

## `startswith()`: true/false if the column value starts with a string

In [None]:
model_startswith = mtcars.model.startswith('Merc')
model_startswith

In [None]:
mtcars.select(mtcars.model, model_startswith).show()

# `pyspark.sql.functions` functions

`pyspark.sql.functions` is a collection of built-in functions for **creating column expressions**. These functions greatly extend the ways we can manipulate a DataFrame and its columns.

There are many sql functions from the `pyspark.sql.functions` module. Here I only choose a few to show how these functions extend the ability to create column expressions.

In [None]:
from pyspark.sql import functions as F

## `abs()`: create column expression that returns absolute values of a column

In [None]:
from pyspark.sql import Row
df = sc.parallelize([Row(x=1), Row(x=-1), Row(x=-2)]).toDF()
df.show()

In [None]:
x_abs = F.abs(df.x)
x_abs

In [None]:
df.select(df.x, x_abs).show()

## `concat()`: create column expression that concatenates multiple column values into one

In [None]:
df = sc.parallelize([Row(a='apple', b='tree'), Row(a='orange', b='flowers')]).toDF()
df.show()

In [None]:
ab_concat = F.concat(df.a, df.b)
ab_concat

In [None]:
df.select(df.a, df.b, ab_concat).show()

## `corr()`: create column expression that returns the Pearson correlation coefficient between two columns

In [None]:
# Reload the mtcars data
mtcars = spark.read.csv('./mtcars.csv', inferSchema=True, header=True)
mtcars.show(5)

In [None]:
drat_wt_corr = F.corr(mtcars.drat, mtcars.wt)
drat_wt_corr

In [None]:
mtcars.select(drat_wt_corr).show()
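`F.corr()` is an aggregate expression, so the `select()` above returns a single row. The quantity it computes is the Pearson coefficient: the covariance divided by the product of the standard deviations. It can be written out in plain Python as a minimal sketch; `pearson` is a hypothetical helper, applied to the `drat` and `wt` values of the first five mtcars rows shown earlier.

```python
import math


def pearson(xs, ys):
    """Pearson correlation: sum of co-deviations divided by the square root of
    the product of the sums of squared deviations (the 1/n factors cancel)."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


print(pearson([1, 2, 3], [2, 4, 6]))  # 1.0: perfectly linear

drat = [3.9, 3.9, 3.85, 3.08, 3.15]
wt = [2.62, 2.875, 2.32, 3.215, 3.44]
print(pearson(drat, wt) < 0)  # True: drat tends to fall as wt rises in these rows
```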

## `array()`: create column expression that merges multiple column values into an array

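This function can be used to build a **feature column** for machine learning models. (Note that `pyspark.ml` estimators expect a vector column, which is usually built with `pyspark.ml.feature.VectorAssembler`.)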

In [None]:
cols = [mtcars[col] for col in mtcars.columns[1:]]
cols

In [None]:
cols_array = F.array(cols)
cols_array

In [None]:
mtcars.select(cols_array).show(truncate=False)

# `udf()` function and sql types


The `pyspark.sql.functions.udf()` function is a very important function. It allows us to turn a **user defined function** into a **`pyspark.sql.functions`**-style function that can act on columns of a DataFrame. It makes data transformation much more flexible.

Using `udf()` could be tricky. The key is to understand how to define the `returnType` parameter.

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf

In [None]:
# Reload mtcars data
mtcars = spark.read.csv('./mtcars.csv', inferSchema=True, header=True)
mtcars = mtcars.withColumnRenamed('_c0', 'model')
mtcars.show(5)

**The structure of the schema passed to `returnType` has to match the data structure of the return value from the user defined function**.

**Case 1**: divide **disp** by **hp** and put the result in a new column

The user defined function returns a float value.

In [None]:
def disp_by_hp(disp, hp):
    return(disp/hp)

In [None]:
disp_by_hp_udf = udf(disp_by_hp, returnType=FloatType())

In [None]:
all_original_cols = [mtcars[x] for x in mtcars.columns]
all_original_cols

In [None]:
disp_by_hp_col = disp_by_hp_udf(mtcars.disp, mtcars.hp)
disp_by_hp_col

In [None]:
all_new_cols = all_original_cols + [disp_by_hp_col]
all_new_cols

In [None]:
mtcars.select(all_new_cols).show()

**Case 2**: create an array column that contains the **disp** and **hp** values

In [None]:
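# define function; cast to float so both values match the declared FloatType
# (a Python UDF whose results do not match returnType may return null)
def merge_two_columns(col1, col2):
    return([float(col1), float(col2)])

# convert the user defined function into a UDF (sql function)
array_merge_two_columns_udf = udf(merge_two_columns, returnType=ArrayType(FloatType()))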

In [None]:
array_col = array_merge_two_columns_udf(mtcars.disp, mtcars.hp)
array_col

In [None]:
all_new_cols = all_original_cols + [array_col]
all_new_cols

In [None]:
mtcars.select(all_new_cols).show(5, truncate=False)

## `ArrayType` vs. `StructType`

Both `ArrayType` and `StructType` can be used to build `returnType` for a list. The difference is:

1. `ArrayType` requires all elements in the list to have the same `elementType`, while `StructType` can have a different type for each field.
2. `StructType` represents a `Row` object.


**Define an `ArrayType` with elementType being `FloatType`.**

In [None]:
# define function
def merge_two_columns(col1, col2):
    return([float(col1), float(col2)])
array_type = ArrayType(FloatType())
array_merge_two_columns_udf = udf(merge_two_columns, returnType=array_type)

**Define a `StructType` with one elementType being `StringType` and the other being `FloatType`.**

In [None]:
# define function
def merge_two_columns(col1, col2):
    return([str(col1), float(col2)])
struct_type = StructType([
    StructField('f1', StringType()),
    StructField('f2', FloatType())
])
struct_merge_two_columns_udf = udf(merge_two_columns, returnType=struct_type)

**array column** expression: both values are float type values

In [None]:
array_col = array_merge_two_columns_udf(mtcars.hp, mtcars.disp)
array_col

**struct column** expression: first value is a string and the second value is a float type value.

In [None]:
struct_col = struct_merge_two_columns_udf(mtcars.model, mtcars.disp)
struct_col

**Results**

In [None]:
mtcars.select(array_col, struct_col).show(truncate=False)