In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession, SQLContext


In [3]:
spark = SparkSession.builder.appName("Pankaj").master("local[*]").config("spark.hive.query.result.fileformat", "SequenceFile").enableHiveSupport().getOrCreate()
#spark = SparkSession.builder.appName("Pankaj").master("yarn").config("spark.deploy-mode","cluster").config("spark.hive.query.result.fileformat", "SequenceFile").enableHiveSupport().getOrCreate()

In [4]:
#spark.sql("CREATE DATABASE IF NOT EXISTS first_db COMMENT 'This is first database' LOCATION 'F:/Study/PySpark_Python_Project/data/database/first_db.db'")
spark.sql("show databases").toPandas()


Unnamed: 0,namespace
0,default
1,first_db


In [53]:
#spark.sql("use first_db").show()

spark.sql('select current_date(), current_timestamp(), version()').show(truncate=False)

+--------------+-----------------------+----------------------------------------------+
|current_date()|current_timestamp()    |sparkversion()                                |
+--------------+-----------------------+----------------------------------------------+
|2020-04-27    |2020-04-27 15:41:24.015|3.0.0 bcadd5c3096109878fe26fb0d57a9b7d6fdaa257|
+--------------+-----------------------+----------------------------------------------+



In [6]:
housing = spark.read\
.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("./data/SanFrancisco_Housing_Data")

In [7]:
housing.limit(2).toPandas()

Unnamed: 0,_c0,address,info,z_address,bathrooms,bedrooms,finishedsqft,lastsolddate,lastsoldprice,latitude,longitude,neighborhood,totalrooms,usecode,yearbuilt,zestimate,zindexvalue,zipcode,zpid
0,2,Address: 1160 Mission Street #2007,San FranciscoSales price: 1300000Sales date: ...,1160 Mission St UNIT 2007,2.0,2.0,1043.0,02/17/2016,1300000.0,37.778705,-122.412635,South of Market,4.0,Condominium,2007.0,1167508.0,975700,94103.0,83152781.0
1,5,Address: 260 King Street #475,San FranciscoSales price: 750000Sales date: 0...,260 King St UNIT 475,1.0,1.0,903.0,02/17/2016,750000.0,37.777641,-122.393417,South of Market,3.0,Condominium,2004.0,823719.0,975700,94107.0,69819817.0


In [8]:
housing.count()

11330

In [9]:
housing = housing.where('not (bedrooms < 3 and totalrooms > 14)').where('not (bedrooms < 3 and finishedsqft > 10000)')

In [10]:
housing.count()

11314

In [33]:
from pyspark.sql.functions import unix_timestamp
housing_dateint = housing.withColumn("lastsolddateint", unix_timestamp("lastsolddate","MM/dd/yyyy"))

In [34]:
housing_dateint[["lastsolddateint"]].show(5)

+---------------+
|lastsolddateint|
+---------------+
|     1455647400|
|     1455647400|
|     1455647400|
|     1455647400|
|     1455647400|
+---------------+
only showing top 5 rows



In [12]:
def drop_geog(data, keep=[]):
    remove_list = ["info","address","z_address","longitude","latitude","neighborhood", "lastsolddate","zipcode","zpid","usecode", "zestimate","zindexvalue"]
    for element in keep:
        remove_list.remove(element)

    return data.drop(*remove_list)

In [36]:
housing_dropgeo = drop_geog(housing_dateint)

In [35]:
from pyspark.ml.feature import VectorAssembler
def train_test_split(data):
    
    assembler = VectorAssembler()\
       .setInputCols(data.drop("lastsoldprice").columns)\
       .setOutputCol("features")\
       .setHandleInvalid("keep")
    
    train, test = data.randomSplit([0.8, 0.2], 30)
    return assembler.transform(train), assembler.transform(test)


In [37]:
train, test = train_test_split(housing_dropgeo)

In [38]:
test.show(5, truncate=False)

+---+---------+--------+------------+-------------+----------+---------+---------------+--------------------------------------------+
|_c0|bathrooms|bedrooms|finishedsqft|lastsoldprice|totalrooms|yearbuilt|lastsolddateint|features                                    |
+---+---------+--------+------------+-------------+----------+---------+---------------+--------------------------------------------+
|9  |3.0      |3.0     |2231.0      |2700000.0    |10.0      |1927.0   |1455647400     |[9.0,3.0,3.0,2231.0,10.0,1927.0,1.4556474E9]|
|11 |3.0      |3.0     |1300.0      |1530000.0    |4.0       |1900.0   |1455647400     |[11.0,3.0,3.0,1300.0,4.0,1900.0,1.4556474E9]|
|14 |1.0      |2.0     |1200.0      |1050000.0    |5.0       |1924.0   |1455647400     |[14.0,1.0,2.0,1200.0,5.0,1924.0,1.4556474E9]|
|26 |2.0      |2.0     |1688.0      |1000000.0    |6.0       |1927.0   |1455561000     |[26.0,2.0,2.0,1688.0,6.0,1927.0,1.455561E9] |
|43 |1.0      |3.0     |1771.0      |951000.0     |8.0       |

In [39]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

lr = LinearRegression().setLabelCol("lastsoldprice").setFeaturesCol("features")

lrModel = lr.fit(train)

predictions = lrModel.transform(test)

rmse = RegressionEvaluator()\
  .setLabelCol("lastsoldprice")\
  .setPredictionCol("prediction")\
  .setMetricName("rmse")


r2 = RegressionEvaluator()\
  .setLabelCol("lastsoldprice")\
  .setPredictionCol("prediction")\
  .setMetricName("r2")
    
print("Root Mean Squared Error (RMSE) on test data = ", rmse.evaluate(predictions))
print("R^2 on test data = ", r2.evaluate(predictions))


Root Mean Squared Error (RMSE) on test data =  703427.7926049334
R^2 on test data =  0.5573530955962898


In [58]:
from pyspark.ml.linalg import Vector
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

def train_eval(
    predictor, paramMap, train, test):
    cv = CrossValidator()\
      .setEstimator( predictor\
                    .setLabelCol("lastsoldprice")\
                    .setFeaturesCol("features"))\
      .setEvaluator(RegressionEvaluator()\
          .setLabelCol("lastsoldprice")\
          .setPredictionCol("prediction")\
          .setMetricName("rmse"))\
      .setEstimatorParamMaps(paramMap)\
      .setNumFolds(5)\
      .setParallelism(2)
    cvModel = cv.fit(train)
    predictions = cvModel.transform(test)
    
    print("Root Mean Squared Error (RMSE) on test data = ", rmse.evaluate(predictions))
    print("R^2 on test data = ", r2.evaluate(predictions))
    bestModel = cvModel.bestModel
    
    print(bestModel.extractParamMap)
    
    return bestModel

In [60]:
lr = LinearRegression()
lrParamMap = ParamGridBuilder()\
    .addGrid(lr.regParam, [10.0, 1.0, 0.1, 0.01, 0.001])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .addGrid(lr.maxIter, [10000, 250000])\
    .build()
train_eval(lr, lrParamMap, train, test)

Root Mean Squared Error (RMSE) on test data =  703431.4644844568
R^2 on test data =  0.5573484743676047
<bound method Params.extractParamMap of LinearRegressionModel: uid=LinearRegression_b4939bfd0c67, numFeatures=7>


LinearRegressionModel: uid=LinearRegression_b4939bfd0c67, numFeatures=7

In [62]:
from pyspark.ml.regression import DecisionTreeRegressor
decisionTree = DecisionTreeRegressor()
dtParamMap = ParamGridBuilder().build()
train_eval(decisionTree, dtParamMap, train, test)

Py4JJavaError: An error occurred while calling o34794.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 805.0 failed 1 times, most recent failure: Lost task 0.0 in stage 805.0 (TID 805, Shriprasad, executor driver): java.io.IOException: fail to rename file C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\25\shuffle_4_805_0.data.0641ee9d-b23b-48fe-8a90-52b790c10d1b to C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\25\shuffle_4_805_0.data
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:207)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.commitAllPartitions(LocalDiskShuffleMapOutputWriter.java:115)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1989)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1977)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1976)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1976)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:956)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:956)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:956)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2155)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2144)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:758)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2137)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2156)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2181)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:737)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:736)
	at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:588)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:226)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.$anonfun$train$1(DecisionTreeRegressor.scala:128)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.train(DecisionTreeRegressor.scala:117)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.train(DecisionTreeRegressor.scala:47)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:152)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:116)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: fail to rename file C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\25\shuffle_4_805_0.data.0641ee9d-b23b-48fe-8a90-52b790c10d1b to C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\25\shuffle_4_805_0.data
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:207)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.commitAllPartitions(LocalDiskShuffleMapOutputWriter.java:115)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [63]:
from pyspark.ml.regression import RandomForestRegressor
randomForest = RandomForestRegressor()
rfParamMap = ParamGridBuilder()\
    .addGrid(randomForest.maxBins, [4, 16, 32, 64])\
    .addGrid(randomForest.numTrees, [1, 10, 100])\
    .addGrid(randomForest.maxDepth, [2, 5, 10])\
    .build()

train_eval(randomForest, rfParamMap, train, test)

Py4JJavaError: An error occurred while calling o35107.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 813.0 failed 1 times, most recent failure: Lost task 0.0 in stage 813.0 (TID 811, Shriprasad, executor driver): java.io.IOException: fail to rename file C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\22\shuffle_6_811_0.data.07c080a0-66e3-4bab-a448-9fe023a1ebb3 to C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\22\shuffle_6_811_0.data
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:207)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.commitAllPartitions(LocalDiskShuffleMapOutputWriter.java:115)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.writePartitionedData(BypassMergeSortShuffleWriter.java:222)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:167)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1989)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1977)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1976)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1976)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:956)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:956)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:956)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2155)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2144)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:758)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2137)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2156)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2181)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:737)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:736)
	at org.apache.spark.ml.tree.impl.RandomForest$.findSplitsBySorting(RandomForest.scala:960)
	at org.apache.spark.ml.tree.impl.RandomForest$.findSplits(RandomForest.scala:941)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:142)
	at org.apache.spark.ml.regression.RandomForestRegressor.$anonfun$train$1(RandomForestRegressor.scala:131)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.RandomForestRegressor.train(RandomForestRegressor.scala:117)
	at org.apache.spark.ml.regression.RandomForestRegressor.train(RandomForestRegressor.scala:44)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:152)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:116)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: fail to rename file C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\22\shuffle_6_811_0.data.07c080a0-66e3-4bab-a448-9fe023a1ebb3 to C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\22\shuffle_6_811_0.data
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:207)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.commitAllPartitions(LocalDiskShuffleMapOutputWriter.java:115)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.writePartitionedData(BypassMergeSortShuffleWriter.java:222)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:167)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [65]:
from pyspark.ml.regression import GBTRegressor
gradientBoost = GBTRegressor()
gbParamMap = ParamGridBuilder()\
    .addGrid(randomForest.maxBins, [16, 32])\
    .addGrid(randomForest.numTrees, [5, 10, 100])\
    .addGrid(randomForest.maxDepth, [5, 10])\
    .addGrid(randomForest.minInfoGain, [0.0, 0.1, 0.5])\
    .build()
train_eval(gradientBoost, gbParamMap, train, test)

Py4JJavaError: An error occurred while calling o41827.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1125.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1125.0 (TID 1094, Shriprasad, executor driver): java.io.IOException: fail to rename file C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\08\shuffle_121_1094_0.data.9b0d1f85-9ddc-49da-aee6-bcb79abcf83a to C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\08\shuffle_121_1094_0.data
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:207)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.commitAllPartitions(LocalDiskShuffleMapOutputWriter.java:115)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1989)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1977)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1976)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1976)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:956)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:956)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:956)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2155)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2144)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:758)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2137)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2156)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2181)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:737)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:736)
	at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:588)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:226)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.$anonfun$train$2(DecisionTreeRegressor.scala:143)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.train(DecisionTreeRegressor.scala:137)
	at org.apache.spark.ml.tree.impl.GradientBoostedTrees$.boost(GradientBoostedTrees.scala:346)
	at org.apache.spark.ml.tree.impl.GradientBoostedTrees$.run(GradientBoostedTrees.scala:51)
	at org.apache.spark.ml.regression.GBTRegressor.$anonfun$train$1(GBTRegressor.scala:192)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.GBTRegressor.train(GBTRegressor.scala:168)
	at org.apache.spark.ml.regression.GBTRegressor.train(GBTRegressor.scala:58)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:152)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:116)
	at sun.reflect.GeneratedMethodAccessor298.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: fail to rename file C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\08\shuffle_121_1094_0.data.9b0d1f85-9ddc-49da-aee6-bcb79abcf83a to C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\08\shuffle_121_1094_0.data
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:207)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.commitAllPartitions(LocalDiskShuffleMapOutputWriter.java:115)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [66]:
housing_neighborhood = drop_geog(housing_dateint, ["neighborhood"])

In [71]:
housing_neighborhood.limit(5).toPandas()

Unnamed: 0,_c0,bathrooms,bedrooms,finishedsqft,lastsoldprice,neighborhood,totalrooms,yearbuilt,lastsolddateint
0,2,2.0,2.0,1043.0,1300000.0,South of Market,4.0,2007.0,1455647400
1,5,1.0,1.0,903.0,750000.0,South of Market,3.0,2004.0,1455647400
2,7,4.0,3.0,1425.0,1495000.0,Potrero Hill,6.0,2003.0,1455647400
3,9,3.0,3.0,2231.0,2700000.0,Potrero Hill,10.0,1927.0,1455647400
4,11,3.0,3.0,1300.0,1530000.0,Bernal Heights,4.0,1900.0,1455647400


In [76]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer().setInputCol("neighborhood").setOutputCol("neighborhoodIndex")
encoder = OneHotEncoder()\
  .setInputCols(["neighborhoodIndex"])\
  .setOutputCols(["neighborhoodVector"])
pipeline = Pipeline().setStages([indexer, encoder])
housingEncoded = pipeline.fit(housing_neighborhood).transform(housing_neighborhood)\
.drop("neighborhoodIndex")\
.drop("neighborhood")

In [77]:
train_neighborhood, test_neighborhood = train_test_split(housingEncoded)

In [78]:
train_eval(lr, lrParamMap, train_neighborhood, test_neighborhood)

Root Mean Squared Error (RMSE) on test data =  629764.7059212242
R^2 on test data =  0.6452070185645484
<bound method Params.extractParamMap of LinearRegressionModel: uid=LinearRegression_b4939bfd0c67, numFeatures=77>


LinearRegressionModel: uid=LinearRegression_b4939bfd0c67, numFeatures=77

In [79]:
train_eval(decisionTree, dtParamMap, train_neighborhood, test_neighborhood)

Py4JJavaError: An error occurred while calling o81311.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2192.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2192.0 (TID 2125, Shriprasad, executor driver): java.io.IOException: fail to rename file C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\15\shuffle_236_2125_0.data.2eca8a15-4581-41aa-887f-1eaf5dd92f11 to C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\15\shuffle_236_2125_0.data
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:207)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.commitAllPartitions(LocalDiskShuffleMapOutputWriter.java:115)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1989)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1977)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1976)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1976)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:956)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:956)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:956)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2155)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2144)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:758)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2137)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2156)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2181)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:737)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:736)
	at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:588)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:226)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.$anonfun$train$1(DecisionTreeRegressor.scala:128)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.train(DecisionTreeRegressor.scala:117)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.train(DecisionTreeRegressor.scala:47)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:152)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:116)
	at sun.reflect.GeneratedMethodAccessor298.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: fail to rename file C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\15\shuffle_236_2125_0.data.2eca8a15-4581-41aa-887f-1eaf5dd92f11 to C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\15\shuffle_236_2125_0.data
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:207)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.commitAllPartitions(LocalDiskShuffleMapOutputWriter.java:115)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [80]:
train_eval(randomForest, rfParamMap, train_neighborhood, test_neighborhood)

Py4JJavaError: An error occurred while calling o81521.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2212.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2212.0 (TID 2144, Shriprasad, executor driver): java.io.IOException: fail to rename file C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\15\shuffle_244_2144_0.data.57aec1d7-2ea5-4d2a-80a8-863803211ddf to C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\15\shuffle_244_2144_0.data
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:207)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.commitAllPartitions(LocalDiskShuffleMapOutputWriter.java:115)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1989)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1977)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1976)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1976)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:956)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:956)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:956)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2155)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2144)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:758)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2137)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2156)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2181)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:737)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:736)
	at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:588)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:226)
	at org.apache.spark.ml.regression.RandomForestRegressor.$anonfun$train$1(RandomForestRegressor.scala:131)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.RandomForestRegressor.train(RandomForestRegressor.scala:117)
	at org.apache.spark.ml.regression.RandomForestRegressor.train(RandomForestRegressor.scala:44)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:152)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:116)
	at sun.reflect.GeneratedMethodAccessor298.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: fail to rename file C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\15\shuffle_244_2144_0.data.57aec1d7-2ea5-4d2a-80a8-863803211ddf to C:\Users\aaa\AppData\Local\Temp\blockmgr-4ad1c16d-c1d0-4080-a068-4b6a3fbc0d82\15\shuffle_244_2144_0.data
	at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:207)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.commitAllPartitions(LocalDiskShuffleMapOutputWriter.java:115)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [84]:
import sys
try:
    train_eval(gradientBoost, gbParamMap, train_neighborhood, test_neighborhood)
except:
    print(sys.exc_info())


(<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError('An error occurred while calling o100540.fit.\n', JavaObject id=o100911), <traceback object at 0x0B16E4E0>)
