# Инициализация необходимых вещей

In [26]:
# Библиотеки
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F
from datetime import date
from pyspark.sql import Window

In [27]:
# Сессия SparkSession
spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

## a. Создайте схему будущего фрейма данных. Схема должна включать следующие атрибуты:

- id -  уникальный идентификатор посетителя сайта. Тип – последовательность чисел фиксированной длины. Данное поле не является первичным ключом.
- timestamp – дата и время события в формате unix timestamp.
- type – тип события, значение из списка (факт посещения(visit), клик по визуальному элементу страницы(click), скролл(scroll), перед на другую страницу(move)).
- page_id – id текущей страницы. Тип - последовательность чисел фиксированной длины.
- tag – каждая страница с новостью размечается редакцией специальными тегами, которые отражают тематику конкретной новости со страницы. Возможный список тематик: политика, спорт, медицина и т.д.
- sign – наличие у пользователя личного кабинета. Значения – True/False.

In [33]:
# Схема данных
schema_site = T.StructType([
    T.StructField("id", T.IntegerType(), True),
    T.StructField("timestamp", T.LongType(), True),
    T.StructField("type", T.StringType(), True),
    T.StructField("page_id", T.IntegerType(), True),
    T.StructField("tag", T.StringType(), True),
    T.StructField("sign", T.BooleanType(), True)
])

## c. Наполните датафрейм данными. Пример:

    (12345, 1667627426, "click", 101, "Sport”, False)

In [34]:
# Данные для фрейма
data_site = [
    (12345, 1667027426, 'click', 101, 'Sport', True),
    (12345, 1667127426, 'click', 101, 'Politics', True),
    (45678, 1667127526, 'scroll', 101, 'Medicine', True),
    (63435, 1667327426, 'click', 102, 'Medicine', True),
    (35678, 1667427426, 'click', 102, 'Sport', True),
    (20917, 1667527426, 'click', 102, 'Politics', True),
    (12345, 1667627426, 'move', 103, 'Sport', True), 
    (76654, 1667727426, 'visit', 104, 'Politics', False),
    (26389, 1667827426, 'click', 105, 'Sport', False),
    (45845, 1667927426, 'click', 103, 'Medicine', True),
]

## b. Создайте датафрейм с описанной выше схемой данных.

In [30]:
# Собственно создаём фрем
df_site = spark.createDataFrame(data = data_site, schema = schema_site)

In [31]:
# Смотрим что у него внутри
df_site.show()

+-----+----------+------+-------+--------+-----+
|   id| timestamp|  type|page_id|     tag| sign|
+-----+----------+------+-------+--------+-----+
|12345|1667027426| click|    101|   Sport| true|
|12345|1667127426| click|    101|Politics| true|
|45678|1667127526|scroll|    101|Medicine| true|
|63435|1667327426| click|    102|Medicine| true|
|35678|1667427426| click|    102|   Sport| true|
|20917|1667527426| click|    102|Politics| true|
|12345|1667627426|  move|    103|   Sport| true|
|76654|1667727426| visit|    104|Politics|false|
|26389|1667827426| click|    105|   Sport|false|
|45845|1667927426| click|    103|Medicine| true|
+-----+----------+------+-------+--------+-----+



## Преобразование даты в удобный формат

In [32]:
# Трансформируем метку даты в юник формат
df_site = df_site.select(*[i for i in df_site.columns if i != "timestamp"], F.from_unixtime("timestamp").alias("event_time"))
df_site.show()

+-----+------+-------+--------+-----+-------------------+
|   id|  type|page_id|     tag| sign|         event_time|
+-----+------+-------+--------+-----+-------------------+
|12345| click|    101|   Sport| true|2022-10-29 07:10:26|
|12345| click|    101|Politics| true|2022-10-30 10:57:06|
|45678|scroll|    101|Medicine| true|2022-10-30 10:58:46|
|63435| click|    102|Medicine| true|2022-11-01 18:30:26|
|35678| click|    102|   Sport| true|2022-11-02 22:17:06|
|20917| click|    102|Politics| true|2022-11-04 02:03:46|
|12345|  move|    103|   Sport| true|2022-11-05 05:50:26|
|76654| visit|    104|Politics|false|2022-11-06 09:37:06|
|26389| click|    105|   Sport|false|2022-11-07 13:23:46|
|45845| click|    103|Medicine| true|2022-11-08 17:10:26|
+-----+------+-------+--------+-----+-------------------+



# d. Решите следующие задачи:

## d.1 Вывести топ-5 самых активных посетителей сайта

In [38]:
df_site.groupby("id")\
    .agg(F.count("*").alias("activ"))\
    .orderBy("activ", ascending = False)\
    .show(5)



+-----+-----+
|   id|activ|
+-----+-----+
|12345|    3|
|45678|    1|
|20917|    1|
|63435|    1|
|76654|    1|
+-----+-----+
only showing top 5 rows



                                                                                

## d.2 Посчитать процент посетителей, у которых есть ЛК

In [39]:
prc_lk = df_site.groupBy('sign')\
    .agg(F.count('*').alias('all'))\
    .withColumn('total', F.sum('all').over(Window.partitionBy()))\
    .withColumn('percent',F.col('all')/F.col('total')*100)
prc_lk.select('sign','percent').filter(df_site.sign == True).show()

22/12/23 18:07:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+----+-------+
|sign|percent|
+----+-------+
|true|   80.0|
+----+-------+



                                                                                

## d.3 Вывести топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице

In [295]:
df_site.groupby("page_id", "type")\
       .agg(F.count("*").alias("num_click"))\
       .orderBy("num_click", ascending = False)\
       .filter(F.col('type') == 'click')\
       .show(5)



+-------+-----+---------+
|page_id| type|num_click|
+-------+-----+---------+
|    102|click|        3|
|    101|click|        2|
|    105|click|        1|
|    103|click|        1|
+-------+-----+---------+



                                                                                

## d.4 Добавьте столбец к фрейму данных со значением временного диапазона в рамках суток с размером окна – 4 часа(0-4, 4-8, 8-12 и т.д.)

In [40]:
df_time = df_site.withColumn("rang", F.floor(F.hour("event_time") / F.lit(4))+1)
df_time.show()

+-----+------+-------+--------+-----+-------------------+----+
|   id|  type|page_id|     tag| sign|         event_time|rang|
+-----+------+-------+--------+-----+-------------------+----+
|12345| click|    101|   Sport| true|2022-10-29 07:10:26|   2|
|12345| click|    101|Politics| true|2022-10-30 10:57:06|   3|
|45678|scroll|    101|Medicine| true|2022-10-30 10:58:46|   3|
|63435| click|    102|Medicine| true|2022-11-01 18:30:26|   5|
|35678| click|    102|   Sport| true|2022-11-02 22:17:06|   6|
|20917| click|    102|Politics| true|2022-11-04 02:03:46|   1|
|12345|  move|    103|   Sport| true|2022-11-05 05:50:26|   2|
|76654| visit|    104|Politics|false|2022-11-06 09:37:06|   3|
|26389| click|    105|   Sport|false|2022-11-07 13:23:46|   4|
|45845| click|    103|Medicine| true|2022-11-08 17:10:26|   5|
+-----+------+-------+--------+-----+-------------------+----+



## d.5 Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте. 

In [41]:
df_time.groupBy("rang")\
    .agg(F.count("*").alias("num_time"))\
    .select(F.max("num_time").alias("rang_max"))\
    .show()



+--------+
|rang_max|
+--------+
|       3|
+--------+



                                                                                

## d.6 Создайте второй фрейм данных, который будет содержать информацию о ЛК посетителя сайта со следующим списком атрибутов

1. Id – уникальный идентификатор личного кабинета
2. User_id – уникальный идентификатор посетителя
3. ФИО посетителя
4. Дату рождения посетителя 
5. Дата создания ЛК

In [42]:
Schema_lk = T.StructType([
    T.StructField("id_lk", T.IntegerType(), True),
    T.StructField("User_id", T.IntegerType(), True),
    T.StructField("FIO", T.StringType(), True),
    T.StructField("Date_birth", T.DateType(), True),
    T.StructField("Date_creation", T.DateType(), True)
])

In [44]:
data_lk = [
    (1, 12345, "Иванов Иван Иванович", date(1990, 1, 1),date(2020, 1, 1)), 
    (2, 45845, "Петров Пётр Петрович", date(1991, 2, 2),date(2021, 2, 2)),
    (3, 35678, "Сидоров Сидор Сидорович", date(1995, 3, 3),date(2021, 3, 3)),
    (4, 43218, "Кузнецов Пётр Сизорович", date(1993, 4, 4),date(2022, 4, 4))
]

In [45]:
df_lk = spark.createDataFrame(data = data_lk, schema = Schema_lk)

In [46]:
df_lk.show()

+-----+-------+--------------------+----------+-------------+
|id_lk|User_id|                 FIO|Date_birth|Date_creation|
+-----+-------+--------------------+----------+-------------+
|    1|  12345|Иванов Иван Иванович|1990-01-01|   2020-01-01|
|    2|  45845|Петров Пётр Петрович|1991-02-02|   2021-02-02|
|    3|  35678|Сидоров Сидор Сид...|1995-03-03|   2021-03-03|
|    4|  43218|Кузнецов Пётр Сиз...|1993-04-04|   2022-04-04|
+-----+-------+--------------------+----------+-------------+



# Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт

In [299]:
df_all = df_lk.join(df_site, df_lk.User_id == df_site.id)
df_all.show()
df_all.withColumn('First_Name', F.split('FIO', ' ')[0])\
      .select(F.col('First_Name')).filter(df_all.tag == 'Sport')\
      .distinct()\
      .show()

                                                                                

+-----+-------+--------------------+----------+-------------+-----+------+-------+--------+----+-------------------+
|id_lk|User_id|                 FIO|Date_birth|Date_creation|   id|  type|page_id|     tag|sign|         event_time|
+-----+-------+--------------------+----------+-------------+-----+------+-------+--------+----+-------------------+
|    4|  43218|Сазонова Ирина Пе...|1993-01-21|   2022-02-12|43218| click|    102|Politics|true|2022-11-04 02:03:46|
|    2|  45845|Петрова Елена Ива...|1991-11-20|   2021-08-04|45845| click|    103|Medicine|true|2022-11-08 17:10:26|
|    2|  45845|Петрова Елена Ива...|1991-11-20|   2021-08-04|45845|scroll|    101|Medicine|true|2022-10-30 10:58:46|
|    1|  12345|Иванов Иван Иванович|1990-04-14|   2020-06-16|12345|  move|    103|   Sport|true|2022-11-05 05:50:26|
|    1|  12345|Иванов Иван Иванович|1990-04-14|   2020-06-16|12345| click|    101|   Sport|true|2022-10-29 07:10:26|
|    1|  12345|Иванов Иван Иванович|1990-04-14|   2020-06-16|123

                                                                                

+----------+
|First_Name|
+----------+
|   Сидоров|
|    Иванов|
+----------+

