# Airbnb price model with MLeap

Adapted from the MLeap PySpark Airbnb demo:
* https://github.com/combust/mleap-demo/blob/master/notebooks/PySpark%20-%20AirBnb.ipynb
* https://combust.github.io/mleap-docs/py-spark/

In [1]:
# download sample data file
!wget -nc -O airbnb.csv https://github.com/combust/mleap-demo/blob/master/data/airbnb.csv?raw=true

File ‘airbnb.csv’ already there; not retrieving.


## Install MLeap from PyPI

In [2]:
!pip install mleap==0.18.0 --no-deps scikit-learn

#mleap==0.18.0 requires, scikit-learn<0.23.0,>=0.22.0 -- but has 0.24.0 installed

Collecting mleap==0.18.0
  Using cached mleap-0.18.0-py3-none-any.whl (59 kB)
Installing collected packages: mleap
Successfully installed mleap-0.18.0


In [3]:
### !!! IMPORTANT: Uses pyspark from mleap package !!! ###

from mleap import pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StandardScaler, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression

In [4]:
### !!! IMPORTANT: Import MLeap Jars, or serialization will fail !!! ###
# The MLeap Spark/runtime jars are pulled in through spark.jars.packages when the
# session is created; their version (0.18.0) matches the `mleap` pip package.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[1]")
    .appName('mleap')
    .config(
        'spark.jars.packages',
        'ml.combust.mleap:mleap-spark_2.12:0.18.0,ml.combust.mleap:mleap-runtime_2.12:0.18.0',
    )
    .getOrCreate()
)

Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
ml.combust.mleap#mleap-spark_2.12 added as a dependency
ml.combust.mleap#mleap-runtime_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8bfe831d-54cf-4b1c-b8de-6261ec698abf;1.0
	confs: [default]


:: loading settings :: url = jar:file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found ml.combust.mleap#mleap-spark_2.12;0.18.0 in central
	found ml.combust.mleap#mleap-spark-base_2.12;0.18.0 in central
	found ml.combust.mleap#mleap-runtime_2.12;0.18.0 in central
	found ml.combust.mleap#mleap-core_2.12;0.18.0 in central
	found ml.combust.mleap#mleap-base_2.12;0.18.0 in central
	found ml.combust.mleap#mleap-tensor_2.12;0.18.0 in central
	found io.spray#spray-json_2.12;1.3.2 in central
	found com.github.rwl#jtransforms;2.4.0 in central
	found org.scalanlp#breeze_2.12;1.0 in central
	found org.scalanlp#breeze-macros_2.12;1.0 in central
	found com.github.fommil.netlib#core;1.1.2 in central
	found net.sourceforge.f2j#arpack_combined_all;0.1 in central
	found net.sf.opencsv#opencsv;2.3 in central
	found com.github.wendykierp#JTransforms;3.1 in central
	found pl.edu.icm#JLargeArrays;1.5 in central
	found org.apache.commons#commons-math3;3.2 in central
	found com.chuusai#shapeless_2.12;2.3.3 in central
	found org.typelevel#macro-compat_2.12;1.1.1 in central
	found org.slf

## Prepare Data in Spark

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Explicit column types for airbnb.csv; every field is nullable.
# NOTE(review): assumes this order matches the column order in the CSV header.
airbnb_columns = [
    ("id", IntegerType()),
    ("name", StringType()),
    ("price", DoubleType()),
    ("bedrooms", DoubleType()),
    ("bathrooms", DoubleType()),
    ("room_type", StringType()),
    ("square_feet", DoubleType()),
    ("host_is_superhost", DoubleType()),
    ("state", StringType()),
    ("cancellation_policy", StringType()),
    ("security_deposit", DoubleType()),
    ("cleaning_fee", DoubleType()),
    ("extra_people", DoubleType()),
    ("number_of_reviews", IntegerType()),
    ("price_per_bedroom", DoubleType()),
    ("review_scores_rating", DoubleType()),
    ("instant_bookable", DoubleType()),
]
schema = StructType([StructField(column_name, column_type, True) for column_name, column_type in airbnb_columns])

df = spark.read.option("header", True).schema(schema).csv("airbnb.csv")

In [6]:
# Peek at the first parsed row to confirm the schema lines up with the data.
df.first()

Row(id=1949687, name='Delectable Victorian Flat for two', price=80.0, bedrooms=1.0, bathrooms=1.0, room_type='Entire home/apt', square_feet=None, host_is_superhost=0.0, state='London', cancellation_policy='moderate', security_deposit=100.0, cleaning_fee=20.0, extra_people=10.0, number_of_reviews=8, price_per_bedroom=80.0, review_scores_rating=94.0, instant_bookable=0.0)

In [7]:
# Keep listings priced 50..750 (inclusive) that report a positive bathroom count,
# then print the row count before and after filtering.
datasetFiltered = df.where("price >= 50 AND price <= 750 and bathrooms > 0.0")
for frame in (df, datasetFiltered):
    print(frame.count())

                                                                                

389255
321588


In [8]:
# Expose the filtered rows to Spark SQL as the temporary view `df`.
# createOrReplaceTempView replaces DataFrame.registerTempTable, which has been
# deprecated since Spark 2.0.
datasetFiltered.createOrReplaceTempView("df")

In [9]:
# Clean / impute the filtered listings (temp view `df`) with Spark SQL:
#   * state: keep nine listed values ('NY', 'CA', 'London', 'Berlin', 'TX', 'IL',
#     'OR', 'DC', 'WA'); every other value becomes 'Other'.
#   * security_deposit, number_of_reviews, extra_people, cleaning_fee: NULL -> 0.0
#   * review_scores_rating: NULL -> 80.0
#   * square_feet: kept when > 100; otherwise 350.0 if bedrooms is NULL or 0,
#     else estimated as 380 * bedrooms.
#   * n_bathrooms_more_than_two: 1.0 when bathrooms >= 2, else 0.0 (used as the
#     logistic-regression label).
#     NOTE(review): the condition is ">= 2" ("two or more") despite the name.
# Rows with NULL bedrooms are dropped by the WHERE clause.
datasetImputed = spark.sql("""
    select
        id,
        case when state in('NY', 'CA', 'London', 'Berlin', 'TX' ,'IL', 'OR', 'DC', 'WA')
            then state
            else 'Other'
        end as state,
        price,
        bathrooms,
        bedrooms,
        room_type,
        host_is_superhost,
        cancellation_policy,
        case when security_deposit is null
            then 0.0
            else security_deposit
        end as security_deposit,
        price_per_bedroom,
        case when number_of_reviews is null
            then 0.0
            else number_of_reviews
        end as number_of_reviews,
        case when extra_people is null
            then 0.0
            else extra_people
        end as extra_people,
        instant_bookable,
        case when cleaning_fee is null
            then 0.0
            else cleaning_fee
        end as cleaning_fee,
        case when review_scores_rating is null
            then 80.0
            else review_scores_rating
        end as review_scores_rating,
        case when square_feet is not null and square_feet > 100
            then square_feet
            when (square_feet is null or square_feet <=100) and (bedrooms is null or bedrooms = 0)
            then 350.0
            else 380 * bedrooms
        end as square_feet,
        case when bathrooms >= 2
            then 1.0
            else 0.0
        end as n_bathrooms_more_than_two
    from df
    where bedrooms is not null
""")


# Sanity-check the imputed numeric columns (count / mean / stddev / min / max).
datasetImputed.select("square_feet", "price", "bedrooms", "bathrooms", "cleaning_fee").describe().show()

+-------+------------------+------------------+------------------+------------------+-----------------+
|summary|       square_feet|             price|          bedrooms|         bathrooms|     cleaning_fee|
+-------+------------------+------------------+------------------+------------------+-----------------+
|  count|            321588|            321588|            321588|            321588|           321588|
|   mean| 546.7441757777032|131.54961006007687|1.3352426085550455| 1.199068373198005|37.64188340360959|
| stddev|363.39839582374066| 90.10912788720098|0.8466586601060778|0.4830590051262673|42.64237791484579|
|    min|             104.0|              50.0|               0.0|               0.5|              0.0|
|    max|           32292.0|             750.0|              10.0|               8.0|            700.0|
+-------+------------------+------------------+------------------+------------------+-----------------+



## Explore Data

In [10]:
# Listing count, mean price and max price per raw `state` value, most frequent first.
# This reads the filtered view `df` (before the 'Other' bucketing). Some regions
# appear under several spellings, e.g. 'Noord-Holland' / 'North Holland' and
# 'Catalonia' / 'Catalunya'.
state_summary_sql = """
    select 
        state,
        count(*) as n,
        cast(avg(price) as decimal(12,2)) as avg_price,
        max(price) as max_price
    from df
    group by state
    order by count(*) desc
"""
spark.sql(state_summary_sql).show()



+-------------+-----+---------+---------+
|        state|    n|avg_price|max_price|
+-------------+-----+---------+---------+
|           NY|48362|   146.75|    750.0|
|           CA|44716|   158.76|    750.0|
|Île-de-France|40732|   107.74|    750.0|
|       London|17542|   117.72|    750.0|
|          NSW|14416|   167.96|    750.0|
|       Berlin|13098|    81.01|    650.0|
|Noord-Holland| 8890|   128.56|    750.0|
|          VIC| 8636|   144.49|    750.0|
|North Holland| 7636|   134.60|    700.0|
|           IL| 7544|   141.85|    750.0|
|           ON| 7186|   129.05|    750.0|
|           TX| 6702|   196.59|    750.0|
|           WA| 5858|   132.48|    750.0|
|    Catalonia| 5748|   106.39|    720.0|
|           BC| 5522|   133.14|    750.0|
|           DC| 5476|   136.56|    720.0|
|       Québec| 5116|   104.98|    700.0|
|    Catalunya| 4570|    99.36|    675.0|
|       Veneto| 4486|   131.71|    700.0|
|           OR| 4330|   114.02|    700.0|
+-------------+-----+---------+---

                                                                                

## Define Features

In [11]:
# Numeric inputs: assembled into one vector and scaled by the feature pipeline.
# NOTE(review): `bathrooms` is also the source of the logistic-regression label
# `n_bathrooms_more_than_two`, so that model sees its own target.
continuous_features = [
    "bathrooms",
    "bedrooms",
    "security_deposit",
    "cleaning_fee",
    "extra_people",
    "number_of_reviews",
    "square_feet",
    "review_scores_rating",
]

# Categorical inputs: each one is string-indexed by the feature pipeline.
categorical_features = [
    "room_type",
    "host_is_superhost",
    "cancellation_policy",
    "instant_bookable",
    "state",
]

all_features = [*continuous_features, *categorical_features]

In [12]:
# Cache the imputed data; it is reused by the fits, transforms and the
# serialization cell below.
dataset_imputed = datasetImputed.persist()

# 70/30 train/validation split. A fixed seed makes the split, and therefore
# every downstream fit, reproducible across kernel restarts.
SPLIT_SEED = 42
training_dataset, validation_dataset = dataset_imputed.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

## Create Pipeline

In [13]:
# Feature pipeline:
#   1. VectorAssembler packs the continuous columns into "unscaled_continuous_features".
#   2. StandardScaler (withStd=True, withMean=False) scales each component by its
#      standard deviation without centering -> "scaled_continuous_features".
#   3. One StringIndexer per categorical column -> "<column>_index".
continuous_feature_assembler = VectorAssembler(inputCols=continuous_features, outputCol="unscaled_continuous_features")
continuous_feature_scaler = StandardScaler(inputCol="unscaled_continuous_features", outputCol="scaled_continuous_features",
                                           withStd=True, withMean=False)

# handleInvalid="keep" maps labels (and NULLs) not seen while fitting to an extra
# index instead of raising. The pipeline is fit on the training split only, and is
# later applied to validation / full-data rows.
categorical_feature_indexers = [
    StringIndexer(inputCol=x, outputCol="{}_index".format(x), handleInvalid="keep")
    for x in categorical_features
]

# NOTE(review): one-hot encoding is left out. With a per-column
# OneHotEncoder(inputCol=...), serializeToBundle failed with
#   java.util.NoSuchElementException: Failed to find a default value for inputCols
#   (raised from OneHotEncoderOp.store -> OneHotEncoderModel.getInputCols)
# The MLeap op reads `inputCols`, so a single multi-column encoder, e.g.
#   OneHotEncoder(inputCols=[i.getOutputCol() for i in categorical_feature_indexers],
#                 outputCols=["oh_encoder_{}".format(i.getOutputCol()) for i in categorical_feature_indexers])
# may serialize. TODO: confirm before re-enabling.
# NOTE(review): the models below read only "scaled_continuous_features", so the
# "*_index" columns are currently not used by either model.
estimatorsLr = [continuous_feature_assembler, continuous_feature_scaler] + categorical_feature_indexers

featurePipeline = Pipeline(stages=estimatorsLr)

# Fit the scaler statistics and category indices on the training split only, so
# nothing is learned from the validation rows.
sparkFeaturePipelineModel = featurePipeline.fit(training_dataset)

                                                                                

## Train Model

In [14]:
from pyspark.ml.evaluation import RegressionEvaluator

# Linear regression of price on the scaled continuous features. elasticNetParam=0.8
# mixes L1 and L2 regularisation, with overall strength regParam=0.3.
linearRegression = LinearRegression(featuresCol="scaled_continuous_features", labelCol="price", predictionCol="price_prediction", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# The already-fitted feature PipelineModel is prepended, so the resulting pipeline
# (and its serialized bundle) runs feature preparation and prediction together.
pipeline_lr = [sparkFeaturePipelineModel] + [linearRegression]

sparkPipelineEstimatorLr = Pipeline(stages = pipeline_lr)

# Train on the training split only; the held-out split gives an unbiased error estimate.
sparkPipelineLr = sparkPipelineEstimatorLr.fit(training_dataset)

lr_validation_rmse = RegressionEvaluator(
    labelCol="price", predictionCol="price_prediction", metricName="rmse"
).evaluate(sparkPipelineLr.transform(validation_dataset))
lr_validation_rmse

21/09/19 18:12:13 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/09/19 18:12:13 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [15]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Logistic regression predicting the 0/1 flag n_bathrooms_more_than_two
# (bathrooms >= 2) from the scaled continuous features.
# NOTE(review): `bathrooms` is one of those features, so the label is a direct
# function of an input. Expect a near-perfect score; this model is presumably only
# here to demonstrate classifier serialization.
logisticRegression = LogisticRegression(featuresCol="scaled_continuous_features", labelCol="n_bathrooms_more_than_two", predictionCol="n_bathrooms_more_than_two_prediction", maxIter=10)

pipeline_log_r = [sparkFeaturePipelineModel] + [logisticRegression]

sparkPipelineEstimatorLogr = Pipeline(stages = pipeline_log_r)

# Train on the training split only.
sparkPipelineLogr = sparkPipelineEstimatorLogr.fit(training_dataset)

# Area under the ROC curve on the held-out split; the evaluator reads the model's
# default "rawPrediction" column.
logr_validation_auc = BinaryClassificationEvaluator(
    labelCol="n_bathrooms_more_than_two", metricName="areaUnderROC"
).evaluate(sparkPipelineLogr.transform(validation_dataset))
logr_validation_auc

                                                                                

## Serialize / Deserialize

In [16]:
import os

# Write both fitted pipelines as MLeap bundles (zip files) under ./models/.
# Each call also gets a DataFrame already transformed by that pipeline.
pwd = os.getcwd()
models_dir = os.path.join(pwd, "models")
# Create the target directory so a fresh checkout does not depend on ./models/ existing.
os.makedirs(models_dir, exist_ok=True)

for pipeline_model, bundle_name in (
    (sparkPipelineLr, "airbnb.lr.zip"),
    (sparkPipelineLogr, "airbnb.logr.zip"),
):
    bundle_path = os.path.join(models_dir, bundle_name)
    # Remove a bundle left by a previous run, so re-running the cell always
    # writes a fresh zip instead of reusing (or failing on) the old one.
    if os.path.exists(bundle_path):
        os.remove(bundle_path)
    pipeline_model.serializeToBundle(
        f"jar:file:{bundle_path}",
        dataset=pipeline_model.transform(dataset_imputed),
    )

In [17]:
# Round-trip check: read both MLeap bundles back into Spark PipelineModel objects
# (deserializeFromBundle is provided by the `mleap.pyspark` import at the top).
lr_bundle_uri = f"jar:file:{pwd}/models/airbnb.lr.zip"
logr_bundle_uri = f"jar:file:{pwd}/models/airbnb.logr.zip"

sparkPipelineLrLoad = PipelineModel.deserializeFromBundle(lr_bundle_uri)
sparkPipelineLogrLoad = PipelineModel.deserializeFromBundle(logr_bundle_uri)