In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CSVToDataFrame").getOrCreate()
csv_file_path = "spam.csv"
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
    StructField("class", StringType(), True),
    StructField("text", StringType(), True)
])
data = spark.read.option("delimiter", ",").csv(csv_file_path, schema=schema, header=True)

23/11/15 14:19:46 WARN Utils: Your hostname, ouzema-Vivobook-ASUSLaptop-M7400QC-M7400QC resolves to a loopback address: 127.0.1.1; using 20.20.20.234 instead (on interface wlp2s0)
23/11/15 14:19:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/15 14:19:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/15 14:19:47 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/11/15 14:19:47 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [2]:
data.show()

23/11/15 14:19:50 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/SPAM_DETECTION/spam.csv


+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if that��s t...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [3]:
from pyspark.sql.functions import length
data = data.withColumn('length',length(data['text']))
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if that��s t...|    58|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



23/11/15 14:19:51 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/SPAM_DETECTION/spam.csv


In [11]:
data.select("class").where(data["class"] == "ham").count()

23/11/15 14:23:43 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: v1
 Schema: class
Expected: class but found: v1
CSV file: file:///home/ouzema/Big%20Data/SPAM_DETECTION/spam.csv


4825

In [12]:
data.select("class").where(data["class"] == "spam").count()

23/11/15 14:23:50 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: v1
 Schema: class
Expected: class but found: v1
CSV file: file:///home/ouzema/Big%20Data/SPAM_DETECTION/spam.csv


747

In [4]:
from pyspark.sql.functions import col, when
data = data.withColumn("class", when(data["class"] == "ham\"\"\"", "ham").otherwise(data["class"]))

In [5]:
data.groupby('class').mean().show()

23/11/15 13:40:13 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv


+-----+------------------+
|class|       avg(length)|
+-----+------------------+
|  ham| 71.07065893079155|
| spam|138.45917001338688|
+-----+------------------+



In [6]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
ham_spam_to_num = StringIndexer(inputCol='class',outputCol='label')
clean_up = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')

In [7]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes()

### Traitement

In [8]:
from pyspark.sql.functions import col
data.filter(col("text").isNull()).count()

23/11/15 13:40:14 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: v2
 Schema: text
Expected: text but found: v2
CSV file: file:///home/ouzema/Big%20Data/spam.csv


1

In [9]:
data = data.dropna(subset=["text"])

In [10]:
data = data.fillna("N/A", subset=["text"])

In [11]:
from pyspark.ml import Pipeline
data_prep_pipe = Pipeline(stages=[ham_spam_to_num,tokenizer,stopremove,count_vec,idf,clean_up])
cleaner = data_prep_pipe.fit(data)
clean_data = cleaner.transform(data)

23/11/15 13:40:14 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:15 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: v2
 Schema: text
Expected: text but found: v2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:16 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: v2
 Schema: text
Expected: text but found: v2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
                                                                                

In [12]:
clean_data = clean_data.select(['label','features'])
clean_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13377,[7,10,31,6...|
|  0.0|(13377,[0,23,298,...|
|  1.0|(13377,[2,13,19,2...|
|  0.0|(13377,[0,68,78,1...|
|  0.0|(13377,[35,134,31...|
|  1.0|(13377,[11,66,140...|
|  0.0|(13377,[11,51,107...|
|  0.0|(13377,[126,186,4...|
|  1.0|(13377,[1,45,120,...|
|  1.0|(13377,[0,1,13,26...|
|  0.0|(13377,[18,42,116...|
|  1.0|(13377,[8,17,36,8...|
|  1.0|(13377,[13,28,45,...|
|  0.0|(13377,[38,95,226...|
|  0.0|(13377,[550,1773,...|
|  1.0|(13377,[28,109,11...|
|  0.0|(13377,[80,214,44...|
|  0.0|(13377,[0,2,49,13...|
|  0.0|(13377,[0,74,104,...|
|  1.0|(13377,[4,28,33,5...|
+-----+--------------------+
only showing top 20 rows



23/11/15 13:40:18 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv


## Naive Bayes

In [13]:
(training,testing) = clean_data.randomSplit([0.7,0.3])
spam_predictor = nb.fit(training)
test_results = spam_predictor.transform(testing)
test_results.show()

23/11/15 13:40:18 WARN DAGScheduler: Broadcasting large task binary with size 1166.4 KiB
23/11/15 13:40:18 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:19 WARN DAGScheduler: Broadcasting large task binary with size 1143.9 KiB
23/11/15 13:40:19 WARN DAGScheduler: Broadcasting large task binary with size 1371.0 KiB
23/11/15 13:40:19 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv


+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13377,[0,1,2,13,...|[-627.86436019134...|[0.99999999998812...|       0.0|
|  0.0|(13377,[0,1,2,40,...|[-1069.9386596866...|[1.0,8.9099224203...|       0.0|
|  0.0|(13377,[0,1,7,8,1...|[-1196.5171174199...|[1.0,3.2650163166...|       0.0|
|  0.0|(13377,[0,1,10,30...|[-878.44022025012...|[1.0,2.7287908807...|       0.0|
|  0.0|(13377,[0,1,14,18...|[-1359.9030015615...|[1.0,3.5262940466...|       0.0|
|  0.0|(13377,[0,1,14,31...|[-217.55833523441...|[1.0,7.7706339815...|       0.0|
|  0.0|(13377,[0,1,17,19...|[-807.77857531980...|[1.0,1.2896161732...|       0.0|
|  0.0|(13377,[0,1,20,26...|[-968.08264758095...|[1.0,6.1869613664...|       0.0|
|  0.0|(13377,[0,1,28,11...|[-598.91038846017...|[1.0,3.1073368663...|       0.0|
|  0.0|(13377,[0

23/11/15 13:40:19 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


In [14]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting spam was: {}".format(acc))

23/11/15 13:40:20 WARN DAGScheduler: Broadcasting large task binary with size 1376.0 KiB
23/11/15 13:40:20 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv


Accuracy of model at predicting spam was: 0.9165906618905466


## Random Forest

In [15]:
from pyspark.ml.classification import RandomForestClassifier

# Split your preprocessed data into training and testing sets
(training, testing) = clean_data.randomSplit([0.7, 0.3])  # Use your preprocessed dataset

# Create a Random Forest classifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)

# Fit the Random Forest model on the training data
rf_model = rf.fit(training)

# Make predictions on the testing data
rf_predictions = rf_model.transform(testing)

# Show the results
rf_predictions.show()


23/11/15 13:40:20 WARN DAGScheduler: Broadcasting large task binary with size 1163.3 KiB
23/11/15 13:40:20 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:21 WARN DAGScheduler: Broadcasting large task binary with size 1163.4 KiB
23/11/15 13:40:21 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:21 WARN DAGScheduler: Broadcasting large task binary with size 1301.8 KiB
23/11/15 13:40:21 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:23 WARN DAGScheduler: Broadcasting large task binary with size 1530.2 KiB
23/11/15 13:40:24 WARN CSVHeader

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13377,[0,1,2,13,...|[82.1842553767343...|[0.82184255376734...|       0.0|
|  0.0|(13377,[0,1,5,20,...|[87.2237016666320...|[0.87223701666632...|       0.0|
|  0.0|(13377,[0,1,7,15,...|[87.1902520524093...|[0.87190252052409...|       0.0|
|  0.0|(13377,[0,1,17,19...|[83.4134013878543...|[0.83413401387854...|       0.0|
|  0.0|(13377,[0,1,20,26...|[87.2237016666320...|[0.87223701666632...|       0.0|
|  0.0|(13377,[0,1,26,34...|[84.9077787918036...|[0.84907778791803...|       0.0|
|  0.0|(13377,[0,1,42,67...|[87.2237016666320...|[0.87223701666632...|       0.0|
|  0.0|(13377,[0,1,70,10...|[87.2237016666320...|[0.87223701666632...|       0.0|
|  0.0|(13377,[0,1,150,1...|[87.2237016666320...|[0.87223701666632...|       0.0|
|  0.0|(13377,[0

## Logistic Regression

In [16]:
from pyspark.ml.classification import LogisticRegression

# Create a Logistic Regression classifier
lr = LogisticRegression(labelCol="label", featuresCol="features")

# Fit the Logistic Regression model on the training data
lr_model = lr.fit(training)

# Make predictions on the testing data
lr_predictions = lr_model.transform(testing)

# Show the results
lr_predictions.show()


23/11/15 13:40:27 WARN DAGScheduler: Broadcasting large task binary with size 1167.4 KiB
23/11/15 13:40:27 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:28 WARN DAGScheduler: Broadcasting large task binary with size 1168.1 KiB
23/11/15 13:40:28 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:28 WARN DAGScheduler: Broadcasting large task binary with size 1168.1 KiB
23/11/15 13:40:28 WARN DAGScheduler: Broadcasting large task binary with size 1168.1 KiB
23/11/15 13:40:28 WARN DAGScheduler: Broadcasting large task binary with size 1168.1 KiB
23/11/15 13:40:28 WARN DAGScheduler: Broadcasting large task binary with size 1168.1 KiB
23/11/15 13:40:28 WARN DAGScheduler: Broadcasting large ta

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13377,[0,1,2,13,...|[13.8153428869027...|[0.99999899983331...|       0.0|
|  0.0|(13377,[0,1,5,20,...|[12.1399618295425...|[0.99999465830303...|       0.0|
|  0.0|(13377,[0,1,7,15,...|[19.3953947531997...|[0.99999999622699...|       0.0|
|  0.0|(13377,[0,1,17,19...|[14.0644323385741...|[0.99999922035965...|       0.0|
|  0.0|(13377,[0,1,20,26...|[15.4833586150103...|[0.99999981134743...|       0.0|
|  0.0|(13377,[0,1,26,34...|[16.0262759335150...|[0.99999989038329...|       0.0|
|  0.0|(13377,[0,1,42,67...|[16.5537799719103...|[0.99999993531781...|       0.0|
|  0.0|(13377,[0,1,70,10...|[17.3795650912764...|[0.99999997167624...|       0.0|
|  0.0|(13377,[0,1,150,1...|[15.0275025595969...|[0.99999970239622...|       0.0|
|  0.0|(13377,[0

23/11/15 13:40:30 WARN DAGScheduler: Broadcasting large task binary with size 1272.8 KiB
23/11/15 13:40:30 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv


## Evaluation des modèles

In [17]:
pip install tabulate

Note: you may need to restart the kernel to use updated packages.


In [18]:
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Split the preprocessed data into training and testing sets
(training, testing) = clean_data.randomSplit([0.7, 0.3])  # Replace 'clean_data' with your actual preprocessed dataset

# Create and evaluate Naive Bayes model
nb = NaiveBayes(labelCol="label", featuresCol="features")
nb_model = nb.fit(training)
nb_predictions = nb_model.transform(testing)

# Create and evaluate Random Forest model
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)
rf_model = rf.fit(training)
rf_predictions = rf_model.transform(testing)

# Create and evaluate Logistic Regression model
lr = LogisticRegression(labelCol="label", featuresCol="features")
lr_model = lr.fit(training)
lr_predictions = lr_model.transform(testing)

# Define a function to evaluate and print metrics
def evaluate_model(model, predictions):
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

    precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
    recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
    f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

    print(f"Model: {model.__class__.__name__}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1-score: {f1:.4f}")
    print()

# Evaluate and print metrics for each model
evaluate_model(nb, nb_predictions)
evaluate_model(rf, rf_predictions)
evaluate_model(lr, lr_predictions)


23/11/15 13:40:33 WARN DAGScheduler: Broadcasting large task binary with size 1166.4 KiB
23/11/15 13:40:33 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:34 WARN DAGScheduler: Broadcasting large task binary with size 1143.9 KiB
23/11/15 13:40:34 WARN DAGScheduler: Broadcasting large task binary with size 1163.3 KiB
23/11/15 13:40:34 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:34 WARN DAGScheduler: Broadcasting large task binary with size 1163.4 KiB
23/11/15 13:40:34 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:34 WARN DAGSchedu

Model: NaiveBayes
Precision: 0.9437
Recall: 0.9080
F1-score: 0.9173



23/11/15 13:40:43 WARN DAGScheduler: Broadcasting large task binary with size 1472.7 KiB
23/11/15 13:40:43 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:43 WARN DAGScheduler: Broadcasting large task binary with size 1472.7 KiB
23/11/15 13:40:43 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:43 WARN DAGScheduler: Broadcasting large task binary with size 1472.7 KiB
23/11/15 13:40:43 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv


Model: RandomForestClassifier
Precision: 0.7630
Recall: 0.8735
F1-score: 0.8145



23/11/15 13:40:43 WARN DAGScheduler: Broadcasting large task binary with size 1277.8 KiB
23/11/15 13:40:43 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:44 WARN DAGScheduler: Broadcasting large task binary with size 1277.8 KiB
23/11/15 13:40:44 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv


Model: LogisticRegression
Precision: 0.9832
Recall: 0.9834
F1-score: 0.9832



23/11/15 13:40:44 WARN DAGScheduler: Broadcasting large task binary with size 1277.8 KiB
23/11/15 13:40:44 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv


In [19]:
from tabulate import tabulate

def evaluate_model_table(model, predictions):
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

    precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
    recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
    f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

    table = [
        ["Model", "Precision", "Recall", "F1-Score"],
        [model.__class__.__name__, f"{precision:.4f}", f"{recall:.4f}", f"{f1:.4f}"]
    ]

    print(tabulate(table, headers="firstrow", tablefmt="grid"))

# Evaluate and print metrics for each model in a table
evaluate_model_table(nb, nb_predictions)
evaluate_model_table(rf, rf_predictions)
evaluate_model_table(lr, lr_predictions)


23/11/15 13:40:44 WARN DAGScheduler: Broadcasting large task binary with size 1376.0 KiB
23/11/15 13:40:44 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:45 WARN DAGScheduler: Broadcasting large task binary with size 1376.0 KiB
23/11/15 13:40:45 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:45 WARN DAGScheduler: Broadcasting large task binary with size 1376.0 KiB
23/11/15 13:40:45 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv


+------------+-------------+----------+------------+
| Model      |   Precision |   Recall |   F1-Score |
| NaiveBayes |      0.9437 |    0.908 |     0.9173 |
+------------+-------------+----------+------------+


23/11/15 13:40:45 WARN DAGScheduler: Broadcasting large task binary with size 1472.7 KiB
23/11/15 13:40:45 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:45 WARN DAGScheduler: Broadcasting large task binary with size 1472.7 KiB
23/11/15 13:40:45 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:46 WARN DAGScheduler: Broadcasting large task binary with size 1472.7 KiB
23/11/15 13:40:46 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv


+------------------------+-------------+----------+------------+
| Model                  |   Precision |   Recall |   F1-Score |
| RandomForestClassifier |       0.763 |   0.8735 |     0.8145 |
+------------------------+-------------+----------+------------+


23/11/15 13:40:46 WARN DAGScheduler: Broadcasting large task binary with size 1277.8 KiB
23/11/15 13:40:46 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv
23/11/15 13:40:46 WARN DAGScheduler: Broadcasting large task binary with size 1277.8 KiB
23/11/15 13:40:46 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv


+--------------------+-------------+----------+------------+
| Model              |   Precision |   Recall |   F1-Score |
| LogisticRegression |      0.9832 |   0.9834 |     0.9832 |
+--------------------+-------------+----------+------------+


23/11/15 13:40:46 WARN DAGScheduler: Broadcasting large task binary with size 1277.8 KiB
23/11/15 13:40:46 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 2
CSV file: file:///home/ouzema/Big%20Data/spam.csv


#### Enregistrement du modèle  ####

In [26]:
# Save your model as a directory of files
#lr_model.save("lr_model")
#rf_model.save("rf_model")
#nb_model.save("nb_model")

### Utilisation du modèle

In [21]:
new_dataset = spark.read.csv("spam2.csv", header=True, inferSchema=True)

In [22]:
new_dataset.show()

+--------------------+--------------------+--------------------+--------------------+-----+
|          COMMENT_ID|              AUTHOR|                DATE|             CONTENT|CLASS|
+--------------------+--------------------+--------------------+--------------------+-----+
|z13lgffb5w3ddx1ul...|          dharma pal|2015-05-29 02:30:...|          Nice song﻿|    0|
|z123dbgb0mqjfxbtz...|       Tiza Arellano|2015-05-29 00:14:...|       I love song ﻿|    0|
|z12quxxp2vutflkxv...|Prìñçeśś Âliś Łøv...|2015-05-28 21:00:...|       I love song ﻿|    0|
|z12icv3ysqvlwth2c...|       Eric Gonzalez|2015-05-28 20:47:...|860,000,000 lets ...|    0|
|z133stly3kete3tly...|       Analena López|2015-05-28 17:08:...|shakira is best f...|    0|
|z12myn4rltf4ejddv...| jehoiada wellington|2015-05-28 17:06:...|The best world cu...|    0|
|z135vzqy1yrjhluew...|    Kara Cuthbertson|2015-05-28 15:46:...|             I love﻿|    0|
|z12uujnj2sifvzvav...|       Sudheer Yadav|2015-05-28 10:28:...|SEE SOME MORE SO

In [23]:
new_dataset = new_dataset.select("CONTENT", "CLASS")
new_dataset.show()

+--------------------+-----+
|             CONTENT|CLASS|
+--------------------+-----+
|          Nice song﻿|    0|
|       I love song ﻿|    0|
|       I love song ﻿|    0|
|860,000,000 lets ...|    0|
|shakira is best f...|    0|
|The best world cu...|    0|
|             I love﻿|    0|
|SEE SOME MORE SON...|    1|
|           Awesome ﻿|    0|
|   I like shakira..﻿|    0|
|Shakira - Waka Wa...|    0|
|Why so many disli...|    0|
|I don&#39;t think...|    0|
|          Love song﻿|    0|
|          wery good﻿|    0|
|Every time I hear...|    0|
|Whose watching th...|    0|
|I love this song ...|    0|
|i love this song ...|    0|
|      Waka best one﻿|    0|
+--------------------+-----+
only showing top 20 rows



### Traitement de la nouvelle dataset  ###

In [24]:
from pyspark.sql.functions import length
new_dataset = new_dataset.withColumn('length',length(new_dataset['CONTENT']))
new_dataset.show()

+--------------------+-----+------+
|             CONTENT|CLASS|length|
+--------------------+-----+------+
|          Nice song﻿|    0|    10|
|       I love song ﻿|    0|    13|
|       I love song ﻿|    0|    13|
|860,000,000 lets ...|    0|    86|
|shakira is best f...|    0|    29|
|The best world cu...|    0|    33|
|             I love﻿|    0|     7|
|SEE SOME MORE SON...|    1|    60|
|           Awesome ﻿|    0|     9|
|   I like shakira..﻿|    0|    17|
|Shakira - Waka Wa...|    0|    56|
|Why so many disli...|    0|    34|
|I don&#39;t think...|    0|    47|
|          Love song﻿|    0|    10|
|          wery good﻿|    0|    10|
|Every time I hear...|    0|    89|
|Whose watching th...|    0|    40|
|I love this song ...|    0|    57|
|i love this song ...|    0|    34|
|      Waka best one﻿|    0|    14|
+--------------------+-----+------+
only showing top 20 rows



In [28]:
new_dataset.groupby('CLASS').mean().show()

+-----+----------+------------------+
|CLASS|avg(CLASS)|       avg(length)|
+-----+----------+------------------+
|    1|       1.0|191.17919075144508|
|    0|       0.0|31.321428571428573|
+-----+----------+------------------+



In [27]:
new_dataset = new_dataset.dropna(subset=["CLASS"])
new_dataset.show()

+--------------------+-----+------+
|             CONTENT|CLASS|length|
+--------------------+-----+------+
|          Nice song﻿|    0|    10|
|       I love song ﻿|    0|    13|
|       I love song ﻿|    0|    13|
|860,000,000 lets ...|    0|    86|
|shakira is best f...|    0|    29|
|The best world cu...|    0|    33|
|             I love﻿|    0|     7|
|SEE SOME MORE SON...|    1|    60|
|           Awesome ﻿|    0|     9|
|   I like shakira..﻿|    0|    17|
|Shakira - Waka Wa...|    0|    56|
|Why so many disli...|    0|    34|
|I don&#39;t think...|    0|    47|
|          Love song﻿|    0|    10|
|          wery good﻿|    0|    10|
|Every time I hear...|    0|    89|
|Whose watching th...|    0|    40|
|I love this song ...|    0|    57|
|i love this song ...|    0|    34|
|      Waka best one﻿|    0|    14|
+--------------------+-----+------+
only showing top 20 rows



In [29]:
# Import the lit() function
from pyspark.sql.functions import lit

# Convert the values 0 and 1 to strings
new_dataset = new_dataset.withColumn("CLASS", when(new_dataset["CLASS"] == 0, lit("ham")).otherwise(lit("spam")))

In [30]:
# Rename the CONTENT column to text
new_dataset = new_dataset.withColumnRenamed("CONTENT", "text")
# Rename the CLASS column to class
new_dataset = new_dataset.withColumnRenamed("CLASS", "class")
# Show the updated dataframe
new_dataset.show()

+--------------------+-----+------+
|                text|class|length|
+--------------------+-----+------+
|          Nice song﻿|  ham|    10|
|       I love song ﻿|  ham|    13|
|       I love song ﻿|  ham|    13|
|860,000,000 lets ...|  ham|    86|
|shakira is best f...|  ham|    29|
|The best world cu...|  ham|    33|
|             I love﻿|  ham|     7|
|SEE SOME MORE SON...| spam|    60|
|           Awesome ﻿|  ham|     9|
|   I like shakira..﻿|  ham|    17|
|Shakira - Waka Wa...|  ham|    56|
|Why so many disli...|  ham|    34|
|I don&#39;t think...|  ham|    47|
|          Love song﻿|  ham|    10|
|          wery good﻿|  ham|    10|
|Every time I hear...|  ham|    89|
|Whose watching th...|  ham|    40|
|I love this song ...|  ham|    57|
|i love this song ...|  ham|    34|
|      Waka best one﻿|  ham|    14|
+--------------------+-----+------+
only showing top 20 rows



In [31]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
ham_spam_to_num = StringIndexer(inputCol='class',outputCol='label')
clean_up = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')

In [32]:
from pyspark.ml import Pipeline
data_prep_pipe = Pipeline(stages=[ham_spam_to_num,tokenizer,stopremove,count_vec,idf,clean_up])
cleaner_new = data_prep_pipe.fit(new_dataset)
clean_new_dataset = cleaner.transform(new_dataset)

In [33]:
# Import the LogisticRegressionModel class from the pyspark.ml.classification module
from pyspark.ml.classification import LogisticRegressionModel

In [34]:
# Import the pyspark.ml.classification module
import pyspark.ml.classification as cl
# Use the dot notation to access the LogisticRegressionModel class
lr_model = cl.LogisticRegressionModel.load("lr_model")

In [35]:
# Make predictions on the new dataset
predictions = lr_model.transform(clean_new_dataset)

In [37]:
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Split the preprocessed data into training and testing sets
(training, testing) = clean_new_dataset.randomSplit([0.7, 0.3])  

# Create and evaluate Naive Bayes model
nb = NaiveBayes(labelCol="label", featuresCol="features")
nb_model = nb.fit(training)
nb_predictions = nb_model.transform(testing)

# Create and evaluate Random Forest model
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)
rf_model = rf.fit(training)
rf_predictions = rf_model.transform(testing)

# Create and evaluate Logistic Regression model
lr = LogisticRegression(labelCol="label", featuresCol="features")
lr_model = lr.fit(training)
lr_predictions = lr_model.transform(testing)

# Define a function to evaluate and print metrics
def evaluate_model(model, predictions):
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

    precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
    recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
    f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

    print(f"Model: {model.__class__.__name__}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1-score: {f1:.4f}")
    print()

# Evaluate and print metrics for each model
evaluate_model(nb, nb_predictions)
evaluate_model(rf, rf_predictions)
evaluate_model(lr, lr_predictions)


23/11/15 13:42:34 WARN DAGScheduler: Broadcasting large task binary with size 1189.8 KiB
23/11/15 13:42:34 WARN DAGScheduler: Broadcasting large task binary with size 1149.2 KiB
23/11/15 13:42:35 WARN DAGScheduler: Broadcasting large task binary with size 1186.7 KiB
23/11/15 13:42:35 WARN DAGScheduler: Broadcasting large task binary with size 1186.7 KiB
23/11/15 13:42:35 WARN DAGScheduler: Broadcasting large task binary with size 1325.2 KiB
23/11/15 13:42:35 WARN DAGScheduler: Broadcasting large task binary with size 1420.9 KiB
23/11/15 13:42:35 WARN DAGScheduler: Broadcasting large task binary with size 1450.2 KiB
23/11/15 13:42:35 WARN DAGScheduler: Broadcasting large task binary with size 1477.4 KiB
23/11/15 13:42:35 WARN DAGScheduler: Broadcasting large task binary with size 1500.3 KiB
23/11/15 13:42:35 WARN DAGScheduler: Broadcasting large task binary with size 1523.7 KiB
23/11/15 13:42:36 WARN DAGScheduler: Broadcasting large task binary with size 1190.8 KiB
23/11/15 13:42:36 WAR

Model: NaiveBayes
Precision: 0.7780
Recall: 0.6080
F1-score: 0.5307



23/11/15 13:42:38 WARN DAGScheduler: Broadcasting large task binary with size 1467.1 KiB
23/11/15 13:42:38 WARN DAGScheduler: Broadcasting large task binary with size 1467.1 KiB


Model: RandomForestClassifier
Precision: 0.7778
Recall: 0.5920
F1-score: 0.5170



23/11/15 13:42:38 WARN DAGScheduler: Broadcasting large task binary with size 1203.4 KiB
23/11/15 13:42:39 WARN DAGScheduler: Broadcasting large task binary with size 1203.4 KiB


Model: LogisticRegression
Precision: 0.8968
Recall: 0.8960
F1-score: 0.8959



23/11/15 13:42:39 WARN DAGScheduler: Broadcasting large task binary with size 1203.4 KiB
