# Airbnb Amsterdam Analysis

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pyspark.sql.functions as f

# Importing libraries
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import Bucketizer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import from_unixtime

In [3]:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',inferschema='true').option('escape','"').load('/FileStore/tables/listingsfinal__1_-92842.csv')
df_viz=spark.sql("select * from listingsfinal_csv")


#Vizualizing Data for a better understanding

In [5]:
%sql

select count(*) as Number,neighbourhood_cleansed from listingsfinal_csv group by neighbourhood_cleansed order by Number desc limit 12


Number,neighbourhood_cleansed
3391,De Baarsjes - Oud-West
2477,De Pijp - Rivierenbuurt
2186,Centrum-West
1744,Centrum-Oost
1471,Westerpark
1407,Zuid
1323,Oud-Oost
1152,Bos en Lommer
972,Oostelijk Havengebied - Indische Buurt
609,Oud-Noord


In [6]:
%sql
select count(*) as Number,property_type from listingsfinal_csv group by property_type order by Number desc limit 5


Number,property_type
15227,Apartment
1673,House
677,Townhouse
418,Bed and breakfast
378,Loft


In [7]:
%sql
select count(*) as Number,room_type from listingsfinal_csv group by room_type order by Number desc limit 4


Number,room_type
15579,Entire home/apt
4059,Private room
328,Hotel room
58,Shared room


In [8]:
%sql
select count(*) as Number,accommodates from listingsfinal_csv group by accommodates order by Number desc limit 4


Number,accommodates
10911,2
6071,4
1577,3
499,1


In [9]:
%sql
select count(*) as Number,bathrooms from listingsfinal_csv group by bathrooms order by Number desc limit 5


Number,bathrooms
14633,1.0
3898,1.5
933,2.0
259,2.5
102,3.0


In [10]:
%sql
select count(*) as Number,bedrooms from listingsfinal_csv group by bedrooms order by Number desc limit 3


Number,bedrooms
11877,1
4824,2
1651,3


In [11]:
%sql
select count(*) as Number,beds from listingsfinal_csv group by beds order by Number desc limit 5


Number,beds
10907,1
5047,2
1866,3
1258,4
327,5


In [12]:

df1=sqlContext.read.load("/FileStore/tables/listings.csv",format='com.databricks.spark.csv', header='true', inferSchema='true')

df1 = df1.select("host_since", from_unixtime(unix_timestamp("host_since", 'MM/dd/yyy')).alias('DateFormat'))

from pyspark.sql.functions import month
df1 = df1.withColumn('StartMonth',month(df1.DateFormat))
pdf=df1.toPandas()

ax = pdf.plot.hist(bins=12, alpha=0.5)

#### From the above plot we can see that majority of the new hosts are added in the months of june and july. (SUMMER TIME IN AMSTERDAM)

#Deriving new columns

In [15]:
df1=sqlContext.read.format('com.databricks.spark.csv').options(header='true',inferschema='true').option('escape','"').load('/FileStore/tables/listings.csv')
#df1.printSchema()
df1 = df1.withColumn('amenities', regexp_replace('amenities', '"', ''))
df1 = df1.withColumn('amenities', regexp_replace('amenities', 'Cable TV', 'TV'))
df1 = df1.withColumn('amenities', regexp_replace('amenities', 'TV,TV', 'TV,'))
df1 = df1.withColumn('amenities', regexp_replace('amenities', 'TV,,', 'TV,'))

 
df1_amenities=df1.select('amenities')

amenity=df1_amenities.withColumn('word', f.explode(f.split(f.col('amenities'), ',')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)
type(amenity)
amenity=amenity.limit(5)

 


from matplotlib import pyplot 
%matplotlib inline
#%matplotlib plot
amenity_list=amenity.collect()
#amenity_list=[list(row) for row in amenity.collect()]
indexes = list(range(len(amenity_list)))

 

#split words and counts to different lists 
values = [r['count'] for r in amenity_list]
labels = [r['word'] for r in amenity_list]

 

#Plotting
bar_width = 0.1

 

pyplot.bar(indexes, values)

 

#add labels
labelidx = [i + bar_width for i in indexes] 
pyplot.xticks(labelidx, labels)
pyplot.show()


#### Firstly, challenge we faced was cleaning amenities column as in one listing we see Wifi and another listing we see "Wifi"(closed with "") but basically both are same. Took time for finding solution to it. Next is count of each amenity. Primarily we made use of UDF for this but we faced issues in saving dataframe and as well with assembler. We solved this by using for loop and this loop took lot of time and for safer side we saved this dataframe so that we could use this in future.

In [17]:
def getrows(df, rownums=None):
    return df.rdd.zipWithIndex().filter(lambda x: x[1] in rownums).map(lambda x: x[0])
  
def heat(col):
  check=col.split(',')
  if 'Heating' in check:
    return 1
  else: return 0
  
def ess(col):
  check=col.split(',')
  if 'Essentials' in check:
    return 1
  else: return 0
  
def kitchen(col):
  check=col.split(',')
  if 'Kitchen' in check:
    return 1
  else: return 0

 
def smoke(col):
  check=col.split(',')
  if 'Smoke detector' in check:
    return 1
  else: return 0
  
def wifi(col):
  check=col.split(',')
  if 'Wifi' in check:
    return 1
  else: return 0
    
 
 
for i in range(df.count()):
  list_heat.append((heat(getrows(df.select('amenities'),rownums=[i]).collect()[0][0])))
  list_wifi.append((wifi(getrows(df.select('amenities'),rownums=[i]).collect()[0][0])))
  list_ess.append((ess(getrows(df.select('amenities'),rownums=[i]).collect()[0][0])))
  list_kitchen.append((kitchen(getrows(df.select('amenities'),rownums=[i]).collect()[0][0])))
  list_smoke.append((smoke(getrows(df.select('amenities'),rownums=[i]).collect()[0][0])))
  
heat_df=spark.createDataFrame(list_heat, IntegerType())
wifi_df=spark.createDataFrame(list_wifi, IntegerType())
ess_df=spark.createDataFrame(list_ess, IntegerType())
kitchen_df=spark.createDataFrame(list_kitchen, IntegerType())
smoke_df=spark.createDataFrame(list_smoke, IntegerType())
 
df2=heat_df.rdd.zipWithIndex()
dfh=df2.map(lambda x: (list(x[0]) + [x[1]]))
df_heat=dfh.toDF(["Heat",'row_id'])

 

df2=wifi_df.rdd.zipWithIndex()
dfh=df2.map(lambda x: (list(x[0]) + [x[1]]))
df_wifi=dfh.toDF(["Wifi",'row_id'])

 

df2=ess_df.rdd.zipWithIndex()
dfh=df2.map(lambda x: (list(x[0]) + [x[1]]))
df_ess=dfh.toDF(["Essentials",'row_id'])

 

df2=kitchen_df.rdd.zipWithIndex()
dfh=df2.map(lambda x: (list(x[0]) + [x[1]]))
df_kitchen=dfh.toDF(["Kitchen",'row_id'])

 

df2=smoke_df.rdd.zipWithIndex()
dfh=df2.map(lambda x: (list(x[0]) + [x[1]]))
df_smoke=dfh.toDF(["Smoke Detector",'row_id'])

 


df=df.join(df_heat,on='rowid')
df=df.join(df_wifi,on='rowid')
df=df.join(df_ess,on='rowid')
df=df.join(df_kitchen,on='rowid')
df=df.join(df_smoke,on='rowid')
 


#### Created binary columns for all top 5 amenities. If a listing has Wifi as one of its amenity then Wifi column as 1 else 0 and similar concept for all other columns as well

In [19]:
import math

 

def distance(lat1,lon1,lat2,lon2):
    #lat1, lon1 = origin
    #lat2, lon2 = destination
    radius = 6371 # km

 

    dlat = math.radians(lat2-lat1)
    dlon = math.radians(lon2-lon1)
    a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \
        * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    d = radius * c

 

    return d
 
for i in range(df.count()):
  list_airport.append((distance(getrows(df.select('latitude'),rownums=[i]).collect()[0][0]),
  distance(getrows(df.select('longitude'),rownums=[i]).collect()[0][0]),
  52.3105,4.7683)) 
  
  list_citycenter.append((distance(getrows(df.select('latitude'),rownums=[i]).collect()[0][0]),
  distance(getrows(df.select('longitude'),rownums=[i]).collect()[0][0]),
  52.3792,4.8994)) 
  
airport_df=spark.createDataFrame(list_airport, FloatType())
df2=airport_df.rdd.zipWithIndex()
dfh=df2.map(lambda x: (list(x[0]) + [x[1]]))
df_airport=dfh.toDF(["Airport",'row_id'])

 

citycenter_df=spark.createDataFrame(list_citycenter, FloatType())
df2=airport_df.rdd.zipWithIndex()
dfh=df2.map(lambda x: (list(x[0]) + [x[1]]))
df_cc=dfh.toDF(["City Center",'row_id'])

 

dfnew=df.join(df_airport,on='rowid')
dfnew=df.join(df_cc,on='rowid')

#### Simialr problem of UDF was faced here as well. Hence, made use of for loop and join and saved dataframe so that we could use further

#Data Cleaning

In [22]:
df.count()

In [23]:
df.printSchema()

In [24]:
df=df.select("accommodates","bathrooms","bedrooms","beds","price","neighbourhood_cleansed","property_type","bed_type","security_deposit","cleaning_fee","guests_included","extra_people","number_of_reviews","room_type","host_is_superhost","host_listings_count","score","Wifi","Heating","Essentials","Kitchen","Smoke detector",'citycentre','airport')

In [25]:

df=df.withColumn('Wifi',col('Wifi').cast('int'))
df=df.withColumn('Heating',col('Heating').cast('int'))
df=df.withColumn('Essentials',col('Essentials').cast('int'))
df=df.withColumn('Kitchen',col('Kitchen').cast('int'))
df=df.withColumn('Smoke detector',col('Smoke detector').cast('int'))
df=df.withColumn('bathrooms',col('bathrooms').cast('int'))
df=df.withColumn('bedrooms',col('bedrooms').cast('int'))
df=df.withColumn('beds',col('beds').cast('int'))
df=df.withColumn('security_deposit',col('security_deposit').cast('double'))
df=df.withColumn('cleaning_fee',col('cleaning_fee').cast('double'))
df=df.withColumn('score',col('score').cast('int'))
df=df.withColumn('price',col('price').cast('int'))

In [26]:
# Findings number of nulls in each column
# reference: https://stackoverflow.com/questions/44627386/how-to-find-count-of-null-and-nan-values-for-each-column-in-a-pyspark-dataframe
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

#There are around 10000 nulls in security deposit and cleaning fee combined which is around half of our data set. So let us not consider there columns in our model.

In [28]:
df=df.drop('security_deposit','cleaning_fee')
df.count()

In [29]:
#Ref - https://blog.zhaytam.com/2019/07/15/outliers-detection-in-pyspark-2-interquartile-range/
def calculate_bounds(dataframe):
  bounds = {c: dict(zip(["q1", "q3"], dataframe.approxQuantile(c, [0.25, 0.75], 0)))
    for c,d in zip(dataframe.columns, dataframe.dtypes) if d[1] == "int"
    }
  for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['min'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['max'] = bounds[c]['q3'] + (iqr * 1.5)
  return bounds
    

In [30]:
df_price=df.select('price')
calculate_bounds(df_price)

In [31]:
splits = [0.0,314, 1000.0, 3000.0, 5000.0, 7000.0,float("inf")]
bucketizer = Bucketizer(splits=splits, inputCol="price", outputCol="bucketedFeatures")
bucketedData = bucketizer.transform(df_price)
bucketedData.groupBy("bucketedFeatures").count().show()

In [32]:
df=df.filter('price <=314')
df.count()

In [33]:
df=df.dropna(how='any')
df.count()

In [34]:
df_DS=df.select("accommodates","bathrooms","bedrooms","beds","price","guests_included","extra_people","number_of_reviews","host_listings_count")

df_DS.describe().toPandas().transpose()


Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
accommodates,18614,2.76431718061674,1.1237572030961747,1,16
bathrooms,18614,1.051198022993446,0.2729015838910834,0,8
bedrooms,18614,1.3887933813258837,0.817766715168559,0,12
beds,18614,1.6883528526915226,1.1827376119246986,0,32
price,18614,139.54561083055765,58.84286278054279,9,314
guests_included,18614,1.4228537659825937,0.8140600156778348,1,16
extra_people,18614,11.221929730310519,23.20886665346957,0,280
number_of_reviews,18614,25.236327495433546,52.99942847272882,0.0,821.0
host_listings_count,18614,1.9534758783711186,8.688326054130359,0,9


In [35]:
import six
df_Corr=df.select("accommodates","bathrooms","bedrooms","beds","price","guests_included","extra_people","number_of_reviews","host_listings_count","airport","citycentre")

for i in df_Corr.columns:
    if not( isinstance(df_Corr.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation to PRICE for ", i, df_Corr.stat.corr('price',i))

In [36]:
# reference: https://stackoverflow.com/questions/55546467/how-to-plot-correlation-heatmap-when-using-pysparkdatabricks

neighbourhood_indexer = StringIndexer(inputCol='neighbourhood_cleansed',outputCol='neighbourhood_index',handleInvalid='keep')
propertyType_Indexer = StringIndexer(inputCol='property_type',outputCol='property_type_index',handleInvalid='keep')
bedType_Indexer = StringIndexer(inputCol='bed_type',outputCol='bed_type_index',handleInvalid='keep')
roomType_indexer = StringIndexer(inputCol='room_type',outputCol='room_type_index',handleInvalid='keep')
hostissuperhost_indexer = StringIndexer(inputCol='host_is_superhost',outputCol='hostisusperhost_index',handleInvalid='keep')


# Vector assembler is used to create a vector of input features

assembler_allVariables=VectorAssembler(inputCols=['accommodates','bathrooms','bedrooms','beds','number_of_reviews','neighbourhood_index','property_type_index','bed_type_index',
                                       'room_type_index','hostisusperhost_index',"Wifi","Heating","Essentials","Kitchen","Smoke detector",'airport','citycentre','price'],
                            outputCol="features")

# Pipeline is used to pass the data through indexer and assembler simultaneously. Also, it helps to pre-rocess the test data
# in the same way as that of the train data

pipe = Pipeline(stages=[neighbourhood_indexer,propertyType_Indexer,bedType_Indexer,roomType_indexer,hostissuperhost_indexer,assembler_allVariables])

 

fit_pipe = pipe.fit(df)
df_correlated=fit_pipe.transform(df)

 

# Getting the correlation dense matrix
corr = Correlation.corr(df_correlated, "features").collect()[0][0]
# Coverting it to a list so that it can plotted
corrmatrix = corr.toArray().tolist() 
# defining the features data columns since if convert the corr matrix to list we will loose column headers
data_columns=[
 'accommodates','bathrooms','bedrooms','beds','number_of_reviews','neighbourhood_index','property_type_index','bed_type_index',
                                       'room_type_index','hostisusperhost_index',"Wifi","Heating","Essentials","Kitchen","Smoke detector",'airport','citycentre','price']
# We create a dataframe with correlated values list between features and features headers
corrdf=spark.createDataFrame(corrmatrix,data_columns)

# We will plot the correlation matrix with matplot matrix show.

fig, ax = plt.subplots()
# Creating a matrix plot with correlation matrix with max value 1 and min value -1
corr_ax=ax.matshow(corrmatrix,vmax=1,vmin=-1)
# Setting title xlabel , ylabel, xticks and yticks
ax.set_title("Correlation Matrix")
ax.set_xticks(np.arange(len(data_columns)))
ax.set_yticks(np.arange(len(data_columns)))
ax.set_xticklabels(data_columns)
ax.set_yticklabels(data_columns)
plt.gca().xaxis.tick_bottom()
plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")
plt.tick_params(axis="both", which="both" )
fig.colorbar(corr_ax)

#fig.tight_layout()
display(fig)




In [37]:
from pyspark.sql.functions import lit
from math import log
df=df.withColumn("log_of_price",log10(df.price))



In [38]:
from pyspark.sql.functions import lit
from math import log
df=df.withColumn("log_of_price",log10(df.price))


df1=df.select('price','log_of_price').toPandas()
%matplotlib inline
import matplotlib.pyplot as plt
df1.hist(column='price')
df1.hist(column='log_of_price')


In [39]:

train_data_LR,test_data_LR=df.randomSplit([0.7,0.3],seed=12345)
  
  

#Indexing the Categorical variable
neighbourhood_indexer = StringIndexer(inputCol='neighbourhood_cleansed',outputCol='neighbourhood_index',handleInvalid='keep')
propertyType_Indexer = StringIndexer(inputCol='property_type',outputCol='property_type_index',handleInvalid='keep')
bedType_Indexer = StringIndexer(inputCol='bed_type',outputCol='bed_type_index',handleInvalid='keep')
roomType_indexer = StringIndexer(inputCol='room_type',outputCol='room_type_index',handleInvalid='keep')
hostissuperhost_indexer = StringIndexer(inputCol='host_is_superhost',outputCol='hostisusperhost_index',handleInvalid='keep')


# Vector assembler is used to create a vector of input features

assembler_allVariables=VectorAssembler(inputCols=['accommodates','bathrooms','bedrooms','beds','number_of_reviews','neighbourhood_index','property_type_index','bed_type_index',
                                       'room_type_index','hostisusperhost_index',"Wifi","Heating","Essentials","Kitchen","Smoke detector",'airport','citycentre'],
                            outputCol="features")

# Pipeline is used to pass the data through indexer and assembler simultaneously. Also, it helps to pre-rocess the test data
# in the same way as that of the train data

pipe = Pipeline(stages=[neighbourhood_indexer,propertyType_Indexer,bedType_Indexer,roomType_indexer,hostissuperhost_indexer,assembler_allVariables])

fitted_pipe=pipe.fit(train_data_LR)

train_data_LR=fitted_pipe.transform(train_data_LR)

# Create an object for the Linear Regression model

lr_model = LinearRegression(labelCol='log_of_price')

# Fit the model on the train data

fit_model = lr_model.fit(train_data_LR.select(['features','log_of_price']))

# Transform the test data using the model to predict the duration

test_data_LR=fitted_pipe.transform(test_data_LR)

# Store the results in a dataframe

results = fit_model.transform(test_data_LR)

results.select(['log_of_price','prediction']).show()

In [40]:
test_results = fit_model.evaluate(test_data_LR)
test_results.residuals.show()

In [41]:
test_results.rootMeanSquaredError

### Here RMSE is for Log of Price, so for price the Root mean squared error will be exponential of the above value which is 15%. It means that typically our original values deviate by 15% from geometric mean.

In [43]:
test_results.r2

In [44]:
# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(fit_model.coefficients))
print("Intercept: %s" % str(fit_model.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = fit_model.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

#### From the coefficients we can interpret that if a customer needs WIFI as an amenity then the price of the listing will be 2.5 % more than a listings that does not have WIFI as an amenity.

The same with other amenities as well
2% for Heating
4.7% for essentials

In [46]:
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
pd_df_lr = results.toPandas()
fig, ax=plt.subplots(figsize=(10,6))
ax=sns.regplot(pd_df_lr['prediction'],pd_df_lr['log_of_price'])
display(plt.show())

In [47]:
train_data_RF,test_data_RF=df.randomSplit([0.7,0.3],seed=12345)



# Creating Random Forest Regressor model object
rf_model=RandomForestRegressor(featuresCol="features",labelCol='log_of_price',maxBins=500)


#Creating the pipeline object with Pipeline stages for transforming the data and applying model
pipe = Pipeline(stages=[neighbourhood_indexer,propertyType_Indexer,bedType_Indexer,roomType_indexer,hostissuperhost_indexer,assembler_allVariables])

fitted_pipe=pipe.fit(train_data_RF)

train_data_RF=fitted_pipe.transform(train_data_RF)

# Fit the model on the train data

fit_model = rf_model.fit(train_data_RF.select(['features','log_of_price']))

# Transform the test data using the model to predict the duration

test_data_RF=fitted_pipe.transform(test_data_RF)

# Store the results in a dataframe

results = fit_model.transform(test_data_RF)

results.select(['log_of_price','prediction']).show()



In [48]:
# Evaluating Decision Tree regression model based on the Regression Evaluator.
evaluate = RegressionEvaluator(
    labelCol="log_of_price", predictionCol="prediction", metricName="rmse")

# Calculating the root mean square value for the model.
rmse = evaluate.evaluate(results)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [49]:
# Creating a another Regression Evaluator object for calculating r2 score.
evaluate1 = RegressionEvaluator(
    labelCol="log_of_price", predictionCol="prediction", metricName="r2")

r2 = evaluate1.evaluate(results)
print("R Squared (R2) on test data = %g" % r2)

In [50]:



import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
pd_df_lr = results.toPandas()
fig, ax=plt.subplots(figsize=(10,6))
ax=sns.regplot(pd_df_rf['prediction'],pd_df_rf['log_of_price'])
display(plt.show())