In [80]:
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
import datetime

In [66]:
spark = SparkSession.builder.master("local").\
                    appName("Word Count").\
                    config("spark.driver.bindAddress","localhost").\
                    config("spark.ui.port","4040").\
                    getOrCreate()

In [67]:
schema = T.StructType([
                T.StructField("id", T.IntegerType(), True),
                T.StructField("timestamp", T.IntegerType(), True),
                T.StructField("type", T.StringType(), True),
                T.StructField("page_id", T.IntegerType(), True),
                T.StructField("tag", T.StringType(), True),
                T.StructField("sign", T.BooleanType(), True)])

In [75]:
schema2 = T.StructType([
                T.StructField("id", T.IntegerType(), True),
                T.StructField("user_id", T.IntegerType(), True),
                T.StructField("fio", T.StringType(), True),
                T.StructField("dob", T.IntegerType(), True),
                T.StructField("doc", T.IntegerType(), True)])    

In [139]:
data =[(12345, 1667627426, "click", 101, "Sport", True),
        (12345, 1438732800, "scroll", 101, "sport", True),
        (12345, 1380412800, "move", 102, "medicine", True),
        (12348, 1211760000, "visit", 103, "hitech", True),
        (12348, 1522713600, "scroll", 104, "medicine", True),
        (12346, 1423612800, "click", 105, "medicine", False),
        (12348, 1337644800, "scroll", 103, "hitech", True),
        (12346, 1026000000, "click", 102, "medicine", False),
        (12346, 1332460800, "move", 101, "sport", False),
        (12345, 1276473600, "visit", 103, "hitech", True),
        (12347, 1289347200, "click", 101, "medicine", True),
        (12345, 985910400, "move", 104, "sport", True),
        (12349, 1542153600, "scroll", 112, "medicine", False),
        (12349, 1557100800, "click", 103, "hitech", False),
        (12345, 1547424000, "click", 107, "sport", True),
        (12348, 1489968000, "click", 103, "politics", True),
        (12348, 1366502400, "move", 104, "sport", True),
        (12345, 1297036800, "scroll", 107, "hitech", True),
        (12348, 1137715200, "move", 108, "medicine", True),
        (12348, 1495324800, "click", 109, "hitech", True),
        (12349, 1359936000, "click", 101, "politics", False),
        (12347, 1125532800, "move", 101, "medicine", True),
        (12345, 1015372800, "scroll", 102, "sport", True),
        (12346, 1371772800, "click", 103, "politics", False),
        (12350, 1105660800, "scroll", 104, "politics", True),
        (12346, 1456617600, "scroll", 105, "hitech", False),
        (12346, 1268784000, "click", 106, "medicine", False),
        (12347, 1371600000, "visit", 107, "medicine", True),
        (12349, 1033171200, "move", 108, "hitech", False),
        (12349, 1171670400, "scroll", 109, "hitech", False)]

In [83]:
data2 = [(1, 12345,"Алексеев Борис Владимирович", 578707200, 1438732800),
        (1, 12346,"Борисов Владимир Геннадьевич", 346723200, 1631732800),
        (1, 12347,"Воробьев Геннадий Дмитриевич", 462240000, 1334732800),
        (1, 12348,"Евстифеева Галина Андреевна", 496281600, 1238324800),
        (1, 12349,"Хабибулин Марат Адамович", 568944000, 1428732800),
        (1, 12350,"Зайцева Анна Михайловна", 486691200, 1528632800)]

In [140]:
df = spark.createDataFrame(data=data, schema = schema)
dfus = spark.createDataFrame(data=data2, schema = schema2)

In [133]:
## Вывести топ-5 самых активных посетителей сайта
df.groupBy("id").count()\
    .orderBy("count", ascending=False)\
    .select("id")\
    .show(5)

+-----+
|   id|
+-----+
|12345|
|12348|
|12346|
|12349|
|12347|
+-----+
only showing top 5 rows



In [134]:
##Посчитать процент посетителей, у которых есть ЛК
sign_count = df.select("id")\
    .where(F.col("sign")==True)\
    .select(F.countDistinct("id"))\
    .collect()[0][0]

user_count = df.select("id")\
    .select(F.countDistinct("id"))\
    .collect()[0][0]

print("Процент посетителей, у которых есть ЛК: {0:.2f} %".format(100 * sign_count / user_count))

Процент посетителей, у которых есть ЛК: 66.67 %


In [147]:
#Вывести топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице
df.filter(F.col("type") == "click")\
    .groupby("page_id")\
    .agg(F.count("*").alias("event_cnt"))\
    .orderBy("event_cnt", ascending = False)\
    .select("page_id")\
    .show(5)

+-------+
|page_id|
+-------+
|    103|
|    101|
|    107|
|    102|
|    105|
+-------+
only showing top 5 rows



In [100]:
#Добавьте столбец к фрейму данных со значением временного диапазона в рамках суток с размером окна – 4 часа(0-4, 4-8, 8-12 и т.д.)

In [161]:
df = df.select(*[i for i in df.columns if i != "timestamp"],
                    F.from_unixtime("timestamp").alias("event_time"))

In [166]:
df = df.withColumn("timeparts", F.floor(F.hour("event_time") / F.lit(4)))
df.show(5)

+-----+------+-------+--------+----+-------------------+---------+
|   id|  type|page_id|     tag|sign|         event_time|timeparts|
+-----+------+-------+--------+----+-------------------+---------+
|12345| click|    101|   Sport|true|2022-11-05 08:50:26|        2|
|12345|scroll|    101|   sport|true|2015-08-05 03:00:00|        0|
|12345|  move|    102|medicine|true|2013-09-29 04:00:00|        1|
|12348| visit|    103|  hitech|true|2008-05-26 04:00:00|        1|
|12348|scroll|    104|medicine|true|2018-04-03 03:00:00|        0|
+-----+------+-------+--------+----+-------------------+---------+
only showing top 5 rows



In [101]:
#Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте.

In [173]:
df.groupBy("timeparts")\
    .agg(F.count("*").alias("event_cnt"))\
    .orderBy("event_cnt", ascending = False)\
    .select("timeparts")\
    .show(1)

+---------+
|timeparts|
+---------+
|        0|
+---------+
only showing top 1 row



In [None]:
df.alias("lk").join(df_web.alias("web"),
                                    on = [F.col("lk.user_id") == F.col("web.id")],
                                    how = "left")

In [183]:
#Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт.
df.alias("web")\
    .join(dfus.alias("us"),
                        on = [F.col("us.user_id") == F.col("web.id")],
                        how = "left")\
    .filter(F.col("web.tag") == "sport")\
    .select("fio").distinct()\
    .show()


+--------------------+
|                 fio|
+--------------------+
|Евстифеева Галина...|
|Алексеев Борис Вл...|
|Борисов Владимир ...|
+--------------------+

