In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoderEstimator
from pyspark.sql.types import DecimalType, IntegerType, DateType
from pyspark.ml import Pipeline

In [2]:
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

In [3]:
df = spark.read.csv('./AirBnb_NYC_2019.csv',header='true',inferSchema='true')

In [4]:
df.head(1)

[Row(id='2539', name='Clean & quiet apt home by the park', host_id='2787', host_name='John', neighbourhood_group='Brooklyn', neighbourhood='Kensington', latitude='40.64749', longitude='-73.97237', room_type='Private room', price='149', minimum_nights='1', number_of_reviews='9', last_review='2018-10-19', reviews_per_month='0.21', calculated_host_listings_count='6', availability_365=365)]

In [5]:
df.select('availability_365').collect()

[Row(availability_365=365),
 Row(availability_365=355),
 Row(availability_365=365),
 Row(availability_365=194),
 Row(availability_365=0),
 Row(availability_365=129),
 Row(availability_365=0),
 Row(availability_365=220),
 Row(availability_365=0),
 Row(availability_365=188),
 Row(availability_365=6),
 Row(availability_365=39),
 Row(availability_365=314),
 Row(availability_365=333),
 Row(availability_365=0),
 Row(availability_365=46),
 Row(availability_365=321),
 Row(availability_365=12),
 Row(availability_365=21),
 Row(availability_365=249),
 Row(availability_365=0),
 Row(availability_365=347),
 Row(availability_365=364),
 Row(availability_365=304),
 Row(availability_365=233),
 Row(availability_365=85),
 Row(availability_365=0),
 Row(availability_365=75),
 Row(availability_365=311),
 Row(availability_365=67),
 Row(availability_365=355),
 Row(availability_365=255),
 Row(availability_365=284),
 Row(availability_365=359),
 Row(availability_365=269),
 Row(availability_365=340),
 Row(availabi

In [6]:
df = df.withColumn("latitude",df["latitude"].cast(DecimalType()))
df = df.withColumn("longitude",df["longitude"].cast(DecimalType()))
df = df.withColumn("price",df["price"].cast(IntegerType()))
df = df.withColumn("minimum_nights",df["minimum_nights"].cast(IntegerType()))
df = df.withColumn("number_of_reviews",df["number_of_reviews"].cast(IntegerType()))
df = df.withColumn("calculated_host_listings_count",df["calculated_host_listings_count"].cast(IntegerType()))
df = df.withColumn("last_review",df["last_review"].cast(DateType()))
df = df.withColumn("reviews_per_month",df["reviews_per_month"].cast(DecimalType()))
df = df.withColumn("availability_365",df["availability_365"].cast(IntegerType()))

In [7]:
df.columns

['id',
 'name',
 'host_id',
 'host_name',
 'neighbourhood_group',
 'neighbourhood',
 'latitude',
 'longitude',
 'room_type',
 'price',
 'minimum_nights',
 'number_of_reviews',
 'last_review',
 'reviews_per_month',
 'calculated_host_listings_count',
 'availability_365']

In [8]:
#creates indexes for each string
indexers = [StringIndexer(inputCol=col, 
                          outputCol=col+'_index').fit(df) for col in ['neighbourhood_group',
                                                                      'neighbourhood',
                                                                      'room_type']]
pipe = Pipeline(stages=indexers)
df = pipe.fit(df).transform(df)

#creates columns with 0 or 1 for each index
encoder = OneHotEncoderEstimator(inputCols=['neighbourhood_group_index','neighbourhood_index',
                                             'room_type_index'],
                                 outputCols=['neighbourhood_group_dummy','neighbourhood_dummy',
                                             'room_type_dummy'],
                                dropLast=True)
df = encoder.fit(df).transform(df)

df.dtypes

[('id', 'string'),
 ('name', 'string'),
 ('host_id', 'string'),
 ('host_name', 'string'),
 ('neighbourhood_group', 'string'),
 ('neighbourhood', 'string'),
 ('latitude', 'decimal(10,0)'),
 ('longitude', 'decimal(10,0)'),
 ('room_type', 'string'),
 ('price', 'int'),
 ('minimum_nights', 'int'),
 ('number_of_reviews', 'int'),
 ('last_review', 'date'),
 ('reviews_per_month', 'decimal(10,0)'),
 ('calculated_host_listings_count', 'int'),
 ('availability_365', 'int'),
 ('neighbourhood_group_index', 'double'),
 ('neighbourhood_index', 'double'),
 ('room_type_index', 'double'),
 ('neighbourhood_group_dummy', 'vector'),
 ('neighbourhood_dummy', 'vector'),
 ('room_type_dummy', 'vector')]

In [9]:
df.columns

['id',
 'name',
 'host_id',
 'host_name',
 'neighbourhood_group',
 'neighbourhood',
 'latitude',
 'longitude',
 'room_type',
 'price',
 'minimum_nights',
 'number_of_reviews',
 'last_review',
 'reviews_per_month',
 'calculated_host_listings_count',
 'availability_365',
 'neighbourhood_group_index',
 'neighbourhood_index',
 'room_type_index',
 'neighbourhood_group_dummy',
 'neighbourhood_dummy',
 'room_type_dummy']

In [12]:
target = 'price'
features = df.drop('id',
                     'name',
                     'host_id',
                     'host_name',
                     'neighbourhood_group_index',
                     'neighbourhood_index',
                     'room_type_index',
                     'neighbourhood_group',
                     'neighbourhood',
                     'room_type',
                  'price',
                  'last_review').columns

vector = VectorAssembler(inputCols= features,
                         outputCol='features')

vectorized_df = vector.transform(df)
vectorized_df.head(2)


[Row(id='2539', name='Clean & quiet apt home by the park', host_id='2787', host_name='John', neighbourhood_group='Brooklyn', neighbourhood='Kensington', latitude=Decimal('41'), longitude=Decimal('-74'), room_type='Private room', price=149, minimum_nights=1, number_of_reviews=9, last_review=datetime.date(2018, 10, 19), reviews_per_month=Decimal('0'), calculated_host_listings_count=6, availability_365=365, neighbourhood_group_index=1.0, neighbourhood_index=52.0, room_type_index=1.0, neighbourhood_group_dummy=SparseVector(76, {1: 1.0}), neighbourhood_dummy=SparseVector(381, {52: 1.0}), room_type_dummy=SparseVector(85, {1: 1.0}), features=SparseVector(549, {0: 41.0, 1: -74.0, 2: 1.0, 3: 9.0, 5: 6.0, 6: 365.0, 8: 1.0, 135: 1.0, 465: 1.0})),
 Row(id='2595', name='Skylit Midtown Castle', host_id='2845', host_name='Jennifer', neighbourhood_group='Manhattan', neighbourhood='Midtown', latitude=Decimal('41'), longitude=Decimal('-74'), room_type='Entire home/apt', price=225, minimum_nights=1, numb

In [13]:
train_data, test_data = vectorized_df.randomSplit([.75,.25])

lin_reg = LinearRegression(featuresCol='features', labelCol='price')

lin_reg.fit(train_data)

Py4JJavaError: An error occurred while calling o301.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 19, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (struct<latitude_double_VectorAssembler_97a6581f9ce0:double,longitude_double_VectorAssembler_97a6581f9ce0:double,minimum_nights_double_VectorAssembler_97a6581f9ce0:double,number_of_reviews_double_VectorAssembler_97a6581f9ce0:double,reviews_per_month_double_VectorAssembler_97a6581f9ce0:double,calculated_host_listings_count_double_VectorAssembler_97a6581f9ce0:double,availability_365_double_VectorAssembler_97a6581f9ce0:double,neighbourhood_group_dummy:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,neighbourhood_dummy:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,room_type_dummy:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "keep". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:287)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:255)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:255)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:144)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:143)
	... 22 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2552)
	at org.apache.spark.sql.Dataset.first(Dataset.scala:2559)
	at org.apache.spark.ml.regression.LinearRegression$$anonfun$train$1.apply(LinearRegression.scala:321)
	at org.apache.spark.ml.regression.LinearRegression$$anonfun$train$1.apply(LinearRegression.scala:319)
	at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:319)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:176)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:82)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (struct<latitude_double_VectorAssembler_97a6581f9ce0:double,longitude_double_VectorAssembler_97a6581f9ce0:double,minimum_nights_double_VectorAssembler_97a6581f9ce0:double,number_of_reviews_double_VectorAssembler_97a6581f9ce0:double,reviews_per_month_double_VectorAssembler_97a6581f9ce0:double,calculated_host_listings_count_double_VectorAssembler_97a6581f9ce0:double,availability_365_double_VectorAssembler_97a6581f9ce0:double,neighbourhood_group_dummy:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,neighbourhood_dummy:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,room_type_dummy:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "keep". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:287)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:255)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:255)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:144)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:143)
	... 22 more
