# Лабораторная работа № 1
## Выполнение разведочного анализа больших данных с использованием фреймворка Apache Spark

## Часть 1
В данной части работы рассмотрены:

* загрузка данных из HDFS;
* базовые преобразования данных;
* загрузка преобразованных данных в таблицу Apache Airflow.low.

Подключим необходимые библиотеки.

In [162]:
import os
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkConf
from pyspark.sql.functions import (
    regexp_replace,
    regexp_extract_all,
    col,
    lit,
    to_date,
    when
)

Сформируем объект конфигурации для Apache Spark, указав необходимые параметры.

In [163]:
def create_spark_configuration() -> SparkConf:
    """
    Создает и конфигурирует экземпляр SparkConf для приложения Spark.
    Локальный режим с доступом к HDFS и Iceberg.
    """
    user_name = "jovyan"
    
    conf = SparkConf()
    conf.setAppName("lab 1 Movies Analysis")
    conf.setMaster("local[*]")  # Локальный режим
    conf.set("spark.sql.adaptive.enabled", "true")
    conf.set("spark.executor.memory", "6g")
    conf.set("spark.executor.cores", "6")
    conf.set("spark.executor.instances", "1")
    conf.set("spark.driver.memory", "4g")
    conf.set("spark.driver.cores", "1")
   
    conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
    conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")  # Для старых дат
    conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "LEGACY")   # Для чтения старых дат
    conf.set("spark.sql.parquet.int96RebaseModeInWrite", "LEGACY")     # Для timestamp
    conf.set("spark.sql.parquet.int96RebaseModeInRead", "LEGACY")      # Для чтения timestamp
    
    # Конфигурация для работы с HDFS
    conf.set("spark.hadoop.fs.defaultFS", "hdfs://hadoop-namenode:9820")
    conf.set("spark.hadoop.dfs.client.use.datanode.hostname", "true")
    
    return conf

Создаём сам объект конфигурации.

In [164]:
conf = create_spark_configuration()

Создаём и выводим на экран сессию Apache Spark.

In [165]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

Для исследования будем использовать датасет `"Movies_and_TV.json"`, расположенный на платформе Kaggle по адресу https://jmcauley.ucsd.edu/data/amazon_v2/index.html

Датасет включает в себя информацию о рецензиях на фильмы и сериалы от компании Amazon.

Данный датасет уже загружен в `HDFS` по адресу: `hdfs://hadoop-namenode:9820/data/Movies_and_TV.csv`

Указываем путь `в HD`FS для файла с данными.

In [166]:
path = "hdfs://hadoop-namenode:9820/data/Movies_and_TV.csv"

Заполняем датафрейм данными из файла

In [167]:
df = (spark.read.format("csv")
      .option("header", "true")
      .load(path)
)

Выводим фрагмент датафрейма на экран.

In [168]:
print("Первые 10 строк датасета:")
df.show(10)

Первые 10 строк датасета:
+----------+-----+-------+--------------------+-----------+--------------+-------------+------------+-------------+------------+-----------+--------------------+--------------+--------+----+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID| reviewerName|style_Color:|style_Format:|style_Shape:|style_Size:|             summary|unixReviewTime|verified|vote|
+----------+-----+-------+--------------------+-----------+--------------+-------------+------------+-------------+------------+-----------+--------------------+--------------+--------+----+
|0001527665| NULL|    5.0|really happy they...|03 11, 2013|A3478QRKQDOPQ2|        jacki|        NULL|     VHS Tape|        NULL|       NULL|               great|    1362960000|    True|NULL|
|0001527665| NULL|    5.0|Having lived in W...|02 18, 2013|A2VHSG6TZHU1OB|        Ken P|        NULL| Amazon Video|        NULL|       NULL|Realistic and Acc...|    1361145600|    True|   3|
|0001527665| NULL| 

Очевидно, что в целях сохранения ясности изложения и сокращения расчетного времени имеет смысл рассматривать не все солбцы датасета. Оставим следующие колонки, удалив остальные:

| Название столбца  | Расшифровка |
| ------------- | ------------- |
| asin | ID товара  |
|reviewerID  |ID рецензента  |
|reviewerName  |Имя рецензента  |
|reviewText  |Текст отзыва  |
|overall  |Общая оценка  |
|summary  |Краткое описание  |
|unixReviewTime  |Время отзыва в Unix-времени  
| reviewTime | Время отзыва ||

In [169]:
selected_columns = [
    "asin",           
    "reviewerID",       
    "reviewerName",        
    "reviewText",     
    "overall",        
    "summary",        
    "unixReviewTime", 
    "reviewTime"  
]

df = df.select(*selected_columns)

In [170]:
df.show(10)

+----------+--------------+-------------+--------------------+-------+--------------------+--------------+-----------+
|      asin|    reviewerID| reviewerName|          reviewText|overall|             summary|unixReviewTime| reviewTime|
+----------+--------------+-------------+--------------------+-------+--------------------+--------------+-----------+
|0001527665|A3478QRKQDOPQ2|        jacki|really happy they...|    5.0|               great|    1362960000|03 11, 2013|
|0001527665|A2VHSG6TZHU1OB|        Ken P|Having lived in W...|    5.0|Realistic and Acc...|    1361145600|02 18, 2013|
|0001527665|A23EJWOW1TLENE|Reina Berumen|Excellent look in...|    5.0|         Peace Child|    1358380800|01 17, 2013|
|0001527665|A1KM9FNEJ8Q171|      N Coyle|"More than anythi...|    5.0|Culturally releva...|    1357776000|01 10, 2013|
|0001527665|A38LY2SSHVHRYB| Jodie Vesely|This is a great m...|    4.0|Good Movie! Great...|    1356480000|12 26, 2012|
|0001527665| AHTYUW2H1276L|  lilsistah21|This mo

Выведем на экран метаданные датасета.

In [171]:
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- overall: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: string (nullable = true)
 |-- reviewTime: string (nullable = true)



Видно, что все столбцы датасета содержат строковый тип данных, что не соответствует ожиданиям. Выполним преобразования типов данных некоторых столбцов.

In [172]:
def transform_dataframe(data: DataFrame) -> DataFrame:
    """
    Преобразует столбцы DataFrame в указанные типы данных.
    """
    # Числовые колонки
    data = data.withColumn("overall", col("overall").cast("Float"))
    
    # Временные метки
    data = data.withColumn("unixReviewTime", col("unixReviewTime").cast("Integer"))

    # Преобразуем reviewTime из "03 11, 2013" в Date с обработкой однозначных чисел
    data = data.withColumn(
        "reviewTime_clean",
        # Сначала нормализуем формат: добавляем ведущие нули к однозначным числам
        regexp_replace(
            regexp_replace(col("reviewTime"), r"\b(\d)\b", "0$1"),  # 3 -> 03
            r"(\d{2}) (\d{2}), (\d{4})", "$2/$1/$3"  # 03 11, 2013 -> 11/03/2013
        )
    )
    
    # Преобразуем в дату
    data = data.withColumn(
        "reviewTime_date",
        to_date(col("reviewTime_clean"), "MM/dd/yyyy")
    )
    
    
    # Удаляем временные колонки и переименовываем
    data = data.drop("reviewTime", "reviewTime_clean").withColumnRenamed("reviewTime_date", "reviewTime")
    
    return data

In [173]:
df = transform_dataframe(df)

In [174]:
print("После преобразований:")
df.show(10)

После преобразований:
+----------+--------------+-------------+--------------------+-------+--------------------+--------------+----------+
|      asin|    reviewerID| reviewerName|          reviewText|overall|             summary|unixReviewTime|reviewTime|
+----------+--------------+-------------+--------------------+-------+--------------------+--------------+----------+
|0001527665|A3478QRKQDOPQ2|        jacki|really happy they...|    5.0|               great|    1362960000|2013-11-03|
|0001527665|A2VHSG6TZHU1OB|        Ken P|Having lived in W...|    5.0|Realistic and Acc...|    1361145600|      NULL|
|0001527665|A23EJWOW1TLENE|Reina Berumen|Excellent look in...|    5.0|         Peace Child|    1358380800|      NULL|
|0001527665|A1KM9FNEJ8Q171|      N Coyle|"More than anythi...|    5.0|Culturally releva...|    1357776000|2013-10-01|
|0001527665|A38LY2SSHVHRYB| Jodie Vesely|This is a great m...|    4.0|Good Movie! Great...|    1356480000|      NULL|
|0001527665| AHTYUW2H1276L|  lilsi

In [175]:
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- overall: float (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: integer (nullable = true)
 |-- reviewTime: date (nullable = true)



Видно, что теперь столбцы датафрейма содержат значения корректных типов.

Сохраненим DataFrame в Parquet форматх.

## Запись данных в Parquet

Проводим дополнительную очистку для колонки времени проверки

In [176]:
df_clean = df

df_clean = df_clean.withColumn(
    "reviewTime", 
    when(col("reviewTime").isNull(), lit(None)).otherwise(col("reviewTime"))
)

print("Проверка очищенных данных:")
df_clean.printSchema()

Проверка очищенных данных:
root
 |-- asin: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- overall: float (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: integer (nullable = true)
 |-- reviewTime: date (nullable = true)



Настройка пути в HDFS

In [177]:
database_name = "mattiev_database"
table_name = "movies_lab1_parquet_table"

output_path = f"hdfs://hadoop-namenode:9820/data/{database_name}/{table_name}"

print(f"Сохраняем данные в: {output_path}")


Сохраняем данные в: hdfs://hadoop-namenode:9820/data/mattiev_database/movies_lab1_parquet_table


Сохраняем DataFrame в Parquet формате

In [178]:
df_clean.write \
  .mode("overwrite") \
  .option("compression", "snappy") \
  .parquet(output_path)

print("Данные успешно сохранены в Parquet формате в HDFS")

Данные успешно сохранены в Parquet формате в HDFS


## Чтение из Parquet

In [179]:
df_parquet = spark.read.parquet(output_path)

print("Данные успешно загружены из Parquet")
print(f"Количество строк: {df_parquet.count()}")
print(f"Количество колонок: {len(df_parquet.columns)}")

# Показать схему данных
print("Схема данных:")
df_parquet.printSchema()

# Показать первые несколько строк
print("Первые 10 строк:")
df_parquet.show(10)

Данные успешно загружены из Parquet
Количество строк: 13223183
Количество колонок: 8
Схема данных:
root
 |-- asin: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- overall: float (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: integer (nullable = true)
 |-- reviewTime: date (nullable = true)

Первые 10 строк:
+--------------------+--------------+--------------------+--------------------+-------+--------------------+--------------+----------+
|                asin|    reviewerID|        reviewerName|          reviewText|overall|             summary|unixReviewTime|reviewTime|
+--------------------+--------------+--------------------+--------------------+-------+--------------------+--------------+----------+
| Not so good: the...|          NULL|                NULL| it's scope was c...|   NULL|                NULL|          NULL|      NULL|
|         

После успешной записи таблицы останавливаем сессию Apache Spark.

In [180]:
spark.stop()