# Gerekli kütüphanelerin ve JDK'nın kurulması
Gerekli Python kütüphanelerinin ve JDK'nın (Java Development Kit) kurulumunu gerçekleştiriyor, ayrıca Apache Spark 3.1.1 dağıtımını indirip açıyoruz. JDK, Apache Spark'ın çalışması için gereklidir.

In [1]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpadcon                                                                                                    Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
                                                                                                    Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
                                                                                                    Get:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/d

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=bd87d11135cb492e2be23849e4920e14d03abbe4b1a4f70aacd476bb04a7d8b3
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


# Ortam değişkenlerinin ayarlanması
Java ve Spark'ın Python ortamında doğru bir şekilde bulunabilmesi için gereken ortam değişkenlerinin ayarlanması.

In [3]:
import os

# Tell Spark where the apt-installed JDK lives and where the setup cell
# unpacked the Spark distribution (assumed to be Colab's /content directory).
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.1.1-bin-hadoop3.2",
)


# Spark'ın başlatılması
Bu kod parçası Spark oturumunu başlatır. findspark kütüphanesi, Spark'ı Python ortamında bulmayı ve etkinleştirmeyi sağlar.

In [4]:
import findspark

# Locate the Spark installation via SPARK_HOME and make its pyspark importable.
findspark.init()

from pyspark.sql import SparkSession

# Local-mode session on all available cores; returns the existing session if
# one was already created in this kernel.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [5]:
# Display the SparkSession summary (Jupyter renders a cell's last expression).
spark

# Google Drive'ı bağlama
Google Drive'a bağlanmak için gerekli kod parçaları.

In [6]:
import os
from google.colab import drive, files

# Mount Google Drive so files under MyDrive are reachable from the Colab VM.
drive.mount('/content/drive')

# Assignment folder on Drive; housing.csv is expected to be stored here.
workspace_path = "/content/drive/MyDrive/SparkOdev/"

# Use the Drive folder as the working directory for the rest of the notebook.
os.chdir(workspace_path)

Mounted at /content/drive


# Veri Okuma
Veri, belirtilen yoldan okunarak df isimli Spark DataFrame'ine atanır (Drive içinde SparkOdev adlı bir klasör oluşturup housing.csv veri setini bu klasöre yükleyiniz).

In [7]:
# Load housing.csv from the Drive workspace: the first row holds the column
# names and Spark infers each column's type from the data.
csv_path = workspace_path + 'housing.csv'
df = spark.read.csv(csv_path, header=True, inferSchema=True)

In [8]:
# Preview the raw data (show() prints 20 rows and truncates long cells by default).
df.show(n=20, truncate=True)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [9]:
# Per-column summary statistics (count, mean, stddev, min, max) of the raw data.
summary_stats = df.describe()
summary_stats.show()

+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|summary|          longitude|         latitude|housing_median_age|       total_rooms|    total_bedrooms|        population|       households|     median_income|median_house_value|ocean_proximity|
+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|  count|              20640|            20640|             20640|             20640|             20433|             20640|            20640|             20640|             20640|          20640|
|   mean|-119.56970445736148| 35.6318614341087|28.639486434108527|2635.7630813953488| 537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|206855.81690891474|           null|
| stddev|  2.0035317

# Null Değerlerin Bulunması
Bu kod parçası, her sütunda bulunan null değerlerin sayısını hesaplar.

In [10]:
from pyspark.sql import functions as F

# Count nulls per column: isNull() cast to 0/1, then summed for each column.
# Using the F alias instead of `from ... import sum` keeps Python's built-in
# sum() from being shadowed in the notebook namespace.
null_counts = df.select(
    [F.sum(F.col(column).isNull().cast("int")).alias(column) for column in df.columns]
)

null_counts.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



# Null Değerlerin Silinmesi
DataFrame'deki herhangi bir satırda null değer bulunan satırları siler.

In [11]:
# Drop every row that has a null in any column (dropna() is the same operation
# as na.drop(), default how="any"); per the counts above only total_bedrooms
# has nulls.
# NOTE(review): because the nulls are removed here, the mean-imputation cell
# further down finds nothing to fill and leaves the data unchanged.
df = df.dropna()

In [12]:
from pyspark.sql import functions as F

# Re-count nulls per column to confirm the drop: isNull() cast to 0/1, summed.
# The F alias avoids shadowing Python's built-in sum() in the notebook namespace.
null_counts = df.select(
    [F.sum(F.col(column).isNull().cast("int")).alias(column) for column in df.columns]
)

null_counts.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



# Null Değerlerin Ortalama Değer ile Doldurulması
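'total_bedrooms' sütunundaki null değerleri hesaplanan ortalama değer ile doldurur. Not: Bir önceki adımda null içeren satırlar zaten silindiği için bu adım veride bir değişiklik yapmaz. Silme ve ortalama ile doldurma birbirine alternatif yöntemlerdir; doldurma yöntemi kullanılacaksa silme adımı atlanmalıdır.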

In [None]:
from pyspark.sql.functions import mean

# Mean of total_bedrooms (the aggregate ignores nulls), taken from the single
# row the aggregation returns.
# NOTE(review): the earlier null-dropping cell already removed every row with a
# null, so this fill has nothing to replace. Dropping and mean-imputation are
# alternatives; only one of them should be applied.
mean_row = df.select(mean(df["total_bedrooms"])).first()
mean_value = mean_row[0]

# Replace nulls in total_bedrooms only; other columns are left untouched.
df_filled = df.na.fill(mean_value, subset=["total_bedrooms"])

df_filled.show()


+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [None]:
# Continue with the imputed DataFrame (identical to df when no nulls remained).
df = df_filled

In [None]:
from pyspark.sql import functions as F

# Re-count nulls per column after imputation: isNull() cast to 0/1, summed.
# The F alias avoids shadowing Python's built-in sum() in the notebook namespace.
null_counts = df.select(
    [F.sum(F.col(column).isNull().cast("int")).alias(column) for column in df.columns]
)

null_counts.show()


+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



# Kategorik değişkenin incelenmesi
'ocean_proximity' sütunundaki benzersiz değerleri bulur.

In [13]:
from pyspark.sql.functions import col

# Distinct categories of the string column ocean_proximity.
unique_values = df.select(col("ocean_proximity")).distinct()

unique_values.show()

+---------------+
|ocean_proximity|
+---------------+
|         ISLAND|
|     NEAR OCEAN|
|       NEAR BAY|
|      <1H OCEAN|
|         INLAND|
+---------------+



# Kategorik Değerlerin Dönüştürülmesi
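Kategorik değişkene öncelikle Ordinal Encoding (StringIndexer) uygulanması. Not: StringIndexer her kategoriye bir sayı atar ve doğrusal regresyon bu sayıları sıralı büyüklükler olarak yorumlar. Doğal bir sıralaması olmayan kategoriler için genellikle ardından OneHotEncoder uygulanır.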

In [14]:
from pyspark.ml.feature import StringIndexer

# Map each ocean_proximity label to a numeric index (0.0, 1.0, ...); with
# StringIndexer's default ordering the most frequent label gets 0.0.
# NOTE(review): linear regression will treat these indices as ordered
# magnitudes, which these categories do not have; one-hot encoding the index
# (OneHotEncoder) is the usual follow-up.
indexer = StringIndexer(inputCol="ocean_proximity", outputCol="ocean_proximity_index")
indexer_model = indexer.fit(df)
indexed_df = indexer_model.transform(df)

# Replace the string column with its index while keeping the original name.
df = (
    indexed_df
    .drop("ocean_proximity")
    .withColumnRenamed("ocean_proximity_index", "ocean_proximity")
)

df.show()


+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|            3.0|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|            3.0|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|            3.0|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|            3.0|
|  -122.25|   37.85|              

In [15]:
# Summary statistics after dropping nulls and indexing ocean_proximity.
summary_stats = df.describe()
summary_stats.show()

+-------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|          longitude|          latitude|housing_median_age|       total_rooms|    total_bedrooms|        population|        households|     median_income|median_house_value|   ocean_proximity|
+-------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|              20433|             20433|             20433|             20433|             20433|             20433|             20433|             20433|             20433|             20433|
|   mean|-119.57068859198068| 35.63322125972706|28.633093525179856|2636.5042333480155| 537.8705525375618|1424.9469485635982|499.43346547251997|3.8711616013312273|206864.41315519012|0.9094112465129

In [16]:
# Preview the fully numeric DataFrame that feeds the model.
df.show(n=20, truncate=True)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|            3.0|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|            3.0|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|            3.0|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|            3.0|
|  -122.25|   37.85|              

# Lineer Regresyon Modelinin kurulması

## VectorAssembler kullanarak özniteliklerin birleştirilmesi

In [17]:
from pyspark.ml.feature import VectorAssembler

# Model inputs: every column except the label median_house_value.
feature_columns = [
    'longitude', 'latitude', 'housing_median_age', 'total_rooms',
    'total_bedrooms', 'population', 'households', 'median_income',
    'ocean_proximity',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Keep only what the regressor needs: the feature vector and the label.
assembled = assembler.transform(df)
data = assembled.select("features", "median_house_value")

## Veri kümesinin train ve test kümelerine bölünmesi

In [18]:
# Reproducible 80/20 train/test split.
# NOTE(review): the scaled-feature experiments later split with seed=42, so
# their R2 scores are measured on a different test set and are not directly
# comparable with the score of this unscaled model.
train_data, test_data = data.randomSplit(weights=[0.8, 0.2], seed=1234)

## Lineer Regresyon modelinin oluşturulması ve eğitimin yapılması

In [19]:
from pyspark.ml.regression import LinearRegression

# Least-squares linear regression with Spark's defaults (no regularization;
# features are standardized internally because standardization=True).
lr = LinearRegression(labelCol="median_house_value", featuresCol="features")
lr_model = lr.fit(train_data)

# R2 Skorunun Hesaplanması
Lineer Regresyon modeli kullanılarak yapılan tahminlerin R2 skorunu hesaplar.

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out rows and report the coefficient of determination (R2).
predictions = lr_model.transform(test_data)
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="median_house_value",
    metricName="r2",
)
r2 = evaluator.evaluate(predictions)

print("R2 Score:", r2)

R2 Score: 0.6416399219959245


# Özelliklerin scale edilmesi
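Veri setindeki özellikleri ölçeklemek için StandardScaler kullanılması. Not: Spark'ın `LinearRegression` sınıfı varsayılan olarak (`standardization=True`) özellikleri eğitim sırasında kendisi standartlaştırır ve düzenlileştirme (regularization) uygulanmadığında ölçeklemeden bağımsız olarak aynı çözüme yakınsar. Bu nedenle ölçeklemenin R2 skorunu belirgin biçimde değiştirmesi beklenmez. Aşağıda görülen skor farkı büyük olasılıkla farklı train/test bölünmelerinden (farklı `seed` değerleri) kaynaklanmaktadır.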

In [21]:
from pyspark.ml.feature import StandardScaler, VectorAssembler

# Same nine input columns as the unscaled model.
feature_cols = [
    'longitude', 'latitude', 'housing_median_age', 'total_rooms',
    'total_bedrooms', 'population', 'households', 'median_income',
    'ocean_proximity',
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_vectorized = assembler.transform(df)

# StandardScaler defaults to withStd=True, withMean=False: each feature is
# divided by its standard deviation but not centred (hence the negative
# longitude values in the output).
# NOTE(review): the scaler is fitted on the whole dataset before the
# train/test split, so test-row statistics leak into the training features.
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(df_vectorized)

scaled_df = (
    scaler_model.transform(df_vectorized)
    .select('median_house_value', 'scaled_features')
)

scaled_df.show()


+------------------+--------------------+
|median_house_value|     scaled_features|
+------------------+--------------------+
|          452600.0|[-61.005863841998...|
|          358500.0|[-61.000872770752...|
|          352100.0|[-61.010854913244...|
|          341300.0|[-61.015845984490...|
|          342200.0|[-61.015845984490...|
|          269700.0|[-61.015845984490...|
|          299200.0|[-61.015845984490...|
|          241400.0|[-61.015845984490...|
|          226700.0|[-61.020837055737...|
|          261100.0|[-61.015845984490...|
|          281500.0|[-61.020837055737...|
|          241800.0|[-61.020837055737...|
|          213500.0|[-61.020837055737...|
|          191300.0|[-61.020837055737...|
|          159200.0|[-61.020837055737...|
|          140000.0|[-61.020837055737...|
|          152500.0|[-61.025828126983...|
|          155500.0|[-61.025828126983...|
|          158700.0|[-61.020837055737...|
|          162900.0|[-61.025828126983...|
+------------------+--------------

# Scale edilmiş veri ile lineer regresyon modeli kurulması

In [22]:
# Reproducible 80/20 split of the standardized data.
# NOTE(review): the unscaled model was split with seed=1234, so its R2 comes
# from a different test set than the one produced here.
train_df, test_df = scaled_df.randomSplit(weights=[0.8, 0.2], seed=42)

In [23]:
# Linear regression on the standardized features. Spark's LinearRegression
# already standardizes internally by default, so without regularization the
# extra scaling is not expected to change the fitted predictions.
lr = LinearRegression(labelCol='median_house_value', featuresCol='scaled_features')
lr_model = lr.fit(train_df)

In [24]:
# R2 of the standardized-feature model on its held-out split.
predictions = lr_model.transform(test_df)

evaluator = RegressionEvaluator(
    metricName='r2',
    labelCol='median_house_value',
    predictionCol='prediction',
)
r2 = evaluator.evaluate(predictions)
print("R2 Score:", r2)

R2 Score: 0.6203821929804811


# Min-Max Scaler işleminin yapılması ve Lineer Regresyon modelinin kurulması

In [28]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

# Same nine input columns as the other experiments.
feature_columns = [
    'longitude', 'latitude', 'housing_median_age', 'total_rooms',
    'total_bedrooms', 'population', 'households', 'median_income',
    'ocean_proximity',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df)

# Rescale each feature to [0, 1] (MinMaxScaler defaults: min=0.0, max=1.0).
# NOTE(review): fitted on the full dataset before splitting, so the per-feature
# min/max include rows that later end up in the test set.
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(data)

scaled_data = (
    scaler_model.transform(data)
    .select('median_house_value', 'scaled_features')
)

In [29]:
# Preview the label next to the [0, 1]-scaled feature vector.
scaled_data.show(n=20, truncate=True)

+------------------+--------------------+
|median_house_value|     scaled_features|
+------------------+--------------------+
|          452600.0|[0.21115537848605...|
|          358500.0|[0.21215139442231...|
|          352100.0|[0.21015936254980...|
|          341300.0|[0.20916334661354...|
|          342200.0|[0.20916334661354...|
|          269700.0|[0.20916334661354...|
|          299200.0|[0.20916334661354...|
|          241400.0|[0.20916334661354...|
|          226700.0|[0.20816733067728...|
|          261100.0|[0.20916334661354...|
|          281500.0|[0.20816733067728...|
|          241800.0|[0.20816733067728...|
|          213500.0|[0.20816733067728...|
|          191300.0|[0.20816733067728...|
|          159200.0|[0.20816733067728...|
|          140000.0|[0.20816733067728...|
|          152500.0|[0.20717131474103...|
|          155500.0|[0.20717131474103...|
|          158700.0|[0.20816733067728...|
|          162900.0|[0.20717131474103...|
+------------------+--------------

In [30]:
from pyspark.ml.tuning import TrainValidationSplit  # NOTE(review): unused in this cell

# Reproducible 80/20 split of the min-max scaled data (same seed as the
# StandardScaler experiment; overwrites the earlier train_data/test_data).
train_data, test_data = scaled_data.randomSplit(weights=[0.8, 0.2], seed=42)

In [31]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Fit linear regression on the min-max scaled training split.
lr = LinearRegression(labelCol='median_house_value', featuresCol='scaled_features')
lr_model = lr.fit(train_data)

# Report R2 on the held-out split.
predictions = lr_model.transform(test_data)
evaluator = RegressionEvaluator(
    metricName='r2',
    labelCol='median_house_value',
    predictionCol='prediction',
)
r2 = evaluator.evaluate(predictions)
print("R2 Score:", r2)

R2 Score: 0.62038219297657
