Подключаемся к созданному кластеру:

In [10]:
sc.stop()

In [11]:
sc = pyspark.SparkContext(master="spark://lightkeeper:7077",
                          appName='spark_hw').getOrCreate()

In [12]:
sc

In [13]:
from pyspark.sql import SparkSession

In [14]:
spark = SparkSession.builder.getOrCreate()

In [15]:
spark

In [355]:
from pyspark.sql.types import IntegerType, FloatType, StringType, StructField, StructType, TimestampType
from pyspark.sql.functions import avg,max,count, desc, udf,\
col, concat_ws, unix_timestamp, from_unixtime, lit, window, monotonically_increasing_id, first
import math
import glob

## Блок 2

In [10]:
! ls -a archive

.		     book1400k-1500k.csv  book4000k-5000k.csv
..		     book1500k-1600k.csv  book400k-500k.csv
all_books_csv	     book1600k-1700k.csv  book500k-600k.csv
all_books.parquet    book1700k-1800k.csv  book600k-700k.csv
book1000k-1100k.csv  book1800k-1900k.csv  book700k-800k.csv
book100k-200k.csv    book1900k-2000k.csv  book800k-900k.csv
book1100k-1200k.csv  book2000k-3000k.csv  book900k-1000k.csv
book1-100k.csv	     book200k-300k.csv	  rating_out
book1200k-1300k.csv  book3000k-4000k.csv  tmp
book1300k-1400k.csv  book300k-400k.csv	  user_rating


### 2.1 Преобразовать данные исходного датасета в parquet объединяя все таблицы. 

Сформируем датафрейм, объединив вче файлы, при этом учитываем, что в разных файлах может быть различное число полей.

In [505]:
files_path = glob.glob('archive/book*')

In [506]:
df = spark.read.option('header', 'true').option('inferSchema', 'true')\
                                        .option("multiLine", 'true')\
                                        .option("escape", "\"")\
                                        .csv(files_path[0])

for path in files_path[1:]:
    df_2 = spark.read.option('header', 'true').option('inferSchema', 'true')\
                                              .option("multiLine", 'true')\
                                              .option("escape", "\"")\
                                              .csv(path)
    df = df.unionByName(df_2, allowMissingColumns=True)

                                                                                

In [507]:
df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Authors: string (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- Rating: double (nullable = true)
 |-- PublishYear: integer (nullable = true)
 |-- PublishMonth: integer (nullable = true)
 |-- PublishDay: integer (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- RatingDist5: string (nullable = true)
 |-- RatingDist4: string (nullable = true)
 |-- RatingDist3: string (nullable = true)
 |-- RatingDist2: string (nullable = true)
 |-- RatingDist1: string (nullable = true)
 |-- RatingDistTotal: string (nullable = true)
 |-- CountsOfReview: integer (nullable = true)
 |-- Language: string (nullable = true)
 |-- pagesNumber: integer (nullable = true)
 |-- Description: string (nullable = true)
 |-- Count of text reviews: integer (nullable = true)



Сохраним полученный датафрейм в виде parquet и csv:

In [None]:
df.write.mode("overwrite").parquet("archive/all_books.parquet")

In [None]:
df.write.option("header", 'true').csv("archive/all_books_csv")

Сравним время чтения:

In [510]:
%%time
parqDF = spark.read.parquet("archive/all_books.parquet")

CPU times: user 0 ns, sys: 3.12 ms, total: 3.12 ms
Wall time: 166 ms


In [511]:
%%time
csvDF = spark.read.option('header', 'true').option('inferSchema', 'true')\
                                        .option("multiLine", 'true')\
                                        .option("escape", "\"")\
                                        .csv("archive/all_books_csv")

CPU times: user 6.01 ms, sys: 0 ns, total: 6.01 ms
Wall time: 366 ms


Сравним объёмы занимаемой памяти:

In [512]:
!du -sh ./archive/all_books_csv

1,2G	./archive/all_books_csv


In [513]:
!du -sh ./archive/all_books.parquet

775M	./archive/all_books.parquet


Делвем вывод, что формат parquet выигрывает по скорости чтения перед csv, а также занимает в 1.5 разаменьше памяти.

### 2.2 Используя весь набор данных с помощью Spark вывести

#### а) Топ-10 книг с наибольшим числом ревью

In [13]:
df.orderBy(col("CountsOfReview").desc())\
  .select("Name", "CountsOfReview")\
  .limit(10).show()

                                                                                

+--------------------+--------------+
|                Name|CountsOfReview|
+--------------------+--------------+
|The Hunger Games ...|        154447|
|Twilight (Twiligh...|         94850|
|      The Book Thief|         87685|
|            The Help|         76040|
|Harry Potter and ...|         75911|
|The Giver (The Gi...|         57034|
| Water for Elephants|         52918|
|The Girl with the...|         52225|
|Harry Potter and ...|         52088|
|The Lightning Thi...|         48630|
+--------------------+--------------+



#### b) Топ-10 издателей с наибольшим средним числом страниц в книгах

In [14]:
df.groupBy("Publisher")\
  .agg(avg("pagesNumber").alias("avg_page"))\
  .sort(desc("avg_page"))\
  .limit(10).show()



+--------------------+------------------+
|           Publisher|          avg_page|
+--------------------+------------------+
|Crafty Secrets Pu...|         1807321.6|
|    Sacred-texts.com|          500000.0|
|Department of Rus...| 322128.5714285714|
|Logos Research Sy...|          100000.0|
|Encyclopedia Brit...|           32642.0|
|Progressive Manag...|        19106.3625|
|Still Waters Revi...|10080.142857142857|
|P. Shalom Publica...|            8539.0|
|Hendrickson Publi...|            6448.0|
|            IEEE/EMB|            6000.0|
+--------------------+------------------+



                                                                                

#### c) Десять наиболее активных по числу изданных книг лет

In [15]:
df.groupBy("PublishYear")\
  .agg(count("*").alias("n_books"))\
  .sort(desc("n_books"))\
  .limit(10).show()



+-----------+-------+
|PublishYear|n_books|
+-----------+-------+
|       2007| 129507|
|       2006| 122374|
|       2005| 117639|
|       2004| 105733|
|       2003| 104345|
|       2002|  95537|
|       2001|  88228|
|       2000|  87290|
|       2008|  80265|
|       1999|  80155|
+-----------+-------+



                                                                                

#### d) Топ-10 книг имеющих наибольший разброс в оценках среди книг имеющих больше 500 оценок

Сначала очистим значения полей рейтинг, отавив только значения после ":"

In [16]:
clean_ratings = udf(lambda s: s.split(':')[1], StringType())

In [17]:
df = df.withColumn('RatingDistTotal', clean_ratings(df.RatingDistTotal))\
         .withColumn('RatingDist1', clean_ratings(df.RatingDist1))\
         .withColumn('RatingDist2', clean_ratings(df.RatingDist2))\
         .withColumn('RatingDist3', clean_ratings(df.RatingDist3))\
         .withColumn('RatingDist4', clean_ratings(df.RatingDist4))\
         .withColumn('RatingDist5', clean_ratings(df.RatingDist5))

Напишем функцию, вычисляющую стандртное отклонение для оценок

In [19]:
def calculate_std(ratings):
    ratings = list(map(float, ratings.split()))
    total = ratings[0]
    rating = ratings[1:]
    mean =  sum([(i+1)*x for i, x in enumerate(rating)]) / total
    std = math.sqrt(sum([x*(i + 1 - mean)**2 for i, x in enumerate(rating)]) / total)
    return std

Образуем новую колонку для стандартного отклонения посчитав его для кажой строки

In [20]:
get_std = udf(calculate_std, FloatType())

df = df.withColumn('std', get_std(concat_ws(" ", col("RatingDistTotal"),
                                                col("RatingDist1"), 
                                                col("RatingDist2"), 
                                                col("RatingDist3"), 
                                                col("RatingDist4"), 
                                                col("RatingDist5"))))


In [21]:
df.filter(col("RatingDistTotal") > 500)\
  .orderBy(col("std").desc())\
  .select("Id", "Name", "std", "RatingDistTotal")\
  .limit(10).show()



+-------+--------------------+---------+---------------+
|     Id|                Name|      std|RatingDistTotal|
+-------+--------------------+---------+---------------+
|2247237|Scientology: The ...|1.6768911|            853|
| 564449|Scientology: The ...|1.6754147|            838|
|2355709|Para Entrenar a u...|1.6598984|           1656|
| 675331| To Train Up a Child|1.6595358|           1636|
| 214280|Para Entrenar a u...|1.6594893|           1634|
|2175673|The Bluebook: A U...|1.5883198|            542|
|2238150|The Bluebook: A U...|1.5883198|            542|
|4573714|Dianetics: The Mo...|1.5621103|           2897|
|3036308|Dianetics: The Mo...|1.5608956|           2869|
|2724205|Dianetica: La Cie...|1.5603579|           2860|
+-------+--------------------+---------+---------------+



                                                                                

#### e) Любой интересный инсайт из данных

Если посчитать наиболее активный по числу изданных книг месяц, то окажется что больше всего книг издаётся в конце и начале года. Единственное, в полученном датафрейме количество уникальных значений для значения месяца не 12, а 31. Это связано с тем, что в некоторых исходных таблицах перепутаны значения колонок месяц и день.

In [22]:
df.groupBy("PublishMonth")\
  .agg(count("*").alias("n_books"))\
  .sort(desc("n_books"))\
  .show()



+------------+-------+
|PublishMonth|n_books|
+------------+-------+
|           1| 589982|
|          12| 125327|
|           9|  95739|
|          10|  93673|
|           3|  90724|
|           6|  84709|
|           4|  83594|
|           5|  82201|
|           8|  79038|
|          11|  77834|
|           2|  75004|
|           7|  74494|
|          15|  36284|
|          31|  25949|
|          28|  22265|
|          30|  22095|
|          17|  16884|
|          25|  15498|
|          20|  13648|
|          27|  13414|
+------------+-------+
only showing top 20 rows



                                                                                

## Блок 3

Посмотрим сначала на структуру датасета, в нём всего три поля:

In [327]:
df = spark.read.csv("archive/user_rating/*", header=True, inferSchema=True, multiLine=True, escape="\"")

In [328]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Rating: string (nullable = true)



Чтобы вычислить средний рейтинг нам нужно преобразовать строковое описание рейтинга в число, для этого посмотрим какие уникальные значения может принимть поле рейтинг:

In [329]:
df.select("Rating").distinct().show()

+--------------------+
|              Rating|
+--------------------+
|     did not like it|
|     really liked it|
|            liked it|
|           it was ok|
|      it was amazing|
|This user doesn't...|
+--------------------+



Зададим для этих значений словарь соответсвия:

In [330]:
rating_int = {
    "did not like it": 1,
    "it was ok": 2,
    "liked it": 3,
    "really liked it": 4,
    "it was amazing": 5,
    "This user doesn't have any rating": -1
}

In [331]:
rating_to_int = udf(lambda s: rating_int[s], IntegerType())

Задаём схему данных:

In [484]:
dataSchema = StructType([
    StructField("Id", IntegerType(),True),
    StructField("Name", StringType(),True),
    StructField("Rating", StringType(),True)
])

Задаём file source:

In [495]:
streaming = (
    spark.readStream.schema(dataSchema)
    .csv("archive/user_rating/", header=True, inferSchema=True, multiLine=True, escape="\"")
)

Задаём преобразование данных. Сначала переводим рейтинг в числовое значение, затем вычисляем среднее значение группируя по названию книги:

In [496]:
dest_df = streaming.withColumn("Rating", rating_to_int(col("Rating")))\
                   .groupBy(col("Name")) \
                   .agg(avg("Rating").alias("avg_rating"))

Задаём sink:

In [497]:
query = (
    dest_df.writeStream.queryName("avg_rating_df")
    .format("memory")
    .outputMode("update")
    .start()
)

23/10/08 05:06:33 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-e5bede83-0b02-44af-80f8-4c6d27e4b21d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/10/08 05:06:33 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

In [498]:
spark.streams.active[0].isActive

True

In [499]:
query.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [494]:
query.stop()

Сравним результат полученный через straming и без него:

In [504]:
spark.sql('SELECT * from avg_rating_df').orderBy("Name").show()

+--------------------+------------------+
|                Name|        avg_rating|
+--------------------+------------------+
|     !آنچه سینما هست|               2.0|
|!از قر و قمبیل‌ها...|               5.0|
|   " Talking Heads "|               2.0|
|"A Problem from H...|               4.0|
|   "A" Is for Africa|               3.0|
|"A" is for Apple ...|               3.0|
|    "B" Is for Betsy|               5.0|
|"Beat" Takeshi Ki...|               5.0|
|"C" Is For Corpse...|               3.0|
|   "Cinema Paradiso"|               5.0|
|"Do you consider ...|               2.0|
|        "Giant" Size|               4.0|
|"Headhunter" Hiri...|               2.0|
|"I Am a Man": Chi...|               3.0|
|"Love, Loss and L...|               5.0|
|"Master Harold".....|               3.5|
|            "Mayday"|               5.0|
|"Membumikan" Al-Q...|2.6666666666666665|
|"Multiplication I...|               3.0|
|"My Teenage Son's...|               4.0|
+--------------------+------------

23/10/08 05:10:02 WARN TaskSetManager: Stage 683 contains a task of very large size (2068 KiB). The maximum recommended task size is 1000 KiB.


In [483]:
df.withColumn("Rating", rating_to_int(col("Rating")))\
                   .groupBy(col("Name")) \
                   .agg(avg("Rating").alias("avg_rating")).orderBy("Name").show()



+--------------------+------------------+
|                Name|        avg_rating|
+--------------------+------------------+
|     !آنچه سینما هست|               2.0|
|!از قر و قمبیل‌ها...|               5.0|
|   " Talking Heads "|               2.0|
|"A Problem from H...|               4.0|
|   "A" Is for Africa|               3.0|
|"A" is for Apple ...|               3.0|
|    "B" Is for Betsy|               5.0|
|"Beat" Takeshi Ki...|               5.0|
|"C" Is For Corpse...|               3.0|
|   "Cinema Paradiso"|               5.0|
|"Do you consider ...|               2.0|
|        "Giant" Size|               4.0|
|"Headhunter" Hiri...|               2.0|
|"I Am a Man": Chi...|               3.0|
|"Love, Loss and L...|               5.0|
|"Master Harold".....|               3.5|
|            "Mayday"|               5.0|
|"Membumikan" Al-Q...|2.6666666666666665|
|"Multiplication I...|               3.0|
|"My Teenage Son's...|               4.0|
+--------------------+------------

                                                                                