# Load SIM to Redis

In [9]:
import redis
import pandas as pd
from redis.commands.search.field import TextField, TagField, NumericField, GeoField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
import os

In [10]:
redis_client = redis.from_url('redis://localhost:6379/?decode_responses=True')

In [60]:
def load_data_to_redis(sim_path):
    MODEL_INDEX_NAME = "idx:sim"
    MODEL_KEY_BASE = "ecommerce:sim"

    try:
        redis_client.ft(MODEL_INDEX_NAME).dropindex(delete_documents=False)
    except:
        pass

    redis_client.ft(MODEL_INDEX_NAME).create_index(
        [
            TagField("$.user_id_u", as_name="user_id_u"),
            TagField("$.user_id_v", as_name="user_id_v"),
            NumericField("$.sim", as_name="sim")
        ],
        definition=IndexDefinition(
            index_type=IndexType.JSON,
            prefix=[f"{MODEL_KEY_BASE}:"]
        )
    )
        
    for file in os.listdir(sim_path):
        user = str(file.replace('.csv', ''))
        # print(user)
        df = pd.read_csv(f'{sim_path}/{file}')
        df.columns = ['user_id_v', 'mfps']
        print(df)
        MFPSs_loaded = 0

        try:
            pipeline = redis_client.pipeline(transaction=False)

            for index, row in df.iterrows():
                user_id_u = user
                user_id_v = str(int(row['user_id_v']))
                mfps = row['mfps']
                
                data = {
                    'user_id_u': user_id_u,
                    'user_id_v': user_id_v,
                    'mfps': mfps
                }
                
                redis_key = f"{MODEL_KEY_BASE}:{user_id_u}:{user_id_v}"
                pipeline.json().set(redis_key, "$", data)
                MFPSs_loaded += 1
                print(f"{redis_key} - MFPS: {mfps}")

            pipeline.execute()
        except Exception as e:
            print("Failed to load MFPS data:")
            print(e)
            os._exit(1)

    print(f"Loaded {MFPSs_loaded} MFPSs into Redis.")

In [61]:
load_data_to_redis('./output')

     user_id_v      mfps
0        68275  0.986842
1      1895509  0.986667
2     13513176  0.985507
3      7590141  0.981818
4     13494521  0.980392
..         ...       ...
178   16263010  0.247312
179    6067141  0.247191
180   18765644  0.247191
181     153914  0.246914
182   24202670  0.246753

[183 rows x 2 columns]
ecommerce:sim:6087484:68275 - MFPS: 0.986842105263158
ecommerce:sim:6087484:1895509 - MFPS: 0.9866666666666666
ecommerce:sim:6087484:13513176 - MFPS: 0.9855072463768116
ecommerce:sim:6087484:7590141 - MFPS: 0.9818181818181816
ecommerce:sim:6087484:13494521 - MFPS: 0.9803921568627452
ecommerce:sim:6087484:9745026 - MFPS: 0.979591836734694
ecommerce:sim:6087484:6158890 - MFPS: 0.979591836734694
ecommerce:sim:6087484:5805473 - MFPS: 0.979591836734694
ecommerce:sim:6087484:17153193 - MFPS: 0.9791666666666669
ecommerce:sim:6087484:620317 - MFPS: 0.9791666666666669
ecommerce:sim:6087484:190330 - MFPS: 0.978723404255319
ecommerce:sim:6087484:7872456 - MFPS: 0.978260869565217

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
def insertDataToDB(spark, mysql_url, mysql_properties):
    # spark = SparkSession.builder.appName("clusteringForRS").getOrCreate()
    table_name = 'TrainingData'

    df = spark.read.jdbc(mysql_url, table_name, properties=mysql_properties)
    input_file_rdd = df.rdd.map(lambda row: (row.user_id, row.item_id, row.rating, row.review_date))
    cols = ["user_id","item_id", "rating", "timestamp"]
    input_file = input_file_rdd.toDF(cols)

    label_rdd = spark.sparkContext.textFile('hdfs:///HM_clustering/Label').map(lambda x: x.strip().split('\t')).map(lambda x: (str(x[0]),x[1]))
    labels = label_rdd.toDF(["user_", "label"])

    joined_df = labels.join(input_file, input_file.user_id == labels.user_).select(
                        input_file.user_id,
                        input_file.item_id,
                        input_file.rating,
                        input_file.timestamp,
                        labels.label)
    
    joined_df.write.jdbc(
        url=mysql_url,
        table="TrainingData_1",
        mode="overwrite",
        properties=mysql_properties
    )
    
    avg_rating_df = joined_df.groupBy("user_id", "label").agg(F.avg("rating").alias("avg_rating"))
    
    avg_rating_df.write.jdbc(
        url=mysql_url,
        table="AVGRating",
        mode="overwrite",
        properties=mysql_properties
    )

In [3]:
spark = SparkSession.builder \
        .appName("SplitFile") \
        .config("spark.jars", "mysql-connector-java-8.0.13.jar") \
        .getOrCreate()

mysql_url = "jdbc:mysql://localhost:3306/ecommerce?useSSL=false"
mysql_properties = {
    "user": "root",
    "password": "Password@123",
    "driver": "com.mysql.cj.jdbc.Driver"
}
insertDataToDB(spark, mysql_url, mysql_properties)

your 131072x1 screen size is bogus. expect trouble
24/05/21 21:29:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/21 21:30:22 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(Block

# Split file by Label

Tách input

In [2]:
from pyspark.sql import SparkSession
mysql_url = "jdbc:mysql://localhost:3306/ecommerce?useSSL=false"
mysql_properties = {
    "user": "root",
    "password": "Password@123",
    "driver": "com.mysql.cj.jdbc.Driver"
}

In [13]:
spark = SparkSession.builder.appName("SplitInput").getOrCreate()
table_name = "TrainingData_1"
df_input = spark.read.jdbc(mysql_url, table_name, properties=mysql_properties)
df_input = df_input.toPandas()

In [14]:
grouped = df_input.groupby('label')

for label, group in grouped:
    filename = f"./temp_preSIM/input_{label}.txt"
    
    with open(filename, 'w') as file:
        for index, row in group.iterrows():
            user_item = f"{row['user_id']};{row['item_id']}"
            rating_timestamp = f"{row['rating']};{row['timestamp']}"
            file.write(f"{user_item}\t{rating_timestamp}\n")
            
spark.stop()

Tách AVG Rating

In [3]:
spark = SparkSession.builder.appName("SplitAVG").getOrCreate()
table_name = "AVGRating"
df_avg = spark.read.jdbc(mysql_url, table_name, properties=mysql_properties)
df_avg = df_avg.toPandas()


your 131072x1 screen size is bogus. expect trouble
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/21 21:40:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [4]:
grouped = df_avg.groupby('label')
for label, group in grouped:
    filename = f"./temp_preSIM/avg_{label}.txt"
    
    with open(filename, 'w') as file:
        for index, row in group.iterrows():
            user = f"{row['user_id']}"
            avg_rating = f"{row['avg_rating']}"
            file.write(f"{user}\t{avg_rating}\n")
            
spark.stop() 