Setting up a PySpark session

In [None]:
!pip install pyspark

In [None]:
# Import các thư viện cần thiết
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, month, to_date
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.sql import Row


Đọc dữ liệu

In [None]:
# Phần 1: Khởi tạo SparkSession
spark = SparkSession.builder.appName('ZillowPrediction').getOrCreate()

In [None]:
# Phần 2: Đọc các tệp CSV
pro_2016 = spark.read.csv('properties_2016.csv', header=True, inferSchema=True)
pro_2017 = spark.read.csv('properties_2017.csv', header=True, inferSchema=True)
sample_submiss = spark.read.csv('sample_submission.csv', header=True, inferSchema=True)
train_2016 = spark.read.csv('train_2016_v2.csv', header=True, inferSchema=True)
train_2017 = spark.read.csv('train_2017.csv', header=True, inferSchema=True)

Đảm bảo

In [None]:
# Phần 3: Đảm bảo rằng các cột trong pro_2016 và pro_2017 có cùng schema
for col_name in pro_2016.columns:
    if col_name not in pro_2017.columns:
        pro_2017 = pro_2017.withColumn(col_name, lit(None).cast(pro_2016.schema[col_name].dataType))

for col_name in pro_2017.columns:
    if col_name not in pro_2016.columns:
        pro_2016 = pro_2016.withColumn(col_name, lit(None).cast(pro_2017.schema[col_name].dataType))

Nối

In [None]:
# Phần 4: Nối các dòng của hai bộ dữ liệu
data2016 = train_2016.join(pro_2016, on='parcelid', how='left')
data2017 = train_2017.join(pro_2017, on='parcelid', how='left')
data = data2016.union(data2017)

Future engineering

In [None]:
# Phần 5: Khởi tạo danh sách để lưu trữ số lượng giá trị null trong mỗi cột
null_counts = []
for column_name in data.columns:
    null_count = data.filter(col(column_name).isNull()).count()
    null_counts.append((column_name, null_count))

In [None]:
# Phần 6: Xoá các cột có số lượng null lớn hơn một nửa số lượng dòng trong bộ dữ liệu
total_rows = data.count()
threshold = total_rows / 2
columns_to_drop = [col_name for col_name, null_count in null_counts if null_count > threshold]
final_data = data.drop(*columns_to_drop)

In [None]:
# Phần 7: Tính toán giá trị median của cột "logerror" và điền giá trị thiếu
median_imputer = final_data.approxQuantile("logerror", [0.5], 0.25)[0]
final_data = final_data.fillna({"logerror": median_imputer})

In [None]:
# Phần 8: Tạo cột transaction_month
final_data = final_data.withColumn('transaction_month', month(to_date(final_data['transactiondate'], 'yyyy-MM-dd')))

In [None]:
# Phần 9: Sử dụng StringIndexer để chuyển đổi các cột chuỗi thành các chỉ số số
string_columns = ['propertycountylandusecode', 'propertyzoningdesc']
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index", handleInvalid="keep") for col in string_columns]
indexer_models = [indexer.fit(final_data) for indexer in indexers]

for model in indexer_models:
    final_data = model.transform(final_data)

In [None]:
# Phần 10: Loại bỏ các cột chuỗi ban đầu
final_data = final_data.drop(*string_columns)

In [None]:
# Phần 11: Xử lý các giá trị null trong các cột đặc trưng
final_data = final_data.na.fill(0)  # Điền tất cả các giá trị null bằng 0

Huấn luyện

In [None]:
# Phần 12: Chọn các đặc trưng cần thiết
feature_columns = [col for col in final_data.columns if col not in ['logerror', 'transactiondate', 'parcelid']]

In [None]:
# Phần 13: Chia dữ liệu thành tập huấn luyện và tập kiểm tra
train_data, valid_data = final_data.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Phần 14: Tạo VectorAssembler để biến đổi các đặc trưng thành vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [None]:
# Phần 15: Khởi tạo mô hình LinearRegression
lr = LinearRegression(featuresCol='features', labelCol='logerror')

In [None]:
# Phần 16: Tạo Pipeline
pipeline = Pipeline(stages=[assembler, lr])

In [None]:
# Phần 17: Huấn luyện mô hình
model = pipeline.fit(train_data)

Dự đoán

In [None]:
# Phần 18: Dự đoán trên tập kiểm tra
predictions = model.transform(valid_data)

In [None]:
# Phần 19: Tính Mean Absolute Error
evaluator = RegressionEvaluator(labelCol="logerror", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions)
print(f'Mean Absolute Error: {mae}')

In [None]:
# Phần 20: Dự đoán logerror cho dữ liệu mới
new_row = Row(
    bathroomcnt=2.0, bedroomcnt=2.0, buildingqualitytypeid=9, calculatedbathnbr=2.0,
    calculatedfinishedsquarefeet=512.0, finishedsquarefeet12=512, fips=6023, fullbathcnt=2,
    heatingorsystemtypeid=2, latitude=4316563, longitude=-128613175, lotsizesquarefeet=11042.0,
    propertylandusetypeid=211, rawcensustractandblock=6.0371759321008E7, regionidcity=12130,
    regionidcounty=3010, regionidzip=87695, roomcnt=2.0, unitcnt=1, yearbuilt=1980.0,
    structuretaxvaluedollarcnt=50098.0, taxvaluedollarcnt=70890.0, assessmentyear=2015,
    landtaxvaluedollarcnt=15991.0, taxamount=1096.78, censustractandblock=60498132321008,
    transaction_month=4, propertycountylandusecode_index=2.0, propertyzoningdesc_index=2.0
)

new_data = spark.createDataFrame([new_row])

# Áp dụng VectorAssembler để biến đổi các đặc trưng thành vector
new_data_with_features = assembler.transform(new_data)

# Đổi tên cột "features" thành "new_features" (nếu cần)
new_data_with_features = new_data_with_features.withColumnRenamed("features", "new_features")

# Dự đoán logerror cho dữ liệu mới
predictions = model.transform(new_data_with_features)

# Hiển thị dự đoán
predictions.select("prediction").show()
