### Loading Spark

In [1]:
import pyspark as spark
from pyspark.sql import SparkSession

In [2]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Read CSV Example") \
    .getOrCreate()

# Read the CSV file
emp_df = spark.read.csv("./data/employee.txt", header=True)

# Show the dataframe contents
emp_df.show()
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
features_df = spark.createDataFrame([
    (1,Vectors.dense([10.0,10000.0, 1.0]),),
    (2,Vectors.dense([20.0,30000.0, 2.0]),),
    (3,Vectors.dense([30.0,40000.0, 3.0]),)
],["id","features"])

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/01 03:34:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+---+------------+--------------------+--------+-------------+------------+------+--------------------+---------+
| id|   last_name|               email|  gender|   department|  start_date|salary|           job_title|region_id|
+---+------------+--------------------+--------+-------------+------------+------+--------------------+---------+
|  1|    'Kelley'|'rkelley0@soundcl...|'Female'|  'Computers'| '10/2/2009'| 67470|'Structural Engin...|        2|
|  2| 'Armstrong'|'sarmstrong1@info...|  'Male'|     'Sports'| '3/31/2008'| 71869| 'Financial Advisor'|        2|
|  3|      'Carr'|'fcarr2@woothemes...|  'Male'| 'Automotive'| '7/12/2009'|101768|'Recruiting Manager'|        3|
|  4|    'Murray'|   'jmurray3@gov.uk'|'Female'|   'Jewelery'|'12/25/2014'| 96897|'Desktop Support ...|        3|
|  5|     'Ellis'|'jellis4@scienced...|'Female'|    'Grocery'| '9/19/2002'| 63702|'Software Enginee...|        7|
|  6|  'Phillips'|'bphillips5@time....|  'Male'|      'Tools'| '8/21/2013'|118497|'Execu

In [3]:
emp_df.schema

StructType([StructField('id', StringType(), True), StructField('last_name', StringType(), True), StructField('email', StringType(), True), StructField('gender', StringType(), True), StructField('department', StringType(), True), StructField('start_date', StringType(), True), StructField('salary', StringType(), True), StructField('job_title', StringType(), True), StructField('region_id', StringType(), True)])

In [4]:
emp_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- department: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- region_id: string (nullable = true)



In [5]:
emp_df.take(5)

[Row(id='1', last_name="'Kelley'", email="'rkelley0@soundcloud.com'", gender="'Female'", department="'Computers'", start_date="'10/2/2009'", salary='67470', job_title="'Structural Engineer'", region_id='2'),
 Row(id='2', last_name="'Armstrong'", email="'sarmstrong1@infoseek.co.jp'", gender="'Male'", department="'Sports'", start_date="'3/31/2008'", salary='71869', job_title="'Financial Advisor'", region_id='2'),
 Row(id='3', last_name="'Carr'", email="'fcarr2@woothemes.com'", gender="'Male'", department="'Automotive'", start_date="'7/12/2009'", salary='101768', job_title="'Recruiting Manager'", region_id='3'),
 Row(id='4', last_name="'Murray'", email="'jmurray3@gov.uk'", gender="'Female'", department="'Jewelery'", start_date="'12/25/2014'", salary='96897', job_title="'Desktop Support Technician'", region_id='3'),
 Row(id='5', last_name="'Ellis'", email="'jellis4@sciencedirect.com'", gender="'Female'", department="'Grocery'", start_date="'9/19/2002'", salary='63702', job_title="'Software

In [6]:
emp_df.count()

1000

In [7]:
sample_df = emp_df.sample(False,0.1)
sample_df.count()

104

#### Normalizing

In [8]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

In [9]:
features_df.take(1)

                                                                                

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]))]

In [10]:
feature_scaler = MinMaxScaler(inputCol="features",outputCol="sfeatures")
# feature_scaler = MinMaxScaler(inputCol="features", outputCol="sfeatures", handleInvalid="keep")
smodel = feature_scaler.fit(features_df)
sfeatures_df = smodel.transform(features_df)
sfeatures_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=SparseVector(3, {}))]

In [11]:
sfeatures_df.select("features", "sfeatures").show(truncate=False)

+------------------+----------------------------+
|features          |sfeatures                   |
+------------------+----------------------------+
|[10.0,10000.0,1.0]|(3,[],[])                   |
|[20.0,30000.0,2.0]|[0.5,0.6666666666666667,0.5]|
|[30.0,40000.0,3.0]|[1.0,1.0,1.0]               |
+------------------+----------------------------+



In [12]:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT, DenseVector

# Create UDF to convert sparse vectors to dense
to_dense = udf(lambda v: DenseVector(v.toArray()), VectorUDT())

# Apply the conversion
sfeatures_df = sfeatures_df.withColumn("sfeatures", to_dense("sfeatures"))

# Show results
sfeatures_df.select("features", "sfeatures").show(truncate=False)

                                                                                

+------------------+----------------------------+
|features          |sfeatures                   |
+------------------+----------------------------+
|[10.0,10000.0,1.0]|[0.0,0.0,0.0]               |
|[20.0,30000.0,2.0]|[0.5,0.6666666666666667,0.5]|
|[30.0,40000.0,3.0]|[1.0,1.0,1.0]               |
+------------------+----------------------------+



                                                                                

#### Standardizing

In [13]:
from pyspark.ml.feature import StandardScaler

feature_stand_scaler = StandardScaler(inputCol="features",outputCol="sfeatures",withStd=True, withMean=True)

In [14]:
stand_smodel = feature_stand_scaler.fit(features_df)
stand_sfeatures_df = stand_smodel.transform(features_df)
stand_sfeatures_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=DenseVector([-1.0, -1.0911, -1.0]))]

In [15]:
stand_sfeatures_df.show()

+---+------------------+--------------------+
| id|          features|           sfeatures|
+---+------------------+--------------------+
|  1|[10.0,10000.0,1.0]|[-1.0,-1.09108945...|
|  2|[20.0,30000.0,2.0]|[0.0,0.2182178902...|
|  3|[30.0,40000.0,3.0]|[1.0,0.8728715609...|
+---+------------------+--------------------+



#### Bucketize

In [16]:
from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -10.0, 0.0, 10.0, float("inf")]
b_data = [(-800.0,),(-10.5,),(-1.7,),(0.0,),(8.2,),(90.1,)]
b_df = spark.createDataFrame(b_data, ["features"])
b_df.show()

+--------+
|features|
+--------+
|  -800.0|
|   -10.5|
|    -1.7|
|     0.0|
|     8.2|
|    90.1|
+--------+



In [17]:
bucketizer = Bucketizer(splits=splits, inputCol="features",outputCol="bfeatures")
bucketed_df = bucketizer.transform(b_df)
bucketed_df.show()

+--------+---------+
|features|bfeatures|
+--------+---------+
|  -800.0|      0.0|
|   -10.5|      0.0|
|    -1.7|      1.0|
|     0.0|      2.0|
|     8.2|      2.0|
|    90.1|      3.0|
+--------+---------+



#### Tokenizing

In [18]:
from pyspark.ml.feature import Tokenizer

In [19]:
sentences_df = spark.createDataFrame([
    (1, "Norah is doing some stuff with Spark MLlib"),
    (2, "The slick sly fox jumped over the lazy brown dog"),
    (3, "I am going to tokenize some text in a second here")],
    ["id","sentence"])

sentences_df.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  1|Norah is doing so...|
|  2|The slick sly fox...|
|  3|I am going to tok...|
+---+--------------------+



In [20]:
sent_token = Tokenizer(inputCol="sentence",outputCol="words")
sent_tokenized_df = sent_token.transform(sentences_df)
sent_tokenized_df.show()

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  1|Norah is doing so...|[norah, is, doing...|
|  2|The slick sly fox...|[the, slick, sly,...|
|  3|I am going to tok...|[i, am, going, to...|
+---+--------------------+--------------------+



#### TF-IDF

In [21]:
from pyspark.ml.feature import HashingTF, IDF

sentences_df

DataFrame[id: bigint, sentence: string]

In [22]:
sentences_df.take(1)

[Row(id=1, sentence='Norah is doing some stuff with Spark MLlib')]

In [23]:
sent_tokenized_df.take(1)

[Row(id=1, sentence='Norah is doing some stuff with Spark MLlib', words=['norah', 'is', 'doing', 'some', 'stuff', 'with', 'spark', 'mllib'])]

In [24]:
hashingTF = HashingTF(inputCol="words",outputCol="rawFeatures",numFeatures=20)

In [25]:
sent_hfTF_df = hashingTF.transform(sent_tokenized_df)
sent_hfTF_df.take(1)

[Row(id=1, sentence='Norah is doing some stuff with Spark MLlib', words=['norah', 'is', 'doing', 'some', 'stuff', 'with', 'spark', 'mllib'], rawFeatures=SparseVector(20, {0: 1.0, 2: 1.0, 6: 1.0, 8: 1.0, 9: 2.0, 10: 1.0, 15: 1.0}))]

In [26]:
idf = IDF(inputCol="rawFeatures",outputCol="idf_features")
idfModel = idf.fit(sent_hfTF_df)
tfidf_df = idfModel.transform(sent_hfTF_df)

                                                                                

In [27]:
tfidf_df.take(1)

[Row(id=1, sentence='Norah is doing some stuff with Spark MLlib', words=['norah', 'is', 'doing', 'some', 'stuff', 'with', 'spark', 'mllib'], rawFeatures=SparseVector(20, {0: 1.0, 2: 1.0, 6: 1.0, 8: 1.0, 9: 2.0, 10: 1.0, 15: 1.0}), idf_features=SparseVector(20, {0: 0.2877, 2: 0.6931, 6: 0.6931, 8: 0.0, 9: 0.5754, 10: 0.2877, 15: 0.2877}))]

#### Clustering
- K-means clustering

In [28]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

cluster_df = spark.read.csv("./data/clustering_dataset.csv",header=True,inferSchema=True)

In [29]:
cluster_df

DataFrame[col1: int, col2: int, col3: int]

In [30]:
cluster_df.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   7|   4|   1|
|   7|   7|   9|
|   7|   9|   6|
|   1|   6|   5|
|   6|   7|   7|
|   7|   9|   4|
|   7|  10|   6|
|   7|   8|   2|
|   8|   3|   8|
|   4|  10|   5|
|   7|   4|   5|
|   7|   8|   4|
|   2|   5|   1|
|   2|   6|   2|
|   2|   3|   8|
|   3|   9|   1|
|   4|   2|   9|
|   1|   7|   1|
|   6|   2|   3|
|   4|   1|   9|
+----+----+----+
only showing top 20 rows



In [31]:
vectorAssembler = VectorAssembler(inputCols=["col1","col2","col3"],outputCol="features")
vcluster_df = vectorAssembler.transform(cluster_df)
vcluster_df.show()

+----+----+----+--------------+
|col1|col2|col3|      features|
+----+----+----+--------------+
|   7|   4|   1| [7.0,4.0,1.0]|
|   7|   7|   9| [7.0,7.0,9.0]|
|   7|   9|   6| [7.0,9.0,6.0]|
|   1|   6|   5| [1.0,6.0,5.0]|
|   6|   7|   7| [6.0,7.0,7.0]|
|   7|   9|   4| [7.0,9.0,4.0]|
|   7|  10|   6|[7.0,10.0,6.0]|
|   7|   8|   2| [7.0,8.0,2.0]|
|   8|   3|   8| [8.0,3.0,8.0]|
|   4|  10|   5|[4.0,10.0,5.0]|
|   7|   4|   5| [7.0,4.0,5.0]|
|   7|   8|   4| [7.0,8.0,4.0]|
|   2|   5|   1| [2.0,5.0,1.0]|
|   2|   6|   2| [2.0,6.0,2.0]|
|   2|   3|   8| [2.0,3.0,8.0]|
|   3|   9|   1| [3.0,9.0,1.0]|
|   4|   2|   9| [4.0,2.0,9.0]|
|   1|   7|   1| [1.0,7.0,1.0]|
|   6|   2|   3| [6.0,2.0,3.0]|
|   4|   1|   9| [4.0,1.0,9.0]|
+----+----+----+--------------+
only showing top 20 rows



In [32]:
kmeans = KMeans().setK(3)
kmeans = kmeans.setSeed(1)
kmodel = kmeans.fit(vcluster_df)

25/02/01 03:34:41 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


In [33]:
centers = kmodel.clusterCenters()
centers

[array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667]),
 array([5.12, 5.84, 4.84])]

- Hierarchical clustering

In [34]:
vcluster_df.show()

+----+----+----+--------------+
|col1|col2|col3|      features|
+----+----+----+--------------+
|   7|   4|   1| [7.0,4.0,1.0]|
|   7|   7|   9| [7.0,7.0,9.0]|
|   7|   9|   6| [7.0,9.0,6.0]|
|   1|   6|   5| [1.0,6.0,5.0]|
|   6|   7|   7| [6.0,7.0,7.0]|
|   7|   9|   4| [7.0,9.0,4.0]|
|   7|  10|   6|[7.0,10.0,6.0]|
|   7|   8|   2| [7.0,8.0,2.0]|
|   8|   3|   8| [8.0,3.0,8.0]|
|   4|  10|   5|[4.0,10.0,5.0]|
|   7|   4|   5| [7.0,4.0,5.0]|
|   7|   8|   4| [7.0,8.0,4.0]|
|   2|   5|   1| [2.0,5.0,1.0]|
|   2|   6|   2| [2.0,6.0,2.0]|
|   2|   3|   8| [2.0,3.0,8.0]|
|   3|   9|   1| [3.0,9.0,1.0]|
|   4|   2|   9| [4.0,2.0,9.0]|
|   1|   7|   1| [1.0,7.0,1.0]|
|   6|   2|   3| [6.0,2.0,3.0]|
|   4|   1|   9| [4.0,1.0,9.0]|
+----+----+----+--------------+
only showing top 20 rows



In [35]:
from pyspark.ml.clustering import BisectingKMeans

bkmeans = BisectingKMeans().setK(3)
kmeans = kmeans.setSeed(1)
bkmodel = bkmeans.fit(vcluster_df)
bkcenters = bkmodel.clusterCenters()
bkcenters

[array([5.12, 5.84, 4.84]),
 array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667])]

In [36]:
centers

[array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667]),
 array([5.12, 5.84, 4.84])]

- Preprocessing Iris data set

In [37]:
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

iris_df = spark.read.csv("./data/iris.txt", inferSchema=True)
iris_df.take(1)

[Row(_c0=5.1, _c1=3.5, _c2=1.4, _c3=0.2, _c4='Iris-setosa')]

In [38]:
iris_df = iris_df.select(col("_c0").alias("sepal_length"),
    col("_c1").alias("sepal_width"),
    col("_c2").alias("petal_length"),
    col("_c3").alias("petal_width"),
    col("_c4").alias("variety"))

vectorAssembler = VectorAssembler(inputCols=["sepal_length", 
                        "sepal_width","petal_length", 
                        "petal_width"], outputCol="features")

virus_df = vectorAssembler.transform(iris_df)
virus_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, variety='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]))]

In [39]:
indexer = StringIndexer(inputCol="variety",outputCol="label")
ivirus_df = indexer.fit(virus_df).transform(virus_df)
ivirus_df.show(1)

+------------+-----------+------------+-----------+-----------+-----------------+-----+
|sepal_length|sepal_width|petal_length|petal_width|    variety|         features|label|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|  0.0|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
only showing top 1 row



- Naive Bayes classification

In [40]:
ivirus_df

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, variety: string, features: vector, label: double]

In [41]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

splits = ivirus_df.randomSplit([0.6,0.4],1)
train_df = splits[0]
test_df = splits[1]

print(f"""Train: {train_df.count()}
Test: {test_df.count()}""")

Train: 98
Test: 52


In [42]:
nb = NaiveBayes(modelType="multinomial")
nbmodel = nb.fit(train_df)

In [43]:
predictions_df = nbmodel.transform(test_df)
predictions_df.take(1)

[Row(sepal_length=4.3, sepal_width=3.0, petal_length=1.1, petal_width=0.1, variety='Iris-setosa', features=DenseVector([4.3, 3.0, 1.1, 0.1]), label=0.0, rawPrediction=DenseVector([-9.9894, -11.3476, -11.902]), probability=DenseVector([0.7118, 0.183, 0.1051]), prediction=0.0)]

In [44]:
evaluator = MulticlassClassificationEvaluator(labelCol="label",predictionCol="prediction",metricName="accuracy")

In [45]:
nbaccuracy = evaluator.evaluate(predictions_df)
nbaccuracy

0.9807692307692307

- Multilayer perception classification

In [46]:
ivirus_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, variety='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]), label=0.0)]

In [47]:
print(f"""Train: {train_df.count()}
Test: {test_df.count()}
Indexed vector: {ivirus_df.count()}
""")

Train: 98
Test: 52
Indexed vector: 150



In [48]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

layers = [4,3] # < 4 measures + 3 types
neurons = (5, 5, 5, 5, 5) # separately define our neurons here
# 4s yield 0.6923
# 9s yield 0.9807
# 5 - 8 seems to work best here
layers = [4,*neurons,3] # using *neurons to unpack dynamically
mlp = MultilayerPerceptronClassifier(layers=layers,seed=1)
mlp_model = mlp.fit(train_df)
mlp_predictions = mlp_model.transform(test_df)

mlp_evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
mlp_accuracy = mlp_evaluator.evaluate(mlp_predictions)
mlp_accuracy

1.0

- Decision trees classification

In [49]:
from pyspark.ml.classification import DecisionTreeClassifier

# Initialize Decision Tree classifier
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)

dt_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
dt_accuracy = dt_evaluator.evaluate(dt_predictions)
dt_accuracy

0.9423076923076923

- Regression
- Preprocessing regression data

In [50]:
from pyspark.ml.regression import LinearRegression

pp_df = spark.read.csv("./data/ccpp.csv",header=True,inferSchema=True)
pp_df

DataFrame[AT: double, V: double, AP: double, RH: double, PE: double]

In [51]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols=["AT","V","AP","RH"],outputCol="features")
vpp_df = vectorAssembler.transform(pp_df)
vpp_df.take(1)

[Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26, features=DenseVector([14.96, 41.76, 1024.07, 73.17]))]

In [52]:
lr = LinearRegression(featuresCol="features",labelCol="PE")
lr_model = lr.fit(vpp_df)

25/02/01 03:34:54 WARN Instrumentation: [376c8718] regParam is zero, which might cause numerical instability and overfitting.
25/02/01 03:34:55 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [53]:
lr_model.coefficients

DenseVector([-1.9775, -0.2339, 0.0621, -0.1581])

In [54]:
lr_model.intercept

454.6092744523414

In [55]:
lr_model.summary.rootMeanSquaredError

4.557126016749488

In [56]:
lr_model.save("lr1.model")

                                                                                

- Decision tree regression

In [57]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

pp_df.take(1)

[Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26)]

In [58]:
vectorAssembler = VectorAssembler(inputCols=["AT","V","AP","RH"],outputCol="features")
vpp_df = vectorAssembler.transform(pp_df)
vpp_df.take(1)

[Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26, features=DenseVector([14.96, 41.76, 1024.07, 73.17]))]

In [59]:
splits = vpp_df.randomSplit([0.7,0.3])
train_df = splits[0]
test_df = splits[1]
print(f"""Train: {train_df.count()}
Test: {test_df.count()}
Total: {vpp_df.count()}
""")

Train: 6685
Test: 2883
Total: 9568



In [60]:
dt = DecisionTreeRegressor(featuresCol="features",labelCol="PE")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(labelCol="PE",predictionCol="prediction",metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
rmse

4.537880605370347

- Gradient-boosted tree regression

In [61]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(featuresCol="features",labelCol="PE")
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_evaluator = RegressionEvaluator(labelCol="PE",predictionCol="prediction",metricName="rmse")
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)
gbt_rmse



4.076993816516189

- Collaborative filtering preprocessing