# In[11]:
# Locate the local Spark installation so that the `pyspark` package can be imported below.
import findspark

findspark.init(spark_home="/home/raj/spark")

# In[12]:
# Imports.
# LabeledPoint and the mllib/ml linalg modules are used to convert the ML DataFrame into an RDD of LabeledPoint.
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col
from pyspark.mllib import linalg as mllib_linalg
from pyspark.ml import linalg as ml_linalg
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# import naive bayes model module
from pyspark.mllib.classification import NaiveBayes , NaiveBayesModel

def Naive_bayes(csv_path="/home/raj/Downloads/notenook/adult2.csv", seed=100):
    """Train an RDD-based MLlib Naive Bayes classifier and return its test accuracy.

    The feature pipeline:
      * string-indexes and one-hot encodes ``workclass`` into ``workclassclassvec``;
      * string-indexes the target ``income`` column into a numeric ``label``;
      * assembles the one-hot vector plus ``age`` and ``hours_per_week`` into
        a single ``features`` vector.

    The data is split 70/30 into train/test sets. The ``pyspark.ml`` vectors
    are converted to ``pyspark.mllib`` ``LabeledPoint``s, and a
    ``pyspark.mllib`` ``NaiveBayes`` model (smoothing 1.0) is trained on the
    training split and scored on the test split.

    Args:
        csv_path: CSV file to load. A header row is expected and the schema
            is inferred.
        seed: Seed for the train/test ``randomSplit``.

    Returns:
        Fraction of test rows whose predicted label equals the true label.

    Raises:
        ValueError: If the test split is empty, in which case accuracy is undefined.
    """
    # getOrCreate() reuses an already-running session (e.g. the notebook's).
    spark = SparkSession.builder.appName("naive_bayes").getOrCreate()

    # Load the CSV with a header row and let Spark infer column types.
    ad_data = (
        spark.read
        .option("inferSchema", "true")
        .option("header", "true")
        .csv(csv_path)
    )

    # Categorical *feature* columns (not the target). Each one is indexed to
    # a numeric code and then one-hot encoded into "<col>classvec".
    categoricalcolumns = ["workclass"]

    stages = []
    for categoricalcol in categoricalcolumns:
        stringindexer = StringIndexer(inputCol=categoricalcol,
                                      outputCol=categoricalcol + "index")
        encoder = OneHotEncoder(inputCol=categoricalcol + "index",
                                outputCol=categoricalcol + "classvec")
        stages += [stringindexer, encoder]
    print(stages)

    # Target: index the string "income" column into a numeric "label".
    label_stringidx = StringIndexer(inputCol="income", outputCol="label")
    stages.append(label_stringidx)

    # Numeric feature columns, used as-is.
    # NOTE(review): mllib NaiveBayes rejects negative feature values, so this
    # assumes age / hours_per_week are never negative. Confirm against the data.
    numeric_cols = ["age", "hours_per_week"]

    # Merge the one-hot vectors and numeric columns into one "features" vector.
    assemblerinputs = [c + "classvec" for c in categoricalcolumns] + numeric_cols
    assembler = VectorAssembler(inputCols=assemblerinputs, outputCol="features")
    stages.append(assembler)

    print(stages)
    print(assembler)

    # Fit all stages (the indexers/encoders learn their mappings), then transform.
    pipeline = Pipeline(stages=stages)
    pipeline_model = pipeline.fit(ad_data)
    dataset = pipeline_model.transform(ad_data)

    # Must be called; a bare attribute reference prints nothing.
    dataset.printSchema()

    # Only the label and the assembled feature vector are needed from here on.
    dataset = dataset.select(["label", "features"])

    trainingData, testData = dataset.randomSplit([0.7, 0.3], seed=seed)

    # pyspark.mllib trains on RDDs of LabeledPoint holding *mllib* vectors,
    # so convert the pyspark.ml vectors produced by VectorAssembler.
    def to_labeled_point(row):
        return LabeledPoint(row.label, mllib_linalg.Vectors.fromML(row.features))

    trainingDataRdd = trainingData.rdd.map(to_labeled_point)
    # The test RDD is traversed twice (total and correct counts). Caching it
    # avoids re-running the pipeline and split for the second pass.
    testDataRdd = testData.rdd.map(to_labeled_point).cache()

    # The second positional argument is the additive smoothing parameter (lambda_).
    model = NaiveBayes.train(trainingDataRdd, 1.0)

    try:
        total = testDataRdd.count()
        if total == 0:
            raise ValueError("test split is empty; cannot compute accuracy")
        correct = (
            testDataRdd
            .map(lambda p: (model.predict(p.features), p.label))
            .filter(lambda pl: pl[0] == pl[1])
            .count()
        )
    finally:
        testDataRdd.unpersist()

    # float() keeps this a true division even under Python 2.
    accuracy = float(correct) / total
    print('model accuracy {}'.format(accuracy))

    return accuracy

