In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName('cau3').getOrCreate()

In [None]:
df = spark.read.csv('cubic_zirconia.csv',inferSchema=True,header=True)

### Explore data

In [None]:
df.count()

26967

In [None]:
df = df.withColumn("price", df["price"].cast('float'))

In [None]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- carat: double (nullable = true)
 |-- cut: string (nullable = true)
 |-- color: string (nullable = true)
 |-- clarity: string (nullable = true)
 |-- depth: double (nullable = true)
 |-- table: double (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)
 |-- price: float (nullable = true)



In [None]:
df.show(5)

+---+-----+---------+-----+-------+-----+-----+----+----+----+------+
|_c0|carat|      cut|color|clarity|depth|table|   x|   y|   z| price|
+---+-----+---------+-----+-------+-----+-----+----+----+----+------+
|  1|  0.3|    Ideal|    E|    SI1| 62.1| 58.0|4.27|4.29|2.66| 499.0|
|  2| 0.33|  Premium|    G|     IF| 60.8| 58.0|4.42|4.46| 2.7| 984.0|
|  3|  0.9|Very Good|    E|   VVS2| 62.2| 60.0|6.04|6.12|3.78|6289.0|
|  4| 0.42|    Ideal|    F|    VS1| 61.6| 56.0|4.82| 4.8|2.96|1082.0|
|  5| 0.31|    Ideal|    F|   VVS1| 60.4| 59.0|4.35|4.43|2.65| 779.0|
+---+-----+---------+-----+-------+-----+-----+----+----+----+------+
only showing top 5 rows



In [None]:
df.describe().show()

+-------+-----------------+-------------------+---------+-----+-------+------------------+------------------+------------------+-----------------+------------------+------------------+
|summary|              _c0|              carat|      cut|color|clarity|             depth|             table|                 x|                y|                 z|             price|
+-------+-----------------+-------------------+---------+-----+-------+------------------+------------------+------------------+-----------------+------------------+------------------+
|  count|            26967|              26967|    26967|26967|  26967|             26270|             26967|             26967|            26967|             26967|             26967|
|   mean|          13484.0| 0.7983754218118336|     null| null|   null|61.745146555006194| 57.45607965290908| 5.729853524678309|5.733568806318799|3.5380572551637184|3939.5181147328217|
| stddev|7784.846690847547|0.47774547354501784|     null| null|   null|1.41

### Preprocessing

In [None]:
# bỏ cột stt không cần
df= df.select('carat','cut','color','clarity','depth','table','x','y','z','price')

In [None]:
# check null, nan
from pyspark.sql.functions import isnan, when, count, col
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).toPandas().T

Unnamed: 0,0
carat,0
cut,0
color,0
clarity,0
depth,0
table,0
x,0
y,0
z,0
price,0


In [None]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas().T

Unnamed: 0,0
carat,0
cut,0
color,0
clarity,0
depth,697
table,0
x,0
y,0
z,0
price,0


In [None]:
# số lương dòng chưa null chiếm khoảng 2.6% không đáng kể nên ta sẽ drop các dòng chứa Null
df = df.dropna(how="any", subset=["depth"])

In [None]:
# đã drop
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas().T

Unnamed: 0,0
carat,0
cut,0
color,0
clarity,0
depth,0
table,0
x,0
y,0
z,0
price,0


In [None]:
#drop duplicate
num_dist_rows = df.distinct().count()

In [None]:
num_dist_rows

26236

In [None]:
df.count()

26270

In [None]:
df = df.drop_duplicates()

In [None]:
df.count()

26236

In [None]:
# train test split

### pipeline

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler

In [None]:
def min_max_scaler(df, cols_to_scale, prefix= 'mm_'):
    for col in cols_to_scale:
        max_values = df.agg({col: 'max'}).collect()[0][0]
        min_values = df.agg({col: 'min'}).collect()[0][0]
        new_column_name = prefix + col
        df = df.withColumn(new_column_name, 
                      (df[col] - min_values) / (max_values - min_values))
    return df
df = min_max_scaler(df, cols_to_scale=['depth','table','x','y','z'])

In [None]:
indexer1 = StringIndexer(inputCol='cut', outputCol='cut_idx')
encoder1 = OneHotEncoder(inputCol='cut_idx',outputCol='cut_dummy')

In [None]:
indexer2 = StringIndexer(inputCol='color', outputCol='color_idx')
encoder2 = OneHotEncoder(inputCol='color_idx',outputCol='color_dummy')

In [None]:
indexer3 = StringIndexer(inputCol='clarity', outputCol='clarity_idx')
encoder3 = OneHotEncoder(inputCol='clarity_idx',outputCol='clarity_dummy')

In [None]:
assembler = VectorAssembler(inputCols=[
    'carat','cut_dummy','color_dummy','clarity_dummy','mm_depth','mm_table','mm_x','mm_y','mm_z'
    ],outputCol='features')

In [None]:
train_df, test_df = df.randomSplit([0.7, 0.3])

In [None]:
# linear regression
ln_reg_diamond = LinearRegression(featuresCol='features', labelCol='price',predictionCol='prediction')

In [None]:
pipeline = Pipeline(stages=[indexer1, indexer2, indexer3, encoder1, encoder2, encoder3, assembler, ln_reg_diamond])
fit_model = pipeline.fit(train_df)
results = fit_model.transform(test_df)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
evaluator = RegressionEvaluator()

In [None]:
results = results.withColumnRenamed('price', 'label')

In [None]:
evaluator.evaluate(results, {evaluator.metricName: "rmse"})

1126.6100803256654

rmse nằm khoảng 10% đến 20% giá trị trung bình đầu ra

In [None]:
df_given_stone = spark.createDataFrame([
    (1.5,'Fair','G','VS2',66.2,54,7.2,7.1,4.7)
], schema='carat float,cut string,color string,clarity string,mm_depth float,mm_table int,mm_x float,mm_y float,mm_z float')

In [None]:
predict = fit_model.transform(df_given_stone)

In [None]:
predict.select('prediction').show()

+-------------------+
|         prediction|
+-------------------+
|-186322.01785834646|
+-------------------+



logistic regression dự đoán giá của viên này là 186322

In [None]:
from pyspark.ml.regression import RandomForestRegressor
random_forest_regressor = RandomForestRegressor(featuresCol='features', labelCol='price')

In [None]:
pipeline = Pipeline(stages=[indexer1, indexer2, indexer3, encoder1, encoder2, encoder3, assembler, random_forest_regressor ])
fit_model = pipeline.fit(train_df)
results = fit_model.transform(test_df)

In [None]:
predict = fit_model.transform(df_given_stone)

In [None]:
predict.select('prediction').show()

+-----------------+
|       prediction|
+-----------------+
|12361.00946305898|
+-----------------+



In [None]:
results = results.withColumnRenamed('price', 'label')

In [None]:
evaluator.evaluate(results, {evaluator.metricName: "rmse"})

1222.3572655913488

rmse nằm khoảng 10% đến 20% giá trị trung bình đầu ra