# Airbnb price analysis and prediction in Seattle

Group 6
- Jian Jian
- Tiehao Chen
- Chenghao Wu
- Yueyuan He

In [3]:
!pip install langdetect
!pip install nltk
!pip install wordcloud
!pip install singleton_decorator

In [4]:
# Do not delete or change this cell

# grading import statements
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
spark = SparkSession.builder.master("local[*]").config("spark.sql.crossJoin.enabled","true").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
import os

# Define a function to determine if we are running on data bricks
# Return true if running in the data bricks environment, false otherwise
def is_databricks():
    # get the databricks runtime version
    db_env = os.getenv("DATABRICKS_RUNTIME_VERSION")
    
    # if running on data bricks
    if db_env != None:
        return True
    else:
        return False

# Define a function to read the data file.  The full path data file name is constructed
# by checking runtime environment variables to determine if the runtime environment is 
# databricks, or a student's personal computer.  The full path file name is then
# constructed based on the runtime env.
# 
# Params
#   data_file_name: The base name of the data file to load
# 
# Returns the full path file name based on the runtime env
#
# Correct Usage Example (pass ONLY the full file name):
#   file_name_to_load = get_training_filename("sms_spam.csv") # correct - pass ONLY the full file name  
#   
# Incorrect Usage Example
#   file_name_to_load = get_training_filename("/sms_spam.csv") # incorrect - pass ONLY the full file name
#   file_name_to_load = get_training_filename("sms_spam.csv/") # incorrect - pass ONLY the full file name
#   file_name_to_load = get_training_filename("c:/users/will/data/sms_spam.csv") incorrect -pass ONLY the full file name
def get_training_filename(data_file_name):    
    # if running on data bricks
    if is_databricks():
        # build the full path file name assuming data brick env
        full_path_name = "/FileStore/tables/%s" % data_file_name
    # else the data is assumed to be in the same dir as this notebook
    else:
        # Assume the student is running on their own computer and load the data
        # file from the same dir as this notebook
        full_path_name = data_file_name
    
    # return the full path file name to the caller
    return full_path_name

Load Module

In [6]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.functions import udf
from pyspark.sql.functions import isnan, isnull, when, count, col
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline, Transformer, Estimator
from pyspark.sql.functions import regexp_replace

Create connect to spark

In [8]:
import csv
import pandas as pd

from collections.abc import Iterable
import pyspark.sql.functions as fn

seed = 77

def load_data(file_name, sampling=False):
  df = spark.read\
        .option("header", "true")\
        .option("multiLine", "true")\
        .option('inferSchema', 'true')\
        .option('escape', '"')\
        .csv(get_training_filename(file_name))
  if sampling:
    df = df.sample(True, .2, seed=seed)
  return df
        

def split_data(raw_data, training_rate=0.7):
  train_data, test_data = raw_data.randomSplit([training_rate, 1-training_rate])
  return (train_data, test_data)

def shape(df):
  return (df.count(), len(df.columns))

Create data object, fill the raw/train/test data.

# EDA and Data Imputation

We will explore the whole dataset during the EDA.

In [12]:
from pyspark.sql.dataframe import DataFrame

df = load_data('listings.csv')


## Data Exploration

In [14]:
df_reviews = load_data('reviews.csv')

### Who likes Airbnb most?

In [16]:
most_person = df_reviews.groupBy('reviewer_name').count().sort("count", ascending=False)
most_person.show()

In [17]:
fig = plt.figure(figsize=(12, 3), dpi=100)
sns.barplot("reviewer_name","count",palette="RdBu_r",data=most_person.toPandas().head(20))
plt.xticks(rotation=45)
plt.show()
display()

### Which house is the most popular？

In [19]:
most_living = df_reviews.groupBy('listing_id').count().sort("count", ascending=False)
most_living.show()

In [20]:
most_living_df  = most_living.toPandas()
fig = plt.figure(figsize=(12, 3), dpi=100)
sns.barplot("listing_id","count",palette="husl",data=most_living_df.head(20),order=None)
plt.xticks(rotation=45)
plt.show()
display()

### Whose house is the most popular？

In [22]:
df2 = df.select(['id','listing_url','host_name','neighbourhood_group_cleansed'])
df2 = df2.withColumn('listing_id', fn.regexp_replace(fn.col('listing_url'), "https://www.airbnb.com/rooms/" , '' ).cast('int'))
most_living = most_living.join(df2,"listing_id", "left_outer")

In [23]:
fig = plt.figure(figsize=(12, 6), dpi=100)
sns.barplot(y = "host_name",x = "count",palette="husl",data=most_living.toPandas().head(20))
plt.xticks(rotation=45)
plt.show()
display()

### Where is the most popular house？

In [25]:
location_groupby = most_living.groupby('neighbourhood_group_cleansed').count()
location_groupby_sorted = location_groupby.orderBy('count', ascending=False)
fig = plt.figure(figsize=(12, 3), dpi=100)
sns.barplot(x = 'neighbourhood_group_cleansed', y = 'count',data=location_groupby_sorted.toPandas(),saturation=1)
plt.xticks(rotation=45)
plt.show()
display()

## Numerical Data Cleaning

### Convert values in columns from string to number

- Display the type of each column

In [29]:
df.printSchema()

In [30]:
shape(df)

- It is a 9023 by 106 data dataset. The dataset are comprised of categorical(ordinal, nominal) variables, numerical(integer, float) variables, timestamp variable, and boolean variables. 
- Some of the features are loaded with incorrect format. 
- The follow up process will impute the problematic data into a purified dataset by using reg expression, winsorizing, and logarthim transformation.
- The review comments resides in some of the variables. This analysis will foucs on quantitative reasoning. So we will disregard those features. 
- There are 106 columns in the dataset. According to the 'curse of the dimensionality', feature selection is necessary. After data cleaning, Lasso regression and Random Forest will be applied to the dataset to select the important feature as the columns of the training set. 
- Since the dataset is huge in terms of the number of feature, we will sampling a subset to explore the data space

In [32]:
df = load_data('listings.csv', True)
shape(df)

- Select boolean columns

In [34]:
bool_columns = ['host_is_superhost', 
                    'host_has_profile_pic', 
                    'host_identity_verified', 
                    'is_location_exact', 
                    'has_availability', 
                    'requires_license', 
                    'instant_bookable', 
                    'is_business_travel_ready', 
                    'require_guest_profile_picture', 
                    'require_guest_phone_verification']
df_bool = df.select(bool_columns)
df_bool.limit(5).toPandas().T

Unnamed: 0,0,1,2,3,4
host_is_superhost,f,f,t,t,t
host_has_profile_pic,t,t,t,t,t
host_identity_verified,t,t,f,f,f
is_location_exact,t,t,f,f,t
has_availability,t,t,t,t,t
requires_license,t,t,t,t,t
instant_bookable,f,f,f,f,t
is_business_travel_ready,f,f,f,f,f
require_guest_profile_picture,f,f,f,f,f
require_guest_phone_verification,f,f,f,f,f


- Replace t to 1, and f to 0 respectively

In [36]:
from singleton_decorator import singleton
from pyspark.sql import functions as fn
from pyspark.sql import types as t


@singleton
class BooleanConverter(Transformer):
  def __init__(self):
    self._bool_columns = ['host_is_superhost', 
                    'host_has_profile_pic', 
                    'host_identity_verified', 
                    'is_location_exact', 
                    'has_availability', 
                    'requires_license', 
                    'instant_bookable', 
                    'is_business_travel_ready', 
                    'require_guest_profile_picture', 
                    'require_guest_phone_verification']
    self._bool_dict = {'t': 1, 'f': 0}
    def bool_map(x):
      if x in self._bool_dict.keys():
        return self._bool_dict[x]
      return x
    self._bool_encode_udf = fn.udf(bool_map, t.IntegerType())

  @property
  def bool_columns(self):
    return self._bool_columns

  def _transform(self, df: DataFrame) -> DataFrame:
    result = df
    for col_name in self._bool_columns:
      if col_name in result.columns:
        result = result.withColumn(col_name, self._bool_encode_udf(fn.col(col_name)))
    return result

bool_converter = BooleanConverter()
wrangling_pipeline = Pipeline(stages=[bool_converter])
result = wrangling_pipeline.fit(df).transform(df)

result.select([fn.col(col_name) for col_name in bool_converter.bool_columns]).limit(5).toPandas().T

Unnamed: 0,0,1,2,3,4
host_is_superhost,0,0,1,1,1
host_has_profile_pic,1,1,1,1,1
host_identity_verified,1,1,0,0,0
is_location_exact,1,1,0,0,1
has_availability,1,1,1,1,1
requires_license,1,1,1,1,1
instant_bookable,0,0,0,0,1
is_business_travel_ready,0,0,0,0,0
require_guest_profile_picture,0,0,0,0,0
require_guest_phone_verification,0,0,0,0,0


### Convert values in columns from formatted string to number

- Select price formatted columns

In [39]:
price_columns = ['extra_people', 
                 'price']
df_price = df.select(price_columns)
df_price.limit(5).toPandas().T

Unnamed: 0,0,1,2,3,4
extra_people,$15.00,$15.00,$5.00,$5.00,$0.00
price,$120.00,$60.00,$32.00,$32.00,$105.00


- Reformat the currency formattet to number

In [41]:
@singleton
class CurrencyConverter(Transformer):
  def __init__(self):
    self._price_columns = [
                           'extra_people', 
                           'price', 
                           'weekly_price', 
                           'monthly_price', 
                           'security_deposit', 
                           'cleaning_fee', 
                           ]
  @property
  def price_columns(self):
    return self._price_columns

  def _transform(self, df: DataFrame) -> DataFrame:
    result = df
    for col_name in self._price_columns:
      if col_name in result.columns:
        result = result.withColumn(col_name, fn.regexp_replace(fn.col(col_name), "\$|," , '' ).cast('double'))
    return result


currency_converter = CurrencyConverter()
wrangling_pipeline = Pipeline(stages=[bool_converter
                            , currency_converter])
result = wrangling_pipeline.fit(df).transform(df)

result.select([fn.col(col_name) for col_name in currency_converter.price_columns]).limit(5).toPandas().T


Unnamed: 0,0,1,2,3,4
extra_people,15.0,15.0,5.0,5.0,0.0
price,120.0,60.0,32.0,32.0,105.0
weekly_price,725.0,375.0,385.0,385.0,630.0
monthly_price,,1000.0,950.0,950.0,2500.0
security_deposit,200.0,0.0,500.0,500.0,300.0
cleaning_fee,85.0,10.0,20.0,20.0,50.0


## Revise incorrect string-type columns into numerical one

In [43]:
@singleton
class RateConverter(Transformer):
  def __init__(self):
    self._rate_columns = [
                           'host_response_rate', 
                           'host_acceptance_rate', 
                           ]
  @property
  def rate_columns(self):
    return self._rate_columns

  def _transform(self, df: DataFrame) -> DataFrame:
    result = df
    for col_name in self._rate_columns:
      if col_name in result.columns:
        result = result.withColumn(col_name, fn.regexp_replace(fn.col(col_name), "\%" , '' ).cast('double'))
    return result


rate_converter = RateConverter()
wrangling_pipeline = Pipeline(stages=[bool_converter
                            , currency_converter
                            , rate_converter])
result = wrangling_pipeline.fit(df).transform(df)

result.select([fn.col(col_name) for col_name in rate_converter.rate_columns]).limit(5).toPandas().T


Unnamed: 0,0,1,2,3,4
host_response_rate,100.0,100.0,100.0,100.0,100.0
host_acceptance_rate,,,,,


## Zero Variance Variables Removal

In [45]:
import re


@singleton
class ZeroVarianceCleaner(Transformer):
  '''
    Not threadsafe
  '''
  def __init__(self):
    self._reg_exp = re.compile("variance\((.*)\)", re.IGNORECASE)
    self._zero_variance_columns = list()

  @property
  def zero_variance_columns(self):
    return self._zero_variance_columns

  def _transform(self, df: DataFrame) -> DataFrame:
    result = df

    numerical_columns = [f.name for f in result.schema.fields if isinstance(f.dataType, t.NumericType)]
    if len(numerical_columns) > 0:
      numerical_columns = [col_name for col_name in numerical_columns if col_name not in ['id', 'longtitude', 'latitude']]
      numeric_variances = result.agg({col_name : 'variance' for col_name in numerical_columns})
      numeric_variances = numeric_variances.select(*[fn.col(col).alias(self._reg_exp.search(col).group(1)) for col in numeric_variances.columns])
      numeric_variances = numeric_variances.toPandas().T.iloc[:,0]
      numeric_variances = numeric_variances[(numeric_variances == 0) | (numeric_variances.isna())]
      self._zero_variance_columns = [*self._zero_variance_columns, *numeric_variances.index]

    string_columns = [f.name for f in result.schema.fields if isinstance(f.dataType, t.StringType)]
    if len(string_columns) > 0:
      string_columns_unique_count = result.agg(*(fn.countDistinct(fn.col(col_name)).cast('int').alias(col_name) for col_name in string_columns))
      string_columns_unique_count = string_columns_unique_count.toPandas().T.iloc[:,0]
      string_columns_unique_count = string_columns_unique_count[(string_columns_unique_count < 2) | (string_columns_unique_count.isna())]
      self._zero_variance_columns = [*self._zero_variance_columns, *string_columns_unique_count.index]

    timestamp_columns = [f.name for f in result.schema.fields if isinstance(f.dataType, t.TimestampType)]
    if len(timestamp_columns) > 0:
      timestamp_columns_unique_count = result.agg(*(fn.countDistinct(fn.col(col_name)).cast('int').alias(col_name) for col_name in timestamp_columns))
      timestamp_columns_unique_count = timestamp_columns_unique_count.toPandas().T.iloc[:,0]
      timestamp_columns_unique_count = timestamp_columns_unique_count[(timestamp_columns_unique_count < 2) | (timestamp_columns_unique_count.isna())]
      self._zero_variance_columns = [*self._zero_variance_columns, *timestamp_columns_unique_count.index]

    if len(self._zero_variance_columns) > 0:
      result = result.drop(*self._zero_variance_columns)

    return result


zero_variance_cleaner = ZeroVarianceCleaner()
wrangling_pipeline = Pipeline(stages=[bool_converter
                            , currency_converter
                            , rate_converter
                            , zero_variance_cleaner])
result = wrangling_pipeline.fit(df).transform(df)

pd.DataFrame({'Removed': [col_name not in result.columns for col_name in zero_variance_cleaner.zero_variance_columns]}, index = zero_variance_cleaner._zero_variance_columns)


Unnamed: 0,Removed
host_acceptance_rate,True
has_availability,True
scrape_id,True
is_business_travel_ready,True
experiences_offered,True
thumbnail_url,True
medium_url,True
xl_picture_url,True
country_code,True
country,True


## Drop descriptive columns
- The sentiment analysis will not be a part of linear or tree based machine learning algorithm, so just drop descriptive comment columns directly. 
- Latter these features will go in to a separate dataset to train sentiment analysis models.

In [47]:
class ColumnRemover(Transformer):
  '''
    Not threadsafe
  '''
  def __init__(self, drop_columns):
    self._drop_columns = drop_columns

  @property
  def drop_columns(self):
    return self._drop_columns

  def _transform(self, df: DataFrame) -> DataFrame:
    result = df

    result = result.drop(*[col_name for col_name in self._drop_columns if col_name in result.columns])

    return result

literature_column_remover = ColumnRemover([
                                'name', 'summary', 'space', 'description', 
                                'neighborhood_overview', 'notes', 'transit', 
                                'access', 'interaction', 'house_rules', 
                                'host_name', 'host_about', 'jurisdiction_names'])
wrangling_pipeline = Pipeline(stages=[bool_converter
                            , currency_converter
                            , rate_converter
                            , zero_variance_cleaner
                            , literature_column_remover])
result = wrangling_pipeline.fit(df).transform(df)


pd.DataFrame({'Removed': [col_name not in result.columns for col_name in literature_column_remover.drop_columns]}, index = literature_column_remover.drop_columns)


Unnamed: 0,Removed
name,True
summary,True
space,True
description,True
neighborhood_overview,True
notes,True
transit,True
access,True
interaction,True
house_rules,True


## Drop URL columns
- The url scraping analysis is out of the report scope, drop these url related column accordingly.

In [49]:
url_column_remover = ColumnRemover(['listing_url', 'picture_url', 'host_url', 'host_thumbnail_url', 'host_picture_url'])
wrangling_pipeline = Pipeline(stages=[bool_converter
                            , currency_converter
                            , rate_converter
                            , zero_variance_cleaner
                            , literature_column_remover
                            , url_column_remover])
result = wrangling_pipeline.fit(df).transform(df)


pd.DataFrame({'Removed': [col_name not in result.columns for col_name in url_column_remover.drop_columns]}, index = url_column_remover.drop_columns)


Unnamed: 0,Removed
listing_url,True
picture_url,True
host_url,True
host_thumbnail_url,True
host_picture_url,True


## Missing value exploratory

- Identify the number of missing value at each feature.

In [52]:
def calc_missing_ratio(data_frame: DataFrame):
  record_cnt = data_frame.count()
  df_columns = data_frame.columns
  
  df_result = data_frame.select([fn.col(c).cast(t.StringType()) for c in data_frame.columns]) \
    .select([fn.sum(fn.when(fn.isnull(c), 1).otherwise(0)).alias(c) for c in data_frame.columns]) \
    .select([(col(c)/record_cnt).alias(c) for c in data_frame.columns]) \
    .toPandas().T

  df_result = df_result.loc[(df_result != 0).all(axis=1), :]
  df_result.columns = ['Missing Value Ratio']
  df_result.sort_values(by=['Missing Value Ratio'], ascending=False, inplace=True)

  return df_result

missing_ratio = calc_missing_ratio(result)

print(missing_ratio)

- Visualize the ratio of missing values using bar plot.<br>
We will focus on the columns that has more than 2% of missing values.

In [54]:
import math
from matplotlib import gridspec
import matplotlib.pyplot as plt
import seaborn as sns; sns.set()
import matplotlib.ticker as mtick


def plot_missing_freq(df, xlab, ylab, title):
  plt.clf()
  plt.figure(figsize=(20, 5))
  df = pd.Series(df.iloc[:, 0].values, index=list(df.index))

  ax = sns.barplot(x=df.values * 100, y=df.index, orient='h')
  
  ax.set_xticklabels(df.values * 100, fontsize=15)
  ax.set_yticklabels(df.index, fontsize=12)
  ax.xaxis.set_major_formatter(mtick.PercentFormatter())

  ax.set_xlabel(xlab, fontsize=15)
  ax.set_ylabel(ylab, fontsize=35)

  plt.title(title, fontsize=20)
  # for bar in ax.patches:
  #   bar.set_height(30)

  display()

missing_ratio_2 = calc_missing_ratio(result)
missing_ratio_2 = missing_ratio_2.loc[(missing_ratio_2 > 0.02).all(axis=1), :]
plot_missing_freq(missing_ratio_2, 'Feature', 'Missing Value Ratio', 'Airbnb Data Set Missing Value')

## Drop columns that have over half missing value

In [56]:
incomplete_column_remover = ColumnRemover(['square_feet', 'monthly_price', 'weekly_price', 'license'])
wrangling_pipeline = Pipeline(stages=[bool_converter
                            , currency_converter
                            , rate_converter
                            , zero_variance_cleaner
                            , literature_column_remover
                            , url_column_remover
                            , incomplete_column_remover])
result = wrangling_pipeline.fit(df).transform(df)

ColumnRemover
pd.DataFrame({'Removed': [col_name not in result.columns for col_name in incomplete_column_remover.drop_columns]}, index = incomplete_column_remover.drop_columns)


Unnamed: 0,Removed
square_feet,True
monthly_price,True
weekly_price,True
license,True


## Drop redundant columns

There are some columns that use different values expressing the same meaning.

In [59]:
string_columns = result.select(*[fn.col(f.name) for f in result.schema.fields if isinstance(f.dataType, t.StringType)]).columns
categorical_cols = ['neighbourhood_group_cleansed','host_response_time',
       'property_type', 'room_type', 'bed_type','cancellation_policy']
tfidf_cols = ['host_verifications','amenities']
string_columns_filtered = [c for c in string_columns if ( c not in categorical_cols + tfidf_cols )]
result.agg(*(fn.countDistinct(fn.col(col_name)).cast('int').alias(col_name) for col_name in string_columns_filtered)).toPandas().T.iloc[:, 0].sort_values()

Word cloud helps to identify the diversity of the words.

In [61]:
from wordcloud import WordCloud

# Encoding space to underscore, filter out zipcode 
words = result.select(*string_columns)
for col_name in string_columns:
      words = words.withColumn(col_name, fn.regexp_replace(fn.col(col_name), ' ' , '_' )).withColumn(col_name, fn.lower(fn.col(col_name)))
words = words.drop('zipcode')

cloud_list = list()
for col_name in words.columns:
  cloud_list.append(words.agg(fn.concat_ws(' ', fn.collect_list(col_name))).toPandas().iloc[0,0])
cloud_list = pd.Series(cloud_list, index=words.columns)

In [62]:
fig = plt.figure()

f, ax = plt.subplots(4, 4, figsize=(120,120))
for i in range(4):
  for j in range(4):
    ws = cloud_list.iloc[4 * i + j]
    title = cloud_list.index[4 * i + j]
    ax[i, j].imshow(WordCloud(max_font_size=240, width = 240, height = 240, background_color='white').generate(ws))
    ax[i, j].set_title(title, fontsize=100)
    ax[i, j].axis("off") 

plt.show()
display()


In [63]:
sentement_analysis_column_remover = ColumnRemover(string_columns_filtered)
wrangling_pipeline = Pipeline(stages=[bool_converter
                            , currency_converter
                            , rate_converter
                            , zero_variance_cleaner
                            , literature_column_remover
                            , url_column_remover
                            , incomplete_column_remover
                            , sentement_analysis_column_remover])
result = wrangling_pipeline.fit(df).transform(df)


pd.DataFrame({'Removed': [col_name not in result.columns for col_name in sentement_analysis_column_remover.drop_columns]}, index = sentement_analysis_column_remover.drop_columns)


Unnamed: 0,Removed
host_location,True
host_neighbourhood,True
street,True
neighbourhood,True
neighbourhood_cleansed,True
city,True
state,True
zipcode,True
market,True
smart_location,True


## Second round missing value imputation

### Inspect ratio of missing value for each column

In [66]:
missing_ratio_3 = calc_missing_ratio(result)
missing_ratio_3 = missing_ratio_3.loc[(missing_ratio_3 > 0.02).all(axis=1), :]
plot_missing_freq(missing_ratio_3, 'Feature', 'Missing Value Ratio', 'Airbnb Data Set Missing Value')

### Drop sentiment comment related columns

- It is possible that review related values missing are due to lacking of guest, to aviod bias, the records that has no review related data will be dropped.

In [69]:
@singleton
class MissingReviewRecordRemover(Transformer):
  def __init__(self):
    pass

  def _transform(self, df: DataFrame) -> DataFrame:
    result = df

    review_columns = [*[col_name for col_name in result.columns if col_name.startswith('review')], 'id']
    drop_rate = (1 - result.select(review_columns).dropna().count() / result.count()) * 100
    review_dropped_result = result.select(review_columns).dropna().select(fn.col('id').alias('_id'))
    result = review_dropped_result.join(result, review_dropped_result._id == result.id).drop('_id')

    return result

missing_review_record_remover = MissingReviewRecordRemover()
wrangling_pipeline = Pipeline(stages=[bool_converter
                            , currency_converter
                            , rate_converter
                            , zero_variance_cleaner
                            , literature_column_remover
                            , url_column_remover
                            , incomplete_column_remover
                            , sentement_analysis_column_remover
                            , missing_review_record_remover])
result = wrangling_pipeline.fit(df).transform(df)

shape(result)


In [70]:
missing_ratio_4 = calc_missing_ratio(result)
plot_missing_freq(missing_ratio_4, 'Feature', 'Missing Value Ratio', 'Airbnb Data Set Missing Value')

## Missing value imputation

- Impute the rest missing values with mean for numerical columns and mode for boolean columns

In [73]:
@singleton
class MissingValueImputer(Transformer):
  def __init__(self):
    self._reg_exp = re.compile("avg\((.*)\)", re.IGNORECASE)
    self._reg_exp_2 = re.compile("mode\((.*)\)", re.IGNORECASE)
    self._excuded_columns = ['id', 'host_id', 'host_is_superhost', 'longitude', 'latitude', 'is_location_exact',
                             'instant_bookable', 'require_guest_profile_picture', 'require_guest_phone_verification']
    self._boolean_columns = ['host_is_superhost', 'is_location_exact', 'instant_bookable', 
                       'require_guest_profile_picture', 'require_guest_phone_verification']

  def _transform(self, df: DataFrame) -> DataFrame:
    result = df

    numerical_columns = [f.name for f in result.schema.fields if isinstance(f.dataType, t.NumericType)]
    numerical_columns = [col_name for col_name in numerical_columns if col_name not in self._excuded_columns]

    means = result.agg({col_name: 'mean' for col_name in numerical_columns})
    for col_name in means.columns:
      means = means.withColumnRenamed(col_name, f'_{self._reg_exp.search(col_name).group(1)}')

    dummy_link = fn.udf(lambda x:1, t.IntegerType())
    result = result.withColumn('dummy_link', dummy_link(result.columns[0]))
    means = means.withColumn('dummy_link', dummy_link(means.columns[0]))
    result = result.join(means, on='dummy_link', how='inner')

    for col_name in numerical_columns:
      result = result.withColumn(col_name, fn.coalesce(fn.col(col_name), fn.col(f'_{col_name}')))
    mode_map = dict()
    for col_name in self._boolean_columns:
      if col_name not in result.columns:
        continue
      cnts = result.groupBy(col_name).count()
      mode = cnts.join(
          cnts.agg(fn.max("count").alias("max_")), fn.col("count") == fn.col("max_")
      ).limit(1).select(col_name)
      mode = mode.first()[0]
      mode_map[col_name]=mode

    result = result.drop(*means.columns)
    result = result.fillna(mode_map)

    return result


missing_value_imputer = MissingValueImputer()
wrangling_pipeline = Pipeline(stages=[bool_converter
                            , currency_converter
                            , rate_converter
                            , zero_variance_cleaner
                            , literature_column_remover
                            , url_column_remover
                            , incomplete_column_remover
                            , sentement_analysis_column_remover
                            , missing_review_record_remover
                            , missing_value_imputer])
result = wrangling_pipeline.fit(df).transform(df)

print(f'Number of remaining value: {result.count() - result.dropna().count()}')

- Remove the 4 missing value

In [75]:
@singleton
class MissingValueImputer(Transformer):
  def __init__(self):
    self._reg_exp = re.compile("avg\((.*)\)", re.IGNORECASE)

  def _transform(self, df: DataFrame) -> DataFrame:
    result = df

    numerical_columns = [f.name for f in result.schema.fields if isinstance(f.dataType, t.NumericType)]

    means = result.agg({col_name: 'mean' for col_name in numerical_columns})
    for col_name in means.columns:
      means = means.withColumnRenamed(col_name, f'_{self._reg_exp.search(col_name).group(1)}')

    dummy_link = fn.udf(lambda x:1, t.IntegerType())
    result = result.withColumn('dummy_link', dummy_link(result.columns[0]))
    means = means.withColumn('dummy_link', dummy_link(means.columns[0]))
    result = result.join(means, on='dummy_link', how='inner')

    for col_name in numerical_columns:
      result = result.withColumn(col_name, fn.coalesce(fn.col(col_name), fn.col(f'_{col_name}')))
    result = result.drop(*means.columns).dropna()

    return result

missing_value_imputer = MissingValueImputer()
wrangling_pipeline = Pipeline(stages=[bool_converter
                            , currency_converter
                            , rate_converter
                            , zero_variance_cleaner
                            , literature_column_remover
                            , url_column_remover
                            , incomplete_column_remover
                            , sentement_analysis_column_remover
                            , missing_review_record_remover
                            , missing_value_imputer])
result = wrangling_pipeline.fit(df).transform(df)

In [76]:
display(result)

id,host_id,host_since,host_response_time,host_response_rate,host_is_superhost,host_listings_count,host_total_listings_count,host_verifications,host_has_profile_pic,host_identity_verified,neighbourhood_group_cleansed,latitude,longitude,is_location_exact,property_type,room_type,accommodates,bathrooms,bedrooms,beds,bed_type,amenities,price,security_deposit,cleaning_fee,guests_included,extra_people,minimum_nights,maximum_nights,minimum_minimum_nights,maximum_minimum_nights,minimum_maximum_nights,maximum_maximum_nights,minimum_nights_avg_ntm,maximum_nights_avg_ntm,availability_30,availability_60,availability_90,availability_365,number_of_reviews,number_of_reviews_ltm,first_review,last_review,review_scores_rating,review_scores_accuracy,review_scores_cleanliness,review_scores_checkin,review_scores_communication,review_scores_location,review_scores_value,requires_license,instant_bookable,cancellation_policy,require_guest_profile_picture,require_guest_phone_verification,calculated_host_listings_count,calculated_host_listings_count_entire_homes,calculated_host_listings_count_private_rooms,calculated_host_listings_count_shared_rooms,reviews_per_month
9596.0,14942.0,2009-04-26T00:00:00.000+0000,within a few hours,100.0,0.0,5.0,5.0,"['email', 'phone', 'facebook', 'reviews', 'kba']",1.0,1.0,Other neighborhoods,47.65479,-122.33652,1.0,Apartment,Entire home/apt,4.0,1.0,1.0,4.0,Real Bed,"{TV,""Cable TV"",Internet,Wifi,Kitchen,""Free street parking"",Heating,""Family/kid friendly"",""Smoke detector"",""Carbon monoxide detector"",""Fire extinguisher"",Essentials,Shampoo,""24-hour check-in"",Hangers,""Hair dryer"",Iron,""Laptop friendly workspace"",""Self check-in"",""Smart lock"",Microwave,""Coffee maker"",Refrigerator,Dishwasher,""Dishes and silverware"",""Cooking basics"",Oven,Stove,""BBQ grill"",""Patio or balcony"",""Garden or backyard""}",120.0,200.0,85.0,2.0,15.0,2.0,60.0,2.0,2.0,60.0,60.0,2.0,60.0,0.0,0.0,0.0,0.0,93.0,17.0,2011-06-15T00:00:00.000+0000,2019-09-22T00:00:00.000+0000,91.0,9.0,9.0,10.0,9.0,10.0,9.0,1.0,0.0,strict_14_with_grace_period,0.0,0.0,3.0,3.0,0.0,0.0,0.91
15108.0,39377.0,2009-09-18T00:00:00.000+0000,within a few hours,100.0,0.0,3.0,3.0,"['email', 'phone', 'facebook', 'reviews', 'jumio', 'offline_government_id', 'government_id', 'work_email']",1.0,1.0,Other neighborhoods,47.68827,-122.33586,1.0,House,Private room,6.0,1.0,3.0,3.0,Real Bed,"{Internet,Wifi,Kitchen,""Pets allowed"",""Pets live on this property"",Dog(s),Cat(s),""Free street parking"",""Hot tub"",""Indoor fireplace"",Heating,""Family/kid friendly"",""Suitable for events"",Washer,Dryer,""Smoke detector"",""Carbon monoxide detector"",""First aid kit"",""Safety card"",""Fire extinguisher"",Essentials,Shampoo,Iron,""Self check-in"",Lockbox,""Hot water"",Microwave,Refrigerator,Oven,Stove,""Lake access""}",60.0,0.0,10.0,1.0,15.0,1.0,730.0,1.0,1.0,730.0,730.0,1.0,730.0,0.0,0.0,0.0,173.0,71.0,1.0,2011-07-06T00:00:00.000+0000,2019-03-31T00:00:00.000+0000,90.0,9.0,9.0,10.0,10.0,10.0,9.0,1.0,0.0,moderate,0.0,0.0,3.0,0.0,3.0,0.0,0.7
15749.0,61636.0,2009-12-09T00:00:00.000+0000,within a few hours,100.0,1.0,2.0,2.0,"['email', 'phone', 'reviews', 'jumio', 'offline_government_id', 'selfie', 'government_id', 'identity_manual']",1.0,0.0,Other neighborhoods,47.68407,-122.32649,0.0,House,Private room,1.0,1.0,1.0,1.0,Real Bed,"{TV,Wifi,Kitchen,""Pets live on this property"",Dog(s),Cat(s),""Free street parking"",Heating,Washer,Dryer,""Smoke detector"",""Carbon monoxide detector"",""Fire extinguisher"",Essentials,Shampoo,""Lock on bedroom door"",Hangers,""Hair dryer"",Iron,""Laptop friendly workspace"",""Hot water"",""Bed linens"",""Extra pillows and blankets"",""Host greets you""}",32.0,500.0,20.0,1.0,5.0,21.0,190.0,4.0,21.0,190.0,190.0,20.6,190.0,29.0,59.0,89.0,179.0,21.0,5.0,2011-11-13T00:00:00.000+0000,2019-11-07T00:00:00.000+0000,95.0,9.0,8.0,10.0,10.0,10.0,9.0,1.0,0.0,strict_14_with_grace_period,0.0,0.0,2.0,1.0,1.0,0.0,0.21
15749.0,61636.0,2009-12-09T00:00:00.000+0000,within a few hours,100.0,1.0,2.0,2.0,"['email', 'phone', 'reviews', 'jumio', 'offline_government_id', 'selfie', 'government_id', 'identity_manual']",1.0,0.0,Other neighborhoods,47.68407,-122.32649,0.0,House,Private room,1.0,1.0,1.0,1.0,Real Bed,"{TV,Wifi,Kitchen,""Pets live on this property"",Dog(s),Cat(s),""Free street parking"",Heating,Washer,Dryer,""Smoke detector"",""Carbon monoxide detector"",""Fire extinguisher"",Essentials,Shampoo,""Lock on bedroom door"",Hangers,""Hair dryer"",Iron,""Laptop friendly workspace"",""Hot water"",""Bed linens"",""Extra pillows and blankets"",""Host greets you""}",32.0,500.0,20.0,1.0,5.0,21.0,190.0,4.0,21.0,190.0,190.0,20.6,190.0,29.0,59.0,89.0,179.0,21.0,5.0,2011-11-13T00:00:00.000+0000,2019-11-07T00:00:00.000+0000,95.0,9.0,8.0,10.0,10.0,10.0,9.0,1.0,0.0,strict_14_with_grace_period,0.0,0.0,2.0,1.0,1.0,0.0,0.21
15749.0,61636.0,2009-12-09T00:00:00.000+0000,within a few hours,100.0,1.0,2.0,2.0,"['email', 'phone', 'reviews', 'jumio', 'offline_government_id', 'selfie', 'government_id', 'identity_manual']",1.0,0.0,Other neighborhoods,47.68407,-122.32649,0.0,House,Private room,1.0,1.0,1.0,1.0,Real Bed,"{TV,Wifi,Kitchen,""Pets live on this property"",Dog(s),Cat(s),""Free street parking"",Heating,Washer,Dryer,""Smoke detector"",""Carbon monoxide detector"",""Fire extinguisher"",Essentials,Shampoo,""Lock on bedroom door"",Hangers,""Hair dryer"",Iron,""Laptop friendly workspace"",""Hot water"",""Bed linens"",""Extra pillows and blankets"",""Host greets you""}",32.0,500.0,20.0,1.0,5.0,21.0,190.0,4.0,21.0,190.0,190.0,20.6,190.0,29.0,59.0,89.0,179.0,21.0,5.0,2011-11-13T00:00:00.000+0000,2019-11-07T00:00:00.000+0000,95.0,9.0,8.0,10.0,10.0,10.0,9.0,1.0,0.0,strict_14_with_grace_period,0.0,0.0,2.0,1.0,1.0,0.0,0.21
15749.0,61636.0,2009-12-09T00:00:00.000+0000,within a few hours,100.0,1.0,2.0,2.0,"['email', 'phone', 'reviews', 'jumio', 'offline_government_id', 'selfie', 'government_id', 'identity_manual']",1.0,0.0,Other neighborhoods,47.68407,-122.32649,0.0,House,Private room,1.0,1.0,1.0,1.0,Real Bed,"{TV,Wifi,Kitchen,""Pets live on this property"",Dog(s),Cat(s),""Free street parking"",Heating,Washer,Dryer,""Smoke detector"",""Carbon monoxide detector"",""Fire extinguisher"",Essentials,Shampoo,""Lock on bedroom door"",Hangers,""Hair dryer"",Iron,""Laptop friendly workspace"",""Hot water"",""Bed linens"",""Extra pillows and blankets"",""Host greets you""}",32.0,500.0,20.0,1.0,5.0,21.0,190.0,4.0,21.0,190.0,190.0,20.6,190.0,29.0,59.0,89.0,179.0,21.0,5.0,2011-11-13T00:00:00.000+0000,2019-11-07T00:00:00.000+0000,95.0,9.0,8.0,10.0,10.0,10.0,9.0,1.0,0.0,strict_14_with_grace_period,0.0,0.0,2.0,1.0,1.0,0.0,0.21
17951.0,33360.0,2009-08-23T00:00:00.000+0000,within an hour,100.0,1.0,12.0,12.0,"['email', 'phone', 'reviews', 'jumio', 'offline_government_id', 'selfie', 'government_id', 'identity_manual']",1.0,0.0,West Seattle,47.55538,-122.38622,1.0,Guest suite,Entire home/apt,2.0,1.0,0.0,1.0,Real Bed,"{TV,""Cable TV"",Internet,Wifi,Kitchen,""Free street parking"",Heating,""Family/kid friendly"",""Smoke detector"",""Carbon monoxide detector"",""Fire extinguisher"",Essentials,Shampoo,""24-hour check-in"",Hangers,""Hair dryer"",Iron,""Laptop friendly workspace"",""translation missing: en.hosting_amenity_49"",""Self check-in"",Lockbox,""Private entrance"",""Pack ’n Play/travel crib"",""Hot water"",""Bed linens"",""Extra pillows and blankets"",Microwave,""Coffee maker"",Refrigerator,""Dishes and silverware"",""Cooking basics"",Oven,Stove,""BBQ grill"",""Garden or backyard"",""Long term stays allowed""}",105.0,300.0,50.0,2.0,0.0,2.0,120.0,2.0,2.0,120.0,120.0,2.0,120.0,24.0,54.0,84.0,171.0,121.0,32.0,2010-06-10T00:00:00.000+0000,2019-10-22T00:00:00.000+0000,98.0,10.0,10.0,10.0,10.0,10.0,10.0,1.0,1.0,moderate,0.0,0.0,8.0,8.0,0.0,0.0,1.05
19619.0,74305.0,2010-01-27T00:00:00.000+0000,within an hour,100.0,1.0,51.0,51.0,"['email', 'phone', 'reviews', 'kba', 'work_email']",1.0,1.0,Downtown,47.61362,-122.34706,1.0,Condominium,Entire home/apt,4.0,1.0,1.0,1.0,Real Bed,"{TV,""Cable TV"",Internet,Wifi,""Air conditioning"",""Wheelchair accessible"",Pool,Kitchen,""Free parking on premises"",""Paid parking off premises"",Gym,Elevator,""Hot tub"",Heating,""Family/kid friendly"",Washer,Dryer,""Smoke detector"",""Carbon monoxide detector"",""Safety card"",""Fire extinguisher"",Essentials,Shampoo,""24-hour check-in"",Hangers,""Hair dryer"",Iron,""Laptop friendly workspace"",Bathtub,""High chair"",""Babysitter recommendations"",""Pack ’n Play/travel crib"",""Room-darkening shades"",""Hot water"",""Body soap"",""Bath towel"",""Toilet paper"",""Bed linens"",""Extra pillows and blankets"",""Ethernet connection"",Microwave,""Coffee maker"",Refrigerator,Dishwasher,""Dishes and silverware"",""Cooking basics"",Oven,Stove,""BBQ grill"",""Patio or balcony"",""Long term stays allowed"",""Wide hallways"",""No stairs or steps to enter"",""Wide entrance for guests"",""Flat path to guest entrance"",""Well-lit path to entrance"",""No stairs or steps to enter"",""Wide entryway"",""Host greets you"",""Handheld shower head"",""Hot water kettle"",""Smart TV"",""Mountain view"",""Rain shower"",Terrace,Balcony,""Exercise equipment"",""Breakfast table"",""Shared gym"",""Shared pool"",""Shared hot tub"",Netflix,""Outdoor seating"",""Full kitchen"",""Bedroom comforts"",""Bathroom essentials""}",170.0,500.0,107.0,1.0,0.0,2.0,365.0,2.0,2.0,365.0,365.0,2.0,365.0,16.0,37.0,67.0,144.0,105.0,30.0,2010-09-04T00:00:00.000+0000,2019-11-15T00:00:00.000+0000,97.0,10.0,10.0,10.0,10.0,10.0,9.0,1.0,0.0,strict_14_with_grace_period,1.0,1.0,58.0,58.0,0.0,0.0,0.94
26116.0,110248.0,2010-04-18T00:00:00.000+0000,within an hour,100.0,0.0,5.0,5.0,"['email', 'phone', 'facebook', 'reviews', 'jumio', 'offline_government_id', 'kba', 'selfie', 'government_id', 'identity_manual']",1.0,0.0,Capitol Hill,47.61743,-122.32676,1.0,Apartment,Entire home/apt,4.0,1.0,2.0,2.0,Real Bed,"{TV,""Cable TV"",Internet,Wifi,Kitchen,""Paid parking off premises"",Heating,Washer,Dryer,""Smoke detector"",""Carbon monoxide detector"",""First aid kit"",""Fire extinguisher"",Essentials,Shampoo,""24-hour check-in"",Hangers,""Hair dryer"",Iron,""Laptop friendly workspace"",""translation missing: en.hosting_amenity_49"",""translation missing: en.hosting_amenity_50"",""Self check-in"",Lockbox,""Hot water"",""Luggage dropoff allowed"",""Long term stays allowed""}",259.0,500.0,100.0,2.0,20.0,2.0,30.0,2.0,2.0,30.0,30.0,2.0,30.0,0.0,0.0,0.0,0.0,143.0,0.0,2010-09-21T00:00:00.000+0000,2016-09-24T00:00:00.000+0000,96.0,10.0,10.0,10.0,10.0,10.0,9.0,1.0,0.0,strict_14_with_grace_period,0.0,1.0,1.0,1.0,0.0,0.0,1.28
43373.0,189356.0,2010-08-02T00:00:00.000+0000,within an hour,90.0,0.0,1.0,1.0,"['email', 'phone', 'reviews']",1.0,0.0,Other neighborhoods,47.65055,-122.33059,1.0,House,Private room,2.0,1.0,1.0,1.0,Real Bed,"{Wifi,Kitchen,""Free street parking"",""Indoor fireplace"",Heating,""Family/kid friendly"",Washer,Dryer,Essentials,Shampoo,""Private entrance"",""Hot water"",Other,""Lake access""}",55.0,0.0,15.0,1.0,10.0,2.0,30.0,2.0,2.0,30.0,30.0,2.0,30.0,12.0,42.0,72.0,344.0,272.0,48.0,2010-08-13T00:00:00.000+0000,2019-11-15T00:00:00.000+0000,95.0,10.0,10.0,10.0,10.0,10.0,10.0,1.0,1.0,moderate,0.0,0.0,1.0,0.0,1.0,0.0,2.41


In [77]:
id_date = ['id','host_since','host_id','first_review','last_review','_c0']
id_date_column_remover = ColumnRemover(id_date)
wrangling_pipeline = Pipeline(stages=[bool_converter
                            , currency_converter
                            , rate_converter
                            , zero_variance_cleaner
                            , literature_column_remover
                            , url_column_remover
                            , incomplete_column_remover
                            , sentement_analysis_column_remover
                            , missing_review_record_remover
                            , missing_value_imputer
                            ,id_date_column_remover])
result = wrangling_pipeline.fit(df).transform(df)

In [78]:
#Save the dataframe to csv document for regression model
#df_pd = result.toPandas()
#df_pd.to_csv("/content/drive/Shared drives/718_project/dataset/listing_clean_new(used_for_random_forest_and_GBT).csv")

## Outlier Imputation

### Distribution inspection

#### boxplot

- In the big data environment, extracting data from the distributed file system is infeasible. To se the distribution of each feature, we will calculate the descriptive information of the dataset

In [83]:
import matplotlib.pyplot as plt

def plot_feature_distribution(result):
  non_continuous_columns = ['host_is_superhost', 'host_has_profile_pic', 'host_identity_verified',
                            'is_location_exact', 'guests_included', 'instant_bookable',
                            'latitude', 'longitude']

  numerical_columns = [f.name for f in result.schema.fields if isinstance(f.dataType, t.NumericType)]
  numerical_columns = [col_name for col_name in numerical_columns if col_name not in non_continuous_columns]

  stats = list()
  for col_name in numerical_columns:
    percentiles = result.select(col_name).agg(
            fn.expr(f'percentile({col_name}, array(0))').alias('0%'),
            fn.expr(f'percentile({col_name}, array(0.25))').alias('25%'), 
            fn.expr(f'percentile({col_name}, array(0.5))').alias('50%'), 
            fn.expr(f'percentile({col_name}, array(0.75))').alias('75%'),
            fn.expr(f'percentile({col_name}, array(1))').alias('100%')).toPandas()
            
    _0 = percentiles.loc[0, '0%']
    _25 = percentiles.loc[0, '25%']
    _50 = percentiles.loc[0, '50%']
    _75 = percentiles.loc[0, '75%']
    _100 = percentiles.loc[0, '100%']
    stats.append({'label': col_name, 'med': _50, 'q1': _25, 'q3': _75, 'whislo': _0, 'whishi': _100})

  fig, ax = plt.subplots(len(numerical_columns), 1, sharex=False, sharey='row', figsize=(15, 50))

  for idx, s in enumerate(stats):
    ax[idx].bxp([stats[idx]], vert=False, showfliers=False);

  plt.tight_layout()

  display()


plot_feature_distribution(result)

- Some features are not normal distributed in the data space. Convert them by taking logarithm.

In [85]:
import math


class LogarithmImputer(Transformer):
  def __init__(self):
    self._excluded_columns = ['id', 'host_id', 'host_is_superhost', 'host_has_profile_pic',
                              'host_identity_verified', 'is_location_exact',
                              'guests_included', 'instant_bookable', 
                              'latitude', 'longitude', 'availability_30', 
                              'availability_60', 'availability_90', 'availability_365', 
                              ]
    self._transofrmed_columns = list()
    self._log = fn.udf(lambda x: math.log(x+1), t.DoubleType())

  @property
  def transofrmed_columns(self):
    return self._transofrmed_columns

  def _transform(self, df: DataFrame) -> DataFrame:
    result = df

    numerical_columns = [f.name for f in result.schema.fields if isinstance(f.dataType, t.NumericType)]
    numerical_columns = [col_name for col_name in numerical_columns if col_name not in self._excluded_columns]
    self._transofrmed_columns = numerical_columns

    for col_name in self._transofrmed_columns:

      result = result.withColumn(col_name, self._log(col_name))

    return result

logarithm_imputer = LogarithmImputer()
logarithm_pipeline = Pipeline(stages=[bool_converter
                            , currency_converter
                            , rate_converter
                            , zero_variance_cleaner
                            , literature_column_remover
                            , url_column_remover
                            , incomplete_column_remover
                            , sentement_analysis_column_remover
                            , missing_review_record_remover
                            , missing_value_imputer
                            , logarithm_imputer])

result = logarithm_pipeline.fit(df).transform(df)

plot_feature_distribution(result.select(logarithm_imputer.transofrmed_columns))

### Winsorizing the data set

- host id is a categorical feature, it should be revised

In [88]:
@singleton
class WinsorizingImputer(Transformer):
  def __init__(self):
    self._pending_columns = [f.name for f in result.schema.fields if isinstance(f.dataType, t.NumericType)]
    self._excuded_columns = ['id', 'host_id', 'host_is_superhost', 'longitude', 'latitude', 'is_location_exact',
                             'instant_bookable', 'require_guest_profile_picture', 'require_guest_phone_verification',
                             'price' ]
    def winsorize(tail, head):
      def _winsorize(value):
        if value < tail:
          return float(tail)
        if value > head:
          return float(head)
        return float(value)
      return _winsorize
    self._winsorize = lambda col, tail, head: fn.udf(winsorize(tail, head), t.DoubleType())(col)

  @property
  def pending_columns(self):
    return self._pending_columns

  def _transform(self, df: DataFrame) -> DataFrame:
    result = df

    self._pending_columns = [f.name for f in result.schema.fields if isinstance(f.dataType, t.NumericType)]
    self._pending_columns = [col_name for col_name in self._pending_columns if col_name not in self._excuded_columns]

    for col_name in self._pending_columns:
      percentiles = result.agg(
          fn.expr(f'percentile({col_name}, array(0.25))').alias('tail'), 
          fn.expr(f'percentile({col_name}, array(0.75))').alias('head')).toPandas()
      tail = percentiles.loc[0, 'tail'][0]
      head = percentiles.loc[0, 'head'][0]
      result = result.withColumn(col_name, self._winsorize(col_name, tail, head))
    result = result.drop(*percentiles.columns)

    return result

winsorizing_imputer = WinsorizingImputer()
wrangling_pipeline = Pipeline(stages=[bool_converter
                            , currency_converter
                            , rate_converter
                            , zero_variance_cleaner
                            , literature_column_remover
                            , url_column_remover
                            , incomplete_column_remover
                            , sentement_analysis_column_remover
                            , missing_review_record_remover
                            , missing_value_imputer
                            , logarithm_imputer
                            , winsorizing_imputer])
result = wrangling_pipeline.fit(df).transform(df)

### Low Variance Features Removal

In [90]:
from pyspark.ml import feature


@singleton
class LowVarianceCleaner(Transformer):
  def __init__(self):
    self._reg_exp = re.compile("variance\((.*)\)", re.IGNORECASE)
    self._pending_columns = list()
    self._excuded_columns = ['id', 'host_id', 'host_is_superhost', 'longitude', 'latitude', 'is_location_exact',
                             'instant_bookable', 'require_guest_profile_picture', 'require_guest_phone_verification']
    self._extract_udf = lambda idx: fn.udf(lambda centered_features: float(centered_features[idx]), t.DoubleType())('centered_features')
    self._threshold = 0.01
    self._low_variance_columns = list()

  @property
  def pending_columns(self):
    return self._pending_columns

  @property
  def low_variance_columns(self):
    return self._low_variance_columns

  def _transform(self, df: DataFrame) -> DataFrame:
    result = df

    self._pending_columns = [f.name for f in result.schema.fields if isinstance(f.dataType, t.NumericType)]
    self._pending_columns = [col_name for col_name in self._pending_columns if col_name not in self._excuded_columns]

    standardized_result = Pipeline(stages=[feature.VectorAssembler(inputCols=self._pending_columns, outputCol='features'),
                                            feature.StandardScaler(withMean=True, withStd=True, inputCol='features', outputCol='centered_features')]) \
          .fit(result).transform(result)

    for idx, col_name in enumerate(self._pending_columns):
      standardized_result = standardized_result.withColumn(col_name, self._extract_udf(idx))
    standardized_result = standardized_result.drop('centered_features')

    numeric_variances = standardized_result.agg({col_name : 'variance' for col_name in self._pending_columns})
    numeric_variances = numeric_variances.select(*[fn.col(col).alias(self._reg_exp.search(col).group(1)) for col in numeric_variances.columns]) \
      .toPandas().T.iloc[:,0]
    numeric_variances = numeric_variances[(numeric_variances < self._threshold) | (numeric_variances.isna())]

    self._low_variance_columns = numeric_variances.index

    result = result.drop(*self._low_variance_columns)

    return result


low_variance_cleaner = LowVarianceCleaner()
wrangling_pipeline = Pipeline(stages=[bool_converter
                            , currency_converter
                            , rate_converter
                            , zero_variance_cleaner
                            , literature_column_remover
                            , url_column_remover
                            , incomplete_column_remover
                            , sentement_analysis_column_remover
                            , missing_review_record_remover
                            , missing_value_imputer
                            , logarithm_imputer
                            , winsorizing_imputer
                            , low_variance_cleaner])

result = wrangling_pipeline.fit(df).transform(df)

print(shape(result))

#return

## Feature Selection

### Tree-based algorithm

In [93]:
wrangling_pipeline = Pipeline(stages=[bool_converter
                            , currency_converter
                            , rate_converter
                            , zero_variance_cleaner
                            , literature_column_remover
                            , url_column_remover
                            , incomplete_column_remover
                            , sentement_analysis_column_remover
                            , missing_review_record_remover
                            , missing_value_imputer
                            , low_variance_cleaner])

result = wrangling_pipeline.fit(df).transform(df)

#### Random Forest
- Explore feature importance By using Random Forest Regressor

In [95]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [96]:
result = spark.read\
        .option("header", "true")\
        .option("multiLine", "true")\
        .option('inferSchema', 'true')\
        .option('escape', '"')\
        .csv(get_training_filename('listing_clean.csv'))

In [97]:
from pyspark.ml.feature import HashingTF,CountVectorizer,IDF,StringIndexer,OneHotEncoder
categorical_cols_new = ['neighbourhood_group_cleansed','host_response_time',
       'property_type', 'room_type', 'bed_type','cancellation_policy']
col_idx_new=['neighbourhood_group_cleansed_IDX',
 'host_response_time_IDX',
 'property_type_IDX',   
 'room_type_IDX',
 'bed_type_IDX',
 'cancellation_policy_IDX']

indexers_new = [StringIndexer(inputCol=col, outputCol = col + "_IDX")\
            .setHandleInvalid("keep") for col in categorical_cols_new]

In [98]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
col_numeric = [c for c in result.columns if ( c not in categorical_cols_new
                                             and c not in ['host_verifications','amenities','price'])]
assemble1 = VectorAssembler(inputCols= col_numeric + col_idx_new ,outputCol='features')
reg = RandomForestRegressor(labelCol='price',featuresCol='features')

In [99]:
from pyspark.ml import Pipeline
transformer1 = Pipeline(stages=indexers_new  + [assemble1,reg])

In [100]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder

df_train,df_test = result.randomSplit([0.7,0.3],seed=42)

paramGrid = ParamGridBuilder() \
    .addGrid(reg.numTrees, [int(x) for x in np.linspace(start = 3, stop = 15, num = 3)]) \
    .addGrid(reg.maxDepth, [int(x) for x in np.linspace(start = 3, stop = 15, num = 3)]) \
    .build()


evaluator = RegressionEvaluator(labelCol='price') 
crossval = CrossValidator(estimator=transformer1,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

In [101]:
cvModel = crossval.fit(df_train)

In [102]:
bestPipeline = cvModel.bestModel
bestModel = bestPipeline.stages[-1]
importances = bestModel.featureImportances

In [103]:
fi_df = pd.DataFrame(importances.toArray(), columns=['importances'])
fi_df['feature'] = pd.Series(col_numeric+col_idx_new)
fi_df.sort_values(by=['importances'], ascending=False, inplace=True)

In [104]:
plt.figure(figsize=(20,10))
fi_df.plot.barh(x='feature', 
               y ='importances',
               figsize=(20,8), 
               title='Feature Importances', 
               fontsize=10)
display(plt.show())

**Run random forest regression By filtering importance which is bigger than 0.03**

In [106]:
fi_df_new = fi_df[fi_df['importances']>0.03]

In [107]:
fi_df_new

Unnamed: 0,importances,feature
2,0.170005,host_listings_count
3,0.116572,host_total_listings_count
14,0.056705,cleaning_fee
13,0.05006,security_deposit
47,0.049901,neighbourhood_group_cleansed_IDX
9,0.04442,accommodates
1,0.041316,host_is_superhost
52,0.038608,cancellation_policy_IDX
7,0.03765,longitude
11,0.032044,bedrooms


In [108]:
num_feature_filtered = []
for i in fi_df_new['feature'].tolist():
    if i not in ['neighbourhood_group_cleansed_IDX','cancellation_policy_IDX']:
       num_feature_filtered.append(i)

In [109]:
num_feature_filtered

In [110]:
# change categorial col to Index
categorical_cols_filtered = ['neighbourhood_group_cleansed'
                            ,'cancellation_policy']
indexers_filtered = [StringIndexer(inputCol=col, outputCol = col + "_IDX")\
            .setHandleInvalid("keep") for col in categorical_cols_filtered]

encoded_filtered = [OneHotEncoder(inputCol = col + "_IDX", outputCol = col + '_Vec') for col in categorical_cols_filtered]

In [111]:
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF
from pyspark.ml.regression import RandomForestRegressor

cv1 = CountVectorizer()\
    .setInputCol('host_verifications')\
    .setOutputCol('tf1')

cv2 = CountVectorizer()\
    .setInputCol('amenities')\
    .setOutputCol('tf2')

idf1 = IDF().\
    setInputCol("tf1").\
    setOutputCol("tfidf1")

idf2 = IDF().\
    setInputCol("tf2").\
    setOutputCol("tfidf2")

assemble1 = VectorAssembler(inputCols= num_feature_filtered, outputCol='features')

assemble2 = VectorAssembler(inputCols= ['features','tfidf1','tfidf2','neighbourhood_group_cleansed_Vec'
                                        ,'cancellation_policy_Vec'], outputCol='final_features')

reg = RandomForestRegressor(labelCol='price',featuresCol='final_features')

transformer_final = Pipeline(stages=indexers_filtered + encoded_filtered + [cv1,idf1,cv2,idf2
                                                                            ,assemble1,assemble2,reg])

In [112]:
#just run one time
from pyspark.sql.functions import array
result =result.withColumn('host_verifications', array(result['host_verifications']))
result =result.withColumn('amenities', array(result['amenities']))

In [113]:
df_train,df_test = result.randomSplit([0.7,0.3],seed=42)
paramGrid_1 = ParamGridBuilder() \
    .addGrid(reg.numTrees, [int(x) for x in np.linspace(start = 5, stop = 15, num = 3)]) \
    .addGrid(reg.maxDepth, [int(x) for x in np.linspace(start = 5, stop = 15, num = 3)]) \
    .build()

evaluator_1 = RegressionEvaluator(labelCol='price') 
crossval_1 = CrossValidator(estimator=transformer_final,
                          estimatorParamMaps=paramGrid_1,
                          evaluator=evaluator_1,
                          numFolds=3)
cvModel_1 = crossval_1.fit(df_train)
bestModel_1 = cvModel_1.bestModel
preds_1 = bestModel_1.transform(df_test)

In [114]:
rmse = evaluator.evaluate(preds_1, {evaluator.metricName: 'rmse'})
r2 = evaluator.evaluate(preds_1, {evaluator.metricName: 'r2'})
print(' RMSE: ' + str(rmse))
print(' R^2: ' + str(r2))

In [115]:
rfResult = preds_1.toPandas()

plt.plot(rfResult.price, rfResult.prediction, 'bo')
plt.xlabel('Price')
plt.ylabel('Prediction')
plt.suptitle("Model Performance RMSE: %f" % rmse)
display(plt.show())

In [116]:
print('numTrees - ', bestModel_1.stages[-1].getNumTrees)
print('maxDepth - ', bestModel_1.stages[-1].getOrDefault('maxDepth'))

#### Gradient Boosting Machine

In [118]:
from pyspark.ml.regression import GBTRegressor


gbt = GBTRegressor(labelCol='price',featuresCol='final_features')

transformer_final_gbt = Pipeline(stages=indexers_filtered + encoded_filtered + [cv1,idf1,cv2,idf2
                                                                            ,assemble1,assemble2,gbt])

In [119]:
# We trained 70% of total data and test 30% of total data, but it cost 40 minutes by using GBT regression.
# If it run too much time, databrick would stop, so I sample our data. Then everything can go smoothly in databricks
result_sample = result.sample(False,0.2,42)
df_train,df_test = result_sample.randomSplit([0.7,0.3],seed=42)
paramGrid_2 = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [int(x) for x in np.linspace(start = 5, stop = 15, num = 3)]) \
    .addGrid(gbt.maxDepth, [int(x) for x in np.linspace(start = 5, stop = 15, num = 3)]) \
    .build()

evaluator_2 = RegressionEvaluator(labelCol='price') 
crossval_2 = CrossValidator(estimator=transformer_final_gbt,
                          estimatorParamMaps=paramGrid_2,
                          evaluator=evaluator_2,
                          numFolds=3)
cvModel_2 = crossval_2.fit(df_train)
bestModel_2 = cvModel_2.bestModel
preds_2 = bestModel_2.transform(df_test)

In [120]:
rmse = evaluator.evaluate(preds_2, {evaluator.metricName: 'rmse'})
r2 = evaluator.evaluate(preds_2, {evaluator.metricName: 'r2'})
print(' RMSE: ' + str(rmse))
print(' R^2: ' + str(r2))

In [121]:
rfResult = preds_2.toPandas()

plt.plot(rfResult.price, rfResult.prediction, 'bo')
plt.xlabel('Price')
plt.ylabel('Prediction')
plt.suptitle("GBT Model Performance RMSE: %f" % rmse)
display(plt.show())

In [122]:
print('numIter - ', bestModel_2.stages[-1].getOrDefault('maxIter'))
print('maxDepth - ', bestModel_2.stages[-1].getOrDefault('maxDepth'))

### Lasso regression

- Features reduce quickly in the lasso model. For a dataset with high dimensional space, it is hard to find a close relationshi among observations. Under the L1 penalty term regularization, the lasso model will filter the most relevant features. 
- In the elastic net regression, the model becomes lasso regression when α equals to 1, which takes the L1 penalty term only. 
- The scree plot direct us the reasonable λ to apply on the penaly term.

In [125]:
from pyspark.ml import regression, tuning, evaluation


def build_index_transformer(columns):
  return [StringIndexer(inputCol=col, outputCol = col + "_IDX", handleInvalid="keep")  for col in columns]

categorical_type_features = ['property_type', 'room_type', 'bed_type']
indexers = build_index_transformer([col_name for col_name in categorical_type_features if col_name in df.columns])
categorical_type_features_remover = ColumnRemover(categorical_type_features)
non_numerical_column_remover = ColumnRemover(['id', 'host_id', 'host_since', 'first_review', 'last_review', 'host_response_time',
                                              'host_verifications', 'neighbourhood_group_cleansed',  'amenities', 'cancellation_policy', 
                                              'latitude', 'longitude'])

lasso_dataset_generate_pipeline = Pipeline(stages=[bool_converter
                            , currency_converter
                            , rate_converter
                            , zero_variance_cleaner
                            , literature_column_remover
                            , url_column_remover
                            , incomplete_column_remover
                            , sentement_analysis_column_remover
                            , missing_review_record_remover
                            , non_numerical_column_remover
                            , missing_value_imputer
                            , logarithm_imputer
                            , winsorizing_imputer
                            , low_variance_cleaner
                            , *indexers
                            , categorical_type_features_remover])


result = lasso_dataset_generate_pipeline.fit(df).transform(df)

print(f'Lasso dataset shape: {shape(result)}')

train_data, test_data = split_data(result)


- Training Lasso Model and visualize the performance via scree plot

In [127]:
# Clean the data
_lambda = pd.Series(np.arange(0.1,0.34,0.02)).tolist()

_train_r2 = list()
_test_r2 = list()
_coef = list()
def search_lambda(train_data, lbds, train_r2, test_r2, coef):
  assembler = feature.VectorAssembler(inputCols=train_data.drop('price').columns, outputCol='features')
  standardizer = feature.StandardScaler(withMean=True, withStd=True, inputCol="features", outputCol="scaled_features")
  normalizer = feature.Normalizer(inputCol="scaled_features", outputCol="norm_features", p=2.0)

  lr = regression.LinearRegression() \
    .setLabelCol('price') \
    .setFeaturesCol('norm_features') \
    .setMaxIter(10) \
    .setElasticNetParam(1)

  lasso_cleaning_pipeline = Pipeline(stages=[assembler, standardizer, normalizer, lr])

  evaluator = evaluation.RegressionEvaluator() \
    .setLabelCol(lasso_cleaning_pipeline.getStages()[-1].getLabelCol()) \
    .setMetricName('r2')



  for lbd in lbds:
    paramGrid = tuning.ParamGridBuilder()\
        .addGrid(lr.regParam, [lbd]) \
        .build()

    tvs = tuning.TrainValidationSplit(estimator=lasso_cleaning_pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              trainRatio=0.7)


    model = tvs.fit(train_data)

    train_r2.append(evaluator.evaluate(model.transform(train_data)))
    test_r2.append(evaluator.evaluate(model.transform(test_data)))
    coef.append(model.bestModel.stages[-1].coefficients.toArray())

search_lambda(train_data, _lambda, _train_r2, _test_r2, _coef)

- Visualize performances of each lambda

In [129]:
def polt_model_performance(lbds, train_coef, test_coef, title_xys_1, title_xys_2):
  plt.clf()
  fig, ax = plt.subplots(2, 1, figsize=(20, 7))
  ax[0].plot(lbds, train_coef, 'o-', linewidth=2, label="Training Set")
  ax[0].plot(lbds, test_coef, 'o-', linewidth=2, label="Test Set")
  ax[0].set_title(title_xys_1[0], fontSize=25)
  ax[0].set_xlabel(title_xys_1[1])
  ax[0].set_ylabel(title_xys_1[2])
  ax[1].plot(lbds, [pair[0] - pair[1] for pair in zip(train_coef, test_coef)], 'o-', linewidth=2, label=f"{title_xys_2[2]}")
  ax[1].set_title(title_xys_2[0], fontSize=25)
  ax[1].set_xlabel(title_xys_2[1])
  ax[1].set_ylabel(title_xys_2[2])
  ax[0].legend()
  ax[1].legend()

  plt.tight_layout()
  display()

polt_model_performance(_lambda, _train_r2, _test_r2, 
                       ('Model Performance of Lasso Regression', 'Lambda', 'R^2'),
                       ('Difference of R^2 Between Training Set and Test Set', 'Lambda', 'Difference of R^2'))

- Trending of the weight for each features

In [131]:

__coef = np.array(_coef)

def plot_importance(column_names, lbds, coef_matrix, threshold, title, x_lab, y_lab):
  plt.clf()
  plt.figure(figsize=(20, 17))
  for idx, col_name in enumerate(column_names):
    plt.plot(lbds, coef_matrix[:,idx], 'o-', linewidth=2, label=col_name)
    c = coef_matrix[0,idx]
    if abs(c) > threshold:
      plt.annotate(col_name, (lbds[4], coef_matrix[4,idx]))

  plt.title(title, fontSize=25)
  plt.xlabel(x_lab)
  plt.ylabel(y_lab)

  plt.legend(loc='upper right')
  plt.tight_layout()
  display()

plot_importance(train_data.drop('price').columns, _lambda, __coef, 0.25, 'Weight change on each feature', 'Lambda', 'Weight')

- According to the scree plot, nearly all weights of features have been penalized to zero. To get a better insight of the trend of features, we will narrow down the lambda search range and run again.

In [133]:
_lambda = pd.Series(np.arange(-10.5,-3,1)).apply(math.exp).sort_values().tolist()
_train_r2 = list()
_test_r2 = list()
_coef = list()

search_lambda(train_data, _lambda, _train_r2, _test_r2, _coef)

In [134]:
__coef = np.array(_coef)

plot_importance(train_data.drop('price').columns, _lambda, __coef, 0.4, 'Weight change on each feature', 'Lambda', 'Weight')

- Set 0.25 as the threshold. When lambda is near 0.01, the features with absolute weight above this number when will be the important features of our model.

In [136]:
idx = len(_lambda) - len([lbd for lbd in _lambda if lbd > 0.01])

lass_weight = pd.DataFrame({'weight': __coef[idx,:], 'abs': np.abs(__coef[idx,:])}, 
                           index=train_data.drop('price').columns)
lass_weight = lass_weight.sort_values(by='abs', ascending=False)

feasible_feature = lass_weight[lass_weight['abs'] > 0.25].index.values.tolist()
feasible_feature = [col_name.replace('_IDX', '') for col_name in feasible_feature]
print(*feasible_feature, sep='\n')


The model fits the data space pretty well, so the regression coefficient is credible to filter out the useful features.

# Modeling

## Data Partition

## Elastic Net Regression

- Build the dataset with useful features only

In [142]:
@singleton
class ColumnSelector(Transformer):

  def __init__(self, selected_columns):
    self._selected_columns = selected_columns + ['id', 'price']

  @property
  def selected_columns(self):
    return self._selected_columns

  def _transform(self, df: DataFrame) -> DataFrame:
    result = df
    result = result.select(self._selected_columns)
    return result

df_lasso = load_data('listings.csv')

column_selector = ColumnSelector(feasible_feature)
indexers = build_index_transformer([col_name for col_name in categorical_type_features if col_name in feasible_feature])


elasticnet_pipeline = Pipeline(stages=[column_selector
                            , bool_converter
                            , currency_converter
                            , rate_converter
                            , zero_variance_cleaner
                            , literature_column_remover
                            , url_column_remover
                            , incomplete_column_remover
                            , sentement_analysis_column_remover
                            , missing_review_record_remover
                            , non_numerical_column_remover
                            , missing_value_imputer
                            , logarithm_imputer
                            , winsorizing_imputer
                            , low_variance_cleaner
                            , *indexers
                            , categorical_type_features_remover
                            ])


result = elasticnet_pipeline.fit(df_lasso).transform(df_lasso)

print(shape(result))

train_data, test_data = split_data(result)


In [143]:
result.columns

In [144]:
_elastic_net_train_r2 = list()
_elastic_net_test_r2 = list()
_elastic_net_coef = list()

_elastic_net_alpha = np.arange(0,1,.3)
_elastic_net_lambda = np.arange(.05,.95,.15)

def search_lambda_alpha(train_data, test_data, lbds, alphas, train_r2, test_r2, coef):
  assembler = feature.VectorAssembler(inputCols=train_data.drop('price').columns, outputCol='features')
  standardizer = feature.StandardScaler(withMean=True, withStd=True, inputCol="features", outputCol="scaled_features")
  normalizer = feature.Normalizer(inputCol="scaled_features", outputCol="norm_features", p=2.0)

  lr = regression.LinearRegression() \
    .setLabelCol('price') \
    .setFeaturesCol('norm_features') \
    .setMaxIter(10) \
    .setElasticNetParam(1)

  lasso_cleaning_pipeline = Pipeline(stages=[assembler, standardizer, normalizer, lr])

  evaluator = evaluation.RegressionEvaluator() \
    .setLabelCol(lasso_cleaning_pipeline.getStages()[-1].getLabelCol()) \
    .setMetricName('r2')



  # print(*train_data.columns, sep='\n')
  for a in alphas:
    for lbd in lbds:
      paramGrid = tuning.ParamGridBuilder()\
          .addGrid(lr.regParam, [lbd]) \
          .addGrid(lr.elasticNetParam, [a]) \
          .build()

      tvs = tuning.TrainValidationSplit(estimator=lasso_cleaning_pipeline,
                                estimatorParamMaps=paramGrid,
                                evaluator=evaluator,
                                trainRatio=0.7)

      model = tvs.fit(train_data)

      train_r2.append(evaluator.evaluate(model.transform(train_data)))
      test_r2.append(evaluator.evaluate(model.transform(test_data)))
      coef.append(model.bestModel.stages[-1].coefficients.toArray())

search_lambda_alpha(train_data, test_data,
                    _elastic_net_lambda, _elastic_net_alpha,
                    _elastic_net_train_r2, _elastic_net_test_r2, _elastic_net_coef)


In [145]:
plt.clf()
plt.figure(figsize=(20, 17))

plt.clf()

fig, ax = plt.subplots(2, 1, figsize=(20, 7))
for idx, a in enumerate(_elastic_net_alpha):
    coef_idx = len(_elastic_net_lambda) * idx
    ax[0].plot(_elastic_net_lambda, _elastic_net_train_r2[coef_idx:coef_idx+len(_elastic_net_lambda)],
               'o-', linewidth=2, label=f"Training Set-Alpha: {a}")
    ax[0].plot(_elastic_net_lambda, _elastic_net_test_r2[coef_idx:coef_idx+len(_elastic_net_lambda)],
               'o-', linewidth=2, label=f"Test Set-Alpha: {a}")
    ax[0].set_title('Elastic Net Regression Performance', fontSize=25)
    ax[0].set_xlabel('R^2')
    ax[0].set_ylabel('lambda')
    diff = [pair[0] - pair[1] for pair in zip(
        _elastic_net_train_r2[coef_idx:coef_idx+len(_elastic_net_lambda)], 
        _elastic_net_test_r2[coef_idx:coef_idx+len(_elastic_net_lambda)])]
    ax[1].plot(_elastic_net_lambda, diff, 'o-', linewidth=2, label=f"Difference of R^2-Alpha: {a}")
    ax[1].set_title('Difference of the Elastic Net Regression Preference', fontSize=25)
    ax[1].set_xlabel('lambda')
    ax[1].set_ylabel('Difference of R^2')

    ax[0].legend()
    ax[1].legend()

    plt.tight_layout()
    display()


#### Feature Importance Visualization

In [147]:
reg_coef = pd.DataFrame(_elastic_net_coef, columns=[col_name for col_name in column_selector.selected_columns if col_name not in ['price', 'id']])

plt.clf()

fig = plt.figure(figsize=(25, 10))

sns.boxplot(data=reg_coef.abs(), orient='h')

plt.title(f"Distribution of Feature Regression Coefficient", fontsize=30)
plt.ylabel('Feature Name', fontsize=18)
plt.yticks(fontsize=15)
plt.xlabel('Regression Coefficient', fontsize=18)

plt.tight_layout()
display()


#### Insights

1. Rooms clearance indicates the class of a house. Airbnb can dispatch vouchers to cover miscellaneous fee, like cleanning fee to increase retension rate.
2. A flexible arrangement is a big determinent. Guest may pay extra price to buy the offer such as booking travel protection.

## Natrual Language Processing

### Term Frequency in Host Description

In [151]:
nlp_df = load_data('listings.csv')
nlp_df1=nlp_df.select("name","review_scores_rating","description")

In [152]:
nlp_df1.printSchema()
nlp_df1.count(),len(nlp_df1.columns)

In [153]:
from pyspark.sql.types import IntegerType
nlp_df1 = nlp_df1.withColumn("review_scores_rating", nlp_df1["review_scores_rating"].cast(IntegerType()))

In [154]:
nlp_df2=nlp_df1.where(nlp_df1.review_scores_rating == 100)
nlp_df2.printSchema()

In [155]:
nlp_df2.count(),len(nlp_df2.columns)

In [156]:
from langdetect import detect
def language_detection(text):
    try:
        return detect(text)
    except:
        return None
language_udf = udf(language_detection)
nlp_df2 = nlp_df2.withColumn('language',language_udf(nlp_df2['description']))
nlp_df2= nlp_df2.filter(nlp_df2['language']=='en')

In [157]:
from pyspark.ml.feature import RegexTokenizer
tokenizer = RegexTokenizer(minTokenLength=3).setGaps(False)\
  .setPattern("\\p{L}+")\
  .setInputCol("description")\
  .setOutputCol("words")
import requests
stop_words = requests.get('http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words').text.split()

In [158]:
from pyspark.ml.feature import StopWordsRemover
sw_filter = StopWordsRemover()\
  .setStopWords(stop_words)\
  .setCaseSensitive(False)\
  .setInputCol("words")\
  .setOutputCol("filtered_d")

In [159]:
nlp_pipeline = Pipeline(stages=[tokenizer, sw_filter]).fit(nlp_df2)
nlp_df3 = nlp_pipeline.transform(nlp_df2)

In [160]:
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer().setInputCol('filtered_d').setOutputCol("tf")
from pyspark.ml.feature import IDF
idf = IDF().\
    setInputCol('tf').\
    setOutputCol('tfidf')

In [161]:
tfidf_pipeline=Pipeline(stages=[cv,idf]).fit(nlp_df3)
nlp_df4 = tfidf_pipeline.transform(nlp_df3)

words = tfidf_pipeline.stages[0].vocabulary
IDF_values = tfidf_pipeline.stages[1].idf

In [162]:
voca_idf = pd.DataFrame({'word': words, 'IDF': IDF_values})
highest_idf=voca_idf.sort_values('IDF',ascending= False)

In [163]:
highest_idf.head(20)

Unnamed: 0,word,IDF
7405,wowed,6.763885
5378,lover,6.763885
5352,intereaction,6.763885
5351,girlfriend,6.763885
5350,pallet,6.763885
5349,eliminate,6.763885
5348,slates,6.763885
5347,das,6.763885
5346,xiii,6.763885
5345,neighborho,6.763885


In [164]:
from wordcloud import WordCloud
wordcloudConvertDF =voca_idf.set_index('word').T.to_dict('records')
wordcloud = WordCloud(width=800, height=500, random_state=21, max_font_size=100, relative_scaling=0.5, colormap='Dark2').generate_from_frequencies(dict(*wordcloudConvertDF))
plt.figure(figsize=(14, 10))    
plt.imshow(wordcloud, interpolation="bilinear")
plt.axis('off')
display(plt.show())

### Comments sentiment ananlysis

In [166]:
review_df=df_reviews
review_df.toPandas().head()
review_df = review_df.dropna()

In [167]:
#This cell we randomly extract 4500 comments as sample to analyze instead of whole dataset to decrease the time, we can get the whole data result without running this cell
review_df=review_df.sample(False, 0.1, seed=21).limit(5000)

In [168]:
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
nltk.download('vader_lexicon')
analyzer = SentimentIntensityAnalyzer()

def print_sentiment_scores(sentence):
    snt = analyzer.polarity_scores(sentence)
    print("{:-<40} {}".format(sentence, str(snt)))
def compound_score(text):
    compound_value = analyzer.polarity_scores(text)['compound']
   
    return compound_value

In [169]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
score_udf = udf(compound_score,FloatType())
review_df = review_df.withColumn('sentiment_compound',score_udf(review_df['comments']))

In [170]:
from langdetect import detect
def language_detection(text):
    try:
        return detect(text)
    except:
        return None
      
language_udf = udf(language_detection)
review_df = review_df.withColumn('language',language_udf(review_df['comments']))
review_df = review_df.filter(review_df['language']=='en')

In [171]:
from pyspark.ml.feature import RegexTokenizer
tokenizer2 = RegexTokenizer(minTokenLength=3).setGaps(False)\
  .setPattern("\\p{L}+")\
  .setInputCol("comments")\
  .setOutputCol("words")
import requests
stop_words = requests.get('http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words').text.split()
from pyspark.ml.feature import StopWordsRemover
sw_filter2 = StopWordsRemover()\
  .setStopWords(stop_words)\
  .setCaseSensitive(False)\
  .setInputCol("words")\
  .setOutputCol("filtered_com")
from pyspark.ml.feature import CountVectorizer
cv2 = CountVectorizer().setInputCol('filtered_com').setOutputCol("tf")
from pyspark.ml.feature import IDF
idf2 = IDF().\
    setInputCol('tf').\
    setOutputCol('tfidf')

In [172]:
nlp_pipeline2 = Pipeline(stages=[tokenizer2, sw_filter2, cv2, idf2]).fit(review_df)
review_df1 = nlp_pipeline2.transform(review_df).cache()

In [173]:
review_df2 = review_df1.select('listing_id','reviewer_id','filtered_com','comments','sentiment_compound','tf','tfidf')

In [174]:
review=review_df2.select(fn.when(fn.col('sentiment_compound') > 0,1).otherwise(0).alias("score"),
                                'listing_id','comments')

In [175]:
review.groupby('score').count().show()

In [176]:
reviewp = review.filter(review['score']=='1')
reviewn= review.filter(review['score']=='0')
reviewp=reviewp.sample(False, 0.1, seed=0).limit(100)

In [177]:
reviewc = reviewp.union(reviewn)

In [178]:
from pyspark.ml.classification import LogisticRegression
training_df, validation_df, testing_df = reviewc.randomSplit([0.5, 0.3, 0.2], seed=0)

In [179]:
lr = LogisticRegression().\
    setLabelCol('score').\
    setFeaturesCol('tfidf').\
    setRegParam(0.0).\
    setMaxIter(100).\
    setElasticNetParam(0.)
lr_pipeline = Pipeline(stages=[nlp_pipeline2, lr]).fit(training_df)

In [180]:
lr_pipeline.transform(validation_df).\
    select(fn.expr('float(prediction = score)').alias('correct')).\
    select(fn.avg('correct')).show()

### Sentiment Prediction Model tuning

In [182]:
vocabulary = nlp_pipeline2.stages[2].vocabulary
weights = lr_pipeline.stages[-1].coefficients.toArray()

In [183]:
coeffs_df = pd.DataFrame({'word': vocabulary, 'weight': weights})
coeffs_df.head()

Unnamed: 0,word,weight
0,great,-13.346881
1,stay,1.281599
2,place,2.479747
3,seattle,-0.362522
4,location,1.599328


In [184]:
coeffs_df.sort_values('weight').head(5)

Unnamed: 0,word,weight
0,great,-13.346881
868,exceptional,-2.64826
981,imagine,-2.628234
273,worked,-2.421334
1556,unbeatable,-2.352203


In [185]:
coeffs_df.sort_values('weight', ascending=False).head(5)

Unnamed: 0,word,weight
550,totally,4.56831
78,excellent,4.526391
368,kids,3.709811
50,amazing,3.562918
365,value,3.047549


It seems a lot of noise in this model and the model is overfiiting cuz these words don't make sense

In [187]:
lambda_par = 0.02
alpha_par = 0.3
en_lr = LogisticRegression().\
        setLabelCol('score').\
        setFeaturesCol('tfidf').\
        setRegParam(lambda_par).\
        setMaxIter(100).\
        setElasticNetParam(alpha_par)

In [188]:
en_lr_estimator = Pipeline(
    stages=[tokenizer2, sw_filter2, cv2, idf2, en_lr])

In [189]:
en_lr_pipeline = en_lr_estimator.fit(training_df)
en_lr_pipeline.transform(validation_df).select(fn.avg(fn.expr('float(prediction = score)'))).show()

In [190]:
en_weights = en_lr_pipeline.stages[-1].coefficients.toArray()
en_coeffs_df = pd.DataFrame({'word': en_lr_pipeline.stages[2].vocabulary, 'weight': en_weights})

In [191]:
en_coeffs_df.sort_values('weight').head(15)

Unnamed: 0,word,weight
8,days,-0.348207
1010,exceptional,-0.206929
192,dirty,-0.202595
355,want,-0.187097
293,university,-0.15797
25,night,-0.157639
12,automated,-0.154275
201,convention,-0.151671
320,expectations,-0.148704
354,free,-0.143691


In [192]:
en_coeffs_df.sort_values('weight', ascending=False).head(15)

Unnamed: 0,word,weight
3,great,1.053419
202,love,0.810531
19,comfortable,0.573321
50,accommodating,0.540496
297,jane,0.508132
42,definitely,0.491557
20,space,0.488945
850,shopping,0.483562
87,loved,0.45415
35,amazing,0.450088


In [193]:
en_coeffs_df.query('weight == 0.0').shape

In [194]:
en_coeffs_df.query('weight == 0.0').shape[0]/en_coeffs_df.shape[0]

In [195]:
en_coeffs_df.query('weight == 0.0').head(15)

Unnamed: 0,word,weight
2,seattle,0.0
5,location,0.0
6,house,0.0
10,just,0.0
15,apartment,0.0
16,close,0.0
17,quiet,0.0
18,downtown,0.0
23,clean,0.0
27,walk,0.0


In [196]:
from pyspark.ml.tuning import ParamGridBuilder
grid = ParamGridBuilder().\
    addGrid(en_lr.regParam, [0., 0.01, 0.02]).\
    addGrid(en_lr.elasticNetParam, [0., 0.2, 0.4]).\
    build()

In [197]:
all_models = []
for j in range(len(grid)):
    print("Fitting model {}".format(j+1))
    model = en_lr_estimator.fit(training_df, grid[j])
    all_models.append(model)

In [198]:
accuracies = [m.\
    transform(validation_df).\
    select(fn.avg(fn.expr('float(score = prediction)')).alias('accuracy')).\
    first().\
    accuracy for m in all_models]

In [199]:
best_model_idx = np.argmax(accuracies)
print("best model index =", best_model_idx)

In [200]:
grid[best_model_idx]

In [201]:
best_model = all_models[best_model_idx]

In [202]:
best_model.\
    transform(testing_df).\
    select(fn.avg(fn.expr('float(score = prediction)')).alias('accuracy')).\
    show()

In [203]:
most_living.show()

## Recommend System

### Based on Users (The score of reviewers)

This system used collaborative filtering to help users get which houses they could live according to their past reviews. Spark supports Alternating Least Square(ALS) to factor decomposition which that's only one spark supports, because the purpose of spark is generally for big data, ALS doesn't need the algorithm to calculator all the results, so it would save time and computer calculation.

In [207]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
(training, test) = review_df1.randomSplit([0.7, 0.3])
als = ALS(maxIter=5, regParam=0.01, userCol="reviewer_id", itemCol="listing_id", ratingCol="sentiment_compound",
          coldStartStrategy="drop")
model = als.fit(training)
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="sentiment_compound",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 house recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each house
houseRecs = model.recommendForAllItems(10)

In [208]:
userRecs.show()

In [209]:
houseRecs.show()

### Based on House (Description)

Created the data for this system which is based on the description of houses.

In [212]:
df3 = load_data('listings.csv').select(['id','description'])

In [213]:
rec_most_living = most_living.drop('listing_id').join(df3,"id", "left_outer").limit(50)

Built pipeline for model.

In [215]:
from pyspark.ml import clustering
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import Normalizer
from pyspark.ml.feature import PCA


@singleton
class DropNaHandler(Transformer):
  def __init__(self):
    pass

  def _transform(self, df: DataFrame) -> DataFrame:
    return df.dropna()


center = StandardScaler(withMean=True, withStd=False, inputCol='tfidf', outputCol='centered_tfidf')
norm = Normalizer(inputCol="centered_tfidf", outputCol="norm_tfidf", p=2.0)
kmeans = clustering.KMeans(k=10, featuresCol='norm_tfidf', predictionCol='kmeans_feat')
pca = PCA(k=10, inputCol='centered_tfidf', outputCol='scores')
rec_pipeline = Pipeline(stages=[tokenizer, DropNaHandler(), sw_filter, cv, idf, center, norm, kmeans, pca])
pipeline_model = rec_pipeline.fit(rec_most_living)

Define a User Defined Function (UDF) that takes as input two column vectors and returns the distance between them.

In [217]:
from pyspark.sql import types

def l2_dist(c1, c2):    
    return float(np.sqrt((c1 - c2).T.dot((c1 - c2))))

l2_dist_udf = fn.udf(l2_dist, types.FloatType())

Built a function

In [219]:
def recommemd(id,num):
  print(f'There are {num} houses in Airbnb you could like except {id}')

  result = pipeline_model.transform(rec_most_living).\
        where(rec_most_living.id == id).\
        select(fn.col("scores").alias('rec_scores')).\
        join(pipeline_model.transform(rec_most_living)).\
        withColumn('distance', l2_dist_udf('scores', 'rec_scores')).\
        select("id","host_name", "listing_url","description", "distance").\
        orderBy(fn.asc("distance")).\
        limit(num+1).show()
  return result

Test this recommender system

In [221]:
recommemd("5259194",5)