In [2]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, HashingTF
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import lit


In [1]:
import pyspark

# Report the installed PySpark release so the outputs below can be tied to a version.
pyspark_version = pyspark.__version__
print("PySpark version:", pyspark_version)


PySpark version: 3.5.0


In [3]:
# Create (or reuse) the Spark session for this notebook.
# `SparkSession` lives in `pyspark.sql` and has to be imported in the import
# cell; without that import this cell raises NameError on a fresh kernel.
spark = (
    SparkSession.builder
    .appName("L_R")
    .getOrCreate()
)

# Low-level SparkContext, used below to read the raw text files as RDDs.
sc = spark.sparkContext

In [4]:

# Location of the ham/spam corpus, relative to the notebook's working directory.
DATA_DIR = "../Data/ham-spam"
# Size of the hashed feature space produced by HashingTF.
NUM_FEATURES = 100

# Read each corpus file as an RDD of text lines.
spamFile = sc.textFile(DATA_DIR + "/spam.txt")
normalFile = sc.textFile(DATA_DIR + "/ham.txt")

# Hash each token list (column "word") into a NUM_FEATURES-dimensional
# term-frequency vector (column "feature").
tf = HashingTF(numFeatures=NUM_FEATURES, inputCol="word", outputCol="feature")


In [5]:

# Positive class (spam): tokenise every line on single spaces and wrap the token
# list in a 1-tuple so that toDF yields a single array column named "word".
data_pos_df = (
    spamFile
    .map(lambda line: (line.split(" "),))
    .toDF(["word"])
)

# Hash the tokens into feature vectors, then tag every row with label 1.
data_pos_feature = (
    tf.transform(data_pos_df)
    .withColumn("label", lit(1))
)


                                                                                

In [6]:

# Negative class (ham): tokenise every line on single spaces and wrap the token
# list in a 1-tuple so that toDF yields a single array column named "word".
data_neg_df = (
    normalFile
    .map(lambda line: (line.split(" "),))
    .toDF(["word"])
)

# Hash the tokens into feature vectors, then tag every row with label -1.
data_neg_feature = (
    tf.transform(data_neg_df)
    .withColumn("label", lit(-1))
)


In [7]:

# Stack the spam (label 1) and ham (label -1) rows into one training set.
# DataFrame.union matches columns by position; both inputs are built the same
# way (word, feature, label), so the columns line up.
trainingData = data_pos_feature.union(data_neg_feature)

# Fit a linear regression on the hashed features.
# No regParam is set here; the run recorded below logged a "regParam is zero"
# warning and a fallback from the Cholesky solver to Quasi-Newton.
lr = LinearRegression(featuresCol="feature", labelCol="label")
model = lr.fit(trainingData)


24/03/02 23:10:06 WARN Instrumentation: [7e1f766e] regParam is zero, which might cause numerical instability and overfitting.
24/03/02 23:10:08 WARN Instrumentation: [7e1f766e] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.


In [8]:
# Score one spam-like sentence and one ordinary sentence with the fitted model.
# Tokenisation (split on single spaces) and hashing mirror the training cells.

# Spam-like example.
pos_input = spark.createDataFrame(
    [("Viagra GET cheap stuff by sending money to ...".split(" "),)],
    ["word"],
)
posTest = tf.transform(pos_input)
pos_predictions = model.transform(posTest)

# Ordinary example.
neg_input = spark.createDataFrame(
    [("Hi Dad, I started studying Spark the other day.".split(" "),)],
    ["word"],
)
negTest = tf.transform(neg_input)
neg_predictions = model.transform(negTest)

# Display both result tables, spam example first.
pos_predictions.show()
neg_predictions.show()


+--------------------+--------------------+------------------+
|                word|             feature|        prediction|
+--------------------+--------------------+------------------+
|[Viagra, GET, che...|(100,[1,3,22,26,6...|0.7389632529043003|
+--------------------+--------------------+------------------+





+--------------------+--------------------+------------------+
|                word|             feature|        prediction|
+--------------------+--------------------+------------------+
|[Hi, Dad,, I, sta...|(100,[3,16,17,42,...|0.1285595169714334|
+--------------------+--------------------+------------------+



In [9]:

# Pull the single predicted value out of each one-row result frame.
pos_predictions_value = pos_predictions.select("prediction").first()[0]
neg_predictions_value = neg_predictions.select("prediction").first()[0]

# Report each test sentence together with its prediction.
print(
    "Sentence: Viagra GET cheap stuff by sending money to ...\n",
    "Prediction: ",
    pos_predictions_value,
    sep="",
)
print(
    "\nSentence: Hi Dad, I started studying Spark the other day.\n",
    "Prediction: ",
    neg_predictions_value,
    sep="",
)


Sentence: Viagra GET cheap stuff by sending money to ...
Prediction: 0.7389632529043003

Sentence: Hi Dad, I started studying Spark the other day.
Prediction: 0.1285595169714334
