In [58]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col, sum, explode, split, to_timestamp, current_timestamp
from pyspark.sql.types import IntegerType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

spark = SparkSession\
        .builder\
        .appName("ALS")\
        .config("spark.driver.host","localhost")\
        .config("spark.jars", "./postgresql-42.6.0.jar") \
        .getOrCreate()
# read in the dataset into pyspark DataFrame
attendance = spark.read.csv('./attend.csv', header='true', inferSchema = 'true')
attendance = attendance.drop('дата занятия') \
                        .drop('время начала занятия') \
                        .drop('время окончания занятия') \
                        .drop('направление 2') \
                        .drop('направление 3') \
                        .drop('уникальный номер занятия') \
                        .withColumnRenamed('уникальный номер группы', 'groupId') \
                        .withColumnRenamed('уникальный номер участника', 'userId') \
                        .withColumn("rank", when((attendance['онлайн/офлайн'] == "Да"), 1) \
                                                .when((attendance['онлайн/офлайн'] == "Нет"), 1) \
                                                .otherwise(lit("0"))) \
                        .drop('онлайн/офлайн')
attendance = attendance.withColumn("rank", col('rank').cast(IntegerType()))

categoriesFromDB = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://10.50.50.34:5432/leaders2023") \
    .option("dbtable", 'public."Category"') \
    .option("user", "postgres") \
    .option("password", "m3zyCrSHpGXsgqn2XZGigP7hR4Dn2GaeEfJSTDmkfvADkEoPgVbp96nUL6Xty4PK") \
    .option("driver", "org.postgresql.Driver") \
    .load()

level0Categories = categoriesFromDB.select(col("id").alias("lvl0id"),col("name").alias("lvl0name"),col("parentId").alias("lvl0parentId")).filter(col("level") == 0)
level1Categories = categoriesFromDB.select(col("id").alias("lvl1id"),col("name").alias("lvl1name"),col("parentId").alias("lvl1parentId")).filter(col("level") == 1)
level2Categories = categoriesFromDB.select(col("id").alias("lvl2id"),col("name").alias("lvl2name"),col("parentId").alias("lvl2parentId")).filter(col("level") == 2)
level3Categories = categoriesFromDB.select(col("id").alias("lvl3id"),col("name").alias("lvl3name"),col("parentId").alias("lvl3parentId")).filter(col("level") == 3)

lvl01JoinedDF = level1Categories.join(level0Categories, level1Categories.lvl1parentId == level0Categories.lvl0id)
lvl012JoinedDF = level2Categories.join(lvl01JoinedDF, level2Categories.lvl2parentId == lvl01JoinedDF.lvl1id)
lvl0123JoinedDF = level3Categories.join(lvl012JoinedDF, level3Categories.lvl3parentId == lvl012JoinedDF.lvl2id)

categories = lvl0123JoinedDF.select("lvl0id", "lvl0name", "lvl3id","lvl3name")
                
groups = spark.read.csv('./groups.csv', header='true', inferSchema = 'true')
groups = groups.drop('направление 1') \
                        .drop('направление 2') \
                        .drop('адрес площадки') \
                        .drop('округ площадки') \
                        .drop('район площадки') \
                        .drop('расписание в активных периодах') \
                        .drop('расписание в закрытых периодах') \
                        .drop('расписание в плановом периоде') \
                        .withColumnRenamed('уникальный номер', 'groupId') \
                        .withColumnRenamed('направление 3', 'lvl3name')
groupsWithLevelIds = categories.join(groups, 'lvl3name')
attendanceWithLevelIds=attendance.join(groupsWithLevelIds, 'groupId')
attendanceWithLevelIds.head(5)



                                                                                

[Row(groupId=801346710, userId=101386726, rank=1, lvl3name='Дыхательная гимнастика', lvl0id=3584, lvl0name='Для тела', lvl3id=3757),
 Row(groupId=801346710, userId=101430794, rank=1, lvl3name='Дыхательная гимнастика', lvl0id=3584, lvl0name='Для тела', lvl3id=3757),
 Row(groupId=801346810, userId=101366986, rank=1, lvl3name='ОНЛАЙН Ментальная арифметика', lvl0id=2840, lvl0name='Для ума', lvl3id=2976),
 Row(groupId=801346810, userId=101374816, rank=1, lvl3name='ОНЛАЙН Ментальная арифметика', lvl0id=2840, lvl0name='Для ума', lvl3id=2976),
 Row(groupId=801346810, userId=101381146, rank=1, lvl3name='ОНЛАЙН Ментальная арифметика', lvl0id=2840, lvl0name='Для ума', lvl3id=2976)]

In [35]:
interestsLvl3 = attendanceWithLevelIds.groupBy("userId", "lvl3id").agg(sum("rank").alias("rank"))


(trainingInterestsLvl3, testInterestsLvl3) = interestsLvl3.randomSplit([0.8, 0.2])

alsInterestsLvl3 = ALS(maxIter=5,rank=100, regParam=0.01, userCol="userId", itemCol="lvl3id", ratingCol="rank",
          coldStartStrategy="drop")


modelInterestsLvl3 = alsInterestsLvl3.fit(trainingInterestsLvl3)


predictionsInterestsLvl3 = modelInterestsLvl3.transform(testInterestsLvl3)
evaluatorInterestsLvl3 = RegressionEvaluator(metricName="rmse", labelCol="rank",
                                predictionCol="prediction")
rmseInterestsLvl3 = evaluatorInterestsLvl3.evaluate(predictionsInterestsLvl3)
print("Root-mean-square error = " + str(rmseInterestsLvl3))

23/05/28 00:19:33 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/05/28 00:19:33 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/05/28 00:19:34 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
[Stage 285:>                                                        (0 + 8) / 8]

Root-mean-square error = 28.068110009106668


                                                                                

In [36]:
interestsLvl0 = attendanceWithLevelIds.groupBy("userId", "lvl0id").agg(sum("rank").alias("rank"))

(trainingInterestsLvl0, testInterestsLvl0) = interestsLvl0.randomSplit([0.8, 0.2])

alsInterestsLvl0 = ALS(maxIter=5,rank=100, regParam=0.01, userCol="userId", itemCol="lvl0id", ratingCol="rank",
          coldStartStrategy="drop")


modelInterestsLvl0 = alsInterestsLvl0.fit(trainingInterestsLvl0)


predictionsInterestsLvl0 = modelInterestsLvl0.transform(testInterestsLvl0)
evaluatorInterestsLvl0 = RegressionEvaluator(metricName="rmse", labelCol="rank",
                                predictionCol="prediction")
rmseInterestsLvl0 = evaluatorInterestsLvl0.evaluate(predictionsInterestsLvl0)
print("Root-mean-square error = " + str(rmseInterestsLvl0))



Root-mean-square error = 95.13942791265822


                                                                                

In [62]:
url = "jdbc:postgresql://10.50.50.34:5432/leaders2023"
properties = {"user": "postgres","password": "m3zyCrSHpGXsgqn2XZGigP7hR4Dn2GaeEfJSTDmkfvADkEoPgVbp96nUL6Xty4PK",
              "driver": "org.postgresql.Driver"}

recommendationsInterestsLvl0 = modelInterestsLvl0.recommendForAllUsers(3)
lvl0recs = recommendationsInterestsLvl0.withColumn('recommendations', explode('recommendations'))
resultLvl0 = lvl0recs.select("userId", lvl0recs["recommendations.lvl0id"].alias("categoryId"), lvl0recs["recommendations.rating"].alias("rank")) \
        .withColumn("createdAt",to_timestamp(current_timestamp(),"MM-dd-yyyy HH mm ss SSS"))


resultLvl0.write.jdbc(url=url, table='public."Recommendation"', mode="overwrite", properties=properties)


                                                                                

In [65]:
recommendationsInterestsLvl3 = modelInterestsLvl3.recommendForAllUsers(200)
lvl3recs = recommendationsInterestsLvl3.withColumn('recommendations', explode('recommendations'))
resultLvl3 = lvl3recs.select("userId", lvl3recs["recommendations.lvl3id"].alias("categoryId"), lvl3recs["recommendations.rating"].alias("rank")) \
        .withColumn("createdAt",to_timestamp(current_timestamp(),"MM-dd-yyyy HH mm ss SSS"))
resultLvl3.count()
resultLvl3.write.jdbc(url=url, table='public."Recommendation"', mode="append", properties=properties)



                                                                                

In [None]:
spark.stop()