In [1]:
# Import packages
import os
import re  # regular expressions
import warnings
import matplotlib.pyplot as plt
import numpy as np
import numpy.ma as ma
import rasterio as rio
from rasterio.plot import plotting_extent
import geopandas as gpd
import earthpy as et
import earthpy.plot as ep
import earthpy.spatial as es
import earthpy.mask as em

import rioxarray as rxr

from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint

from sklearn.datasets import dump_svmlight_file

from glob import glob

from queue import Queue

import time

warnings.simplefilter('ignore')

# Set working directory
os.chdir(os.path.join(et.io.HOME, 'BD', 'BA_DATA'))

In [2]:
# Constants
d_limit = 1200 # pixel resolutino (1km) 1200 * 1200
tem_conversion = 0.02 # convert to Kelvin temperature
vege_fillvalue = -3000 
vege_lowerest = -2000
NE_fillvalue = 32767
tem_fillvalue = 0
radius = 50

vege_len = 457 # how many 16-day files for 20 years
tem_len = 914 # how many 8-day files for 20 years
NE_len = 914
thermal_len = 914
files_one_year = 46 # how many 8-day files in a year

In [3]:
vege_fs85 = glob('VegeData_16day/*h08v05*')
tem_fs85 = glob('TemData_8day/*h08v05*')
NE_fs85 = glob('NE_Data_8day/*h08v05*')
thermal_fs85 = glob('ThermalData_8day/*h08v05*')

# Functions used for computation

### Mask function

Pick any location, divide the neighborhood into several regions of interest in various directions, each region is within RADIUS distance, and ANGLE_RANGE degrees.

In [10]:
def sector_mask(shape,centre,radius,angle_range):
    """
    Return a boolean mask for a circular sector. The start/stop angles in  
    `angle_range` should be given in clockwise order.
    """

    x,y = np.ogrid[:shape[0],:shape[1]]
    cx,cy = centre
    tmin,tmax = np.deg2rad(angle_range)

    # ensure stop angle > start angle
    if tmax < tmin:
            tmax += 2*np.pi

    # convert cartesian --> polar coordinates
    r2 = (x-cx)*(x-cx) + (y-cy)*(y-cy)
    theta = np.arctan2(x-cx,y-cy) - tmin

    # wrap angles between 0 and 2*pi
    theta %= (2*np.pi)

    # circular mask
    circmask = r2 <= radius*radius

    # angular mask
    anglemask = theta <= (tmax-tmin)

    return circmask*anglemask

### Compute Feature Sets

In [11]:
def compute_fire_intensity(matrix, row, column):
    """
    Return a feature of a single pixel with row number ROW and column number COLUMN in the 1200 * 1200 matrix MATRIX.
    Feature set is computed based on the average value in the area of a circle in one time frame.
    """
    mask = sector_mask(matrix.shape, (row, column), radius, (0, 360))
    sector = matrix[mask]
    sum_value = np.sum(sector)
    return sum_value

In [12]:
def compute_vege_feature_set(matrix, row, column):
    """
    Return a vegetation indices feature set of a single pixel with row number ROW and column number COLUMN 
    in the 1200 * 1200 matrix MATRIX.
    Feature set is computed based on the average value in 10 regions in one time frame.
    """
    feature_set = []
    for i in range(10):
        mask = sector_mask(matrix.shape, (row, column), radius, (i, i + 36))
        sector = matrix[mask]
        sector[:][sector[:] == vege_fillvalue] = vege_lowerest # change fillvalue
        mean_value = np.mean(sector)
        feature_set.append(mean_value)
    return feature_set

In [13]:
def compute_thermal_feature_set(matrix, row, column):
    """
    Return a thermal anomalies feature set of a single pixel with row number ROW and column number COLUMN 
    in the 1200 * 1200 matrix MATRIX.
    Feature set is computed based on the sum of value in 10 regions in one time frame.
    """
    feature_set = []
    for i in range(10):
        mask = sector_mask(matrix.shape, (row, column), radius, (i, i + 36))
        sector = matrix[mask]
        # normalize to 0 and 1
        sector[:][sector[:] < 7] = 0
        sector[:][sector[:] >= 7] = 1
        sum_value = np.sum(sector)
        feature_set.append(sum_value)
    return feature_set

In [14]:
def compute_tem_feature_set(matrix, row, column):
    """
    Return a surface temperature feature set of a single pixel with row number ROW and column number COLUMN 
    in the 1200 * 1200 matrix MATRIX.
    Feature set is computed based on the mean of value in 10 regions in one time frame.
    """
    feature_set = []
    for i in range(10):
        mask = sector_mask(matrix.shape, (row, column), radius, (i, i + 36))
        sector = matrix[mask]
        mean_value = np.mean(sector)
        feature_set.append(mean_value)
    return feature_set

In [15]:
def compute_NE_feature_set(matrix, row, column):
    """
    Return a NE feature set of a single pixel with row number ROW and column number COLUMN 
    in the 1200 * 1200 matrix MATRIX.
    Feature set is computed based on the mean of value in 10 regions in one time frame.
    """
    feature_set = []
    for i in range(10):
        mask = sector_mask(matrix.shape, (row, column), radius, (i, i + 36))
        sector = matrix[mask]
        mean_value = np.mean(sector)
        feature_set.append(mean_value)
    return feature_set

### Reduce dimension

In [18]:
def rebin(m, shape):
    """
    Reshape the input matrix A to the shape SHAPE.
    """
    sh = shape[0],m.shape[0]//shape[0],shape[1],m.shape[1]//shape[1]
    return m.reshape(sh).mean(-1).mean(1)

### Random sampling

In [22]:
def land_mask(file):
    """
    Compute land mask from QA field in thermal data.
    Used for enforcing random sampling on land pixels.
    """
    all_bands = []
    with rio.open(file) as dataset:
        for name in dataset.subdatasets:
            if re.search("QA", name):
                with rio.open(name) as subdataset:
                    modis_meta = subdataset.profile
                    all_bands.append(subdataset.read(1))
    fire_modis = np.stack(all_bands)
    fire_matrix = fire_modis[0].reshape(d_limit,d_limit)
    for i in range(d_limit):
        for j in range(d_limit):
            k = fire_matrix[i][j]
            b1 = k >> 0 & 1
            b2 = k >> 1 & 1
            if b1 != b2:
                fire_matrix[i][j] = 1
            else:
                fire_matrix[i][j] = 0
    return fire_matrix
lm = land_mask(thermal_fs85[0])

In [23]:
def random_position():
    """
    Get a random position on the map, excluding edge cases.
    """
    random_indexR = np.random.randint(60, d_limit - 60)
    random_indexC = np.random.randint(60, d_limit - 60)
    return (random_indexR, random_indexC)

### Compute libsvm style datasets

In [25]:
def compute_features_all(all_fire_fns, file_id, fire_fns, tem_fns, vege_fns, ne_fns, num_file, radius, sample_count):
    """
    Compute target sets, geographical feature sets, time series feature sets for libsvm style for every two 8-day files.
    Target sets are computed based on thermal anomalies data from FIRE_FNS files.
    Target sets: Y -> 1.0/0.0
    Feature fire intensity based on thermal anomalies data from FIRE_FNS files;
    Feature temperature based on surface temperature data from TEM_FNS files;
    Feature vegetation based on vegetation indices data from VEGE_FNS files;
    Feature ne based on net evaporation data from NE_FNS files.
    Feature sets: X -> fire intensity(10), temperature(10), vegetation(10), net evaporation(10), past fire intensity(5)
    Every 16 days, randomly sample SMAMPLE_COUNT number of pixels with fire, and SMAMPLE_COUNT number of pixels without fire.
        if there are less than SMAPLE_COUNT number of pixels with fire, sample all the fire pixels.
    ALL_FIRE_FNS and FILE_ID are used for tracing past files for time series features
    FILE_ID should be no smaller than 115 (5 years)
    """
    
    # sds names for hdf files
    fire_sds1 = "FireMask"
    tem_sds1 = "LST_Day"
    vege_sds1 = "EVI"
    ne_sds1 = ":ET_500m"
    ne_sds2 = "PET"
   
    # get fire mask
    
    sds1 = "FireMask"
    
    file_count = num_file // 2
    fire_positions = []
  
    # compute fire_combined_matrix
    
    f1_bands = []
    f2_bands = []
    
    # open two files in a round
    with rio.open(fire_fns[0]) as dataset:
        for name in dataset.subdatasets:
            if re.search(fire_sds1, name):
                with rio.open(name) as subdataset:
                    modis_meta = subdataset.profile
                    f1_bands.append(subdataset.read(1))
    f1_modis = np.stack(f1_bands)
    f1_matrix = f1_modis[0].reshape(d_limit,d_limit)
    
    with rio.open(fire_fns[1]) as dataset:
        for name in dataset.subdatasets:
            if re.search(fire_sds1, name):
                with rio.open(name) as subdataset:
                    modis_meta = subdataset.profile
                    f2_bands.append(subdataset.read(1))
    f2_modis = np.stack(f2_bands)
    f2_matrix = f2_modis[0].reshape(d_limit,d_limit)
    
    # combine two matrices
    fire_combined_matrix = (f1_matrix + f2_matrix) / 2
    for r in range(60, 1140):
        for c in range(60, 1140):
            if (fire_combined_matrix[r][c] >= 7):
                 fire_positions.append([r,c])
    
    # compute tem_combined_matrix
    
    f1_bands = []
    f2_bands = []
    with rio.open(tem_fns[0]) as dataset:
        for name in dataset.subdatasets:
            if re.search(tem_sds1, name):
                with rio.open(name) as subdataset:
                    modis_meta = subdataset.profile
                    f1_bands.append(subdataset.read(1))
    f1_modis = np.stack(f1_bands)
    f1_matrix = f1_modis[0].reshape(d_limit,d_limit)

    with rio.open(tem_fns[1]) as dataset:
        for name in dataset.subdatasets:
            if re.search(tem_sds1, name):
                with rio.open(name) as subdataset:
                    modis_meta = subdataset.profile
                    f2_bands.append(subdataset.read(1))
    f2_modis = np.stack(f2_bands)
    f2_matrix = f2_modis[0].reshape(d_limit,d_limit)
    
    # combine two matrices
    tem_combined_matrix = (f1_matrix + f2_matrix) / 2
    
    # compute vege_combined_matrix
    
    all_bands = []
    with rio.open(vege_fns) as dataset:
        for name in dataset.subdatasets:
            if re.search(vege_sds1, name):
                with rio.open(name) as subdataset:
                    modis_meta = subdataset.profile
                    all_bands.append(subdataset.read(1))
    vege_modis = np.stack(all_bands)
    vege_matrix = vege_modis[0].reshape(d_limit,d_limit)
        
    # compute ne_combined_matrix    
    
    f1_bands = []
    f2_bands = []
    f1_bands_se = []
    f2_bands_se = []
    
    # open two files in a round
    with rio.open(ne_fns[0]) as dataset:
        for name in dataset.subdatasets:
            if re.search(ne_sds1, name):
                with rio.open(name) as subdataset:
                    modis_meta = subdataset.profile
                    f1_bands.append(subdataset.read(1))
            if re.search(ne_sds2, name):
                with rio.open(name) as subdataset:
                    modis_meta = subdataset.profile
                    f1_bands_se.append(subdataset.read(1))
    f1_modis = np.stack(f1_bands)
    f1_matrix = rebin(f1_modis[0], (d_limit,d_limit))
    f1_modis_se = np.stack(f1_bands_se)
    f1_matrix_se = rebin(f1_modis_se[0], (d_limit,d_limit))
    
    with rio.open(ne_fns[1]) as dataset:
        for name in dataset.subdatasets:
            if re.search(ne_sds1, name):
                with rio.open(name) as subdataset:
                    modis_meta = subdataset.profile
                    f2_bands.append(subdataset.read(1))
            if re.search(ne_sds2, name):
                with rio.open(name) as subdataset:
                    modis_meta = subdataset.profile
                    f2_bands_se.append(subdataset.read(1))
    f2_modis = np.stack(f2_bands)
    f2_matrix = rebin(f2_modis[0], (d_limit,d_limit))
    f2_modis_se = np.stack(f2_bands_se)
    f2_matrix_se = rebin(f2_modis_se[0], (d_limit,d_limit))
    
    # combine two matrices
    f1_matrix = f1_matrix_se - f1_matrix
    f2_matrix = f2_matrix_se - f2_matrix
    ne_combined_matrix = (f1_matrix + f2_matrix) / 2
        
    all_feature_sets = []
    all_target_sets = []
    random_positions = []
    
    # random samples   
    for i in range(sample_count):
        p = random_position()
        while True:
            
            # check for non-fire land pixel sample
            if (p not in fire_positions) and lm[p[0]][p[1]] == 1:
                random_positions.append(p)
                break;
            else:
                p = random_position()
                
        # now have a good random position
        r = p[0]
        c = p[1]
        features = []
        
        thermal_feature_set = compute_thermal_feature_set(fire_combined_matrix, r, c)
        features.append(thermal_feature_set)
        
        tem_feature_set = compute_thermal_feature_set(tem_combined_matrix, r, c)
        features.append(tem_feature_set)
        
        vege_feature_set = compute_thermal_feature_set(vege_matrix, r, c)
        features.append(vege_feature_set)
        
        ne_feature_set = compute_thermal_feature_set(ne_combined_matrix, r, c)
        features.append(ne_feature_set)
        
        flat_list = [item for sublist in features for item in sublist]
        all_feature_sets.append(flat_list)
        
        # normalize FIRE_MASK to 1 or 0
        if (fire_combined_matrix[r][c] >= 7):
            all_target_sets.append(1)
        else:
            all_target_sets.append(0)
    
    # fire positions
    final_fire_positions = []
    count = 0
    if sample_count > len(fire_positions):
        count = len(fire_positions)
    else:
        count = sample_count
    for i in range(count):
        p = np.random.randint(0, len(fire_positions))
        while True:
            if fire_positions[p] not in final_fire_positions:
                final_fire_positions.append(fire_positions[p])
                break;
            else:
                p = np.random.randint(0, len(fire_positions))
                
    for i in final_fire_positions:
        r = i[0]
        c = i[1]
        features = []
        
        thermal_feature_set = compute_thermal_feature_set(fire_combined_matrix, r, c)
        features.append(thermal_feature_set)
        
        tem_feature_set = compute_thermal_feature_set(tem_combined_matrix, r, c)
        features.append(tem_feature_set)
        
        vege_feature_set = compute_thermal_feature_set(vege_matrix, r, c)
        features.append(vege_feature_set)
        
        ne_feature_set = compute_thermal_feature_set(ne_combined_matrix, r, c)
        features.append(ne_feature_set)
        
        flat_list = [item for sublist in features for item in sublist]
        all_feature_sets.append(flat_list)
        
        # normalize FIRE_MASK to 1 or 0
        if (fire_combined_matrix[r][c] >= 7):
            all_target_sets.append(1)
        else:
            all_target_sets.append(0)

    # compute time series features
    # assume there exists data five years ago
    time_features_arr = time_series_features(all_fire_fns, file_id, radius, random_positions, final_fire_positions)
    time_features = time_features_arr.tolist()
    
    # merge geographic and time series features
    final_all_feature_sets = [geo + time for geo, time in zip(all_feature_sets, time_features)]
    
    return (all_target_sets, final_all_feature_sets)

In [29]:
def time_series_features(all_fire_fns, file_id, radius, random_positions, fire_positions):
    """
    Compute time_series_features for a 16-day combined file for locations: RANDOM_POSITIONS and FIRE_POSITIONS
    """
    rp_count = len(random_positions)
    fp_count = len(fire_positions)
    total_count = rp_count + fp_count
    time_features = np.zeros((total_count, 5))
    
    # compute past fire intensity up to 5 years ago
    for year in range(1, 6):
        past_file_id = file_id - year * files_one_year
        if past_file_id < 0:
            print("cannot find enough historical files")
            return
        past_file = all_fire_fns[past_file_id]
        f_bands = []
        with rio.open(past_file) as dataset:
            for name in dataset.subdatasets:
                if re.search("FireMask", name):
                    with rio.open(name) as subdataset:
                        modis_meta = subdataset.profile
                        f_bands.append(subdataset.read(1))
        f_modis = np.stack(f_bands)
        f_matrix = f_modis[0]
        f_matrix[:][f_matrix[:] < 7] = 0
        f_matrix[:][f_matrix[:] >= 7] = 1
        for i in range(rp_count):
            position = random_positions[i]
            r = position[0]
            c = position[1]
            fire_intensity = compute_fire_intensity(f_matrix, r, c)
            time_features[i][year-1] = fire_intensity
        for j in range(fp_count):
            position = fire_positions[j]
            r = position[0]
            c = position[1]
            fire_intensity = compute_fire_intensity(f_matrix, r, c)
            time_features[j+rp_count][year-1] = fire_intensity
    return time_features

# Data Processing

### Generate a libsvm file for every 16 day data

In [14]:
# From 5th year to 20th year

In [33]:
for i in range(115, 290):
    result = compute_features_all(thermal_fs85, 2*i, thermal_fs85[2*i:(2*i+2)], tem_fs85[2*i:(2*i+2)], vege_fs85[i], NE_fs85[2*i:(2*i+2)], 2, 50, 10)
    dump_svmlight_file(result[1],result[0],os.path.join("svms/sample%s" % i),zero_based=False)

In [34]:
for i in range(290, 457):
    result = compute_features_all(thermal_fs85, 2*i, thermal_fs85[2*i:(2*i+2)], tem_fs85[2*i:(2*i+2)], vege_fs85[i], NE_fs85[2*i:(2*i+2)], 2, 50, 10)
    dump_svmlight_file(result[1],result[0],os.path.join("svms/sample%s" % i),zero_based=False)

### Merge all the libsvm files

cat * > svms

# Build Models using PySpark on Dumbo clusters

### Upload the final libsvm file and build models on Dumbo

#### Decision Tree Classifier

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("./svms")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

treeModel = model.stages[2]
# summary only
print(treeModel)

### Result

1.Test Error = 0.00839092

2.Test Error = 0.00252525

3.Test Error = 0.00593472

4.Test Error = 0.00388727

5.Test Error = 0.00507099

Average Accuracy: 99. 483617%

#### Random Forest Classifier

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("./svms")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only

### Result

1.Test Error = 0.00945274

2.Test Error = 0.00691017

3.Test Error = 0.00838678

4.Test Error = 0.00853414

5.Test Error = 0.0128395

6.Test Error = 0.00550826

Average Accuracy: 99. 1394735%

### Gradient-boosted tree classifier

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("./svms")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

gbtModel = model.stages[2]
print(gbtModel)  # summary only

### Result

1.Test Error = 0.00732422

2.Test Error = 0.00496278

3.Test Error = 0.00767386

4.Test Error = 0.00447984

5.Test Error = 0.00729217

Average Accuracy: 99.3653426%

### One-vs-Rest classifier

In [None]:
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# load data file.
inputData = spark.read.format("libsvm") \
    .load("./svms")

# generate the train/test split.
(train, test) = inputData.randomSplit([0.7, 0.3])

# instantiate the base classifier.
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)

# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=lr)

# train the multiclass model.
ovrModel = ovr.fit(train)

# score the model on test data.
predictions = ovrModel.transform(test)

# obtain evaluator.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# compute the classification error on test data.
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

### Result

1.Test Error = 0.268806

2.Test Error = 0.286676

3.Test Error = 0.275571

4.Test Error = 0.268966

5.Test Error = 0.258654

Average Accuracy: 72.82654%

### Naive Bayes

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load training data
data = spark.read.format("libsvm") \
    .load("./svms")

# Split the data into train and test
splits = data.randomSplit([0.7, 0.3], 1234)
train = splits[0]
test = splits[1]

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
model = nb.fit(train)

# select example rows to display.
predictions = model.transform(test)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

### Result

1.Test set accuracy = 0.663537549407

2.Test set accuracy = 0.669216061185

3.Test set accuracy = 0.666833416708

4.Test set accuracy = 0.641062801932

5.Test set accuracy = 0.66087388282

Average Accuracy: 66.03047424104%

### Decision tree regression

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("./svms")

# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

treeModel = model.stages[1]
# summary only
print(treeModel)

### Result

1.Root Mean Squared Error (RMSE) on test data = 0.0696107

2.Root Mean Squared Error (RMSE) on test data = 0.068184

3.Root Mean Squared Error (RMSE) on test data = 0.0794518

4.Root Mean Squared Error (RMSE) on test data = 0.0834209

5.Root Mean Squared Error (RMSE) on test data = 0.0839181

Average RMSE: 0.0769171

### Random forest regression

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("./svms")

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures")

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

rfModel = model.stages[1]
print(rfModel)  # summary only

### Result

1.Root Mean Squared Error (RMSE) on test data = 0.112305

2.Root Mean Squared Error (RMSE) on test data = 0.110443

3.Root Mean Squared Error (RMSE) on test data = 0.100557

4.Root Mean Squared Error (RMSE) on test data = 0.12123

5.Root Mean Squared Error (RMSE) on test data = 0.0929676

Average RMSE: 0.10750052

### Gradient-boosted tree regression

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("./svms")

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)

# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, gbt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

gbtModel = model.stages[1]
print(gbtModel)  # summary only

### Result

1.Root Mean Squared Error (RMSE) on test data = 0.0848031

2.Root Mean Squared Error (RMSE) on test data = 0.0602163

3.Root Mean Squared Error (RMSE) on test data = 0.0714394

4.Root Mean Squared Error (RMSE) on test data = 0.041476

5.Root Mean Squared Error (RMSE) on test data = 0.085556

Average RMSE: 0.06869816