## Student name: Fan Li
### Analytic goal: Content based model. Using `artist_id`,`genre_id`,`genre_level`,`album_id` to predict `rating`
### Machine learning algorithm: RandomForestClassifier

In [0]:
import os
from time import time

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

In [0]:
# Record the wall-clock start time; the last cell subtracts it from the end time
# to report the notebook's total runtime.
start = time()

In [0]:
# Build (or reuse) the Spark session used by every cell below.

# MongoDB connection string. It is read from the MONGO_URI environment variable when that
# is set, so credentials can be supplied without editing the notebook; the literal fallback
# keeps the notebook runnable exactly as before.
# NOTE(review): the fallback embeds a username/password in the notebook -- rotate that
# password and remove the fallback once MONGO_URI is provisioned.
mongo_uri = os.environ.get(
    "MONGO_URI",
    'mongodb+srv://msds697_project:msds697@cluster1.ippkl.mongodb.net/msds697.music',
)

# `spark.jars.packages` takes a single comma-separated list of Maven coordinates.
# Previously the hadoop-aws coordinate was passed to .config() as a bare key with no value,
# which only created a meaningless option and never requested the package.
spark_packages = ",".join([
    "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1",
    "org.apache.hadoop:hadoop-aws:3.3.1",
])

spark = (
    SparkSession.builder
    .appName("mongo")
    .config('spark.sql.parquet.binaryAsString', 'true')
    .config("spark.jars.packages", spark_packages)
    .config("spark.mongodb.input.uri", mongo_uri)
    .config("spark.mongodb.output.uri", mongo_uri)
    .config("spark.network.timeout", "3600s")
    .getOrCreate()
)

In [0]:
# Load the MongoDB collection through the Mongo Spark connector.
# The connection URI is read back from the session configuration
# (`spark.mongodb.input.uri`, set when the session was built) rather than repeating the
# credential-bearing connection string in a second place.
df = (
    spark.read.format("mongo")
    .option('uri', spark.conf.get("spark.mongodb.input.uri"))
    .load()
)

In [0]:
# Preview the first 5 rows of the loaded data.
df.show(5)

In [0]:
# Work on a ~0.1% random subset (seed 200) so the remaining cells run quickly, and cache it
# because every later cell reads from it.
# NOTE: this rebinds `df`, so re-running the cell samples the already-sampled frame again.
df = df.randomSplit([0.999, 0.001], seed=200)[1].cache()

In [0]:
# How many distinct genre_id values appear in df?
df.agg(count_distinct('genre_id')).show()

In [0]:
# How many distinct genre_level values appear in df?
df.agg(count_distinct('genre_level')).show()

In [0]:
# How many distinct artist_id values appear in df?
df.agg(count_distinct('artist_id')).show()

In [0]:
# Summary statistics for the columns of df.
df.describe().show()

In [0]:
# Keep the four feature columns and expose `rating`, cast to an integer, under the name
# `label`. `label` is deliberately the last column: the feature-assembly cell treats every
# column except the last one as a feature.
df_new = df.select(
    'artist_id',
    'genre_id',
    'album_id',
    'genre_level',
    df['rating'].cast(IntegerType()).alias('label'),
)

In [0]:
# Print the column names and types of df_new.
df_new.printSchema()

In [0]:
# Check class imbalance: number of rows for each label value.
df_new.groupBy('label').count().show()

# Try a rank-based model
### Top-ranked artists and/or genres

In [0]:
# Distinct (artist_id, genre_id, songid) rows whose integer-cast rating equals 5.
# Cached because it is counted and then ranked in the following cells.
rating5_df = (
    df.select('artist_id', 'genre_id', 'songid', df['rating'].cast(IntegerType()))
      .where('rating==5')
      .distinct()
      .cache()
)
rating5_df.show(5)

In [0]:
# Number of rows in rating5_df.
rating5_df.count()

In [0]:
# Within each (artist_id, genre_id) group, rank the 5-rated rows by ascending songid and
# keep only ranks 1-3. Note the ordering key is songid itself, not a popularity measure.
window = Window.partitionBy('artist_id', 'genre_id').orderBy('songid')

rank_basedf = (
    rating5_df
    .withColumn('rank', rank().over(window))
    .where(col('rank') <= 3)
)

In [0]:
# Display the ranked rows (show() prints the first 20 by default).
rank_basedf.show()

## Create dataframe with a feature vector and rating

In [0]:
# --- Disabled experiment: everything in this cell is commented out and never runs. ---
# The helper below would one-hot encode each listed column (dropLast=False) and replace the
# original column with its encoded version; the last line would apply it to `genre_id`.
# def oneHotEncodeColumns(df, cols):
#     newdf = df
#     for c in cols: 
#         ohe = OneHotEncoder(inputCol=c, outputCol=c+"-onehot", dropLast=False)
#         ohe_model = ohe.fit(newdf)
        
#         newdf = ohe_model.transform(newdf).drop(c).withColumnRenamed(c+"-onehot", c)
        
#     return newdf
# # remove label column
# #categorical_col.remove("income")
# dfhot = oneHotEncodeColumns(df_new, ['genre_id'])

In [0]:
# --- Disabled experiment (commented out, never runs): rebuild df_new from the one-hot
# encoded frame `dfhot` using only artist_id, genre_id and label. ---
# df_new = dfhot.select('artist_id','genre_id','label')
# df_new.show(5)

In [0]:
# Pack every column of df_new except the last one (`label`) into a single `features`
# vector column, then keep only the two columns the classifier needs.
feature_cols = df_new.columns[:-1]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
featurevector = assembler.transform(df_new).select("features", "label")
featurevector.show(5)

## Split dataframe into training and test sets

In [0]:
# 80/20 train/validation split with a fixed seed (200). Both parts are cached because they
# are read repeatedly: `train` during cross-validation, both during evaluation.
train, val = (part.cache() for part in featurevector.randomSplit([0.8, 0.2], seed=200))

## Set up cross-validation with a RandomForestClassifier, using accuracy as the selection metric

In [0]:
# Cross-validated grid search over a random forest.
#
# Both the forest (bootstrapping / feature subsampling) and the cross-validator (fold
# assignment) are randomized. They are seeded explicitly so the selected hyper-parameters
# and the reported scores can be reproduced; without an explicit seed the default is not
# guaranteed to be the same from one kernel session to the next.
SEED = 200  # same value the data-splitting cells use

rfc = RandomForestClassifier(seed=SEED)

# Metric used by CrossValidator to pick the best parameter combination.
metric_name = "accuracy"

evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName=metric_name,
)

# 3 x 3 grid: number of trees x maximum tree depth.
paramGrid = (
    ParamGridBuilder()
    .addGrid(rfc.numTrees, [5, 10, 20])
    .addGrid(rfc.maxDepth, [5, 10, 20])
    .build()
)

# numFolds is intentionally left at its default; pass numFolds=2 for a faster, rougher search.
cv = CrossValidator(
    estimator=rfc,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    seed=SEED,
)

##  Build a model using training Dataset

In [0]:
# Run the cross-validated grid search on the training split.
# This is the slow step of the notebook: one model is fitted per parameter combination per fold.
cvmodel = cv.fit(train)

In [0]:
# Report the hyper-parameters of the model selected by cross-validation.
# NOTE: on PySpark tree-ensemble models `getNumTrees` is a property (no call parentheses),
# whereas `getMaxDepth()` is a regular method.
best_model = cvmodel.bestModel
print(f'Number of trees: {best_model.getNumTrees}')
print(f'Max depth: {best_model.getMaxDepth()}')

## Predict train and validation set

In [0]:
# Score both splits with the best model found by cross-validation.
train_pred, val_pred = (cvmodel.bestModel.transform(split) for split in (train, val))


In [0]:
# Preview the first 5 validation-set predictions.
val_pred.show(5)

In [0]:
# Preview the first 5 training-set predictions.
train_pred.show(5)

In [0]:
# The *_acc values use the evaluator's configured metric; the *_f1 values override
# metricName for that single call, leaving the evaluator itself unchanged.
f1_params = {evaluator.metricName: "f1"}
scored = (train_pred, val_pred)

train_acc, val_acc = [evaluator.evaluate(pred) for pred in scored]
train_f1, val_f1 = [evaluator.evaluate(pred, f1_params) for pred in scored]

In [0]:
# Print accuracy and F1 for the training and validation splits side by side;
# a large gap between the two lines points to overfitting.
print(f"Train set accuracy: {train_acc}. F1 score: {train_f1}")
print(f"Validation set accuracy: {val_acc}. F1 score: {val_f1}")

In [0]:
# Stop the clock and print the elapsed wall time since `start` as HH:MM:SS.ss.
end = time()
minutes, seconds = divmod(end - start, 60)
hours, minutes = divmod(minutes, 60)
print("{:0>2}:{:0>2}:{:05.2f}".format(int(hours), int(minutes), seconds))