<a href="https://colab.research.google.com/github/raushan9jnv/TextClassification/blob/main/ColabPysparkFinal.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Spark is written in Scala and runs on the Java Virtual Machine (JVM), so install a headless Java 8 JDK first.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
#we will install Apache Spark 3.0.1 with Hadoop 2.7
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz

In [None]:
# Unpack the downloaded Spark archive into the current directory.
!tar xf spark-3.0.1-bin-hadoop2.7.tgz

tar: spark-3.0.1-bin-hadoop2.7.tgz: Cannot open: No such file or directory
tar: Error is not recoverable: exiting now


In [None]:
# Install findspark, which locates the Spark installation on the system and makes it importable as a regular library.
!pip install -q findspark

In [None]:
# install java (Spark runs on the JVM)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed; keep it in sync with SPARK_HOME below)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# point JAVA_HOME and SPARK_HOME at the installs above; the SPARK_HOME folder name must match the archive unpacked above
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark


# reference: https://stackoverflow.com/questions/55240940/error-while-installing-spark-on-google-colab

In [None]:
# Locate the Spark installation and make it importable as a regular library.
import findspark
findspark.init()

In [None]:
# Display the Spark installation directory that findspark resolved.
findspark.find()

'/content/spark-3.0.0-bin-hadoop3.2'

In [None]:
# NOTE(review): this is unpinned and pulled pyspark 3.3.1 in the recorded run, while SPARK_HOME
# above points at Spark 3.0.0 — confirm the two are meant to differ, or pin the version.
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m21.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=16851206221bbb1ade42335442dfe2d99ded938767ca8ebe1b470c596d1e44fc
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

# Text Classification

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import HashingTF, IDF, RegexTokenizer, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import first
from pyspark.sql.functions import mean, countDistinct,desc
from pyspark.sql.functions import lit
from pyspark.sql.functions import concat_ws
from pyspark.sql.types import IntegerType, FloatType


# Start (or reuse) the Spark session that the cells below read data through.
spark = SparkSession.builder.appName("TextClassificationModel").getOrCreate()

class TextClassificationModel:
    """TF-IDF text-classification workflow built on a Spark DataFrame.

    Typical call order: ``show_summary`` / ``clean_dataset`` (both optional),
    then ``feature_segregation`` -> ``feature_extraction`` -> ``train_model``.

    Args:
        dataframe: Spark DataFrame holding the raw records.
        text_columns: names of the columns that make up the text input.
        categorical_column: name of the label column (a one-element
            list/tuple is also accepted).
    """

    # Type names, as reported by ``DataFrame.dtypes``, that are imputed with the mean.
    _NUMERIC_TYPES = ("tinyint", "smallint", "int", "bigint", "float", "double")
    # Column that holds the numeric encoding of the label during training.
    _LABEL_INDEX_COL = "label_index"

    def __init__(self, dataframe, text_columns, categorical_column):
        self.df = dataframe
        self.text_columns = text_columns
        self.categorical_column = categorical_column

    def _label_name(self):
        """Return the label column name, unwrapping a one-element list/tuple.

        Raises:
            ValueError: if ``categorical_column`` does not name exactly one column.
        """
        label = self.categorical_column
        if isinstance(label, (list, tuple)) and len(label) == 1:
            label = label[0]
        if not isinstance(label, str) or not label:
            raise ValueError("categorical_column must name exactly one label column")
        return label

    def show_summary(self):
        """Print the schema, the ``describe()`` statistics and the first five rows."""
        print("DataFrame Schema:")
        self.df.printSchema()
        print("\nDataFrame Summary:")
        self.df.describe().show()
        print("\nDataFrame Head:")
        self.df.show(5)

    def clean_dataset(self):
        """Fill nulls: string columns with their most frequent value, numeric columns with their mean.

        Columns that are entirely null (no mode / mean available) and columns
        of any other type are left untouched.

        Returns:
            The cleaned DataFrame (also stored on ``self.df``).
        """
        for name, dtype in self.df.dtypes:
            if dtype == "string":
                # Most frequent non-null value. Ties are broken by the value
                # itself so repeated runs choose the same fill.
                top = (
                    self.df.dropna(subset=[name])
                    .groupBy(name)
                    .count()
                    .orderBy(["count", name], ascending=[False, True])
                    .first()
                )
                fill_value = None if top is None else top[0]
            elif dtype in self._NUMERIC_TYPES:
                fill_value = self.df.agg({name: "avg"}).first()[0]
            else:
                continue

            # fillna rejects None, which is what an all-null column yields.
            if fill_value is not None:
                self.df = self.df.fillna(fill_value, subset=[name])

        return self.df

    def feature_segregation(self):
        """Build the "text" column and split the data into ``self.X`` and ``self.y``.

        With several text columns they are joined with single spaces into a new
        "text" column; with one, that column is renamed to "text".

        ``self.X`` keeps the label column next to "text": two separately
        selected DataFrames cannot be lined up row by row afterwards, so the
        label has to travel with the features to be available for training.

        Raises:
            ValueError: if ``text_columns`` is empty or the label is not a
                single column name.
        """
        if not self.text_columns:
            raise ValueError("text_columns must name at least one column")
        label = self._label_name()

        print("feature segregation started")
        if len(self.text_columns) > 1:
            self.df = self.df.withColumn("text", concat_ws(" ", *self.text_columns))
        else:
            self.df = self.df.withColumnRenamed(self.text_columns[0], "text")
        self.X = self.df.select("text", label)
        self.y = self.df.select(label)
        print("feature segregation completed")

    def feature_extraction(self):
        """Turn "text" into TF-IDF vectors stored in the "features" column of ``self.X``.

        Steps: whitespace tokenisation -> hashed term frequencies -> IDF weighting.
        """
        print("feature extraction started")
        tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\s+")
        hashing_tf = HashingTF(inputCol="tokens", outputCol="tf_features")
        idf = IDF(inputCol="tf_features", outputCol="features")

        tokenized = tokenizer.transform(self.X)
        term_freq = hashing_tf.transform(tokenized)
        # NOTE(review): the IDF weights are fitted on every row, i.e. before the
        # train/test split made in train_model — confirm this is acceptable.
        self.X = idf.fit(term_freq).transform(term_freq)
        print("feature extraction completed")

    def train_model(self, seed=42):
        """Fit every candidate classifier and keep the one with the best test accuracy.

        Args:
            seed: seed for the 80/20 train/test split, so reruns are comparable.

        Sets ``classes_``, ``label_indexer``, ``models``, ``best_model``,
        ``best_model_name`` and ``best_accuracy``.

        Raises:
            ValueError: if the label is not a single column name.
        """
        label = self._label_name()
        self.classes_ = self.y.distinct().collect()

        # Rows without a label cannot be used for supervised training.
        labelled = self.X.dropna(subset=[label])

        # Spark ML classifiers need a numeric label; encode the raw label values.
        self.label_indexer = StringIndexer(
            inputCol=label, outputCol=self._LABEL_INDEX_COL
        ).fit(labelled)
        indexed = self.label_indexer.transform(labelled)

        # Combine the feature columns into the single vector column the models read.
        assembler = VectorAssembler(inputCols=["features"], outputCol="vector_features")
        data = assembler.transform(indexed)
        data.show(5)

        # Split the data into training and testing datasets
        trainingData, testData = data.randomSplit([0.8, 0.2], seed=seed)

        # Further estimators can be added here; each must accept a multiclass label.
        self.models = {
            'Decision Tree': DecisionTreeClassifier(
                featuresCol="vector_features", labelCol=self._LABEL_INDEX_COL
            ),
        }

        self.best_model = None
        self.best_model_name = None
        self.best_accuracy = 0
        for name, estimator in self.models.items():
            fitted = estimator.fit(trainingData)

            # Accuracy = share of test rows whose predicted index equals the label index.
            predictions = fitted.transform(testData)
            total = predictions.count()
            hits = predictions.filter(
                predictions[self._LABEL_INDEX_COL] == predictions["prediction"]
            ).count()
            accuracy = hits / total if total else 0.0

            # The first model is always kept, so a winner exists even at accuracy 0.
            if self.best_model is None or accuracy > self.best_accuracy:
                self.best_accuracy = accuracy
                self.best_model = fitted
                self.best_model_name = name

        print(f"Best Model: {self.best_model_name} with accuracy: {self.best_accuracy}")



# Load data.csv, treating the first row as the header and inferring column types.
df_pyspark=spark.read.option('header','true').csv('data.csv',inferSchema=True)
text_columns = ["Ticket_Title", "Application"] # columns combined into the model's "text" input
categorical_column = "Master_SOP"
model = TextClassificationModel(df_pyspark, text_columns, categorical_column)

# Run the workflow end to end: inspect, clean, build features, train.
model.show_summary()
model.clean_dataset()
model.feature_segregation()
model.feature_extraction()
model.train_model()

        




DataFrame Schema:
root
 |-- Ticket_Number: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- Ticket_Title: string (nullable = true)
 |-- Ticket_Description: string (nullable = true)
 |-- Support_Group: string (nullable = true)
 |-- Application: string (nullable = true)
 |-- Resolution_Description: string (nullable = true)
 |-- Master_SOP: string (nullable = true)
 |-- EASE: string (nullable = true)
 |-- extracted_phrase: string (nullable = true)
 |-- last_updated: string (nullable = true)


DataFrame Summary:
+-------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+----------------------+-----------------+--------------------+--------------------+--------------------+
|summary|       Ticket_Number|           Priority|        Ticket_Title|  Ticket_Description|     Support_Group|         Application|Resolution_Description|       Master_SOP|                EASE|    extracted_phrase|        last_

IllegalArgumentException: ignored

In [None]:
## read the dataset (first row is the header; column types are inferred)
df_pyspark = (
    spark.read
    .option('header', 'true')
    .csv('data.csv', inferSchema=True)
)

# Schema first, then summary statistics, then a five-row preview.
print("DataFrame Schema:")
df_pyspark.printSchema()

print("\nDataFrame Summary:")
df_pyspark.describe().show()

print("\nDataFrame Head:")
df_pyspark.show(5)

DataFrame Schema:
root
 |-- Ticket_Number: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- Ticket_Title: string (nullable = true)
 |-- Ticket_Description: string (nullable = true)
 |-- Support_Group: string (nullable = true)
 |-- Application: string (nullable = true)
 |-- Resolution_Description: string (nullable = true)
 |-- Master_SOP: string (nullable = true)
 |-- EASE: string (nullable = true)
 |-- extracted_phrase: string (nullable = true)
 |-- last_updated: string (nullable = true)


DataFrame Summary:
+-------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+----------------------+-----------------+--------------------+--------------------+--------------------+
|summary|       Ticket_Number|           Priority|        Ticket_Title|  Ticket_Description|     Support_Group|         Application|Resolution_Description|       Master_SOP|                EASE|    extracted_phrase|        last_

In [None]:
# `df` is never defined in this notebook; the DataFrame loaded above is `df_pyspark`.
df_pyspark.summary()

DataFrame[summary: string, Ticket_Number: string, Priority: string, Ticket_Title: string, Ticket_Description: string, Support_Group: string, Application: string, Resolution_Description: string, Master_SOP: string, EASE: string, extracted_phrase: string, last_updated: string]

In [None]:
# EXAMPLE: run only the cleaning step on the DataFrame loaded above
import pandas as pd
import numpy as np

# # Create a sample DataFrame
# df = pd.DataFrame({
#     "name": ["John", "Jane", "Jim", "Jessica", "Jack"],
#     "age": [31, 32, np.nan, 35, np.nan],
#     "gender": ["Male", "Female", "Male", "Female", "Male"],
#     "income": [50000, 55000, np.nan, 60000, np.nan]
# })

# Initialize the TextClassificationModel
text_columns = [] # No text columns in this example
categorical_column = []
model = TextClassificationModel(df_pyspark, text_columns, categorical_column)

# Show the summary of the DataFrame
# model.show_summary()

# Clean the DataFrame
model.clean_dataset()

# Show the summary of the cleaned DataFrame
model.show_summary()


DataFrame Schema:
root
 |-- Ticket_Number: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- Ticket_Title: string (nullable = true)
 |-- Ticket_Description: string (nullable = true)
 |-- Support_Group: string (nullable = true)
 |-- Application: string (nullable = true)
 |-- Resolution_Description: string (nullable = true)
 |-- Master_SOP: string (nullable = true)
 |-- EASE: string (nullable = true)
 |-- extracted_phrase: string (nullable = true)
 |-- last_updated: string (nullable = true)


DataFrame Summary:
+-------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+----------------------+-----------------+--------------------+--------------------+--------------------+
|summary|       Ticket_Number|           Priority|        Ticket_Title|  Ticket_Description|     Support_Group|         Application|Resolution_Description|       Master_SOP|                EASE|    extracted_phrase|        last_

In [None]:
# `mode` cannot be imported from pyspark.sql.functions in this environment (the cell
# failed with ImportError), so the most frequent value is computed with a group-and-count.
from pyspark.sql.functions import mean

class TextClassificationModel:
    """Holds a Spark DataFrame plus the column names used for text classification."""

    # Type names, as reported by ``DataFrame.dtypes``, that are imputed with the mean.
    _NUMERIC_TYPES = ("tinyint", "smallint", "int", "bigint", "float", "double")

    def __init__(self, dataframe, text_columns, categorical_column):
        self.df = dataframe
        self.text_columns = text_columns
        self.categorical_column = categorical_column

    def clean_dataset(self):
        """Fill nulls: string columns with their most frequent value, numeric columns with their mean.

        Columns that are entirely null and columns of other types are left untouched.

        Returns:
            The cleaned DataFrame (also stored on ``self.df``).
        """
        for name, dtype in self.df.dtypes:
            if dtype == "string":
                # fillna needs a scalar, so collect the single most frequent
                # non-null value; ties are broken by the value itself.
                top = (
                    self.df.dropna(subset=[name])
                    .groupBy(name)
                    .count()
                    .orderBy(["count", name], ascending=[False, True])
                    .first()
                )
                fill_value = None if top is None else top[0]
            elif dtype in self._NUMERIC_TYPES:
                fill_value = self.df.agg(mean(name)).first()[0]
            else:
                continue

            # fillna rejects None, which is what an all-null column yields.
            if fill_value is not None:
                self.df = self.df.fillna(fill_value, subset=[name])

        return self.df


ImportError: ignored

In [None]:
from pyspark.sql.functions import mean, countDistinct,desc


class TextClassificationModel:
    """Holds a Spark DataFrame plus the column names used for text classification."""

    # Type names, as reported by ``DataFrame.dtypes``, that are imputed with the mean.
    _NUMERIC_TYPES = ("tinyint", "smallint", "int", "bigint", "float", "double")

    def __init__(self, dataframe, text_columns, categorical_column):
        self.df = dataframe
        self.text_columns = text_columns
        self.categorical_column = categorical_column

    def clean_dataset(self):
        """Fill nulls: string columns with their most frequent value, numeric columns with their mean.

        The previous version used the *number of distinct values* as the fill
        for string columns, which is not the mode and is not even a string.
        Columns that are entirely null and columns of other types are left untouched.

        Returns:
            The cleaned DataFrame (also stored on ``self.df``).
        """
        for name, dtype in self.df.dtypes:
            if dtype == "string":
                # Most frequent non-null value; ties are broken by the value itself.
                top = (
                    self.df.dropna(subset=[name])
                    .groupBy(name)
                    .count()
                    .orderBy(["count", name], ascending=[False, True])
                    .first()
                )
                fill_value = None if top is None else top[0]
            elif dtype in self._NUMERIC_TYPES:
                fill_value = self.df.agg({name: "avg"}).first()[0]
            else:
                continue

            # fillna rejects None, which is what an all-null column yields.
            if fill_value is not None:
                self.df = self.df.fillna(fill_value, subset=[name])

        return self.df


In [None]:
from pyspark.sql.functions import mean, countDistinct, desc

class TextClassificationModel:
    """Holds a Spark DataFrame plus the column names used for text classification."""

    # Type names, as reported by ``DataFrame.dtypes``, that are imputed with the mean.
    _NUMERIC_TYPES = ("tinyint", "smallint", "int", "bigint", "float", "double")

    def __init__(self, dataframe, text_columns, categorical_column):
        self.df = dataframe
        self.text_columns = text_columns
        self.categorical_column = categorical_column

    def clean_dataset(self):
        """Impute missing values column by column.

        String columns receive their most frequent non-null value (a distinct
        count, as used before, is not the mode); numeric columns receive their
        mean. All-null columns and other types are skipped.

        Returns:
            The cleaned DataFrame (also stored on ``self.df``).
        """
        for name, dtype in self.df.dtypes:
            if dtype == "string":
                # Rank values by frequency; the value itself breaks ties deterministically.
                ranked = (
                    self.df.dropna(subset=[name])
                    .groupBy(name)
                    .count()
                    .orderBy(["count", name], ascending=[False, True])
                )
                top = ranked.first()
                fill_value = None if top is None else top[0]
            elif dtype in self._NUMERIC_TYPES:
                fill_value = self.df.agg({name: "avg"}).first()[0]
            else:
                continue

            # Nothing to impute with when the column holds no values at all.
            if fill_value is not None:
                self.df = self.df.fillna(fill_value, subset=[name])

        return self.df


In [None]:
from pyspark.ml.feature import HashingTF, IDF

class TextClassificationModel:
    """Builds TF-IDF features from the text columns of a Spark DataFrame."""

    def __init__(self, dataframe, text_columns, categorical_column):
        self.df = dataframe
        self.text_columns = text_columns
        self.categorical_column = categorical_column

    def feature_segregation(self):
        """Build the "text" column and select ``self.X`` (text) and ``self.y`` (label).

        With several text columns they are joined with single spaces into a new
        "text" column; with one, that column is renamed to "text". Both
        selections are shown for inspection.

        Raises:
            ValueError: if ``text_columns`` is empty.
        """
        if not self.text_columns:
            raise ValueError("text_columns must name at least one column")

        from pyspark.sql.functions import concat_ws

        if len(self.text_columns) > 1:
            self.df = self.df.withColumn("text", concat_ws(" ", *self.text_columns))
        else:
            self.df = self.df.withColumnRenamed(self.text_columns[0], "text")
        self.X = self.df.select("text")
        # show() prints the table itself and returns None, so it is not wrapped in print().
        self.X.show()
        self.y = self.df.select(self.categorical_column)
        self.y.show()

    def feature_extraction(self):
        """Turn "text" into TF-IDF vectors stored in the "features" column of ``self.X``."""
        # HashingTF consumes an array of terms, not a raw string, so the text is
        # split on whitespace first (feeding it "text" directly raised
        # IllegalArgumentException).
        from pyspark.ml.feature import RegexTokenizer

        tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\s+")
        tokenized = tokenizer.transform(self.X)

        hashing_tf = HashingTF(inputCol="tokens", outputCol="tf_features")
        term_freq = hashing_tf.transform(tokenized)

        idf = IDF(inputCol="tf_features", outputCol="features")
        self.X = idf.fit(term_freq).transform(term_freq)


text_columns = ["Ticket_Title", "Application"] # columns combined into the model's "text" input
categorical_column = ["Master_SOP"]
model = TextClassificationModel(df_pyspark, text_columns, categorical_column)
# Build the text/label selections, then the TF-IDF features.
model.feature_segregation()
model.feature_extraction()

+--------------------+
|                text|
+--------------------+
|JDE Incident Requ...|
|JDE Incident Requ...|
|JDE Incident Requ...|
|JDE Incident Requ...|
|JDE Incident Requ...|
|JDE - Report/Data...|
|JDE Incident Requ...|
|JDE Incident Requ...|
|JDE Incident Requ...|
|JDE Incident Requ...|
|JDE Incident Requ...|
|JDE Incident Requ...|
|Need access to mo...|
|SIR - Incident ti...|
|open a ticket GRP...|
|JDE Incident Requ...|
|Function Key F2 k...|
|JDE Incident Requ...|
|issue with JDE GR...|
|JDE Incident Requ...|
+--------------------+
only showing top 20 rows

None
+--------------+
|    Master_SOP|
+--------------+
|Not Applicable|
|Not Applicable|
|Not Applicable|
|Not Applicable|
|Not Applicable|
|Not Applicable|
|Not Applicable|
|Not Applicable|
|Not Applicable|
|Not Applicable|
|Not Applicable|
|Not Applicable|
|Not Applicable|
|Not Applicable|
|Not Applicable|
|Not Applicable|
|Not Applicable|
|Not Applicable|
|Not Applicable|
|Not Applicable|
+--------------+
only show

IllegalArgumentException: ignored

In [None]:
from pyspark.ml.feature import HashingTF, IDF, RegexTokenizer
from pyspark.sql.functions import concat_ws

class TextClassificationModel:
    """TF-IDF text classification on a Spark DataFrame, comparing several classifiers.

    Call order: ``feature_segregation`` -> ``feature_extraction`` -> ``train_model``.
    """

    def __init__(self, dataframe, text_columns, categorical_column):
        self.df = dataframe
        self.text_columns = text_columns
        self.categorical_column = categorical_column

    def _label_name(self):
        """Return the label column name, unwrapping a one-element list/tuple.

        Raises:
            ValueError: if ``categorical_column`` does not name exactly one column.
        """
        label = self.categorical_column
        if isinstance(label, (list, tuple)) and len(label) == 1:
            label = label[0]
        if not isinstance(label, str) or not label:
            raise ValueError("categorical_column must name exactly one label column")
        return label

    def feature_segregation(self):
        """Build the "text" column and select ``self.X`` and ``self.y``.

        ``self.X`` keeps the label column next to "text": two separately
        selected DataFrames cannot be lined up row by row afterwards, so the
        label has to travel with the features to be available for training.

        Raises:
            ValueError: if ``text_columns`` is empty or the label is not a
                single column name.
        """
        if not self.text_columns:
            raise ValueError("text_columns must name at least one column")
        label = self._label_name()

        if len(self.text_columns) > 1:
            self.df = self.df.withColumn("text", concat_ws(" ", *self.text_columns))
        else:
            self.df = self.df.withColumnRenamed(self.text_columns[0], "text")
        self.X = self.df.select("text", label)
        self.y = self.df.select(label)

    def feature_extraction(self):
        """Turn "text" into TF-IDF vectors stored in the "features" column of ``self.X``."""
        tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\s+")
        tokenized = tokenizer.transform(self.X)

        hashing_tf = HashingTF(inputCol="tokens", outputCol="tf_features")
        term_freq = hashing_tf.transform(tokenized)

        idf = IDF(inputCol="tf_features", outputCol="features")
        self.X = idf.fit(term_freq).transform(term_freq)

    def train_model(self, seed=123):
        """Fit each candidate classifier, print its test metrics and keep the most accurate.

        Only estimators that exist in ``pyspark.ml.classification`` are used;
        Linear SVC is included only for two-class problems because it is a
        binary classifier.

        Args:
            seed: seed for the 80/20 train/test split.

        Sets ``X_train``, ``X_test``, ``y_train``, ``y_test``, ``best_model``
        and ``best_accuracy``.

        Raises:
            ValueError: if the label is not a single column name.
        """
        label = self._label_name()

        # Imported here because this cell's import lines only cover feature extraction.
        from pyspark.ml.classification import (
            DecisionTreeClassifier,
            LinearSVC,
            LogisticRegression,
            NaiveBayes,
            RandomForestClassifier,
        )
        from pyspark.ml.evaluation import MulticlassClassificationEvaluator
        from pyspark.ml.feature import StringIndexer

        # Rows without a label cannot be used for supervised training.
        labelled = self.X.dropna(subset=[label])

        # Spark ML classifiers need a numeric label; encode the raw label values.
        indexer = StringIndexer(inputCol=label, outputCol="label_index").fit(labelled)
        data = indexer.transform(labelled)

        # randomSplit with two weights yields exactly two DataFrames.
        self.X_train, self.X_test = data.randomSplit([0.8, 0.2], seed=seed)
        self.y_train = self.X_train.select(label)
        self.y_test = self.X_test.select(label)

        columns = dict(featuresCol="features", labelCol="label_index")
        models = {
            'Logistic Regression': LogisticRegression(**columns),
            'Naive Bayes': NaiveBayes(**columns),
            'Decision Tree': DecisionTreeClassifier(**columns),
            'Random Forest': RandomForestClassifier(**columns),
        }
        if len(indexer.labels) == 2:
            models['Linear SVC'] = LinearSVC(**columns)

        evaluators = {
            metric: MulticlassClassificationEvaluator(
                labelCol="label_index", predictionCol="prediction", metricName=metric
            )
            for metric in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
        }

        self.best_model = None
        self.best_accuracy = 0
        for name, estimator in models.items():
            fitted = estimator.fit(self.X_train)
            predictions = fitted.transform(self.X_test)
            scores = {metric: ev.evaluate(predictions) for metric, ev in evaluators.items()}
            accuracy = scores["accuracy"]
            print(f'{name}\n')
            print(f'{name} accuracy: {accuracy}')
            print(f'{name} precision: {scores["weightedPrecision"]}')
            print(f'{name} recall: {scores["weightedRecall"]}')
            print(f'{name} f1 score: {scores["f1"]}')
            print("-------------------------------------")
            # The first model is always kept, so a winner exists even at accuracy 0.
            if self.best_model is None or accuracy > self.best_accuracy:
                self.best_accuracy = accuracy
                self.best_model = fitted
        print(self.best_model)



text_columns = ["Ticket_Title", "Application"] # columns combined into the model's "text" input
categorical_column = ["Master_SOP"]
model = TextClassificationModel(df_pyspark, text_columns, categorical_column)
# Build the text/label selections, extract TF-IDF features, then train and compare models.
model.feature_segregation()
model.feature_extraction()
model.train_model()

In [None]:
# Only estimators that exist in pyspark.ml.classification are imported; the previous
# list mixed in scikit-learn class names and made the whole cell fail with ImportError.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, NaiveBayes, DecisionTreeClassifier, RandomForestClassifier, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TrainModel").getOrCreate()

def train_model(self, seed=123):
    """Fit each candidate classifier, print its test metrics and keep the most accurate.

    Written as a free function that takes the model object as ``self``. It
    reads ``self.categorical_column``, ``self.X`` (with "text" and "features")
    and, when ``self.X`` does not carry the label, ``self.df`` (with "text" and
    the label column).

    Args:
        seed: seed for the 80/20 train/test split.

    Sets ``X_train``, ``X_test``, ``y_train``, ``y_test``, ``best_model`` and
    ``best_accuracy`` on ``self``.

    Raises:
        ValueError: if ``categorical_column`` does not name exactly one column.
    """
    label = self.categorical_column
    if isinstance(label, (list, tuple)) and len(label) == 1:
        label = label[0]
    if not isinstance(label, str) or not label:
        raise ValueError("categorical_column must name exactly one label column")

    features = self.X
    if label not in features.columns:
        # The features are a function of "text" alone, so one feature row per
        # distinct text acts as a lookup that re-attaches features to the labels.
        lookup = features.select("text", "features").dropDuplicates(["text"])
        features = self.df.select("text", label).join(lookup, on="text")
    # Rows without a label cannot be used for supervised training.
    labelled = features.dropna(subset=[label])

    # Spark ML classifiers need a numeric label; encode the raw label values.
    indexer = StringIndexer(inputCol=label, outputCol="label_index").fit(labelled)
    data = indexer.transform(labelled)

    # randomSplit with two weights yields exactly two DataFrames.
    self.X_train, self.X_test = data.randomSplit([0.8, 0.2], seed=seed)
    self.y_train = self.X_train.select(label)
    self.y_test = self.X_test.select(label)

    columns = dict(featuresCol="features", labelCol="label_index")
    models = {
        'Logistic Regression': LogisticRegression(**columns),
        'Naive Bayes': NaiveBayes(**columns),
        'Decision Tree': DecisionTreeClassifier(**columns),
        'Random Forest': RandomForestClassifier(**columns),
    }
    # Linear SVC is a binary classifier, so it only competes on two-class data.
    if len(indexer.labels) == 2:
        models['Linear SVC'] = LinearSVC(**columns)

    evaluators = {
        metric: MulticlassClassificationEvaluator(
            labelCol="label_index", predictionCol="prediction", metricName=metric
        )
        for metric in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
    }

    self.best_model = None
    self.best_accuracy = 0
    for name, estimator in models.items():
        fitted = estimator.fit(self.X_train)
        predictions = fitted.transform(self.X_test)
        scores = {metric: ev.evaluate(predictions) for metric, ev in evaluators.items()}
        accuracy = scores["accuracy"]
        print(f'{name}\n')
        print(f'{name} accuracy: {accuracy}')
        print(f'{name} precision: {scores["weightedPrecision"]}')
        print(f'{name} recall: {scores["weightedRecall"]}')
        print(f'{name} f1 score: {scores["f1"]}')
        print("-------------------------------------")
        # The first model is always kept, so a winner exists even at accuracy 0.
        if self.best_model is None or accuracy > self.best_accuracy:
            self.best_accuracy = accuracy
            self.best_model = fitted
    print(self.best_model)


ImportError: ignored

In [None]:
from pyspark.ml.classification import LogisticRegression, NaiveBayes, DecisionTreeClassifier, RandomForestClassifier, LinearSVC
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def train_model(self, seed=42):
    """Fit each candidate classifier and keep the one with the best test accuracy.

    Written as a free function that takes the model object as ``self``. It
    reads ``self.categorical_column``, ``self.X`` (with "text" and "features")
    and, when ``self.X`` does not carry the label, ``self.df`` (with "text" and
    the label column).

    Args:
        seed: seed for the 80/20 train/test split, so reruns are comparable.

    Sets ``models``, ``best_model``, ``best_model_name`` and ``best_accuracy``
    on ``self``.

    Raises:
        ValueError: if ``categorical_column`` does not name exactly one column.
    """
    label = self.categorical_column
    if isinstance(label, (list, tuple)) and len(label) == 1:
        label = label[0]
    if not isinstance(label, str) or not label:
        raise ValueError("categorical_column must name exactly one label column")

    # Imported here because this cell's import lines do not bring it in.
    from pyspark.ml.feature import StringIndexer

    features = self.X
    if label not in features.columns:
        # The features are a function of "text" alone, so one feature row per
        # distinct text acts as a lookup that re-attaches features to the labels
        # without multiplying rows that share the same text.
        lookup = features.select("text", "features").dropDuplicates(["text"])
        features = self.df.select("text", label).join(lookup, on="text")
    # Rows without a label cannot be used for supervised training.
    labelled = features.dropna(subset=[label])

    # Encode the label as a numeric index; casting a text label to an integer
    # would turn every value into null.
    indexer = StringIndexer(inputCol=label, outputCol="label_index").fit(labelled)
    data = indexer.transform(labelled)

    # "features" is already a single vector column, so no assembler is needed.
    # Split the data into training and testing datasets
    (trainingData, testData) = data.randomSplit([0.8, 0.2], seed=seed)

    # Initialize different classifiers
    columns = dict(featuresCol="features", labelCol="label_index")
    self.models = {
        'Logistic Regression': LogisticRegression(**columns),
        'Naive Bayes': NaiveBayes(**columns),
        'Decision Tree': DecisionTreeClassifier(**columns),
        'Random Forest': RandomForestClassifier(**columns),
    }
    # Linear SVC is a binary classifier, so it only competes on two-class data.
    if len(indexer.labels) == 2:
        self.models['Linear SVC'] = LinearSVC(**columns)

    evaluator = MulticlassClassificationEvaluator(
        labelCol="label_index", predictionCol="prediction", metricName="accuracy"
    )

    self.best_model = None
    self.best_model_name = None
    self.best_accuracy = 0
    for name, estimator in self.models.items():
        # Train the model
        classifier = estimator.fit(trainingData)

        # Evaluate the model on test data
        accuracy = evaluator.evaluate(classifier.transform(testData))

        # Keep the winner together with its name; the fitted model is not one
        # of the estimators in self.models, so it cannot be looked up there.
        if self.best_model is None or accuracy > self.best_accuracy:
            self.best_accuracy = accuracy
            self.best_model = classifier
            self.best_model_name = name

    print("Best Model: ", self.best_model_name)
    print("Accuracy: ", self.best_accuracy)


In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, NaiveBayes, DecisionTreeClassifier, RandomForestClassifier, LinearSVC

def train_model(self, seed=42):
    """Fit each candidate classifier and keep the one with the best test accuracy.

    Written as a free function that takes the model object as ``self``. It
    reads ``self.categorical_column``, ``self.y``, ``self.X`` (with "text" and
    "features") and, when ``self.X`` does not carry the label, ``self.df``
    (with "text" and the label column).

    Args:
        seed: seed for the 80/20 train/test split, so reruns are comparable.

    Sets ``classes_``, ``models``, ``best_model``, ``best_model_name`` and
    ``best_accuracy`` on ``self``.

    Raises:
        ValueError: if ``categorical_column`` does not name exactly one column.
    """
    label = self.categorical_column
    if isinstance(label, (list, tuple)) and len(label) == 1:
        label = label[0]
    if not isinstance(label, str) or not label:
        raise ValueError("categorical_column must name exactly one label column")

    # Imported here because this cell's import lines do not bring it in.
    from pyspark.ml.feature import StringIndexer

    self.classes_ = self.y.distinct().collect()

    features = self.X
    if label not in features.columns:
        # The features are a function of "text" alone, so one feature row per
        # distinct text acts as a lookup that re-attaches features to the labels.
        lookup = features.select("text", "features").dropDuplicates(["text"])
        features = self.df.select("text", label).join(lookup, on="text")
    # Rows without a label cannot be used for supervised training.
    labelled = features.dropna(subset=[label])

    # Spark ML classifiers need a numeric label; encode the raw label values.
    indexer = StringIndexer(inputCol=label, outputCol="label_index").fit(labelled)
    indexed = indexer.transform(labelled)

    # Create a vector assembler to combine all feature columns into one vector column
    assembler = VectorAssembler(inputCols=["features"], outputCol="vector_features")
    data = assembler.transform(indexed)

    # Split the data into training and testing datasets
    (trainingData, testData) = data.randomSplit([0.8, 0.2], seed=seed)

    columns = dict(featuresCol="vector_features", labelCol="label_index")
    self.models = {
        'Logistic Regression': LogisticRegression(**columns),
        'Naive Bayes': NaiveBayes(**columns),
        'Decision Tree': DecisionTreeClassifier(**columns),
        'Random Forest': RandomForestClassifier(**columns),
    }
    # Linear SVC is a binary classifier, so it only competes on two-class data.
    if len(indexer.labels) == 2:
        self.models['Linear SVC'] = LinearSVC(**columns)

    self.best_model = None
    self.best_model_name = None
    self.best_accuracy = 0
    for name, estimator in self.models.items():
        # Train the model
        fitted = estimator.fit(trainingData)

        # Accuracy = share of test rows whose predicted index equals the label index.
        predictions = fitted.transform(testData)
        total = predictions.count()
        hits = predictions.filter(predictions["label_index"] == predictions["prediction"]).count()
        accuracy = hits / total if total else 0.0

        # Keep the winner together with its name; the fitted model is not one
        # of the estimators in self.models, so it cannot be looked up there.
        if self.best_model is None or accuracy > self.best_accuracy:
            self.best_accuracy = accuracy
            self.best_model = fitted
            self.best_model_name = name

    print(f"Best Model: {self.best_model_name} with accuracy: {self.best_accuracy}")
