In [3]:
#Tạo một đối tượng SparkSession mới để sử dụng Spark
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Linear Regression').getOrCreate()

In [4]:
#Tải thư viện
from pyspark.ml.regression import LinearRegression

In [14]:
#Tải dữ liệu
df=spark.read.csv('C:\house_data.csv',inferSchema=True,header=True)

In [15]:
#Khai thác và phân tích dữ liệu
#Kiểm tra kích thước dữ liệu
print((df.count(), len(df.columns)))

(21613, 21)


In [16]:
#Xem kiểu dữ liệu
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- price: double (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: double (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- grade: integer (nullable = true)
 |-- sqft_above: integer (nullable = true)
 |-- sqft_basement: integer (nullable = true)
 |-- yr_built: integer (nullable = true)
 |-- yr_renovated: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- sqft_living15: integer (nullable = true)
 |-- sqft_lot15: integer (nullable = true)



In [17]:
#Xem thống kê dữ liệu
df.describe().show(5,False)

df.head(5)

+-------+--------------------+---------------+------------------+-----------------+------------------+------------------+------------------+------------------+--------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+-----------------+------------------+
|summary|id                  |date           |price             |bedrooms         |bathrooms         |sqft_living       |sqft_lot          |floors            |waterfront          |view               |condition         |grade             |sqft_above        |sqft_basement     |yr_built          |yr_renovated      |zipcode           |lat                |long               |sqft_living15    |sqft_lot15        |
+-------+--------------------+---------------+------------------+-----------------+------------------+------------------+------------------+------------------+-------------------

[Row(id=7129300520, date='20141013T000000', price=221900.0, bedrooms=3, bathrooms=1.0, sqft_living=1180, sqft_lot=5650, floors=1.0, waterfront=0, view=0, condition=3, grade=7, sqft_above=1180, sqft_basement=0, yr_built=1955, yr_renovated=0, zipcode=98178, lat=47.5112, long=-122.257, sqft_living15=1340, sqft_lot15=5650),
 Row(id=6414100192, date='20141209T000000', price=538000.0, bedrooms=3, bathrooms=2.25, sqft_living=2570, sqft_lot=7242, floors=2.0, waterfront=0, view=0, condition=3, grade=7, sqft_above=2170, sqft_basement=400, yr_built=1951, yr_renovated=1991, zipcode=98125, lat=47.721, long=-122.319, sqft_living15=1690, sqft_lot15=7639),
 Row(id=5631500400, date='20150225T000000', price=180000.0, bedrooms=2, bathrooms=1.0, sqft_living=770, sqft_lot=10000, floors=1.0, waterfront=0, view=0, condition=3, grade=6, sqft_above=770, sqft_basement=0, yr_built=1933, yr_renovated=0, zipcode=98028, lat=47.7379, long=-122.233, sqft_living15=2720, sqft_lot15=8062),
 Row(id=2487200875, date='2014

In [22]:
#Xem mối tương quan giữa các biến đầu vào và đầu ra
from pyspark.sql.functions import corr

df.select(corr('id','sqft_lot15')).show()

+--------------------+
|corr(id, sqft_lot15)|
+--------------------+
|-0.13879786589346885|
+--------------------+



In [23]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler

In [24]:
#Xem các biến đầu vào và đầu ra
df.columns

['id',
 'date',
 'price',
 'bedrooms',
 'bathrooms',
 'sqft_living',
 'sqft_lot',
 'floors',
 'waterfront',
 'view',
 'condition',
 'grade',
 'sqft_above',
 'sqft_basement',
 'yr_built',
 'yr_renovated',
 'zipcode',
 'lat',
 'long',
 'sqft_living15',
 'sqft_lot15']

In [81]:
#Tạo véc tơ assembler kết hợp 5 biến đầu vào gọi là feature
vec_assmebler = VectorAssembler(inputCols=['id', 'grade', 'sqft_above', 'sqft_basement', 
'yr_built'], outputCol='features')

In [82]:
# biến đổi giá trị trong vector
features_df = vec_assmebler.transform(df)

In [83]:
#Xem lại véc tơ đặc trưng
features_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- price: double (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- sqft_living: integer (nullable = true)
 |-- sqft_lot: integer (nullable = true)
 |-- floors: double (nullable = true)
 |-- waterfront: integer (nullable = true)
 |-- view: integer (nullable = true)
 |-- condition: integer (nullable = true)
 |-- grade: integer (nullable = true)
 |-- sqft_above: integer (nullable = true)
 |-- sqft_basement: integer (nullable = true)
 |-- yr_built: integer (nullable = true)
 |-- yr_renovated: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- sqft_living15: integer (nullable = true)
 |-- sqft_lot15: integer (nullable = true)
 |-- features: vector (nullable = true)



In [98]:
#Bổ sung thêm cột đặc trưng
# xem véc tơ đặc trưng
features_df.select('features').show(5,False)

+---------------------------------------+
|features                               |
+---------------------------------------+
|[7.12930052E9,7.0,1180.0,0.0,1955.0]   |
|[6.414100192E9,7.0,2170.0,400.0,1951.0]|
|[5.6315004E9,6.0,770.0,0.0,1933.0]     |
|[2.487200875E9,7.0,1050.0,910.0,1965.0]|
|[1.95440051E9,8.0,1680.0,0.0,1987.0]   |
+---------------------------------------+
only showing top 5 rows



In [85]:
#Tạo dữ liệu huấn luyện: gồm cột đặc trưng (features) và đầu ra (ouput)
data_model = features_df.select('features', 'long')

In [86]:
#Xem dữ liệu huấn luyện
data_model.show(5,False)

+---------------------------------------+--------+
|features                               |long    |
+---------------------------------------+--------+
|[7.12930052E9,7.0,1180.0,0.0,1955.0]   |-122.257|
|[6.414100192E9,7.0,2170.0,400.0,1951.0]|-122.319|
|[5.6315004E9,6.0,770.0,0.0,1933.0]     |-122.233|
|[2.487200875E9,7.0,1050.0,910.0,1965.0]|-122.393|
|[1.95440051E9,8.0,1680.0,0.0,1987.0]   |-122.045|
+---------------------------------------+--------+
only showing top 5 rows



In [87]:
#Xem kích cỡ dữ liệu huấn luyện
print((data_model.count(), len(data_model.columns)))

(21613, 2)


In [88]:
#Bước 5: Chia tập dữ liệu huấn luyện và kiểm tra
#Thông thường chia theo tỉ lệ 70-30 hoặc 20-80
train_df, test_df = data_model.randomSplit([0.7,0.3])


In [89]:
#xem kích cỡ của tập huấn luyện
print((train_df.count(), len(train_df.columns)))

(15185, 2)


In [90]:
#xem kích cỡ của tập kiểm tra
print((test_df.count(), len(test_df.columns)))

(6428, 2)


In [91]:
#Bước 6: Xây dựng mô hình hồi quy tuyến tính
#6.1 Tải thư viện học máy
from pyspark.ml.regression import LinearRegression

In [96]:
#6.2 Khởi tạo mô hình
model = LinearRegression(labelCol='long')

In [97]:
#6.3 Huấn luyện mô hình
model_lr =  model.fit(train_df)

In [99]:
#6.4 lỖI bình phương tối thiểu trên dữ liệu huấn luyện
train_predict = model_lr.evaluate(train_df)

In [108]:
#Lỗi bình phương tối thiểu
train_predict.meanSquaredError

0.015323775961366169

In [102]:
#Lỗi R2 trên dữ liệu huấn luyện
print(train_predict.r2)

0.2304687492066818


In [104]:
# Bước 7: Đánh giá mô hình trên dữ liệu kiểm tra
# 7.1 Đánh giá dữ liệu kiểm tra
test_results = model_lr.evaluate(train_df)

In [106]:
#7.2 In kết quả đánh giá
print(test_results.r2)

0.2304687492066818


In [109]:
# lỗi bình phương tối thiểu
test_results.meanSquaredError

0.015323775961366169