In [1]:
# MySQL connection.
# Credentials are no longer stored in the notebook: the password is taken from
# the COWSTUDIO_DB_PASSWORD environment variable, falling back to an
# interactive prompt. Host, user and database can be overridden the same way
# and default to the values this notebook used before.
# NOTE(review): the password that was previously committed here should be
# rotated, since it remains visible in version history.
import os
from getpass import getpass

import mysql.connector

mydb = mysql.connector.connect(
  host=os.environ.get("COWSTUDIO_DB_HOST", "cowstudio.wayne-lee.cn"),
  user=os.environ.get("COWSTUDIO_DB_USER", "cowstudio"),
  # `or` also covers an empty-string variable, so we never send a blank password silently.
  password=os.environ.get("COWSTUDIO_DB_PASSWORD") or getpass("MySQL password: "),
  database=os.environ.get("COWSTUDIO_DB_NAME", "cowstudio")
)

In [2]:
# get spark session, 2g mem per executor
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import os

# Point PySpark at the conda environment's Python interpreter.
os.environ['PYSPARK_PYTHON'] = "/opt/conda3/envs/lab2/bin/python"

# Session options, applied to the builder in this order.
spark_settings = {
    "spark.driver.memory": "2g",
    "spark.executor.memory": "2g",
    "spark.cores.max": "3",
    "spark.sql.shuffle.partitions": "12",
    # -1 turns automatic broadcast joins off.
    "spark.sql.autoBroadcastJoinThreshold": "-1",
}

builder = SparkSession.builder.appName("user_item_action_matrix")
builder = builder.master("spark://node01:10077")
builder = builder.enableHiveSupport()
for option_name, option_value in spark_settings.items():
    builder = builder.config(option_name, option_value)

# Reuses an already-running session if one exists, otherwise starts a new one.
spark = builder.getOrCreate()

sc = spark.sparkContext

23/04/24 04:03:38 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/24 04:03:39 WARN spark.SparkContext: Please ensure that the number of slots available on your executors is limited by the number of cores to task cpus and not another custom resource. If cores is not the limiting resource then dynamic allocation will not work properly!


In [7]:
# Run-date strings and the decay factor interpolated into the scoring query.
from datetime import datetime, timedelta

# Read the clock exactly once: calling datetime.today() separately for each
# string could straddle midnight and yield dates that are not one day apart.
run_datetime = datetime.today()
today_string = run_datetime.strftime('%Y-%m-%d')
history_string = (run_datetime - timedelta(days=1)).strftime('%Y-%m-%d')
print(today_string)
print(history_string)

# Multiplier applied to the previous day's score before today's score is added.
time_factor = 0.9

2023-04-24
2023-04-23


In [8]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
# Score contributed by a single occurrence of each event type.
event_score_data = [
    ('view', 1.0), ('click', 4.0), ('long_view', 7.0),
    ('add_to_favorites', 20.0), ('purchase', 30.0),
    ('search_view', 5.0), ('search_click', 10.0),
]

# Two nullable columns: the event name and its score.
schema = (
    StructType()
    .add("event_type", StringType(), True)
    .add("score", DoubleType(), True)
)

event_score_df = spark.createDataFrame(data=event_score_data, schema=schema)
event_score_df.show()
# Expose the lookup to Spark SQL under the name "event_score".
event_score_df.createOrReplaceTempView("event_score")

[Stage 2:>                                                          (0 + 1) / 1]

+----------------+-----+
|      event_type|score|
+----------------+-----+
|            view|  1.0|
|           click|  4.0|
|       long_view|  7.0|
|add_to_favorites| 20.0|
|        purchase| 30.0|
|     search_view|  5.0|
|    search_click| 10.0|
+----------------+-----+



                                                                                

In [9]:
# Row layout of the user-item score matrix: ids, score, and the date string.
user_item_action_matrix_schema = (
    StructType()
    .add("item_id", StringType(), True)
    .add("user_id", StringType(), True)
    .add("score", DoubleType(), True)
    .add("date", StringType(), True)
)

# Register a zero-row temp view named "user_item_action_matrix" so SQL that
# references that name resolves even when no history exists.
# NOTE(review): a temp view presumably takes precedence over a saved table of
# the same name, so later queries may always see this empty view instead of
# the persisted history -- confirm this is intended.
empty_matrix_df = spark.createDataFrame([], user_item_action_matrix_schema)
empty_matrix_df.createOrReplaceTempView("user_item_action_matrix")

In [23]:
# Build today's user-item score matrix:
#     score = yesterday's score * time_factor + sum of today's event scores
# for every (item, user) combination.
#
# Compared with the earlier version of this query:
#   * The item x user grid is produced once with an explicit CROSS JOIN
#     instead of three times via LEFT JOINs that had no ON clause.
#   * history_score / today_score only hold pairs that actually have data;
#     missing pairs are defaulted to 0 with coalesce() in the final select.
#     This also keeps the result non-NULL when a stored score is NULL.
#   * The ORDER BY inside the today_score CTE was dropped; only the final
#     ORDER BY determines the output order.
#   * Events whose event_type is absent from event_score contribute nothing,
#     which matches the previous "score 0" treatment.
#
# NOTE(review): the filter on event_ods.timestamp is an equality against a
# 'YYYY-MM-DD' string, which assumes the column stores day-granularity values
# in that format -- confirm against the event_ods schema.
user_item_action_matrix = spark.sql(f'''
with item_all as (
    select
        distinct id as item_id
    from
        item_ods
), user_all as (
    select
        distinct uid as user_id
    from
        user_ods
), history_score as (
    select
        m.item_id,
        m.user_id,
        m.score
    from
        user_item_action_matrix m
    where
        m.date = '{history_string}'
), today_score as (
    select
        e.item_id,
        e.user_id,
        sum(s.score) as score
    from
        event_ods e
    join
        event_score s on e.event_type = s.event_type
    where
        e.timestamp = '{today_string}'
    group by
        e.item_id,
        e.user_id
)
select
    a.item_id,
    b.user_id,
    coalesce(h.score, 0) * {time_factor} + coalesce(t.score, 0) as score,
    '{today_string}' as date
from
    item_all a
cross join
    user_all b
left join
    history_score h on a.item_id = h.item_id and b.user_id = h.user_id
left join
    today_score t on a.item_id = t.item_id and b.user_id = t.user_id
order by
    score desc
''')
user_item_action_matrix.show()



+-------+-------+-----+----------+
|item_id|user_id|score|      date|
+-------+-------+-----+----------+
|    449|    267| 34.0|2023-04-24|
|    455|    267| 34.0|2023-04-24|
|    461|    267| 34.0|2023-04-24|
|    343|    307| 30.0|2023-04-24|
|     95|    275| 30.0|2023-04-24|
|     78|    212| 30.0|2023-04-24|
|     90|    221| 30.0|2023-04-24|
|     95|    212| 30.0|2023-04-24|
|    100|    275| 30.0|2023-04-24|
|     77|    221| 30.0|2023-04-24|
|    454|    238| 20.0|2023-04-24|
|    457|    238| 20.0|2023-04-24|
|     41|    181| 20.0|2023-04-24|
|    364|    195| 20.0|2023-04-24|
|    464|    238| 20.0|2023-04-24|
|     70|    181| 20.0|2023-04-24|
|    359|    307| 20.0|2023-04-24|
|    232|    307| 20.0|2023-04-24|
|    221|    183| 20.0|2023-04-24|
|    228|    278| 10.0|2023-04-24|
+-------+-------+-----+----------+
only showing top 20 rows



                                                                                

In [24]:
# Persist the matrix as the table "user_item_action_matrix", partitioned by date.
# NOTE(review): "overwrite" with saveAsTable presumably replaces the whole
# table, not only today's partition -- confirm older dates are not needed.
matrix_writer = user_item_action_matrix.write.mode("overwrite").partitionBy("date")
matrix_writer.saveAsTable("user_item_action_matrix")

                                                                                

In [25]:
# List the tables and temp views visible to this session as a sanity check.
tables_df = spark.sql("show tables")
tables_df.show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|           event_ods|      false|
| default|     item_fresh_list|      false|
| default|       item_hot_list|      false|
| default|            item_ods|      false|
| default|          item_order|      false|
| default|          item_score|      false|
| default|        item_tag_ods|      false|
| default|     item_word_count|      false|
| default|       item_word_idf|      false|
| default|        item_word_tf|      false|
| default|     item_word_tfidf|      false|
| default|             tag_ods|      false|
| default|                test|      false|
| default|               test2|      false|
| default|user_item_action_...|      false|
| default|            user_ods|      false|
| default|        user_tag_ods|      false|
|        |         event_score|       true|
|        |          item_score|       true|
|        |user_item_action_...| 

In [26]:
# Release external resources. The MySQL connection is closed in `finally` so
# it is not leaked if stopping the Spark session raises.
try:
    spark.stop()
finally:
    mydb.close()