### Знакомство со Spark
##### Установили и запустили Spark на локальном компьютере с Windows

![](./img/2022-12-06_15-30-40.png)

##### Установка дополнительных библиотек в виртуальное окружение
![](./img/2022-12-06_15-38-35.png)



Мы попробуем использовать возможности Spark для анализа данных clickstream пользователей новостного Интернет-портала. 

a. Создайте схему будущего фрейма данных. Схема должна включать следующие атрибуты:

 · id - уникальный идентификатор посетителя сайта. Тип – последовательность чисел фиксированной длины. Данное поле не является первичным ключом.

 · timestamp – дата и время события в формате unix timestamp.

 · type – тип события, значение из списка (факт посещения(visit), клик по визуальному элементу страницы(click), скролл(scroll), перед на другую страницу(move)).

 · page_id – id текущей страницы. Тип - последовательность чисел фиксированной длины.

 · tag – каждая страница с новостью размечается редакцией специальными тегами, которые отражают тематику конкретной новости со страницы. Возможный список тематик: политика, спорт, медицина и т.д.

 · sign – наличие у пользователя личного кабинета. Значения – True/False.

In [1]:
#!pip install pyspark



In [1]:
#!pip install pyspark
import pyspark.sql.types as T
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.window import Window
import datetime
spark = SparkSession.builder.master("local").appName("spark").config("spark.driver.bindAddress", "localhost").config("spark.ui.port", "4040").getOrCreate()

In [11]:


schema = T.StructType([T.StructField("id", T.StringType(), True), T.StructField("timestamp", T.LongType(), True), T.StructField("type", T.StringType(), True), T.StructField("page_id", T.IntegerType(), True), T.StructField("tag", T.StringType(), True), T.StructField("sign", T.BooleanType(), True)])

Наполните датафрейм данными. Пример:

(12345, 1667627426, "click", 101, "Sport”, False)

In [12]:
data =[(1, 1667617426, "visit", 101, 'Sport', True),
       (1, 1667627486, "scroll", 101, 'Sport', True),
       (1, 1667637500, "click", 101, 'Sport', True),
        (1, 1667647505, "visit", 102, 'News', True),
         (1, 1667657565, "click", 102, 'News', True), 
         (1, 1667667586, "visit", 103, 'Sport', True),
          (2, 1667678001, "visit", 104, 'News', True),
           (2, 1667688101, "scroll", 104, 'News', True), 
           (2, 1667698151, "click", 104, 'News', True),
            (2, 1667618200, "visit", 105, 'Money', True),
             (2, 1667628226, "click", 105, 'Money', True),
              (2, 1667628317, "visit", 106, 'Money', True), 
              (2, 1667638359, "scroll", 106, 'Money', True),
               (3, 1667638422, "visit", 101, 'Sport', False),
                (3, 1667648486, "scroll", 101, 'Sport', False),
                 (4, 1667648505, "visit", 106, 'Money', False),
                  (5, 1667658511, "visit", 101, 'Sport', True),
                   (5, 1667658901, "click", 101, 'Sport', True),
                    (5, 1667658926, "visit", 102, 'News', True),
                     (5, 1667658976, "click", 102, 'News', True),
                      (6, 1667669359, "scroll", 106, 'Money', False),
                       (6, 1667679422, "visit", 101, 'Sport', False),
                        (6, 1667679486, "scroll", 101, 'Sport', False),
                         (6, 1667689505, "visit", 106, 'Money', False),
                          (6, 1667699511, "visit", 102, 'News', False),
                           (7, 1667669901, "click", 101, 'Sport', True),
                            (7, 1667659926, "visit", 102, 'News', True),
                             (7, 1667649976, "click", 102, 'News', True)]
df_web = spark.createDataFrame(data = data, schema = schema)

 Решите следующие задачи:

· Вывести топ-5 самых активных посетителей сайта

· Посчитать процент посетителей, у которых есть ЛК

· Вывести топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице

· Добавьте столбец к фрейму данных со значением временного диапазона в рамках суток с размером окна – 4 часа(0-4, 4-8, 8-12 и т.д.)

· Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте.

· Создайте второй фрейм данных, который будет содержать информацию о ЛК посетителя сайта со следующим списком атрибутов

1. Id – уникальный идентификатор личного кабинета

2. User_id – уникальный идентификатор посетителя

3. ФИО посетителя

4.Дату рождения посетителя 

5. Дата создания ЛК

· Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт.

In [13]:
#1
df_web.groupby("id").agg(F.count("*").alias("activity")).orderBy("activity", ascending = False).show(5)

+---+--------+
| id|activity|
+---+--------+
|  2|       7|
|  1|       6|
|  6|       5|
|  5|       4|
|  7|       3|
+---+--------+
only showing top 5 rows



In [14]:
#2 
r_lk = df_web.filter("sign = true").select("id").distinct().count()
r_tot = df_web.select("id").distinct().count()
print("Доля пользователей с лк: {0}%".format(round(r_lk/r_tot*100,1)))

Доля пользователей с лк: 57.1%


In [15]:
#3
df_web.filter('type = "click"').groupby("page_id").count().orderBy("count",ascending = False).show(5)

+-------+-----+
|page_id|count|
+-------+-----+
|    101|    3|
|    102|    3|
|    105|    1|
|    104|    1|
+-------+-----+



In [17]:
#4
df_web.withColumn("period", F.floor(F.hour(F.from_unixtime(F.col('timestamp'))) / F.lit(4))).show(50)

+---+----------+------+-------+-----+-----+------+
| id| timestamp|  type|page_id|  tag| sign|period|
+---+----------+------+-------+-----+-----+------+
|  1|1667617426| visit|    101|Sport| true|     1|
|  1|1667627486|scroll|    101|Sport| true|     2|
|  1|1667637500| click|    101|Sport| true|     2|
|  1|1667647505| visit|    102| News| true|     3|
|  1|1667657565| click|    102| News| true|     4|
|  1|1667667586| visit|    103|Sport| true|     4|
|  2|1667678001| visit|    104| News| true|     5|
|  2|1667688101|scroll|    104| News| true|     0|
|  2|1667698151| click|    104| News| true|     1|
|  2|1667618200| visit|    105|Money| true|     1|
|  2|1667628226| click|    105|Money| true|     2|
|  2|1667628317| visit|    106|Money| true|     2|
|  2|1667638359|scroll|    106|Money| true|     2|
|  3|1667638422| visit|    101|Sport|false|     2|
|  3|1667648486|scroll|    101|Sport|false|     3|
|  4|1667648505| visit|    106|Money|false|     3|
|  5|1667658511| visit|    101|

In [18]:
#5
df1 = df_web.withColumn("period", F.floor(F.hour(F.from_unixtime(F.col('timestamp'))) / F.lit(4)))\
.groupby("period").count().orderBy("count", ascending = False).select("period").show(1)


+------+
|period|
+------+
|     4|
+------+
only showing top 1 row



In [19]:
#6
schema_lk = T.StructType([T.StructField("id", T.StringType(), True),T.StructField("user_id", T.IntegerType(), True),T.StructField("fio", T.StringType(), True),
T.StructField("dob", T.DateType(), True),T.StructField("doc", T.DateType(), True)])

data_lk = [(101, 2, "Иванов Иван Иванович", datetime.datetime(1990, 7, 5), datetime.datetime(2016, 8, 1)),(102, 5, "Петров Петр Петрович", datetime.datetime(1995, 1, 22), datetime.datetime(2017, 10, 7)),(103, 1, "Сидоров Сидр Сидорович", datetime.datetime(1975, 8, 12), datetime.datetime(2018, 10, 7)),
(104, 6, "Олегов Олег Олегович", datetime.datetime(1980, 4, 15), datetime.datetime(2019, 7, 15))
]

df_lk = spark.createDataFrame(data = data_lk, schema = schema_lk)

In [20]:
df_all = df_lk.alias("lk").join(df_web.alias("web"),on = [F.col("lk.user_id") == F.col("web.id")],how = "left")
df_all.show(10)

+---+-------+--------------------+----------+----------+---+----------+------+-------+-----+-----+
| id|user_id|                 fio|       dob|       doc| id| timestamp|  type|page_id|  tag| sign|
+---+-------+--------------------+----------+----------+---+----------+------+-------+-----+-----+
|103|      1|Сидоров Сидр Сидо...|1975-08-12|2018-10-07|  1|1667667586| visit|    103|Sport| true|
|103|      1|Сидоров Сидр Сидо...|1975-08-12|2018-10-07|  1|1667657565| click|    102| News| true|
|103|      1|Сидоров Сидр Сидо...|1975-08-12|2018-10-07|  1|1667647505| visit|    102| News| true|
|103|      1|Сидоров Сидр Сидо...|1975-08-12|2018-10-07|  1|1667637500| click|    101|Sport| true|
|103|      1|Сидоров Сидр Сидо...|1975-08-12|2018-10-07|  1|1667627486|scroll|    101|Sport| true|
|103|      1|Сидоров Сидр Сидо...|1975-08-12|2018-10-07|  1|1667617426| visit|    101|Sport| true|
|104|      6|Олегов Олег Олегович|1980-04-15|2019-07-15|  6|1667699511| visit|    102| News|false|
|104|     

In [21]:
#7
df_all.filter('tag = "Sport" AND ( type = "scroll" OR type = "visit")').select(F.split(df_all.fio,' ')[0].alias("Family")).distinct().show()


+-------+
| Family|
+-------+
| Петров|
|Сидоров|
| Олегов|
+-------+

