# SPARK ML

## 1. Import Spark and ML libraries

In [43]:
# findspark.init() runs before `import pyspark` so the local Spark
# installation is set up first (presumably required for the import to
# resolve on this machine -- see the findspark documentation).
import findspark
findspark.init()
import pyspark

# Last expression of the cell: displays the Spark home directory in use.
findspark.find()

'C:\\Program Files\\spark-3.3.0-bin-hadoop3\\'

In [None]:
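# Uncomment and run to stop a SparkContext left over from a previous run
# before creating a new one in the next cell:
#sc.stop()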

In [44]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Reuse the active SparkContext when one already exists: a bare
# SparkContext() raises ValueError if this cell is re-run in the same kernel.
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

In [45]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, StringIndexer
from pyspark.sql import Row

## 2. Load training and test datasets

In [46]:
from pyspark.sql import Row

def load_dataframe(path):
    """Load a whitespace-tokenized text file into a Spark DataFrame.

    Each non-blank line becomes one row: the first token is stored in the
    ``label`` column and the remaining tokens, as a list, in ``words``.

    Relies on the notebook-level ``sc`` and ``spark`` objects.

    Args:
        path: Location of the text file, passed to ``sc.textFile``.

    Returns:
        The DataFrame built by ``spark.createDataFrame`` from the rows.
    """
    rows = (
        sc.textFile(path)
        .map(lambda line: line.split())
        # A blank line splits into an empty list; drop it so that
        # tokens[0] below cannot raise IndexError inside the Spark job.
        .filter(lambda tokens: bool(tokens))
        .map(lambda tokens: Row(label=tokens[0], words=tokens[1:]))
    )
    return spark.createDataFrame(rows)

# Read the training and test splits with the same loader
train_data = load_dataframe(path="20ng-train-all-terms.txt")
test_data = load_dataframe(path="20ng-test-all-terms.txt")

## 3. Vectorization

In [47]:
# Fit a CountVectorizer on the training rows to learn the vocabulary
vectorizer = CountVectorizer(
    inputCol="words",
    outputCol="bag_of_words",
)
vectorizer_transformer = vectorizer.fit(dataset=train_data)

In [48]:
# Turn the tokenized train and test rows into bag-of-words vectors
train_bag_of_words = vectorizer_transformer.transform(dataset=train_data)
test_bag_of_words = vectorizer_transformer.transform(dataset=test_data)

In [49]:
# Map each string label to a numeric index; the mapping is fitted on the
# training set and then applied to both the training and the test set
label_indexer = StringIndexer(
    inputCol="label",
    outputCol="label_index",
)
label_indexer_transformer = label_indexer.fit(dataset=train_bag_of_words)
train_bag_of_words = label_indexer_transformer.transform(dataset=train_bag_of_words)
test_bag_of_words = label_indexer_transformer.transform(dataset=test_bag_of_words)

## 4. Training

In [50]:
# Fit a Naive Bayes multiclass model on the indexed training bags of words
classifier = NaiveBayes(
    featuresCol="bag_of_words",
    labelCol="label_index",
    predictionCol="label_index_predicted",
)
classifier_transformer = classifier.fit(dataset=train_bag_of_words)



## 5. Prediction and evaluation

In [51]:
# Apply the fitted classifier to the test set to obtain predicted labels
test_predicted = classifier_transformer.transform(dataset=test_bag_of_words)

In [52]:
# Score the test predictions against the true label indices by accuracy
evaluator = MulticlassClassificationEvaluator(
    predictionCol="label_index_predicted",
    labelCol="label_index",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(dataset=test_predicted)
print("Accuracy = {:.2f}".format(accuracy))

Accuracy = 0.80


## 6. Work with a pipeline

In [29]:
from pyspark.ml import Pipeline

In [30]:
# Chain vectorization, label indexing and classification into one pipeline
vectorizer = CountVectorizer(
    inputCol="words",
    outputCol="bag_of_words",
)
label_indexer = StringIndexer(
    inputCol="label",
    outputCol="label_index",
)
classifier = NaiveBayes(
    featuresCol="bag_of_words",
    labelCol="label_index",
    predictionCol="label_index_predicted",
)
pipeline = Pipeline(stages=[vectorizer, label_indexer, classifier])

# Fit all stages on the raw training rows, then predict on the raw test rows
pipeline_model = pipeline.fit(dataset=train_data)
test_predicted = pipeline_model.transform(dataset=test_data)

In [31]:
# Score the pipeline's test predictions with the existing accuracy evaluator
accuracy = evaluator.evaluate(dataset=test_predicted)
print("Accuracy = {:.2f}".format(accuracy))

Accuracy = 0.80
