### Импортируем нужные библиотеки

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import datetime

### Запускаем SparkSession

In [2]:
spark = SparkSession.builder.master("local").\
                    appName("Data_Engineer 3.3").\
                    config("spark.driver.bindAddress","localhost").\
                    config("spark.ui.port","4040").\
                    getOrCreate()

### 3.3 а) - Определяем схему

In [3]:
schema = StructType([
         StructField("id", IntegerType(), True),
         StructField("timestamp", TimestampType(), True),
         StructField("type", StringType(), True), # click // scroll // visit // move
         StructField("page_id", IntegerType(), True),
         StructField("tag", StringType(), True), # policy // sport // medicine
         StructField("sign", BooleanType(), True)
])

### 3.3 c) - Заносим данные

In [4]:
data = [
  (12345, datetime.strptime('2022-06-10 02:10:12', '%Y-%m-%d %H:%M:%S'), 'click', 111, 'policy', False),
  (21445, datetime.strptime('2022-03-13 06:15:28', '%Y-%m-%d %H:%M:%S'), 'scroll', 123, 'sport', True),
  (51242, datetime.strptime('2022-05-10 22:22:59', '%Y-%m-%d %H:%M:%S'), 'visit', 456, 'medicine', True),
  (68210, datetime.strptime('2021-11-14 15:05:32', '%Y-%m-%d %H:%M:%S'), 'move', 123, 'sport', False),
  (79723, datetime.strptime('2022-12-05 11:28:11', '%Y-%m-%d %H:%M:%S'), 'move', 111, 'policy', True),
  (12658, datetime.strptime('2021-09-07 23:32:10', '%Y-%m-%d %H:%M:%S'), 'click', 456, 'sport', False),
  (95761, datetime.strptime('2022-07-12 04:18:06', '%Y-%m-%d %H:%M:%S'), 'scroll', 111, 'policy', True),
  (67882, datetime.strptime('2020-03-19 09:56:03', '%Y-%m-%d %H:%M:%S'), 'move', 123, 'policy', False),
  (48636, datetime.strptime('2022-11-22 22:36:28', '%Y-%m-%d %H:%M:%S'), 'click', 123, 'medicine', False),
  (69674, datetime.strptime('2020-10-26 01:44:43', '%Y-%m-%d %H:%M:%S'), 'visit', 456, 'policy', True),
  (69674, datetime.strptime('2020-10-26 01:58:22', '%Y-%m-%d %H:%M:%S'), 'move', 123, 'policy', True),
  (12658, datetime.strptime('2021-09-08 14:21:00', '%Y-%m-%d %H:%M:%S'), 'click', 456, 'sport', False),
  (12345, datetime.strptime('2022-12-05 12:11:01', '%Y-%m-%d %H:%M:%S'), 'scroll', 111, 'policy', False),
  (12345, datetime.strptime('2022-12-07 23:22:50', '%Y-%m-%d %H:%M:%S'), 'click', 456, 'policy', False),
  (95761, datetime.strptime('2022-07-14 06:37:54', '%Y-%m-%d %H:%M:%S'), 'move', 123, 'policy', True),
  (12345, datetime.strptime('2022-12-11 09:18:22', '%Y-%m-%d %H:%M:%S'), 'visit', 111, 'policy', False),
  (48636, datetime.strptime('2022-11-24 21:55:32', '%Y-%m-%d %H:%M:%S'), 'click', 456, 'medicine', False),
  (95761, datetime.strptime('2022-08-22 06:42:38', '%Y-%m-%d %H:%M:%S'), 'move', 123, 'policy', True)
]

# columns = ["id","timestamp","type","page_id","tag","sign"]
df = spark.createDataFrame(data = data, schema = schema)

### 3.3 b) - Выводим DataFrame df

In [5]:
df.show()

+-----+-------------------+------+-------+--------+-----+
|   id|          timestamp|  type|page_id|     tag| sign|
+-----+-------------------+------+-------+--------+-----+
|12345|2022-06-10 02:10:12| click|    111|  policy|false|
|21445|2022-03-13 06:15:28|scroll|    123|   sport| true|
|51242|2022-05-10 22:22:59| visit|    456|medicine| true|
|68210|2021-11-14 15:05:32|  move|    123|   sport|false|
|79723|2022-12-05 11:28:11|  move|    111|  policy| true|
|12658|2021-09-07 23:32:10| click|    456|   sport|false|
|95761|2022-07-12 04:18:06|scroll|    111|  policy| true|
|67882|2020-03-19 09:56:03|  move|    123|  policy|false|
|48636|2022-11-22 22:36:28| click|    123|medicine|false|
|69674|2020-10-26 01:44:43| visit|    456|  policy| true|
|69674|2020-10-26 01:58:22|  move|    123|  policy| true|
|12658|2021-09-08 14:21:00| click|    456|   sport|false|
|12345|2022-12-05 12:11:01|scroll|    111|  policy|false|
|12345|2022-12-07 23:22:50| click|    456|  policy|false|
|95761|2022-07

### Проверяем форматы, используемые в схеме

In [6]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- type: string (nullable = true)
 |-- page_id: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- sign: boolean (nullable = true)



### 3.3 d)

#### Пункт 1 - Вывести топ-5 самых активных посетителей сайта

In [7]:
d_a_ht = df.groupby('id').count()
d_a_result = d_a_ht.orderBy(desc('count')).show(5)

+-----+-----+
|   id|count|
+-----+-----+
|12345|    4|
|95761|    3|
|12658|    2|
|69674|    2|
|48636|    2|
+-----+-----+
only showing top 5 rows



#### Пункт 2 - Посчитать процент посетителей, у которых есть ЛК

In [8]:
d_b_ht = df.dropDuplicates(['id']).groupby('id', 'sign').count().groupby('sign').count()
d_b_ht.show()

+-----+-----+
| sign|count|
+-----+-----+
| true|    5|
|false|    5|
+-----+-----+



In [9]:
sum_signs = d_b_ht.select(sum('count'))
print(sum_signs.head()[0])

10


In [10]:
sum_true = d_b_ht.where('sign == true').select('count')
print(sum_true.head()[0])

5


In [11]:
print(f"Процент, у которых есть ЛК: {sum_true.head()[0] / sum_signs.head()[0] * 100} %")

Процент, у которых есть ЛК: 50.0 %


#### Пункт 3 - Вывести топ-5 страниц сайта по показателю общего кол-ва кликов на данной странице

In [12]:
d_c_result = df.where(df['type'] == 'click').groupby('page_id').count().orderBy(desc('count')).show(5)

+-------+-----+
|page_id|count|
+-------+-----+
|    456|    4|
|    111|    1|
|    123|    1|
+-------+-----+



#### Пункт 4 - Добавьте столбец к фрейму данных со значением временного диапазона в рамках суток с размером окна – 4 часа(0-4, 4-8, 8-12 и т.д.)

In [13]:
df_d_result = df.withColumn('interval', 
                   when((hour('timestamp') > 0) & (hour('timestamp') <= 4) , '0 - 4')
                  .when((hour('timestamp') > 4) & (hour('timestamp') <= 8) , '4 - 8')
                  .when((hour('timestamp') > 8) & (hour('timestamp') <= 12) , '8 - 12')
                  .when((hour('timestamp') > 12) & (hour('timestamp') <= 16) , '12 - 16')
                  .when((hour('timestamp') > 16) & (hour('timestamp') <= 20) , '16 - 20')
                  .when((hour('timestamp') > 20) & (hour('timestamp') <= 24) , '20 - 24')
                  .otherwise('wrong time type')
                           )
df_d_result.show()

+-----+-------------------+------+-------+--------+-----+--------+
|   id|          timestamp|  type|page_id|     tag| sign|interval|
+-----+-------------------+------+-------+--------+-----+--------+
|12345|2022-06-10 02:10:12| click|    111|  policy|false|   0 - 4|
|21445|2022-03-13 06:15:28|scroll|    123|   sport| true|   4 - 8|
|51242|2022-05-10 22:22:59| visit|    456|medicine| true| 20 - 24|
|68210|2021-11-14 15:05:32|  move|    123|   sport|false| 12 - 16|
|79723|2022-12-05 11:28:11|  move|    111|  policy| true|  8 - 12|
|12658|2021-09-07 23:32:10| click|    456|   sport|false| 20 - 24|
|95761|2022-07-12 04:18:06|scroll|    111|  policy| true|   0 - 4|
|67882|2020-03-19 09:56:03|  move|    123|  policy|false|  8 - 12|
|48636|2022-11-22 22:36:28| click|    123|medicine|false| 20 - 24|
|69674|2020-10-26 01:44:43| visit|    456|  policy| true|   0 - 4|
|69674|2020-10-26 01:58:22|  move|    123|  policy| true|   0 - 4|
|12658|2021-09-08 14:21:00| click|    456|   sport|false| 12 -

#### Пункт 5 - Выведите временной промежуток на основе предыдущего задания, в течение которого было больше всего активностей на сайте.

In [14]:
df_e_result = df_d_result.groupby('interval').count().orderBy(desc('count')).select('interval').show(1)

+--------+
|interval|
+--------+
| 20 - 24|
+--------+
only showing top 1 row



#### Пункт 6 - Создайте второй фрейм данных, который будет содержать информацию о ЛК посетителя сайта со следующим списком атрибутов:
1) Id – уникальный идентификатор личного кабинета
2) User_id – уникальный идентификатор посетителя
3) ФИО посетителя
4) Дату рождения посетителя 
5) Дата создания ЛК

#### Создаем схему (Связываю df и df_2 по id пользователей)

In [15]:
schema_2 = StructType([
           StructField("id_pa", IntegerType(), True), # id_pa - id personal account
           StructField("id_user", IntegerType(), True),
           StructField("Full_name", StringType(), True),
           StructField("dob", DateType(), True), # dob - date of birth
           StructField("doc_account", DateType(), True), # doc_account - date of creation account
])

#### Выводим уникальные id для сверки

In [16]:
df.dropDuplicates(['id']).select('id').show()

+-----+
|   id|
+-----+
|51242|
|79723|
|95761|
|68210|
|69674|
|67882|
|21445|
|12658|
|12345|
|48636|
+-----+



#### Заносим данные для DataFrame 2

In [17]:
data_2 = [
    (2344, 51242, "Aborin M.P", datetime.strptime('1996-10-12', "%Y-%m-%d"), datetime.strptime('2014-06-25', "%Y-%m-%d")),
    (1242, 79723, "Krutoi O.M.", datetime.strptime('1989-03-18', "%Y-%m-%d"), datetime.strptime('2015-08-12', "%Y-%m-%d")),
    (4583, 95761, "Prostoi A.V.", datetime.strptime('1985-04-11', "%Y-%m-%d"), datetime.strptime('2016-10-01', "%Y-%m-%d")),
    (6576, 68210, "Polevaya A.N.", datetime.strptime('1982-12-09', "%Y-%m-%d"), datetime.strptime('2016-04-29', "%Y-%m-%d")),
    (7519, 69674, "Krasnaya A.A.", datetime.strptime('1998-01-10', "%Y-%m-%d"), datetime.strptime('2018-02-11', "%Y-%m-%d")),
    (8124, 67882, "Reznaya T.M.", datetime.strptime('2000-03-12', "%Y-%m-%d"), datetime.strptime('2021-02-12', "%Y-%m-%d")),
    (1663, 21445, "Xoroshyi T.I.", datetime.strptime('2002-04-18', "%Y-%m-%d"), datetime.strptime('2021-04-25', "%Y-%m-%d")),
    (1056, 12658, "Serebristaya M.A.", datetime.strptime('2003-11-23', "%Y-%m-%d"), datetime.strptime('2022-11-27', "%Y-%m-%d")),
    (9860, 12345, "Molodoyi A.G.", datetime.strptime('2000-02-27', "%Y-%m-%d"), datetime.strptime('2022-06-15', "%Y-%m-%d")),
    (3558, 48636, "Molodaya O.V.", datetime.strptime('2001-03-02', "%Y-%m-%d"), datetime.strptime('2022-06-17', "%Y-%m-%d"))
]

df_2 = spark.createDataFrame(data = data_2, schema = schema_2)

#### Выводим DataFrame 2

In [18]:
df_2.show()

+-----+-------+-----------------+----------+-----------+
|id_pa|id_user|        Full_name|       dob|doc_account|
+-----+-------+-----------------+----------+-----------+
| 2344|  51242|       Aborin M.P|1996-10-12| 2014-06-25|
| 1242|  79723|      Krutoi O.M.|1989-03-18| 2015-08-12|
| 4583|  95761|     Prostoi A.V.|1985-04-11| 2016-10-01|
| 6576|  68210|    Polevaya A.N.|1982-12-09| 2016-04-29|
| 7519|  69674|    Krasnaya A.A.|1998-01-10| 2018-02-11|
| 8124|  67882|     Reznaya T.M.|2000-03-12| 2021-02-12|
| 1663|  21445|    Xoroshyi T.I.|2002-04-18| 2021-04-25|
| 1056|  12658|Serebristaya M.A.|2003-11-23| 2022-11-27|
| 9860|  12345|    Molodoyi A.G.|2000-02-27| 2022-06-15|
| 3558|  48636|    Molodaya O.V.|2001-03-02| 2022-06-17|
+-----+-------+-----------------+----------+-----------+



### Проверяем форматы, используемые в схеме_2

In [19]:
df_2.printSchema()

root
 |-- id_pa: integer (nullable = true)
 |-- id_user: integer (nullable = true)
 |-- Full_name: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- doc_account: date (nullable = true)



#### Пункт 7 - Вывести фамилии посетителей, которые читали хотя бы одну новость про спорт.

#### Вспомогательная join табличка (df + df_2)

In [24]:
join_df_df_2 = df.join(df_2, df.id == df_2.id_user, 'inner')
join_df_df_2.show()

+-----+-------------------+------+-------+--------+-----+-----+-------+-----------------+----------+-----------+
|   id|          timestamp|  type|page_id|     tag| sign|id_pa|id_user|        Full_name|       dob|doc_account|
+-----+-------------------+------+-------+--------+-----+-----+-------+-----------------+----------+-----------+
|12345|2022-06-10 02:10:12| click|    111|  policy|false| 9860|  12345|    Molodoyi A.G.|2000-02-27| 2022-06-15|
|12345|2022-12-05 12:11:01|scroll|    111|  policy|false| 9860|  12345|    Molodoyi A.G.|2000-02-27| 2022-06-15|
|12345|2022-12-07 23:22:50| click|    456|  policy|false| 9860|  12345|    Molodoyi A.G.|2000-02-27| 2022-06-15|
|12345|2022-12-11 09:18:22| visit|    111|  policy|false| 9860|  12345|    Molodoyi A.G.|2000-02-27| 2022-06-15|
|12658|2021-09-07 23:32:10| click|    456|   sport|false| 1056|  12658|Serebristaya M.A.|2003-11-23| 2022-11-27|
|12658|2021-09-08 14:21:00| click|    456|   sport|false| 1056|  12658|Serebristaya M.A.|2003-11

In [40]:
df_g_result = join_df_df_2.select(['tag', 'Full_name']).filter(join_df_df_2.tag == 'sport').dropDuplicates(['Full_name']).show()

+-----+-----------------+
|  tag|        Full_name|
+-----+-----------------+
|sport|    Polevaya A.N.|
|sport|Serebristaya M.A.|
|sport|    Xoroshyi T.I.|
+-----+-----------------+

