In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler, OneHotEncoder
from sklearn.metrics import accuracy_score
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import math
from pyspark.ml.feature import VectorAssembler
from math import log

In [2]:
spark=SparkSession.builder\
    .master("local[*]")\
    .appName("NaiveBayes")\
    .getOrCreate()

In [3]:
training_data = spark.read.csv('../train.csv', header=True, inferSchema=True)
cols_to_drop = ['_c0','id']
training_data = training_data.drop(*cols_to_drop)
training_data = training_data.toPandas()
train = pd.DataFrame(training_data)


test_data = spark.read.csv('../test.csv', header=True, inferSchema=True)
cols_to_drop = ['_c0','id']
test_data = test_data.drop(*cols_to_drop)
test_data = test_data.toPandas()
test = pd.DataFrame(test_data)

In [4]:
numerical_columns = ['Age', 'Flight Distance', 'Departure Delay in Minutes','Arrival Delay in Minutes']
nominal_columns = ['Gender', 'Customer Type','Type of Travel', 'Class','satisfaction']

In [5]:
# drop rows with nulls values
train = train.dropna()

# encode nominal features
train_mappings = {}

for col in nominal_columns:
    encoder = LabelEncoder()
    train[col] = encoder.fit_transform(train[col])
    mapping = {category: label for category, label in zip(encoder.classes_, encoder.transform(encoder.classes_))}
    train_mappings[col] = mapping

print(train_mappings)


# scaling numerical features
for col in numerical_columns:
#     scaler = StandardScaler()
#     train[col] = scaler.fit_transform(train[[col]])
        train[col] = pd.cut(train[col], bins=10, labels=False)
train

{'Gender': {'Female': 0, 'Male': 1}, 'Customer Type': {'Loyal Customer': 0, 'disloyal Customer': 1}, 'Type of Travel': {'Business travel': 0, 'Personal Travel': 1}, 'Class': {'Business': 0, 'Eco': 1, 'Eco Plus': 2}, 'satisfaction': {'neutral or dissatisfied': 0, 'satisfied': 1}}


Unnamed: 0,Gender,Customer Type,Age,Type of Travel,Class,Flight Distance,Inflight wifi service,Departure/Arrival time convenient,Ease of Online booking,Gate location,...,Inflight entertainment,On-board service,Leg room service,Baggage handling,Checkin service,Inflight service,Cleanliness,Departure Delay in Minutes,Arrival Delay in Minutes,satisfaction
0,1,0,0,1,2,0,3,4,3,1,...,5,4,3,4,4,5,5,0,0,0
1,1,1,2,0,0,0,3,2,3,3,...,1,1,5,3,1,4,1,0,0,0
2,0,0,2,0,0,2,2,2,2,2,...,5,4,3,4,4,4,5,0,0,1
3,0,0,2,0,0,1,2,5,5,5,...,2,2,5,3,1,4,2,0,0,0
4,1,0,6,0,0,0,3,3,3,3,...,3,3,4,4,3,3,3,0,0,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
103899,0,1,2,0,1,0,2,1,2,3,...,2,3,1,4,2,3,2,0,0,0
103900,1,0,5,0,0,4,4,4,4,4,...,5,5,5,5,5,5,4,0,0,1
103901,1,1,2,0,0,3,1,1,1,3,...,4,3,2,4,5,5,4,0,0,0
103902,0,1,1,0,1,1,1,1,1,5,...,1,4,5,1,5,4,1,0,0,0


In [6]:
# drop rows with nulls values
test = test.dropna()

# encode nominal features
test_mappings = {}

for col in nominal_columns:
    encoder = LabelEncoder()
    test[col] = encoder.fit_transform(test[col])
    mapping = {category: label for category, label in zip(encoder.classes_, encoder.transform(encoder.classes_))}
    test_mappings[col] = mapping

print(test_mappings)


# scaling numerical features
for col in numerical_columns:
    # scaler = StandardScaler()
    # test[col] = scaler.fit_transform(test[[col]])
    test[col] = pd.cut(test[col], bins=10, labels=False)


test

{'Gender': {'Female': 0, 'Male': 1}, 'Customer Type': {'Loyal Customer': 0, 'disloyal Customer': 1}, 'Type of Travel': {'Business travel': 0, 'Personal Travel': 1}, 'Class': {'Business': 0, 'Eco': 1, 'Eco Plus': 2}, 'satisfaction': {'neutral or dissatisfied': 0, 'satisfied': 1}}


Unnamed: 0,Gender,Customer Type,Age,Type of Travel,Class,Flight Distance,Inflight wifi service,Departure/Arrival time convenient,Ease of Online booking,Gate location,...,Inflight entertainment,On-board service,Leg room service,Baggage handling,Checkin service,Inflight service,Cleanliness,Departure Delay in Minutes,Arrival Delay in Minutes,satisfaction
0,0,0,5,0,1,0,5,4,3,4,...,5,5,5,5,2,5,5,0,0,1
1,0,0,3,0,0,5,1,1,3,1,...,4,4,4,4,3,4,5,0,0,1
2,1,1,1,0,1,0,2,0,2,4,...,2,4,1,3,2,2,2,0,0,0
3,1,0,4,0,0,6,0,0,0,2,...,1,1,1,1,3,1,4,0,0,1
4,0,0,5,0,1,2,2,3,4,3,...,2,2,2,2,4,2,4,0,0,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
25971,1,1,3,0,0,0,3,3,3,1,...,4,3,2,4,4,5,4,0,0,0
25972,1,0,2,0,0,1,4,4,4,4,...,4,4,5,5,5,5,4,0,0,1
25973,0,0,1,1,1,1,2,5,1,5,...,2,4,3,4,5,4,2,0,0,0
25974,1,0,0,0,0,2,3,3,3,3,...,4,3,2,5,4,5,4,0,0,1


In [7]:
train_spark = spark.createDataFrame(train)
test_spark = spark.createDataFrame(test)

In [8]:
FEATURES_COL = train_spark.columns[:-1]
LABEL_COL = train_spark.columns[-1]

In [9]:
# calculate the prior probabilities

# map function
# input: data 
# output: (class, 1) --> (0,1) (0,1) (1,1) ....
def prior_mapper(lines):
    pairs = []
    for line in lines:
        pairs.append((line[LABEL_COL], 1))
    return pairs

# shuffle & sort
# output: (class, list of 1s) 

# reduce:
# output: (class, sum of 1s)
def prior_reducer(pair):
    class_value, values = pair
    count = 0
    for value in values:
        count += value
    return [(class_value, count)]

# to RDD
train_spark_rdd = train_spark.rdd

# map 
mapper_output = train_spark_rdd.mapPartitions(prior_mapper)

# shuffle & sort
shuffle_output = mapper_output.groupByKey()

# reduce
reducer_output = shuffle_output.flatMap(prior_reducer)

# to list
output_list = reducer_output.collect()
print(output_list)

total_size = train_spark_rdd.count()
print(total_size)

prior_probs = {}
for i in output_list:
    prior_probs[i[0]] = log(i[1]/total_size)
print(prior_probs)

[(0, 58697), (1, 44897)]
103594
{0: -0.5680907948876823, 1: -0.8361084357191146}


In [20]:
# calculate the conditional probabilities & posterior probabilities

# map function
# input: data 
# output: (class, (feature, value, count) )
def posterior_mapper(lines):
    
    temp = {} 
    # dict of dict {
    # class1: {
    #     feature1: {
    #            value1: count,
    #            value2: count,
    #     } 
    #   }
    # }

    for line in lines:

        # add new class
        class_value = line[LABEL_COL]
        if class_value not in temp:
            temp[class_value] = {}

        for feature in FEATURES_COL:
            
            # add new feature
            if feature not in temp[class_value]:
                temp[class_value][feature] = {}

            # add new feature value
            feature_value = line[feature]
            if line[feature] not in temp[class_value][feature]:
                temp[class_value][feature][feature_value] = 0
            
            # update feature value
            temp[class_value][feature][feature_value] += 1

    pairs = []
    for class_key in temp:
        for feature in temp[class_key]:
            for feature_value in temp[class_key][feature]:
                count = temp[class_key][feature][feature_value]
                pairs.append((class_key, (feature, feature_value, count)))
    return pairs

# shuffle & sort
# output: (class, list of tuples (feature, feature value, count) )

# reduce
# output: (class, feature, feature value, count)

def posterior_reducer(pair):

    class_value, tuples = pair

    temp = {}
    total_count = 0

    for tuple in tuples:
        feature = tuple[0]
        feature_value = tuple[1]
        count = tuple[2]
        total_count += count

        if feature not in temp:
            temp[feature] = {}
        if feature_value not in temp[feature]:
            temp[feature][feature_value] = 0
        temp[feature][feature_value]  += count

    pairs = []
    for feature in temp:
        for feature_value in temp[feature]:
            pairs.append((class_value, feature, feature_value, temp[feature][feature_value] / total_count * len(FEATURES_COL)))
    return pairs


# map 
mapper_output = train_spark_rdd.mapPartitions(posterior_mapper)

# shuffle & sort
shuffle_output = mapper_output.groupByKey()

# reduce
reducer_output = shuffle_output.flatMap(posterior_reducer)

# to list
output_list = reducer_output.collect()
print(output_list)

posterior_probs={}
for tuple in output_list:
    class_value , feature , feature_value , value_count = tuple

    if class_value not in posterior_probs:
        posterior_probs[class_value] = {}
    if feature not in posterior_probs[class_value]:
        posterior_probs[class_value][feature] = {}

    posterior_probs[class_value][feature][feature_value] = log(value_count)

print(posterior_probs)

[(0, 'Gender', 1, 0.4870777041416086), (0, 'Gender', 0, 0.5129222958583914), (0, 'Customer Type', 0, 0.7538545411179447), (0, 'Customer Type', 1, 0.2461454588820553), (0, 'Age', 0, 0.07644342981753753), (0, 'Age', 2, 0.1900608208255959), (0, 'Age', 5, 0.1073479053443958), (0, 'Age', 4, 0.1425967255566724), (0, 'Age', 1, 0.12022760958822427), (0, 'Age', 3, 0.16210368502649197), (0, 'Age', 6, 0.10494573828304683), (0, 'Age', 7, 0.07802783787927833), (0, 'Age', 8, 0.016048520367310082), (0, 'Age', 9, 0.0021977273114469225), (0, 'Type of Travel', 1, 0.49177981838935547), (0, 'Type of Travel', 0, 0.5082201816106445), (0, 'Class', 2, 0.09595038928735711), (0, 'Class', 0, 0.2579859277305484), (0, 'Class', 1, 0.6460636829820945), (0, 'Flight Distance', 0, 0.3826771385249672), (0, 'Flight Distance', 1, 0.3145987018075881), (0, 'Flight Distance', 2, 0.12382234185733512), (0, 'Flight Distance', 4, 0.04858851389338467), (0, 'Flight Distance', 6, 0.015571494284205326), (0, 'Flight Distance', 3, 0.0

In [21]:
print('Class 0:', posterior_probs[0]['Gender'])
print('Class 1:', posterior_probs[1]['Gender'])

Class 0: {1: -0.7193316118684231, 0: -0.6676309153457309}
Class 1: {0: -0.6922343959471178, 1: -0.6940607991097851}


In [36]:
def get_class(line):

    temp = {}

    for class_value in prior_probs:

        total_prob = prior_probs[class_value]

        for feature in FEATURES_COL:

            if line[feature] in posterior_probs[class_value][feature]:
                feature_value = line[feature]
                total_prob += posterior_probs[class_value][feature][feature_value]
            else:
                total_prob += log(0.000001)

        temp[class_value] = total_prob

    pred_class = max(temp, key=lambda k: temp[k])

    return pred_class

In [37]:
# classification 

# map function
# input: data 
# output: (0, 1) -> true prediction or (0, 0) -> false prediction

def class_mapper(lines):
    pairs = []
    for line in lines:
        # true prediction
        if get_class(line) == line[LABEL_COL]:
            pairs.append((0,1))
        else:
            pairs.append((0,0))
    return pairs

# shuffle & sort
# output: (0, list of 1s and 0s)

# reduce
# output: (0, sum of the list)
def class_reducer(line):
    count = 0
    for value in line[1]:
        count += value
    return [(0, count)]

# map 
mapper_output = train_spark_rdd.mapPartitions(class_mapper)

# shuffle & sort
shuffle_output = mapper_output.groupByKey()

# reduce
reducer_output = shuffle_output.flatMap(class_reducer)

# to list
output_list = reducer_output.collect()
print(output_list)

train_accuracy = output_list[0][1] / train_spark_rdd.count()
print('Training Accuracy: ', train_accuracy)


[(0, 92571)]
Training Accuracy:  0.8935942236036836


In [38]:
# to RDD
test_spark_rdd = test_spark.rdd

# map 
mapper_output = test_spark_rdd.mapPartitions(class_mapper)

# shuffle & sort
shuffle_output = mapper_output.groupByKey()

# reduce
reducer_output = shuffle_output.flatMap(class_reducer)

# to list
output_list = reducer_output.collect()
print(output_list)

test_accuracy = output_list[0][1] / test_spark_rdd.count()
print('Training Accuracy: ', test_accuracy)


[(0, 23046)]
Training Accuracy:  0.8900475031861893
