# Ogrenci Bilgileri -> Harun Korkmaz
# ODEV ISTERLERI
1. Verilerin yüklenmesi
2. Verilerin çeşitli Spark fonksiyonları kullanılarak incelenmesi
3. Özniteliklerin seçimi ve verilerin makine öğrenmesi için hazırlanması, 
4. PySpark ile makine öğrenmesi modelinin oluşturulması
5. Geliştirilen modelin performansının ölçümü

###  1. Kütüphaneleri Yükleme ve Spark Oturumunu Başlatma

In [83]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, isnan, when, count

###  2. Veriyi Yükleme ve İnceleme
- "CaliforniaHousing" session isminde oluşturma işlemi .
- housing.csv dosyasını okuma işlemi .
- header=True → İlk satırı başlık olarak almamızı sağladı .
- inferSchema=True → Veri tiplerini otomatik belirler.
- .show(5) ile ilk 5 rowu yazdırımı .

In [84]:
spark = SparkSession.builder.appName("CaliforniaHousing").getOrCreate()

df = spark.read.csv(f'./housing.csv', inferSchema=True, header=True)
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

- .printSchema() -> pandas kütüphanesinden aşina olduğumuz df.info benzeri her bir sütunun bilgilerini veren fonksiyondur 

In [85]:
df.printSchema()  

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



###  3. Eksik Değerleri İnceleme ve Düzeltme
- Her bir sütunun içinde kaç adet null değer olduğunun gösterimi

In [86]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



- Eksik değerlerin sahip olduğu sütunun medyani ile doldurma işlemi 
- Tekrardan ekrana null değerlerin gösterimi 

In [87]:
median_total_bedrooms = df.approxQuantile("total_bedrooms", [0.5], 0.01)[0]
df = df.fillna({"total_bedrooms": median_total_bedrooms})

df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



- Temel istatistikleri gösterimi

In [88]:
df.describe().show()

+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|summary|          longitude|         latitude|housing_median_age|       total_rooms|    total_bedrooms|        population|       households|     median_income|median_house_value|ocean_proximity|
+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|  count|              20640|            20640|             20640|             20640|             20640|             20640|            20640|             20640|             20640|          20640|
|   mean|-119.56970445736148| 35.6318614341087|28.639486434108527|2635.7630813953488| 536.8187984496124|1425.4767441860465|499.5396802325581|3.8706710029070246|206855.81690891474|           NULL|
| stddev|  2.0035317

###  4. Kategorik Değişkenleri Dönüştürme

1. benzersiz değerleri ekrana yazdırma
 - <1H OCEAN: Okyanusa 1 saat mesafede
 - INLAND: İç bölge
 - NEAR OCEAN: Okyanusa yakın
 - NEAR BAY: Körfeze yakın
 - ISLAND: Ada

2. Kategorik değişkenin dağılımını gösterme
3. Manuel olarak kategorik değerleri numeric hale getirme 


In [89]:
df.select("ocean_proximity").distinct().show()

+---------------+
|ocean_proximity|
+---------------+
|         ISLAND|
|     NEAR OCEAN|
|       NEAR BAY|
|      <1H OCEAN|
|         INLAND|
+---------------+



In [90]:
df.groupBy("ocean_proximity").count().show()

+---------------+-----+
|ocean_proximity|count|
+---------------+-----+
|         ISLAND|    5|
|     NEAR OCEAN| 2658|
|       NEAR BAY| 2290|
|      <1H OCEAN| 9136|
|         INLAND| 6551|
+---------------+-----+



- Island değeri az olduğu için onu NEAR OCEAN değerine dahil etme kararı alındı.

In [91]:
from pyspark.sql.functions import when

df = df.withColumn("ocean_proximity_index",
                   when(df["ocean_proximity"] == "<1H OCEAN", 3)
                   .when(df["ocean_proximity"] == "NEAR OCEAN", 2)
                   .when(df["ocean_proximity"] == "NEAR BAY", 1)
                   .when(df["ocean_proximity"] == "INLAND", 0)
                   .otherwise(2))  # ISLAND da NEAR OCEAN gibi 2 olabilir


In [92]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- ocean_proximity_index: integer (nullable = false)



###  5. Seçili Özellikleri Vektörleştirme
- Kullanılacak tüm değişkenleri feautre_columns değişkeninde birleştirme .
- VectorAssembler ile vektörleştirme işlemi .

In [93]:
from pyspark.ml.feature import VectorAssembler

feature_columns = [
    "longitude", "latitude", "housing_median_age",
    "total_rooms", "total_bedrooms", "population", "households",
    "median_income", "ocean_proximity_index"
]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df = assembler.transform(df)

df.select("features", "median_house_value").show(5, truncate=False)

+-----------------------------------------------------------+------------------+
|features                                                   |median_house_value|
+-----------------------------------------------------------+------------------+
|[-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,1.0]    |452600.0          |
|[-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,1.0]|358500.0          |
|[-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,1.0]   |352100.0          |
|[-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,1.0]   |341300.0          |
|[-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,1.0]   |342200.0          |
+-----------------------------------------------------------+------------------+
only showing top 5 rows



###  6. Veriyi Eğitim ve Test Setlerine Ayırma
- %80 kısmı eğitim verisi %20 kısım test verisi olucak şekilde ayırma .
- Ekrana yazdırtma dağılımı .

In [94]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

print(f"Eğitim verisi: {train_data.count()} satır")
print(f"Test verisi: {test_data.count()} satır")

Eğitim verisi: 16560 satır
Test verisi: 4080 satır


###  7. Lineer Regresyon Modeli Eğitme ve Test Etme

In [95]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features", labelCol="median_house_value")

model = lr.fit(train_data)

print(f"Model Katsayıları: {model.coefficients}")
print(f"Model Sabiti (Intercept): {model.intercept}")

Model Katsayıları: [-34623.591733409,-32836.01197630847,1125.2439675008786,-3.425828315045274,74.10962701197703,-47.25030918966792,87.13946484932589,38505.15353348364,9898.613410371747]
Model Sabiti (Intercept): -2968478.4821013696


#### RegressionEvaluator ile LinearRegression modelinin değerlendirilmesi 
- R² ve RSME skorlarının hesaplanması ve yazdırılması .

In [96]:
from pyspark.ml.evaluation import RegressionEvaluator

predictions = model.transform(test_data)

# RMSE (Hata Oranı) hesapla
rmse_evaluator = RegressionEvaluator(labelCol="median_house_value", metricName="rmse")
rmse = rmse_evaluator.evaluate(predictions)

# R² skorunu hesapla
r2_evaluator = RegressionEvaluator(labelCol="median_house_value", metricName="r2")
r2 = r2_evaluator.evaluate(predictions)

print(f"Modelin RMSE (Hata Oranı): {rmse}")
print(f"Modelin R² (Doğruluk Skoru): {r2}")
#Modelin RMSE (Hata Oranı): 71260.00963511101
#Modelin R² (Doğruluk Skoru): 0.6329835589760625

Modelin RMSE (Hata Oranı): 71260.00963511101
Modelin R² (Doğruluk Skoru): 0.6329835589760625


###  8. Random Forest Modeli Eğitme ve Test Etme

In [97]:
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(featuresCol="features", labelCol="median_house_value", numTrees=100)

rf_model = rf.fit(train_data)

rf_predictions = rf_model.transform(test_data)

#### RegressionEvaluator ile RandomForestRegressor modelinin değerlendirilmesi 
- R² ve RSME skorlarının hesaplanması ve yazdırılması .

In [98]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE (Hata Oranı) hesapla
rmse_evaluator = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction", metricName="rmse")
rmse_rf = rmse_evaluator.evaluate(rf_predictions)

# R² skorunu hesapla
r2_evaluator = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction", metricName="r2")
r2_rf = r2_evaluator.evaluate(rf_predictions)

print(f"Modelin RMSE (Hata Oranı): {rmse}")
print(f"Modelin R² (Doğruluk Skoru): {r2}")

Modelin RMSE (Hata Oranı): 71260.00963511101
Modelin R² (Doğruluk Skoru): 0.6329835589760625


###  9. Gradient Boosted Trees Modeli Eğitme ve Test Etme

In [99]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(featuresCol="features", labelCol="median_house_value", maxIter=10)

gbt_model = gbt.fit(train_data)

gbt_predictions = gbt_model.transform(test_data)

#### RegressionEvaluator ile GBTRegressor modelinin değerlendirilmesi 
- R² ve RSME skorlarının hesaplanması ve yazdırılması .

In [100]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE (Hata Oranı) hesapla
rmse_evaluator = RegressionEvaluator(labelCol="median_house_value", metricName="rmse")
rmse = rmse_evaluator.evaluate(gbt_predictions)

# R² skorunu hesapla
r2_evaluator = RegressionEvaluator(labelCol="median_house_value", metricName="r2")
r2 = r2_evaluator.evaluate(gbt_predictions)

print(f"Modelin RMSE (Hata Oranı): {rmse}")
print(f"Modelin R² (Doğruluk Skoru): {r2}")



Modelin RMSE (Hata Oranı): 60559.94094706756
Modelin R² (Doğruluk Skoru): 0.7349275134500518


### Sonuç 

Bu çalışma kapsamında, **California Housing Prices** veri seti üzerinde **Spark ML** kütüphanesi kullanılarak konut fiyatlarının tahmin edilmesi amaçlanmıştır. **PySpark** kullanılarak veriler işlenmiş, öznitelikler belirlenmiş ve regresyon modelleri oluşturulmuştur.

Model performansını değerlendirmek için **R² skoru** ve **RMSE (Root Mean Squared Error)** gibi metrikler kullanılmıştır. Elde edilen sonuçlara göre:

- **Modelin doğruluk seviyesi** incelenmiş ve **hata oranı** belirlenmiştir.
- **Öznitelik seçimi**, modelin başarısını doğrudan etkileyen faktörlerden biri olmuştur.
- **Regresyon modellerinin kıyaslaması** yapılmış ve en iyi performans veren model belirlenmiştir.

Sonuç olarak, **Spark ML** ile büyük veri üzerinde regresyon analizi gerçekleştirilmiş ve konut fiyatlarını tahmin etmek için uygun bir model geliştirilmiştir. Modelin performansını artırmak için **hiperparametre optimizasyonu** ve **daha fazla veri temizleme işlemi** gibi ek adımlar önerilebilir.
