In [None]:
# -*- coding: utf-8 -*-
# Indentation: Jupyter Notebook

'''
Data Preprocessing using Spark
'''

__version__ = 1.0
__author__ = "Sourav Raj"
__author_email__ = "souravraj.iitbbs@gmail.com"


In [1]:
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
sc =SparkContext()
spark=SparkSession(sc)

# Normalize

In [11]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

In [31]:
features_df=spark.createDataFrame([
    (1, Vectors.dense([10.0, 10000.0, 1.0]),),
    (2, Vectors.dense([20.0, 20000.0, 2.0]),),
    (3, Vectors.dense([20.0, 30000.0, 5.0]),),
    (4, Vectors.dense([30.0, 40000.0, 10.0]),)],['id', 'features'])

In [53]:
features_df=spark.createDataFrame([
    (1, Vectors.dense([10.0, 10000.0]),),
    (2, Vectors.dense([15.0, 20000.0]),),
    (3, Vectors.dense([20.0, 30000.0]),),
    (4, Vectors.dense([30.0, 40000.0]),)],['id', 'features'])

In [21]:
features_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]))]

In [54]:
# creating a scalar object
feature_scaler = MinMaxScaler(inputCol='features', outputCol='scaled_features')

In [55]:
smodel=feature_scaler.fit(features_df)

In [56]:
sfeatures_df= smodel.transform(features_df)

In [17]:
sfeatures_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), scaled_features=DenseVector([0.0, 0.0, 0.0]))]

In [57]:
sfeatures_df.select('features', 'scaled_features').show()

+--------------+--------------------+
|      features|     scaled_features|
+--------------+--------------------+
|[10.0,10000.0]|           [0.0,0.0]|
|[15.0,20000.0]|[0.25,0.333333333...|
|[20.0,30000.0]|[0.5,0.6666666666...|
|[30.0,40000.0]|           [1.0,1.0]|
+--------------+--------------------+



In [45]:
sfeatures_df.select('features', 'scaled_features').show()

+--------+---------------+
|features|scaled_features|
+--------+---------------+
|  [10.0]|          [0.0]|
|  [15.0]|         [0.25]|
|  [20.0]|          [0.5]|
|  [30.0]|          [1.0]|
+--------+---------------+



In [30]:
sfeatures_df.select('features', 'scaled_features').show()

+-------------------+--------------------+
|           features|     scaled_features|
+-------------------+--------------------+
| [10.0,10000.0,1.0]|       [0.0,0.0,0.0]|
| [20.0,20000.0,2.0]|[0.5,0.3333333333...|
|[30.0,40000.0,10.0]|       [1.0,1.0,1.0]|
+-------------------+--------------------+



In [25]:
sfeatures_df.select('features', 'scaled_features').show()

+------------------+--------------------+
|          features|     scaled_features|
+------------------+--------------------+
|[10.0,10000.0,1.0]|       [0.0,0.0,0.0]|
|[20.0,20000.0,2.0]|[0.5,0.3333333333...|
|[30.0,40000.0,3.0]|       [1.0,1.0,1.0]|
+------------------+--------------------+



In [19]:
sfeatures_df.select('features', 'scaled_features').show()

+------------------+--------------------+
|          features|     scaled_features|
+------------------+--------------------+
|[10.0,10000.0,1.0]|       [0.0,0.0,0.0]|
|[20.0,30000.0,2.0]|[0.5,0.6666666666...|
|[30.0,40000.0,3.0]|       [1.0,1.0,1.0]|
+------------------+--------------------+



In [47]:
print(dir(MinMaxScaler))

['__abstractmethods__', '__class__', '__del__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__hash__', '__init__', '__metaclass__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_abc_cache', '_abc_negative_cache', '_abc_negative_cache_version', '_abc_registry', '_call_java', '_clear', '_copyValues', '_copy_params', '_create_from_java_class', '_create_model', '_create_params_from_java', '_dummy', '_empty_java_param_map', '_fit', '_fit_java', '_from_java', '_make_java_param_pair', '_new_java_array', '_new_java_obj', '_randomUID', '_resetUid', '_resolveParam', '_set', '_setDefault', '_shouldOwn', '_to_java', '_transfer_param_map_from_java', '_transfer_param_map_to_java', '_transfer_params_from_java', '_transfer_params_to_java', 'copy', 'explainParam', 'explainParams', 'extractParamMap', 'fit', 'fitMultiple', 'getInputCol', 'getMax', 'getMin', 'getOrDefault', 'getO

# Standardize

In [59]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors

In [60]:
features_df=spark.createDataFrame([
    (1, Vectors.dense([10.0, 10000.0, 1.0]),),
    (2, Vectors.dense([20.0, 20000.0, 2.0]),),
    (3, Vectors.dense([20.0, 30000.0, 5.0]),),
    (4, Vectors.dense([30.0, 40000.0, 10.0]),)],['id', 'features'])

In [62]:
features_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]))]

In [63]:
feature_stand_scaler=StandardScaler(inputCol='features', outputCol='sfeatures', withStd=True, withMean=True)

In [64]:
stand_smodel=feature_stand_scaler.fit(features_df)

In [65]:
stand_sfeatures_df=stand_smodel.transform(features_df)

In [66]:
stand_sfeatures_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=DenseVector([-1.2247, -1.1619, -0.866]))]

In [67]:
stand_sfeatures_df.show()

+---+-------------------+--------------------+
| id|           features|           sfeatures|
+---+-------------------+--------------------+
|  1| [10.0,10000.0,1.0]|[-1.2247448713915...|
|  2| [20.0,20000.0,2.0]|[0.0,-0.387298334...|
|  3| [20.0,30000.0,5.0]|[0.0,0.3872983346...|
|  4|[30.0,40000.0,10.0]|[1.22474487139158...|
+---+-------------------+--------------------+



# Bucketize numeric data

In [2]:
from pyspark.ml.feature import Bucketizer
# bucketizer work with range of boundaries 
splits=[-float('inf'), -10.0, 0.0, 10.0, float('inf') ]

In [3]:
b_data = [(-800.0,), (-10.5,), (-1.7,), (0.0,), (8.2,), (90.1,)]

In [4]:
b_df=spark.createDataFrame(b_data, ['features'])

In [5]:
b_df.show()

+--------+
|features|
+--------+
|  -800.0|
|   -10.5|
|    -1.7|
|     0.0|
|     8.2|
|    90.1|
+--------+



In [6]:
bucketizer = Bucketizer(splits=splits, inputCol='features', outputCol='bfeatures')

In [7]:
bucketed_df=bucketizer.transform(b_df)
bucketed_df.show()

+--------+---------+
|features|bfeatures|
+--------+---------+
|  -800.0|      0.0|
|   -10.5|      0.0|
|    -1.7|      1.0|
|     0.0|      2.0|
|     8.2|      2.0|
|    90.1|      3.0|
+--------+---------+



# Tokenize text data

In [8]:
from pyspark.ml.feature import Tokenizer

In [9]:
sentences_df=spark.createDataFrame([(1, 'This is an introduction to Spark MLlib'),
                                   (2, 'MLlib includes libraries for classification'),
                                   (3, 'It also contains supporting tools for pipelines')],
                                  ['id', 'sentence'])

In [10]:
sentences_df.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  1|This is an introd...|
|  2|MLlib includes li...|
|  3|It also contains ...|
+---+--------------------+



In [11]:
sent_token= Tokenizer(inputCol='sentence', outputCol='words')

In [12]:
sent_tokenized_df = sent_token.transform(sentences_df)

In [13]:
sent_tokenized_df.show()

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  1|This is an introd...|[this, is, an, in...|
|  2|MLlib includes li...|[mllib, includes,...|
|  3|It also contains ...|[it, also, contai...|
+---+--------------------+--------------------+



# TF-IDF

In [14]:
from pyspark.ml.feature import HashingTF, IDF

In [20]:
hashigTF=HashingTF(inputCol='words', outputCol='rawfeatures', numFeatures=20)

In [21]:
sent_hfTF_df=hashigTF.transform(sent_tokenized_df)
sent_hfTF_df.take(1)

[Row(id=1, sentence=u'This is an introduction to Spark MLlib', words=[u'this', u'is', u'an', u'introduction', u'to', u'spark', u'mllib'], rawfeatures=SparseVector(20, {1: 2.0, 5: 1.0, 6: 1.0, 8: 1.0, 12: 1.0, 13: 1.0}))]

In [22]:
idf =IDF(inputCol='rawfeatures', outputCol='idf_features')

In [23]:
idfModel=idf.fit(sent_hfTF_df)
tfidf_df=idfModel.transform(sent_hfTF_df)

In [24]:
tfidf_df.take(1)

[Row(id=1, sentence=u'This is an introduction to Spark MLlib', words=[u'this', u'is', u'an', u'introduction', u'to', u'spark', u'mllib'], rawfeatures=SparseVector(20, {1: 2.0, 5: 1.0, 6: 1.0, 8: 1.0, 12: 1.0, 13: 1.0}), idf_features=SparseVector(20, {1: 0.5754, 5: 0.6931, 6: 0.2877, 8: 0.2877, 12: 0.0, 13: 0.6931}))]