# 1 часть

In [139]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator

spark = SparkSession.builder.master("local[*]").getOrCreate()
filename_data = 'mycsv.csv'
csv = spark.read.csv(filename_data, inferSchema=True, header=True)
csv = csv.withColumn('mileage', csv.mileage.cast(IntegerType()))
csv.show(10)

+---+------+------------------+-------------+--------+--------+------+-------+------------+-----+-------+-----------+
|_c0| brand|              name|     bodyType|   color|fuelType|  year|mileage|transmission|power|  price|   location|
+---+------+------------------+-------------+--------+--------+------+-------+------------+-----+-------+-----------+
|  0|Nissan|              Note|Хэтчбек 5 дв.|   Серый|  Бензин|2016.0|  58000|    Вариатор| 79.0| 850000|      Артём|
|  1|Toyota|           Harrier|   Джип 5 дв.|   Белый|  Бензин|2017.0|  20000|        АКПП|231.0|3300000|Владивосток|
|  2|Nissan|              Juke|   Джип 5 дв.|Бордовый|  Бензин|2010.0|  40000|    Вариатор|114.0| 795000|  Уссурийск|
|  3|Nissan|           Liberty|      Минивэн|   Белый|  Бензин|2002.0| 140000|    Вариатор|147.0| 235000|  Лучегорск|
|  4|Toyota|Land Cruiser Prado|   Джип 5 дв.|   Белый|  Дизель|2017.0|  41000|        АКПП|177.0|3450000|Владивосток|
|  5|Nissan|             NV200|      Минивэн|   Белый|  

In [66]:
splits = csv.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("price", "truePrice")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 608541  Testing Rows: 260160


In [67]:
strIdx = StringIndexer(inputCols = ['brand', 'name', 'fuelType','transmission', 'color', 'bodyType', 'location'], 
                       outputCols = ['brand_index', 'name_index', 'fuelType_index', 'transmission_index', 'color_index', 'bodyType_index', 'location_index'], 
                       handleInvalid = "keep")
catVect = VectorAssembler(inputCols = ['brand_index', 'name_index', 'fuelType_index', 'transmission_index', 'color_index', 'bodyType_index', 'location_index'],
                          outputCol="features_cat")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), 
                       outputCol = "features_index", 
                       handleInvalid = "keep")
numVect = VectorAssembler(inputCols = ["mileage",'power', 'year'], 
                          outputCol="features_num", 
                          handleInvalid = "keep")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), 
                      outputCol="features_norm")
featVect = VectorAssembler(inputCols=["features_index", "features_norm"], 
                           outputCol="features", 
                           handleInvalid = "keep")
rfr = RandomForestRegressor(featuresCol = 'features', 
                      labelCol='price',
                      numTrees = 10,
                      maxDepth=2,
                      maxBins = 181834)
pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, rfr])

In [68]:
pipelineModel = pipeline.fit(train)

In [69]:
pred_df = pipelineModel.transform(test)
pred_df.select("features", "prediction", "truePrice").show()

+--------------------+------------------+---------+
|            features|        prediction|truePrice|
+--------------------+------------------+---------+
|[2.0,6.0,0.0,0.0,...| 802180.7706852386|   850000|
|(10,[1,3,7,8,9],[...| 2332183.433996185|  3300000|
|[2.0,30.0,0.0,0.0...| 797034.0150764396|   795000|
|[1.0,5.0,0.0,2.0,...| 1252915.561372306|  1190000|
|[0.0,2.0,0.0,0.0,...| 2332183.433996185|  2145000|
|[1.0,5.0,0.0,2.0,...| 1252915.561372306|  1320000|
|(10,[1,7,8,9],[2....| 2332183.433996185|  2475000|
|[6.0,68.0,0.0,1.0...| 2332183.433996185|  3400000|
|[0.0,23.0,0.0,0.0...| 802180.7706852386|   915000|
|[7.0,32.0,0.0,3.0...| 772155.3779051815|   960000|
|[1.0,27.0,0.0,1.0...| 745705.2556317279|   225000|
|(10,[1,7,8,9],[2....| 2332183.433996185|  3300000|
|[5.0,262.0,0.0,1....| 567437.0704955963|   120000|
|[5.0,40.0,0.0,1.0...| 716864.6698205115|   695000|
|[0.0,0.0,0.0,0.0,...| 827263.4592175775|   800000|
|(10,[0,1,7,8,9],[...|1447165.5175160302|  1620000|
|[0.0,57.0,1

In [70]:
from pyspark.ml.evaluation import RegressionEvaluator

regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="truePrice", metricName="rmse")

In [71]:

# RMSE
rmse = regressionEvaluator.evaluate(pred_df)
print(f"The RMSE for the random forest regression model is {rmse:0.2f}")
# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(pred_df)
print(f"The MSE for the random forest regression model is {mse:0.2f}")
# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(pred_df)
print(f"The R2 for the random forest regression model is {r2:0.2f}")
# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(pred_df)
print(f"The MAE for the random forest regression model is {mae:0.2f}")

The RMSE for the random forest regression model is 431613.85
The MSE for the random forest regression model is 186290516643.19
The R2 for the random forest regression model is 0.74
The MAE for the random forest regression model is 311640.15


In [72]:
param_grid = ParamGridBuilder().\
    addGrid(rfr.numTrees, [10, 15, 20]).\
    addGrid(rfr.maxDepth, [1, 2, 4]).\
    addGrid(rfr.maxBins , [181834, 362432, 724864]).\
    build()

In [73]:
cv = CrossValidator(estimator=pipeline, \
                    estimatorParamMaps=param_grid, \
                    evaluator=RegressionEvaluator(
                                predictionCol="prediction", \
                                labelCol="price", \
                                metricName="rmse"), \
                    numFolds=2)

In [74]:
cv_model = cv.fit(train)

In [75]:
newPrediction = cv_model.transform(test)

In [76]:
# RMSE
rmse = regressionEvaluator.evaluate(newPrediction)
print(f"The RMSE for the random forest regression model is {rmse:0.2f}")
# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(newPrediction)
print(f"The MSE for the random forest regression model is {mse:0.2f}")
# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(newPrediction)
print(f"The R2 for the random forest regression model is {r2:0.2f}")
# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(newPrediction)
print(f"The MAE for the random forest regression model is {mae:0.2f}")

The RMSE for the random forest regression model is 187563.54
The MSE for the random forest regression model is 72318029546.16
The R2 for the random forest regression model is 0.90
The MAE for the random forest regression model is 187563.54


# 2 часть

In [140]:
csv = csv.drop(csv._c0).withColumn('label', when(col('mileage') >= 129804.4, 1).otherwise(0))
csv = csv.drop(csv.mileage)
csv.show()

+-------+------------------+-------------+-----------+--------+------+------------+-----+-------+-------------+-----+
|  brand|              name|     bodyType|      color|fuelType|  year|transmission|power|  price|     location|label|
+-------+------------------+-------------+-----------+--------+------+------------+-----+-------+-------------+-----+
| Nissan|              Note|Хэтчбек 5 дв.|      Серый|  Бензин|2016.0|    Вариатор| 79.0| 850000|        Артём|    0|
| Toyota|           Harrier|   Джип 5 дв.|      Белый|  Бензин|2017.0|        АКПП|231.0|3300000|  Владивосток|    0|
| Nissan|              Juke|   Джип 5 дв.|   Бордовый|  Бензин|2010.0|    Вариатор|114.0| 795000|    Уссурийск|    0|
| Nissan|           Liberty|      Минивэн|      Белый|  Бензин|2002.0|    Вариатор|147.0| 235000|    Лучегорск|    1|
| Toyota|Land Cruiser Prado|   Джип 5 дв.|      Белый|  Дизель|2017.0|        АКПП|177.0|3450000|  Владивосток|    0|
| Nissan|             NV200|      Минивэн|      Белый|  

In [141]:
splits = csv.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
print("Training Rows:", train.count(), " Testing Rows:", test.count())

Training Rows: 608479  Testing Rows: 260222


In [142]:
strIdx = StringIndexer(inputCols = ['brand', 'name', 'fuelType','transmission', 'color', 'bodyType', 'location'], 
                       outputCols = ['brand_index', 'name_index', 'fuelType_index', 'transmission_index', 'color_index', 'bodyType_index', 'location_index'], 
                       handleInvalid = "keep")
catVect = VectorAssembler(inputCols = ['brand_index', 'name_index', 'fuelType_index', 'transmission_index', 'color_index', 'bodyType_index', 'location_index'],
                          outputCol="features_cat")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), 
                       outputCol = "features_index", 
                       handleInvalid = "keep")
numVect = VectorAssembler(inputCols = ['power', 'year'], 
                          outputCol="features_num", 
                          handleInvalid = "keep")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), 
                      outputCol="features_norm")
featVect = VectorAssembler(inputCols=["features_index", "features_norm"], 
                           outputCol="features", 
                           handleInvalid = "keep")
lr = LogisticRegression(labelCol="label", 
                        featuresCol="features", 
                        maxIter=10,
                        regParam=0.3)
pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, lr])

In [143]:
pipelineModel = pipeline.fit(train)

In [144]:
pred_df = pipelineModel.transform(test)
pred_df.select("features", "prediction", "trueLabel").show()

+--------------------+----------+---------+
|            features|prediction|trueLabel|
+--------------------+----------+---------+
|[35.0,464.0,0.0,1...|       1.0|        0|
|[18.0,224.0,0.0,2...|       0.0|        0|
|[18.0,555.0,0.0,0...|       0.0|        1|
|[18.0,555.0,0.0,0...|       0.0|        1|
|[18.0,555.0,0.0,0...|       0.0|        1|
|[18.0,218.0,1.0,2...|       0.0|        1|
|[18.0,218.0,1.0,2...|       0.0|        1|
|[18.0,218.0,1.0,2...|       0.0|        1|
|[18.0,218.0,1.0,2...|       0.0|        1|
|[18.0,218.0,1.0,2...|       0.0|        1|
|[18.0,218.0,1.0,2...|       0.0|        1|
|[18.0,218.0,1.0,2...|       0.0|        1|
|[18.0,218.0,1.0,2...|       0.0|        1|
|[10.0,615.0,0.0,1...|       0.0|        0|
|[10.0,615.0,0.0,1...|       0.0|        0|
|[10.0,615.0,0.0,1...|       0.0|        0|
|[10.0,615.0,0.0,1...|       0.0|        0|
|[10.0,98.0,0.0,1....|       1.0|        1|
|[10.0,98.0,0.0,1....|       1.0|        1|
|[10.0,98.0,0.0,1....|       1.0

In [145]:
tp = float(pred_df.filter("prediction == 1.0 AND truelabel == 1").count())
fp = float(pred_df.filter("prediction == 1.0 AND truelabel == 0").count())
tn = float(pred_df.filter("prediction == 0.0 AND truelabel == 0").count())
fn = float(pred_df.filter("prediction == 0.0 AND truelabel == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", 2*pr*re/(re+pr))],["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           68876.0|
|       FP|           14652.0|
|       TN|          123929.0|
|       FN|           52765.0|
|Precision|0.8245857676467772|
|   Recall|0.5662235594906323|
|       F1|0.6714074738386403|
+---------+------------------+



In [146]:
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(pred_df)
print ("AUR = ", aur)

AUR =  0.8459216213225724


In [147]:
paramGrid = ParamGridBuilder().\
    addGrid(lr.maxIter, [30, 40, 60]).\
    addGrid(lr.regParam, [0.6, 0.8, 0.9]).build()
cv = CrossValidator(estimator=pipeline, evaluator=BinaryClassificationEvaluator(metricName='areaUnderPR'), estimatorParamMaps=paramGrid, 
                    numFolds=2)

In [148]:
cv_model = cv.fit(train)

In [149]:
newPrediction = cv_model.transform(test)

In [150]:
# Recalculate confusion matrix
tp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 1").count())
pr2 = tp2 / (tp2 + fp2)
re2 = tp2 / (tp2 + fn2)
metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", pr2),
 ("Recall", re2),
 ("F1", 2*pr2*re2/(re2+pr2))],["metric", "value"])
metrics2.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           62185.0|
|       FP|           12443.0|
|       TN|          126138.0|
|       FN|           59456.0|
|Precision|0.8332663343517178|
|   Recall|0.5112174349109264|
|       F1|0.6336711350238704|
+---------+------------------+



In [151]:
# Recalculate the Area Under ROC
evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
aur2 = evaluator2.evaluate(newPrediction)
print( "AUR2 = ", aur2)

AUR2 =  0.7107143957230468
