<p style="text-align:center">
    <a href="https://skills.network/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/assets/logos/SN_web_lightmode.png" width="200" alt="Skills Network Logo">
    </a>
</p>


## Feature Extraction and Transformation using Spark


Estimated time needed: **30** minutes


<p style='color: red'>The purpose of this lab is to show you how to use Spark to extract and transform features.</p>


## __Table of Contents__

<ol>
  <li>
    <a href="#Objectives">Objectives
    </a>
  </li>
  <li>
    <a href="#Datasets">Datasets
    </a>
  </li>
  <li>
    <a href="#Setup">Setup
    </a>
    <ol>
      <li>
        <a href="#Installing-Required-Libraries">Installing Required Libraries
        </a>
      </li>
      <li>
        <a href="#Importing-Required-Libraries">Importing Required Libraries
        </a>
      </li>
    </ol>
  </li>
  <li> 
    <a href="#Examples">Examples
    </a>
    <ol>
    <li>
      <a href="#Task-1---Tokenizer">Task 1 - Tokenizer
      </a>
    </li>
    <li>
      <a href="#Task-2---CountVectorizer">Task 2 - CountVectorizer
      </a>
    </li>
    <li>
      <a href="#Task-3---TF-IDF">Task 3 - TF-IDF
      </a>
    </li>
    <li>
      <a href="#Task-4---StopWordsRemover">Task 4 - StopWordsRemover
      </a>
    </li>
    <li>
      <a href="#Task-5---StringIndexer">Task 5 - StringIndexer
      </a>
    </li>
    <li>
      <a href="#Task-6---StandardScaler">Task 6 - StandardScaler
      </a>
    </li>
    </ol>
  </li>
  <li>
    <a href="#Exercises">Exercises
    </a>
    <ol>
      <li>
        <a href="#Exercise-1---Tokenizer">Exercise 1 - Tokenizer
        </a>
      </li>
      <li>
        <a href="#Exercise-2---CountVectorizer">Exercise 2 - CountVectorizer
        </a>
      </li>
      <li>
        <a href="#Exercise-3---StringIndexer">Exercise 3 - StringIndexer
        </a>
      </li>
      <li>
        <a href="#Exercise-4---StandardScaler">Exercise 4 - StandardScaler
        </a>
      </li>
    </ol>
  </li>
</ol>


















## Objectives

After completing this lab you will be able to:

 - Use the feature extractor CountVectorizer
 - Use the feature extractor TF-IDF
 - Use the feature transformer Tokenizer
 - Use the feature transformer StopWordsRemover
 - Use the feature transformer StringIndexer
 - Use the feature transformer StandardScaler
 


## Datasets

In this lab you will use the following dataset:

 - A modified version of the car mileage dataset. The original dataset is available at https://archive.ics.uci.edu/ml/datasets/auto+mpg
 


----


## Setup


For this lab, we will be using the following libraries:

*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01) for connecting to the Spark Cluster


### Installing Required Libraries

The Spark cluster is pre-installed in the Skills Network Labs environment. However, you need the `pyspark` and `findspark` libraries to connect to it.

If you want to download this Jupyter notebook and run it on your local computer, follow the instructions <a href="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/labs/Connecting_to_spark_cluster_using_Skills_Network_labs.ipynb">here</a>.



The following required libraries are __not__ pre-installed in the Skills Network Labs environment. __You will need to run the following cell__ to install them:


In [None]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

### Importing Required Libraries

_We recommend you import all required libraries in one place (here):_


In [None]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

In [None]:
#Create SparkSession
#Ignore any warnings by SparkSession command

spark = SparkSession.builder.appName("Feature Extraction and Transformation using Spark").getOrCreate()

## Task 1 - Tokenizer


A tokenizer breaks a sentence into words. Spark's `Tokenizer` converts the text to lowercase and splits it on whitespace.
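
To make the behavior concrete, here is a minimal plain-Python sketch of roughly what tokenizing one sentence produces. It is an approximation for illustration only, not Spark's implementation, and the helper name `simple_tokenize` is hypothetical.

```python
def simple_tokenize(sentence):
    """Approximate a whitespace tokenizer: lowercase, then split on whitespace."""
    return sentence.lower().split()

tokens = simple_tokenize("Spark is a distributed computing system.")
print(tokens)
```

Note that punctuation is not stripped in this sketch: the final token keeps its trailing period.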


In [None]:
#import tokenizer
from pyspark.ml.feature import Tokenizer

In [None]:
#create a sample dataframe
sentenceDataFrame = spark.createDataFrame([
    (1, "Spark is a distributed computing system."),
    (2, "It provides interfaces for multiple languages"),
    (3, "Spark is built on top of Hadoop")
], ["id", "sentence"])

In [None]:
#display the dataframe
sentenceDataFrame.show(truncate = False)

In [None]:
# Create a Tokenizer instance.
# inputCol names the column to be tokenized.
# outputCol names the column where the tokens are stored.
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

In [None]:
#tokenize
token_df = tokenizer.transform(sentenceDataFrame)

In [None]:
#display the tokenized data
token_df.show(truncate=False)

## Task 2 - CountVectorizer


CountVectorizer converts text into a numerical format. For each document, it produces a vector that holds the count of each vocabulary word in that document.
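
The idea can be sketched in plain Python, without Spark, as two steps: build a vocabulary from the whole corpus, then count each vocabulary term per document. This is only an illustration under stated assumptions: the names `vocabulary` and `count_vector` are hypothetical, the sketch returns dense lists whereas Spark displays sparse vectors, and the alphabetical tie-break is a choice made here for determinism.

```python
from collections import Counter

docs = [
    "I love Spark Spark provides Python API".split(),
    "I love Python Spark supports Python".split(),
    "Spark solves the big problem of big data".split(),
]

# Step 1: count every term across the whole corpus to build the vocabulary.
corpus_counts = Counter(term for doc in docs for term in doc)

# Most frequent terms first; ties are broken alphabetically
# (an assumption made in this sketch so the order is reproducible).
vocabulary = sorted(corpus_counts, key=lambda t: (-corpus_counts[t], t))

def count_vector(doc):
    """Step 2: return the count of each vocabulary term in one document."""
    counts = Counter(doc)
    return [counts[term] for term in vocabulary]

vectors = [count_vector(doc) for doc in docs]

print(vocabulary)
for vector in vectors:
    print(vector)
```

Each position in a vector corresponds to one vocabulary term, so the first entry of the first vector is the number of times the most frequent corpus term appears in the first document.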


In [None]:
#import CountVectorizer
from pyspark.ml.feature import CountVectorizer

In [None]:
#create a sample dataframe and display it.
textdata = [(1, "I love Spark Spark provides Python API ".split()),
            (2, "I love Python Spark supports Python".split()),
            (3, "Spark solves the big problem of big data".split())]

textdata = spark.createDataFrame(textdata, ["id", "words"])

textdata.show(truncate=False)

In [None]:
# Create a CountVectorizer object.
# inputCol names the column to be count vectorized.
# outputCol names the column where the count vectors are stored.
cv = CountVectorizer(inputCol="words", outputCol="features")

In [None]:
# Fit the CountVectorizer model on the input data
model = cv.fit(textdata)

In [None]:
# Transform the input data to bag-of-words vectors
result = model.transform(textdata)

In [None]:
# display the dataframe
result.show(truncate=False)

## Task 3 - TF-IDF


Term Frequency-Inverse Document Frequency (TF-IDF) quantifies how important a word is to a document within a collection of documents. It is computed by multiplying the term frequency (the number of times the word occurs in the document) by the inverse document frequency of the word, which is lower for words that appear in many documents.
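
As a rough worked example, the plain-Python sketch below computes TF-IDF by hand using the smoothed formula `log((m + 1) / (df + 1))`, where `m` is the number of documents and `df` is the number of documents containing the term; this is the formula described in Spark's `IDF` documentation. The function names `idf` and `tf_idf` are hypothetical, and the sketch works on the words themselves, whereas `HashingTF` maps words to hashed bucket indices, so distinct words can collide when `numFeatures` is small.

```python
import math

docs = [
    ["spark", "supports", "python"],
    ["spark", "is", "fast"],
    ["spark", "is", "easy"],
]
num_docs = len(docs)

def idf(term):
    """Smoothed inverse document frequency: log((m + 1) / (df + 1))."""
    # Document frequency: number of documents that contain the term.
    df = sum(1 for doc in docs if term in doc)
    return math.log((num_docs + 1) / (df + 1))

def tf_idf(term, doc):
    """Raw term count in the document multiplied by the term's IDF."""
    return doc.count(term) * idf(term)

for term in ["spark", "is", "python"]:
    print(term, round(idf(term), 4))
```

A word that appears in every document, such as "spark" here, gets an IDF of 0, so it contributes nothing to the TF-IDF vector regardless of how often it occurs.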


In [None]:
#import necessary classes for TF-IDF calculation
from pyspark.ml.feature import HashingTF, IDF, Tokenizer


In [None]:
#create a sample dataframe and display it.
sentenceData = spark.createDataFrame([
        (1, "Spark supports python"),
        (2, "Spark is fast"),
        (3, "Spark is easy")
    ], ["id", "sentence"])

sentenceData.show(truncate = False)

In [None]:
#tokenize the "sentence" column and store in the column "words"
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
wordsData.show(truncate = False)

In [None]:
# Create a HashingTF object
# mention the "words" column as input
# mention the "rawFeatures" column as output

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=10)
featurizedData = hashingTF.transform(wordsData)

featurizedData.show(truncate = False)

In [None]:
# Create an IDF object
# mention the "rawFeatures" column as input
# mention the "features" column as output

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
tfidfData = idfModel.transform(featurizedData)

In [None]:
#display the tf-idf data
tfidfData.select("sentence", "features").show(truncate=False)

## Task 4 - StopWordsRemover


StopWordsRemover is a transformer that filters out stop words such as "a", "an", and "the".
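
The filtering step can be sketched in plain Python as follows. The stop-word set here is a small illustrative assumption, not Spark's default English list, and the helper `remove_stop_words` is hypothetical. The sketch mirrors the default case-insensitive matching (`caseSensitive=False` in Spark).

```python
# Illustrative stop-word set (an assumption for this sketch;
# Spark's default English list is much longer).
STOP_WORDS = {"a", "an", "the", "is", "it", "has", "of", "and", "for"}

def remove_stop_words(words, case_sensitive=False):
    """Drop stop words from a token list, ignoring case by default."""
    if case_sensitive:
        return [w for w in words if w not in STOP_WORDS]
    return [w for w in words if w.lower() not in STOP_WORDS]

sentence = ["It", "has", "a", "wide", "range", "of", "libraries", "and", "APIs"]
filtered = remove_stop_words(sentence)
print(filtered)
```

With case-insensitive matching, "It" is removed even though the stop-word set only contains the lowercase form.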


In [None]:
#import StopWordsRemover
from pyspark.ml.feature import StopWordsRemover

In [None]:
#create a dataframe with sample text and display it
textData = spark.createDataFrame([
    (1, ['Spark', 'is', 'an', 'open-source', 'distributed', 'computing', 'system']),
    (2, ['IT', 'has', 'interfaces', 'for', 'multiple', 'languages']),
    (3, ['It', 'has', 'a', 'wide', 'range', 'of', 'libraries', 'and', 'APIs'])
], ["id", "sentence"])

textData.show(truncate = False)

In [None]:
# remove stopwords from "sentence" column and store the result in "filtered_sentence" column
remover = StopWordsRemover(inputCol="sentence", outputCol="filtered_sentence")
textData = remover.transform(textData)

In [None]:
# display the dataframe
textData.show(truncate = False)

## Task 5 - StringIndexer


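StringIndexer converts a column of strings into a column of numeric indices. By default, the indices are assigned by descending frequency, so the most frequent string gets index 0.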
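
The default frequency-based ordering can be illustrated with a short plain-Python sketch. The variable names are hypothetical, and the alphabetical tie-break is included only to keep the sketch deterministic; the sample data below has no ties.

```python
from collections import Counter

colors = ["red", "red", "blue", "yellow", "yellow", "yellow"]

# Order the distinct labels by descending frequency.
counts = Counter(colors)
labels = sorted(counts, key=lambda c: (-counts[c], c))

# Assign each label its position in that ordering as a floating-point index.
index_of = {label: float(i) for i, label in enumerate(labels)}
indexed = [index_of[c] for c in colors]

print(index_of)
print(indexed)
```

Here "yellow" occurs three times and receives index 0.0, "red" occurs twice and receives 1.0, and "blue" occurs once and receives 2.0.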


In [None]:
#import StringIndexer
from pyspark.ml.feature import StringIndexer

In [None]:
#create a dataframe with sample text and display it
colors = spark.createDataFrame(
    [(0, "red"), (1, "red"), (2, "blue"), (3, "yellow" ), (4, "yellow"), (5, "yellow")],
    ["id", "color"])

colors.show()

In [None]:
# index the strings in the column "color" and store their indexes in the column "colorIndex"
indexer = StringIndexer(inputCol="color", outputCol="colorIndex")
indexed = indexer.fit(colors).transform(colors)

In [None]:
# display the dataframe
indexed.show()

## Task 6 - StandardScaler



StandardScaler transforms the data so that each feature has a mean of 0 and a standard deviation of 1.
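
The arithmetic behind this can be sketched in plain Python: for each feature (column), subtract the column mean and divide by the column standard deviation. This is a minimal illustration, not Spark's implementation. It uses the sample standard deviation (dividing by n - 1), which is what Spark's documentation describes for `StandardScaler`, and it corresponds to setting both `withMean=True` and `withStd=True`.

```python
from statistics import mean, stdev

rows = [
    [70, 170, 17],
    [80, 165, 25],
    [65, 150, 135],
]

# Work column by column: each column is one feature.
columns = list(zip(*rows))
means = [mean(col) for col in columns]
stds = [stdev(col) for col in columns]  # sample standard deviation (n - 1)

# Center each value on its column mean and scale by the column std.
scaled = [
    [(x - m) / s for x, m, s in zip(row, means, stds)]
    for row in rows
]

for row in scaled:
    print([round(x, 4) for x in row])
```

After scaling, every column has a mean of approximately 0 and a sample standard deviation of approximately 1, so features measured on very different scales become comparable.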


In [None]:
#import StandardScaler
from pyspark.ml.feature import StandardScaler


In [None]:
# Create a sample dataframe and display it
from pyspark.ml.linalg import Vectors
data = [(1, Vectors.dense([70, 170, 17])),
        (2, Vectors.dense([80, 165, 25])),
        (3, Vectors.dense([65, 150, 135]))]
df = spark.createDataFrame(data, ["id", "features"])

df.show()

In [None]:
# Define the StandardScaler transformer
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)

In [None]:
# Fit the transformer to the dataset
scalerModel = scaler.fit(df)

In [None]:
# Scale the data
scaledData = scalerModel.transform(df)

In [None]:
# Show the scaled data
scaledData.show(truncate = False)

Stop Spark Session


In [None]:
spark.stop()

# Exercises


Create Spark Session


In [None]:
#Create SparkSession
#Ignore any warnings by SparkSession command

spark = SparkSession.builder.appName("Exercises - Feature Extraction and Transformation using Spark").getOrCreate()

Create Dataframes


In [None]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/proverbs.csv


In [None]:
# Load proverbs dataset
textdata = spark.read.csv("proverbs.csv", header=True, inferSchema=True)

In [None]:
# display dataframe
textdata.show(truncate = False)

In [None]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/mpg.csv


In [None]:
# Load mpg dataset
mpgdata = spark.read.csv("mpg.csv", header=True, inferSchema=True)

In [None]:
# display dataframe
mpgdata.show()

### Exercise 1 - Tokenizer


In [None]:
#display the dataframe
textdata.show(truncate = False)

Write code to tokenize the "text" column of the "textdata" dataframe and store the tokens in the column "words"


In [None]:
# your code goes here

<details>
    <summary>Click here for a Hint</summary>
    
Refer to Task 1

</details>


<details>
    <summary>Click here for Solution</summary>

```python
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="text", outputCol="words")

textdata = tokenizer.transform(textdata)
```

</details>


In [None]:
#display the tokenized data
textdata.select("id","words").show(truncate=False)

### Exercise 2 - CountVectorizer


CountVectorize the column "words" of the "textdata" dataframe and store the result in the column "features"


In [None]:
# your code goes here

<details>
    <summary>Click here for a Hint</summary>
    
Refer to Task 2
</details>


<details>
    <summary>Click here for Solution</summary>

```python
from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="words", outputCol="features")

model = cv.fit(textdata)

textdata = model.transform(textdata)
```

</details>


In [None]:
# Show the resulting dataframe
textdata.select("words","features").show(truncate=False)

### Exercise 3 - StringIndexer


Convert the string column "Origin" to a numeric column "OriginIndex" in the dataframe "mpgdata"


In [None]:
# your code goes here


<details>
    <summary>Click here for a Hint</summary>
    
Refer to Task 5

</details>


<details>
    <summary>Click here for Solution</summary>

```python
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="Origin", outputCol="OriginIndex")
indexed = indexer.fit(mpgdata).transform(mpgdata)
```

</details>


In [None]:
#show the dataframe

indexed.orderBy(rand()).show()


### Exercise 4 - StandardScaler



Create a single column named "features" using the columns "Cylinders", "Engine Disp", "Horsepower", and "Weight".


In [None]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["Cylinders", "Engine Disp", "Horsepower", "Weight"], outputCol="features")

mpg_transformed_data = assembler.transform(mpgdata)

#show the dataframe
mpg_transformed_data.select("MPG","features").show(truncate = False)

Use StandardScaler to scale the "features" column of the dataframe "mpg_transformed_data" and save the scaled data into the "scaledFeatures" column.


In [None]:
# your code goes here

<details>
    <summary>Click here for a Hint</summary>
    
Refer to Task 6

</details>


<details>
    <summary>Click here for Solution</summary>

```python
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)

scalerModel = scaler.fit(mpg_transformed_data)

scaledData = scalerModel.transform(mpg_transformed_data)
```

</details>


In [None]:
# Show the scaled data
scaledData.select("features","scaledFeatures").show(truncate = False)

Stop Spark Session


In [None]:
spark.stop()

Congratulations! You have completed this lab.


## Authors


[Ramesh Sannareddy](https://www.linkedin.com/in/rsannareddy/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork866-2023-01-01)


### Other Contributors


## Change Log


|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-05-14|0.1|Ramesh Sannareddy|Initial Version Created|


Copyright © 2023 IBM Corporation. All rights reserved.
