In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz

In [3]:
!tar xf spark-3.1.1-bin-hadoop3.2.tgz

In [4]:
!pip install -q findspark

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [6]:
!ls

sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


In [7]:
import findspark
findspark.init()

In [8]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.window import Window

import datetime

In [14]:
spark = SparkSession.builder.master("local[*]").\
                    config("spark.driver.bindAddress", "localhost").\
                    config("spark.ui.port", "4040").\
                    getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [15]:
#### creds info
url = ""
db_name = "public"
creds = {
    "user": "",
    "password": "",
    "driver": ""
}

#### function for connect to Postgres table
def get_table_conn(table_name):
    full_table_name = db_name + "." + table_name
    return spark.read.jdbc(url, \
                           full_table_name, \
                           properties = creds)

In [39]:
schema_web = T.StructType([
                T.StructField("id", T.StringType(), True),
                T.StructField("timestamp", T.LongType(), True),
                T.StructField("type", T.StringType(), True),
                T.StructField("page_id", T.IntegerType(), True),
                T.StructField("tag", T.StringType(), True),
                T.StructField("sign", T.BooleanType(), True)])

In [40]:
data_web =[(1, 1667627426, "visit", 101, 'Sport', False),
           (1, 1667627486, "move", 101, 'Sport', False),
           (1, 1667627500, "click", 101, 'Sport', False),
           (1, 1667627505, "visit", 102, 'Medicin', False),
           (1, 1667627565, "click", 102, 'Politics', False),
           (1, 1667627586, "move", 103, 'Sport', False),
           (2, 1667628001, "visit", 104, 'Politics', True),
           (2, 1667628101, "scroll", 104, 'Medicin', True),
           (2, 1667628151, "click", 104, 'Politics', True),
           (2, 1667628200, "visit", 105, 'Business', True),
           (2, 1667628226, "click", 105, 'Medicin', True),
           (2, 1667628317, "visit", 106, 'Medicin', True),
           (2, 1667628359, "move", 106, 'Business', True),
           (3, 1667628422, "visit", 101, 'Sport', False),
           (3, 1667628486, "scroll", 101, 'Sport', False),
           (4, 1667628505, "visit", 106, 'Business', False),
           (5, 1667628511, "move", 101, 'Sport', True),
           (5, 1667628901, "move", 101, 'Sport', True),
           (5, 1667628926, "visit", 102, 'Medicin', True),
           (5, 1667628976, "click", 102, 'Politics', True)]

df_web = spark.createDataFrame(data = data_web, schema = schema_web)

In [41]:
df_web.columns

['id', 'timestamp', 'type', 'page_id', 'tag', 'sign']

In [42]:
df_web.show(5,truncate=False)

+---+----------+-----+-------+--------+-----+
|id |timestamp |type |page_id|tag     |sign |
+---+----------+-----+-------+--------+-----+
|1  |1667627426|visit|101    |Sport   |false|
|1  |1667627486|move |101    |Sport   |false|
|1  |1667627500|click|101    |Sport   |false|
|1  |1667627505|visit|102    |Medicin |false|
|1  |1667627565|click|102    |Politics|false|
+---+----------+-----+-------+--------+-----+
only showing top 5 rows



**Вывести топ-5 самых активных посетителей сайта**

In [20]:
df_web.groupby("id")\
      .agg(F.count("*").alias("event_cnt"))\
      .orderBy("event_cnt", ascending = False)\
      .show()

+---+---------+
| id|event_cnt|
+---+---------+
|  2|        7|
|  1|        6|
|  5|        4|
|  3|        2|
|  4|        1|
+---+---------+



**Посчитать процент посетителей, у которых есть ЛК**

In [27]:
total_count = df_web.count()
df_web_sign = df_web.filter((F.col('sign')=='True')).count()
df_true_percent = df_web_sign*100/total_count


print("TOTAL RECORD COUNT: " + str(total_count))
print("RECORD TRUE: " + str(df_web_sign))
print("RECORD TRUE Percent: " + str(df_true_percent) + "%")

TOTAL RECORD COUNT: 20
RECORD TRUE: 11
RECORD TRUE Percent: 55.0%


**Вывести топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице**

In [55]:
windowSpec  = Window.partitionBy("page_id")

df_click.withColumn("sum_page",F.count("page_id").over(windowSpec))\
    .orderBy(F.col("sum_page").desc())\
    .show(truncate=False)

+---+----------+-----+-------+--------+-----+--------+
|id |timestamp |type |page_id|tag     |sign |sum_page|
+---+----------+-----+-------+--------+-----+--------+
|5  |1667628976|click|102    |Politics|true |2       |
|1  |1667627565|click|102    |Politics|false|2       |
|1  |1667627500|click|101    |Sport   |false|1       |
|2  |1667628151|click|104    |Politics|true |1       |
|2  |1667628226|click|105    |Medicin |true |1       |
+---+----------+-----+-------+--------+-----+--------+



**Добавьте столбец к фрейму данных со значением временного диапазона в рамках суток с размером окна – 4 часа(0-4, 4-8, 8-12 и т.д.)**

In [57]:
df_web = df_web.select(*[i for i in df_web.columns if i != "timestamp"],
                    F.from_unixtime("timestamp").alias("event_time"))

In [58]:
df_web.show(5)

+---+-----+-------+--------+-----+-------------------+
| id| type|page_id|     tag| sign|         event_time|
+---+-----+-------+--------+-----+-------------------+
|  1|visit|    101|   Sport|false|2022-11-05 05:50:26|
|  1| move|    101|   Sport|false|2022-11-05 05:51:26|
|  1|click|    101|   Sport|false|2022-11-05 05:51:40|
|  1|visit|    102| Medicin|false|2022-11-05 05:51:45|
|  1|click|    102|Politics|false|2022-11-05 05:52:45|
+---+-----+-------+--------+-----+-------------------+
only showing top 5 rows



In [77]:
df_time_period = df_web.withColumn("new", F.hour("event_time") / F.lit(4))
df_time_period.show(10)

+---+------+-------+--------+-----+-------------------+----+
| id|  type|page_id|     tag| sign|         event_time| new|
+---+------+-------+--------+-----+-------------------+----+
|  1| visit|    101|   Sport|false|2022-11-05 05:50:26|1.25|
|  1|  move|    101|   Sport|false|2022-11-05 05:51:26|1.25|
|  1| click|    101|   Sport|false|2022-11-05 05:51:40|1.25|
|  1| visit|    102| Medicin|false|2022-11-05 05:51:45|1.25|
|  1| click|    102|Politics|false|2022-11-05 05:52:45|1.25|
|  1|  move|    103|   Sport|false|2022-11-05 05:53:06|1.25|
|  2| visit|    104|Politics| true|2022-11-05 06:00:01| 1.5|
|  2|scroll|    104| Medicin| true|2022-11-05 06:01:41| 1.5|
|  2| click|    104|Politics| true|2022-11-05 06:02:31| 1.5|
|  2| visit|    105|Business| true|2022-11-05 06:03:20| 1.5|
+---+------+-------+--------+-----+-------------------+----+
only showing top 10 rows



**Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте.**

In [97]:
from pyspark.sql.functions import max

tt = df_time_period.groupby("new")\
      .agg(F.count("*").alias("new_count"))\
      .orderBy("new_count", ascending = False)\
      .limit(1)

tt.show()



+---+---------+
|new|new_count|
+---+---------+
|1.5|       14|
+---+---------+



**Создайте второй фрейм данных, который будет содержать информацию о ЛК посетителя сайта со следующим списком атрибутов**

In [98]:
schema_lk = T.StructType([
                T.StructField("id", T.StringType(), True),
                T.StructField("user_id", T.IntegerType(), True),
                T.StructField("fio", T.StringType(), True),
                T.StructField("dob", T.DateType(), True),
                T.StructField("doc", T.DateType(), True)])

In [99]:
data_lk = [
    (101, 2, "Иванов Иван Иванович", datetime.datetime(1990, 7, 5), datetime.datetime(2016, 8, 1)),
    (102, 5, "Александрова Александра александровна", datetime.datetime(1995, 1, 22), datetime.datetime(2017, 10, 7)),
    (103, 3, "Кириллов Петр Андреевич", datetime.datetime(1991, 4, 25), datetime.datetime(2019, 11, 8)),
    (104, 1, "Михайлов Олег Петрович", datetime.datetime(1990, 11, 22), datetime.datetime(2020, 12, 17)),
    (105, 4, "Путейкина Софья Валерьевна", datetime.datetime(1984, 3, 13), datetime.datetime(2021, 6, 2)),
    (106, 6, "Кнопкина Ольга Юрьевна", datetime.datetime(1987, 7, 7), datetime.datetime(2016, 10, 15)),
]

In [100]:
df_lk = spark.createDataFrame(data = data_lk, schema = schema_lk)

In [102]:
df_lk.show(3,truncate=False)

+---+-------+-------------------------------------+----------+----------+
|id |user_id|fio                                  |dob       |doc       |
+---+-------+-------------------------------------+----------+----------+
|101|2      |Иванов Иван Иванович                 |1990-07-05|2016-08-01|
|102|5      |Александрова Александра александровна|1995-01-22|2017-10-07|
|103|3      |Кириллов Петр Андреевич              |1991-04-25|2019-11-08|
+---+-------+-------------------------------------+----------+----------+
only showing top 3 rows



**Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт**

In [103]:
df_join_all = df_lk.alias("lk").join(df_web.alias("web"),
                                    on = [F.col("lk.user_id") == F.col("web.id")],
                                    how = "left")

In [105]:
df_join_all.show(2)

+---+-------+--------------------+----------+----------+---+-----+-------+-----+-----+-------------------+
| id|user_id|                 fio|       dob|       doc| id| type|page_id|  tag| sign|         event_time|
+---+-------+--------------------+----------+----------+---+-----+-------+-----+-----+-------------------+
|104|      1|Михайлов Олег Пет...|1990-11-22|2020-12-17|  1|visit|    101|Sport|false|2022-11-05 05:50:26|
|104|      1|Михайлов Олег Пет...|1990-11-22|2020-12-17|  1| move|    101|Sport|false|2022-11-05 05:51:26|
+---+-------+--------------------+----------+----------+---+-----+-------+-----+-----+-------------------+
only showing top 2 rows



In [112]:
df_sport = df_join_all.select("fio", "tag")\
                      .filter((F.col('tag')=='Sport'))\
                      .select("fio")\
                      .dropDuplicates(["fio"])
df_sport.show(truncate=False)

+-------------------------------------+
|fio                                  |
+-------------------------------------+
|Михайлов Олег Петрович               |
|Александрова Александра александровна|
|Кириллов Петр Андреевич              |
+-------------------------------------+



In [115]:
@udf(T.StringType())
def take_surname(fio):
    surname, name, middlename = fio.split(' ')
    return surname

In [118]:
df_sport.withColumn("surname", take_surname(F.col("fio")))\
        .select("surname")

surname
Михайлов
Александрова
Кириллов
