# Module 2 Preprocessing

In [None]:
import pyspark

In [2]:
# Create the notebook's SparkSession, or reuse one that is already running.
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName('abc')
    .getOrCreate()
)


## Normalizer (min-max scaling with `MinMaxScaler`)

In [6]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

In [7]:
# Toy dataset: three rows, each an id plus a 3-element dense feature vector
# whose components sit on very different scales.
df4 = spark.createDataFrame(
    [
        (1, Vectors.dense([10.0, 10000.0, 1.0])),
        (2, Vectors.dense([20.0, 20000.0, 2.0])),
        (3, Vectors.dense([30.0, 30000.0, 3.0])),
    ],
    ["id", "features"],
)

In [8]:
# Print df4 to check the "id" and "features" columns.
df4.show()

+---+------------------+
| id|          features|
+---+------------------+
|  1|[10.0,10000.0,1.0]|
|  2|[20.0,20000.0,2.0]|
|  3|[30.0,30000.0,3.0]|
+---+------------------+



In [9]:
# Scaler that reads the "features" vector column and writes "sfeatures".
scaler = MinMaxScaler(
    inputCol="features",
    outputCol="sfeatures",
)

In [10]:
# Fit the scaler on df4; the fitted model is applied in the next cell.
model = scaler.fit(df4)

In [11]:
# Apply the fitted model: df5 is df4 plus the "sfeatures" output column.
df5 = model.transform(df4)

In [12]:
# Show the raw and scaled vectors side by side.
(
    df5
    .select("features", "sfeatures")
    .show()
)

+------------------+-------------+
|          features|    sfeatures|
+------------------+-------------+
|[10.0,10000.0,1.0]|[0.0,0.0,0.0]|
|[20.0,20000.0,2.0]|[0.5,0.5,0.5]|
|[30.0,30000.0,3.0]|[1.0,1.0,1.0]|
+------------------+-------------+



## Standardizer

In [14]:
from pyspark.ml.feature import StandardScaler

In [15]:
# Standardise the "features" vectors of df4 into "sfeatures";
# withMean / withStd switch on centring and scaling by the standard deviation.
scaler2 = StandardScaler(
    inputCol="features",
    outputCol="sfeatures",
    withStd=True,
    withMean=True,
)
model2 = scaler2.fit(df4)      # fitted scaler model
df6 = model2.transform(df4)    # df4 plus the "sfeatures" column

In [16]:
# NOTE(review): this cell rebuilds scaler2, model2 and df6 with the same
# arguments and input as the cell directly above it.
scaler2 = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol="features",
    outputCol="sfeatures",
)
model2 = scaler2.fit(df4)
df6 = model2.transform(df4)

In [17]:
# Show the raw and standardised vectors side by side.
(
    df6
    .select("features", "sfeatures")
    .show()
)

+------------------+----------------+
|          features|       sfeatures|
+------------------+----------------+
|[10.0,10000.0,1.0]|[-1.0,-1.0,-1.0]|
|[20.0,20000.0,2.0]|   [0.0,0.0,0.0]|
|[30.0,30000.0,3.0]|   [1.0,1.0,1.0]|
+------------------+----------------+



## Ex: Normalizer

In [20]:
# Load the CSV, taking column names from the header row and letting Spark
# infer the column types.
cctv = spark.read.csv(
    './data/cctv.csv',
    header=True,
    inferSchema=True,
)

In [21]:
# Preview the loaded data; with no arguments show() prints the top 20 rows.
cctv.show()

+-----+-----+-------+-----+------+
|   AT|    V|     AP|   RH|    PE|
+-----+-----+-------+-----+------+
| 8.34|40.77|1010.84|90.01|480.48|
|23.64|58.49| 1011.4| 74.2|445.75|
|29.74| 56.9|1007.15|41.91|438.76|
|19.07|49.69|1007.22|76.79|453.09|
| 11.8|40.66|1017.13| 97.2|464.43|
|13.97|39.16|1016.05| 84.6|470.96|
| 22.1|71.29| 1008.2|75.38|442.35|
|14.47|41.76|1021.98|78.41| 464.0|
|31.25|69.51|1010.25|36.83|428.77|
| 6.77|38.18| 1017.8|81.13|484.31|
|28.28|68.67|1006.36| 69.9|435.29|
|22.99|46.93|1014.15|49.42|451.41|
| 29.3|70.04|1010.95|61.23|426.25|
| 8.14|37.49|1009.04|80.33|480.66|
|16.92| 44.6|1017.34|58.75|460.17|
|22.72|64.15|1021.14|60.34|453.13|
|18.14|43.56|1012.83| 47.1|461.71|
|11.49|44.63|1020.44|86.04|471.08|
| 9.94|40.46| 1018.9|68.51|473.74|
|23.54| 41.1|1002.05|38.05|448.56|
+-----+-----+-------+-----+------+
only showing top 20 rows



In [18]:
from pyspark.ml.feature import VectorAssembler

In [19]:
# Combine the four input columns into one "features" vector column;
# PE is left out of the vector.
vectorAssembler = VectorAssembler(
    inputCols=["AT", "V", "AP", "RH"],
    outputCol="features",
)

In [22]:
# Add the assembled "features" vector column to the cctv data.
cctv2 = vectorAssembler.transform(cctv)

In [25]:
# Print 20 rows without truncating the wide "features" column.
cctv2.show(n=20, truncate=False)

+-----+-----+-------+-----+------+---------------------------+
|AT   |V    |AP     |RH   |PE    |features                   |
+-----+-----+-------+-----+------+---------------------------+
|8.34 |40.77|1010.84|90.01|480.48|[8.34,40.77,1010.84,90.01] |
|23.64|58.49|1011.4 |74.2 |445.75|[23.64,58.49,1011.4,74.2]  |
|29.74|56.9 |1007.15|41.91|438.76|[29.74,56.9,1007.15,41.91] |
|19.07|49.69|1007.22|76.79|453.09|[19.07,49.69,1007.22,76.79]|
|11.8 |40.66|1017.13|97.2 |464.43|[11.8,40.66,1017.13,97.2]  |
|13.97|39.16|1016.05|84.6 |470.96|[13.97,39.16,1016.05,84.6] |
|22.1 |71.29|1008.2 |75.38|442.35|[22.1,71.29,1008.2,75.38]  |
|14.47|41.76|1021.98|78.41|464.0 |[14.47,41.76,1021.98,78.41]|
|31.25|69.51|1010.25|36.83|428.77|[31.25,69.51,1010.25,36.83]|
|6.77 |38.18|1017.8 |81.13|484.31|[6.77,38.18,1017.8,81.13]  |
|28.28|68.67|1006.36|69.9 |435.29|[28.28,68.67,1006.36,69.9] |
|22.99|46.93|1014.15|49.42|451.41|[22.99,46.93,1014.15,49.42]|
|29.3 |70.04|1010.95|61.23|426.25|[29.3,70.04,1010.95,6

In [30]:
# Min-max scale the assembled cctv features.
# This rebinds `scaler` and `model`, which earlier cells used for df4.
scaler = MinMaxScaler(
    inputCol="features",
    outputCol="sfeatures",
)
model = scaler.fit(cctv2)
cctv3 = model.transform(cctv2)   # cctv2 plus the "sfeatures" column

In [34]:
# Print 20 rows of the scaled vectors in full, without truncation.
(
    cctv3
    .select('sfeatures')
    .show(n=20, truncate=False)
)

+-------------------------------------------------------------------------------+
|sfeatures                                                                      |
+-------------------------------------------------------------------------------+
|[0.184985835694051,0.2741992882562278,0.444196980945312,0.8639410187667561]    |
|[0.6184135977337112,0.5895017793594306,0.4580549368968079,0.6520107238605899]  |
|[0.7912181303116148,0.5612099644128113,0.35288294976490975,0.21916890080428952]|
|[0.48895184135977343,0.4329181494661921,0.35461519425884813,0.6867292225201074]|
|[0.2830028328611898,0.27224199288256223,0.5998515219005204,0.9603217158176944] |
|[0.3444759206798867,0.24555160142348748,0.5731254639940605,0.7914209115281501] |
|[0.574787535410765,0.8172597864768684,0.3788666171739686,0.6678284182305629]   |
|[0.3586402266288952,0.29181494661921703,0.7198713189804518,0.7084450402144772] |
|[0.8339943342776205,0.7855871886120998,0.42959663449641244,0.15107238605898124]|
|[0.140509915014

In [35]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
# Note: this rebinds `model`, which earlier cells used for the fitted scaler.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    # Unpack the label into its own name. Reusing `prediction` here would
    # overwrite the DataFrame above with a float after the first iteration.
    rid, text, prob, predicted_label = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), predicted_label))

(4, spark i j k) --> prob=[0.1596407738787475,0.8403592261212525], prediction=1.000000
(5, l m n) --> prob=[0.8378325685476744,0.16216743145232562], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.06926633132976037,0.9307336686702395], prediction=1.000000
(7, apache hadoop) --> prob=[0.9821575333444218,0.01784246665557808], prediction=0.000000


## Tokenizer

In [None]:
from pyspark.ml.feature import Tokenizer

# Three short example sentences keyed by id.
df7 = spark.createDataFrame(
    [
        (1, "This is an introduction to Spark MLib"),
        (2, "MLib contains libraries for classification and regression"),
        (3, "It also contains supporting tools for pipelines"),
    ],
    ["id", "sentence"],
)
df7.show()

# Tokenizer reads "sentence" and writes its tokens to a new "words" column.
token = Tokenizer(inputCol="sentence", outputCol="words")
df8 = token.transform(df7)
df8.show()

## TF-IDF

In [None]:
from pyspark.ml.feature import HashingTF, IDF

# Hash each row's "words" tokens into a term-frequency vector with 20 buckets.
hftoken = HashingTF(inputCol="words",outputCol="rawFeatures",numFeatures=20)
df9 = hftoken.transform(df8)

# A bare expression in the middle of a cell is evaluated but never displayed,
# so print the sample row explicitly to make it visible.
print(df9.take(1))

# Fit IDF on the raw term frequencies, then add the "idf_features" column.
idf = IDF(inputCol="rawFeatures",outputCol="idf_features")
model3 = idf.fit(df9)
df10 = model3.transform(df9)

# Last expression of the cell, so Jupyter renders it as the cell output.
df10.take(1)