# Блок 1. Standalone Spark

### 1.1. Развернуть standalone cluster Spark: master + 2 workers. Приложить скрипт и/или алгоритм + скрин webui

Скрипт в папке spark

![image](images/spark.png)

### 1.2. Подключиться к кластеру с помощью Jupyter и/или Zeppelin. Приложить скрипт и/или алгоритм + скрин рабочей сессии из инструмента

In [1]:
from pyspark.sql import SparkSession

# Имя хоста Spark Master в docker
SPARK_MASTER_HOST = "spark-master"
# Порт Spark Master
SPARK_MASTER_PORT = "7077"
# Память выделенная для Spark Worker в настройках docker-compose.yml
SPARK_WORKER_MEMORY = "512m"
# Название сессии (любое)
SPARK_SESSION = "pyspark-jupyter"

# Создаем сессию
spark = (
    SparkSession.builder.appName(SPARK_SESSION)
    .master(f"spark://{SPARK_MASTER_HOST}:{SPARK_MASTER_PORT}")
    .config("spark.executor.memory", SPARK_WORKER_MEMORY)
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/02 19:55:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
spark

### 1.3. Развертывание и подключение к HDFS

Скрипт развертывания в папке hadoop

#### Помещаем файлы которые нужно загрузить в hdfs в папку /hdfs_upload/upload
#### выполняем скрипт hdfs_upload.sh

![image](images/hadoop.png)

In [3]:
import requests

# Имя Hadoop Namenode в docker
HADOOP_HOST = 'hadoop-namenode'
# Hadoop Namenode port
HADOOP_PORT = "9000"
# Hadoop Namenode port
HADOOP_WEB_PORT = "9870"
# Hadoop default directory
HADOOP_DIR = "upload"


def hdfs_files(
    host: str = HADOOP_HOST,
    port: str = HADOOP_WEB_PORT,
    directory: str = HADOOP_DIR,
    f_filter: str = "",
    f_extension: str = "",
) -> list | str:
    """Returns files list of target directory.

    :param host: hadoop namenode host name
    :param port: hadoop namenode web port
    :param directory: target directory to list.
    :param f_filter: string to filter filename with
    :param f_extension: file extension ('csv', 'parquet', etc..)
    """
    try:
        response = requests.get(
            f"http://{host}:{port}/webhdfs/v1/{directory}?op=LISTSTATUS"
        ).json()
    except Exception as error:
        return error

    if response.get("FileStatuses"):
        files_data = response["FileStatuses"].get("FileStatus")
        file_list = [
            file.get("pathSuffix")
            for file in files_data
            if f_filter in file.get("pathSuffix")
            and file.get("pathSuffix").endswith(f_extension)
            and file.get("type") == "FILE"
        ]
        return file_list
    elif response.get("RemoteException"):
        return response["RemoteException"].get("message")


# Блок 2. Работа с данными на Spark

### 2.1. Преобразовать данные исходного датасета в parquet объединяя все таблицы. Оценить разницу в скорости чтения / занимаемом объеме. Сделать выводы.

#### Получаем список файлов датасета book

In [4]:
book_files = hdfs_files(f_filter='book', f_extension='csv')
book_files

['book1-100k.csv',
 'book1000k-1100k.csv',
 'book100k-200k.csv',
 'book1100k-1200k.csv',
 'book1200k-1300k.csv',
 'book1300k-1400k.csv',
 'book1400k-1500k.csv',
 'book1500k-1600k.csv',
 'book1600k-1700k.csv',
 'book1700k-1800k.csv',
 'book1800k-1900k.csv',
 'book1900k-2000k.csv',
 'book2000k-3000k.csv',
 'book200k-300k.csv',
 'book3000k-4000k.csv',
 'book300k-400k.csv',
 'book4000k-5000k.csv',
 'book400k-500k.csv',
 'book500k-600k.csv',
 'book600k-700k.csv',
 'book700k-800k.csv',
 'book800k-900k.csv',
 'book900k-1000k.csv']

#### Так как схема в разных файлах может различаться берем схему из первого файла и создаем пустой датафрейм

In [66]:
from pyspark.sql.types import StructType, StructField, StringType

# Ссылка доступа к HDFS
HADOOP_LINK = f"hdfs://{HADOOP_HOST}:{HADOOP_PORT}"

schema = spark.read.csv(
    f"{HADOOP_LINK}/{HADOOP_DIR}/{book_files[0]}", header=True, inferSchema=True
).limit(1).schema

emp_RDD = spark.sparkContext.emptyRDD()

spark_df = spark.createDataFrame(data = emp_RDD, schema=schema)

In [67]:
spark_df

DataFrame[Id: int, Name: string, RatingDist1: string, pagesNumber: string, RatingDist4: string, RatingDistTotal: string, PublishMonth: string, PublishDay: string, Publisher: string, CountsOfReview: string, PublishYear: string, Language: string, Authors: string, Rating: string, RatingDist2: string, RatingDist5: string, ISBN: string, RatingDist3: string]

#### Читаем каждый файл и объединяем в один датасет

In [6]:
for file in book_files:
    
    spark_data = (
        spark.read
        .option("multiline", "true")
        .option("quote", '"')
        .option("header", "true")
        .option("escape", "\\")
        .option("escape", '"')
        .csv(f"{HADOOP_LINK}/{HADOOP_DIR}/{file}")
    )
    # spark_data = spark.read.csv(f"{HADOOP_LINK}/{HADOOP_DIR}/{file}", header=True, multiLine=True)
    spark_df = spark_df.unionByName(spark_data, allowMissingColumns=True)

                                                                                

#### Удаляем дубликаты

In [7]:
spark_df = spark_df.dropDuplicates()

+-------+-----------+
|summary|pagesNumber|
+-------+-----------+
|  count|          0|
|   mean|       null|
| stddev|       null|
|    min|       null|
|    max|       null|
+-------+-----------+



#### Записываем dataframe parquet в HDFS

In [8]:
FILENAME = 'book.parquet'

write_hadoop = (
    spark_df.write.option("header", True)
    .mode("overwrite")
    .parquet(f"hdfs://{HADOOP_HOST}:{HADOOP_PORT}/{FILENAME}")
)

                                                                                

#### Читаем dataframe из HDFS

In [9]:
df_load = spark.read.parquet(f"hdfs://{HADOOP_HOST}:{HADOOP_PORT}/{FILENAME}")

#### Сравниваем время выполнения

In [10]:
import time

def check_time(command):
    start_time = time.time()
    command
    print("--- %s seconds ---" % (time.time() - start_time))


In [11]:
# CSV

check_time(
    spark_df.filter(spark_df['Id'] == '400000').show(vertical=True)
)



-RECORD 0-------------------------------------
 Id                    | 400000               
 Name                  | The Gigli Concert    
 RatingDist1           | 1:2                  
 pagesNumber           | 96                   
 RatingDist4           | 4:10                 
 RatingDistTotal       | total:27             
 PublishMonth          | 16                   
 PublishDay            | 9                    
 Publisher             | Bloomsbury Methue... 
 CountsOfReview        | 2                    
 PublishYear           | 1991                 
 Language              | null                 
 Authors               | Tom    Murphy        
 Rating                | 3.3                  
 RatingDist2           | 2:4                  
 RatingDist5           | 5:3                  
 ISBN                  | 0413659305           
 RatingDist3           | 3:8                  
 Description           | null                 
 Count of text reviews | null                 

--- 9.298324

                                                                                

In [12]:
# Parquet

check_time(
    df_load.filter(df_load['Id'] == '400000').show(vertical=True)
)

                                                                                

-RECORD 0-------------------------------------
 Id                    | 400000               
 Name                  | The Gigli Concert    
 RatingDist1           | 1:2                  
 pagesNumber           | 96                   
 RatingDist4           | 4:10                 
 RatingDistTotal       | total:27             
 PublishMonth          | 16                   
 PublishDay            | 9                    
 Publisher             | Bloomsbury Methue... 
 CountsOfReview        | 2                    
 PublishYear           | 1991                 
 Language              | null                 
 Authors               | Tom    Murphy        
 Rating                | 3.3                  
 RatingDist2           | 2:4                  
 RatingDist5           | 5:3                  
 ISBN                  | 0413659305           
 RatingDist3           | 3:8                  
 Description           | null                 
 Count of text reviews | null                 

--- 3.337860

                                                                                

#### Сравниваем объем

In [13]:
def hdfs_dir_size(
    host: str = HADOOP_HOST,
    port: str = HADOOP_WEB_PORT,
    directory: str = HADOOP_DIR,
    f_filter: str = "",
    f_extension: str = "",
) -> list | str:
    """
    Return size of directory in Mb.

    :param host: hadoop namenode host name
    :param port: hadoop namenode web port
    :param directory: target directory to list.
    :param f_filter: string to filter filename with
    :param f_extension: file extension ('csv', 'parquet', etc..)
    """
    try:
        response = requests.get(
            f"http://{host}:{port}/webhdfs/v1/{directory}?op=LISTSTATUS"
        ).json()
    except Exception as error:
        return error

    if response.get("FileStatuses"):
        files_data = response["FileStatuses"].get("FileStatus")
        files_size = [
            int(file.get("length"))
            for file in files_data
            if f_filter in file.get("pathSuffix")
            and file.get("pathSuffix").endswith(f_extension)
            and file.get("type") == "FILE"
        ]
        return sum(files_size) / 1048576
    elif response.get("RemoteException"):
        return response["RemoteException"].get("message")

In [14]:
# CSV files size

hdfs_dir_size(directory="upload", f_filter="book", f_extension="csv")

1110.6515398025513

In [15]:
# Parquet files size

hdfs_dir_size(directory="book.parquet")

692.379002571106

### 2.2. Используя весь набор данных с помощью Spark вывести

#### Топ-10 книг с наибольшим числом ревью

In [68]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import sum, avg, max, min, mean, count

# Изменяем тип столбца CountsOfReview на int
df_load = df_load.withColumn("CountsOfReview", df_load.CountsOfReview.cast(IntegerType()))

In [105]:
top_10_book = (
    df_load
    .orderBy('CountsOfReview', ascending=False)
    .select("Id", "Name", "CountsOfReview")
    .limit(10)
)
top_10_book.show(truncate=False)



+-------+---------------------------------------------------------+--------------+
|Id     |Name                                                     |CountsOfReview|
+-------+---------------------------------------------------------+--------------+
|2767052|The Hunger Games (The Hunger Games, #1)                  |154447        |
|41865  |Twilight (Twilight, #1)                                  |94850         |
|19063  |The Book Thief                                           |87685         |
|4667024|The Help                                                 |76040         |
|3      |Harry Potter and the Sorcerer's Stone (Harry Potter, #1) |75911         |
|3636   |The Giver (The Giver, #1)                                |57034         |
|43641  |Water for Elephants                                      |52918         |
|2429135|The Girl with the Dragon Tattoo (Millennium, #1)         |52225         |
|136251 |Harry Potter and the Deathly Hallows (Harry Potter, #7)  |52088         |
|281

                                                                                

#### Топ-10 издателей с наибольшим средним числом страниц в книгах

In [84]:
# Изменяем тип столбца pagesNumber на int
df_load = df_load.withColumn("pagesNumber", df_load.pagesNumber.cast(IntegerType()))

In [106]:
from pyspark.sql.functions import sum, avg, max, min, mean, count

top_10_pub = (
    df_load.groupBy("Publisher")
    .agg(avg("pagesNumber")
    .alias("average_pages"))
    .sort('average_pages', ascending=False)
    .limit(10)
)
top_10_pub.show(truncate=False)



+-----------------------------------------------------------+------------------+
|Publisher                                                  |average_pages     |
+-----------------------------------------------------------+------------------+
|Crafty Secrets Publications                                |1807321.6         |
|Sacred-texts.com                                           |500000.0          |
|Department of Russian Language and Literature University of|322128.5714285714 |
|Logos Research Systems                                     |100000.0          |
|Encyclopedia Britannica, Incorporated                      |32642.0           |
|Progressive Management                                     |19106.3625        |
|Still Waters Revival Books                                 |10080.142857142857|
|P. Shalom Publications, Incorporated                       |8539.0            |
|Hendrickson Publishers, Inc. (Peabody, MA)                 |6448.0            |
|IEEE/EMB                   

                                                                                

In [104]:
df_load.describe(['pagesNumber']).show()

+-------+------------------+
|summary|       pagesNumber|
+-------+------------------+
|  count|           1850198|
|   mean|276.55174202977196|
| stddev| 5006.170699333496|
|    min|                 0|
|    max|           4517845|
+-------+------------------+



In [18]:
# spark_df.count()

In [19]:
# spark.stop()