In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .config("spark.driver.bindAddress","localhost")\
        .config("spark.ui.port","4040")\
        .appName("Task3.3")\
        .getOrCreate()

In [75]:
from pyspark.sql.functions import col,lit,from_unixtime,hour,floor

In [79]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, LongType, BooleanType

data_web = [
    (1, 1670641486, "visit", 101, "Sport", True),
    (1, 1670645086,"click", 101, "Sport", True),
    (1, 1670645486,"click", 105, "News", True),
    (1, 1670646086,"scroll", 105, "News", True),
    (1, 1670747886,"move", 122, "Sport", True),
    (2, 1670741486, "visit", 101, "Games", True),
    (2, 1670745086,"click", 101, "Games", True),
    (2, 1670745486,"click", 105, "Games", True),
    (2, 1670746086,"scroll", 105, "Games", True),
    (3, 1670747886,"move", 122, "Sport", True),
    (3, 1670741486, "visit", 101, "Games", True),
    (3, 1670745086,"click", 101, "Sport", True),
    (3, 1670745486,"click", 105, "News", True),
    (3, 1670746086,"scroll", 105, "News", True),
    (4, 1670747886,"move", 122, "Games", False),
    (4, 1670741486, "visit", 101, "Games", False),
    (5, 1670745086,"click", 101, "Sport", False),
    (5, 1670745486,"click", 105, "News", False),
    (5, 1670746086,"scroll", 105, "News", False),
    (5, 1670747886,"move", 122, "Sport", False),
    (6, 1670777886,"click", 122, "Sport", False),
  ]

schema_web = StructType([ \
    StructField("id", IntegerType(), True), \
    StructField("timestamp", LongType(), True), \
    StructField("type",StringType(),True), \
    StructField("page_id", IntegerType(), True), \
    StructField("tag", StringType(), True), \
    StructField("sign", BooleanType(), True) \
  ])
 
df_web = spark.createDataFrame(data=data_web,schema=schema_web)
df_web.printSchema()
df_web.show(truncate=False)

root
 |-- id: integer (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- type: string (nullable = true)
 |-- page_id: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- sign: boolean (nullable = true)

+---+----------+------+-------+-----+-----+
|id |timestamp |type  |page_id|tag  |sign |
+---+----------+------+-------+-----+-----+
|1  |1670641486|visit |101    |Sport|true |
|1  |1670645086|click |101    |Sport|true |
|1  |1670645486|click |105    |News |true |
|1  |1670646086|scroll|105    |News |true |
|1  |1670747886|move  |122    |Sport|true |
|2  |1670741486|visit |101    |Games|true |
|2  |1670745086|click |101    |Games|true |
|2  |1670745486|click |105    |Games|true |
|2  |1670746086|scroll|105    |Games|true |
|3  |1670747886|move  |122    |Sport|true |
|3  |1670741486|visit |101    |Games|true |
|3  |1670745086|click |101    |Sport|true |
|3  |1670745486|click |105    |News |true |
|3  |1670746086|scroll|105    |News |true |
|4  |1670747886|move  |1

In [80]:
df_web.groupby("id").count().orderBy(col("count").desc()).show(5)

+---+-----+
| id|count|
+---+-----+
|  1|    5|
|  3|    5|
|  2|    4|
|  5|    4|
|  4|    2|
+---+-----+
only showing top 5 rows



                                                                                

In [81]:
df_web.groupby("sign").count().select(max("count").alias("max")).withColumn("proc", col("max")*100/df_web.count()).show()

+---+-----------------+
|max|             proc|
+---+-----------------+
| 14|66.66666666666667|
+---+-----------------+



                                                                                

In [82]:
df_web.select(col("page_id"), col("type")).filter(col("type") == "click").groupby(col("page_id")).count().orderBy(col("count").alias("click_count").desc()).show(5)

+-------+-----+
|page_id|count|
+-------+-----+
|    101|    4|
|    105|    4|
|    122|    1|
+-------+-----+



                                                                                

In [101]:
h = hour(from_unixtime("timestamp"))
df_web.withColumn("time_range", floor(h/lit(6))).groupby("time_range").count().orderBy(col("count").desc()).show(1)

+----------+-----+
|time_range|count|
+----------+-----+
|         1|   16|
+----------+-----+
only showing top 1 row



                                                                                

In [102]:
data_lk = [
    (10, 1, "James Smith", 976559268, 1586716068),
    (20, 2, "Michael Rose", 1031160468, 1636482468),
    (30, 3,"Robert Williams", 931800468, 1643135268)
  ]

schema_lk = StructType([ \
    StructField("id", IntegerType(), True), \
    StructField("user_id", IntegerType(), True), \
    StructField("full_name",StringType(),True), \
    StructField("dob", LongType(), True), \
    StructField("domlk", LongType(), True) \
  ])
 
df_lk = spark.createDataFrame(data=data_lk,schema=schema_lk)
df_lk.printSchema()
df_lk.show(truncate=False)

root
 |-- id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- full_name: string (nullable = true)
 |-- dob: long (nullable = true)
 |-- domlk: long (nullable = true)

+---+-------+---------------+----------+----------+
|id |user_id|full_name      |dob       |domlk     |
+---+-------+---------------+----------+----------+
|10 |1      |James Smith    |976559268 |1586716068|
|20 |2      |Michael Rose   |1031160468|1636482468|
|30 |3      |Robert Williams|931800468 |1643135268|
+---+-------+---------------+----------+----------+



In [118]:
df_web.select("id", "tag").filter(col("tag") == "Sport").join(df_lk, df_web.id == df_lk.user_id).select("tag", "full_name").show()

+-----+---------------+
|  tag|      full_name|
+-----+---------------+
|Sport|    James Smith|
|Sport|    James Smith|
|Sport|    James Smith|
|Sport|Robert Williams|
|Sport|Robert Williams|
+-----+---------------+

