This notebook works with the Landsat dataset. We take the previously trained model and apply it to the validation data in a distributed fashion by broadcasting the model to all worker nodes.

In [1]:
validation = sc.textFile("hdfs:///user/lsda/landsat/landsat_validation.csv")

print("First line of the validation RDD: {}".format(validation.take(1)))
print("Number of elements in the validation RDD: {}".format(validation.count()))

First line of the validation RDD: ['4,2533,2269,1904,2447,2204,2048,1063,1061,4590']
Number of elements in the validation RDD: 1335558


Next, we define a Python function to extract, for each line, the label and the associated features.

In [2]:
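def parse(line):
    # Split a CSV line into an integer label (first field)
    # and a list of float features (remaining fields).
    try:
        fields = line.split(',')
        label = int(fields[0])
        features = [float(f) for f in fields[1:]]

        return (label, features)

    except Exception:
        # Malformed lines are marked with None and filtered out below
        return None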

validation = validation.map(parse)
validation = validation.filter(lambda line: line is not None)

print("First line of modified RDD: {}".format(validation.take(1)))

First line of modified RDD: [(4, [2533.0, 2269.0, 1904.0, 2447.0, 2204.0, 2048.0, 1063.0, 1061.0, 4590.0])]
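
To illustrate how `parse` treats malformed input, here is a minimal sketch that runs the same function on a few plain Python strings, without Spark. The second and third sample lines are hypothetical; the source does not say whether the CSV file actually contains header or empty lines.

```python
def parse(line):
    try:
        fields = line.split(',')
        label = int(fields[0])
        features = [float(f) for f in fields[1:]]
        return (label, features)
    except Exception:
        return None

# Hypothetical sample lines: one valid row, one header-like row, one empty line
lines = [
    "4,2533,2269,1904,2447,2204,2048,1063,1061,4590",
    "label,b1,b2,b3,b4,b5,b6,b7,b8,b9",
    "",
]

parsed = [parse(line) for line in lines]

# Same filtering step as applied to the RDD
kept = [p for p in parsed if p is not None]
print(kept)
```

Only the first line survives the filter; the other two fail in `int(...)` and are mapped to `None`.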


Let's load the fitted tree ensemble. This happens on the driver machine.

In [3]:
import pickle

with open('model.save', 'rb') as f:
    model_driver = pickle.load(f)

FileNotFoundError: [Errno 2] No such file or directory: 'model.save'
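
The error above indicates that `model.save` is not present in the notebook's working directory. The following is a minimal sketch of the save/load round trip, assuming the model was written with `pickle.dump` in the earlier training notebook (that step is not shown here). The dictionary is a hypothetical stand-in for the fitted ensemble, and a temporary directory is used so that nothing is written to the real working directory.

```python
import os
import pickle
import tempfile

# Stand-in for the fitted tree ensemble (hypothetical; any picklable object works)
stand_in_model = {"n_trees": 10, "n_features": 9}

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "model.save")

    # Saving step, as it might have looked in the training notebook (assumption)
    with open(path, "wb") as f:
        pickle.dump(stand_in_model, f)

    # Check that the file exists before loading, to get a clearer error message
    if not os.path.exists(path):
        raise FileNotFoundError("model file not found: {}".format(path))

    with open(path, "rb") as f:
        model_driver = pickle.load(f)

print(model_driver)
```

In the notebook itself, the relative path `'model.save'` is resolved against the driver's current working directory, so the file has to be placed there (or an absolute path used) before the cell can succeed.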

In [None]:
# (1) broadcast the model 

model = sc.broadcast(model_driver)

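# (2) apply 'map' to the validation RDD to obtain a new RDD
#     'labels_preds' with elements of the form (label, pred)
import numpy

def compute_predictions(row):
    # row is a (label, features) tuple as produced by parse()
    label = row[0]
    # predict expects a 2D array: one row with 9 feature values
    features = numpy.array(row[1]).reshape((1, 9))
    # predict returns a length-1 array; take its only element
    pred = int(model.value.predict(features)[0])

    return (label, pred)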

# Task: The 'map' transformation below takes a long time! Instead
# of 'map', make use of the 'mapPartitions' transformation, see
# https://spark.apache.org/docs/latest/rdd-programming-guide.html
# Draw a sample of roughly 1% of the rows, without replacement
validation = validation.sample(False, 0.01)
labels_preds = validation.map(compute_predictions)

# YOUR CODE HERE
# labels_preds = ...
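
One possible approach to the task, sketched under assumptions: with `mapPartitions`, the function receives an iterator over all rows of a partition, so the model can be called once per partition instead of once per row. The classes `StandInModel` and `StandInBroadcast` below are hypothetical stand-ins so that the sketch runs without Spark or the pickled model. The real ensemble is assumed to offer a scikit-learn-like `predict` that accepts a list of feature rows.

```python
class StandInModel:
    """Hypothetical stand-in with a scikit-learn-like predict(); not the real ensemble."""

    def predict(self, rows):
        # Toy rule: class 4 if the first feature exceeds 2000, else class 1
        return [4 if r[0] > 2000 else 1 for r in rows]


class StandInBroadcast:
    """Mimics the .value attribute of a Spark broadcast variable."""

    def __init__(self, value):
        self.value = value


model = StandInBroadcast(StandInModel())


def compute_predictions_partition(rows):
    # rows is an iterator over all (label, features) tuples of one partition
    rows = list(rows)
    if not rows:
        # Empty partitions yield no output
        return iter([])

    labels = [row[0] for row in rows]
    features = [row[1] for row in rows]

    # One predict call for the whole partition instead of one call per row
    preds = model.value.predict(features)

    return iter([(label, int(pred)) for label, pred in zip(labels, preds)])


# Local check: a plain iterator stands in for one partition
partition = iter([
    (4, [2533.0, 2269.0, 1904.0, 2447.0, 2204.0, 2048.0, 1063.0, 1061.0, 4590.0]),
    (1, [1500.0, 1400.0, 1300.0, 1200.0, 1100.0, 1000.0, 900.0, 800.0, 700.0]),
])
result = list(compute_predictions_partition(partition))
print(result)
```

In the notebook, the corresponding call would be `labels_preds = validation.mapPartitions(compute_predictions_partition)`. Note that `list(rows)` holds a whole partition in memory; for very large partitions, processing the iterator in fixed-size chunks may be preferable.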

In [None]:
labels_preds.take(10)

In [None]:
def compute_errors(row):
    # row is a (label, pred) tuple: return 1 for a misclassification, 0 otherwise
    if row[0] == row[1]:
        return 0
    else:
        return 1

errors = labels_preds.map(compute_errors)
errors_count = errors.reduce(lambda a, b: a + b)
labels_count = labels_preds.count()

In [None]:
print("Classification accuracy: {}".format(1 - errors_count / labels_count))
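
The accuracy computation above runs two actions (`reduce` and `count`), and each one evaluates `labels_preds` again unless the RDD is cached. As a sketch of a single-pass alternative, each pair can be mapped to `(error, 1)` and the two components summed together. The version below emulates this locally with `functools.reduce`; the `(label, pred)` pairs and the helper names `to_error_and_count` and `add_pairs` are hypothetical.

```python
from functools import reduce

# Hypothetical (label, pred) pairs standing in for the labels_preds RDD
labels_preds_local = [(4, 4), (1, 1), (2, 3), (4, 4)]


def to_error_and_count(row):
    # (1, 1) for a misclassification, (0, 1) for a correct prediction
    return (0 if row[0] == row[1] else 1, 1)


def add_pairs(a, b):
    # Component-wise sum: (errors, rows seen)
    return (a[0] + b[0], a[1] + b[1])


errors_count, labels_count = reduce(
    add_pairs, map(to_error_and_count, labels_preds_local)
)
accuracy = 1 - errors_count / labels_count
print("Classification accuracy: {}".format(accuracy))
```

With Spark, the same two functions could be passed as `labels_preds.map(to_error_and_count).reduce(add_pairs)`. Alternatively, calling `labels_preds.cache()` before the two actions avoids recomputing the predictions.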