<a href="https://colab.research.google.com/github/richardtekere09/AI-ML_2025_machine-learning_Druki_Alexey/blob/main/Apache_Spark_lab1_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Apache Spark Lab1


1.Data Analysis with Spark SQL

> install and import PySpark and related libraries in the Colab notebook. Then creating a Spark session which serves as the entry point to Spark. In this lab, the Spark application name is set to "PythonSQLAPP", but no master URL is specified so it runs locally by default.



In [16]:
!pip install pyspark  # Install Spark if not already available
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count, col,countDistinct, round, mean,max, when
from pyspark.sql.types import *
from functools import reduce

# Initialize Spark session
spark = SparkSession.builder.appName("PythonSQLAPP").getOrCreate()
print("Spark Version:", pyspark.__version__)


Spark Version: 3.5.5


In [7]:
# Read the Brooklyn sales dataset into a DataFrame
data = spark.read.csv('brooklyn_sales_map.csv', header=True, inferSchema=True)
data.printSchema()        # Print the inferred schema of the DataFrame
data.show(5)              # Display the first 5 rows of the dataset

root
 |-- _c0: integer (nullable = true)
 |-- borough1: integer (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- building_class_category: string (nullable = true)
 |-- tax_class: string (nullable = true)
 |-- block: integer (nullable = true)
 |-- lot: integer (nullable = true)
 |-- easement: string (nullable = true)
 |-- building_class: string (nullable = true)
 |-- address9: string (nullable = true)
 |-- apartment_number: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- residential_units: integer (nullable = true)
 |-- commercial_units: integer (nullable = true)
 |-- total_units: integer (nullable = true)
 |-- land_sqft: double (nullable = true)
 |-- gross_sqft: double (nullable = true)
 |-- year_built: integer (nullable = true)
 |-- tax_class_at_sale: integer (nullable = true)
 |-- building_class_at_sale: string (nullable = true)
 |-- sale_price: double (nullable = true)
 |-- sale_date: date (nullable = true)
 |-- year_of_sale: integer (nullabl

In [7]:
# 1. Calculate the average sale price of homes
avg_price = data.select(avg(col('sale_price'))).collect()[0][0]
print('Average sale:', avg_price)


Average sale: 18041966.45317546


In [8]:
# 2. Find average gross square footage for each year
area_by_year = data.groupBy("year_built") \
                   .agg(avg("gross_sqft").alias("avg_gross_sqft_per_year"))
area_by_year.show(5)


+----------+-----------------------+
|year_built|avg_gross_sqft_per_year|
+----------+-----------------------+
|      1959|     63228.944444444445|
|      1829|                 4540.0|
|      1990|      7595.333333333333|
|      1903|                72064.0|
|      1975|     140465.55555555556|
+----------+-----------------------+
only showing top 5 rows



In [10]:
# Средняя стоимость жилья по районам
avg_sale_by_neighborhood = data.groupBy("neighborhood").agg(avg("sale_price").alias("avg_sale_price"))
avg_sale_by_neighborhood.show(5)

# Средняя площадь жилья по сочетаниям налоговой категории и года продажи
avg_gross_by_tax_and_year = data.groupBy("tax_class", "year_of_sale") \
                                .agg(avg("gross_sqft").alias("avg_gross_sqft"))
avg_gross_by_tax_and_year.show(5)


+-------------------+--------------------+
|       neighborhood|      avg_sale_price|
+-------------------+--------------------+
|     FLATBUSH-NORTH|1.3510520720930232E7|
|      CYPRESS HILLS|          1.878909E7|
|        BENSONHURST|           9501500.0|
|       BOROUGH PARK|1.0469132839285715E7|
|OCEAN PARKWAY-NORTH| 1.203814338095238E7|
+-------------------+--------------------+
only showing top 5 rows

+---------+------------+-----------------+
|tax_class|year_of_sale|   avg_gross_sqft|
+---------+------------+-----------------+
|        4|        2009|       46217.8125|
|        2|        2017|39115.19117647059|
|       2A|        2007|           6570.0|
|       1B|        2015|              0.0|
|       2C|        2017|              0.0|
+---------+------------+-----------------+
only showing top 5 rows



In [12]:
# 4. Подсчёт количества пустых значений по колонкам
from pyspark.sql.functions import col, sum as _sum

null_counts = data.select([_sum(col(c).isNull().cast("int")).alias(c) for c in data.columns])
null_counts.show()


+---+--------+------------+-----------------------+---------+-----+---+--------+--------------+--------+----------------+--------+-----------------+----------------+-----------+---------+----------+----------+-----------------+----------------------+----------+---------+------------+---------+---+------+------+----------+-------+-------+--------+----------+----------+----------+---------+----------+--------+---------+---------+---------+---------+---------+--------+--------+-------+-------+-------+---------+---------+---------+-------+---------+---------+---------+-------+--------+-------+-------+----------+----------+----------+---------+----------+---------+----------+--------+---------+--------+----------+--------+--------+---------+---------+---+--------+----------+-------+--------+----------+---------+----------+---------+---------+----------+----------+--------+--------+--------+--------+-------+--------+--------+---+-------+---------+------+------+-------+------+-------+------+-

In [13]:
# 5. Средняя дата продажи по индексам и налоговым категориям
avg_sale_year_by_zip_tax = data.groupBy("zip_code", "tax_class") \
                               .agg(avg("year_of_sale").alias("avg_year_of_sale"))
avg_sale_year_by_zip_tax.show(5)


+--------+---------+------------------+
|zip_code|tax_class|  avg_year_of_sale|
+--------+---------+------------------+
|   11205|       1B|2012.6666666666667|
|   11234|        2|2011.2222222222222|
|   11205|        2|            2014.0|
|   11203|        4|            2014.8|
|   11204|        2|            2013.0|
+--------+---------+------------------+
only showing top 5 rows



In [14]:
# 6. Средний год постройки
avg_year_built = data.select(avg("year_built")).collect()[0][0]

# Добавляем колонку с отклонением от среднего
data_with_year_diff = data.withColumn("year_built_deviation", col("year_built") - avg_year_built)
data_with_year_diff.select("year_built", "year_built_deviation").show(5)


+----------+--------------------+
|year_built|year_built_deviation|
+----------+--------------------+
|      2002|   258.9246501614639|
|         0|  -1743.075349838536|
|      1924|   180.9246501614639|
|      1970|   226.9246501614639|
|      1927|   183.9246501614639|
+----------+--------------------+
only showing top 5 rows



In [14]:
# 7. Количество различных домов по адресу (address9)
houses_per_street = data.groupBy("address9").agg(countDistinct("BBL").alias("house_count"))
houses_per_street.show(5)


+--------------------+-----------+
|            address9|house_count|
+--------------------+-----------+
|1430 WEST 4TH   S...|          1|
|    1261 SCHENECTADY|          1|
|     38 MONROE PLACE|          1|
|    35 CLIFTON PLACE|          1|
|407 VANDERBILT AV...|          1|
+--------------------+-----------+
only showing top 5 rows



In [21]:
from pyspark.sql.functions import max, count

# 8. Максимальная стоимость продажи и количество зданий по районам и категориям зданий
max_price_and_count = data.groupBy("neighborhood", "building_class_category") \
                          .agg(max("sale_price").alias("max_sale_price"),
                               count("*").alias("building_count"))
max_price_and_count.show(5)


+------------------+-----------------------+--------------+--------------+
|      neighborhood|building_class_category|max_sale_price|building_count|
+------------------+-----------------------+--------------+--------------+
|  BROOKLYN HEIGHTS|    22  STORE BUILDINGS|         1.2E7|             7|
|           MIDWOOD|   05  TAX CLASS 1 V...|     7881412.0|             4|
|BEDFORD STUYVESANT|   05 TAX CLASS 1 VA...|     2980000.0|             2|
|WILLIAMSBURG-SOUTH|   04  TAX CLASS 1 C...|     2520168.0|             1|
|      BOROUGH PARK|   04  TAX CLASS 1 C...|     1800000.0|             2|
+------------------+-----------------------+--------------+--------------+
only showing top 5 rows





>In the second part of the lab, we move on to the task of machine learning using Spark MLlib.
We will build classification models to predict whether a customer will open a bank deposit as a result of a marketing campaign.
The workflow includes data loading, preprocessing, feature collection, model training with hyperparameter selection, and model performance evaluation.


In [27]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("BankMarketingClassification").getOrCreate()

# Load dataset (semicolon separator)
df = spark.read.csv('/bank-additional-full.csv', header=True, inferSchema=True, sep=';')
df.printSchema()
df.show(5)


root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- emp.var.rate: double (nullable = true)
 |-- cons.price.idx: double (nullable = true)
 |-- cons.conf.idx: double (nullable = true)
 |-- euribor3m: double (nullable = true)
 |-- nr.employed: double (nullable = true)
 |-- y: string (nullable = true)

+---+---------+-------+-----------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+

In [28]:
from pyspark.sql.functions import when

df = df.withColumn("label", when(df.y == 'yes', 1).otherwise(0)).drop("y")


In [35]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# Rename columns: replace '.' with '_'
for col_name in df.columns:
    if '.' in col_name:
        df = df.withColumnRenamed(col_name, col_name.replace('.', '_'))

categorical_cols = [field for (field, dtype) in df.dtypes if dtype == 'string' and field != 'label']
numeric_cols = [field for (field, dtype) in df.dtypes if dtype in ('int', 'double', 'float') and field != 'label']

# Apply StringIndexer and OneHotEncoder
indexers = [StringIndexer(inputCol=col, outputCol=col + "_idx") for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=col + "_idx", outputCol=col + "_vec") for col in categorical_cols]


In [36]:
# Assemble feature vector
feature_cols = [col + "_vec" for col in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

pipeline = Pipeline(stages=indexers + encoders + [assembler])
assembled_data = pipeline.fit(df).transform(df)
assembled_data.select("features").show(3, truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                            |
+----------------------------------------------------------------------------------------------------------------------------------------------------+
|(53,[8,11,18,21,24,25,28,38,41,43,44,45,46,48,49,50,51,52],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,56.0,261.0,1.0,999.0,1.1,93.994,-36.4,4.857,5191.0])|
|(53,[3,11,15,22,24,25,28,38,41,43,44,45,46,48,49,50,51,52],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,57.0,149.0,1.0,999.0,1.1,93.994,-36.4,4.857,5191.0])|
|(53,[3,11,15,21,23,25,28,38,41,43,44,45,46,48,49,50,51,52],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,37.0,226.0,1.0,999.0,1.1,93.994,-36.4,4.857,5191.0])|
+---------------------------------------------------------------------------------------------

In [38]:
# 9. Split the dataset
train_data, test_data = assembled_data.randomSplit([0.8, 0.2], seed=42)
print("Training examples:", train_data.count(), "Testing examples:", test_data.count())

# 10. Initialize models
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier

lr = LogisticRegression(featuresCol="features", labelCol="label")
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")
rf = RandomForestClassifier(featuresCol="features", labelCol="label")

# 11. Setup hyperparameter grids
from pyspark.ml.tuning import ParamGridBuilder

paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.maxIter, [50, 100]) \
    .addGrid(lr.regParam, [0.01, 0.1]) \
    .build()

paramGrid_dt = ParamGridBuilder().addGrid(dt.maxDepth, [5, 10]).build()

paramGrid_rf = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [5, 10]) \
    .addGrid(rf.numTrees, [10, 50]) \
    .build()

# 12. Setup cross-validation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator

evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")

cv_lr = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid_lr, evaluator=evaluator, numFolds=5)
cv_dt = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid_dt, evaluator=evaluator, numFolds=5)
cv_rf = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid_rf, evaluator=evaluator, numFolds=5)

# 13. Train models
model_lr = cv_lr.fit(train_data)
model_dt = cv_dt.fit(train_data)
model_rf = cv_rf.fit(train_data)

# 14. Evaluate models

def evaluate(model, test_data):
    pred = model.transform(test_data)
    acc = evaluator.evaluate(pred, {evaluator.metricName: "accuracy"})
    prec = evaluator.evaluate(pred, {evaluator.metricName: "weightedPrecision"})
    rec = evaluator.evaluate(pred, {evaluator.metricName: "weightedRecall"})
    f1 = evaluator.evaluate(pred, {evaluator.metricName: "f1"})
    return acc, prec, rec, f1

acc_lr, prec_lr, rec_lr, f1_lr = evaluate(model_lr, test_data)
acc_dt, prec_dt, rec_dt, f1_dt = evaluate(model_dt, test_data)
acc_rf, prec_rf, rec_rf, f1_rf = evaluate(model_rf, test_data)

print(f"Logistic Regression -> Accuracy: {acc_lr:.3f}, Precision: {prec_lr:.3f}, Recall: {rec_lr:.3f}, F1: {f1_lr:.3f}")
print(f"Decision Tree       -> Accuracy: {acc_dt:.3f}, Precision: {prec_dt:.3f}, Recall: {rec_dt:.3f}, F1: {f1_dt:.3f}")
print(f"Random Forest       -> Accuracy: {acc_rf:.3f}, Precision: {prec_rf:.3f}, Recall: {rec_rf:.3f}, F1: {f1_rf:.3f}")


Training examples: 32977 Testing examples: 8211
Logistic Regression -> Accuracy: 0.909, Precision: 0.895, Recall: 0.909, F1: 0.897
Decision Tree       -> Accuracy: 0.911, Precision: 0.906, Recall: 0.911, F1: 0.908
Random Forest       -> Accuracy: 0.911, Precision: 0.898, Recall: 0.911, F1: 0.896


In [39]:
# Построение матрицы ошибок для Random Forest
predictions_rf = model_rf.transform(test_data)
predictions_rf.groupBy("label", "prediction").count().show()


+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|  613|
|    0|       0.0| 7192|
|    1|       1.0|  289|
|    0|       1.0|  117|
+-----+----------+-----+

