### Reading Parquet file into spark dataframe

In [1]:
import os

# Get the list of files in the data directory
file_list = [f for f in os.listdir('data/train_parquet') if f.endswith('.parquet')]

# Prepend the directory path to each file name
file_list = [os.path.join('data/train_parquet', f) for f in file_list]


In [2]:
len(file_list)

146

In [3]:
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from tqdm import tqdm

# Create a SparkSession
#spark = SparkSession.builder.getOrCreate()
spark = SparkSession.builder \
    .config('spark.driver.memory', '24g') \
    .getOrCreate()

# Initialize an empty list to store the DataFrames
df_list = []

# Iterate through the list of Parquet files
for file in tqdm(file_list):
    # Read the Parquet file into a Spark DataFrame
    df = spark.read.parquet(file)

    # Append the DataFrame to the list
    df_list.append(df)

# Concatenate the DataFrames in the list
#df = df_list[0].union(df_list[1:])
df = df_list[0]

# Iterate through the rest of the DataFrames in the list
for i in tqdm(range(1, len(df_list))):
    # Union the DataFrame with the next DataFrame in the list
    df = df.union(df_list[i])

23/01/11 22:35:27 WARN Utils: Your hostname, darth-Vig800S resolves to a loopback address: 127.0.1.1; using 192.168.100.39 instead (on interface eno1)
23/01/11 22:35:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/11 22:35:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


100%|█████████████████████████████████████████| 146/146 [00:12<00:00, 11.80it/s]
100%|████████████████████████████████████████| 145/145 [00:00<00:00, 157.78it/s]


In [4]:
df.show()

+-------+-------+----+
|session|    aid|type|
+-------+-------+----+
| 100000|1498214|   1|
| 100000|1617298|   1|
| 100000|1617298|   3|
| 100000|1820189|   1|
| 100000|1619534|   1|
| 100000|  22770|   1|
| 100000|  22770|   1|
| 100000|  22770|   3|
| 100000| 339965|   1|
| 100000| 339965|   3|
| 100000| 339965|   5|
| 100000|  22770|   5|
| 100000| 339965|   1|
| 100000| 339965|   1|
| 100000| 710289|   1|
| 100001|1104009|   1|
| 100001|1196408|   1|
| 100001| 822736|   1|
| 100001| 791744|   1|
| 100001| 822736|   1|
+-------+-------+----+
only showing top 20 rows



# Fitting ALS on data

In [5]:
# Train the ALS model
als = ALS(maxIter=5, regParam=0.01, userCol="session", itemCol="aid", ratingCol="type")
model = als.fit(df)

23/01/11 22:35:52 WARN DAGScheduler: Broadcasting large task binary with size 1416.9 KiB
23/01/11 22:35:52 WARN DAGScheduler: Broadcasting large task binary with size 1416.9 KiB
23/01/11 22:35:53 WARN DAGScheduler: Broadcasting large task binary with size 1419.1 KiB




23/01/11 22:36:23 WARN DAGScheduler: Broadcasting large task binary with size 1420.7 KiB




23/01/11 22:36:49 WARN DAGScheduler: Broadcasting large task binary with size 1422.0 KiB


                                                                                

23/01/11 22:37:14 WARN DAGScheduler: Broadcasting large task binary with size 1420.9 KiB




23/01/11 22:37:46 WARN DAGScheduler: Broadcasting large task binary with size 1422.2 KiB


                                                                                

23/01/11 22:38:59 WARN DAGScheduler: Broadcasting large task binary with size 1423.0 KiB




23/01/11 22:39:26 WARN DAGScheduler: Broadcasting large task binary with size 1426.0 KiB




23/01/11 22:40:20 WARN DAGScheduler: Broadcasting large task binary with size 1427.4 KiB




23/01/11 22:41:55 WARN DAGScheduler: Broadcasting large task binary with size 1428.8 KiB




23/01/11 22:42:56 WARN DAGScheduler: Broadcasting large task binary with size 1430.2 KiB




23/01/11 22:44:24 WARN DAGScheduler: Broadcasting large task binary with size 1431.5 KiB




23/01/11 22:45:19 WARN DAGScheduler: Broadcasting large task binary with size 1432.9 KiB




23/01/11 22:46:48 WARN DAGScheduler: Broadcasting large task binary with size 1434.3 KiB




23/01/11 22:47:48 WARN DAGScheduler: Broadcasting large task binary with size 1435.7 KiB




23/01/11 22:49:16 WARN DAGScheduler: Broadcasting large task binary with size 1437.1 KiB




23/01/11 22:50:07 WARN DAGScheduler: Broadcasting large task binary with size 1439.1 KiB


                                                                                

23/01/11 22:51:13 WARN DAGScheduler: Broadcasting large task binary with size 1437.7 KiB


                                                                                

In [6]:
#userRecs = model.recommendForAllUsers(20)

In [7]:
#userRecs.show()

In [8]:
# Make recommendations for a user
recommendations = model.transform(df.filter(df.session == 12899779))

# Show the recommendations
recommendations.show()

23/01/11 22:52:09 WARN DAGScheduler: Broadcasting large task binary with size 1452.2 KiB
23/01/11 22:52:09 WARN DAGScheduler: Broadcasting large task binary with size 1450.8 KiB




23/01/11 22:52:22 WARN DAGScheduler: Broadcasting large task binary with size 1840.2 KiB




23/01/11 22:52:24 WARN DAGScheduler: Broadcasting large task binary with size 1854.1 KiB
23/01/11 22:52:24 WARN DAGScheduler: Broadcasting large task binary with size 1854.1 KiB


                                                                                

+--------+-----+----+----------+
| session|  aid|type|prediction|
+--------+-----+----+----------+
|12899779|59625|   1|0.99176383|
+--------+-----+----+----------+



In [9]:
%%time
from pyspark.sql.functions import desc

rec=recommendations.sort(desc("prediction")).show(10)
rec

23/01/11 22:52:28 WARN DAGScheduler: Broadcasting large task binary with size 1452.2 KiB
23/01/11 22:52:28 WARN DAGScheduler: Broadcasting large task binary with size 1450.8 KiB




23/01/11 22:52:39 WARN DAGScheduler: Broadcasting large task binary with size 1825.3 KiB




23/01/11 22:52:40 WARN DAGScheduler: Broadcasting large task binary with size 1839.5 KiB


[Stage 288:>                                                        (0 + 4) / 4]

+--------+-----+----+----------+
| session|  aid|type|prediction|
+--------+-----+----+----------+
|12899779|59625|   1|0.99176383|
+--------+-----+----+----------+

CPU times: user 28.4 ms, sys: 3.82 ms, total: 32.2 ms
Wall time: 18.6 s


                                                                                

In [10]:
#spark.stop()

In [11]:
import numpy as np
from pyspark.sql.functions import desc

# select only the rating column
ratings_df = recommendations.select("prediction")

# sort the ratings in descending order
ratings_df = ratings_df.sort(desc("prediction"))

# convert to pandas dataframe and get only top 10
ratings_df = ratings_df.limit(10).toPandas()

# Get the values of the pandas DataFrame as a numpy array
numpy_array = ratings_df.values
numpy_array

23/01/11 22:53:19 WARN DAGScheduler: Broadcasting large task binary with size 1452.2 KiB
23/01/11 22:53:19 WARN DAGScheduler: Broadcasting large task binary with size 1450.8 KiB




23/01/11 22:53:26 WARN DAGScheduler: Broadcasting large task binary with size 1811.7 KiB




23/01/11 22:53:26 WARN DAGScheduler: Broadcasting large task binary with size 1825.8 KiB


                                                                                

array([[0.99176383]], dtype=float32)

In [12]:
import json
from tqdm import tqdm

session_list = []

with open("data/test.jsonl", "r") as f:
    for line in tqdm(f):
        data = json.loads(line)
        session_list.append(data["session"])


1671803it [00:36, 45942.46it/s] 


In [13]:
session_list[0:10]

[12899779,
 12899780,
 12899781,
 12899782,
 12899783,
 12899784,
 12899785,
 12899786,
 12899787,
 12899788]

In [14]:
len(session_list)

1671803

In [15]:
temp=session_list[0:10]
temp

[12899779,
 12899780,
 12899781,
 12899782,
 12899783,
 12899784,
 12899785,
 12899786,
 12899787,
 12899788]

In [16]:

# with open('data/mid_file1.jsonl', 'w') as f:
#     for x in tqdm(temp):
#         recommendations = model.transform(df.filter(df.session == x))
#         ratings_df = recommendations.select("prediction")
#         # sort the ratings in descending order
#         ratings_df = ratings_df.sort(desc("prediction"))

#         # convert to pandas dataframe and get only top 10
#         ratings_df = ratings_df.limit(10).toPandas()

#         # Get the values of the pandas DataFrame as a numpy array
#         numpy_array = ratings_df.values
#         print(numpy_array)
        
#         string_values = [str(x1) for x1 in numpy_array]
#         aids_str = ' '.join(string_values)
        
#         json_record = json.dumps(aids_str, ensure_ascii=False)
#         f.write(str(x) + json_record + '\n')
    

In [18]:
model_path = "data/als_model1"
model.save(model_path)

23/01/11 22:57:52 WARN DAGScheduler: Broadcasting large task binary with size 1650.6 KiB


                                                                                

23/01/11 22:58:02 WARN DAGScheduler: Broadcasting large task binary with size 1649.2 KiB


                                                                                