In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('DT').getOrCreate()

In [3]:
df = spark.read.format("csv").option("sep",",").option("inferSchema","true").option("header","true").load("test2_score_feature_final.txt")
df.printSchema()

root
 |-- Userid: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- trackid: integer (nullable = true)
 |-- albumscore: integer (nullable = true)
 |-- trackMax: integer (nullable = true)
 |-- trackMin: integer (nullable = true)
 |-- trackMean: double (nullable = true)
 |-- artistscore: integer (nullable = true)
 |-- albumMax: integer (nullable = true)
 |-- albumMin: integer (nullable = true)
 |-- albumMean: double (nullable = true)
 |-- genreamax: integer (nullable = true)
 |-- genreamin: integer (nullable = true)
 |-- genreamean: double (nullable = true)



In [4]:
import pandas as pd
pd.DataFrame(df.take(5), columns=df.columns).transpose()
numeric_features = ['Userid','trackid','label','albumscore','trackMax','trackMin','trackMean','artistscore','albumMax','albumMin','albumMean','genreamax','genreamin','genreamean']

df.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Userid,6000,206286.491,3515.749299951101,200031,212234
trackid,6000,149260.0025,86146.45891885243,65,296098
label,6000,0.5,0.5000416718757232,0,1
albumscore,6000,0.0,0.0,0,0
trackMax,6000,15.239833333333333,33.29398436461707,0,100
trackMin,6000,12.644166666666667,29.643880918634974,0,100
trackMean,6000,14.152771353128198,31.327078426246768,0.0,100.0
artistscore,6000,23.488666666666667,38.63689523198974,0,100
albumMax,6000,26.79083333333333,40.21001397052079,0,100


In [5]:
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.types as types
df=df.withColumn('label',df['label'].cast(types.IntegerType()))
df=df.withColumn('Userid',df['Userid'].cast(types.IntegerType()))
df=df.withColumn('trackid',df['trackid'].cast(types.IntegerType()))
df=df.withColumn('albumscore',df['albumscore'].cast(types.IntegerType()))
df=df.withColumn('trackMax',df['trackMax'].cast(types.IntegerType()))
df=df.withColumn('trackMin',df['trackMin'].cast(types.IntegerType()))
df=df.withColumn('trackMean',df['trackMean'].cast(types.IntegerType()))
df=df.withColumn('artistscore',df['artistscore'].cast(types.IntegerType()))
df=df.withColumn('albumMax',df['albumMax'].cast(types.IntegerType()))
df=df.withColumn('albumMin',df['albumMin'].cast(types.IntegerType()))
df=df.withColumn('albumMean',df['albumMean'].cast(types.IntegerType()))
df=df.withColumn('genreamax',df['genreamax'].cast(types.IntegerType()))
df=df.withColumn('genreamin',df['genreamin'].cast(types.IntegerType()))
df=df.withColumn('genreamean',df['genreamean'].cast(types.IntegerType()))

assembler=VectorAssembler(inputCols=['Userid','trackid','label','albumscore','trackMax','trackMin','trackMean','artistscore','albumMax','albumMin','albumMean','genreamax','genreamin','genreamean'],outputCol='features')



In [6]:
from pyspark.ml import Pipeline
pipeline=Pipeline(stages=[assembler])
model=pipeline.fit(df)
df=model.transform(df)

In [7]:
train,test=df.randomSplit([0.95,0.05])

In [8]:
handleInvalid = "keep"
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 5703
Test Dataset Count: 297


In [10]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=10)
rfModel = rf.fit(df)

In [12]:
predictions = rfModel.transform(test)
predictions.show(10)

+------+-----+-------+----------+--------+--------+---------+-----------+--------+--------+---------+---------+---------+----------+--------------------+--------------------+--------------------+----------+
|Userid|label|trackid|albumscore|trackMax|trackMin|trackMean|artistscore|albumMax|albumMin|albumMean|genreamax|genreamin|genreamean|            features|       rawPrediction|         probability|prediction|
+------+-----+-------+----------+--------+--------+---------+-----------+--------+--------+---------+---------+---------+----------+--------------------+--------------------+--------------------+----------+
|200031|    0| 130183|         0|       0|       0|        0|          0|       0|       0|        0|        0|        0|         0|(14,[0,1],[200031...|[9.40370181881176...|[0.94037018188117...|       0.0|
|200031|    0| 198762|         0|       0|       0|        0|          0|       0|       0|        0|       90|       90|        90|(14,[0,1,11,12,13...|[9.16580016624963..

In [13]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(predictions))

Test Area Under ROC 1.0


In [14]:
df1 = spark.read.format("csv").option("sep",",").option("inferSchema","true").option("header","true").load("test_score_feature_final.txt")
df1.printSchema()

root
 |-- Userid: integer (nullable = true)
 |-- trackid: integer (nullable = true)
 |-- albumscore: integer (nullable = true)
 |-- trackMax: integer (nullable = true)
 |-- trackMin: integer (nullable = true)
 |-- trackMean: double (nullable = true)
 |-- artistscore: integer (nullable = true)
 |-- albumMax: integer (nullable = true)
 |-- albumMin: integer (nullable = true)
 |-- albumMean: double (nullable = true)
 |-- genreamax: integer (nullable = true)
 |-- genreamin: integer (nullable = true)
 |-- genreamean: double (nullable = true)



In [15]:
import pandas as pd
pd.DataFrame(df1.take(5), columns=df1.columns).transpose()
numeric_features = ['Userid','trackid','albumscore','trackMax','trackMin','trackMean','artistscore','albumMax','albumMin','albumMean','genreamax','genreamin','genreamean']

df1.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Userid,120000,224372.7714,14155.612277030954,199810,249010
trackid,120000,147871.92235833334,85421.19935293401,1,296099
albumscore,120000,23.685075,38.7678462791902,0,100
trackMax,120000,14.660025,32.924395344452066,0,100
trackMin,120000,12.207208333333334,29.352143918452313,0,100
trackMean,120000,13.612407135284117,30.990106545307405,0.0,100.0
artistscore,120000,35.016175,42.7330026359253,0,100
albumMax,120000,26.612633333333335,40.195132924332924,0,100
albumMin,120000,24.08203333333333,37.95270333713142,0,100


In [16]:
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.types as types
#df=df.withColumn('label',df['label'].cast(types.IntegerType()))
df=df.withColumn('Userid',df['Userid'].cast(types.IntegerType()))
df=df.withColumn('trackid',df['trackid'].cast(types.IntegerType()))
df=df.withColumn('albumscore',df['albumscore'].cast(types.IntegerType()))
df=df.withColumn('trackMax',df['trackMax'].cast(types.IntegerType()))
df=df.withColumn('trackMin',df['trackMin'].cast(types.IntegerType()))
df=df.withColumn('trackMean',df['trackMean'].cast(types.IntegerType()))
df=df.withColumn('artistscore',df['artistscore'].cast(types.IntegerType()))
df=df.withColumn('albumMax',df['albumMax'].cast(types.IntegerType()))
df=df.withColumn('albumMin',df['albumMin'].cast(types.IntegerType()))
df=df.withColumn('albumMean',df['albumMean'].cast(types.IntegerType()))
df=df.withColumn('genreamax',df['genreamax'].cast(types.IntegerType()))
df=df.withColumn('genreamin',df['genreamin'].cast(types.IntegerType()))
df=df.withColumn('genreamean',df['genreamean'].cast(types.IntegerType()))

assembler1=VectorAssembler(inputCols=['Userid','trackid','albumscore','trackMax','trackMin','trackMean','artistscore','albumMax','albumMin','albumMean','genreamax','genreamin','genreamean'],outputCol='features')

In [17]:
from pyspark.ml import Pipeline
pipeline=Pipeline(stages=[assembler1])
model1=pipeline.fit(df1)
df1=model1.transform(df1)

In [18]:
predictions1 = rfModel.transform(df1)
predictions1.show(10)

+------+-------+----------+--------+--------+---------+-----------+--------+--------+---------+---------+---------+----------+--------------------+--------------------+--------------------+----------+
|Userid|trackid|albumscore|trackMax|trackMin|trackMean|artistscore|albumMax|albumMin|albumMean|genreamax|genreamin|genreamean|            features|       rawPrediction|         probability|prediction|
+------+-------+----------+--------+--------+---------+-----------+--------+--------+---------+---------+---------+----------+--------------------+--------------------+--------------------+----------+
|199810| 208019|         0|       0|       0|      0.0|          0|       0|       0|      0.0|        0|        0|       0.0|(13,[0,1],[199810...|[9.62060771992818...|[0.96206077199281...|       0.0|
|199810|  74139|         0|       0|       0|      0.0|          0|       0|       0|      0.0|       80|       80|      80.0|(13,[0,1,10,11,12...|          [10.0,0.0]|           [1.0,0.0]|       

In [23]:
predictions1.count()


120000

In [24]:
pre=predictions1.rdd.map(lambda x: x.prediction).collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 49.0 failed 1 times, most recent failure: Lost task 1.0 in stage 49.0 (TID 59, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:156)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:148)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Caused by: java.lang.ArrayIndexOutOfBoundsException

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:156)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:148)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Caused by: java.lang.ArrayIndexOutOfBoundsException
