## Блок 1. Standalone Spark

In [76]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import glob

In [77]:
spark = SparkSession\
    .builder.appName('spark-standalone-cluster')\
    .master('spark://DESKTOP-FBH5D4M.:7077')\
    .config('spark.driver.memory', '4g')\
    .config('spark.driver.cores', '4')\
    .config('spark.executor.memory', '4g')\
    .config('spark.executor.cores', '4')\
    .config('spark.cores.max', '8')\
    .getOrCreate()

## Блок 2. Работа с данными на Spark

In [182]:
file_path = './dataset_book/*.csv'
files = glob.glob(file_path)

df = spark.read.options(quote="\"", escape="\"", multiLine=True, header=True, inferSchema=True).csv(files[0])

for file in files[1:]:
    print(file)
    df_current = spark.read.options(quote="\"", escape="\"", multiLine=True,header=True, inferSchema=True).csv(file)
    df = df.unionByName(df_current, allowMissingColumns=True)
    print((df.count(), len(df.columns)))

./dataset_book/book1000k-1100k.csv
(97997, 20)
./dataset_book/book100k-200k.csv
(155043, 20)
./dataset_book/book1100k-1200k.csv
(196935, 20)
./dataset_book/book1200k-1300k.csv
(240557, 20)
./dataset_book/book1300k-1400k.csv
(278845, 20)
./dataset_book/book1400k-1500k.csv
(313604, 20)
./dataset_book/book1500k-1600k.csv
(347043, 20)
./dataset_book/book1600k-1700k.csv


                                                                                

(380029, 20)
./dataset_book/book1700k-1800k.csv


                                                                                

(412134, 20)
./dataset_book/book1800k-1900k.csv


                                                                                

(450997, 20)
./dataset_book/book1900k-2000k.csv


                                                                                

(494558, 20)
./dataset_book/book2000k-3000k.csv


                                                                                

(890515, 20)
./dataset_book/book200k-300k.csv


                                                                                

(946697, 20)
./dataset_book/book3000k-4000k.csv


                                                                                

(1203292, 20)
./dataset_book/book300k-400k.csv


                                                                                

(1259878, 20)
./dataset_book/book4000k-5000k.csv


                                                                                

(1540134, 20)
./dataset_book/book400k-500k.csv


                                                                                

(1595289, 20)
./dataset_book/book500k-600k.csv


                                                                                

(1650148, 20)
./dataset_book/book600k-700k.csv


                                                                                

(1705304, 20)
./dataset_book/book700k-800k.csv


                                                                                

(1759577, 20)
./dataset_book/book800k-900k.csv


                                                                                

(1809420, 20)
./dataset_book/book900k-1000k.csv




(1850310, 20)


                                                                                

In [183]:
df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- RatingDist1: string (nullable = true)
 |-- pagesNumber: integer (nullable = true)
 |-- RatingDist4: string (nullable = true)
 |-- RatingDistTotal: string (nullable = true)
 |-- PublishMonth: integer (nullable = true)
 |-- PublishDay: integer (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- CountsOfReview: integer (nullable = true)
 |-- PublishYear: integer (nullable = true)
 |-- Language: string (nullable = true)
 |-- Authors: string (nullable = true)
 |-- Rating: double (nullable = true)
 |-- RatingDist2: string (nullable = true)
 |-- RatingDist5: string (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- RatingDist3: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Count of text reviews: integer (nullable = true)



In [228]:
df = df.withColumnRenamed("Count of text reviews","Count_of_text_reviews")

In [None]:
df.write.csv('./dataset_book/data_csv', header=True)
df.write.parquet('./dataset_book/data_parquet')

In [165]:
df_csv = spark.read.csv('./dataset_book/data_csv', header=True)

In [79]:
df_parquet = spark.read.parquet('./dataset_book/data_parquet', header=True)

In [229]:
df_parquet.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_parquet.columns]).show()



+---+----+-----------+-----------+-----------+---------------+------------+----------+---------+--------------+-----------+--------+-------+------+-----------+-----------+----+-----------+-----------+---------------------+
| Id|Name|RatingDist1|pagesNumber|RatingDist4|RatingDistTotal|PublishMonth|PublishDay|Publisher|CountsOfReview|PublishYear|Language|Authors|Rating|RatingDist2|RatingDist5|ISBN|RatingDist3|Description|Count_of_text_reviews|
+---+----+-----------+-----------+-----------+---------------+------------+----------+---------+--------------+-----------+--------+-------+------+-----------+-----------+----+-----------+-----------+---------------------+
|  0|   0|          0|          0|          0|              0|           0|         0|    17824|             0|          0| 1598488|      1|     0|          0|          0|5923|          0|     679070|              1440613|
+---+----+-----------+-----------+-----------+---------------+------------+----------+---------+------------

                                                                                

#### Топ-10 книг с наибольшим числом ревью

In [224]:
df_parquet\
    .select("Name", "CountsOfReview")\
    .orderBy(desc('CountsOfReview'))\
    .show(10)

[Stage 1651:>                                                       (0 + 8) / 9]

+--------------------+--------------+
|                Name|CountsOfReview|
+--------------------+--------------+
|The Hunger Games ...|        154447|
|Twilight (Twiligh...|         94850|
|      The Book Thief|         87685|
|            The Help|         76040|
|Harry Potter and ...|         75911|
|The Giver (The Gi...|         57034|
| Water for Elephants|         52918|
|The Girl with the...|         52225|
|Harry Potter and ...|         52088|
|The Lightning Thi...|         48630|
+--------------------+--------------+
only showing top 10 rows



                                                                                

#### Топ-10 издателей с наибольшим средним числом страниц в книгах

In [230]:
df_parquet\
    .select('Publisher', 'pagesNumber')\
    .groupBy('Publisher').avg('pagesNumber')\
    .orderBy(desc('avg(pagesNumber)'))\
    .show(10)



+--------------------+------------------+
|           Publisher|  avg(pagesNumber)|
+--------------------+------------------+
|Crafty Secrets Pu...|         1807321.6|
|    Sacred-texts.com|          500000.0|
|Department of Rus...| 322128.5714285714|
|Logos Research Sy...|          100000.0|
|Encyclopedia Brit...|           32642.0|
|Progressive Manag...|        19106.3625|
|Still Waters Revi...|10080.142857142857|
|P. Shalom Publica...|            8539.0|
|Hendrickson Publi...|            6448.0|
|            IEEE/EMB|            6000.0|
+--------------------+------------------+
only showing top 10 rows



                                                                                

#### Десять наиболее активных по числу изданных книг лет

In [231]:
df_parquet\
    .select('PublishYear')\
    .groupBy('PublishYear')\
    .count()\
    .orderBy(desc('count'))\
    .show(10)



+-----------+------+
|PublishYear| count|
+-----------+------+
|       2007|129507|
|       2006|122374|
|       2005|117639|
|       2004|105733|
|       2003|104345|
|       2002| 95537|
|       2001| 88228|
|       2000| 87290|
|       2008| 80265|
|       1999| 80155|
+-----------+------+
only showing top 10 rows



                                                                                

#### Топ-10 книг имеющих наибольший разброс в оценках среди книг имеющих больше 500 оценок

In [171]:
from statistics import stdev

def calculate_stdev(row):
    data = [float(x[2:].strip()) for x in row.split(",")]
    return stdev(data)

In [232]:
std_dev_udf = udf(calculate_stdev, FloatType())
df_parquet = df_parquet.withColumn('StdDev', std_dev_udf(concat_ws(",",\
                                                        df_parquet.RatingDist1,\
                                                        df_parquet.RatingDist2,\
                                                        df_parquet.RatingDist3,\
                                                        df_parquet.RatingDist4,\
                                                        df_parquet.RatingDist5)))

In [233]:
df_parquet = df_parquet\
    .withColumn('Total', \
                expr("substring(RatingDistTotal, 7, length(RatingDistTotal)-7)").\
                cast('Integer'))

In [234]:
df_parquet\
    .select('Name', 'StdDev', 'Total')\
    .where('Total > 500' )\
    .orderBy(desc('StdDev'))\
    .show(10)



+--------------------+---------+------+
|                Name|   StdDev| Total|
+--------------------+---------+------+
|Ο Χάρι Πότερ και ...|1884481.0|709468|
|Harry Potter og D...|1857551.5|699559|
|Harry Potter e a ...|1856879.5|699315|
|Harry Potter and ...|1850987.2|697097|
|Harry Potter och ...|1833126.8|690522|
|Harry Potter och ...|1824451.8|687169|
|Harry Potter e a ...|1822858.0|686591|
|Harry Potter and ...|1815587.4|683920|
|Harry Potter i Ka...|1814183.4|683396|
|Harri Potter maen...|1811594.9|682464|
+--------------------+---------+------+
only showing top 10 rows



                                                                                

#### Любой интересный инсайт из данных: cредний рейтинг по годам выпуска за последние 10 лет

In [80]:
df_parquet\
    .select('PublishYear', 'Rating')\
    .where('PublishYear <= 2023')\
    .groupBy('PublishYear')\
    .avg('Rating')\
    .orderBy(desc('PublishYear'))\
    .show(10)



+-----------+------------------+
|PublishYear|       avg(Rating)|
+-----------+------------------+
|       2022|              3.89|
|       2021|0.9221276595744682|
|       2020|  2.53327868852459|
|       2019|2.6745007923930273|
|       2018|2.7876787372330547|
|       2017| 2.431105463786531|
|       2016|2.5692142188961653|
|       2015| 2.692784277023865|
|       2014|2.5914216163583252|
|       2013|2.7313730355665844|
+-----------+------------------+
only showing top 10 rows



                                                                                

## Блок 3. Spark Streaming

In [82]:
schema = StructType([StructField("ID", IntegerType(), True),
                     StructField("Name", StringType(), True),
                     StructField("Rating", StringType(), True)])

In [89]:
streaming = spark\
            .readStream\
            .option("quote", "\"")\
            .option("escape", "\"")\
            .schema(schema)\
            .csv('./dataset_user_rating/')

In [90]:
rating = streaming.groupBy('Rating').count()
active_query = rating.writeStream.format('console').outputMode('complete').start()

23/04/16 22:51:00 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-c53de093-2f0b-4706-b9fc-bd398c4c0585. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+------+
|              Rating| count|
+--------------------+------+
|     did not like it|  7811|
|              Rating|     7|
|     really liked it|132808|
|            liked it| 96047|
|           it was ok| 28811|
|      it was amazing| 92354|
|This user doesn't...|  4765|
+--------------------+------+



                                                                                

In [91]:
def convert(x):
    if x == 'it was amazing':
        return 5
    elif x == 'really liked it':
        return 4
    elif x == 'liked it':
        return 3
    elif x == 'it was ok':
        return 2
    elif x == 'did not like it':
        return 1
    else:
        return

In [92]:
import datetime
from datetime import timedelta

time = datetime.datetime.now()

def add_time():
    global time
    time += timedelta(seconds=60)
    return time

In [93]:
streaming.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Rating: string (nullable = true)



In [95]:
def foreach_batch_function(df, epoch_id):
    convUDF = udf(convert, IntegerType())
    df = df.withColumn('Int_rating', convUDF('Rating'))
    
    convUDF_1 = udf(add_time, TimestampType())
    df = df.withColumn('Time', convUDF_1())
    
    windowed = df\
            .withWatermark("time", "20000 milliseconds")\
            .groupBy(window("time", "10 seconds"), "Name")\
            .agg(avg("Int_rating").alias('avg_rating'))

    windowed.write.parquet('./dataset_user_rating/parquet')
    
streaming.writeStream.foreachBatch(foreach_batch_function).start()   

23/04/16 22:54:13 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-609fd031-847f-4e4c-b9bf-c6c88ef799e3. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


<pyspark.sql.streaming.StreamingQuery at 0x7fe565db2340>

23/04/16 22:54:17 WARN FileStreamSource: Listed 7 file(s) in 2594 ms            
23/04/16 22:54:19 WARN FileStreamSource: Listed 7 file(s) in 2101 ms            
23/04/16 22:54:21 WARN FileStreamSource: Listed 7 file(s) in 2116 ms            
23/04/16 22:54:24 WARN FileStreamSource: Listed 7 file(s) in 2990 ms            
23/04/16 22:54:30 WARN FileStreamSource: Listed 7 file(s) in 3456 ms            
23/04/16 22:54:43 WARN FileStreamSource: Listed 7 file(s) in 8511 ms            
                                                                                

In [96]:
spark.streams.active

[<pyspark.sql.streaming.StreamingQuery at 0x7fe565e97e80>,
 <pyspark.sql.streaming.StreamingQuery at 0x7fe565e976d0>]

In [None]:
spark.streams.active[0].stop()

In [97]:
df_parquet = spark.read.parquet('./dataset_user_rating/parquet', header=True)

In [98]:
df_parquet.show()

+--------------------+--------------------+----------+
|              window|                Name|avg_rating|
+--------------------+--------------------+----------+
|{2023-04-17 01:49...|Don't Make Me Thi...|       4.0|
|{2023-04-17 02:27...| The Grapes of Wrath|       3.0|
|{2023-04-17 06:23...|Ender's Game (End...|       5.0|
|{2023-04-17 09:07...|           Amsterdam|       5.0|
|{2023-04-17 09:12...|The Flame Trees o...|       5.0|
|{2023-04-17 12:49...|The Sword in the ...|       4.0|
|{2023-04-18 04:09...|Abstract Expressi...|       5.0|
|{2023-04-18 07:39...|Zero to One: Note...|       4.0|
|{2023-04-18 08:55...|  Eaters of the Dead|       4.0|
|{2023-04-18 10:46...|It's Not How Good...|       5.0|
|{2023-04-18 16:47...|      I Know My Name|       3.0|
|{2023-04-18 18:18...|Fist Stick Knife ...|       5.0|
|{2023-04-18 23:41...|Divine Secrets Of...|       3.0|
|{2023-04-18 23:57...|        The Namesake|       3.0|
|{2023-04-19 00:27...|   The Secret Garden|       5.0|
|{2023-04-

                                                                                