# In [31]:  (Jupyter notebook cell prompt)
import pickle
from pyspark.sql import SparkSession
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer

class MyModel:
    """Naive Bayes classifier over string-indexed error-log columns.

    Typical use is ``indexer`` -> ``train`` -> ``evaluate``:

    * ``indexer`` turns the raw string columns into numeric index columns
      and derives the ``label`` column from ``LABEL_COL``.
    * ``train`` assembles the index columns into a ``features`` vector and
      fits a multinomial Naive Bayes model.
    * ``evaluate`` reports the accuracy of the fitted model on a DataFrame.

    Pickling keeps only plain-Python attributes. The Spark session and the
    fitted model are dropped on pickling, so an unpickled instance gets a
    fresh session and has to be trained again.
    """

    # Source column that StringIndexer maps to the ``label`` column.
    LABEL_COL = 'Solution Key'

    # Raw categorical columns used as features. The order is significant:
    # it fixes the position of every feature in the assembled vector.
    FEATURE_COLS = (
        'Error Information',
        'Source Line Number',
        'Remote IP Address',
        'Program/Method/Function Module',
        'Package',
        'Name of Method or Function Module',
        'Name of Class or Program',
        'Message Area',
        'Message number',
        'Expiry Date',
        'Error Subcategory',
        'Error Short Text',
        'Application component ID',
        'Application Area',
        'ABAP Name of Consumer or Server Proxy',
        'Error Log Information',
        'Sender party',
        'Sender interface operation',
        'Sender interface namespace',
        'Sender interface name',
        'Receiver interface operation',
        'Receiver interface namespace',
        'Receiver interface name',
    )

    # Names of the indexed feature columns, parallel to FEATURE_COLS.
    INDEXED_COLS = tuple(col + '_index' for col in FEATURE_COLS)

    # Attributes that must never end up in a pickle: they wrap live Spark
    # objects and are rebuilt / reset in __setstate__ instead.
    _TRANSIENT_ATTRS = ('spark', 'model')

    def __init__(self):
        """Attach to (or create) the Spark session; no model is fitted yet."""
        self.spark = self._get_session()
        self.model = None

    @staticmethod
    def _get_session():
        """Return the shared "MyApp" Spark session, creating it if needed."""
        return SparkSession.builder.appName("MyApp").getOrCreate()

    def __getstate__(self):
        """Return the picklable part of the instance state.

        The Spark session and the fitted model are left out; everything
        else in ``__dict__`` is copied as is. The instance itself is not
        modified.
        """
        state = self.__dict__.copy()
        for name in self._TRANSIENT_ATTRS:
            state.pop(name, None)
        return state

    def __setstate__(self, state):
        """Restore pickled state, then re-create the session and reset the model."""
        self.__dict__.update(state)
        self.spark = self._get_session()
        self.model = None

    def indexer(self, input_df):
        """Fit a StringIndexer on ``input_df`` and return the transformed frame.

        ``LABEL_COL`` is indexed into ``label`` and every column ``c`` in
        ``FEATURE_COLS`` into ``c + '_index'``. The indexer is re-fitted on
        every call, so the string-to-index mapping depends on the data
        passed in and is not shared between calls.

        Args:
            input_df: DataFrame containing ``LABEL_COL`` and all of
                ``FEATURE_COLS``.

        Returns:
            ``input_df`` with the index columns and ``label`` appended.
        """
        input_cols = [self.LABEL_COL, *self.FEATURE_COLS]
        output_cols = ['label', *self.INDEXED_COLS]
        string_indexer = StringIndexer(
            inputCols=input_cols,
            outputCols=output_cols,
        ).setHandleInvalid("keep")
        return string_indexer.fit(input_df).transform(input_df)

    def train(self, input_df):
        """Fit the Naive Bayes model on an already indexed DataFrame.

        Args:
            input_df: DataFrame as returned by ``indexer``.
        """
        df = self.assembler(input_df)
        # NOTE(review): multinomial Naive Bayes is usually fed count-like
        # features; here the features are category indices. Confirm this
        # is the intended model type.
        self.model = NaiveBayes(smoothing=1.0, modelType="multinomial").fit(df)

    def assembler(self, input_df):
        """Pack the indexed feature columns into a single ``features`` vector.

        Args:
            input_df: DataFrame containing every column in ``INDEXED_COLS``
                plus ``label``.

        Returns:
            DataFrame with exactly two columns, ``features`` and ``label``.
        """
        vector_assembler = VectorAssembler(
            inputCols=list(self.INDEXED_COLS),
            outputCol='features',
        )
        return vector_assembler.transform(input_df).select('features', 'label')

    def evaluate(self, input_df):
        """Return the accuracy of the fitted model on ``input_df``.

        Args:
            input_df: DataFrame as returned by ``indexer``.

        Returns:
            Accuracy as computed by ``MulticlassClassificationEvaluator``.

        Raises:
            RuntimeError: if no model has been fitted, e.g. ``train`` was
                never called or the instance was just unpickled.
        """
        if self.model is None:
            raise RuntimeError(
                "MyModel has no fitted model; call train() before evaluate()."
            )
        df = self.assembler(input_df)
        predictions = self.model.transform(df)
        evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
        return evaluator.evaluate(predictions)

# Read the CSV export (first row is the header, column types are inferred).
spark = SparkSession.builder.appName("MyApp").getOrCreate()
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("Token1_Falcon_HC_Model_Data_CSV.csv")
)

# Build the model wrapper and string-index the complete dataset in one go.
model = MyModel()
df = model.indexer(df)

# The model is fitted on every row; the evaluation set is the rows whose
# "Index" column is greater than 113.
# NOTE(review): the evaluation rows are therefore also training rows, so the
# accuracy printed below is measured on data the model has already seen.
# Consider a disjoint split (e.g. df.randomSplit([0.8, 0.2], seed=42)).
train_df = df
test_df = df.where(df["Index"] > 113)

# Fit on the training rows.
model.train(train_df)

# Score the evaluation rows and report the result.
accuracy = model.evaluate(test_df)
print(f"Accuracy: {accuracy}")


# Recorded cell output:
# Accuracy: 0.7755102040816326
