In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install Spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz

# Unzip the Spark file to the current folder
!tar xf spark-3.0.3-bin-hadoop3.2.tgz

# Install findspark
!pip install -q findspark

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop3.2"

# Start a SparkSession
import findspark
findspark.init()

# Import SparkSession
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [None]:
# Mount Google Drive at /content/drive; the data files read below live under
# /content/drive/MyDrive/DA_Project.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def parser(s, delimeters=" ", to_int=None):
    """Split one text line into a tuple of fields.

    Args:
        s: The line to split.
        delimeters: Separator string handed to ``str.split``.
        to_int: Optional collection of field positions to convert with
            ``int``; when falsy, every field is left as a string.

    Returns:
        A tuple with one entry per field, in input order.

    Raises:
        ValueError: If a field selected by ``to_int`` is not a valid integer.
    """
    fields = s.split(delimeters)
    if not to_int:
        return tuple(fields)
    return tuple(
        int(value) if position in to_int else value
        for position, value in enumerate(fields)
    )
# Artist catalogue: tab-separated lines; the first field is parsed as an int.
artistData = sc.textFile("/content/drive/MyDrive/DA_Project/artist_data_small.txt").map(
    lambda line: parser(line, '\t', [0])
)

# Alias table: tab-separated pairs of ints, also collected to the driver as a dict.
artistAlias = sc.textFile("/content/drive/MyDrive/DA_Project/artist_alias_small.txt").map(
    lambda line: parser(line, '\t', [0, 1])
)
artistAliasMap = artistAlias.collectAsMap()

# Play records: three space-separated ints per line. The second field (artist ID)
# is replaced by its alias target when the alias dict has an entry for it.
userArtistRaw = sc.textFile("/content/drive/MyDrive/DA_Project/user_artist_data_small.txt").map(
    lambda line: parser(line, ' ', [0, 1, 2])
)
userArtistData = userArtistRaw.map(
    lambda rec: (rec[0], artistAliasMap.get(rec[1], rec[1]), rec[2])
)


In [None]:
# Peek at the first five records (three ints each) after alias resolution
userArtistData.take(5)

[(1059637, 1000010, 238),
 (1059637, 1000049, 1),
 (1059637, 1000056, 1),
 (1059637, 1000062, 11),
 (1059637, 1000094, 1)]

In [None]:
# Peek at the first five integer pairs parsed from the alias file
artistAlias.take(5)

[(1027859, 1252408),
 (1017615, 668),
 (6745885, 1268522),
 (1018110, 1018110),
 (1014609, 1014609)]

In [None]:
# Display the whole alias dict (alias artist ID -> replacement artist ID) held on the driver
artistAliasMap

{1027859: 1252408,
 1017615: 668,
 6745885: 1268522,
 1018110: 1018110,
 1014609: 1014609,
 6713071: 2976,
 1014175: 1014175,
 1008798: 1008798,
 1013851: 1013851,
 6696814: 1030672,
 1036747: 1239516,
 1278781: 1021980,
 2035175: 1007565,
 1327067: 1308328,
 2006482: 1140837,
 1314530: 1237371,
 1160800: 1345290,
 1255401: 1055061,
 1307351: 1055061,
 1234249: 1005225,
 6622310: 1094137,
 1261919: 6977528,
 2103190: 1002909,
 9929875: 1009048,
 2118737: 1011363,
 9929864: 1000699,
 6666813: 1305683,
 1172822: 1127113,
 2026635: 1001597,
 6726078: 1018408,
 1039896: 1277013,
 1239168: 1266817,
 6819291: 1277876,
 2030690: 2060894,
 6786886: 166,
 1051692: 1307569,
 1239193: 1012079,
 1291581: 78,
 6642817: 1010969,
 1293171: 1007614,
 1070350: 1034635,
 6603691: 1279932,
 1027851: 1063053,
 2060513: 2029258,
 1277348: 668,
 1253023: 1033862,
 1002892: 1002451,
 2060435: 1256876,
 6612396: 1301739,
 1280154: 1021970,
 6617155: 1039381,
 1006102: 1034635,
 6697417: 2013670,
 1059007: 265

In [None]:
# Peek at the first five (artist ID, artist name) records
artistData.take(5)

[(1240105, 'André Visior'),
 (1240113, 'riow arai'),
 (1240132, 'Outkast & Rage Against the Machine'),
 (6776115, '小松正夫'),
 (1030848, "Raver's Nature")]

In [None]:
# Count records per user and per artist. Every input row contributes 1, so these
# are numbers of (user, artist) records -- NOT sums of the play-count field x[2].
user_counts = userArtistData.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)

artist_counts = userArtistData.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)

# Each result is a (key, record_count) pair; the key function compares on the count.
max_user = user_counts.max(key=lambda x: x[1])
min_user = user_counts.min(key=lambda x: x[1])

max_artist = artist_counts.max(key=lambda x: x[1])
min_artist = artist_counts.min(key=lambda x: x[1])

# The messages say "records" because the values are row counts, not play totals.
print(f"User with maximum records: {max_user[0]} with {max_user[1]} records")
print(f"User with minimum records: {min_user[0]} with {min_user[1]} records")
print(f"Artist with maximum records: {max_artist[0]} with {max_artist[1]} records")
print(f"Artist with minimum records: {min_artist[0]} with {min_artist[1]} records")

User with maximum plays: 1024631 with 6188 plays
User with minimum plays: 2064012 with 58 plays
Artist with maximum plays: 1034635 with 43 plays
Artist with minimum plays: 1009140 with 1 plays


In [None]:
# Project out the user-ID and artist-ID fields of every record
users = userArtistData.map(lambda rec: rec[0])
artists = userArtistData.map(lambda rec: rec[1])

# Smallest and largest ID of each kind, using the RDD's built-in min/max actions
min_user = users.min()
max_user = users.max()

min_artist = artists.min()
max_artist = artists.max()

print(f"Minimum user ID: {min_user}")
print(f"Maximum user ID: {max_user}")
print(f"Minimum artist ID: {min_artist}")
print(f"Maximum artist ID: {max_artist}")

Minimum user ID: 1000647
Maximum user ID: 2288164
Minimum artist ID: 1
Maximum artist ID: 10788218


In [None]:
def summary(user_id):
    """Print the total and rounded mean play count of one user.

    Reads the module-level ``userArtistData`` RDD, keeps the records whose
    first field equals ``user_id`` and aggregates their third field on the
    driver. Prints a "no play data" message when the user has no records.

    Args:
        user_id: User ID to match against the first field of each record.
    """
    plays = (
        userArtistData
        .filter(lambda rec: rec[0] == user_id)
        .map(lambda rec: rec[2])
        .collect()
    )
    if not plays:
        print("User %s has no play data." % user_id)
        return
    total = sum(plays)
    mean = round(total / len(plays))
    print("User %s has a total play count of %s and a mean play count of %s." % (user_id, total, mean))

# Print the play-count summary for three sample users
for sample_user_id in (1059637, 2064012, 2069337):
    summary(sample_user_id)

User 1059637 has a total play count of 674412 and a mean play count of 1879.
User 2064012 has a total play count of 548427 and a mean play count of 9456.
User 2069337 has a total play count of 393515 and a mean play count of 1519.


In [None]:
# Seeded 40/40/20 random split into training, validation and test sets
data_splits = userArtistData.randomSplit([0.4, 0.4, 0.2], seed=13)
trainingData, validationData, testData = data_splits

# Mark every split for caching; each one is reused by later cells
for split_rdd in data_splits:
    split_rdd.cache()

print("Training Data Count:", trainingData.count())
print("Validation Data Count:", validationData.count())
print("Test Data Count:", testData.count())

Training Data Count: 19769
Validation Data Count: 19690
Test Data Count: 10022


In [None]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

def cal_score(predict, actual):
    """Fraction of ``actual`` items found among the top ``len(actual)`` predictions.

    Args:
        predict: Sequence of predicted items, best first.
        actual: Collection of items the user really has.

    Returns:
        A float in [0, 1]: the number of distinct items shared by the first
        ``len(actual)`` predictions and ``actual``, divided by ``len(actual)``.
        Returns 0.0 when ``actual`` is empty instead of dividing by zero.
    """
    if not actual:
        return 0.0
    # Slicing already copes with predict being shorter than actual.
    top_k = predict[:len(actual)]
    return len(set(top_k) & set(actual)) / len(actual)

def modelEval(model, dataset):
    """Score a recommendation model against the artists users have in ``dataset``.

    For every user in ``dataset``, the model scores each artist of the full
    data set that the user does not have in the module-level ``trainingData``.
    Artists are ranked by predicted score, and ``cal_score`` compares the
    ranking with the artists the user has in ``dataset``.

    Args:
        model: Object exposing ``predictAll(rdd_of_(user, artist))`` whose
            results are indexable as (user, artist, score).
        dataset: RDD of (user, artist, count) records to evaluate against.

    Returns:
        The sum of the per-user scores divided by the number of distinct
        users in ``dataset``. Users for whom no prediction comes back add
        nothing to the sum but are still counted in the denominator.
    """
    # Every artist in the whole data set: the candidate pool for each user.
    all_artists = userArtistData.map(lambda x: x[1]).distinct().collect()
    # Users to evaluate; a set gives O(1) membership tests inside the filter.
    test_users = set(dataset.map(lambda p: p[0]).distinct().collect())

    def unheard_pairs(user_and_artists):
        """Return (user, artist) pairs for artists the user has no training record of."""
        user, heard = user_and_artists
        heard = set(heard)  # set lookup instead of scanning a list per candidate
        return [(user, artist) for artist in all_artists if artist not in heard]

    candidates = (
        trainingData
        .filter(lambda x: x[0] in test_users)
        .map(lambda x: (x[0], x[1]))
        .groupByKey()
        .flatMap(unheard_pairs)
    )

    # Artists each user actually has in the evaluated dataset, keyed by user.
    actual_by_user = (
        dataset
        .map(lambda x: (x[0], x[1]))
        .groupByKey()
        .map(lambda x: (x[0], list(x[1])))
        .collectAsMap()
    )

    # Per user: candidate artists ordered by predicted score, best first.
    ranked = (
        model.predictAll(candidates)
        .map(lambda x: (x[0], (x[1], x[2])))
        .groupByKey()
        .map(lambda x: (x[0], sorted(list(x[1]), key=lambda y: y[1], reverse=True)))
    )
    scores = ranked.map(
        lambda x: cal_score([y[0] for y in x[1]], actual_by_user[x[0]])
    )
    return scores.reduce(lambda x, y: x + y) * 1.0 / len(test_users)

In [None]:
# Convert the training records to MLlib Rating objects (int user, int product, float rating)
training = trainingData.map(
    lambda rec: Rating(int(rec[0]), int(rec[1]), float(rec[2]))
)

# Train one implicit-feedback ALS model per candidate rank and score it on the validation split
for r in (2, 10, 20):
    model = ALS.trainImplicit(training, rank=r, seed=345)
    validation_score = modelEval(model, validationData)
    print("The model score for rank %s is %s" % (r, validation_score))

The model score for rank 2 is 0.08641798321304396
The model score for rank 10 is 0.09606525904245533
The model score for rank 20 is 0.08511878890016106


In [None]:
# Retrain with rank 10 and report its score on the test split
bestModel = ALS.trainImplicit(training, rank=10, seed=345)
test_score = modelEval(bestModel, testData)
print(test_score)

0.062383739358596715


In [None]:
# Top-10 recommended artist IDs for user 1059637, in the order the model returns them
recommended = [rating.product for rating in bestModel.recommendProducts(1059637, 10)]

# Resolve all names with one filtered collect instead of calling
# artistData.lookup() once per recommended artist.
wanted_ids = set(recommended)
artist_names = {}
for artist_id, artist_name in (
    artistData
    .filter(lambda rec: rec[0] in wanted_ids)
    .map(lambda rec: (rec[0], rec[1]))
    .collect()
):
    # Keep the first name seen for an ID, matching what lookup(...)[0] returned.
    artist_names.setdefault(artist_id, artist_name)

for i, artist in enumerate(recommended):
    # An ID without a catalogue entry prints a placeholder instead of raising IndexError.
    print("Artist %s: %s" % (i, artist_names.get(artist, "<unknown artist %s>" % artist)))

Artist 0: Something Corporate
Artist 1: My Chemical Romance
Artist 2: Further Seems Forever
Artist 3: Taking Back Sunday
Artist 4: Brand New
Artist 5: U2
Artist 6: Modest Mouse
Artist 7: Alkaline Trio
Artist 8: Underoath
Artist 9: Green Day


# New approach: DNN-based recommender

In [None]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, regexp_replace
from pyspark.ml.feature import StringIndexer

In [None]:
# Read the play records as header-less CSV with a tab separator
user_artist_df = spark.read.csv(
    "/content/drive/MyDrive/DA_Project/user_artist_data_small.txt",
    sep="\t",
    header=False,
)

# Print the first rows without truncating cell values
user_artist_df.show(truncate=False)



+---------------------+
|_c0                  |
+---------------------+
|1059637 1000010 238  |
|1059637 1000049 1    |
|1059637 1000056 1    |
|1059637 1000062 11   |
|1059637 1000094 1    |
|1059637 1000112 423  |
|1059637 1000113 5    |
|1059637 1000114 2    |
|1059637 1000123 2    |
|1059637 1000130 19129|
|1059637 1000139 4    |
|1059637 1000241 188  |
|1059637 1000263 180  |
|1059637 1000289 2    |
|1059637 1000305 1    |
|1059637 1000320 21   |
|1059637 1000340 1    |
|1059637 1000427 20   |
|1059637 1000428 12   |
|1059637 1000433 10   |
+---------------------+
only showing top 20 rows



In [None]:
from pyspark.sql import functions as F

# Read the play records as header-less CSV with a tab separator
user_artist_df = spark.read.csv(
    "/content/drive/MyDrive/DA_Project/user_artist_data_small.txt",
    sep="\t",
    header=False,
)

# Print the raw rows
user_artist_df.show(truncate=False)

# Split the single `_c0` column on a single space, building the split
# expression once and naming the first three parts
fields = F.split(col("_c0"), " ")
user_artist_df = user_artist_df.select(
    fields.getItem(0).alias("userId"),
    fields.getItem(1).alias("artistId"),
    fields.getItem(2).alias("count"),
)

# Print the three-column result
user_artist_df.show(truncate=False)


+---------------------+
|_c0                  |
+---------------------+
|1059637 1000010 238  |
|1059637 1000049 1    |
|1059637 1000056 1    |
|1059637 1000062 11   |
|1059637 1000094 1    |
|1059637 1000112 423  |
|1059637 1000113 5    |
|1059637 1000114 2    |
|1059637 1000123 2    |
|1059637 1000130 19129|
|1059637 1000139 4    |
|1059637 1000241 188  |
|1059637 1000263 180  |
|1059637 1000289 2    |
|1059637 1000305 1    |
|1059637 1000320 21   |
|1059637 1000340 1    |
|1059637 1000427 20   |
|1059637 1000428 12   |
|1059637 1000433 10   |
+---------------------+
only showing top 20 rows

+-------+--------+-----+
|userId |artistId|count|
+-------+--------+-----+
|1059637|1000010 |238  |
|1059637|1000049 |1    |
|1059637|1000056 |1    |
|1059637|1000062 |11   |
|1059637|1000094 |1    |
|1059637|1000112 |423  |
|1059637|1000113 |5    |
|1059637|1000114 |2    |
|1059637|1000123 |2    |
|1059637|1000130 |19129|
|1059637|1000139 |4    |
|1059637|1000241 |188  |
|1059637|1000263 |180  

In [None]:
# Artist names and artist aliases: tab-separated files, column types inferred by Spark.
# (user_artist_df is not re-read here; it keeps the columns produced by the earlier split.)
artist_data_df = spark.read.csv(
    "/content/drive/MyDrive/DA_Project/artist_data_small.txt",
    sep="\t",
    inferSchema=True,
)
artist_alias_df = spark.read.csv(
    "/content/drive/MyDrive/DA_Project/artist_alias_small.txt",
    sep="\t",
    inferSchema=True,
)

In [None]:
# Preview the first five rows of the play-record DataFrame
user_artist_df.show(5)

+-------+--------+-----+
| userId|artistId|count|
+-------+--------+-----+
|1059637| 1000010|  238|
|1059637| 1000049|    1|
|1059637| 1000056|    1|
|1059637| 1000062|   11|
|1059637| 1000094|    1|
+-------+--------+-----+
only showing top 5 rows



In [None]:
# Rename the default _cN column names. withColumnRenamed does nothing when the
# source column is absent, so a frame that is already renamed passes through unchanged.
for old_name, new_name in (("_c0", "userId"), ("_c1", "artistId"), ("_c2", "count")):
    user_artist_df = user_artist_df.withColumnRenamed(old_name, new_name)

for old_name, new_name in (("_c0", "artistId"), ("_c1", "artistName")):
    artist_data_df = artist_data_df.withColumnRenamed(old_name, new_name)

for old_name, new_name in (("_c0", "wrongArtistId"), ("_c1", "correctArtistId")):
    artist_alias_df = artist_alias_df.withColumnRenamed(old_name, new_name)

In [None]:
# Preview the first five rows again after the rename step
user_artist_df.show(5)

+-------+--------+-----+
| userId|artistId|count|
+-------+--------+-----+
|1059637| 1000010|  238|
|1059637| 1000049|    1|
|1059637| 1000056|    1|
|1059637| 1000062|   11|
|1059637| 1000094|    1|
+-------+--------+-----+
only showing top 5 rows



In [None]:
# Correct artist IDs using aliases
#
# Left-join the play records to the alias table on artistId == wrongArtistId,
# then overwrite artistId with correctArtistId wherever an alias row matched.
# `when` is referenced through the `F` module alias because this notebook never
# imports a bare `when` from pyspark.sql.functions.

artist_alias_df = artist_alias_df.select("wrongArtistId", "correctArtistId").distinct()
user_artist_df = user_artist_df.join(artist_alias_df, user_artist_df.artistId == artist_alias_df.wrongArtistId, "left")
user_artist_df = user_artist_df.withColumn("artistId",
                                           F.when(col("correctArtistId").isNotNull(), col("correctArtistId"))
                                           .otherwise(col("artistId")))
# The helper columns from the join are no longer needed
user_artist_df = user_artist_df.drop("wrongArtistId", "correctArtistId")

In [None]:
# Indexing user and artist IDs
indexer_user = StringIndexer(inputCol="userId", outputCol="userIndex")
indexer_artist = StringIndexer(inputCol="artistId", outputCol="artistIndex")

# Keep the fitted models under their own names instead of discarding them:
# they are the only objects that can map a raw ID to its index (via transform)
# or an index back to its ID (via .labels) after this cell has run.
indexer_user_model = indexer_user.fit(user_artist_df)
indexer_artist_model = indexer_artist.fit(user_artist_df)

# Append the userIndex and artistIndex columns
user_artist_df = indexer_user_model.transform(user_artist_df)
user_artist_df = indexer_artist_model.transform(user_artist_df)

In [None]:
# Convert to Pandas for Keras: keep only the two index columns and the count,
# then collect them to the driver
training_columns_df = user_artist_df.select("userIndex", "artistIndex", "count")
user_artist_pd = training_columns_df.toPandas()

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Embedding, Flatten, Dense, Concatenate

In [None]:
# Parameters: embedding vocabulary sizes (number of distinct indices) and embedding width
num_users = user_artist_pd["userIndex"].nunique()
num_artists = user_artist_pd["artistIndex"].nunique()
embedding_dim = 50

In [None]:
# Input layers: one scalar index per sample for the user and for the artist
index_input_shape = (1,)
user_input = Input(shape=index_input_shape)
artist_input = Input(shape=index_input_shape)

In [None]:
# Embedding layers: build each layer, then apply it to its input
user_embedding_layer = Embedding(input_dim=num_users, output_dim=embedding_dim)
user_embedding = user_embedding_layer(user_input)

artist_embedding_layer = Embedding(input_dim=num_artists, output_dim=embedding_dim)
artist_embedding = artist_embedding_layer(artist_input)

In [None]:
# Flatten the embeddings (user first, then artist), each with its own Flatten layer
user_vecs, artist_vecs = (
    Flatten()(embedding) for embedding in (user_embedding, artist_embedding)
)

In [None]:
# Concatenate the two vectors and pass them through two ReLU layers (128, then 64 units)
concat = Concatenate()([user_vecs, artist_vecs])

hidden_layer_1 = Dense(128, activation='relu')
dense1 = hidden_layer_1(concat)

hidden_layer_2 = Dense(64, activation='relu')
dense2 = hidden_layer_2(dense1)

# A single output unit with no activation
output = Dense(1)(dense2)

In [None]:
# Build the two-input model and compile it with Adam and mean-squared-error loss
model_inputs = [user_input, artist_input]
model = Model(inputs=model_inputs, outputs=output)
model.compile(loss='mean_squared_error', optimizer='adam')

In [None]:
# Show the first few rows, then the column names
preview_rows = user_artist_pd.head()
print(preview_rows)
column_names = user_artist_pd.columns
print(column_names)


   userIndex  artistIndex count
0       28.0        126.0   238
1       28.0       6645.0     1
2       28.0        188.0     1
3       28.0         15.0    11
4       28.0        158.0     1
Index(['userIndex', 'artistIndex', 'count'], dtype='object')


In [None]:
# Prepare data for training: two index arrays as model inputs, the count column as target
user_index_values = user_artist_pd['userIndex'].values
artist_index_values = user_artist_pd['artistIndex'].values
X_train = [user_index_values, artist_index_values]
y_train = user_artist_pd['count'].values

In [None]:
import numpy as np

# Rebuild the inputs as fresh NumPy arrays and convert the target to float32
X_train = [
    np.array(user_artist_pd[column_name].values)
    for column_name in ('userIndex', 'artistIndex')
]
count_values = np.array(user_artist_pd['count'].values)
y_train = count_values.astype(np.float32)


In [None]:
# Train model on the raw counts for 20 epochs, holding out 10% of the rows for validation.
# The History object is intentionally the cell's last expression so its repr is displayed.
# NOTE(review): Keras documents validation_split as taking the *last* fraction of
# the arrays before shuffling, so the held-out rows depend on row order -- confirm
# this is intended. The recorded run shows val_loss climbing from ~1.3e6 (epoch 1)
# to ~1.6e7 (epoch 7), so the fit is worth revisiting.
model.fit(X_train, y_train, epochs=20, batch_size=32, validation_split=0.1)

Epoch 1/20
[1m1392/1392[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 13ms/step - loss: 7851947.5000 - val_loss: 1286834.7500
Epoch 2/20
[1m1392/1392[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m22s[0m 14ms/step - loss: 9125813.0000 - val_loss: 2159249.2500
Epoch 3/20
[1m1392/1392[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 13ms/step - loss: 8656953.0000 - val_loss: 3409608.5000
Epoch 4/20
[1m1392/1392[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 12ms/step - loss: 6083223.0000 - val_loss: 5287687.5000
Epoch 5/20
[1m1392/1392[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 13ms/step - loss: 1642142.0000 - val_loss: 8229981.5000
Epoch 6/20
[1m1392/1392[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 13ms/step - loss: 7587679.5000 - val_loss: 11664659.0000
Epoch 7/20
[1m1392/1392[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 13ms/step - loss: 4769869.5000 - val_loss: 16314344.0000
Epoch 8/20
[1m1392/1392[0m [32m━━━━━━━━━━━━

<keras.src.callbacks.history.History at 0x7ce06aa62e00>

In [None]:
# Build the index <-> ID lookups from user_artist_df, the frame that carries the
# StringIndexer output the model was trained on. The row order of artist_data_df
# is unrelated to artistIndex and must not be used to translate model indices.
_artist_pairs_pd = user_artist_df.select("artistIndex", "artistId").distinct().toPandas()
_artist_pairs_pd = _artist_pairs_pd.assign(
    artistIndex=_artist_pairs_pd["artistIndex"].astype("int64"),
    artistId=_artist_pairs_pd["artistId"].astype("int64"),
).sort_values("artistIndex")
# Position i of the array must hold the ID for artistIndex i.
assert (_artist_pairs_pd["artistIndex"].to_numpy() == np.arange(len(_artist_pairs_pd))).all(), \
    "artistIndex values are expected to be exactly 0..n-1"
artist_index_to_id = _artist_pairs_pd["artistId"].to_numpy()

_user_pairs_pd = user_artist_df.select("userId", "userIndex").distinct().toPandas()
# Keys are normalised to str so both int and str user IDs can be looked up.
user_id_to_index = dict(
    zip(_user_pairs_pd["userId"].astype(str), _user_pairs_pd["userIndex"].astype("int64"))
)


def recommend_artists(user_id, top_n=10):
    """Return the ``top_n`` artists with the highest predicted value for a user.

    Scores every artist index with the module-level Keras ``model`` and maps
    the best-scoring indices back to artist IDs and names.

    Args:
        user_id: Raw user ID (int or str) as it appears in ``user_artist_df``.
        top_n: Number of artists to return.

    Returns:
        A pandas DataFrame with columns ``artistId`` and ``artistName``, best
        prediction first. ``artistName`` is missing for IDs that have no row
        in ``artist_data_df``.

    Raises:
        ValueError: If ``user_id`` does not occur in ``user_artist_df``.
    """
    user_key = str(user_id)
    if user_key not in user_id_to_index:
        raise ValueError("Unknown user id: %s" % user_id)
    user_index = user_id_to_index[user_key]

    # Pair this user's index with every artist index and score all pairs at once
    artist_indices = np.arange(num_artists)
    user_indices = np.full(num_artists, user_index)
    predictions = model.predict([user_indices, artist_indices])

    # argsort is ascending: reverse it, then keep the first top_n (also correct for top_n=0)
    top_artist_indices = np.argsort(predictions.flatten())[::-1][:top_n]

    # Map indices to actual artist IDs
    top_artist_ids = artist_index_to_id[top_artist_indices]

    # Print the predicted artist IDs
    print("Top artist IDs:", top_artist_ids)

    artist_names = artist_data_df.select("artistId", "artistName").toPandas()

    # A left merge onto the ranked IDs keeps the ranking order in the result
    ranked_ids = pd.DataFrame({"artistId": top_artist_ids})
    recommended_artists = ranked_ids.merge(artist_names, on="artistId", how="left")

    return recommended_artists

# Example usage: recommend artists for user 2069337 (default top_n) and print the returned table
recommended_artists = recommend_artists(user_id=2069337)
print(recommended_artists)

[1m941/941[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step
Top artist IDs: [ 1028404  2156265  1301407  6658967  1030344 10025967  2001418  1073740
  2012490  1169453]
       artistId                artistName
77     10025967          Southworth, John
3464    1301407                    Darlin
5323    6658967           Sainte Chapelle
6026    1073740             Chuck Carrier
8689    2012490  Louir Vega ft Raul Midon
10375   2001418                    Me Low
17432   2156265            Harwood Agenda
23019   1030344                  Mnemonic
26650   1028404            Lou Ann Barton
28799   1169453                      SHOT
