### Basics of Spark

In [5]:
# creating a DataFrame from a CSV file with a header row
# the raw string (r'...') keeps the Windows backslashes literal
emp_df = spark.read.csv(r'D:\LinkedIn_Learning\ApcheSpark\employee.txt', header=True)

In [6]:
# display the DataFrame's column names and types
emp_df

DataFrame[id: string, last_name: string, email: string, gender: string, department: string, start_date: string, salary: string, job_title: string, region_id: string]

In [7]:
# printing schema
emp_df.schema

StructType(List(StructField(id,StringType,true),StructField(last_name,StringType,true),StructField(email,StringType,true),StructField(gender,StringType,true),StructField(department,StringType,true),StructField(start_date,StringType,true),StructField(salary,StringType,true),StructField(job_title,StringType,true),StructField(region_id,StringType,true)))

In [8]:
# more readable format of the schema
emp_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- department: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- region_id: string (nullable = true)



In [9]:
# printing columns
emp_df.columns

['id',
 'last_name',
 'email',
 'gender',
 'department',
 'start_date',
 'salary',
 'job_title',
 'region_id']

In [10]:
# take the first five employees listed
emp_df.take(5)

[Row(id='1', last_name="'Kelley'", email="'rkelley0@soundcloud.com'", gender="'Female'", department="'Computers'", start_date="'10/2/2009'", salary='67470', job_title="'Structural Engineer'", region_id='2'),
 Row(id='2', last_name="'Armstrong'", email="'sarmstrong1@infoseek.co.jp'", gender="'Male'", department="'Sports'", start_date="'3/31/2008'", salary='71869', job_title="'Financial Advisor'", region_id='2'),
 Row(id='3', last_name="'Carr'", email="'fcarr2@woothemes.com'", gender="'Male'", department="'Automotive'", start_date="'7/12/2009'", salary='101768', job_title="'Recruiting Manager'", region_id='3'),
 Row(id='4', last_name="'Murray'", email="'jmurray3@gov.uk'", gender="'Female'", department="'Jewelery'", start_date="'12/25/2014'", salary='96897', job_title="'Desktop Support Technician'", region_id='3'),
 Row(id='5', last_name="'Ellis'", email="'jellis4@sciencedirect.com'", gender="'Female'", department="'Grocery'", start_date="'9/19/2002'", salary='63702', job_title="'Software

In [14]:
# count the number of rows
emp_df.count()

1000

### Working with large datasets
- use sampling (approximate answers from a subset of rows)
- use filtering (work only with the rows you need)

In [12]:
# create a sample DataFrame containing roughly 10% of the rows of emp_df
# withReplacement=False: sample without replacement (each row appears at most once)
# fraction=0.1: each row is kept with probability 0.1
sample_df = emp_df.sample(withReplacement=False, fraction=0.1)

In [13]:
sample_df.count()
# the sample size is approximate: about 10% of 1000 rows, not exactly 100

108
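The count is 108 rather than exactly 100 because sampling without replacement keeps each row independently with probability `fraction` (Bernoulli sampling), so only the expected size is fixed. The plain-Python sketch below imitates that idea; it is a simplification, not Spark's implementation (Spark samples partition by partition with its own random generator), and `bernoulli_sample` is a hypothetical helper.

```python
import random

def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction` (no replacement)."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

rows = list(range(1000))  # stand-in for the 1000 employee rows
sample = bernoulli_sample(rows, 0.1, seed=42)
print(len(sample))  # close to 100, but usually not exactly 100
```

In PySpark, `DataFrame.sample` also accepts a `seed` argument (for example `emp_df.sample(False, 0.1, seed=42)`), which makes the sample reproducible for the same data and partitioning.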

In [20]:
# filtering: employees whose salary is at least 100000
# (salary was read as a string, so Spark implicitly casts it for this numeric comparison)
emp_mng_df = emp_df.filter('salary >= 100000')

In [21]:
emp_mng_df.count()

478

In [23]:
# first 20 salaries in the filtered dataframe
emp_mng_df.select('salary').show()

+------+
|salary|
+------+
|101768|
|118497|
|108657|
|108093|
|121966|
|141139|
|106659|
|148952|
|109890|
|115274|
|144724|
|126103|
|144965|
|113507|
|120579|
|107222|
|125668|
|113857|
|108378|
|133424|
+------+
only showing top 20 rows



### MLlib packages
- three types of functionality
1. Machine learning algorithms for
    - Classification: categorizing something (e.g. whether a customer is likely to leave for a competitor)
    - Regression: predicting a numeric value, such as a home price
    - Clustering: grouping similar items together (unlike classification, there are no predefined groups, so it is useful for exploring data)
    - Topic modeling: identifying themes in text

2. Workflows: organizing commonly used steps
    - Pre-processing
    - Feature transformations
    - Pipelines
    - Evaluation
    - Hyperparameter tuning
3. Utilities (distributed math and statistics functions)


## Data preparation and transformation

### Preprocessing (2 types)
- numeric data
- text data

### Numeric: Normalize
- Maps data values from their original range to the range 0-1
- Avoids problems when some attributes have large ranges and others have small ranges
    - Salary of an employee (large range)
    - Years of employment (small range)
    
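### Numeric: Standardize
- Rescales each attribute to a mean of 0 and a standard deviation of 1: z = (x - mean) / std
- The result is not bounded to -1..1; values more than one standard deviation from the mean fall outside that range
- Standardizing shifts and rescales the data but does not change the shape of its distribution (it does not make data normally distributed)
- Used when attributes have different scales and the ML algorithm works best with centered, unit-variance features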


### Numeric: Partitioning
- Maps continuous data values to buckets
- Deciles and percentiles are examples of buckets
- Useful for working with groups of values instead of a continuous range of values
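Percentile-style buckets are data-dependent: the cut points are computed from the data so that each bucket holds roughly the same number of values. A plain-Python sketch using quartiles (4 buckets) from the standard `statistics` module; Spark MLlib offers `QuantileDiscretizer` for this, but the code below only illustrates the idea, and both the data and the `quantile_buckets` helper are made up.

```python
import bisect
import statistics

def quantile_buckets(values, n_buckets):
    """Assign each value a bucket index 0..n_buckets-1 using quantile cut points."""
    cuts = statistics.quantiles(values, n=n_buckets)  # n_buckets - 1 cut points
    return [bisect.bisect_right(cuts, v) for v in values], cuts

values = list(range(1, 101))  # hypothetical data: 1, 2, ..., 100
buckets, cuts = quantile_buckets(values, 4)
print(cuts)                                  # [25.25, 50.5, 75.75]
print([buckets.count(b) for b in range(4)])  # [25, 25, 25, 25]
```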


### Text: Tokenizing
- Maps text from a single string to a list of tokens/words
- Example
    - 'This is a book'
    - ['This', 'is', 'a', 'book']
    

### Text: TF-IDF (Term Frequency-Inverse Document Frequency)
- Maps text from a single (typically long) string to a vector that weights how often each word occurs in that text by how rare the word is across a group of texts (the corpus)
- Widely used in text classification





In [26]:
## let's practice the concepts above

### Normalizing data

In [42]:
# Normalizing data
# We do this so that differences in the scale of different features
# (e.g. salary and miles) do not adversely affect our models
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors  # linear algebra

# creating a simple DataFrame (each row holds an identifier and a dense feature vector)
features_df = spark.createDataFrame([
                                (1, Vectors.dense([10.0, 10000.0, 1.0])),
                                (2, Vectors.dense([20.0, 30000.0, 2.0])),
                                (3, Vectors.dense([30.0, 40000.0, 3.0]))
],['id', 'features'])


In [43]:
features_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]))]

In [44]:
# create scaler object
feature_scaler = MinMaxScaler(inputCol ='features', outputCol ='scaled_features')#Transform inputCol to new outputCol

# fit model to the data (using fit function)
scaled_model = feature_scaler.fit(features_df)

# apply the transformation to create a new scaled feature set
scaled_features_df = scaled_model.transform(features_df)

In [45]:
# take a look at the first row
scaled_features_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), scaled_features=SparseVector(3, {}))]

In [46]:
# let's compare the original data with the scaled version
# select both feature columns
scaled_features_df.select('features', 'scaled_features').show()

+------------------+--------------------+
|          features|     scaled_features|
+------------------+--------------------+
|[10.0,10000.0,1.0]|           (3,[],[])|
|[20.0,30000.0,2.0]|[0.5,0.6666666666...|
|[30.0,40000.0,3.0]|       [1.0,1.0,1.0]|
+------------------+--------------------+
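MinMaxScaler rescales each vector position separately with (x - min) / (max - min), using that position's min and max over all rows. The first row therefore becomes all zeros, which Spark prints in sparse notation as `(3,[],[])`: a length-3 vector with no nonzero entries. A plain-Python sketch of the same arithmetic (`min_max_scale` is a hypothetical helper) reproduces the table above:

```python
def min_max_scale(rows):
    """Scale every column of `rows` to [0, 1], column by column."""
    columns = list(zip(*rows))
    lows = [min(c) for c in columns]
    highs = [max(c) for c in columns]
    scaled = []
    for row in rows:
        scaled.append([
            # a constant column maps to the midpoint, as Spark documents for MinMaxScaler
            (x - lo) / (hi - lo) if hi != lo else 0.5
            for x, lo, hi in zip(row, lows, highs)
        ])
    return scaled

features = [[10.0, 10000.0, 1.0], [20.0, 30000.0, 2.0], [30.0, 40000.0, 3.0]]
for row in min_max_scale(features):
    print(row)
# [0.0, 0.0, 0.0]
# [0.5, 0.6666666666666666, 0.5]
# [1.0, 1.0, 1.0]
```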



### Standardizing data

- We apply standardization whether or not the data is roughly bell-shaped (normally distributed)
- Some ML algorithms, e.g. support vector machines and some linear models, work better when all features have zero mean and unit variance
- Standardization shifts and rescales each feature; it does not change the shape of the distribution, so skewed data stays skewed

In [47]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors

# We reuse features_df from the normalization example

In [53]:
# create scaler object
# withMean=True centers each feature at 0; withStd=True scales it to unit standard deviation
feature_standard_scaler = StandardScaler(inputCol='features', outputCol='standard_features', withStd=True, withMean=True)

# fit the scaler to the data (computes each feature's mean and standard deviation)
standard_model = feature_standard_scaler.fit(features_df)

# transform the data into standardized form
standard_features_df = standard_model.transform(features_df)

In [54]:
standard_features_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), standard_features=DenseVector([-1.0, -1.0911, -1.0]))]

- So, in addition to the id and features columns, we now have a new column named standard_features that contains a dense vector of standardized values

In [57]:
# let's look at all values in the DataFrame
standard_features_df.show()

+---+------------------+--------------------+
| id|          features|   standard_features|
+---+------------------+--------------------+
|  1|[10.0,10000.0,1.0]|[-1.0,-1.09108945...|
|  2|[20.0,30000.0,2.0]|[0.0,0.2182178902...|
|  3|[30.0,40000.0,3.0]|[1.0,0.8728715609...|
+---+------------------+--------------------+
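With withMean=True and withStd=True, each column becomes (x - mean) / std, and Spark documents that the standard deviation is the corrected sample standard deviation (dividing by n - 1). For the first column, mean 20 and sample std 10 give -1, 0, 1; note that -1.0911 in the second column already falls outside -1..1. A plain-Python check with the standard `statistics` module (`standardize` is a hypothetical helper):

```python
import statistics

def standardize(rows):
    """Return (x - mean) / sample_std for every column of `rows`."""
    columns = list(zip(*rows))
    means = [statistics.mean(c) for c in columns]
    stds = [statistics.stdev(c) for c in columns]  # sample std (divides by n - 1)
    return [[(x - m) / s for x, m, s in zip(row, means, stds)] for row in rows]

features = [[10.0, 10000.0, 1.0], [20.0, 30000.0, 2.0], [30.0, 40000.0, 3.0]]
for row in standardize(features):
    print([round(v, 4) for v in row])
# [-1.0, -1.0911, -1.0]
# [0.0, 0.2182, 0.0]
# [1.0, 0.8729, 1.0]
```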



### Organizing continuous data into buckets/partitions

In [63]:
from pyspark.ml.feature import Bucketizer
# Bucketizer groups values into buckets defined by a list of boundaries, called splits
splits = [-float('inf'), -10.0, 0.0, 10.0, float('inf')]
# the splits define four buckets: [-inf, -10), [-10, 0), [0, 10) and [10, +inf]
# negative and positive infinity at the ends ensure every value falls into some bucket

In [64]:
# define some data
b_data = [(-800.0,), (-10.5,), (-1.8,), (0.0,), (9.2,), (90.5,)]
b_df = spark.createDataFrame(b_data, ['features'])
b_df.show()

+--------+
|features|
+--------+
|  -800.0|
|   -10.5|
|    -1.8|
|     0.0|
|     9.2|
|    90.5|
+--------+



In [65]:
# create Bucketizer object
bucketizer = Bucketizer(splits=splits, inputCol='features', outputCol='b_features')

# transform the data
bucketed_df = bucketizer.transform(b_df)

# no fit step is needed: Bucketizer only applies the fixed boundaries in splits,
# so there is nothing to learn from the data

In [67]:
bucketed_df.show()

+--------+----------+
|features|b_features|
+--------+----------+
|  -800.0|       0.0|
|   -10.5|       0.0|
|    -1.8|       1.0|
|     0.0|       2.0|
|     9.2|       2.0|
|    90.5|       3.0|
+--------+----------+
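Bucketizer assigns bucket i to values in [splits[i], splits[i+1]); only the last bucket also includes its upper bound. The same lookup can be sketched in plain Python with `bisect`; `bucketize` is a hypothetical helper, and like Spark it rejects values outside the split range.

```python
import bisect

def bucketize(value, splits):
    """Return the bucket index of `value`; buckets are [splits[i], splits[i+1])."""
    if not splits[0] <= value <= splits[-1]:
        raise ValueError(f"{value} is outside the split range")
    index = bisect.bisect_right(splits, value) - 1
    return min(index, len(splits) - 2)  # the last bucket also includes its upper bound

splits = [-float('inf'), -10.0, 0.0, 10.0, float('inf')]
data = [-800.0, -10.5, -1.8, 0.0, 9.2, 90.5]
print([bucketize(v, splits) for v in data])  # [0, 0, 1, 2, 2, 3]
```

Spark reports the bucket index as a double (0.0, 1.0, ...), while this sketch returns integers.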



In [68]:
# so we see that each value has been assigned to a bucket according to its size

### Text data

In [69]:
from pyspark.ml.feature import Tokenizer

In [70]:
# define new data frame for sentences(with 3 rows)
sentences_df = spark.createDataFrame([
    (1, 'This is an introduciton to Spark MLlib'),
    (2, 'MLlib includes libraies for classification and regression'),
    (3, 'It also contains supporting tools for piplines')
], ['id', 'sentences'])

In [71]:
sentences_df.show()

+---+--------------------+
| id|           sentences|
+---+--------------------+
|  1|This is an introd...|
|  2|MLlib includes li...|
|  3|It also contains ...|
+---+--------------------+



In [72]:
# create Tokenizer object
sent_token = Tokenizer(inputCol='sentences', outputCol='words')

# apply the Tokenizer's transform function
sent_tokenized_df = sent_token.transform(sentences_df)

# again, no fit step is needed: tokenization just splits each string into separate words

In [73]:
# let's look at the tokenized DataFrame
sent_tokenized_df.show()


+---+--------------------+--------------------+
| id|           sentences|               words|
+---+--------------------+--------------------+
|  1|This is an introd...|[this, is, an, in...|
|  2|MLlib includes li...|[mllib, includes,...|
|  3|It also contains ...|[it, also, contai...|
+---+--------------------+--------------------+



In [74]:
# the words column contains the lowercased list of words from each sentence
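Spark's Tokenizer lowercases the text and splits it on whitespace, which is why 'This' became 'this' (unlike the generic 'This is a book' example earlier). A rough plain-Python equivalent, as a sketch only: `str.split()` collapses runs of whitespace, whereas Spark's Tokenizer splits on single whitespace characters, and `RegexTokenizer` is Spark's configurable alternative.

```python
def tokenize(text):
    """Lowercase `text` and split it on whitespace (roughly what Tokenizer does)."""
    return text.lower().split()

print(tokenize('This is an introduciton to Spark MLlib'))
# ['this', 'is', 'an', 'introduciton', 'to', 'spark', 'mllib']
```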

### TF-IDF

- When we tokenize, we get a list of words
- For each document, we count the number of times each word appears in it (term frequency, TF)
- We do this for every document in our corpus (collection of documents)
- We also count how many documents in the corpus contain each term (document frequency, DF)
- So we have two sets of counts:
    - how often a word shows up in a single document
    - how many documents across the corpus contain that word
- Combining them (TF multiplied by the inverse of DF, on a log scale) gives TF-IDF: words that are frequent in one document but rare in the corpus get high weights

In [75]:
# let's work now with hashing tf and idf
from pyspark.ml.feature import HashingTF, IDF

In [76]:
# check the DataFrame's structure
sentences_df

DataFrame[id: bigint, sentences: string]

In [77]:
# look at the first sentence
sentences_df.take(1)

[Row(id=1, sentences='This is an introduciton to Spark MLlib')]

In [78]:
sent_tokenized_df.take(1)

[Row(id=1, sentences='This is an introduciton to Spark MLlib', words=['this', 'is', 'an', 'introduciton', 'to', 'spark', 'mllib'])]

In [80]:
# create a hashing term frequency object (maps each word to one of 20 indices)
hashingTF = HashingTF(inputCol='words', outputCol='rawFeatures', numFeatures=20)

# apply the transform to our tokenized sentences
sent_hfTF_df = hashingTF.transform(sent_tokenized_df)

In [82]:
sent_hfTF_df.take(1)

[Row(id=1, sentences='This is an introduciton to Spark MLlib', words=['this', 'is', 'an', 'introduciton', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {6: 1.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 2.0, 15: 1.0}))]

In [83]:
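# the new rawFeatures column is a sparse vector: each word is hashed to an index and the value is its count
# index 13 holds 2.0 because two different words hashed to the same index (a collision; no word repeats)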
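HashingTF keeps no vocabulary: it hashes each word to an index in [0, numFeatures) and counts hits per index, so with only 20 indices collisions are likely. The sketch below shows the idea in plain Python; it uses `zlib.crc32` as a stand-in hash (Spark uses MurmurHash3), so its indices will not match Spark's, and `hashing_tf` is a hypothetical helper.

```python
import zlib
from collections import Counter

def hashing_tf(words, num_features):
    """Map a list of words to a sparse {index: count} term-frequency vector."""
    counts = Counter()
    for word in words:
        index = zlib.crc32(word.encode("utf-8")) % num_features  # stand-in hash
        counts[index] += 1.0
    return dict(sorted(counts.items()))

words = ['this', 'is', 'an', 'introduciton', 'to', 'spark', 'mllib']
vector = hashing_tf(words, 20)
print(vector)  # indices differ from Spark's because the hash function differs
```

The total of all counts always equals the number of words; a collision shows up as fewer indices than distinct words.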

In [84]:
# rescale the rawFeatures values based on how often the words appear across the whole collection of sentences

# create an IDF object
idf = IDF(inputCol='rawFeatures', outputCol='idf_features')

# fit the inverse document frequency model (computes document frequencies)
idf_model = idf.fit(sent_hfTF_df)

# apply the transformation to create a new DataFrame
tfidf_df = idf_model.transform(sent_hfTF_df)


In [86]:
# now the DataFrame contains both the TF and the TF-IDF data; let's look at the first row
tfidf_df.take(1)

[Row(id=1, sentences='This is an introduciton to Spark MLlib', words=['this', 'is', 'an', 'introduciton', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {6: 1.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 2.0, 15: 1.0}), idf_features=SparseVector(20, {6: 0.2877, 8: 0.2877, 9: 0.6931, 10: 0.6931, 13: 1.3863, 15: 0.0}))]

- The new idf_features column contains the TF-IDF weights alongside the original columns
- These weights measure each word relative to how many documents in the corpus contain it; in our case the corpus is just three sentences. A weight of 0.0 (index 15) means that index occurs in every sentence
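Spark's IDF uses the smoothed formula idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) the number of documents containing term t; each output value is tf × idf. With m = 3 this gives log(4/4) = 0 for a term in all three sentences, log(4/3) ≈ 0.2877 for one in two sentences and log(4/2) ≈ 0.6931 for one in a single sentence. The sketch recomputes the first row in plain Python; the document frequencies in `doc_freq` are inferred from the printed output, not read from Spark.

```python
import math

def idf(num_docs, doc_freq):
    """Smoothed inverse document frequency, as documented for Spark's IDF."""
    return math.log((num_docs + 1) / (doc_freq + 1))

num_docs = 3
# term frequencies of the first sentence (rawFeatures above)
tf = {6: 1.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 2.0, 15: 1.0}
# document frequencies per index, inferred from the idf_features output
doc_freq = {6: 2, 8: 2, 9: 1, 10: 1, 13: 1, 15: 3}

tfidf = {i: round(tf[i] * idf(num_docs, doc_freq[i]), 4) for i in tf}
print(tfidf)
# {6: 0.2877, 8: 0.2877, 9: 0.6931, 10: 0.6931, 13: 1.3863, 15: 0.0}
```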

### Summary of preprocessing transformations

- Numeric transformations
     - MinMaxScaler (maps attributes to the range 0-1)
     - StandardScaler (rescales attributes to mean 0 and standard deviation 1)
     - Bucketizer (creates partitions for grouping values)

- Text transformations
     - Tokenizer (splits a string into a list of words)
     - HashingTF (creates term-frequency vectors from words; IDF then rescales them into TF-IDF vectors)


## Clustering (useful for data exploration)

### Clustering algorithms
- Group data into clusters, which shows how a large data set breaks down into distinct subgroups (looking for macro-level structure)
- K-means: finding clusters in small and mid-sized data sets
- Hierarchical clustering with bisecting K-means: for larger data sets

In [108]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

### K-means

In [91]:
# loading dataset
cluster_df = spark.read.csv('./clustering_dataset.csv', header=True, inferSchema=True)
# inferSchema=True makes Spark detect the integer columns instead of reading everything as strings

In [92]:
cluster_df

DataFrame[col1: int, col2: int, col3: int]

In [93]:
cluster_df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   7|   4|   1|
|   7|   7|   9|
|   7|   9|   6|
|   1|   6|   5|
|   6|   7|   7|
|   7|   9|   4|
|   7|  10|   6|
|   7|   8|   2|
|   8|   3|   8|
|   4|  10|   5|
|   7|   4|   5|
|   7|   8|   4|
|   2|   5|   1|
|   2|   6|   2|
|   2|   3|   8|
|   3|   9|   1|
|   4|   2|   9|
|   1|   7|   1|
|   6|   2|   3|
|   4|   1|   9|
+----+----+----+
only showing top 20 rows



In [98]:
cluster_df.show(75)

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   7|   4|   1|
|   7|   7|   9|
|   7|   9|   6|
|   1|   6|   5|
|   6|   7|   7|
|   7|   9|   4|
|   7|  10|   6|
|   7|   8|   2|
|   8|   3|   8|
|   4|  10|   5|
|   7|   4|   5|
|   7|   8|   4|
|   2|   5|   1|
|   2|   6|   2|
|   2|   3|   8|
|   3|   9|   1|
|   4|   2|   9|
|   1|   7|   1|
|   6|   2|   3|
|   4|   1|   9|
|   4|   8|   5|
|   6|   6|   7|
|   4|   6|   2|
|   8|   1|   1|
|   7|   5|  10|
|  17|  25|  21|
|  15|  23|  32|
|  42|  25|  45|
|  41|  47|  21|
|  37|  20|  27|
|  40|  18|  26|
|  41|  28|  50|
|  32|  25|  40|
|  24|  29|  35|
|  47|  18|  47|
|  36|  42|  45|
|  49|  29|  15|
|  47|  39|  22|
|  38|  27|  25|
|  45|  23|  40|
|  23|  36|  19|
|  47|  40|  50|
|  37|  30|  40|
|  42|  48|  41|
|  29|  31|  21|
|  36|  39|  48|
|  50|  24|  31|
|  42|  44|  37|
|  37|  39|  46|
|  22|  40|  30|
|  17|  29|  41|
|  85| 100|  69|
|  68|  76|  67|
|  76|  70|  93|
|  62|  66|  91|
|  83|  93|  7

In [99]:
# the data falls into 3 natural groups: the first 25 rows have values between 0 and 10,
# the next 26 rows lie roughly between 15 and 50, and the remaining rows are larger still
# next, we do a small transformation to combine these columns into a single feature vector column

In [102]:
# create a vector assembler
vectorAssembler = VectorAssembler(inputCols= ['col1', 'col2', 'col3'], outputCol = 'features')

# transformation
vector_cluster_df = vectorAssembler.transform(cluster_df)

In [103]:
vector_cluster_df.show()

+----+----+----+--------------+
|col1|col2|col3|      features|
+----+----+----+--------------+
|   7|   4|   1| [7.0,4.0,1.0]|
|   7|   7|   9| [7.0,7.0,9.0]|
|   7|   9|   6| [7.0,9.0,6.0]|
|   1|   6|   5| [1.0,6.0,5.0]|
|   6|   7|   7| [6.0,7.0,7.0]|
|   7|   9|   4| [7.0,9.0,4.0]|
|   7|  10|   6|[7.0,10.0,6.0]|
|   7|   8|   2| [7.0,8.0,2.0]|
|   8|   3|   8| [8.0,3.0,8.0]|
|   4|  10|   5|[4.0,10.0,5.0]|
|   7|   4|   5| [7.0,4.0,5.0]|
|   7|   8|   4| [7.0,8.0,4.0]|
|   2|   5|   1| [2.0,5.0,1.0]|
|   2|   6|   2| [2.0,6.0,2.0]|
|   2|   3|   8| [2.0,3.0,8.0]|
|   3|   9|   1| [3.0,9.0,1.0]|
|   4|   2|   9| [4.0,2.0,9.0]|
|   1|   7|   1| [1.0,7.0,1.0]|
|   6|   2|   3| [6.0,2.0,3.0]|
|   4|   1|   9| [4.0,1.0,9.0]|
+----+----+----+--------------+
only showing top 20 rows



In [104]:
# we have added a new column, features, holding each row's values as a single vector
# we did this because the K-means algorithm works with that features vector column

In [105]:
# create a KMeans object
kmeans = KMeans().setK(3)  # set the number of clusters, K, to 3

# set the seed, which determines the random initialization where KMeans starts;
# a fixed seed gives consistent results, which is useful for testing
kmeans = kmeans.setSeed(1)

# fit our data into model
k_model = kmeans.fit(vector_cluster_df)


In [106]:
# find the centers of these clusters
centers = k_model.clusterCenters()
centers

[array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667]),
 array([5.12, 5.84, 4.84])]

In [107]:
# K-means found three clusters; for example, one cluster is centered around the point (35.9, 31.5, 34.4)
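Once the centers are known, a k-means model assigns each point to the nearest center by Euclidean distance (this is what `k_model.transform` adds as its `prediction` column). A plain-Python sketch using the three centers printed above and a few rows from the dataset; `nearest_center` is a hypothetical helper and `math.dist` needs Python 3.8+.

```python
import math

centers = [
    (35.88461538, 31.46153846, 34.42307692),
    (80.0, 79.20833333, 78.29166667),
    (5.12, 5.84, 4.84),
]

def nearest_center(point, centers):
    """Return the index of the center closest to `point` (Euclidean distance)."""
    distances = [math.dist(point, c) for c in centers]
    return distances.index(min(distances))

for row in [(7, 4, 1), (17, 25, 21), (41, 47, 21), (85, 100, 69)]:
    print(row, "-> cluster", nearest_center(row, centers))
```

The cluster numbers are just positions in the centers list; their order carries no meaning.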

### Hierarchical clustering

In [110]:
vector_cluster_df.show(5)

+----+----+----+-------------+
|col1|col2|col3|     features|
+----+----+----+-------------+
|   7|   4|   1|[7.0,4.0,1.0]|
|   7|   7|   9|[7.0,7.0,9.0]|
|   7|   9|   6|[7.0,9.0,6.0]|
|   1|   6|   5|[1.0,6.0,5.0]|
|   6|   7|   7|[6.0,7.0,7.0]|
+----+----+----+-------------+
only showing top 5 rows



- This DataFrame contains 3 columns of raw data
- and a fourth column, features, which combines those 3 columns into the vector representation used by K-means

In [112]:
# Now we will apply bisecting k-means on that
from pyspark.ml.clustering import BisectingKMeans

# creating bisecting k-means object
bkmeans = BisectingKMeans().setK(3)
bkmeans = bkmeans.setSeed(1)

# create a model
bk_model = bkmeans.fit(vector_cluster_df)

In [113]:
# get the cluster centers found by bisecting k-means
bkcenters = bk_model.clusterCenters()
bkcenters

[array([5.12, 5.84, 4.84]),
 array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667])]

In [114]:
# recall the k-means centers
centers

[array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667]),
 array([5.12, 5.84, 4.84])]

In [116]:
# the centers are the same as the K-means centers here, only listed in a different order
# keep in mind that different algorithms may find different centers
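Bisecting k-means is top-down (divisive) hierarchical clustering: start with one cluster holding all points and repeatedly split a cluster in two with 2-means until there are k clusters. A simplified plain-Python sketch, assuming we always split the cluster with the largest sum of squared errors (SSE) and seed each split with two far-apart points; Spark's BisectingKMeans differs in these details, and all helper names are hypothetical. The ten points are rows taken from the dataset above.

```python
import math

def mean(points):
    """Component-wise mean of a list of equal-length tuples."""
    return tuple(sum(coords) / len(points) for coords in zip(*points))

def sse(points):
    """Sum of squared distances from each point to the cluster mean."""
    center = mean(points)
    return sum(math.dist(p, center) ** 2 for p in points)

def two_means(points, iterations=20):
    """Split `points` into two clusters with a small 2-means (Lloyd) loop."""
    center = mean(points)
    a = max(points, key=lambda p: math.dist(p, center))  # farthest from the mean
    b = max(points, key=lambda p: math.dist(p, a))       # farthest from `a`
    centers = [a, b]
    for _ in range(iterations):
        groups = [[], []]
        for p in points:
            closer_to_first = math.dist(p, centers[0]) <= math.dist(p, centers[1])
            groups[0 if closer_to_first else 1].append(p)
        new_centers = [mean(g) for g in groups]
        if new_centers == centers:  # assignments stopped changing
            break
        centers = new_centers
    return groups

def bisecting_kmeans(points, k):
    """Repeatedly split the cluster with the largest SSE until there are k clusters."""
    clusters = [list(points)]
    while len(clusters) < k:
        clusters.sort(key=sse)
        if sse(clusters[-1]) == 0:  # only identical points left, nothing to split
            break
        clusters.extend(two_means(clusters.pop()))
    return [mean(c) for c in clusters]

points = [
    (7, 4, 1), (7, 7, 9), (1, 6, 5), (6, 7, 7),  # small values
    (42, 25, 45), (41, 47, 21), (37, 20, 27),    # medium values
    (85, 100, 69), (68, 76, 67), (76, 70, 93),   # large values
]
for center in sorted(bisecting_kmeans(points, 3)):
    print([round(x, 2) for x in center])
# [5.25, 6.0, 5.5]
# [40.0, 30.67, 31.0]
# [76.33, 82.0, 76.33]
```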

## Classification algorithms

- Splitting data into different categories (category A or category B)
- We will look at 3 classification algorithms here:
    - Naive Bayes
    - Decision trees
    - Multilayer perceptron

In [118]:
# using the Iris data set
from pyspark.sql.functions import col  # pyspark SQL functionality (only col is used below)
from pyspark.ml.feature import VectorAssembler  # preprocessing tools
from pyspark.ml.feature import StringIndexer  # preprocessing tools


In [153]:
# loading iris data (the file has no header row, so Spark names the columns _c0, _c1, ...)
iris_df = spark.read.csv('./iris1.csv', inferSchema=True)

In [154]:
iris_df.take(1)

[Row(_c0=5.1, _c1=3.5, _c2=1.4, _c3=0.2, _c4='Iris-setosa')]

In [155]:
# the column names are just _c0, _c1 and so on, so let's rename them

In [156]:
iris_df.show(3)

+---+---+---+---+-----------+
|_c0|_c1|_c2|_c3|        _c4|
+---+---+---+---+-----------+
|5.1|3.5|1.4|0.2|Iris-setosa|
|4.9|3.0|1.4|0.2|Iris-setosa|
|4.7|3.2|1.3|0.2|Iris-setosa|
+---+---+---+---+-----------+
only showing top 3 rows



In [157]:
# apply SQL functionality to improve the DataFrame by renaming the columns
iris_df = iris_df.select(col('_c0').alias('sepal_length'),
                        col('_c1').alias('sepal_width'),
                        col('_c2').alias('petal_length'),
                        col('_c3').alias('petal_width'),
                        col('_c4').alias('species')
                        )

In [158]:
# look at first row
iris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa')]

In [160]:
iris_df.show(3)

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|    species|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
+------------+-----------+------------+-----------+-----------+
only showing top 3 rows



In [161]:
iris_df.columns

['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species']

In [163]:
# let's transform this into vector structure
# create vector assembler
vectorAssembler = VectorAssembler(inputCols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'], outputCol = 'features')

# apply transformation
v_iris_df = vectorAssembler.transform(iris_df)


In [164]:
v_iris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]))]

In [165]:
# the row now has the 4 measurements plus a new column, features, holding them as a single vector

### Convert the label name (species) into numeric value

In [166]:
# we will use the StringIndexer transformation
# create an indexer
indexer = StringIndexer(inputCol='species', outputCol='label')

# fit the indexer (learns the species-to-index mapping) and transform in one step
index_iris_df = indexer.fit(v_iris_df).transform(v_iris_df)


In [168]:
# look at the first row
index_iris_df.show(1)

+------------+-----------+------------+-----------+-----------+-----------------+-----+
|sepal_length|sepal_width|petal_length|petal_width|    species|         features|label|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|  0.0|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
only showing top 1 row



In [169]:
# we have all the earlier columns plus a new column named label
# the label is 0.0 because this species (Iris-setosa) has been mapped to index 0
# so we are ready for the classification algorithms

### Naive Bayes Classification

In [170]:
# we have the DataFrame with the indexed and vectorized iris data

# recall the structure of the data
index_iris_df

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string, features: vector, label: double]

In [171]:
index_iris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]), label=0.0)]

In [172]:
# we see our raw data (the first 5 columns from the file), one feature vector (the numeric measurements) and one label (0 in this case)

In [174]:
from pyspark.ml.classification import NaiveBayes # naivebayes classifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator # to evaluate the accuracy of our model

In [176]:
# let's split our data
splits = index_iris_df.randomSplit([0.6, 0.4], 1)  # weights [0.6, 0.4], seed 1
# roughly 60% of the rows go to the first DataFrame and 40% to the second

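- randomSplit returns a list; here it holds two DataFrames
- The first (index 0) will be the training DataFrame and the second (index 1) the test DataFrame
- The split sizes are approximate: rows are assigned randomly, so we get 98 and 52 rows rather than exactly 90 and 60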

In [177]:
# Splitting iris data set into train and test data set
train_df = splits[0]
test_df = splits[1]

In [181]:
# checking the sizes of the train, test and original DataFrames
train_df.count(), test_df.count(), index_iris_df.count()

(98, 52, 150)

In [182]:
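# create naive bayes classifier
# 'multinomial' (the default) models non-negative, count-like feature values;
# Spark's NaiveBayes supports multiple classes (here 3 iris species) regardless of modelType
nb = NaiveBayes(modelType='multinomial')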

# fit our training data to a model
nb_model = nb.fit(train_df)

# so, now we have a model that we can make predictions with
# Creating prediction data frame
predictions_df = nb_model.transform(test_df)

In [183]:
# look at first row of predictions data frame
predictions_df.take(1)

[Row(sepal_length=4.3, sepal_width=3.0, petal_length=1.1, petal_width=0.1, species='Iris-setosa', features=DenseVector([4.3, 3.0, 1.1, 0.1]), label=0.0, rawPrediction=DenseVector([-9.9894, -11.3476, -11.902]), probability=DenseVector([0.7118, 0.183, 0.1051]), prediction=0.0)]

In [184]:
# here the prediction is correct, but looking at one example does not tell us how well the model behaves overall
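For Spark's multinomial Naive Bayes, rawPrediction holds a log-scale score per class, and probability is those scores exponentiated and normalized, i.e. a softmax. Recomputing it from the printed (already rounded) rawPrediction reproduces the probability column; `softmax` is a hypothetical helper.

```python
import math

def softmax(scores):
    """Turn log-scale scores into probabilities that sum to 1."""
    top = max(scores)
    exps = [math.exp(s - top) for s in scores]  # subtract the max for numerical stability
    total = sum(exps)
    return [e / total for e in exps]

raw_prediction = [-9.9894, -11.3476, -11.902]  # copied from the output above
probability = softmax(raw_prediction)
print([round(p, 4) for p in probability])  # [0.7118, 0.183, 0.1051]
```

The predicted class is simply the position of the largest score, here 0.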

In [190]:
# do a more thorough evaluation
# create an evaluator
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
# it compares the label column with the prediction column and measures accuracy

In [191]:
# Save the results and call the evaluator and evaluate our predictions
nb_accuracy = evaluator.evaluate(predictions_df)

In [192]:
nb_accuracy

0.9807692307692307

In [193]:
# so we see the accuracy is pretty good with NaiveBayes classifier
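Accuracy is simply the fraction of test rows whose prediction equals the label. The test set has 52 rows, so 0.9807692307692307 corresponds to 51 correct predictions (51/52). A plain-Python sketch with hypothetical labels (`accuracy` is a hypothetical helper):

```python
def accuracy(labels, predictions):
    """Fraction of positions where the prediction matches the label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

# hypothetical test set: 52 rows, one of them misclassified
labels = [0.0] * 20 + [1.0] * 16 + [2.0] * 16
predictions = labels[:]
predictions[-1] = 1.0  # one wrong prediction
print(accuracy(labels, predictions))  # 51/52, about 0.9808
```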

In [194]:
# Let's have a look at other classification algorithms

### Multilayer perceptron classification 
- it is a type of neural network

In [195]:
# we will use the same dataset- the indexed and vectorised version of dataframe
index_iris_df

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string, features: vector, label: double]

In [199]:
# look at the data structure
index_iris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]), label=0.0)]

In [200]:
# here, features is a vector combining the 4 measurements
# label maps the species name to a number: 0, 1 or 2

In [202]:
# importing the module that supports the multilayer perceptron
from pyspark.ml.classification import MultilayerPerceptronClassifier

- A multilayer perceptron classifier has, as the name implies, multiple layers of neurons
- The first layer has as many nodes as there are inputs: we have four measurements, so it has 4 neurons
- The last layer has as many neurons as there are output classes: there are three iris species, so it has 3 neurons
- The layers in between help the network learn to classify correctly; here we use two hidden layers of 5 neurons each
- So the network has four layers: 4 input neurons, two hidden layers of 5 neurons, and 3 output neurons (one per iris species)


In [206]:
# create list of layers
layers = [4, 5, 5, 3]  # input layer (4), two hidden layers of 5 neurons each, output layer (3)
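Each layer is fully connected to the next, so `layers` also fixes the number of trainable parameters: (inputs + 1) × outputs per pair of adjacent layers, the +1 being a bias. For [4, 5, 5, 3] that is 5×5 + 6×5 + 6×3 = 73. The sketch below counts them and runs one forward pass in plain Python with sigmoid hidden layers and a softmax output layer, which is how Spark documents its MLP classifier; the weights are made up, no training happens, and `count_weights` and `forward` are hypothetical helpers.

```python
import math
import random

def count_weights(layers):
    """Weights plus biases in a fully connected network with these layer sizes."""
    return sum((n_in + 1) * n_out for n_in, n_out in zip(layers, layers[1:]))

def forward(x, weights, biases):
    """Sigmoid hidden layers, softmax output layer."""
    for i, (w, b) in enumerate(zip(weights, biases)):
        z = [sum(wj * xj for wj, xj in zip(row, x)) + bj for row, bj in zip(w, b)]
        if i < len(weights) - 1:
            x = [1.0 / (1.0 + math.exp(-v)) for v in z]  # sigmoid
        else:
            top = max(z)
            exps = [math.exp(v - top) for v in z]
            total = sum(exps)
            x = [e / total for e in exps]  # softmax
    return x

layers = [4, 5, 5, 3]
print(count_weights(layers))  # 73

rng = random.Random(1)  # made-up weights, only to show the shapes
weights = [[[rng.uniform(-1, 1) for _ in range(n_in)] for _ in range(n_out)]
           for n_in, n_out in zip(layers, layers[1:])]
biases = [[rng.uniform(-1, 1) for _ in range(n_out)] for n_out in layers[1:]]
probs = forward([5.1, 3.5, 1.4, 0.2], weights, biases)  # first iris row
print(probs)  # three class probabilities that sum to 1
```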

In [208]:
# create the multilayer perceptron classifier
mlp = MultilayerPerceptronClassifier(layers=layers, seed=1)

# build a model by fitting it on the training data
mlp_model = mlp.fit(train_df)

# make predictions by transforming the test data
mlp_predictions = mlp_model.transform(test_df)

In [209]:
# Evaluate the predictions
# create an evaluator (labelCol='label' and predictionCol='prediction' are the defaults)
mlp_evaluator = MulticlassClassificationEvaluator(metricName='accuracy')

In [211]:
# calculate the accuracy
mlp_accuracy = mlp_evaluator.evaluate(mlp_predictions)

In [212]:
mlp_accuracy

0.6923076923076923

In [213]:
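# accuracy is much lower than with Naive Bayes; with these default settings
# (e.g. maxIter=100 and unscaled features) the network may not have trained well, so tuning could change this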

### Decision tree classification

index_iris_df

In [215]:
index_iris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]), label=0.0)]

In [216]:
from pyspark.ml.classification import DecisionTreeClassifier

In [218]:
# create decision tree object (classifier)
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features')

# Create a model and fit on train data
dt_model = dt.fit(train_df)

# create prediction (using the model) and transform on test data
dt_predictions = dt_model.transform(test_df)


#### Steps
- I created a classifier called dt
- Using dt, I built a model by fitting it to our training data
- Then I applied the model's transform to our test data to make predictions
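By default, Spark's DecisionTreeClassifier chooses splits that reduce Gini impurity, 1 - Σ p_k², where p_k is the share of class k in a node. A plain-Python sketch; the split shown is a hypothetical example on the 150-row iris class counts, not the tree Spark actually learned, and both helpers are hypothetical.

```python
from collections import Counter

def gini(labels):
    """Gini impurity of a list of class labels."""
    n = len(labels)
    return 1.0 - sum((count / n) ** 2 for count in Counter(labels).values())

def split_impurity(left, right):
    """Size-weighted Gini impurity of a candidate split."""
    n = len(left) + len(right)
    return len(left) / n * gini(left) + len(right) / n * gini(right)

# 50 rows per species, as in the full iris data set
labels = [0] * 50 + [1] * 50 + [2] * 50
print(round(gini(labels), 4))  # 0.6667: all three classes equally mixed

# hypothetical split that isolates class 0 (setosa) from the rest
left, right = labels[:50], labels[50:]
print(round(split_impurity(left, right), 4))  # 0.3333: lower, so the split helps
```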

In [219]:
# now, evaluate those predictions
# create an evaluator 
dt_evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')

# measure the accuracy
dt_accuracy= dt_evaluator.evaluate(dt_predictions)

In [220]:
dt_accuracy

0.9423076923076923

- So the accuracy is quite good with the DecisionTree classifier
- NaiveBayes and DecisionTree both work well on our data set
- But the MultilayerPerceptron did not work well on the iris data set (at least with this configuration)

### Conclusions
- It is helpful to experiment with a number of different algorithms and, where the algorithm requires it, a number of different configurations.
- Naive Bayes works well if the attributes in the dataset are independent of each other (not tightly correlated).
- Multilayer perceptron can model non-linear relationships in the data.
- DecisionTree is a good option in many cases; it is worth starting with decision trees and then trying other algorithms from there.
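The independence point above comes from the assumption built into naive Bayes: given the class $y$, the features $x_1, \dots, x_n$ are treated as conditionally independent, so the class score factorizes into per-feature terms, and the predicted class is the one with the largest score:

$$
P(y \mid x_1, \dots, x_n) \propto P(y) \prod_{i=1}^{n} P(x_i \mid y),
\qquad
\hat{y} = \arg\max_{y} \, P(y) \prod_{i=1}^{n} P(x_i \mid y)
$$

When features are strongly correlated, the product counts their shared evidence more than once, which is one reason the method tends to do best when the attributes are close to independent.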
                                                                                     

## Regression
- Allows us to make predictions about numeric values
- for example, to make projections into the future

- We will focus here on 3 regression algorithms:
    - Linear Regression
    - Decision Tree Regression
    - Gradient-Boosted Tree Regression

### Problem statement: predict how much power a plant can generate, based on four measured factors (AT, V, AP, RH)

In [222]:
# import codes for linear regression
from pyspark.ml.regression import LinearRegression

In [255]:
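# reading data into a dataframe
# Spark has no built-in Excel reader, so first read the .xlsx file into a pandas DataFrame
# (pandas infers numeric column types itself; read_excel has no inferSchema argument,
#  and reading .xlsx needs an Excel engine such as openpyxl installed)
import pandas
df = pandas.read_excel('./power_plant.xlsx', sheet_name='Sheet1')

# convert the pandas DataFrame into a Spark DataFrame
pp_df = spark.createDataFrame(df)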

In [256]:
# look at data structure
pp_df

DataFrame[AT: double, V: double, AP: double, RH: double, PE: double]

In [258]:
# double means the columns were read as numeric (floating-point) data

In [259]:
pp_df.columns

['AT', 'V', 'AP', 'RH', 'PE']

In [260]:
from pyspark.ml.feature import VectorAssembler

# create a feature vector: combine the four input columns into one 'features' column
# (PE is the label we want to predict, so it is left out)
vectorAssembler = VectorAssembler(inputCols=['AT', 'V', 'AP', 'RH'], outputCol='features')

# apply the transform to get a new DataFrame with the extra 'features' column
vector_pp_df = vectorAssembler.transform(pp_df)

In [261]:
# look at one row
vector_pp_df.take(1)

[Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26, features=DenseVector([14.96, 41.76, 1024.07, 73.17]))]
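Conceptually, `VectorAssembler` concatenates the listed input columns, in order, into one vector per row, which is the single `features` column MLlib estimators expect. A plain-Python illustration using the first row shown above; representing a row as a dict is an assumption for illustration, not how Spark stores rows:

```python
def assemble(row, input_cols):
    """Concatenate the values of input_cols (in order) into a single feature list."""
    return [float(row[col]) for col in input_cols]


first_row = {"AT": 14.96, "V": 41.76, "AP": 1024.07, "RH": 73.17, "PE": 463.26}
features = assemble(first_row, ["AT", "V", "AP", "RH"])
print(features)  # the label column PE is deliberately left out
```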

In [262]:
# with the feature vector in place, we are ready to create the linear regression model
# create a linear regression object (PE is the label to predict)
lr = LinearRegression(featuresCol='features', labelCol='PE')

# build the model by fitting it to the data
# (note: this fits on the full dataset; there is no train/test split here)
lr_model = lr.fit(vector_pp_df)


In [263]:
# look at some attributes of the fitted model, e.g. the coefficients
lr_model.coefficients
# it gives us 4 numbers, one coefficient for each input variable (AT, V, AP, RH)

DenseVector([-1.9775, -0.2339, 0.0621, -0.1581])

In [264]:
lr_model.intercept
# the intercept: the predicted PE when all features are zero (where the fitted line crosses the Y axis)

454.6092742751131

In [265]:
# So basically we fit a straight line (with four features, a hyperplane) to our data; that's what linear regression is all about
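Each linear-regression prediction is the intercept plus the dot product of the coefficients with the feature vector. Plugging the first row into the coefficients printed above gives a worked example. Because the displayed coefficients are rounded to four decimals, this only approximates what `lr_model` itself would predict for that row.

```python
coefficients = [-1.9775, -0.2339, 0.0621, -0.1581]  # AT, V, AP, RH (rounded, as displayed)
intercept = 454.6092742751131
features = [14.96, 41.76, 1024.07, 73.17]           # first row of vector_pp_df
actual_pe = 463.26

# prediction = intercept + sum of coefficient * feature value
predicted_pe = intercept + sum(w * x for w, x in zip(coefficients, features))
print(round(predicted_pe, 2))              # about 467.28
print(round(actual_pe - predicted_pe, 2))  # residual for this row, about -4.02
```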

In [268]:
# error: quality of the linear model
lr_model.summary.rootMeanSquaredError
# root mean squared error (RMSE) on the training data: a measure of how much error there is in our predictions

4.557126016749482

- So we're looking at an error of about 4.56 on values in the 455 range (roughly the intercept),
- so in terms of scale the error is around 1%, which is actually quite good.
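RMSE is the square root of the mean squared difference between predictions and actual values, so it is in the same units as PE. A small pure-Python version, followed by the scale check from the bullets above (the toy PE values are made up):

```python
import math


def root_mean_squared_error(actual, predicted):
    """Square root of the mean squared difference between actual and predicted values."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("need two non-empty sequences of equal length")
    mse = sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual)
    return math.sqrt(mse)


# Made-up actual vs predicted PE values
print(root_mean_squared_error([463.0, 444.0, 488.0], [460.0, 448.0, 488.0]))

# Scale check with the notebook's numbers: training RMSE relative to the intercept (~455)
relative_error = 4.557126016749482 / 454.6092742751131
print(f"{relative_error:.1%}")  # prints 1.0%
```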

In [267]:
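# save the model (this fails if 'lr1.model' already exists;
# lr_model.write().overwrite().save('lr1.model') replaces an existing copy)
# it can be reloaded later with pyspark.ml.regression.LinearRegressionModel.load('lr1.model')
lr_model.save('lr1.model')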

### Decision tree regression

In [269]:
from pyspark.ml.regression import DecisionTreeRegressor # regression
from pyspark.ml.evaluation import RegressionEvaluator # evaluator
from pyspark.ml.feature import VectorAssembler # vector feature

In [274]:
# we will use the same data set of power plant
pp_df.show(3)

+-----+-----+-------+-----+------+
|   AT|    V|     AP|   RH|    PE|
+-----+-----+-------+-----+------+
|14.96|41.76|1024.07|73.17|463.26|
|25.18|62.96|1020.04|59.08|444.37|
| 5.11| 39.4|1012.16|92.14|488.56|
+-----+-----+-------+-----+------+
only showing top 3 rows



In [275]:
# create vectorAssembler object
vectorAssembler = VectorAssembler(inputCols= ['AT', 'V', 'AP', 'RH'], outputCol='features')

# create new dataframe and apply transform 
vector_pp_df = vectorAssembler.transform(pp_df)

In [276]:
vector_pp_df.take(1)

[Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26, features=DenseVector([14.96, 41.76, 1024.07, 73.17]))]

In [278]:
# splitting data into train and test data
splits = vector_pp_df.randomSplit([0.7, 0.3])

# create train data frame
train_df = splits[0]

# create test data frame
test_df = splits[1]

In [279]:
train_df.count(), test_df.count(), vector_pp_df.count()

(6628, 2940, 9568)
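`randomSplit([0.7, 0.3])` assigns rows randomly according to the weights, so the split sizes are only approximately 70/30 (here 6628/9568 is about 69.3%), and without a `seed` argument they change on every run; `randomSplit([0.7, 0.3], seed=...)` makes the split reproducible. The plain-Python analogue below illustrates weight-based random assignment with a fixed seed; Spark's own implementation samples per partition and differs in detail.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound for each split
    splits = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        # pick the first split whose cumulative bound exceeds r (last split as a fallback)
        idx = next((i for i, b in enumerate(bounds) if r < b), len(weights) - 1)
        splits[idx].append(row)
    return splits


train_part, test_part = random_split(range(9568), [0.7, 0.3], seed=42)
print(len(train_part), len(test_part))  # roughly 70/30, rarely exactly 6698/2870
```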

In [281]:
# create a decision tree object
dt = DecisionTreeRegressor(featuresCol='features', labelCol='PE')

# create a model and fit training data to it
dt_model = dt.fit(train_df)

# make predictions and apply transform to test data
dt_predictions = dt_model.transform(test_df)

#### Steps
- So we've created our decision tree regressor,
- we fed our training data to it,
- and we created some predictions by applying the model's transform to our test data.
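For regression, Spark's decision trees measure impurity as variance (`impurity='variance'` is the only option for `DecisionTreeRegressor`), and each leaf predicts the mean label of the training rows that reach it. A one-split plain-Python sketch on made-up AT/PE pairs shows the idea: choose the threshold that minimizes the total squared error around each side's mean.

```python
def sse(values):
    """Sum of squared deviations from the mean (variance times count)."""
    if not values:
        return 0.0
    mean = sum(values) / len(values)
    return sum((v - mean) ** 2 for v in values)


def fit_stump(x, y):
    """Find the 'x <= threshold' split minimising total squared error.

    Returns (threshold, left_mean, right_mean).
    """
    best = None
    for threshold in sorted(set(x))[:-1]:
        left = [yi for xi, yi in zip(x, y) if xi <= threshold]
        right = [yi for xi, yi in zip(x, y) if xi > threshold]
        cost = sse(left) + sse(right)
        if best is None or cost < best[0]:
            best = (cost, threshold, sum(left) / len(left), sum(right) / len(right))
    if best is None:
        raise ValueError("need at least two distinct x values")
    return best[1:]


# Made-up ambient temperatures (AT) and power outputs (PE)
at = [5.0, 8.0, 10.0, 25.0, 28.0, 30.0]
pe = [488.0, 485.0, 480.0, 445.0, 440.0, 436.0]
threshold, left_mean, right_mean = fit_stump(at, pe)
print(threshold, left_mean, right_mean)
```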

In [286]:
# evaluate our results
dt_evaluator = RegressionEvaluator(labelCol='PE', predictionCol='prediction', metricName='rmse')
# for the classification models we used accuracy;
# here we make numeric predictions, so we are more interested in how far off they are and use an error metric (RMSE)

# calculate the error
rmse = dt_evaluator.evaluate(dt_predictions)

In [287]:
rmse

4.575757924726427

In [288]:
# so the result is pretty good in terms of RMSE, and very close to the error of linear regression
# (the linear regression RMSE was measured on its training data, so the comparison is approximate)

### Gradient-boosted tree regression

In [289]:
from pyspark.ml.regression import GBTRegressor

# create instance of the regressor
gbt = GBTRegressor(featuresCol='features', labelCol='PE')

# create the model and fit the training data
gbt_model = gbt.fit(train_df)

# create prediction and apply transform to the test data
gbt_predictions = gbt_model.transform(test_df)

# create an evaluator for the GBT model
gbt_evaluator = RegressionEvaluator(labelCol='PE', predictionCol='prediction', metricName='rmse')

# get the RMSE of the GBT model on the test data
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)


In [290]:
gbt_rmse

4.1252760455724955

In [291]:
# it is slightly better (lower RMSE) than the other two regression methods
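Gradient boosting builds an ensemble of small trees one after another. With squared-error loss (the default `lossType` of `GBTRegressor`), each new tree is fit to the residuals of the current ensemble and added with a step size (`stepSize`, default 0.1). The plain-Python sketch below uses one-split stumps, a single made-up feature, and a mean-value starting point; it is a simplification to show the loop, not Spark's implementation.

```python
def fit_residual_stump(x, y):
    """One-split regression stump; returns a function predicting the mean on each side."""
    best = None
    for t in sorted(set(x))[:-1]:
        left = [yi for xi, yi in zip(x, y) if xi <= t]
        right = [yi for xi, yi in zip(x, y) if xi > t]
        lm, rm = sum(left) / len(left), sum(right) / len(right)
        cost = sum((v - lm) ** 2 for v in left) + sum((v - rm) ** 2 for v in right)
        if best is None or cost < best[0]:
            best = (cost, t, lm, rm)
    if best is None:
        raise ValueError("need at least two distinct x values")
    _, t, lm, rm = best
    return lambda xi: lm if xi <= t else rm


def fit_gbt(x, y, n_trees=20, step_size=0.1):
    """Squared-loss gradient boosting: each new stump is fit to the current residuals."""
    base = sum(y) / len(y)  # simplification: start from the mean label
    trees = []

    def predict(xi):
        return base + sum(step_size * tree(xi) for tree in trees)

    for _ in range(n_trees):
        residuals = [yi - predict(xi) for xi, yi in zip(x, y)]
        trees.append(fit_residual_stump(x, residuals))
    return predict


def training_rmse(model, x, y):
    return (sum((model(xi) - yi) ** 2 for xi, yi in zip(x, y)) / len(y)) ** 0.5


def mean_only(xi):
    return sum(pe) / len(pe)  # baseline that ignores the feature


# Hypothetical single-feature data (AT -> PE)
at = [5.0, 8.0, 10.0, 25.0, 28.0, 30.0]
pe = [488.0, 485.0, 480.0, 445.0, 440.0, 436.0]
model = fit_gbt(at, pe)
print(round(training_rmse(mean_only, at, pe), 2), round(training_rmse(model, at, pe), 2))
```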

### Conclusions
- Regression algorithms are designed to make numeric predictions
- Experiment with multiple algorithms and
- see which works best for the particular data set.
- This is easy to do, because trying another algorithm takes only a few extra lines of code.

    - Linear regression is simple and widely used.
    - Decision tree regression performed about as well as linear regression here.
    - Gradient-boosted trees sometimes give the best performance (they had the lowest RMSE here).

In [292]:
# Understand recommendation systems
# Spark MLlib supports collaborative filtering
# we can use the Alternating Least Squares (ALS) method for that
# import ALS from pyspark.ml.recommendation

# the overall process (fit, transform, evaluate) is the same as above
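ALS factorizes the sparse user-by-item rating matrix into user and item factor vectors by alternately holding one side fixed and solving a regularized least-squares problem for the other. The rank-1, pure-Python sketch below runs that alternation on a hypothetical ratings dict. Spark's `ALS` does the same for `rank`-dimensional factors, distributed, with parameters such as `rank`, `maxIter` and `regParam`, and scales its regularization differently; this is only an illustration.

```python
def als_rank1(ratings, n_iter=20, reg=0.1):
    """Rank-1 alternating least squares on a {(user, item): rating} dict.

    Returns (user_factors, item_factors); a predicted rating is their product.
    """
    users = {u for u, _ in ratings}
    items = {i for _, i in ratings}
    p = {u: 1.0 for u in users}
    q = {i: 1.0 for i in items}
    for _ in range(n_iter):
        # Fix item factors; each user's factor is a 1-D ridge regression solved in closed form
        for u in users:
            obs = [(q[i], r) for (uu, i), r in ratings.items() if uu == u]
            p[u] = sum(qi * r for qi, r in obs) / (sum(qi * qi for qi, _ in obs) + reg)
        # Fix user factors; solve for each item the same way
        for i in items:
            obs = [(p[u], r) for (u, ii), r in ratings.items() if ii == i]
            q[i] = sum(pu * r for pu, r in obs) / (sum(pu * pu for pu, _ in obs) + reg)
    return p, q


# Hypothetical ratings; ("carol", "film_c") is missing and gets predicted
ratings = {
    ("alice", "film_a"): 5.0, ("alice", "film_b"): 4.0, ("alice", "film_c"): 1.0,
    ("bob", "film_a"): 4.0, ("bob", "film_b"): 4.0, ("bob", "film_c"): 1.0,
    ("carol", "film_a"): 5.0, ("carol", "film_b"): 5.0,
}
p, q = als_rank1(ratings)
print(round(p["carol"] * q["film_c"], 2))  # predicted rating for the missing pair
```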

### Tips
#### Three basic stages of building a machine learning model
- preprocessing: collect, reformat and transform data
    - load data into a DataFrame
    - include headers, or column names, in text files
    - use inferSchema=True (lets Spark map numeric and, where the format is recognized, date/time values to appropriate data types instead of leaving them as strings)
    - use VectorAssembler to create feature vectors
    - use StringIndexer to map strings to numeric indexes
- model building: apply machine learning algorithms to training data
    - split data into training and test sets
    - fit models using the training data
    - create predictions by applying transform to the test data
- validation: assess the quality of the models built in step 2
    - use MLlib evaluators
        - MulticlassClassificationEvaluator
        - RegressionEvaluator
    - experiment with multiple algorithms (e.g. lr, dt, gbt)
    - vary the hyperparameters of the algorithms you are working with
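With its default `stringOrderType='frequencyDesc'`, `StringIndexer` gives index 0.0 to the most frequent string, 1.0 to the next, and so on; Spark's documentation says ties are broken alphabetically. If, as in the standard iris data, all three species appear equally often, that is consistent with `Iris-setosa` receiving `label=0.0` in the iris example earlier. A pure-Python sketch of the mapping (an illustration, not Spark's code):

```python
from collections import Counter


def string_index(values):
    """Map each distinct string to a float index: most frequent first, ties alphabetical."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: float(i) for i, s in enumerate(ordered)}


species = ["Iris-virginica", "Iris-setosa", "Iris-versicolor"] * 50
mapping = string_index(species)
print(mapping)
```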

In [None]:
# AWS public data set for big data 