# Подключение библиотек

In [205]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F
from datetime import date
from pyspark.sql import Window

# Создание SparkSession 

In [206]:
spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

# Создание фрейма данных Сайт

In [207]:
Schema_site = T.StructType([ 
                      T.StructField("id", T.IntegerType(), True),
                      T.StructField("timestamp", T.LongType(), True),
                      T.StructField("type", T.StringType(), True),
                      T.StructField("page_id", T.IntegerType(), True),
                      T.StructField("tag", T.StringType(), True),
                      T.StructField("sign", T.BooleanType(), True)
                    ])

In [289]:
data_site = [(12345, 1667027426, 'click', 101, 'Sport', True), 
        (12345, 1667127426, 'click', 101, 'Politics', True),
        (45845, 1667127526, 'scroll', 101, 'Medicine', True),
        (12345, 1667327426, 'click', 102, 'Medicine', True),
        (35678, 1667427426, 'click', 102, 'Sport', True),
        (43218, 1667527426, 'click', 102, 'Politics', True),
        (12345, 1667627426, 'move', 103, 'Sport', True), 
        (76654, 1667727426, 'visit', 104, 'Politics', False),
        (26389, 1667827426, 'click', 105, 'Sport', False),
        (45845, 1667927426, 'click', 103, 'Medicine', True),
       ]

In [290]:
df_site = spark.createDataFrame(data = data_site, schema = Schema_site)

In [291]:
df_site.show()

+-----+----------+------+-------+--------+-----+
|   id| timestamp|  type|page_id|     tag| sign|
+-----+----------+------+-------+--------+-----+
|12345|1667027426| click|    101|   Sport| true|
|12345|1667127426| click|    101|Politics| true|
|45845|1667127526|scroll|    101|Medicine| true|
|12345|1667327426| click|    102|Medicine| true|
|35678|1667427426| click|    102|   Sport| true|
|43218|1667527426| click|    102|Politics| true|
|12345|1667627426|  move|    103|   Sport| true|
|76654|1667727426| visit|    104|Politics|false|
|26389|1667827426| click|    105|   Sport|false|
|45845|1667927426| click|    103|Medicine| true|
+-----+----------+------+-------+--------+-----+



# Преобразование даты в удобный формат

In [292]:
df_site = df_site.select(*[i for i in df_site.columns if i != "timestamp"], F.from_unixtime("timestamp").alias("event_time"))
df_site.show()

+-----+------+-------+--------+-----+-------------------+
|   id|  type|page_id|     tag| sign|         event_time|
+-----+------+-------+--------+-----+-------------------+
|12345| click|    101|   Sport| true|2022-10-29 07:10:26|
|12345| click|    101|Politics| true|2022-10-30 10:57:06|
|45845|scroll|    101|Medicine| true|2022-10-30 10:58:46|
|12345| click|    102|Medicine| true|2022-11-01 18:30:26|
|35678| click|    102|   Sport| true|2022-11-02 22:17:06|
|43218| click|    102|Politics| true|2022-11-04 02:03:46|
|12345|  move|    103|   Sport| true|2022-11-05 05:50:26|
|76654| visit|    104|Politics|false|2022-11-06 09:37:06|
|26389| click|    105|   Sport|false|2022-11-07 13:23:46|
|45845| click|    103|Medicine| true|2022-11-08 17:10:26|
+-----+------+-------+--------+-----+-------------------+



#  Вывести топ-5 самых активных посетителей сайта

In [293]:
df_site.groupby("id")\
       .agg(F.count("*").alias("activ"))\
       .orderBy("activ", ascending = False)\
       .show(5)



+-----+-----+
|   id|activ|
+-----+-----+
|12345|    4|
|45845|    2|
|43218|    1|
|76654|    1|
|26389|    1|
+-----+-----+
only showing top 5 rows



                                                                                

#  Посчитать процент посетителей, у которых есть ЛК

In [294]:
prc_lk = df_site.groupBy('sign')\
            .agg(F.count('*').alias('all'))\
            .withColumn('total', F.sum('all').over(Window.partitionBy()))\
            .withColumn('percent',F.col('all')/F.col('total')*100)
prc_lk.select('sign','percent').filter(df_site.sign == True).show()

22/12/22 16:27:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----+-------+
|sign|percent|
+----+-------+
|true|   80.0|
+----+-------+



                                                                                

# Вывести топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице

In [295]:
df_site.groupby("page_id", "type")\
       .agg(F.count("*").alias("num_click"))\
       .orderBy("num_click", ascending = False)\
       .filter(F.col('type') == 'click')\
       .show(5)



+-------+-----+---------+
|page_id| type|num_click|
+-------+-----+---------+
|    102|click|        3|
|    101|click|        2|
|    105|click|        1|
|    103|click|        1|
+-------+-----+---------+



                                                                                

# Добавьте столбец к фрейму данных со значением временного диапазона в рамках суток с размером окна – 4 часа(0-4, 4-8, 8-12 и т.д.)

In [296]:
df_time = df_site.withColumn("rang", F.floor(F.hour("event_time") / F.lit(4))+1)
df_time.show()

+-----+------+-------+--------+-----+-------------------+----+
|   id|  type|page_id|     tag| sign|         event_time|rang|
+-----+------+-------+--------+-----+-------------------+----+
|12345| click|    101|   Sport| true|2022-10-29 07:10:26|   2|
|12345| click|    101|Politics| true|2022-10-30 10:57:06|   3|
|45845|scroll|    101|Medicine| true|2022-10-30 10:58:46|   3|
|12345| click|    102|Medicine| true|2022-11-01 18:30:26|   5|
|35678| click|    102|   Sport| true|2022-11-02 22:17:06|   6|
|43218| click|    102|Politics| true|2022-11-04 02:03:46|   1|
|12345|  move|    103|   Sport| true|2022-11-05 05:50:26|   2|
|76654| visit|    104|Politics|false|2022-11-06 09:37:06|   3|
|26389| click|    105|   Sport|false|2022-11-07 13:23:46|   4|
|45845| click|    103|Medicine| true|2022-11-08 17:10:26|   5|
+-----+------+-------+--------+-----+-------------------+----+



# Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте.

In [298]:
df_time.groupBy("rang")\
       .agg(F.count("*").alias("num_time"))\
       .select(F.max("num_time").alias("rang_max"))\
       .show()



+--------+
|rang_max|
+--------+
|       3|
+--------+



                                                                                

# Создание фрейма данных Личный каблинет

In [105]:
Schema_lk = T.StructType([ 
                      T.StructField("id_lk", T.IntegerType(), True),
                      T.StructField("User_id", T.IntegerType(), True),
                      T.StructField("FIO", T.StringType(), True),
                      T.StructField("Date_birth", T.DateType(), True),
                      T.StructField("Date_creation", T.DateType(), True)
                    ])

In [106]:
data_lk = [(1, 12345, "Иванов Иван Иванович", date(1990, 4, 14),date(2020, 6, 16)), 
           (2, 45845, "Петрова Елена Ивановна", date(1991, 11, 20),date(2021, 8, 4)),
           (3, 35678, "Сидоров Петр Сергеевич", date(1995, 5, 15),date(2021, 12, 10)),
           (4, 43218, "Сазонова Ирина Петровна", date(1993, 1, 21),date(2022, 2, 12))
       ]

In [107]:
df_lk = spark.createDataFrame(data = data_lk, schema = Schema_lk)

In [108]:
df_lk.show()

                                                                                

+-----+-------+--------------------+----------+-------------+
|id_lk|User_id|                 FIO|Date_birth|Date_creation|
+-----+-------+--------------------+----------+-------------+
|    1|  12345|Иванов Иван Иванович|1990-04-14|   2020-06-16|
|    2|  45845|Петрова Елена Ива...|1991-11-20|   2021-08-04|
|    3|  35678|Сидоров Петр Серг...|1995-05-15|   2021-12-10|
|    4|  43218|Сазонова Ирина Пе...|1993-01-21|   2022-02-12|
+-----+-------+--------------------+----------+-------------+



# Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт

In [299]:
df_all = df_lk.join(df_site, df_lk.User_id == df_site.id)
df_all.show()
df_all.withColumn('First_Name', F.split('FIO', ' ')[0])\
      .select(F.col('First_Name')).filter(df_all.tag == 'Sport')\
      .distinct()\
      .show()

                                                                                

+-----+-------+--------------------+----------+-------------+-----+------+-------+--------+----+-------------------+
|id_lk|User_id|                 FIO|Date_birth|Date_creation|   id|  type|page_id|     tag|sign|         event_time|
+-----+-------+--------------------+----------+-------------+-----+------+-------+--------+----+-------------------+
|    4|  43218|Сазонова Ирина Пе...|1993-01-21|   2022-02-12|43218| click|    102|Politics|true|2022-11-04 02:03:46|
|    2|  45845|Петрова Елена Ива...|1991-11-20|   2021-08-04|45845| click|    103|Medicine|true|2022-11-08 17:10:26|
|    2|  45845|Петрова Елена Ива...|1991-11-20|   2021-08-04|45845|scroll|    101|Medicine|true|2022-10-30 10:58:46|
|    1|  12345|Иванов Иван Иванович|1990-04-14|   2020-06-16|12345|  move|    103|   Sport|true|2022-11-05 05:50:26|
|    1|  12345|Иванов Иван Иванович|1990-04-14|   2020-06-16|12345| click|    101|   Sport|true|2022-10-29 07:10:26|
|    1|  12345|Иванов Иван Иванович|1990-04-14|   2020-06-16|123

                                                                                

+----------+
|First_Name|
+----------+
|   Сидоров|
|    Иванов|
+----------+

