In [0]:
# Load the real-estate sales CSV from the volume path into a Spark DataFrame.
path = "/Volumes/workspace/default/real_estate_dataset/real_estate_sales_2001_2022.csv"

# The first row holds the column names; column types are inferred from the data.
df = spark.read.csv(path, header=True, inferSchema=True)

# Sanity check: preview a handful of rows and print the inferred schema.
display(df.limit(5))
df.printSchema()

In [0]:
from pyspark.sql import functions as F

# Target and feature columns (numeric + categorical).
label_col = "Sale Amount"
# NOTE: "Sales Ratio" is deliberately NOT a feature. In this dataset it is
# derived from the sale price (assessed value relative to sale amount), so
# feeding it to the model together with "Assessed Value" leaks the label and
# makes the evaluation metrics meaningless. Confirm against the dataset's
# column documentation if the source ever changes.
num_cols = ["Assessed Value", "List Year"]
cat_cols = ["Town", "Property Type", "Residential Type"]

# The label and every numeric feature are mandatory (they are used
# unconditionally by the null/positivity filters below), so fail early with a
# readable message instead of an opaque analysis error further down.
required_cols = [label_col] + num_cols
missing_cols = [c for c in required_cols if c not in df.columns]
if missing_cols:
    raise ValueError(f"Missing required columns in df: {missing_cols}")

# Categorical columns stay optional: only the ones present in df are used.
available_cat_cols = [c for c in cat_cols if c in df.columns]
selected = required_cols + available_cat_cols

# Drop rows with a null label or a null numeric feature (the subset is derived
# from num_cols so it cannot drift out of sync with the feature list), and
# keep only rows with a positive price and a positive assessed value.
df_ml = (
    df.select(*selected)
    .dropna(subset=required_cols)
    .filter((F.col(label_col) > 0) & (F.col("Assessed Value") > 0))
)

# Replace null categories with an explicit "Unknown" level.
if available_cat_cols:
    df_ml = df_ml.fillna("Unknown", subset=available_cat_cols)

display(df_ml.limit(5))

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Only columns actually present in df_ml take part in the feature pipeline.
cat_features = [name for name in cat_cols if name in df_ml.columns]
num_features = [name for name in num_cols if name in df_ml.columns]

# Per categorical column: string -> index (<col>_idx), then index -> one-hot
# vector (<col>_ohe). handleInvalid="keep" is passed to the indexer so that
# invalid/unseen labels do not raise at transform time.
indexers = []
encoders = []
for col_name in cat_features:
    idx_name = f"{col_name}_idx"
    indexers.append(
        StringIndexer(inputCol=col_name, outputCol=idx_name, handleInvalid="keep")
    )
    encoders.append(
        OneHotEncoder(inputCol=idx_name, outputCol=f"{col_name}_ohe")
    )

# Assemble numeric columns followed by the one-hot vectors into "features".
assembler_inputs = num_features + [f"{col_name}_ohe" for col_name in cat_features]

assembler = VectorAssembler(
    inputCols=assembler_inputs,
    outputCol="features",
    handleInvalid="keep",
)

In [0]:
# 80/20 train/test split with a fixed seed.
splits = df_ml.randomSplit([0.8, 0.2], seed=42)
train = splits[0]
test = splits[1]

# Row counts of both partitions.
train_rows = train.count()
test_rows = test.count()
print("Train:", train_rows, "Test:", test_rows)

In [0]:
# Model training: linear regression on the assembled feature vector.
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features", labelCol=label_col, maxIter=50, regParam=0.05)

# Stage order: index categories -> one-hot encode -> assemble vector -> regressor.
stages = [*indexers, *encoders, assembler, lr]
pipeline = Pipeline(stages=stages)

# Fit on the training split only, then score the held-out test split.
model = pipeline.fit(train)
pred = model.transform(test)

# Show a few actual vs. predicted values.
display(pred.select(label_col, "prediction").limit(10))

In [0]:
# Evaluation on the test predictions (RMSE, MAE, R²)
from pyspark.ml.evaluation import RegressionEvaluator

# One evaluator, re-pointed at each metric in turn.
evaluator = RegressionEvaluator(labelCol=label_col, predictionCol="prediction")
scores = {
    metric: evaluator.setMetricName(metric).evaluate(pred)
    for metric in ("rmse", "mae", "r2")
}
rmse = scores["rmse"]
mae = scores["mae"]
r2 = scores["r2"]

print(f"RMSE: {rmse:,.2f}")
print(f"MAE:  {mae:,.2f}")
print(f"R2:   {r2:.4f}")