In [1]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setMaster('spark://master:7077').setAppName('Prediction')
sc = SparkContext(conf=conf)
sc.setLogLevel('ERROR')
sc.version
sc.getConf().getAll()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/28 13:55:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


[('spark.app.name', 'Prediction'),
 ('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'),
 ('spark.master', 'spark://master:7077'),
 ('spark.driver.host', 'master'),
 ('spark.app.id', 'app-20221228135540-0001'),
 ('spark.app.startTime', '1672235739245'),
 ('spark.executor.id', 'driver'),

Load training dataset and preprocess the data

In [2]:
from itertools import islice

# get the header and map with the column index for fast access
header = (
    sc
#     .textFile('data/sample.csv', 8)
    .textFile('data/train.csv', 8)
    .take(1)[0]
    .split(',')[1:]
)

headerMap = {}
for i in range(len(header)):
    headerMap[header[i]] = i
    
def parseFloat(x):
    try:
        return float(x)
    except:
        return x

trainRDD = (
    sc
    .textFile('data/train.csv', 8)
    .mapPartitionsWithIndex(
        lambda index, it: islice(it, 1, None) if index == 0 else it # remove header line
    )
    .map(lambda x: x.split(','))
    .map(lambda x: x[1:]) # remove id column
    .map(lambda x: [parseFloat(i) for i in x])
)

                                                                                

Process the columns with continuous values  
Cast and fitler out the valid value from each column  
Since our label is also continuous so I divided it into 8 ranges based on the minimun and maximum values: (-inf, 100000], (100000, 200000], ... , (700000, inf)

In [3]:
import math 

minNumberOfSamples = 10

continuousFeatures = [
    'LotFrontage', 
    'LotArea', 
    'MasVnrArea', 
    'BsmtFinSF1',
    'BsmtFinSF2',
    'BsmtUnfSF',
    'TotalBsmtSF',
    '1stFlrSF',
    '2ndFlrSF',
    'LowQualFinSF',
    'GrLivArea',
    'TotRmsAbvGrd',
    'GarageArea',
    'WoodDeckSF',
    'OpenPorchSF',
    'EnclosedPorch',
    '3SsnPorch',
    'ScreenPorch',
    'PoolArea',
    'MiscVal',
    'MoSold',
] 

labels = [100000, 200000, 300000, 400000, 500000, 600000, 700000, float('inf')]

def parse(x):
    for feature in continuousFeatures:
        if not isinstance(x[headerMap[feature]], float):
            x[headerMap[feature]] = 0
    return x

trainRDD = trainRDD.map(lambda x: parse(x))

data = trainRDD.collect()

                                                                                

# Decision Tree. 

In [4]:
# for feature in continuousFeatures:
#     for x in data:
#         if not isinstance(x[headerMap[feature]], float):
#             x[headerMap[feature]] = 0
import statistics

# calculate the entropy of the input dataset
def entropy(data):
    # we count the appearances of each label in the dataset
    count = {}
    for x in data:
        for label in labels:
            if x[-1] <= label:
                count[label] = count.get(label, 0) + 1
                break
    ret = 0
    # calculate -sum(p_i * log(2, p_i))
    for key,value in count.items():
        x = value / len(data)
        ret -= x * math.log(x, 2)
    return ret

# calculate the variance of a dataset
def var(data):
    data = [x[-1] for x in data]
    mean = sum(data) / len(data)
    return sum((i - mean) ** 2 for i in data) / len(data)
    
# find all possible split points of a feature in an input dataset      
def getSplitPoints(data, feature):
    if feature in continuousFeatures:
        # If this feature is continuous, 
        # return only one split point for easiness, 
        # which the the median of this feature value 
        split_point = statistics.median([x[headerMap[feature]] for x in data])
#         maxVal = max([x[headerMap[feature]] for x in data])
#         minVal = min([x[headerMap[feature]] for x in data])
        return [split_point, float('inf')] # (-inf, median] and (median, inf)
    else:
        # If this feature has discrete values,
        # return the set of all the possible values
        return set([x[headerMap[feature]] for x in data])

# split the input dataset into two by a feature with a split_point
def splitData(data, feature, split_point):
    left = [] # left splitted dataset
    right = [] # right splitted dataset
    
    # criteria for splitting
    p = (
        lambda x: x[headerMap[feature]] <= split_point if feature in continuousFeatures
        else lambda x: x[headerMap[feature]] == split_point
    )
    
    for x in data:
        if p(x):
            left.append(x)
        else:
            right.append(x)
    
    return left, right

# find the best split of the dataset from all features
def findBestSplit(data, features):
    # best feature to split, best split point to split and best information purity
    best_feature, best_split, best_purity = -1, -1, -1
    
    # for each feature, find its possible split points and calculate each purity to find the highest one
    for feature in features:
        split_points = getSplitPoints(data, feature)
        for split_point in split_points:
            left, right = splitData(data, feature, split_point)
            if len(left) == 0 or len(right) == 0:
                continue
            purity = len(data) * var(data) - (len(left) * var(left) + len(right) * var(right))
#             gain = (
#                 entropy(data) 
#                 - len(left) / len(data) * entropy(left)
#                 - len(right) / len(data) * entropy(right)
#             )
            if purity > best_purity:
                best_purity, best_split, best_feature = purity, split_point, feature
    
    return best_feature, best_split

# complete process of splitting dataset
def processSplit(data, features):
    feature, split_point = findBestSplit(data, features)
    left, right = splitData(data, feature, split_point)
    return feature, split_point, left, right

# incase the node cannot be splitted, 
# assign the label to the node with the most common label appear in that node
def findMostCommon(data):
    count = {}
    
    for x in data:
        for label in labels:
            if x[-1] <= label:
                count[label] = count.get(label, 0) + 1
                break
    most_common = -1
    for key, value in count.items():
        if most_common == -1 or value > count[most_common]:
            most_common = key
    
    return most_common

In [5]:
# a node inside the desicion tree
class Node:
    def __init__(self, isLeaf, feature, split_point=None, left=None, right=None):
        self.isLeaf = isLeaf # whether this is a leaf node or not
        if isLeaf:
            self.label = feature # the label of the leaf node
        else:
            self.feature = feature # feature to split
            self.split_point = split_point # split point of feature to split
            self.left = left # left splitted node
            self.right = right # right splitted node
        
class DecisionTree:
    def __init__(self, data, features):
        self.root = None
        self.build(data, features)
    
    def build(self, data, features):
        
        # if the input dataset has little record
        # stop splitting and assign the label to that node as the average of all candidates
        if len(data) <= minNumberOfSamples:
            avg = sum([x[-1] for x in data]) / len(data)
            self.root = Node(True, avg)
        
        # begin to split the current node
        else:
            feature, split_point, left, right = processSplit(data, features)
            
            # if all data goes to left node or right node,
            # set this node to be a leaf node with label as the most common label of all candidates
            if len(left) == 0:
                self.root = Node(True, findMostCommon(right))
            elif len(right) == 0:
                self.root = Node(True, findMostCommon(left))
            
            # continue to split the left node and right node
            else:
                left_node = DecisionTree(left, features)
                right_node = DecisionTree(right, features)
                self.root = Node(False, feature, split_point, left_node, right_node)
            
def predict(item, node):
    if node.isLeaf:
        return node.label
    criteria = (
        lambda x: x[headerMap[node.feature]] <= node.split_point if node.feature in continuousFeatures
        else lambda x: x[headerMap[node.feature]] == node.split_point
    )
    if criteria(item):
        return predict(item, node.left.root)
    else :
        return predict(item, node.right.root)

In [6]:
tree = DecisionTree(data, header)

In [7]:
testData = (
    sc
    .textFile('data/test.csv', 8)
    .mapPartitionsWithIndex(
        lambda index, it: islice(it, 1, None) if index == 0 else it # remove header line
    )
    .map(lambda x: x.split(','))
    .map(lambda x: [parseFloat(i) for i in x])
    .map(lambda x: parse(x))
    .collect()
)

for row in testData:
    y_predict = predict(row, tree.root)
    print(f'Predict for house id {int(row[0])} is {y_predict}')


Predict for house id 1260 is 58671.42857142857
Predict for house id 1261 is 97480.0
Predict for house id 1262 is 58671.42857142857
Predict for house id 1263 is 107500.0
Predict for house id 1264 is 121777.77777777778
Predict for house id 1265 is 107500.0
Predict for house id 1266 is 58671.42857142857
Predict for house id 1267 is 121777.77777777778
Predict for house id 1268 is 107500.0
Predict for house id 1269 is 58671.42857142857
Predict for house id 1270 is 58671.42857142857
Predict for house id 1271 is 58671.42857142857
Predict for house id 1272 is 107500.0
Predict for house id 1273 is 58671.42857142857
Predict for house id 1274 is 58671.42857142857
Predict for house id 1275 is 58671.42857142857
Predict for house id 1276 is 121777.77777777778
Predict for house id 1277 is 58671.42857142857
Predict for house id 1278 is 107500.0
Predict for house id 1279 is 58671.42857142857
Predict for house id 1280 is 121777.77777777778
Predict for house id 1281 is 97480.0
Predict for house id 1282 i

# PLANET

In [8]:
# Initialization: Identifies all the attribute values which need to be considered for splits
split_points = {}
equi_depth = math.ceil(len(data) / 5)

for feature in header[:-1]:
    split_points[feature] = []
    
    # for continuous feature 
    # compute the approximate equi-depth histogram
    # and find 4 split points to split data into equal part
    if feature in continuousFeatures:
        feature_data = sorted([x[headerMap[feature]] for x in data])
        count = 0
        for x in feature_data:
            count += 1
            if count >= equi_depth:
                split_points[feature].append(x)
                count = 0
        split_points[feature].append(float('inf'))
        
    # for categorical feature, indentify the set of all possible values
    else:
        split_points[feature] = list(set([
            x[headerMap[feature]] for x in data
        ]))

In [9]:
split_points

{'MSSubClass': [160.0,
  70.0,
  40.0,
  75.0,
  45.0,
  80.0,
  50.0,
  20.0,
  85.0,
  180.0,
  30.0,
  120.0,
  90.0,
  60.0,
  190.0],
 'MSZoning': ['RM', 'RH', 'C (all)', 'RL', 'FV'],
 'LotFrontage': [24.0, 60.0, 70.0, 80.0, inf],
 'LotArea': [7100.0, 8777.0, 10200.0, 12168.0, inf],
 'Street': ['Grvl', 'Pave'],
 'Alley': ['NA', 'Pave', 'Grvl'],
 'LotShape': ['IR3', 'IR2', 'IR1', 'Reg'],
 'LandContour': ['Bnk', 'Low', 'Lvl', 'HLS'],
 'Utilities': ['NoSeWa', 'AllPub'],
 'LotConfig': ['Corner', 'FR3', 'Inside', 'CulDSac', 'FR2'],
 'LandSlope': ['Sev', 'Mod', 'Gtl'],
 'Neighborhood': ['OldTown',
  'Crawfor',
  'SawyerW',
  'StoneBr',
  'IDOTRR',
  'NAmes',
  'BrDale',
  'MeadowV',
  'BrkSide',
  'Somerst',
  'Blmngtn',
  'Gilbert',
  'SWISU',
  'Mitchel',
  'Veenker',
  'CollgCr',
  'Blueste',
  'Timber',
  'NPkVill',
  'ClearCr',
  'NoRidge',
  'NridgHt',
  'NWAmes',
  'Sawyer',
  'Edwards'],
 'Condition1': ['Feedr',
  'Norm',
  'RRNn',
  'PosN',
  'RRNe',
  'RRAe',
  'Artery',
  'Po

### MapReduce Initialization Task

In [10]:
nodes = {}

def criteria(x, feature, value):
    if feature in continuousFeatures:
        if x[headerMap[feature]] <= value:
            return True
    else:
        if x[headerMap[feature]] == value:
            return True

class PlanetNode:
    def __init__(self, _id, feature, split_point, label=None):
        self.id = _id # Node ID
        self.label = label # Node label
        if label:
            self.label = label
        else:
            self.feature = feature # split feature
            self.split_point = split_point # split value

    # traverse a record to a deepest node and obtain its node ID
    def traverse(self, x):
        if self.label or not self.feature:
            return self.id
        
        if criteria(x, self.feature, self.split_point):
            return nodes[self.id * 2].traverse(x)
        return nodes[self.id * 2 + 1].traverse(x)
            
def traverseTree(x, node):
    return node.traverse(x)

# find all possible splitting criteria (including split feature and split value) of a record
def possibleSplit(x):
    ret = []
    for feature, values in split_points.items():
        for value in values:
            if criteria(x, feature, value):
                ret.append((feature, value))
                break;
    return ret

# compute the purity of a ndoe after splitting
def purity(node, left):
    p_node = node[2] - node[1] ** 2 / node[0]
    p_left = left[2] - left[1] ** 2 / left[0]
    right = (node[0] - left[0], node[1] - left[1], node[2] - left[2])
    p_right = right[2] - right[1] ** 2 / right[0]
    return p_node - p_left - p_right

In [11]:
nodes[0] = PlanetNode(1, None, None) # initialize the tree with one node

### MapReduce FindBestSplit Task

In [12]:
# Mapper

# map each record with a node ID
# (nodeID, (n, s, q))
split_nodes = (
    trainRDD
    .map(lambda x: (traverseTree(x, nodes[0]), (1, x[-1], x[-1] ** 2)))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2]))
    .collect()
)

# map each record with a spliting criteria and a nodeID
# ((nodeID, (feature, split point)), (n, s, q))
splitRDD = (
    trainRDD
    .flatMap(lambda x: 
        [(
            (traverseTree(x, nodes[0]), i),
            (1, x[-1], x[-1] ** 2)
        ) 
        for i in possibleSplit(x)]
    )
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2]))
)

# Reducer

# find the best split in each node
for node in split_nodes:
    best_split = (
        splitRDD
        .filter(lambda x: x[0][0] == node[0])
        .map(lambda x: (x[0], purity(node[1], x[1])))
        .takeOrdered(1, lambda x: -x[1]) # take the split with highest purity
    )
    print(f'Best split for node id {node[0]} is {best_split[0][0][1]}')



Best split for node id 1 is ('GarageCars', 3.0)


                                                                                