In [1]:
from pyspark.sql import SparkSession

# Создаем SparkSession
spark = (
    SparkSession.builder
    .appName("My PySpark Application")   # Название приложения
    .master("local[*]")   # Указывает использовать локальный режим со всеми ядрами
    .config("spark.executor.memory", "2g")   # Настройка памяти для исполнителей
    .config("spark.driver.memory", "2g")   # Настройка памяти для драйвера
    .config("spark.sql.debug.maxToStringFields", 1000)
    .getOrCreate()
)

# Проверка успешного подключения
print("Spark Version:", spark.version)

Spark Version: 3.5.5


In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# Данные первой таблицы
data1 = [
    (1360, 2, 1, 1981, 0.599, 0, 5, 262382.85),
    (4272, 3, 2, 2016, 4.753, 1, 6, 985260.85),
    (3592, 1, 1, 2016, 3.634, 0, 9, 777977.39),
    (966, 1, 1, 1977, 2.730, 1, 8, 229698.91),
    (4926, 2, 2, 1993, 4.699, 0, 8, 1041740.85)
]

# Схема первой таблицы
schema1 = StructType([
    StructField("Square_Footage", IntegerType(), True),
    StructField("Num_Bedrooms", IntegerType(), True),
    StructField("Num_Bathrooms", IntegerType(), True),
    StructField("Year_Built", IntegerType(), True),
    StructField("Lot_Size", DoubleType(), True),
    StructField("Garage_Size", IntegerType(), True),
    StructField("Neighborhood_Quality", IntegerType(), True),
    StructField("House_Price", DoubleType(), True)
])

# Создание первой таблицы
df1 = spark.createDataFrame(data1, schema=schema1)

# Данные второй таблицы
data2 = [
    (1, 0, 1, 0, 5),
    (2, 1, 1, 1, 6),
    (3, 0, 0, 1, 9),
    (4, 1, 0, 1, 8),
    (5, 0, 1, 0, 8)
]

# Схема второй таблицы
schema2 = StructType([
    StructField("Property_ID", IntegerType(), True),
    StructField("Garage_Size", IntegerType(), True),
    StructField("Has_Pool", IntegerType(), True),
    StructField("Has_Basement", IntegerType(), True),
    StructField("Neighborhood_Quality", IntegerType(), True)
])

# Создание второй таблицы
df2 = spark.createDataFrame(data2, schema=schema2)

# Показать данные
df1.show(3)
df2.show(3)

+--------------+------------+-------------+----------+--------+-----------+--------------------+-----------+
|Square_Footage|Num_Bedrooms|Num_Bathrooms|Year_Built|Lot_Size|Garage_Size|Neighborhood_Quality|House_Price|
+--------------+------------+-------------+----------+--------+-----------+--------------------+-----------+
|          1360|           2|            1|      1981|   0.599|          0|                   5|  262382.85|
|          4272|           3|            2|      2016|   4.753|          1|                   6|  985260.85|
|          3592|           1|            1|      2016|   3.634|          0|                   9|  777977.39|
+--------------+------------+-------------+----------+--------+-----------+--------------------+-----------+
only showing top 3 rows

+-----------+-----------+--------+------------+--------------------+
|Property_ID|Garage_Size|Has_Pool|Has_Basement|Neighborhood_Quality|
+-----------+-----------+--------+------------+--------------------+
|    

In [3]:
df = spark.read.csv("house_price_regression_dataset.csv", header=True, inferSchema=True)

In [4]:
df.show(5)

+--------------+------------+-------------+----------+------------------+-----------+--------------------+------------------+
|Square_Footage|Num_Bedrooms|Num_Bathrooms|Year_Built|          Lot_Size|Garage_Size|Neighborhood_Quality|       House_Price|
+--------------+------------+-------------+----------+------------------+-----------+--------------------+------------------+
|          1360|           2|            1|      1981|0.5996366396268326|          0|                   5| 262382.8522740563|
|          4272|           3|            3|      2016|4.7530138494020395|          1|                   6|  985260.854490162|
|          3592|           1|            2|      2016| 3.634822720478255|          0|                   9| 777977.3901185812|
|           966|           1|            2|      1977|  2.73066687604351|          1|                   8| 229698.9186636115|
|          4926|           2|            1|      1993| 4.699072554837388|          0|                   8|1041740.8589

In [5]:
df.printSchema()

root
 |-- Square_Footage: integer (nullable = true)
 |-- Num_Bedrooms: integer (nullable = true)
 |-- Num_Bathrooms: integer (nullable = true)
 |-- Year_Built: integer (nullable = true)
 |-- Lot_Size: double (nullable = true)
 |-- Garage_Size: integer (nullable = true)
 |-- Neighborhood_Quality: integer (nullable = true)
 |-- House_Price: double (nullable = true)



In [6]:
row_count = df.count()
print(f"Количество строк: {row_count}")

Количество строк: 1000


In [7]:
df.describe().show(3)

+-------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+-----------------+
|summary|    Square_Footage|      Num_Bedrooms|     Num_Bathrooms|        Year_Built|          Lot_Size|       Garage_Size|Neighborhood_Quality|      House_Price|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+-----------------+
|  count|              1000|              1000|              1000|              1000|              1000|              1000|                1000|             1000|
|   mean|          2815.422|              2.99|             1.973|           1986.55|2.7780874273930207|             1.022|               5.615|618861.0186467685|
| stddev|1255.5149205133453|1.4275636370887075|0.8203316060861882|20.632915868030334|1.2979031460668562|0.8149725027141597|  2.8870590763110653|253568.0583754209|
+-------+-------------

In [8]:
from pyspark.sql.functions import lit

lit(1)

import pyspark.sql.functions as F
F.lit(1)
F.sum('1')

Column<'sum(1)'>

### Вывести уникальные значения для колонки Num_Bedrooms

In [9]:
numbedroomsdf = df.select('Num_Bedrooms')
numbedroomsdf.distinct().show()

+------------+
|Num_Bedrooms|
+------------+
|           1|
|           3|
|           5|
|           4|
|           2|
+------------+



### Отфильтровать строки, где Square_Footage больше 3000.

In [10]:
sqfoot = df.filter(df.Square_Footage > 3000)
sqfoot.show()

+--------------+------------+-------------+----------+------------------+-----------+--------------------+------------------+
|Square_Footage|Num_Bedrooms|Num_Bathrooms|Year_Built|          Lot_Size|Garage_Size|Neighborhood_Quality|       House_Price|
+--------------+------------+-------------+----------+------------------+-----------+--------------------+------------------+
|          4272|           3|            3|      2016|4.7530138494020395|          1|                   6|  985260.854490162|
|          3592|           1|            2|      2016| 3.634822720478255|          0|                   9| 777977.3901185812|
|          4926|           2|            1|      1993| 4.699072554837388|          0|                   8|1041740.8589249004|
|          3944|           5|            3|      1990| 2.475930043628728|          2|                   8| 879796.9835223783|
|          3671|           1|            2|      2012| 4.911960066216673|          0|                   1| 814427.8614

### Объединение таблиц (df и df1) через union

In [11]:
unionDF = df.union(df1)
unionDF.show()
row_count = unionDF.count()
print(f"Количество строк: {row_count}")

+--------------+------------+-------------+----------+------------------+-----------+--------------------+------------------+
|Square_Footage|Num_Bedrooms|Num_Bathrooms|Year_Built|          Lot_Size|Garage_Size|Neighborhood_Quality|       House_Price|
+--------------+------------+-------------+----------+------------------+-----------+--------------------+------------------+
|          1360|           2|            1|      1981|0.5996366396268326|          0|                   5| 262382.8522740563|
|          4272|           3|            3|      2016|4.7530138494020395|          1|                   6|  985260.854490162|
|          3592|           1|            2|      2016| 3.634822720478255|          0|                   9| 777977.3901185812|
|           966|           1|            2|      1977|  2.73066687604351|          1|                   8| 229698.9186636115|
|          4926|           2|            1|      1993| 4.699072554837388|          0|                   8|1041740.8589

### Джойн двух таблиц (df1 и df2) по Garage_Size и Neighborhood_Quality

In [12]:
df2.join(df1, (df2.Garage_Size == df1.Garage_Size) & (df2.Neighborhood_Quality == df1.Neighborhood_Quality), "inner").show()

+-----------+-----------+--------+------------+--------------------+--------------+------------+-------------+----------+--------+-----------+--------------------+-----------+
|Property_ID|Garage_Size|Has_Pool|Has_Basement|Neighborhood_Quality|Square_Footage|Num_Bedrooms|Num_Bathrooms|Year_Built|Lot_Size|Garage_Size|Neighborhood_Quality|House_Price|
+-----------+-----------+--------+------------+--------------------+--------------+------------+-------------+----------+--------+-----------+--------------------+-----------+
|          1|          0|       1|           0|                   5|          1360|           2|            1|      1981|   0.599|          0|                   5|  262382.85|
|          5|          0|       1|           0|                   8|          4926|           2|            2|      1993|   4.699|          0|                   8| 1041740.85|
|          3|          0|       0|           1|                   9|          3592|           1|            1|      2016

### Создать новую колонку Age как разность между текущим годом (2024) и Year_Built

In [13]:
from pyspark.sql.functions import lit

current_date = lit(2024)
df = df.withColumn("Age", current_date - df.Year_Built)
df.show()

+--------------+------------+-------------+----------+------------------+-----------+--------------------+------------------+---+
|Square_Footage|Num_Bedrooms|Num_Bathrooms|Year_Built|          Lot_Size|Garage_Size|Neighborhood_Quality|       House_Price|Age|
+--------------+------------+-------------+----------+------------------+-----------+--------------------+------------------+---+
|          1360|           2|            1|      1981|0.5996366396268326|          0|                   5| 262382.8522740563| 43|
|          4272|           3|            3|      2016|4.7530138494020395|          1|                   6|  985260.854490162|  8|
|          3592|           1|            2|      2016| 3.634822720478255|          0|                   9| 777977.3901185812|  8|
|           966|           1|            2|      1977|  2.73066687604351|          1|                   8| 229698.9186636115| 47|
|          4926|           2|            1|      1993| 4.699072554837388|          0|     

### Удалить колонку `Neighborhood_Quality`

In [14]:
df.drop('Neighborhood_Quality')

DataFrame[Square_Footage: int, Num_Bedrooms: int, Num_Bathrooms: int, Year_Built: int, Lot_Size: double, Garage_Size: int, House_Price: double, Age: int]

### Сортировать строки по `House_Price` в порядке убывания.

In [15]:
df.orderBy("House_Price", ascending=False).show()

+--------------+------------+-------------+----------+------------------+-----------+--------------------+------------------+---+
|Square_Footage|Num_Bedrooms|Num_Bathrooms|Year_Built|          Lot_Size|Garage_Size|Neighborhood_Quality|       House_Price|Age|
+--------------+------------+-------------+----------+------------------+-----------+--------------------+------------------+---+
|          4922|           4|            1|      2018|  4.23375289844731|          2|                   2|1108236.8362913695|  6|
|          4974|           5|            2|      2000|3.7095614063750157|          2|                  10| 1107045.062935083| 24|
|          4996|           1|            3|      2014| 4.252063779134723|          1|                   1|1102533.6490590929| 10|
|          4952|           4|            3|      1995| 4.725027320185737|          2|                   9|1099211.6405116057| 29|
|          4933|           3|            1|      2015|   4.7693497781737|          2|     

### Вывести строки с отсутствующими значениями в колонке `Garage_Size`.

In [16]:
dfna =  df.filter(df.Garage_Size.isNull())
dfna.show()

+--------------+------------+-------------+----------+--------+-----------+--------------------+-----------+---+
|Square_Footage|Num_Bedrooms|Num_Bathrooms|Year_Built|Lot_Size|Garage_Size|Neighborhood_Quality|House_Price|Age|
+--------------+------------+-------------+----------+--------+-----------+--------------------+-----------+---+
+--------------+------------+-------------+----------+--------+-----------+--------------------+-----------+---+



### Подсчитать среднюю цену дома (`House_Price`).

In [17]:
from pyspark.sql.functions import avg

avg_df = df.select(avg("House_Price"))
avg_df.show()

+-----------------+
| avg(House_Price)|
+-----------------+
|618861.0186467685|
+-----------------+

