In [1]:
import pyspark
from pyspark.sql import SparkSession
# Reuse the active Spark session when this cell is re-run: constructing a second
# pyspark.SparkContext() in the same process raises
# "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [3]:
# Both files are '|'-delimited with no header row, so Spark names the columns
# _c0, _c1, _c2; they are renamed to the names used throughout the notebook.
_csv_options = dict(header=False, sep='|', inferSchema=True)

training = spark.read.csv("trainIdx2_matrix.txt", **_csv_options)
for _old, _new in (("_c0", "userID"), ("_c1", "trackID"), ("_c2", "Scores")):
    training = training.withColumnRenamed(_old, _new)
training.show(5)

testing = spark.read.csv("D:\\test2_new.txt", **_csv_options)
for _old, _new in (("_c0", "userID"), ("_c1", "trackID"), ("_c2", "Recommanded")):
    testing = testing.withColumnRenamed(_old, _new)
testing.show(5)

+------+-------+------+
|userID|trackID|Scores|
+------+-------+------+
|199808| 248969|    90|
|199808|   2663|    90|
|199808|  28341|    90|
|199808|  42563|    90|
|199808|  59092|    90|
+------+-------+------+
only showing top 5 rows

+------+-------+-----------+
|userID|trackID|Recommanded|
+------+-------+-----------+
|200031|  30877|          1|
|200031|   8244|          1|
|200031| 130183|          0|
|200031| 198762|          0|
|200031|  34503|          1|
+------+-------+-----------+
only showing top 5 rows



In [4]:
from pyspark.sql.types import IntegerType
# Pin the column types used downstream: integer user/track IDs in both frames,
# float Scores for ALS training, and an integer Recommanded column in the test frame.
for _col, _dtype in (("userID", IntegerType()), ("trackID", IntegerType()), ("Scores", 'float')):
    training = training.withColumn(_col, training[_col].cast(_dtype))
training.show(3)

for _col in ("userID", "trackID", "Recommanded"):
    testing = testing.withColumn(_col, testing[_col].cast(IntegerType()))
testing.show(3)

+------+-------+------+
|userID|trackID|Scores|
+------+-------+------+
|199808| 248969|  90.0|
|199808|   2663|  90.0|
|199808|  28341|  90.0|
+------+-------+------+
only showing top 3 rows

+------+-------+-----------+
|userID|trackID|Recommanded|
+------+-------+-----------+
|200031|  30877|          1|
|200031|   8244|          1|
|200031| 130183|          0|
+------+-------+-----------+
only showing top 3 rows



In [5]:
# Create ALS model
# Explicit-feedback ALS fitted on the training Scores. coldStartStrategy='drop'
# removes rows whose user or track was not seen during fitting from transform() output.
_als_params = {
    "rank": 5,
    "maxIter": 5,
    "regParam": 0.01,
    "userCol": "userID",
    "itemCol": "trackID",
    "ratingCol": "Scores",
    "nonnegative": True,
    "implicitPrefs": False,
    "coldStartStrategy": 'drop',
}
als = ALS(**_als_params)

Reference (default `ALS` parameters): `ALS(rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, implicitPrefs=False, alpha=1.0, userCol='user', itemCol='item', seed=None, ratingCol='rating', nonnegative=False, checkpointInterval=10, intermediateStorageLevel='MEMORY_AND_DISK', finalStorageLevel='MEMORY_AND_DISK', coldStartStrategy='nan', blockSize=4096)`. `coldStartStrategy` accepts `'nan'` or `'drop'`.

In [None]:
# Fit the ALS factorisation on the (userID, trackID, Scores) training rows.
model = als.fit(dataset=training)

In [None]:
# Predict a score for every (user, track) pair in the test frame.
als_predictions = model.transform(dataset=testing)
als_predictions.show(n=3)

In [None]:
# The ALS output becomes the "Scores" feature consumed by the classifiers below.
als_predictions = als_predictions.withColumnRenamed(existing="prediction", new="Scores")
als_predictions.show(n=3)

In [None]:
# Re-assert the integer type of the target column after the ALS transform.
_recommanded_int = als_predictions["Recommanded"].cast(IntegerType())
als_predictions = als_predictions.withColumn("Recommanded", _recommanded_int)
als_predictions.show(n=3)

In [None]:
from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Feature-pipeline stages. They are fitted on the ALS test predictions and re-fitted
# later on the project data. categoricalColumns is empty, so the one-hot loop adds no
# stages; list column names there to one-hot encode them.
categoricalColumns = []
stages = []

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

# Index the Recommanded target into the numeric 'label' column.
# StringIndexer's default ordering is by frequency (most frequent value -> 0.0). That
# inverts the label whenever 1 is the majority class, and the mapping can change between
# fits. Alphabetical order pins "0" -> 0.0 and "1" -> 1.0, so the classifiers'
# 'prediction' column equals the Recommanded value.
label_stringIdx = StringIndexer(inputCol = 'Recommanded', outputCol = 'label', stringOrderType = 'alphabetAsc')
stages += [label_stringIdx]

# Columns available at this point: userID | trackID | Recommanded | Scores
numericCols = ['userID', 'trackID', 'Scores']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
# "keep" lets rows with null/NaN inputs through (as NaN) instead of raising an error.
assembler.setHandleInvalid("keep")
stages += [assembler]

In [None]:
# Snapshot of the pre-pipeline column names, re-selected after the feature pipeline runs.
cols = list(als_predictions.columns)

In [None]:
from pyspark.ml import Pipeline
# Fit the feature stages, then keep label/features followed by the original columns.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(als_predictions)

selectedCols = ['label', 'features'] + cols
df_predictions = pipelineModel.transform(als_predictions).select(selectedCols)

df_predictions.printSchema()

In [None]:
import pandas as pd
# First five rows, transposed so each column is shown as a row.
pd.DataFrame(df_predictions.take(5), columns=df_predictions.columns).T

In [None]:
# 70/30 split of the featurised rows; the fixed seed keeps the split reproducible.
train, test = df_predictions.randomSplit([0.7, 0.3], seed = 2018)
print("Training Dataset Count: " + str(train.count()))
print("Testing Dataset Count: " + str(test.count()))
# DataFrame.show() prints the rows itself and returns None; wrapping it in print()
# only appended a stray "None" line.
train.show(3)
test.show(3)

-------------------using logistic regression-------------------

In [None]:
from pyspark.ml.classification import LogisticRegression
# Logistic-regression baseline on the assembled features.
lr = LogisticRegression(maxIter=10, labelCol='label', featuresCol='features')
lrModel = lr.fit(dataset=train)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
# Sorted logistic-regression coefficients, one per entry of the 'features' vector.
beta = np.sort(lrModel.coefficients)
plt.plot(beta)
plt.title('Sorted Logistic Regression Coefficients')
plt.xlabel('Coefficient rank (ascending)')
plt.ylabel('Beta Coefficients')
plt.show()

In [None]:
# Training-set ROC curve from the fitted logistic-regression model's summary.
trainingSummary = lrModel.summary
roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'], roc['TPR'])
# FPR is plotted on x and TPR on y, so the axis labels must follow that order.
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

In [None]:
# Training-set precision-recall curve from the logistic-regression training summary.
pr = trainingSummary.pr.toPandas()
plt.plot(pr['recall'], pr['precision'])
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.show()

In [None]:
# Score the held-out split with the logistic-regression model.
predictions = lrModel.transform(dataset=test)
predictions.show(n=10)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# The default evaluator reads 'label' and 'rawPrediction' and reports areaUnderROC.
evaluator = BinaryClassificationEvaluator()
test_auc = evaluator.evaluate(predictions)
print('Test Area Under ROC', test_auc)

-------------------using the decision tree-------------------

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
# Shallow decision tree (depth 3) trained and scored on the same split.
dt = DecisionTreeClassifier(maxDepth=3, labelCol='label', featuresCol='features')
dtModel = dt.fit(dataset=train)
predictions = dtModel.transform(dataset=test)
predictions.show(n=10)

In [None]:
# Test-set AUC of the decision tree.
evaluator = BinaryClassificationEvaluator()
auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
print("Test Area Under ROC: {}".format(auc))

-------------------using the random forest-------------------

In [None]:
from pyspark.ml.classification import RandomForestClassifier
# Random forest with default hyper-parameters; rfModel is reused to score the project data.
rf = RandomForestClassifier(labelCol='label', featuresCol='features')
rfModel = rf.fit(dataset=train)
predictions = rfModel.transform(dataset=test)
predictions.show(n=10)

In [None]:
# Test-set AUC of the random forest.
evaluator = BinaryClassificationEvaluator()
auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
print("Test Area Under ROC: {}".format(auc))

-------------------using gradient-boosted trees (GBTClassifier)-------------------

In [None]:
from pyspark.ml.classification import GBTClassifier
# Gradient-boosted trees. featuresCol/labelCol are spelled out for consistency with the
# other classifiers; 'features' and 'label' are also GBTClassifier's defaults.
gbt = GBTClassifier(featuresCol='features', labelCol='label', maxIter=10)
gbtModel = gbt.fit(dataset=train)
predictions = gbtModel.transform(dataset=test)
predictions.show(n=10)

In [None]:
# Test-set AUC of the gradient-boosted trees.
evaluator = BinaryClassificationEvaluator()
auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
print("Test Area Under ROC: {}".format(auc))

In [None]:
#########################apply project#################################

In [None]:
# Project data to score. It is read without inferSchema, so every column arrives as a
# string and the three columns are cast to int below.
# NOTE(review): unlike the training/test reads, this uses the default ',' separator.
# Confirm testItem.data is comma-delimited; otherwise pass sep='|' like the other reads.
data = spark.read.csv("D:\\testItem.data", header=False)
data = data.withColumnRenamed("_c0", "userID").withColumnRenamed("_c1", "trackID").withColumnRenamed("_c2", "Recommanded")
data = data.withColumn("userID", data["userID"].cast(IntegerType()))
data = data.withColumn("trackID", data["trackID"].cast(IntegerType()))
data = data.withColumn("Recommanded", data["Recommanded"].cast(IntegerType()))
print(data.count())
# show() and printSchema() print on their own and return None; wrapping them in
# print() only added stray "None" lines.
data.show(5)
data.printSchema()

In [None]:
# Create ALS model
# Second ALS model used to score the project data. Unlike `als`, it allows negative
# factors and uses coldStartStrategy='nan'. Users/tracks unseen in training therefore
# keep their row with a NaN prediction instead of being dropped.
als_lack = ALS(
    userCol="userID",
    itemCol="trackID",
    ratingCol="Scores",
    rank=5,
    maxIter=5,
    regParam=0.01,
    implicitPrefs=False,
    nonnegative=False,
    coldStartStrategy='nan',
)

In [None]:
# Fit the NaN-preserving ALS model on the same training ratings.
model_lack = als_lack.fit(dataset=training)

In [None]:
# Predict Scores for the project rows. With coldStartStrategy='nan' every input row is
# kept, and unseen users/tracks get a NaN prediction.
ALS_mod = model_lack.transform(data)
print(ALS_mod.count())
ALS_mod.show(3)  # show() prints and returns None, so it is not wrapped in print()

In [None]:
# Match the feature name used when the classifiers were trained.
ALS_mod = ALS_mod.withColumnRenamed(existing="prediction", new="Scores")
ALS_mod.show(n=3)

In [None]:
# printSchema() prints the schema tree itself and returns None; print() around it
# only added a stray "None" line.
ALS_mod.printSchema()

In [None]:
# Peek at the first three scored project rows.
ALS_mod.show(n=3)

In [None]:
# Row count after the ALS transform. coldStartStrategy='nan' keeps all rows, so this
# is expected to match data.count().
ALS_mod.count()

In [None]:
#ALS_mod_collect = ALS_mod.collect()

In [None]:
#ALS_mod_pd = pd.DataFrame(ALS_mod_collect)
#type(pd.DataFrame(ALS_mod_pd))

In [None]:
#ALS_mod_pd.fillna(0)


In [None]:
#ALS_mod = spark.createDataFrame(ALS_mod_pd)
#print(type(ALS_mod))
#print(ALS_mod.schema)
#ALS_mod.head

In [None]:
#ALS_mod = ALS_mod.withColumnRenamed("0", "userID").withColumnRenamed("1", "trackID").withColumnRenamed("2", "Recommanded").withColumnRenamed("3", "Scores")

In [None]:
#ALS_mod.show(3)

In [None]:
# Replace the NaN cold-start predictions (and any nulls) with 0 so the feature assembler
# and classifier receive numeric values. DataFrame.na.fill on numeric columns replaces
# both null and NaN. Every column here is numeric (int IDs/Recommanded, float Scores),
# so the former na.replace('nan', '0') string replacement never matched anything.
ALS_mod = ALS_mod.na.fill(0)

In [None]:
# Display the StructType of ALS_mod after the NaN/null fill.
ALS_mod.schema

In [None]:
#ALS_mod.toPandas()

In [None]:
# Re-fit the same feature stages on the project rows, then keep label/features plus the
# original columns (cols) so the random forest can score them.
# NOTE(review): this re-fits the label StringIndexer on the scoring data. The resulting
# 'label' column is not used by rfModel.transform, but confirm that is intended.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(ALS_mod)

selectedCols = ['label', 'features'] + cols
ALS_mod = pipelineModel.transform(ALS_mod).select(selectedCols)

ALS_mod.printSchema()

In [None]:
# Display the Python type of ALS_mod (expected: a Spark DataFrame after the select above).
type(ALS_mod)

In [None]:

# Score the project rows with the random forest trained earlier.
predictions = rfModel.transform(dataset=ALS_mod)
predictions.show(n=10)

In [None]:
# Number of scored rows (projecting to a single column does not change the row count).
predictions.count()

In [None]:
# Python type and Spark schema of the scored frame, one per line.
print(type(predictions), predictions.schema, sep="\n")

In [None]:
# Final submission columns: the (user, track) key and the classifier's prediction.
result = predictions.select(['userID', 'trackID', 'prediction'])
print(result.schema)
result.count()

In [None]:
# Replace any null/NaN in the numeric submission columns with 0.
result = result.fillna(0)

In [None]:
# Display the submission table as a pandas DataFrame.
# NOTE(review): toPandas() collects every row of `result` onto the driver. For large
# outputs consider result.limit(n).toPandas() or writing with result.write instead.
result.toPandas()

In [None]:
#result.coalesce(2).write.csv("627_HW9")
#result.toPandas().to_csv('627HW9v2.csv', index=False)

-------------------using cross validation-------------------

In [None]:
# Optional hyper-parameter search for the GBT classifier, disabled by default because it
# is expensive: 3 x 2 x 2 = 12 parameter combinations x 5 folds = 60 GBT fits, plus a
# final refit of the best combination on all of `train`. Set RUN_CV = True to run it.
RUN_CV = False
if RUN_CV:
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    paramGrid = (ParamGridBuilder()
                 .addGrid(gbt.maxDepth, [2, 4, 6])
                 .addGrid(gbt.maxBins, [20, 60])
                 .addGrid(gbt.maxIter, [10, 20])
                 .build())
    # `evaluator` is the last BinaryClassificationEvaluator created above (areaUnderROC).
    cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
    cvModel = cv.fit(train)
    predictions = cvModel.transform(test)
    print('Test Area Under ROC', evaluator.evaluate(predictions))