## Лабораторная работа № 1 
## Выполнение разведочного анализа больших данных с использованием фреймворка Apache Spark

### Часть 2

В данной части работы рассмотрены:
* разведочный анализ данных;
* работа с Dataframe API фреймворка `Apache Spark`.

Подключаем необходимые библиотеки.

In [327]:
import os
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkConf
from pyspark.sql.functions import (
    col, lit, sum, mean, when,
    explode, count, desc, floor,
    corr, array_contains, lit, first
)
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

Сформируем объект конфигурации для `Apache Spark`, указав необходимые параметры.

In [328]:
def create_spark_configuration() -> SparkConf:
    """
    Создает и конфигурирует экземпляр SparkConf для приложения Spark.

    Returns:
        SparkConf: Настроенный экземпляр SparkConf.
    """
    # Получаем имя пользователя
    user_name = os.getenv("USER")
    
    conf = SparkConf()
    conf.setAppName("lab 1 Test")
    conf.setMaster("yarn")
    conf.set("spark.submit.deployMode", "client")
    conf.set("spark.executor.memory", "12g")
    conf.set("spark.executor.cores", "8")
    conf.set("spark.executor.instances", "2")
    conf.set("spark.driver.memory", "4g")
    conf.set("spark.driver.cores", "2")
    conf.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0")
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.spark_catalog.type", "hadoop")
    conf.set("spark.sql.catalog.spark_catalog.warehouse", f"hdfs:///user/{user_name}/warehouse")
    conf.set("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")

    return conf

Создаём сам объект конфигурации.

In [329]:
conf = create_spark_configuration()

Создаём и выводим на экран сессию `Apache Spark`. В процессе создания сессии происходит подключение к кластеру `Apache Hadoop`, что может занять некоторое время.

In [None]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

Укажем базу данных, которая была создана в первой части лабораторной работы.

In [330]:
database_name = "ivanov_database"

Установим созданную базу данных как текущую.

In [331]:
spark.catalog.setCurrentDatabase(database_name)

Прочитаем сохранённую в предыдущей части работы таблицу и загрузим её в `Spark Dataframe`.

In [332]:
df = spark.table("sobd_lab1_table")

Выведем прочитанную таблицу на экран.

In [None]:
df.show()

Посмотрим на схему данных.

In [None]:
df.printSchema()

Вычислим количество строк в датафрейме.

In [None]:
df.count()

#### Анализ столбца `vin`

Отсортируем датафрейм по столбцу `vin`, который может рассматриваться в качестве первичного ключа таблицы.

In [None]:
df.orderBy("vin", ascending=False).show()

Видно, что некоторые значения ключа довольно странные (представляют собой больше описание, чем ключ). Удалим из датафрейма строки с такими ключами, оставив только подходящие по шаблону.

In [None]:
# Регулярное выражение для VIN длиной 17 символов, состоящих из цифр и букв
vin_pattern = r"^[A-Z0-9]{17}$"

# Фильтрация DataFrame
df = df.filter(col("vin").rlike(vin_pattern))
df.show()

In [None]:
df.count()

Проверим наличие дубликатов в датафрейме.

In [None]:
(
    df
    .groupBy("vin")
    .count()
    .where("count > 1")
    .orderBy("count", ascending=False)
    .show()
)

Дубликаты есть. Посмотрим, что они собой представляют (на примере одной записи).

In [None]:
df.filter(col("vin") == "1G1ZE5SX2LF145812").show()

Похоже на полную идентичность строк. Удалим дубликаты.

In [None]:
df = df.dropDuplicates(["vin"])
df.count()

In [None]:
df.show()

#### Анализ столбца `body_type`

Посмотрим внимательно на значения в столбце. Видно, что в данном столбце расположен **категориальный признак**.

Введем функцию, определяющую количество NULL-значений в столбце.

In [344]:
def count_nulls(data: DataFrame,
                column_name: str) -> None:
    """
    Подсчет количества null и not null значений в указанном столбце.

    Args:
        data (DataFrame): DataFrame, содержащий данные.
        column_name (str): Имя столбца для подсчета null и not null значений.
    
    Returns:
        None
    """
    # Подсчет количества null значений в указанном столбце
    null_counts = data.select(
        sum(col(column_name).isNull().cast("int"))
    ).collect()[0][0]

    # Подсчет количества not null значений в указанном столбце
    not_null_counts = data.select(
        sum(col(column_name).isNotNull().cast("int"))
    ).collect()[0][0]

    # Вывод результатов
    print(f"Число колонок с NULL: {null_counts} "
          f"({100 * null_counts / (null_counts + not_null_counts):.2f}%)")

In [None]:
count_nulls(data=df, column_name="body_type")

Видно, что столбец `body_type` содержит небольшое количество пропущенных значений. Поскольку признак содержит категорию (тип кузова автомобиля), то логично заменить пропущенные значения на категорию `Unknown`.

In [None]:
df = df.fillna({"body_type": "Unknown"})
count_nulls(data=df, column_name="body_type")

Создадим функцию расчета и визуализации распределения категориальных признаков.

In [347]:
def plot_cat_distribution(data: DataFrame,
                          column_name: str,
                          top_n: int = 20) -> None:
    """
    Построение распределения категориального признака.

    Args:
        data (DataFrame): DataFrame, содержащий данные.
        column_name (str): Имя столбца для группировки.
        top_n (int): Количество топ-значений для отображения.
    
    Returns:
        None
    """
    # Группировка данных по столбцу и подсчет количества
    categories = (
        data
        .groupBy(column_name)
        .count()
        .orderBy("count", ascending=False)
    )
    
    print(f"Количество категорий признака {column_name}: {categories.count()}")

    categories = (
        categories
        .limit(top_n)
        .toPandas()
    )
    
    # Визуализация с использованием Seaborn
    plt.figure(figsize=(10, 6))
    sns.barplot(x=column_name, y="count", data=categories)
    plt.title(f"Barplot of \"{column_name}\" counts")
    plt.xlabel(column_name)
    plt.ylabel("Count")
    plt.xticks(rotation=45)
    plt.show()

In [None]:
plot_cat_distribution(data=df, column_name="body_type")

Видно, что автомобили в представленном датасете имеют 9 определенных типов кузова, а для части автомобилей тип кузова неизвестен.

#### Анализ столбца `daysonmarket`

В соответствии с описанием и содержанием датасета логично считать данный признак **количественным**. Проверим его на наличие пропущенных значений.

In [None]:
count_nulls(data=df, column_name="daysonmarket")

Видно, что пропуски в данном столбце отсутствуют.

Создадим функцию, позволяющую рассчитывать статистические показатели данных в столбцах и строить диаграмму "ящик с усами" для оценки наличия выбросов.

In [350]:
def plot_boxplots(data: DataFrame,
                  columns: list[str],
                  sample_fraction: float = 0.1) -> None:
    """
    Построение boxplot для нескольких столбцов в PySpark DataFrame.

    Args:
        data (DataFrame): DataFrame, содержащий данные.
        columns (list of str): Список имен столбцов для построения boxplot.
        sample_fraction (float): Доля данных для семплирования выбросов.
    
    Returns:
        None
    """
    box_data = []

    for column in columns:
        # Вычисление квантилей
        quantiles = data.approxQuantile(column, [0.25, 0.5, 0.75], 0.01)
        q1, median, q3 = quantiles

        # Вычисление IQR и границ усов
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        # Фильтрация выбросов
        filtered_df = data.filter((col(column) >= lower_bound) & (col(column) <= upper_bound))
        outliers_df = data.filter((col(column) < lower_bound) | (col(column) > upper_bound))

        # Вычисление минимального и максимального значений
        min_value = data.agg({column: "min"}).collect()[0][0]
        mean_value = data.agg({column: "mean"}).collect()[0][0]
        std_value = data.agg({column: "std"}).collect()[0][0]
        max_value = data.agg({column: "max"}).collect()[0][0]

        # Ограничение усов минимальным и максимальным значениями
        lower_bound = max(lower_bound, min_value)
        upper_bound = min(upper_bound, max_value)

        # Семплирование выбросов
        outliers = []
        if not outliers_df.isEmpty():
            sampled_outliers_df = outliers_df.sample(sample_fraction)
            outliers = (
                sampled_outliers_df
                .select(column)
                .limit(1000)
                .collect()
            )
            outliers = [row[column] for row in outliers]
            
            # Добавление минимального и максимального значений, если они 
            # относятся к выбросам и не присутствуют в семпле
            if min_value < lower_bound and min_value not in outliers:
                outliers.append(min_value)
            if max_value > upper_bound and max_value not in outliers:
                outliers.append(max_value)

        # Подготовка данных для axes.bxp
        box_data.append({
            'whislo': lower_bound,  # Нижняя граница усов
            'q1': q1,               # Первый квартиль
            'med': median,          # Медиана
            'q3': q3,               # Третий квартиль
            'whishi': upper_bound,  # Верхняя граница усов
            'fliers': outliers      # Выбросы
        })
        
    # Вывод статистических характеристик
    print(f"Минимальное значение:          {min_value:.2f}")
    print(f"Среднее значение:              {mean_value:.2f}")
    print(f"Среднеквадратичное отклонение: {std_value:.2f}")
    print(f"Первый квартиль:               {q1:.2f}")
    print(f"Медиана:                       {median:.2f}")
    print(f"Третий квартиль:               {q3:.2f}")
    print(f"Максимальное значение:         {max_value:.2f}")

    # Построение boxplot
    fig, ax = plt.subplots(figsize=(20, 6))
    ax.bxp(box_data, 
           vert=False, 
           positions=range(1, len(columns) + 1), widths=0.5)
    ax.set_yticks(range(1, len(columns) + 1))
    ax.set_yticklabels(columns)
    ax.set_xlabel('Value')
    ax.set_title('Boxplots')
    ax.grid(True)
    plt.show()

In [None]:
plot_boxplots(data=df, columns=["daysonmarket"])

Наблюдаем значительное количество выбросов в данных. Для более тщательного исследования создадим функцию для визуализации распределения категориального признака.

In [352]:
def plot_quant_distribution(data: DataFrame,
                            column: str,
                            num_bins: int = 200) -> None:
    """
    Построение гистограммы для количественной переменной с 
    использованием PySpark и Seaborn.

    Args:
        data (DataFrame): DataFrame с данными.
        column_name (str): Название колонки с количественной переменной.
        num_bins (int): Количество бинов для гистограммы.

    Returns:
        None
    """
    # Находим минимальное и максимальное значения колонки
    min_value = data.agg({column: "min"}).collect()[0][0]
    max_value = data.agg({column: "max"}).collect()[0][0]

    # Размер бина
    bin_size = (max_value - min_value) / num_bins

    # Добавляем колонку с номером бина
    data = data.withColumn(
        "bin", 
        floor((col(column) - min_value) / bin_size)
    )

    # Группируем по номеру бина и считаем количество строк в каждом бине
    bin_counts = data.groupBy("bin").count()

    # Преобразуем результат в Pandas DataFrame для построения гистограммы
    bin_counts_pd = bin_counts.limit(1000).toPandas()
    
    # Создаем массив границ бинов
    bin_edges = [min_value + i * bin_size for i in range(num_bins + 2)]
    
    # Преобразуем номера бинов в центры бинов
    bin_centers = [
        (bin_edges[i] + bin_edges[i + 1]) / 2 for i in range(num_bins + 1)
    ]
    
    # Добавляем центры бинов в Pandas DataFrame
    bin_counts_pd['bin_center'] = bin_counts_pd['bin'].apply(
        lambda x: bin_centers[int(x)]
    )
    
    # Построение гистограммы с использованием Seaborn
    plt.figure(figsize=(20, 6))
    sns.histplot(data=bin_counts_pd, x="bin_center", 
                 weights="count", kde=True, bins=num_bins + 1)
    plt.xlabel("Value")
    plt.ylabel("Count")
    plt.title(f"Распределение количественного признака \"{column}\"")
    plt.grid(True)
    plt.show()

In [None]:
plot_quant_distribution(data=df, column="daysonmarket")

Видно, что почти все данные не превышают значения 500, но при этом наблюдается малое количество довольно сильных выбросов. Обрежем эти выбросы, установив для них максимальную границу.

In [354]:
df = df.withColumn(
    "daysonmarket",
    when(col("daysonmarket") > 500.0, 500.0)
        .otherwise(col("daysonmarket"))
)

In [None]:
plot_quant_distribution(data=df, column="daysonmarket")

Теперь диаграмма более эффективно представляет данные в столбце.

In [None]:
df.show()

#### Анализ столбца `fleet`

In [None]:
count_nulls(data=df, column_name="fleet")

Видно, что более половины данных в столбце пропущены. Можно, конечно, попытаться обработать то, что есть, но в целях упрощения анализа просто удалим столбец.

In [None]:
df = df.drop("fleet")
df.show()

#### Анализ столбца `has_accidents`

In [None]:
count_nulls(data=df, column_name="has_accidents")

Как видим, ситуация аналогичная. Удаляем столбец.

In [360]:
df = df.drop("has_accidents")

In [None]:
df.show()

#### Анализ столбца `horsepower`

Данный столбец, согласно описанию и значениям, которые он принимает, можно отнести к **количественным**.

Выполним аналогичные шаги.

In [None]:
count_nulls(data=df, column_name="horsepower")

Пропущено много значений, но меньше половины. Стоит попытаться сохранить признак и обработать его.

In [None]:
plot_boxplots(data=df, columns=["horsepower"])

Можно наблюдать сильные выбросы. Удалим строки, их содержащие, и убедимся, что потеряна небольшая часть данных.

In [None]:
df.filter(col("horsepower") > 400).count()

Заменим пропуски средним значением признака.

In [None]:
df = df.filter(col("horsepower") < 400)
mean_horsepower = df.select(mean(col("horsepower"))).collect()[0][0]
mean_horsepower

In [366]:
df = df.fillna({"horsepower": mean_horsepower})

In [None]:
plot_quant_distribution(data=df, column="horsepower")

In [None]:
df.show()

#### Анализ столбцов `is_certified, is_cpo, is_oemcpo`

Данные признаки являются **бинарными** и имеют схожую интерпретацию. Заменим их одним признаком, который принимает значение `True`, если хотя бы один из вышеупомянитых признаков содержит истинное значение.

In [369]:
df = df.withColumn(
    "is_any_cert", 
    when(
        col("is_certified") | col("is_cpo") | col("is_oemcpo"), 
        True
    ).otherwise(False)
).drop(col("is_certified")).drop(col("is_cpo")).drop(col("is_oemcpo"))

In [None]:
df.show()

In [None]:
count_nulls(data=df, column_name="is_any_cert")

Пропуски отсутствуют.

In [None]:
plot_cat_distribution(data=df, column_name="is_any_cert")

#### Анализ признака `major_options`

In [None]:
count_nulls(data=df, column_name="major_options")

Пропуски есть, но их немного. Заменим их пустыми списками.

In [None]:
df = df.withColumn(
    "major_options", 
    when(
        col("major_options").isNull(), 
        lit([])
    ).otherwise(col("major_options"))
)

count_nulls(data=df, column_name="major_options")

Данный признак представляет собой **массив признаков**. Обработаем его следующим образом. Определим пятёрку самых часто встречающихся элементов списка для всех объектов из датасета и введем пять **бинарных** признаков, показывающих присутствие элемента в списке конкретного объекта.

In [375]:
def get_popular_options(data: DataFrame, 
                        column: str, 
                        top_n: int = 5) -> DataFrame:
    """
    Получение DataFrame всех элементов массива опций, подсчет их количества, 
    сортировка по убыванию и вывод нескольких самых популярных элементов.

    Args:
        data (DataFrame): DataFrame, содержащий данные.
        column (str): Имя столбца с массивом.
        top_n (int): Количество самых популярных элементов для вывода.

    Returns:
        DataFrame: DataFrame с самыми популярными элементами.
    """
    # Развертывание массива в отдельные строки
    exploded_df = data.withColumn("element", explode(col(column)))

    # Подсчет количества каждого элемента
    element_counts = (
        exploded_df
        .groupBy("element")
        .agg(count("element").alias("count"))
    )

    # Сортировка элементов по убыванию
    sorted_elements = element_counts.orderBy(desc("count"))

    # Вывод нескольких самых популярных элементов
    top_elements = sorted_elements.limit(top_n)

    return top_elements

Получим датафрейм из пяти самых популярных опций.

In [None]:
popular_options = get_popular_options(data=df, column="major_options")
popular_options.show()

In [377]:
def add_options_columns(data: DataFrame,
                        column: str,
                        popular_options: DataFrame) -> DataFrame:
    """
    Добавление новых булевых колонок в DataFrame, указывающих, 
    содержится ли определённый элемент в каждой строке.

    Args:
        data (DataFrame): DataFrame, содержащий данные.
        column (str): Имя столбца с массивом.
        popular_elements (DataFrame): DataFrame с популярными элементами.

    Returns:
        DataFrame: DataFrame с новыми булевыми колонками.
    """
    # Добавление булевых колонок для каждой популярной категории
    categories_df = popular_options.select(col("element").alias("category"))
    result = data.crossJoin(categories_df.hint("broadcast"))
    result = result.withColumn("contains", 
                               array_contains(col(column), col("category")))
    result = result.groupBy(data.columns).pivot("category").agg(first("contains"))
    
    for col_name in result.columns:
        if col_name not in data.columns:
            result = result.withColumnRenamed(col_name, f"contains_{col_name}")
    
    return result

Выполним вышеописанное преобразование над датасетом, а признак-массив удалим.

In [None]:
df = add_options_columns(data=df, column="major_options", 
                         popular_options=popular_options)
df = df.drop("major_options").cache()
df.show()

#### Анализ столбца `maximum_seating`

Данный признак является **количественным**.

In [None]:
count_nulls(data=df, column_name="maximum_seating")

In [None]:
(
    df
    .filter(col("maximum_seating").isNotNull())
    .groupBy("maximum_seating")
    .count()
    .show()
)

Оставим только те автомобили, число мест в которых менее 20. Их преобладающее количество.

In [None]:
df = df.filter(col("maximum_seating") < 20)
mean_maximum_seating = int(
    df.select(mean(col("maximum_seating"))).collect()[0][0]
)
mean_maximum_seating

Заменим пропуски средним значением.

In [382]:
df = df.fillna({"maximum_seating": mean_maximum_seating})

In [None]:
plot_cat_distribution(data=df, column_name="maximum_seating")

In [None]:
df.show()

#### Анализ признака `price`

Признак **количественный**. Все преобразования аналогичны вышерассмотренным.

In [None]:
count_nulls(data=df, column_name="price")

In [None]:
plot_boxplots(data=df, columns=["price"])

In [None]:
df.filter(col("price") > 60000).count()

In [None]:
df = df.filter(col("price") <= 60000)
plot_quant_distribution(data=df, column="price")

In [None]:
df.show()

#### Анализ столбца `wheel_system`

Признак **категориальный**.

In [None]:
count_nulls(data=df, column_name="wheel_system")

In [None]:
(
    df
    .filter(col("wheel_system").isNotNull())
    .groupBy("wheel_system")
    .count()
    .show()
)

Заменим пропуски модой.

In [None]:
wheel_system_mode = (
    df
    .filter(col("wheel_system").isNotNull())
    .groupBy("wheel_system")
    .count()
    .orderBy("count", ascending=False)
    .first()[0]
)
wheel_system_mode

In [393]:
df = df.fillna({"wheel_system": wheel_system_mode})

In [None]:
plot_cat_distribution(data=df, column_name="wheel_system")

In [None]:
df.show()

#### Анализ столбца `mileage`

Признак **количественный**.

In [None]:
count_nulls(data=df, column_name="mileage")

In [None]:
df = df.filter(col("mileage").isNotNull())
count_nulls(data=df, column_name="mileage")

In [None]:
plot_boxplots(data=df, columns=["mileage"])

In [None]:
plot_quant_distribution(data=df, column="mileage")

In [None]:
df = df.filter(col("mileage") <= 200000.0)
plot_quant_distribution(data=df, column="mileage")

In [None]:
df.show()

#### Анализ признака `year`

Признак **количественный**.

Преобразуем год выпуска в количество лет с года выпуска до 2024 года.

In [None]:
df = df.withColumn("age", lit(2024) - col("year")).drop("year")
df.show()

#### Расчет корреляции между количественными признаками

In [403]:
def compute_and_visualize_correlation_matrix(data: DataFrame, 
                                             columns: list[str]) -> None:
    """
    Вычисляет и визуализирует корреляционную матрицу для указанных 
    колонок в DataFrame PySpark.

    Args:
        df (DataFrame): DataFrame PySpark.
        columns (list[str]): Список колонок для вычисления корреляции.

    Returns:
        None
    """
    # Вычисление корреляционной матрицы
    corr_matrix = {}
    for col1 in columns:
        corr_matrix[col1] = {}
        for col2 in columns:
            corr_value = data.select(corr(col1, col2)).collect()[0][0]
            corr_matrix[col1][col2] = corr_value

    # Преобразование корреляционной матрицы в DataFrame Pandas для визуализации
    corr_matrix_pd = pd.DataFrame(corr_matrix)

    # Построение и визуализация корреляционной матрицы
    plt.figure(figsize=(10, 8))
    sns.heatmap(corr_matrix_pd, annot=True, cmap='coolwarm', linewidths=0.5)
    plt.title('Correlation Matrix')
    plt.show()

In [None]:
compute_and_visualize_correlation_matrix(
    data=df, columns=[
        "daysonmarket", "horsepower", "maximum_seating",
        "mileage", "age", "price"
    ]
)

Корреляционная матрица демонстрирует наличие корреляции между некоторыми количественными признаками.

In [None]:
df.show()

Посмотрим, сколько объектов осталось после преобразований датасета.

In [None]:
df.count()

In [None]:
df.printSchema()

Сохраняет очищенную и обработанную таблицу на диск.

In [None]:
# Сохранение DataFrame в виде таблицы
df.writeTo("sobd_lab1_processed_table").using("iceberg").create()

In [None]:
for table in spark.catalog.listTables():
    print(table.name)

Обратите внимание, что при необходимости созданные базу данных и таблицу можно удалить следующими командами.

In [409]:
# spark.sql("DROP TABLE spark_catalog.ivanov_database.sobd_lab1_processed_table")
# spark.sql("DROP DATABASE spark_catalog.ivanov_database")

Останавливаем `Spark`-сессию.

In [10]:
spark.stop()