In [1]:
import numpy as np
import csv

import pyspark.mllib.regression 
from pyspark import SparkConf, SparkContext
from pyspark.ml.classification import RandomForestClassifier 
from pyspark.sql import SparkSession
from pyspark.mllib.regression import LabeledPoint
from sklearn.model_selection import train_test_split
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from sklearn import preprocessing
from pyspark.ml.linalg import DenseVector
from pyspark.ml.regression import LinearRegression
import config as cf

In [2]:
NUM_FEATURES = 13
NUM_DATAPOINTS = 10000

In [3]:
def create_spark_context():
    """Set up spark context"""
    conf = (SparkConf())
    sc = SparkContext(conf=conf)

    return sc

In [4]:
#Create spark context
sc = create_spark_context()

In [5]:
X_unscaled = []
y_class = []
y_reg_unscaled = []
#Extract data from csv file
with open(cf.PATHS[cf.HOUSE_PRICES]) as csvfile:
    reader = csv.reader(csvfile)
    for i, row in enumerate(reader):         
        if(i != 0):
            xi = [None]*13
            xi[0] = row[3]
            xi[1] = row[4]
            xi[2] = row[5]
            xi[3] = row[6]
            xi[4] = row[7]
            xi[5] = row[8]
            xi[6] = row[9]
            xi[7] = row[10]
            xi[8] = row[11]
            xi[9] = row[12]
            xi[10] = row[13]
            xi[11] = row[14]
            xi[12] = row[15]
            y_reg_unscaled.append(row[2])
            y_class.append(str(int(float(row[2])>530000)))
            X_unscaled.append(xi)
                
        if(i == NUM_DATAPOINTS):
            break

X = preprocessing.scale(X_unscaled)


In [6]:
class LinReg:
    """A class that takes in a numpy matrix X and column vector y 
        Allows you to apply 7030 split and 10 fold cross fold validation
        with metrics returned as a tuple as so:
        (RMSE, MAE)
    """
    def __init__(self, X, y):
        self.spark = SparkSession(sc)
        self.X = X
        self.y = preprocessing.scale(y)
        self.data = []
        for i, yi in enumerate(self.y):
            xi = self.X[i]
            xi_string = ""
            xi_string+=str(yi)
            for x in xi:
                xi_string+=','
                xi_string+=str(x)
            self.data.append(xi_string)

        self.rdd = sc.parallelize(self.data)
        self.rdd = self.rdd.map(lambda line: line.split(","))
        #Convert rdd to df for easier manip
        self.df = self.rdd.map(lambda line: Row(price=line[0],
                              bedrooms=line[1], 
                              bathrooms=line[2], 
                              sqft_living=line[3],
                              sqft_lot=line[4], 
                              floors=line[5], 
                              waterfront=line[6],
                              view=line[7], 
                              condition=line[8], 
                              grade=line[9],
                              sqft_above=line[10], 
                              sqft_basement=line[11], 
                              yr_built=line[12],
                              yr_renovated=line[13])).toDF()
        self.columns = ['price', 'bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors', 'waterfront', 'view', 'condition', 'grade', 'sqft_above', 'sqft_basement', 'yr_built', 'yr_renovated']
        self.df = self.convertColumn(self.df, self.columns, FloatType())
        #Seperate data in labels and features
        self.input_data = self.df.rdd.map(lambda x: (x[5], DenseVector(x[:5] + x[6:])))
        self.df1 = self.spark.createDataFrame(self.input_data, ["label", "features"])
        
    def convertColumn(self, df, names, newType):
        """Cast columns from string to floats"""
        for name in names: 
            df = df.withColumn(name, df[name].cast(newType))
        return df 
    
    def split7030(self):
        """Applies 70 30 split testing"""
        train_data, test_data = self.df1.randomSplit([.7,.3],seed=1234)
        # Initialize `lr`
        lr = LinearRegression(labelCol="label", maxIter=10)

        # Fit the data to the model
        linear_model = lr.fit(train_data)
        linear_model.transform(test_data)
        metric_1 = linear_model.summary.rootMeanSquaredError
        metric_2 = linear_model.summary.meanAbsoluteError
        return metric_1, metric_2
    
    def crossval10(self):
        """Applies 10 fold cross validation testing"""
        metric_1_sum = 0
        metric_2_sum = 0
        for i in range(0, 10):
            train_data, test_data = self.df1.randomSplit([.9,.1],seed=i*1234)
            self.lr = LinearRegression(labelCol="label", maxIter=10)
            self.linear_model = self.lr.fit(train_data)
            self.linear_model.transform(test_data)
            metric_1 = self.linear_model.summary.rootMeanSquaredError
            metric_1_sum += metric_1
            metric_2 = self.linear_model.summary.meanAbsoluteError
            metric_2_sum += metric_2
            
            
        return metric_1_sum/10, metric_2_sum/10

In [7]:
lr = LinReg(X, y_reg_unscaled)

In [8]:
result = lr.crossval10()
print('10 fold cross validation')
print("Root mean square error: ", result[0])
print("Mean absolute error: ", result[1])

10 fold cross validation
Root mean square error:  0.5932805764764201
Mean absolute error:  0.37457785145920003


In [9]:
result = lr.split7030()
print('70 30 split')
print("Root mean square error: ", result[0])
print("Mean absolute error: ", result[1])

70 30 split
Root mean square error:  0.5761615380448505
Mean absolute error:  0.3785820167943481


In [10]:
class RandF:
    """A class that takes in a numpy matrix X and column vector y 
        and implements the random forest algorithm with a max depth of 2
        Allows you to apply 7030 split and 10 fold cross fold validation
        with metrics returned as a tuple as so:
        (f1 score, accuracy)"""
    def __init__(self, X, y):
        self.spark = SparkSession(sc)
        self.X = X
        self.y = y
        self.data = []
        for i, yi in enumerate(self.y):
            xi = self.X[i]
            xi_string = ""
            xi_string+=str(yi)
            for x in xi:
                xi_string+=','
                xi_string+=str(x)
            self.data.append(xi_string)

        self.rdd = sc.parallelize(self.data)
        self.rdd = self.rdd.map(lambda line: line.split(","))
        self.df = self.rdd.map(lambda line: Row(price=line[0],
                              bedrooms=line[1], 
                              bathrooms=line[2], 
                              sqft_living=line[3],
                              sqft_lot=line[4], 
                              floors=line[5], 
                              waterfront=line[6],
                              view=line[7], 
                              condition=line[8], 
                              grade=line[9],
                              sqft_above=line[10], 
                              sqft_basement=line[11], 
                              yr_built=line[12],
                              yr_renovated=line[13])).toDF()
        self.columns = ['price', 'bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors', 'waterfront', 'view', 'condition', 'grade', 'sqft_above', 'sqft_basement', 'yr_built', 'yr_renovated']
        self.df = self.convertColumn(self.df, self.columns, FloatType())
        self.input_data = self.df.rdd.map(lambda x: (x[5], DenseVector(x[:5] + x[6:])))
        self.df1 = self.spark.createDataFrame(self.input_data, ["label", "features"])
        
    def convertColumn(self, df, names, newType):
        """Converts columns to floats"""
        for name in names: 
            df = df.withColumn(name, df[name].cast(newType))
        return df 
    
    def split7030(self):
        """Applies 70 30 split testing"""
        train_data, test_data = self.df1.randomSplit([.7,.3],seed=1234)
        rf = RandomForestClassifier(labelCol='label', featuresCol='features',numTrees=3)
        fit = rf.fit(train_data)
        predicted = fit.transform(test_data)
        predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
        labels = predicted.select("label").rdd.map(lambda x: x[0])
        prediction_and_label = predictions.zip(labels).collect()
       
        return self.metrics(prediction_and_label)
        
    
    def crossval10(self):
        """Applies 10 fold cross validation testing"""
        metric_1_sum = 0
        metric_2_sum = 0
        for i in range(0, 10):
            train_data, test_data = self.df1.randomSplit([.7,.3],seed=i*1234)

            rf = RandomForestClassifier(labelCol='label', featuresCol='features',numTrees=3)
            fit = rf.fit(train_data)
            predicted = fit.transform(test_data)

            predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
            labels = predicted.select("label").rdd.map(lambda x: x[0])
            prediction_and_label = predictions.zip(labels).collect()
            m = self.metrics(prediction_and_label)
            metric_1_sum += m[0]
            metric_2_sum += m[1]
            
        return metric_1_sum/10, metric_2_sum/10     
            
    def metrics(self, prediction_and_label):
        """returns f1 score and accuracy as tuple from an array of (predication, label) tuples"""
        TP = 0
        FP = 0
        TN = 0
        FN = 0
        for p, l in prediction_and_label:
            p_bool = 0
            if(p>0.5):
                p_bool = 1
            if(p_bool == 1 and l ==1):
                TP+=1
            if(p_bool == 1 and l ==0):
                FP+=1
            if(p_bool == 0 and l ==0):
                TN+=1
            if(p_bool == 0 and l ==1):
                FN+=1
                
        accuracy = float(TP+TN)/float(TP+FP+TN+FN)        
        recall = float(TP)/float(TP+FN)
        precision = float(TP)/float(TP+FP)
        f1 = 2*(recall*precision)/(recall + precision)
        return f1, accuracy   
        
        
        
        
        

In [11]:
rf = RandF(X,y_class)

In [16]:
result = rf.split7030()
print('70 30 split')
print('f1 score: ', result[0])
print('accuracy: ', result[1])

70 30 split
f1 score:  0.7487131492746841
accuracy:  0.8204613841524574


In [15]:
result = rf.crossval10()
print('10 fold cross validation')
print('f1 score: ', result[0])
print('accuracy: ', result[1])

10 fold cross validation
f1 score:  0.7225243060295502
accuracy:  0.8133795231525955


In [None]:
sc.stop()