In [257]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null


In [258]:
!wget -q https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz

In [259]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

In [260]:
!pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [261]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, \
                              StringType, IntegerType,BooleanType, \
                              LongType, DateType
import numpy as np
import pyspark.sql.functions as func
from pyspark.sql.functions import when
import datetime
from pyspark.sql.window import Window

In [262]:
spark = SparkSession.builder\
                    .master("local[*]")\
                    .getOrCreate()

In [263]:
schema_web = StructType([ 
                StructField("id", IntegerType(), True),
                StructField("timestamp", LongType(), True),
                StructField("type", StringType(), True),
                StructField("page_id", IntegerType(), True),
                StructField("tag", StringType(), True),
                StructField("sign", BooleanType(), True)
])

schema_lk = StructType([
                StructField("id", IntegerType(), True),
                StructField("user_id", IntegerType(), True),
                StructField("fio", StructType([
                      StructField('lastname', StringType(), True),
                      StructField('firstname', StringType(), True),
                      StructField('patronymic', StringType(), True)])),
                StructField("dob", DateType(), True),
                StructField("doc", DateType(), True)
])

In [264]:
id = [int(x) for x in range(1, 15,1)]
timestamp = [int(x) for x in range(1670198400 , 1670803200, 100)]
Type = [str(x) for x in np.random.choice(["visit", "click", "scroll", "move"], size=1000)]
page_id = [int(x) for x in range(1, 100,1)]
tag = []

for i in page_id:
  if  i%2 == 0:
     tag.append("HBO"),
  elif  i%3 == 0:
     tag.append("pron"),
  else:
     tag.append("sport")

sign = []

for i in id:
  if  i%2 == 0:
     sign.append(True),
  else:  
     sign.append(False) 



In [265]:
data_lk = [
    (2, 1, ("Вася", "Вася", "Вася"), datetime.datetime(1920, 9, 5), datetime.datetime(2007, 1, 9)),
    (12, 2, ("Петя", "Петя", "Петя"), datetime.datetime(2000, 2, 22), datetime.datetime(2019, 9, 11)), 
    (18, 8, ("Игорьков", "Игорёк", "Игоревич"), datetime.datetime(1111, 2, 11), datetime.datetime(2046, 12, 1)),
    (7, 4, ("Унру", "Сашка", "Иванович"), datetime.datetime(1248, 4, 12), datetime.datetime(3999, 1, 11)),
    (11, 7, ("Яцик", "Сирожа", "Геннадьевич"), datetime.datetime(1380, 5, 11), datetime.datetime(2022, 1, 11)),
]

In [266]:
df_web = spark.createDataFrame(list(zip(id, timestamp, Type, page_id, tag, sign)), schema_web)
df_lk = spark.createDataFrame(data_lk, schema_lk)

In [267]:
df_web.groupBy("id")\
      .agg(func.count("*").alias("event_cnt"))\
      .orderBy("event_cnt", ascending = False)\
      .show(15)

+---+---------+
| id|event_cnt|
+---+---------+
|  1|        1|
|  6|        1|
|  3|        1|
|  5|        1|
|  4|        1|
|  7|        1|
|  2|        1|
| 12|        1|
| 13|        1|
|  9|        1|
|  8|        1|
| 10|        1|
| 11|        1|
| 14|        1|
+---+---------+



In [268]:
df_web.groupBy("sign").agg(func.countDistinct("id").alias('count_users'))\
                      .withColumn('percent', func.col('count_users')*100/func.sum('count_users')\
                                              .over(Window.partitionBy()))\
                      .show()

+-----+-----------+-------+
| sign|count_users|percent|
+-----+-----------+-------+
| true|          7|   50.0|
|false|          7|   50.0|
+-----+-----------+-------+



In [269]:
df_web.groupBy("sign").agg(func.countDistinct("id").alias('count_users'))\
                      .withColumn('percent', func.col('count_users')*100/func.sum('count_users')\
                                              .over(Window.partitionBy()))\
                      .filter(df_web.sign == True)\
                      .show()

+----+-----------+-------+
|sign|count_users|percent|
+----+-----------+-------+
|true|          7|   50.0|
+----+-----------+-------+



In [270]:
df_web.groupBy("page_id", "type")\
      .agg(func.count("*").alias("click_cnt"))\
      .orderBy("click_cnt", ascending = False)\
      .filter(df_web.type == 'click')\
      .show(5)

+-------+-----+---------+
|page_id| type|click_cnt|
+-------+-----+---------+
|     14|click|        1|
|     11|click|        1|
+-------+-----+---------+



In [271]:
df_web = df_web.select(*[i for i in df_web.columns if i != "timestamp"],
                    func.from_unixtime("timestamp").alias("event_time"))

df_web = df_web.withColumn("new", func.floor(func.hour("event_time") / func.lit(4)))

df_web = df_web.withColumn("time_period", when(df_web.new == "0","0-4")\
                                         .when(df_web.new == "1","4-8")\
                                         .when(df_web.new == "2","8-12")\
                                         .when(df_web.new == "3","12-16")\
                                         .when(df_web.new == "4","16-20")\
                                         .when(df_web.new == "5","20-24")\
                                         )
df_web = df_web.drop("new")

In [272]:
df_web.groupBy("time_period")\
      .agg(func.count("*").alias("activity"))\
      .orderBy("activity")\
      .show()

+-----------+--------+
|time_period|activity|
+-----------+--------+
|        0-4|      14|
+-----------+--------+



In [273]:
df_all = df_lk.alias("lk").join(df_web.alias("web"),
                                    on = [func.col("lk.user_id") == func.col("web.id")],
                                    how = "left")

In [274]:
df_all.select('fio.lastname', 'tag').where(func.col('tag') == "sport").distinct().show()

+--------+-----+
|lastname|  tag|
+--------+-----+
|    Яцик|sport|
|    Вася|sport|
+--------+-----+

