# Handling Categorical features

In [8]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = spark.read.csv('../data/colors.csv', header=True, inferSchema=True)
data.show()


+---+------+
| id| color|
+---+------+
|  1|   red|
|  2|  blue|
|  3|orange|
|  4| white|
|  5|   red|
|  6|orange|
|  7|   red|
|  8| white|
|  9|   red|
+---+------+



In [9]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="color", outputCol="color_indexed")


In [10]:
indexer_model = indexer.fit(data)


In [11]:
indexed_data= indexer_model.transform(data)
# to view the data
indexed_data.show()


+---+------+-------------+
| id| color|color_indexed|
+---+------+-------------+
|  1|   red|          0.0|
|  2|  blue|          3.0|
|  3|orange|          1.0|
|  4| white|          2.0|
|  5|   red|          0.0|
|  6|orange|          1.0|
|  7|   red|          0.0|
|  8| white|          2.0|
|  9|   red|          0.0|
+---+------+-------------+



In [12]:
from pyspark.ml.feature import OneHotEncoder

In [13]:
ohe = OneHotEncoder(inputCols=["color_indexed"], outputCols=["color_ohe"])
ohe_model = ohe.fit(indexed_data)
encoded_data = ohe_model.transform(indexed_data)
encoded_data.show()


+---+------+-------------+-------------+
| id| color|color_indexed|    color_ohe|
+---+------+-------------+-------------+
|  1|   red|          0.0|(3,[0],[1.0])|
|  2|  blue|          3.0|    (3,[],[])|
|  3|orange|          1.0|(3,[1],[1.0])|
|  4| white|          2.0|(3,[2],[1.0])|
|  5|   red|          0.0|(3,[0],[1.0])|
|  6|orange|          1.0|(3,[1],[1.0])|
|  7|   red|          0.0|(3,[0],[1.0])|
|  8| white|          2.0|(3,[2],[1.0])|
|  9|   red|          0.0|(3,[0],[1.0])|
+---+------+-------------+-------------+



# Feature Scaling

In [14]:
data = spark.read.csv('../data/wine.data', header=False, inferSchema=True)
data.show()


+---+-----+----+----+----+---+----+----+----+----+----+----+----+----+
|_c0|  _c1| _c2| _c3| _c4|_c5| _c6| _c7| _c8| _c9|_c10|_c11|_c12|_c13|
+---+-----+----+----+----+---+----+----+----+----+----+----+----+----+
|  1|14.23|1.71|2.43|15.6|127| 2.8|3.06|0.28|2.29|5.64|1.04|3.92|1065|
|  1| 13.2|1.78|2.14|11.2|100|2.65|2.76|0.26|1.28|4.38|1.05| 3.4|1050|
|  1|13.16|2.36|2.67|18.6|101| 2.8|3.24| 0.3|2.81|5.68|1.03|3.17|1185|
|  1|14.37|1.95| 2.5|16.8|113|3.85|3.49|0.24|2.18| 7.8|0.86|3.45|1480|
|  1|13.24|2.59|2.87|21.0|118| 2.8|2.69|0.39|1.82|4.32|1.04|2.93| 735|
|  1| 14.2|1.76|2.45|15.2|112|3.27|3.39|0.34|1.97|6.75|1.05|2.85|1450|
|  1|14.39|1.87|2.45|14.6| 96| 2.5|2.52| 0.3|1.98|5.25|1.02|3.58|1290|
|  1|14.06|2.15|2.61|17.6|121| 2.6|2.51|0.31|1.25|5.05|1.06|3.58|1295|
|  1|14.83|1.64|2.17|14.0| 97| 2.8|2.98|0.29|1.98| 5.2|1.08|2.85|1045|
|  1|13.86|1.35|2.27|16.0| 98|2.98|3.15|0.22|1.85|7.22|1.01|3.55|1045|
|  1| 14.1|2.16| 2.3|18.0|105|2.95|3.32|0.22|2.38|5.75|1.25|3.17|1510|
|  1|1

In [15]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=data.columns[1:], outputCol="features")
data_2 = assembler.transform(data)


In [16]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

In [17]:
scaler_model = scaler.fit(data_2)

In [18]:
scaled_data = scaler_model.transform(data_2)

In [19]:
scaled_data.select("features", "scaled_features").collect()

[Row(features=DenseVector([14.23, 1.71, 2.43, 15.6, 127.0, 2.8, 3.06, 0.28, 2.29, 5.64, 1.04, 3.92, 1065.0]), scaled_features=DenseVector([17.5284, 1.5307, 8.8575, 4.6713, 8.892, 4.4739, 3.0635, 2.2498, 4.001, 2.4328, 4.55, 5.5212, 3.3819])),
 Row(features=DenseVector([13.2, 1.78, 2.14, 11.2, 100.0, 2.65, 2.76, 0.26, 1.28, 4.38, 1.05, 3.4, 1050.0]), scaled_features=DenseVector([16.2596, 1.5933, 7.8004, 3.3537, 7.0016, 4.2342, 2.7632, 2.0891, 2.2364, 1.8893, 4.5937, 4.7888, 3.3343])),
 Row(features=DenseVector([13.16, 2.36, 2.67, 18.6, 101.0, 2.8, 3.24, 0.3, 2.81, 5.68, 1.03, 3.17, 1185.0]), scaled_features=DenseVector([16.2104, 2.1125, 9.7323, 5.5696, 7.0716, 4.4739, 3.2437, 2.4105, 4.9095, 2.4501, 4.5062, 4.4648, 3.763])),
 Row(features=DenseVector([14.37, 1.95, 2.5, 16.8, 113.0, 3.85, 3.49, 0.24, 2.18, 7.8, 0.86, 3.45, 1480.0]), scaled_features=DenseVector([17.7008, 1.7455, 9.1126, 5.0306, 7.9118, 6.1516, 3.494, 1.9284, 3.8088, 3.3646, 3.7625, 4.8592, 4.6998])),
 Row(features=DenseVe

In [20]:
from pyspark.ml.feature import MinMaxScaler
scaler = MinMaxScaler(min=0, max=1, inputCol='features', outputCol='features_minmax')
scaler_model = scaler.fit(data_2)
data_3 = scaler_model.transform(data_2)


In [21]:
data_3.select("features", "features_minmax").collect()

[Row(features=DenseVector([14.23, 1.71, 2.43, 15.6, 127.0, 2.8, 3.06, 0.28, 2.29, 5.64, 1.04, 3.92, 1065.0]), features_minmax=DenseVector([0.8421, 0.1917, 0.5722, 0.2577, 0.6196, 0.6276, 0.5738, 0.283, 0.5931, 0.372, 0.4553, 0.9707, 0.5613])),
 Row(features=DenseVector([13.2, 1.78, 2.14, 11.2, 100.0, 2.65, 2.76, 0.26, 1.28, 4.38, 1.05, 3.4, 1050.0]), features_minmax=DenseVector([0.5711, 0.2055, 0.4171, 0.0309, 0.3261, 0.5759, 0.5105, 0.2453, 0.2744, 0.2645, 0.4634, 0.7802, 0.5506])),
 Row(features=DenseVector([13.16, 2.36, 2.67, 18.6, 101.0, 2.8, 3.24, 0.3, 2.81, 5.68, 1.03, 3.17, 1185.0]), features_minmax=DenseVector([0.5605, 0.3202, 0.7005, 0.4124, 0.337, 0.6276, 0.6118, 0.3208, 0.7571, 0.3754, 0.4472, 0.696, 0.6469])),
 Row(features=DenseVector([14.37, 1.95, 2.5, 16.8, 113.0, 3.85, 3.49, 0.24, 2.18, 7.8, 0.86, 3.45, 1480.0]), features_minmax=DenseVector([0.8789, 0.2391, 0.6096, 0.3196, 0.4674, 0.9897, 0.6646, 0.2075, 0.5584, 0.5563, 0.3089, 0.7985, 0.8573])),
 Row(features=DenseVect

# PCA

In [22]:
data = spark.read.csv('../data/digits.csv', header=True, inferSchema=True)
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=data.columns[1:], outputCol='features')
data_2 = assembler.transform(data)


AnalysisException: Path does not exist: file:/Users/rajkgupta/Documents/mygitrepos/Bigdata-with-pyspark/data/digits.csv;

In [None]:
from pyspark.ml.feature import PCA
pca = PCA(k=2, inputCol='features', outputCol='features_pca')


In [None]:
pca_model = pca.fit(data_2)

In [None]:
pca_data = pca_model.transform(data_2).select('features_pca')


In [None]:
spark.stop()