In [4]:
import sklearn
import pandas as pd
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.ml import Pipeline
import os
from pyspark.ml.evaluation import RegressionEvaluator

# Import necessary dependencies

In [7]:
from pyspark.sql import SparkSession
#Create PySpark SparkSession
spark = SparkSession.builder \
    .appName("BA Model") \
    .getOrCreate()
#Create PySpark DataFrame from Pandas
# spark_df=spark.createDataFrame(df) 
spark_df = spark.read.format("csv").option('header',"true").load('/content/drive/My Drive/ba_data_final.csv')
# spark_df = spark.read.format("csv").option('header',"true").load('data/ba_data_final.csv')

spark_df.printSchema()
spark_df.show()

root
 |-- avg_rating: string (nullable = true)
 |-- num_review: string (nullable = true)
 |-- num_sold: string (nullable = true)
 |-- price: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- first_category: string (nullable = true)
 |-- second_category: string (nullable = true)
 |-- third_category: string (nullable = true)
 |-- description: string (nullable = true)
 |-- shop_name: string (nullable = true)
 |-- shop_like_tier: string (nullable = true)
 |-- shop_num_review: string (nullable = true)
 |-- shop_reply_percentage: string (nullable = true)
 |-- shop_reply_time: string (nullable = true)
 |-- shop_creation_time: string (nullable = true)
 |-- shop_num_follower: string (nullable = true)
 |-- name_description: string (nullable = true)
 |-- augmented_description: string (nullable = true)

+----------+----------+--------+------+--------------------+-----+--------------------+-----------

In [61]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.ml.feature import StringIndexer, OneHotEncoder

from pyspark.ml.feature import MinMaxScaler, StandardScaler
from pyspark.ml.feature import VectorAssembler

from pyspark.ml.feature import Imputer
from pyspark.sql.types import DoubleType


from pyspark.ml.regression import RandomForestRegressor

from pyspark.ml.regression import LinearRegression, LinearRegressionSummary

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.sql.functions import col
from pyspark.sql.functions import log
from math import log10

import pyspark.sql.functions as F
import numpy as np

from pyspark.ml.regression import GBTRegressor

def log_udf(s):
  return log(s,10)

class Model_Pipeline():
  def __init__(self,
               omit_fts = [],
               model_save_path = "checkpoint/model.model/",
               pipeline_save_path = 'checkpoint/pipeline.model/',
               string_indexer_save_path = 'checkpoint/str_idx.model/',
               model_name = 'random_forest', cross_validation = 0, text_process_method = 'tf-idf', min_doc_freq = 2, n_gram = 1,
              text_attr ='name_description',
              cat_attrs = [  'first_category', 'second_category', 'third_category', 'shop_like_tier'],
              num_attrs = ['avg_rating', 'num_review' , 'num_sold', 'shop_num_review', 'shop_reply_percentage', 'shop_creation_time', 'shop_num_follower'],
               omitted_fts = []):
    
    self.model_save_path = model_save_path
    self.pipeline_save_path = pipeline_save_path
    self.string_indexer_save_path = string_indexer_save_path
    self.model_name = model_name
    self.cross_validation = cross_validation
    
    self.text_process_method = text_process_method
    self.min_doc_freq = min_doc_freq
    self.n_gram = n_gram
        
    self.text_feature = text_attr
    self.cat_features = cat_attrs
    self.num_features = num_attrs

    self.pipeline_list = []
    self.pipeline = None

    self.model = None

    self.feature_columns = []
    self.cv = cross_validation

    self.omitted_fts = []

    self.indexer =   StringIndexer(inputCols=[feature for feature in self.cat_features], outputCols= [ (feature + '_numeric' )for feature in self.cat_features])

  
  def build_pipeline(self):

    # Process string attributes with TF-IDF or CountVectorizer 
    # for feature in self.text_features:
    tokenizer = Tokenizer(inputCol=self.text_feature, outputCol=self.text_feature + "_words")
    # wordsData = tokenizer.transform(self.df)
    if self.text_feature not in self.omitted_fts:
      if self.text_process_method == 'tf-idf':
        hashing_tf = HashingTF(inputCol=self.text_feature + "_words", outputCol=self.text_feature + "_rawFeatures",numFeatures=500)
        idf_transformer = IDF(inputCol=self.text_feature + "_rawFeatures", outputCol=self.text_feature + '_features',minDocFreq=self.min_doc_freq)
        text_transformer = Pipeline(stages=[tokenizer,hashing_tf,idf_transformer])
        self.pipeline_list.append(text_transformer)
        self.feature_columns.append(self.text_feature + '_features')
      
      elif self.text_process_method == 'count-vectorizer':
        cv = CountVectorizer(inputCol= self.text_feature + "_words", outputCol=self.text_feature + '_features')
        text_transformer = Pipeline(stages=[tokenizer, cv])
        self.pipeline_list.append(text_transformer)
        self.feature_columns.append(self.text_feature + '_features')    

    # # Process categorical variables: one hot encoding 
    # indexer = StringIndexer(inputCols=[feature for feature in self.cat_features], outputCols= [ (feature + '_numeric' )for feature in self.cat_features])
    oh_encoder = OneHotEncoder(inputCols = [(feature + '_numeric')for feature in self.cat_features], outputCols = [ (feature + '_onehot' )for feature in self.cat_features])
    categorical_transformer = Pipeline(stages=[oh_encoder])
    self.pipeline_list.append(categorical_transformer)

    for feature in self.cat_features:
      if feature not in self.omitted_fts:
        self.feature_columns.append(feature + '_onehot')

    #Process numeric attributes
    for feature in self.num_features:
      if feature not in self.omitted_fts:
        vectorAssembler = VectorAssembler(inputCols=[feature], outputCol=feature + '_unscaled')
        scaler_for_feature = MinMaxScaler(inputCol=feature + '_unscaled', outputCol=feature + '_features')
        numeric_transformer = Pipeline(stages = [vectorAssembler,scaler_for_feature])
        self.pipeline_list.append(numeric_transformer)
        self.feature_columns.append(feature + '_features')
    

    # Final feature assembler: assemble all chosen features
    feature_assembler = VectorAssembler(inputCols=self.feature_columns, outputCol= 'features')
    self.pipeline_list.append(feature_assembler)
    self.pipeline = Pipeline(stages = self.pipeline_list)
    

    return self.pipeline
  
  def process_input_df(self,df):
  
    processed_df = df
    processed_df = processed_df.fillna(0, subset=self.num_features)
    for feature in self.num_features:
      processed_df = processed_df.withColumn(feature, processed_df[feature].cast(DoubleType()))    
    
    processed_df = processed_df.withColumn('price', processed_df['price'].cast(DoubleType()))
  
    # processed_df = processed_df.withColumn("log_price", F.log10(F.col('price')))    # processed_df = processed_df.withColumn("log_price", log(df["price"]) )
    # processed_df.show()
    processed_df = processed_df.withColumn("log_price", F.log10(F.col('price')))    # processed_df = processed_df.withColumn("log_price", log(df["price"]) )

    return processed_df
  


  def fit_transform(self, train_data):
    df = self.process_input_df(train_data)
    # print(df.summary())
    # Process categorical variables: one hot encoding 
    # indexer = StringIndexer(inputCols=[feature for feature in self.cat_features], outputCols= [ (feature + '_numeric' )for feature in self.cat_features])
    self.indexer = self.indexer.setHandleInvalid("skip").fit(df)
    df = self.indexer.transform(df)
    # Build data processing pipeline
    self.build_pipeline()
    self.pipeline = self.pipeline.fit(df)
    transformed_data = self.pipeline.transform(df)
    transformed_data.select('features').show()
    # model.transform(df).show()

    # Create a model
    if self.model_name == 'random_forest':
      self.model = RandomForestRegressor(featuresCol="features", labelCol="log_price", predictionCol= 'log_prediction', numTrees=100)

    if self.model_name == 'lr':
      self.model =  LinearRegression(featuresCol="features", labelCol="log_price",predictionCol= 'log_prediction')
    
    elif self.model_name == 'gbt':
      self.model = GBTRegressor(featuresCol="features", labelCol='log_price', predictionCol='log_prediction')

    self.model  = self.model.fit(transformed_data)
    # result_df = self.model.transform(transformed_data).select(['price', 'log_prediction'])
    # result_df = result_df.withColumn("prediction", 10 ** F.col('log_prediction'))   
    # self.model.transform(transformed_data).select(['price', 'log_prediction']).show()

    if os.path.exists(self.model_save_path):
      self.model.write().overwrite().save(self.model_save_path)
    else:
      self.model.save(self.model_save_path)

    if os.path.exists(self.pipeline_save_path):
      self.pipeline.write().overwrite().save(self.pipeline_save_path)
    else:  
      self.pipeline.save(self.pipeline_save_path)

    if os.path.exists(self.string_indexer_save_path):
      self.indexer.write().overwrite().save(self.string_indexer_save_path)
    else:  
      self.indexer.save(self.string_indexer_save_path)


    # self.model.save(self.model_save_path)
    # self.pipeline.save(self.pipeline_save_path)

    return self.model
  
  def save(self,pipeline_path, model_path):
    self.pipeline.save(pipeline_path)
    self.model.save(model_path)

  def predict(self, input_data, load = True):
    if load:
      self.model.load(self.model_save_path)
      self.pipeline.load(self.pipeline_save_path)
      self.indexer.load(self.string_indexer_save_path)
    input_data = self.process_input_df(input_data)
    input_data = self.indexer.transform(input_data)
    transformed_data = self.pipeline.transform(input_data)

    result_df = self.model.transform(transformed_data).select(['price', 'log_prediction'])
    result_df = result_df.withColumn("prediction", 10 ** F.col('log_prediction'))  
    result_df.show() 
    
    return result_df


  
  def hyper_tuning(self,train_data):
    df = self.process_input_df(train_data)
    # print(df.summary())
    # Process categorical variables: one hot encoding 
    # indexer = StringIndexer(inputCols=[feature for feature in self.cat_features], outputCols= [ (feature + '_numeric' )for feature in self.cat_features])
    self.indexer = self.indexer.setHandleInvalid("skip").fit(df)
    df = self.indexer.transform(df)
    # Build data processing pipeline
    self.build_pipeline()
    self.pipeline = self.pipeline.fit(df)
    transformed_data = self.pipeline.transform(df)
    transformed_data.select('features').show()
    # model.transform(df).show()

    # Create a model
    if self.model_name == 'random_forest':
      self.model = RandomForestRegressor(featuresCol="features", labelCol="log_price", predictionCol= 'log_prediction')
      # Create ParamGrid for Cross Validation
      rfparamGrid = (ParamGridBuilder()
            #  .addGrid(rf.maxDepth, [2, 5, 10, 20, 30])
               .addGrid(self.model.maxDepth, [2, 5, 10])
            #  .addGrid(.maxBins, [10, 20, 40, 80, 100])
             #.addGrid(rf.numTrees, [5, 20, 50, 100, 500])
               .addGrid(self.model.numTrees, [5, 20, 50])

             .build())
      rfevaluator = RegressionEvaluator(predictionCol="log_prediction", labelCol="log_price", metricName="r2")
      # Create 5-fold CrossValidator
      rfcv = CrossValidator(estimator = self.model,
                            estimatorParamMaps = rfparamGrid,
                            evaluator = rfevaluator,
                            numFolds = self.cv)
      
      self.model  = rfcv.fit(transformed_data)
      if os.path.exists(self.model_save_path):
        self.model.write().overwrite().save(self.model_save_path)
      else:
        self.model.save(self.model_save_path)

      if os.path.exists(self.pipeline_save_path):
        self.pipeline.write().overwrite().save(self.pipeline_save_path)
      else:  
        self.pipeline.save(self.pipeline_save_path)

      if os.path.exists(self.string_indexer_save_path):
        self.indexer.write().overwrite().save(self.string_indexer_save_path)
      else:  
        self.indexer.save(self.string_indexer_save_path)


      # result_df = rfcv_model.transform(transformed_data).select(['price', 'log_prediction'])
      # result_df = result_df.withColumn("prediction", 10 ** F.col('log_prediction'))   
      # rfcv_model.transform(transformed_data).select(['price', 'log_prediction']).show()

      

    if self.model_name == 'lr':
      self.model =  LinearRegression(featuresCol="features", labelCol="log_price",predictionCol= 'log_prediction')
      
      lrevaluator = RegressionEvaluator(predictionCol="log_prediction", labelCol="log_price", metricName="r2")
      # Create 5-fold CrossValidator
      lrparamGrid = ParamGridBuilder()\
                  .addGrid(self.model.regParam, [0.1, 0.01]) \
                  .addGrid(self.model.elasticNetParam, [0.0, 0.5, 1.0])\
                  .build()

      lrcv = CrossValidator(estimator = self.model,
                            estimatorParamMaps = lrparamGrid,
                            evaluator = lrevaluator,
                            numFolds = self.cv)
      
      self.model  = lrcv.fit(transformed_data)

    if self.model_name == 'gbt':
      self.model = GBTRegressor(featuresCol="features", labelCol='log_price', predictionCol='log_prediction')
      
      gbtevaluator = RegressionEvaluator(predictionCol="log_prediction", labelCol="log_price", metricName="r2")
      # Create 5-fold CrossValidator
      gbtparamGrid = ParamGridBuilder()\
                  .addGrid(self.model.maxBins, [10, 20, 40]) \
                  .addGrid(self.model.maxDepth, [2, 5, 10])\
                  .build()

      gbtcv = CrossValidator(estimator = self.model,
                            estimatorParamMaps = gbtparamGrid,
                            evaluator = gbtevaluator,
                            numFolds = self.cv)
      
      self.model  = gbtcv.fit(transformed_data)

    # elif self.model_name == 'gbt':
    #   self.model = 
    return self.model

  def evaluate(self, result_df):
    rmse = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
    rmse = rmse.evaluate(result_df)
    # mape = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mape")
    # mape = mape.evaluate(result_df)
    r2 = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
    r2 = r2.evaluate(result_df)
  
  # def feature_selection_experiment()


In [62]:
full_df = spark_df
train_df,  test_df = full_df.randomSplit([0.7,0.3], seed=1000)

# Experiment 1: Random Forest

In [11]:
model_save_path = 'checkpoint/rf_cv_model.model/'
pipeline_save_path = 'checkpoint/rf_cv_pipeline.model/'
indexer_save_path = 'checkpoint/rf_cv_indexer.model/'

rf_pipeline = Model_Pipeline(text_process_method='tf-idf', model_name='random_forest', cross_validation=3,
                             model_save_path=model_save_path,pipeline_save_path=pipeline_save_path,
                             string_indexer_save_path=indexer_save_path)
rf_pipeline.hyper_tuning(train_df)
result_df = rf_pipeline.predict(test_df)

+--------------------+
|            features|
+--------------------+
|(735,[21,28,40,49...|
|(735,[12,21,22,41...|
|(735,[1,21,32,47,...|
|(735,[7,8,19,21,2...|
|(735,[0,4,6,19,32...|
|(735,[13,45,49,89...|
|(735,[0,4,7,28,38...|
|(735,[104,106,201...|
|(735,[40,64,105,1...|
|(735,[17,18,25,27...|
|(735,[0,15,19,32,...|
|(735,[19,87,129,1...|
|(735,[22,32,48,49...|
|(735,[19,32,42,49...|
|(735,[2,32,47,49,...|
|(735,[4,13,32,41,...|
|(735,[19,32,42,49...|
|(735,[21,106,149,...|
|(735,[0,32,49,86,...|
|(735,[7,13,21,25,...|
+--------------------+
only showing top 20 rows

+--------+-----------------+------------------+
|   price|   log_prediction|        prediction|
+--------+-----------------+------------------+
|100000.0|5.328461798522914|213040.31644543912|
| 35000.0|5.026329884551503|106250.23139703862|
|109000.0|5.052071998354094|112738.43410945764|
| 99000.0|5.180230641874978|151436.52727363378|
|653761.0|5.439827943116557| 275313.7761204475|
| 99000.0|5.016153777569873|103789.585

In [12]:
print(rf_pipeline.model.getEstimatorParamMaps()[np.argmax(rf_pipeline.model.avgMetrics)])

{Param(parent='RandomForestRegressor_a6f0dbaf56df', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 10, Param(parent='RandomForestRegressor_a6f0dbaf56df', name='numTrees', doc='Number of trees to train (>= 1).'): 50}


In [13]:
rf_pipeline.model.avgMetrics

[0.15493541826173066,
 0.1605379324615471,
 0.161269316021341,
 0.29938608406887773,
 0.3092304607009145,
 0.3123334673339213,
 0.4267070896663678,
 0.45087903647696065,
 0.4548645861234332]

In [14]:
result_df = rf_pipeline.predict(test_df)

+--------+-----------------+------------------+
|   price|   log_prediction|        prediction|
+--------+-----------------+------------------+
|100000.0|5.328461798522914|213040.31644543912|
| 35000.0|5.026329884551503|106250.23139703862|
|109000.0|5.052071998354094|112738.43410945764|
| 99000.0|5.180230641874978|151436.52727363378|
|653761.0|5.439827943116557| 275313.7761204475|
| 99000.0|5.016153777569873|103789.58550911318|
|399961.0|5.438619874798029| 274549.0054345606|
|175000.0|5.077074072479916|119419.17662795319|
| 69000.0|5.016464397077551|103863.84526415577|
|117500.0|5.037412436447726|108996.47087868428|
| 89000.0| 5.01176243755105|102745.41190127475|
|302355.0|5.198012438780914|157765.64552653517|
|329000.0|5.261353170357041|182537.95062948746|
|145000.0|5.132453830433764| 135660.6302692646|
|  4900.0| 4.92535076852764|  84207.4989457817|
|109000.0|5.148531359266233|140776.88759930755|
|183027.0|5.177688506734359|150552.68551426506|
| 24000.0|4.581726573108475|38170.387873

In [15]:
def rmsle(result_df,labelCol="price", predictionCol="prediction"):
    import numpy as np
    from sklearn.metrics import mean_squared_log_error
    
    label = np.array(result_df.select(labelCol).collect())
    prediction = np.array(result_df.select(predictionCol).collect())
    
    return np.sqrt(np.square(np.log(label + 1) - np.log(prediction + 1)).mean())
rmsle = rmsle(result_df)
r2 = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
r2 = r2.evaluate(result_df)

In [16]:
print(rmsle, r2)

0.7794082373546175 0.0871966678375441


# Experiment 2: Linear regression

In [20]:
# Linear regression
model_save_path = 'checkpoint/lr_cv_model.model/'
pipeline_save_path = 'checkpoint/lr_cv_pipeline.model/'
indexer_save_path = 'checkpoint/lr_cv_indexer.model/'

lr_pipeline = Model_Pipeline(text_process_method='tf-idf', model_name='lr', cross_validation=3
                              ,model_save_path=model_save_path,pipeline_save_path=pipeline_save_path,
                             string_indexer_save_path=indexer_save_path)
lr_pipeline.hyper_tuning(train_df)

+--------------------+
|            features|
+--------------------+
|(735,[21,28,40,49...|
|(735,[12,21,22,41...|
|(735,[1,21,32,47,...|
|(735,[7,8,19,21,2...|
|(735,[0,4,6,19,32...|
|(735,[13,45,49,89...|
|(735,[0,4,7,28,38...|
|(735,[104,106,201...|
|(735,[40,64,105,1...|
|(735,[17,18,25,27...|
|(735,[0,15,19,32,...|
|(735,[19,87,129,1...|
|(735,[22,32,48,49...|
|(735,[19,32,42,49...|
|(735,[2,32,47,49,...|
|(735,[4,13,32,41,...|
|(735,[19,32,42,49...|
|(735,[21,106,149,...|
|(735,[0,32,49,86,...|
|(735,[7,13,21,25,...|
+--------------------+
only showing top 20 rows



CrossValidatorModel_f24d1a536ee0

In [24]:
lr_result_df = lr_pipeline.predict(test_df, load = False)

+--------+------------------+------------------+
|   price|    log_prediction|        prediction|
+--------+------------------+------------------+
|100000.0|4.6229830733384025|41974.262417039936|
| 35000.0| 4.875467930392357| 75070.26179393864|
|109000.0|5.1160592066421255|130634.89683681735|
| 99000.0| 5.217941863342864| 165174.0673824245|
|653761.0| 5.141184908500536|138415.55820716493|
| 99000.0| 4.949022121224878| 88924.64113619637|
|399961.0| 5.221261860181377|166441.59163590154|
|175000.0| 5.186168189579721|153521.14102699482|
| 69000.0| 4.853667365659403| 71394.92893735235|
|117500.0| 5.024132873393433|105714.08944236723|
| 89000.0| 5.170999529422518|148251.64787722187|
|302355.0| 5.072150674342831|118073.02077226073|
|329000.0| 5.134968543452487|136448.43015322147|
|145000.0|5.1431788878213744|139052.52767903017|
|  4900.0|  4.67608507981077|47433.490002776525|
|109000.0| 5.097062370789736|125043.85978753652|
|183027.0| 4.811688324613572| 64816.91021399408|
| 24000.0|  4.405956

In [25]:
def rmsle(result_df,labelCol="price", predictionCol="prediction"):
    import numpy as np
    from sklearn.metrics import mean_squared_log_error
    
    label = np.array(result_df.select(labelCol).collect())
    prediction = np.array(result_df.select(predictionCol).collect())
    
    return np.sqrt(np.square(np.log(label + 1) - np.log(prediction + 1)).mean())
rmsle = rmsle(lr_result_df)
r2 = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
r2 = r2.evaluate(lr_result_df)
print(r2, rmsle)

0.09690928592306225 0.7502347505839183


# Gradient Booosting Tree

In [63]:
# Linear regression
model_save_path = 'checkpoint/gbt_cv_model.model/'
pipeline_save_path = 'checkpoint/gbt_cv_pipeline.model/'
indexer_save_path = 'checkpoint/gbt_cv_indexer.model/'

lr_pipeline = Model_Pipeline(text_process_method='tf-idf', model_name='gbt', cross_validation=3
                              ,model_save_path=model_save_path,pipeline_save_path=pipeline_save_path,
                             string_indexer_save_path=indexer_save_path)
lr_pipeline.hyper_tuning(train_df)

+--------------------+
|            features|
+--------------------+
|(735,[21,28,40,49...|
|(735,[12,21,22,41...|
|(735,[1,21,32,47,...|
|(735,[7,8,19,21,2...|
|(735,[0,4,6,19,32...|
|(735,[13,45,49,89...|
|(735,[0,4,7,28,38...|
|(735,[104,106,201...|
|(735,[40,64,105,1...|
|(735,[17,18,25,27...|
|(735,[0,15,19,32,...|
|(735,[19,87,129,1...|
|(735,[22,32,48,49...|
|(735,[19,32,42,49...|
|(735,[2,32,47,49,...|
|(735,[4,13,32,41,...|
|(735,[19,32,42,49...|
|(735,[21,106,149,...|
|(735,[0,32,49,86,...|
|(735,[7,13,21,25,...|
+--------------------+
only showing top 20 rows



CrossValidatorModel_9392df4b89b9

In [65]:
gbt_result = lr_pipeline.predict(test_df, load = False)

+--------+------------------+------------------+
|   price|    log_prediction|        prediction|
+--------+------------------+------------------+
|100000.0|5.1763128824027715|150076.56552657625|
| 35000.0| 5.064149888669369|115917.73556467587|
|109000.0|4.9785920032907764| 95190.14827104405|
| 99000.0| 5.780277610120832|  602944.877901053|
|653761.0|5.6059296377386465|403580.00161861535|
| 99000.0|4.9856413140016835| 96747.84808561488|
|399961.0| 5.865742305383197| 734078.1635079588|
|175000.0| 5.011640745434248|102716.62600691781|
| 69000.0| 5.067729847935101|116877.21323436906|
|117500.0|4.9979977318558575| 99540.02187658547|
| 89000.0|4.9934755768461105| 98508.92433833734|
|302355.0| 5.262416638520931| 182985.4837637902|
|329000.0| 5.313996381086945|206061.27424107416|
|145000.0|5.1144257938652125|130144.49239702361|
|  4900.0| 4.193557626362892|15615.562265010314|
|109000.0| 5.126353005494265|133768.23767033993|
|183027.0| 4.983839441723878| 96347.27627666948|
| 24000.0|4.37712113

In [69]:
def rmsle(result_df,labelCol="price", predictionCol="prediction"):
    import numpy as np
    from sklearn.metrics import mean_squared_log_error
    
    label = np.array(result_df.select(labelCol).collect())
    prediction = np.array(result_df.select(predictionCol).collect())
    
    return np.sqrt(np.square(np.log(label + 1) - np.log(prediction + 1)).mean())
rmsle = rmsle(gbt_result)
r2 = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
r2 = r2.evaluate(gbt_result)
print(r2, rmsle)

-1.5078826545942476 0.7624000046505081


# Ex 3:
Text data 

In [29]:
# Linear regression
model_save_path = 'checkpoint/lr_cv_model.model/'
pipeline_save_path = 'checkpoint/lr_cv_pipeline.model/'
indexer_save_path = 'checkpoint/lr_cv_indexer.model/'

lr_augmented_text_pipeline = Model_Pipeline(text_process_method='tf-idf', model_name='lr', cross_validation=3,
                                           text_attr ='augmented_description',
       model_save_path=model_save_path,pipeline_save_path=pipeline_save_path,
                             string_indexer_save_path=indexer_save_path)
lr_augmented_text_pipeline.hyper_tuning(train_df)

+--------------------+
|            features|
+--------------------+
|(735,[21,28,40,49...|
|(735,[12,21,22,41...|
|(735,[1,21,32,47,...|
|(735,[7,8,19,21,2...|
|(735,[0,4,6,19,32...|
|(735,[13,45,49,89...|
|(735,[0,4,7,28,38...|
|(735,[49,104,106,...|
|(735,[40,64,105,1...|
|(735,[17,18,25,27...|
|(735,[0,15,19,32,...|
|(735,[19,87,129,1...|
|(735,[22,32,48,49...|
|(735,[19,32,42,49...|
|(735,[2,32,47,49,...|
|(735,[4,13,32,41,...|
|(735,[19,32,42,49...|
|(735,[21,49,106,1...|
|(735,[0,32,49,86,...|
|(735,[7,13,21,25,...|
+--------------------+
only showing top 20 rows



CrossValidatorModel_ea40668a19ab

In [31]:
augmented_text_result_df = lr_augmented_text_pipeline.predict(test_df, load = False)

+--------+------------------+------------------+
|   price|    log_prediction|        prediction|
+--------+------------------+------------------+
|100000.0| 4.621127320154411|41795.287804780295|
| 35000.0|  4.86328304266858| 72993.30744260924|
|109000.0| 5.098915194109758|125578.47191793415|
| 99000.0|5.2343235178025616|171523.45567153592|
|653761.0| 5.140403780512051| 138166.8258469042|
| 99000.0| 4.945686691229401|  88244.3058567235|
|399961.0| 5.222292204860905|166836.93582148192|
|175000.0| 5.193098269198628| 155990.5427676853|
| 69000.0| 4.875222151949121| 75027.78961669351|
|117500.0| 5.034856163736559| 108356.7982032637|
| 89000.0| 5.183584936993292|  152610.683343985|
|302355.0| 5.085083935421073|121642.10737441931|
|329000.0| 5.126298852696104|133751.55895792725|
|145000.0| 5.146996723796223|140280.31221428525|
|  4900.0| 4.684329069679172| 48342.49596809935|
|109000.0| 5.093632952080223|124060.33572121356|
|183027.0|4.8078255030296715| 64242.95410398304|
| 24000.0| 4.3981345

In [32]:
def rmsle(result_df,labelCol="price", predictionCol="prediction"):
    import numpy as np
    from sklearn.metrics import mean_squared_log_error
    
    label = np.array(result_df.select(labelCol).collect())
    prediction = np.array(result_df.select(predictionCol).collect())
    
    return np.sqrt(np.square(np.log(label + 1) - np.log(prediction + 1)).mean())
rmsle = rmsle(lr_result_df)
r2 = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
r2 = r2.evaluate(augmented_text_result_df)
print(r2, rmsle)

0.10053909542602568 0.7502347505839183


# Count vectorizer experiment

In [47]:
# Get rid of numeric feautures with high correlation with the price
omitted_fts = ['shop_like_tier', 'shop_review', 'shop_sold']
# Linear regression
model_save_path = 'checkpoint/lr_cv_model.model/'
pipeline_save_path = 'checkpoint/lr_cv_pipeline.model/'
indexer_save_path = 'checkpoint/lr_cv_indexer.model/'

lr_count_vec_pipeline = Model_Pipeline(text_process_method='count-vectorizer', model_name='lr', cross_validation=3,
                                           text_attr ='augmented_description',
       model_save_path=model_save_path,pipeline_save_path=pipeline_save_path,
                             string_indexer_save_path=indexer_save_path,
                             omitted_fts = omitted_fts)
lr_count_vec_pipeline.hyper_tuning(train_df)

+--------------------+
|            features|
+--------------------+
|(11489,[0,1,4,5,7...|
|(11489,[0,6,10,14...|
|(11489,[0,4,5,7,9...|
|(11489,[0,2,3,4,5...|
|(11489,[0,1,3,4,5...|
|(11489,[0,1,2,4,5...|
|(11489,[0,1,2,4,5...|
|(11489,[0,1,2,11,...|
|(11489,[0,2,17,22...|
|(11489,[0,38,157,...|
|(11489,[0,1,2,6,1...|
|(11489,[0,4,5,7,9...|
|(11489,[0,1,3,4,5...|
|(11489,[0,1,3,4,5...|
|(11489,[0,1,3,4,5...|
|(11489,[0,3,4,5,7...|
|(11489,[0,1,3,4,5...|
|(11489,[0,1,2,10,...|
|(11489,[0,1,6,11,...|
|(11489,[0,2,4,5,6...|
+--------------------+
only showing top 20 rows



CrossValidatorModel_cf67bbfb9214

In [52]:
count_vec_result = lr_count_vec_pipeline.predict(test_df, load = False)

+--------+------------------+------------------+
|   price|    log_prediction|        prediction|
+--------+------------------+------------------+
|100000.0| 4.771991358296545| 59154.98632468295|
| 35000.0| 4.831829781040821|  67893.7475237261|
|109000.0|5.0373326154885785| 108976.4397621591|
| 99000.0| 5.221789881550247|  166644.076742473|
|653761.0| 5.237655229935777|172844.36679667258|
| 99000.0| 4.864929118493908|   73270.493809425|
|399961.0| 5.279662289079756|190397.95921586052|
|175000.0| 5.359288622980065| 228711.8269123763|
| 69000.0| 4.859754941175453| 72402.72987760484|
|117500.0| 4.920913243293499| 83351.46611819437|
| 89000.0|  5.09405065303879|124179.71335871877|
|302355.0| 5.001783157636082|100411.43128326368|
|329000.0| 5.177687202258534|150552.23330487282|
|145000.0| 5.118538161835456|131382.69370084992|
|  4900.0| 4.389012723528192| 24491.34993117126|
|109000.0|  5.04805140278893| 111699.5446751105|
|183027.0|   4.8221813760605| 66402.03298437702|
| 24000.0| 4.3255430

In [55]:
def rmsle(result_df,labelCol="price", predictionCol="prediction"):
    import numpy as np
    from sklearn.metrics import mean_squared_log_error
    
    label = np.array(result_df.select(labelCol).collect())
    prediction = np.array(result_df.select(predictionCol).collect())
    
    return np.sqrt(np.square(np.log(label + 1) - np.log(prediction + 1)).mean())
rmsle = rmsle(count_vec_result)
r2 = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
r2 = r2.evaluate(count_vec_result)
print(r2, rmsle)

0.22148892396290387 0.7098799752478321
