In [1]:
import findspark
findspark.init()
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
sales = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/home/youssef-salah/Downloads/Spark-The-Definitive-Guide-master/data/retail-data/by-day/*.csv")\
.coalesce(5)\
.where("Description IS NOT NULL")

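spark.read is used to load a dataframe, and .format specifies the format of the source files, which in this example is csv. Each .option overrides one of the reader's default parameters: header treats the first line of each file as column names, and inferSchema lets Spark infer the column types from the data. Because the path ends in *.csv, load reads every CSV file in the by-day directory into a single dataframe. The result is then coalesced into 5 partitions, and rows with a null Description are filtered out.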


In [3]:
simpleDF = spark.read.json("/home/youssef-salah/Downloads/Spark-The-Definitive-Guide-master/data/simple-ml")
scaleDF = spark.read.parquet("/home/youssef-salah/Downloads/Spark-The-Definitive-Guide-master/data/simple-ml-scaling")

spark.read.load assumes the parquet format by default. spark.read.json and spark.read.parquet are shortcuts that tell the reader to load a dataframe from the JSON and Parquet formats respectively.

In [4]:
sales

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

The sales dataframe consists of the following fields: InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country

In [5]:
simpleDF

DataFrame[color: string, lab: string, value1: bigint, value2: double]

The simpleDF dataframe consists of the color, lab, value1 and value2 fields.

In [6]:
scaleDF

DataFrame[id: int, features: vector]

The scaleDF dataframe consists of the id and features fields.

In [7]:
scaleDF.show()

+---+--------------+
| id|      features|
+---+--------------+
|  0|[1.0,0.1,-1.0]|
|  1| [2.0,1.1,1.0]|
|  0|[1.0,0.1,-1.0]|
|  1| [2.0,1.1,1.0]|
|  1|[3.0,10.1,3.0]|
+---+--------------+



In [8]:
from pyspark.ml.feature import StandardScaler
sScaler = StandardScaler().setInputCol("features")
sScaler.fit(scaleDF).transform(scaleDF).show()

+---+--------------+-----------------------------------+
| id|      features|StandardScaler_4d317e3a50e4__output|
+---+--------------+-----------------------------------+
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  1|[3.0,10.1,3.0]|               [3.58568582800318...|
+---+--------------+-----------------------------------+



StandardScaler transforms a dataset of Vector rows, normalizing each feature to have unit standard deviation and/or zero mean, depending on its parameters. By default withStd is true, so each feature is scaled to unit standard deviation, and withMean is false, so the data is not centered on the mean before scaling. In the previous example it is applied to the vectors in the features column of the scaleDF dataframe: the scaler is first fit on the dataframe to compute the per-feature statistics, and transform then writes the rescaled vectors, in which each feature has a standard deviation of 1, to the output column.
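To make the arithmetic concrete, here is a minimal plain-Python sketch of what the default settings compute on the five rows shown above. It is not Spark's implementation: the function name standard_scale and its arguments are made up for illustration, and it assumes the sample standard deviation (n - 1 in the denominator), which is consistent with the values in the output table.

```python
import statistics

# Rows of the features column, copied from scaleDF.show() above.
rows = [
    [1.0, 0.1, -1.0],
    [2.0, 1.1, 1.0],
    [1.0, 0.1, -1.0],
    [2.0, 1.1, 1.0],
    [3.0, 10.1, 3.0],
]

def standard_scale(rows, with_mean=False, with_std=True):
    """Scale each feature (column) independently. Hypothetical helper, not a Spark API."""
    columns = list(zip(*rows))
    means = [statistics.mean(c) for c in columns]
    # Assumption: sample standard deviation (divides by n - 1).
    stds = [statistics.stdev(c) for c in columns]
    scaled = []
    for row in rows:
        out = []
        for x, m, s in zip(row, means, stds):
            if with_mean:
                x = x - m  # center on the feature mean
            if with_std:
                x = x / s if s != 0 else 0.0  # guard against constant features
            out.append(x)
        scaled.append(out)
    return scaled

scaled = standard_scale(rows)
print(scaled[0][0])  # first feature of the first row
```

The first component of each scaled row should agree with the leading digits shown in the output column above (1.1952..., 2.3904..., 3.5856...). Passing with_mean=True would additionally center each feature before scaling.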

In [9]:
from pyspark.ml.feature import MinMaxScaler
minMax = MinMaxScaler().setMin(5).setMax(10).setInputCol("features")
fittedminMax = minMax.fit(scaleDF)
fittedminMax.transform(scaleDF).show()

+---+--------------+---------------------------------+
| id|      features|MinMaxScaler_beb4230ad3fe__output|
+---+--------------+---------------------------------+
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  1|[3.0,10.1,3.0]|                 [10.0,10.0,10.0]|
+---+--------------+---------------------------------+



MinMaxScaler transforms a dataset of Vector rows, rescaling each feature to a specific range. It is given the min and max input parameters, which define the target range. Each value is rescaled using the function Rescaled(ei) = (ei − Emin) / (Emax − Emin) ∗ (max − min) + min, where Emin and Emax are the smallest and largest values of that feature in the data. Here the features column of the scaleDF dataframe is rescaled with a min of 5 and a max of 10, giving the output column of rescaled vectors.
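The rescaling formula can be checked by hand with a few lines of plain Python. This is only a sketch of the arithmetic, not Spark's code: min_max_scale, new_min and new_max are illustrative names, and the handling of a constant feature (returning the midpoint of the target range) is how I understand the Spark documentation, so treat it as an assumption.

```python
# Rows of the features column, copied from scaleDF.show() above.
rows = [
    [1.0, 0.1, -1.0],
    [2.0, 1.1, 1.0],
    [1.0, 0.1, -1.0],
    [2.0, 1.1, 1.0],
    [3.0, 10.1, 3.0],
]

def min_max_scale(rows, new_min=0.0, new_max=1.0):
    """Rescale each feature (column) to [new_min, new_max]. Hypothetical helper."""
    columns = list(zip(*rows))
    lows = [min(c) for c in columns]    # Emin per feature
    highs = [max(c) for c in columns]   # Emax per feature
    scaled = []
    for row in rows:
        out = []
        for x, lo, hi in zip(row, lows, highs):
            if hi == lo:
                # Assumption: a constant feature maps to the middle of the range.
                out.append(0.5 * (new_max + new_min))
            else:
                out.append((x - lo) / (hi - lo) * (new_max - new_min) + new_min)
        scaled.append(out)
    return scaled

rescaled = min_max_scale(rows, new_min=5.0, new_max=10.0)
for row in rescaled:
    print(row)
```

With a min of 5 and a max of 10, the smallest value of every feature lands on 5 and the largest on 10, which is why the first and last rows of the output table are all 5.0 and all 10.0.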


In [10]:
from pyspark.ml.feature import Normalizer
manhattanDistance = Normalizer().setP(1).setInputCol("features")
manhattanDistance.transform(scaleDF).show()

+---+--------------+-------------------------------+
| id|      features|Normalizer_f1dc628b4f40__output|
+---+--------------+-------------------------------+
|  0|[1.0,0.1,-1.0]|           [0.47619047619047...|
|  1| [2.0,1.1,1.0]|           [0.48780487804878...|
|  0|[1.0,0.1,-1.0]|           [0.47619047619047...|
|  1| [2.0,1.1,1.0]|           [0.48780487804878...|
|  1|[3.0,10.1,3.0]|           [0.18633540372670...|
+---+--------------+-------------------------------+



Normalizer is a Transformer that transforms a dataset of Vector rows, normalizing each Vector to have unit norm (norm = 1). It takes as parameters the column of vectors to be normalized and the value p of the p-norm used to normalize them. p is 2 by default; in the example it is set to 1, so each output vector has an L1 norm (the sum of the absolute values of its components, also called the Manhattan norm) equal to 1. It is applied to the features column of the scaleDF dataframe. Unlike the two scalers above, it works on each row independently, so no fit step is needed.
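As a quick check of the numbers in the table, the p-norm normalization of a single vector can be sketched in plain Python. The helper name p_normalize is hypothetical and the zero-vector guard is my own choice, not something taken from Spark.

```python
def p_normalize(vector, p=2.0):
    """Divide a vector by its p-norm so the result has p-norm 1. Hypothetical helper."""
    norm = sum(abs(x) ** p for x in vector) ** (1.0 / p)
    if norm == 0:
        return list(vector)  # assumption: leave an all-zero vector unchanged
    return [x / norm for x in vector]

# First row of the features column: |1.0| + |0.1| + |-1.0| = 2.1
l1 = p_normalize([1.0, 0.1, -1.0], p=1)
print(l1)
print(sum(abs(x) for x in l1))  # the L1 norm of the result
```

The first component is 1.0 / 2.1, which agrees with the 0.47619047619047... shown in the first output row, and the absolute values of the normalized components add up to 1.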

In [11]:
simpleDF.show()

+-----+----+------+------------------+
|color| lab|value1|            value2|
+-----+----+------+------------------+
|green|good|     1|14.386294994851129|
| blue| bad|     8|14.386294994851129|
| blue| bad|    12|14.386294994851129|
|green|good|    15| 38.97187133755819|
|green|good|    12|14.386294994851129|
|green| bad|    16|14.386294994851129|
|  red|good|    35|14.386294994851129|
|  red| bad|     1| 38.97187133755819|
|  red| bad|     2|14.386294994851129|
|  red| bad|    16|14.386294994851129|
|  red|good|    45| 38.97187133755819|
|green|good|     1|14.386294994851129|
| blue| bad|     8|14.386294994851129|
| blue| bad|    12|14.386294994851129|
|green|good|    15| 38.97187133755819|
|green|good|    12|14.386294994851129|
|green| bad|    16|14.386294994851129|
|  red|good|    35|14.386294994851129|
|  red| bad|     1| 38.97187133755819|
|  red| bad|     2|14.386294994851129|
+-----+----+------+------------------+
only showing top 20 rows



In [12]:
from pyspark.ml.feature import StringIndexer
lblIndxr = StringIndexer().setInputCol("lab").setOutputCol("labelInd")
idxRes = lblIndxr.fit(simpleDF).transform(simpleDF)
idxRes.show()

+-----+----+------+------------------+--------+
|color| lab|value1|            value2|labelInd|
+-----+----+------+------------------+--------+
|green|good|     1|14.386294994851129|     1.0|
| blue| bad|     8|14.386294994851129|     0.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     1.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     0.0|
|  red|good|    35|14.386294994851129|     1.0|
|  red| bad|     1| 38.97187133755819|     0.0|
|  red| bad|     2|14.386294994851129|     0.0|
|  red| bad|    16|14.386294994851129|     0.0|
|  red|good|    45| 38.97187133755819|     1.0|
|green|good|     1|14.386294994851129|     1.0|
| blue| bad|     8|14.386294994851129|     0.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     1.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     0.0|
|  red|good|    35|14.386294994851129|  

StringIndexer encodes a string column of labels as a column of label indices: it is fit on a column of strings in the dataframe and outputs a column of indices determined by the values of that column. The computed indices are in the range [0, number of unique labels). The default ordering of the labels is "frequencyDesc", where the most frequent label is assigned 0, the next most frequent is assigned 1, and so on. Here it is applied to the lab column of the simpleDF table, where the most frequent value is bad, which is therefore given the index 0.
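The frequencyDesc ordering can be imitated with a Counter in plain Python. This is a sketch of the idea only: frequency_desc_index is a made-up name, the labs list is just the first eleven lab values from the table above, and breaking ties alphabetically is an assumption about recent Spark versions that does not matter for this data.

```python
from collections import Counter

def frequency_desc_index(values):
    """Map each distinct label to an index, most frequent label first. Hypothetical helper."""
    counts = Counter(values)
    # Sort by descending count; assumption: ties are broken alphabetically.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# First eleven values of the lab column shown in simpleDF.show().
labs = ["good", "bad", "bad", "good", "good", "bad",
        "good", "bad", "bad", "bad", "good"]

mapping = frequency_desc_index(labs)
indexed = [mapping[v] for v in labs]
print(mapping)
print(indexed)
```

In this sample bad occurs six times and good five times, so bad is mapped to 0.0 and good to 1.0, the same assignment as in the labelInd column.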

In [13]:
valIndexer = StringIndexer().setInputCol("value1").setOutputCol("valueInd")
valIndexer.fit(simpleDF).transform(simpleDF).show()

+-----+----+------+------------------+--------+
|color| lab|value1|            value2|valueInd|
+-----+----+------+------------------+--------+
|green|good|     1|14.386294994851129|     2.0|
| blue| bad|     8|14.386294994851129|     4.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     5.0|
|green|good|    12|14.386294994851129|     0.0|
|green| bad|    16|14.386294994851129|     1.0|
|  red|good|    35|14.386294994851129|     6.0|
|  red| bad|     1| 38.97187133755819|     2.0|
|  red| bad|     2|14.386294994851129|     7.0|
|  red| bad|    16|14.386294994851129|     1.0|
|  red|good|    45| 38.97187133755819|     3.0|
|green|good|     1|14.386294994851129|     2.0|
| blue| bad|     8|14.386294994851129|     4.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     5.0|
|green|good|    12|14.386294994851129|     0.0|
|green| bad|    16|14.386294994851129|     1.0|
|  red|good|    35|14.386294994851129|  

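As in the previous block, StringIndexer is applied, this time to the value1 column of the simpleDF table. value1 is a numeric (bigint) column, so its values are cast to strings before being indexed. The value 12 is the most frequent one and is therefore given the index 0.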

In [14]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
lblIndxr = StringIndexer().setInputCol("color").setOutputCol("colorInd")
colorLab = lblIndxr.fit(simpleDF).transform(simpleDF.select("color"))
colorLab.show()
ohe = OneHotEncoder().setInputCol("colorInd")
ohe.transform(colorLab).show()

+-----+--------+
|color|colorInd|
+-----+--------+
|green|     1.0|
| blue|     2.0|
| blue|     2.0|
|green|     1.0|
|green|     1.0|
|green|     1.0|
|  red|     0.0|
|  red|     0.0|
|  red|     0.0|
|  red|     0.0|
|  red|     0.0|
|green|     1.0|
| blue|     2.0|
| blue|     2.0|
|green|     1.0|
|green|     1.0|
|green|     1.0|
|  red|     0.0|
|  red|     0.0|
|  red|     0.0|
+-----+--------+
only showing top 20 rows

+-----+--------+----------------------------------+
|color|colorInd|OneHotEncoder_cdcf9f8d6d5c__output|
+-----+--------+----------------------------------+
|green|     1.0|                     (2,[1],[1.0])|
| blue|     2.0|                         (2,[],[])|
| blue|     2.0|                         (2,[],[])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|  red|     0.0|                     (2,[0],[1

StringIndexer is first applied to the color column of the simpleDF table, resulting in the new index column colorInd. Since only the color column is selected when the StringIndexer transform is applied, the colorLab table consists of the color and colorInd columns only. The OneHotEncoder is then applied to the colorInd column of the colorLab table.
The OneHotEncoder transforms each value in colorInd into a binary vector that contains at most a single 1. There are three distinct values in the colorInd column and, by default, the last category is dropped, so the vector is of length 2 and the mapping is 0 -> 10, 1 -> 01, 2 -> 00. The output is displayed in sparse form as (size, [indices], [values]): for the first entry, (2,[1],[1.0]) means a vector of length 2 with the value 1.0 at position 1. For an entry of value 2.0 both lists are empty, because no position holds a 1.0.
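The sparse tuples in the output can be reproduced with a small plain-Python function. This is only an illustration of the encoding, with the last category dropped as in the table above; one_hot_sparse and its arguments are hypothetical names, not part of pyspark.

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Return a (size, indices, values) triple like the sparse vectors printed above.

    Hypothetical helper for illustration only.
    """
    size = num_categories - 1 if drop_last else num_categories
    i = int(index)
    if i < size:
        return (size, [i], [1.0])   # a single 1.0 at position i
    return (size, [], [])           # dropped last category: all zeros

def to_dense(sparse):
    """Expand a (size, indices, values) triple into a plain list."""
    size, indices, values = sparse
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

for color_ind in (0.0, 1.0, 2.0):
    sparse = one_hot_sparse(color_ind, num_categories=3)
    print(color_ind, sparse, to_dense(sparse))
```

Expanding the triples to dense lists gives [1.0, 0.0], [0.0, 1.0] and [0.0, 0.0] for the indices 0, 1 and 2, the same 10, 01, 00 mapping described above.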

In [15]:
from pyspark.ml.feature import RegexTokenizer
rt = RegexTokenizer()\
.setInputCol("Description")\
.setOutputCol("DescOut")\
.setPattern(" ")\
.setToLowercase(True)
rt.transform(sales.select("Description")).show(20, False)

+-----------------------------------+------------------------------------------+
|Description                        |DescOut                                   |
+-----------------------------------+------------------------------------------+
|RABBIT NIGHT LIGHT                 |[rabbit, night, light]                    |
|DOUGHNUT LIP GLOSS                 |[doughnut, lip, gloss]                    |
|12 MESSAGE CARDS WITH ENVELOPES    |[12, message, cards, with, envelopes]     |
|BLUE HARMONICA IN BOX              |[blue, harmonica, in, box]                |
|GUMBALL COAT RACK                  |[gumball, coat, rack]                     |
|SKULLS  WATER TRANSFER TATTOOS     |[skulls, water, transfer, tattoos]        |
|FELTCRAFT GIRL AMELIE KIT          |[feltcraft, girl, amelie, kit]            |
|CAMOUFLAGE LED TORCH               |[camouflage, led, torch]                  |
|WHITE SKULL HOT WATER BOTTLE       |[white, skull, hot, water, bottle]        |
|ENGLISH ROSE HOT WATER BOTT

Tokenization is the process of taking text and breaking it into individual terms; for example, given a sentence as input, it returns the words in that sentence. RegexTokenizer allows more advanced tokenization based on regular expression matching. By default it acts like a split function, where the pattern parameter is the regex that it matches and splits on. In the command above, the Description column is given to the tokenizer as input and the pattern is " ", so the output column contains an array of the space-separated words of each entry in the Description column, lowercased because toLowercase is set to True.
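The split-like behaviour can be mimicked with Python's re module. This is a rough sketch, not the Spark implementation: regex_tokenize and its parameters are illustrative names, and dropping tokens shorter than one character is my assumption about why the double space in "SKULLS  WATER" produces no empty token in the output above.

```python
import re

def regex_tokenize(text, pattern=" ", to_lowercase=True, min_token_length=1):
    """Split text on a regex and drop tokens that are too short. Hypothetical helper."""
    if to_lowercase:
        text = text.lower()
    tokens = re.split(pattern, text)
    # Assumption: empty strings produced by consecutive separators are filtered out.
    return [t for t in tokens if len(t) >= min_token_length]

print(regex_tokenize("RABBIT NIGHT LIGHT"))
print(regex_tokenize("SKULLS  WATER TRANSFER TATTOOS"))
```

Both calls return lowercase word lists matching the first and sixth rows of the DescOut column.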

In [16]:
from pyspark.ml.feature import ChiSqSelector, Tokenizer
tkn = Tokenizer().setInputCol("Description").setOutputCol("DescOut")
tokenized = tkn\
.transform(sales.select("Description", "CustomerId"))\
.where("CustomerId IS NOT NULL")
prechi = fittedCV.transform(tokenized)\
.where("CustomerId IS NOT NULL")
chisq = ChiSqSelector()\
.setFeaturesCol("countVec")\
.setLabelCol("CustomerId")\
.setNumTopFeatures(2)
chisq.fit(prechi).transform(prechi)\
.drop("customerId", "Description", "DescOut").show()

NameError: name 'fittedCV' is not defined

This cell fails because fittedCV is never defined in this notebook. I was informed by my colleague Anwar Labib that we should drop this last command, as it depends on first fitting a model to the dataframe.