### MLflow is used for ML model tracking and hyperparameter logging

In [None]:
# Install MLflow only if the cluster does not already provide it. Use %pip rather
# than !pip so the package is installed into this notebook's Python environment,
# and pin a version (e.g. mlflow==X.Y.Z) for reproducibility.
# %pip install mlflow


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import when
from pyspark.ml.feature import StringIndexer, IndexToString
import mlflow
import mlflow.spark
from mlflow.tracking import MlflowClient
from urllib.parse import urlparse

### Loading the dataset from previously configured Hive tables

In [None]:
# Load every row of the `plays` Hive table and preview the first five rows.
plays_query = "SELECT * FROM plays"
df = spark.sql(plays_query)
df.show(n=5)

### Alternatively, the dataframe can be created from the raw tsv file

In [None]:
# Load the raw Last.fm TSV instead of the Hive table.
# Without `header=True` or a schema, Spark names the columns _c0.._c3 and reads
# every value as a string. The explicit DDL schema names the columns and types
# `plays` as an integer. Malformed play counts become null and are removed by
# the dropna() cell.
raw_plays_path = 'dbfs:/FileStore/tables/lastfm-data/usersha1-artmbid-artname-plays.tsv'
df = spark.read.csv(
    raw_plays_path,
    sep='\t',
    schema='userid STRING, artistid STRING, artistname STRING, plays INT',
)
df.show(5)

In [0]:
# Discard any row that has a null in at least one column.
df = df.na.drop()

#### Renaming the columns if the dataframe was created from the raw file

In [None]:
# Replace the positional column names with descriptive ones, in file order.
newNames = ["userid", "artistid", "artistname", "plays"]
df = df.toDF(*newNames)
df.show(n=5)

### Use StringIndexer to encode the userid and artistid columns

StringIndexer records the string-to-index mapping in the metadata of the indexed columns, so the original strings can be recovered with IndexToString.

In [0]:
# Label-encode users and artists into numeric indexes, as ALS requires.
# handleInvalid="keep" was added because randomSplit later raised a StringIndexer
# error. NOTE(review): the root cause (nulls? unseen labels?) was never confirmed.
userIndexer = StringIndexer(inputCol="userid", outputCol="userIndex", handleInvalid="keep")
artistIndexer = StringIndexer(inputCol="artistid", outputCol="artistIndex", handleInvalid="keep")

# Keep the fitted models. Their labels hold the index -> original-string mapping,
# which `.fit(df).transform(df)` would otherwise throw away.
userIndexerModel = userIndexer.fit(df)
artistIndexerModel = artistIndexer.fit(df)

# Decoders that map indexes back to the original ids. They rely on the label
# metadata that the fitted indexers attach to userIndex / artistIndex.
# ("useridOrignial" keeps its original spelling so existing references still work.)
userDecoder = IndexToString(inputCol="userIndex", outputCol="useridOrignial")
artistDecoder = IndexToString(inputCol="artistIndex", outputCol="artistidOriginal")

df = userIndexerModel.transform(df)
df = artistIndexerModel.transform(df)

In [0]:
# Show the first five encoded rows as a table.
first_rows = df.take(5)
display(first_rows)

userid,artistid,artistname,plays,userIndex,artistIndex
00000c289a1829a808ac09c00daf10bc3c4e223b,3bd73256-3905-4f3a-97e2-8b341527f805,betty blowtorch,2137,126839.0,17191.0
00000c289a1829a808ac09c00daf10bc3c4e223b,f2fb0ff0-5679-42ec-a55c-15109ce6e320,die Ärzte,1099,126839.0,277.0
00000c289a1829a808ac09c00daf10bc3c4e223b,b3ae82c2-e60b-4551-a76d-6620f1b456aa,melissa etheridge,897,126839.0,3120.0
00000c289a1829a808ac09c00daf10bc3c4e223b,3d6bbeb7-f90e-4d10-b440-e153c0d10b53,elvenking,717,126839.0,1981.0
00000c289a1829a808ac09c00daf10bc3c4e223b,bbd2ffd7-17f4-4506-8572-c1ea58c3f9a8,juliette & the licks,706,126839.0,2121.0


### Applying the confidence threshold for implicit rating

In [0]:
# Turn raw play counts into binary implicit-feedback ratings. A user-artist pair
# gets rating 1 only when the artist was played more than PLAYS_THRESHOLD times,
# otherwise 0.
PLAYS_THRESHOLD = 50
df2 = df.withColumn("ratings", when(df.plays > PLAYS_THRESHOLD, 1).otherwise(0))

### Creating a demo dataset for faster training

For complete training, train on the full `data_demo` dataframe instead of the ~10% `demo` sample created by `randomSplit` below.

In [0]:
# Keep only what ALS needs: integer user/artist indexes and a float rating.
# The `ratings` column exists only on df2 (it is added by the threshold step),
# so the selection must come from df2, not df.
data_demo = df2.selectExpr("INT(userIndex)", "INT(artistIndex)", "FLOAT(ratings)")

data_demo.dtypes

In [0]:
# Take a ~DEMO_FRACTION random sample of the interactions for a quick demo run.
# The fixed seed makes the sample, and therefore every metric logged later,
# reproducible across re-runs.
DEMO_FRACTION = 0.1
DEMO_SPLIT_SEED = 42
(demo, rest) = data_demo.randomSplit([DEMO_FRACTION, 1 - DEMO_FRACTION], seed=DEMO_SPLIT_SEED)

### Setting up MLflow tracking server

In [0]:
# Send runs to the Databricks-hosted MLflow tracking server.
# (Instead of this call, the MLFLOW_TRACKING_URI environment variable can be set.
# A tracking URI may be an HTTP/HTTPS server, a database connection string, or
# a local directory.)
mlflow.set_tracking_uri(uri='databricks')

### Creating new MLflow experiment

In [0]:
# Workspace path of the MLflow experiment that runs are logged to.
# NOTE(review): this is a user-specific path; change it to your own workspace folder.
EXPERIMENT_NAME = "/Users/user/mxm/lastfm-als-experiment1"

# Make it the active experiment, creating it first if it does not exist yet.
mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)

In [0]:
# Confirm which tracking server the runs below will be logged to.
mlflow.get_tracking_uri()

### Training the ALS model. 

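ALS (alternating least squares) factorizes the user–artist rating matrix into a user factor matrix and an artist factor matrix. It alternately solves for one matrix while holding the other fixed, repeating for a number of iterations.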

Important hyperparameters which will be tracked by the MLflow experiment are:

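**alpha** = scales the confidence ALS assigns to observed implicit interactions (in the implicit-feedback formulation, confidence = 1 + alpha × rating). Higher values give observed interactions more weight relative to unobserved ones; alpha is not limited to values around 1.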

**rank** = number of latent factors, i.e. the number of columns of the user and artist factor matrices. A higher rank can capture more structure and often gives a better model, but it increases computation and can overfit.

**maxIter** (iterations) = maximum number of ALS iterations used to compute the matrix factors

**implicitPrefs** = Boolean to indicate whether the ratings are implicit or explicit

In [0]:
# Grid search over alpha x rank. Each configuration is trained and evaluated
# inside its own MLflow run named "lastfm-als".
alphas = [0.1, 0.5, 1]
ranks = [5, 10, 15]
RANDOM_SEED = 42

# Split once, with a fixed seed, so every configuration is scored on the same
# held-out interactions. Re-splitting inside the loop (unseeded) made the RMSE
# values of different runs incomparable.
(training, test) = demo.randomSplit([0.8, 0.2], seed=RANDOM_SEED)

# NOTE: with implicitPrefs=True the predictions are preference scores, so RMSE
# against the 0/1 ratings is only a rough proxy for recommendation quality.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="ratings", predictionCol="prediction")

for alpha in alphas:
  for rank in ranks:
    with mlflow.start_run(run_name='lastfm-als'):

      # coldStartStrategy="drop" removes test rows whose user or artist was not
      # seen in training. Their prediction would be NaN and would make RMSE NaN.
      als = ALS(userCol="userIndex", itemCol="artistIndex", ratingCol="ratings",
                implicitPrefs=True, coldStartStrategy="drop", seed=RANDOM_SEED)
      als.setMaxIter(5)
      als.setRegParam(0.1)
      als.setRank(rank)
      als.setAlpha(alpha)

      mlflow.log_param("model", "als")
      mlflow.log_param("maxIter", als.getMaxIter())
      mlflow.log_param("regParam", als.getRegParam())
      mlflow.log_param("rank", rank)
      mlflow.log_param("alpha", alpha)
      mlflow.log_param("implicitPrefs", als.getImplicitPrefs())
      mlflow.log_param("coldStartStrategy", als.getColdStartStrategy())
      mlflow.log_param("seed", als.getSeed())

      model = als.fit(training)
      predictions = model.transform(test)
      rmse = evaluator.evaluate(predictions)
      mlflow.log_metric("rmse", rmse)

      # Store the fitted model as a run artifact under "lastfm-als", the path the
      # model-registry cell registers (runs:/<run_id>/lastfm-als).
      mlflow.spark.log_model(model, "lastfm-als")

      print("ALPHA= %s RANK= %s :Root-mean-square error = %s" % (alpha, rank, rmse))

### Update model registry for tracking runs

In [None]:
# Find the "lastfm-als" runs of the active experiment, best (lowest RMSE) first.
# Without order_by, search_runs returns the most recent run first. That is just
# the last grid point trained, not the best one.
als_runs = mlflow.search_runs(
    filter_string='tags.mlflow.runName = "lastfm-als"',
    order_by=["metrics.rmse ASC"],
)
if als_runs.empty:
    raise RuntimeError('No MLflow runs named "lastfm-als" found; run the training cell first.')
run_id = als_runs.iloc[0].run_id

model_name = "lastfm-als-model"

# Register the artifact stored under "lastfm-als" in the selected run.
# NOTE(review): this only succeeds if the training run logged a model at that
# artifact path (e.g. mlflow.spark.log_model(model, "lastfm-als")).
model_version = mlflow.register_model(f"runs:/{run_id}/lastfm-als", model_name)

### Generate artist recommendations for users

In [0]:
# Top-10 artist recommendations for every user.
# NOTE(review): `model` is whichever ALS model the grid-search loop fitted last,
# not necessarily the best-scoring one.
userRecs = model.recommendForAllUsers(numItems=10)


### Generate user recommendations for artists

In [None]:
# Top-10 user recommendations for every artist. The name `movieRecs` is
# historical: the items here are artists, not movies.
movieRecs = model.recommendForAllItems(numUsers=10)

### Evaluate predictions

In [0]:
# Recompute RMSE on `predictions`, i.e. the test-set predictions of the last
# grid-search run.
rmse = evaluator.evaluate(dataset=predictions)

### Saved model

In [0]:
# List the contents of the saved-model directory.
# NOTE(review): no cell in this notebook writes to this path, so the listing
# will fail unless the model was saved there elsewhere.
saved_model_dir = "./model/als-mlflow"
dbutils.fs.ls(saved_model_dir)