In [1]:
## spark imports
import random
random.seed(42)

from pyspark import SparkContext
from pyspark.sql import SQLContext

from pyspark.sql import Row, SparkSession, Window
from pyspark.sql.functions import *
from pyspark.sql.functions import col, split, mean, count, isnan, when, udf, abs, sqrt, max
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.ml.feature import CountVectorizer, VectorIndexer, VectorAssembler,StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline, feature
from pyspark.ml.regression import DecisionTreeRegressor, LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


# Import AirBnB dataset.
# NOTE(review): `sc` (the SparkContext) is not defined in this notebook; it is
# presumably provided by the hosting environment (e.g. Databricks) — confirm.
sqlContext = SQLContext(sc)
df = sqlContext.sql("SELECT * FROM listings_csv")

#Drop rows with data quality issues - Hide if issues no longer exist in data import
#df = df.filter(col('property_type').isin(['Apartment', 'House', 'Condominium', 'Townhouse', 'Guest Suite', 'Bungalow', 'Loft', 'Serviced apartment', 'Bed and breakfast', 'Guesthouse', 'Villa', 'Boutique hotel', 'Hostel' ])) 

# Columns that are not used to predict price.
# All names are lowercase, like the rest of the list, so they also match when
# spark.sql.caseSensitive is enabled. DataFrame.drop() silently ignores names
# that match no column, so a mis-cased name would leave that column in place
# without any error.
columns_to_drop = [
    'id', 'listing_url', 'scrape_id', 'last_scraped', 'name', 'summary', 'space', 'description',
    'experiences_offered', 'neighborhood_overview', 'notes', 'transit', 'access', 'interaction',
    'thumbnail_url', 'medium_url', 'picture_url', 'xl_picture_url',
    'host_id', 'host_url', 'host_name', 'host_since', 'host_location', 'square_feet', 'host_about',
    'host_response_time', 'host_response_rate', 'host_acceptance_rate', 'host_is_superhost',
    'host_thumbnail_url', 'host_picture_url', 'host_listings_count', 'host_total_listings_count',
    'host_verifications', 'host_has_profile_pic', 'host_identity_verified', 'street',
    'neighbourhood_group_cleansed', 'city', 'state', 'market', 'smart_location', 'country_code',
    'country', 'latitude', 'longitude', 'is_location_exact', 'weekly_price', 'monthly_price',
    'minimum_nights', 'maximum_nights', 'minimum_minimum_nights', 'maximum_minimum_nights',
    'minimum_maximum_nights', 'maximum_maximum_nights', 'calendar_updated', 'has_availability',
    'availability_30', 'availability_60', 'availability_90', 'availability_365',
    'calendar_last_scraped', 'number_of_reviews', 'number_of_reviews_ltm', 'first_review',
    'last_review', 'review_scores_rating', 'review_scores_accuracy', 'review_scores_cleanliness',
    'review_scores_checkin', 'review_scores_communication', 'review_scores_location',
    'review_scores_value', 'license', 'jurisdiction_names', 'requires_guest_profile',
    'requires_guest_phone_verification', 'calculated_host_listings_count',
    'calculated_host_listings_count_entire_homes', 'calculated_host_listings_count_private_rooms',
    'calculated_host_listings_count_shared_rooms', 'reviews_per_month', 'host_neighbourhood',
    'neighbourhood', 'zipcode', 'requires_license', 'is_business_travel_ready',
    'require_guest_profile_picture', 'require_guest_phone_verification', 'extra_people',
    'minimum_nights_avg_ntm', 'maximum_nights_avg_ntm',
]

#Drop columns listed above
data = df.drop(*columns_to_drop)






# Import the neighbourhood dataset, i.e. the Wellbeing Toronto dataset with
# education and economic indicators of each neighbourhood's population.
neighbourhood_df = sqlContext.sql("SELECT * FROM wellbeing_toronto_csv").drop("Combined_Indicators", "Neighbourhood_ID")

# Join the listings with the wellbeing neighbourhood data by neighbourhood name.
# This is an inner join (PySpark's default), so listings whose
# neighbourhood_cleansed has no matching Neighbourhood row are dropped.
# The joined-in Neighbourhood column is removed afterwards. Neighbourhood_ID was
# already dropped above, and drop() ignores names that are not present.
airbnb_df = data.join(neighbourhood_df, data.neighbourhood_cleansed == neighbourhood_df.Neighbourhood).drop("Neighbourhood", "Neighbourhood_ID")




# Import the MCI (major crime indicator) data set, compute the average number of
# MCI events per year for each neighbourhood, and rank neighbourhoods by that
# average (dense rank: 1 = lowest average, max = highest).
# cleaned_Neighbourhood keeps the text before the first " (" of the name.
# Presumably the names look like "Name (NN)" — verify. If " (" is absent,
# POSITION returns 0 and LEFT(..., -1) yields an empty string.
mci_df = sqlContext.sql("SELECT mci_2014_to_2018_csv.occurrenceyear, mci_2014_to_2018_csv.MCI, LEFT(mci_2014_to_2018_csv.Neighbourhood,POSITION(' (' IN mci_2014_to_2018_csv.Neighbourhood)-1) AS cleaned_Neighbourhood FROM mci_2014_to_2018_csv WHERE mci_2014_to_2018_csv.occurrenceyear >=2014")
# Number of events per (neighbourhood, year).
mci_df = mci_df.groupBy(["cleaned_Neighbourhood","occurrenceyear"]).agg(count('occurrenceyear')).orderBy(["cleaned_Neighbourhood","occurrenceyear"], ascending = True)
# Mean of the yearly counts per neighbourhood. Only years with at least one
# event contribute. The orderBy calls only affect row order, not the values.
mci_df = mci_df.groupby(["cleaned_Neighbourhood"]).agg(mean('count(occurrenceyear)')).orderBy("avg(count(occurrenceyear))", ascending = False)
# Ascending dense rank over the whole table. A window with no partitionBy
# processes all rows in a single partition, which is acceptable for one row per
# neighbourhood.
mci_df = mci_df.withColumn("rank",dense_rank().over(Window.orderBy("avg(count(occurrenceyear))")))
# Underscores become hyphens, presumably to match the spelling used in the
# listings' neighbourhood_cleansed — confirm against the data.
mci_df = mci_df.withColumn("cleaned_Neighbourhood", regexp_replace('cleaned_Neighbourhood','_','-'))
# `max` is pyspark.sql.functions.max (imported in the first cell), not the
# builtin. The result is a one-row DataFrame, not a number.
# NOTE(review): unused in the visible code; categorize_ranks uses a fixed 137.
max_rank = mci_df.agg(max('rank'))
# join the listings with the MCI data by neighbourhood name (inner join), keeping only `rank`
airbnb_df = airbnb_df.join(mci_df, airbnb_df.neighbourhood_cleansed == mci_df.cleaned_Neighbourhood).drop('cleaned_Neighbourhood','avg(count(occurrenceyear))')




#Check if data in cell, if no data present return 0 else 1
def true_false_categorization(text):
  """Flag whether a cell holds a value.

  Args:
    text: the cell value; in a Spark UDF, SQL NULL arrives as None.

  Returns:
    0 when `text` is None, otherwise 1 (an empty string counts as a value).
  """
  return 0 if text is None else 1
  
  
  
  
  # Recode cells with true or false values to be 1 or 0
def binary_encoding(text):
  """Recode a true/false flag such as 't'/'f' to 1/0.

  Args:
    text: the flag value; in a Spark UDF, SQL NULL arrives as None.

  Returns:
    0 for None and for 'f', 'false' or '0'. The comparison ignores case and
    surrounding whitespace, so 'False' and ' F ' also give 0. Every other
    value gives 1.
  """
  if text is None:
    return 0
  return 0 if str(text).strip().lower() in ('f', 'false', '0') else 1

  
  
  
  
  # Replace a character (regex pattern) in a string column
def replace_char(df, column_name, orig_char, new_char):
  """Return `df` with every match of `orig_char` in `column_name` replaced by `new_char`.

  `orig_char` goes to Spark's regexp_replace, so it is a regular expression:
  characters such as "." or "(" have regex meaning.
  """
  replaced = regexp_replace(column_name, orig_char, new_char)
  return df.withColumn(column_name, replaced)



# Convert data in column to an array so as to use with Count Vectorizer
def convert_col_to_array(df,new_col,orig_col, split_char):
  """Return `df` with `new_col` holding `orig_col` split into an array of strings.

  Args:
    df: input Spark DataFrame.
    new_col: name of the array column to add (replaced if it already exists).
    orig_col: name of the string column to split.
    split_char: regular-expression pattern to split on (Spark split semantics).

  Spark's split/col are imported locally. The helper therefore keeps working
  even if the notebook-level name `split` is later rebound to something else,
  for example reused as a train/test ratio.
  """
  from pyspark.sql.functions import col as spark_col, split as spark_split
  return df.withColumn(new_col, spark_split(spark_col(orig_col), split_char))




def initialize_Count_Vectorizer(input_Col, output_Col):
  """Create an unfitted CountVectorizer that reads `input_Col` and writes `output_Col`."""
  vectorizer = CountVectorizer(inputCol=input_Col, outputCol=output_Col)
  return vectorizer





def convert_dtype(df, colname, new_dtype):
  """Return `df` with column `colname` cast to `new_dtype`, keeping the same name."""
  casted = df[colname].cast(new_dtype)
  return df.withColumn(colname, casted)




def categorize_ranks(rank, max_rank=137):
  """Map a numeric rank to a quartile label relative to `max_rank`.

  Args:
    rank: numeric rank. In this notebook it is the MCI dense rank, where 1 is
      the neighbourhood with the lowest average number of events.
    max_rank: the largest possible rank. Defaults to 137; pass the actual
      maximum if the neighbourhood data changes.

  Returns:
    "No-Low" if 0 < rank <= 0.25 * max_rank, "Low-Med" if rank <= 0.5 * max_rank,
    "Med-High" if rank <= 0.75 * max_rank, otherwise "High". "High" also
    covers any rank <= 0.
  """
  if 0 < rank <= 0.25 * max_rank:
    return "No-Low"
  if 0.25 * max_rank < rank <= 0.5 * max_rank:
    return "Low-Med"
  if 0.5 * max_rank < rank <= 0.75 * max_rank:
    return "Med-High"
  return "High"
  
  
  
  
  # Convert 't'/'f' flag values into 1/0 values
# Recode the instant_bookable flag as an integer 0/1 column.
# The return type is given explicitly: without it Spark's udf() defaults to
# StringType and the column would hold the strings '0'/'1'.
# NOTE(review): with Spark's default case-insensitive resolution, withColumn
# replaces the existing instant_bookable column under the new name Instant_Bookable.
udf_binary_encoding = udf(binary_encoding, IntegerType())
airbnb_df = airbnb_df.withColumn('Instant_Bookable',udf_binary_encoding('instant_bookable'))




# String Hashing on Amenities.
# RegexTokenizer (default gaps=True, toLowercase=True) splits the amenities text
# on runs of non-word characters (\W+). Multi-word amenities such as
# "Air conditioning" therefore become separate lowercase tokens.
tokenizer = RegexTokenizer(inputCol="amenities", outputCol="amenitieslist", pattern="\\W+")
tokenized = tokenizer.transform(airbnb_df)

# Hash the tokens into a 40-bucket term-frequency vector. With only 40 buckets,
# distinct tokens can collide in the same bucket. The raw text and token
# columns are dropped afterwards.
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="amenitiesFeatures", numFeatures=40)
airbnb_df = hashingTF.transform(tokenized).drop("amenities", "amenitieslist")





# Replace house_rules / security_deposit with a 1/0 "value present" flag.
# IntegerType is given explicitly; otherwise Spark's udf() returns StringType.
# NOTE(review): any non-null security_deposit (e.g. "$0.00", if present in the
# data) counts as 1 — confirm that is intended.
udf_recode_true_false = udf(true_false_categorization, IntegerType())
airbnb_df = airbnb_df.withColumn('house_rules', udf_recode_true_false('house_rules')) #Are there any house rules?
airbnb_df = airbnb_df.withColumn('security_deposit', udf_recode_true_false('security_deposit')) # Is a security deposit required




#Drop NAs from Property Type and Bed Type ie these must exist in the listing to be considered.
# Named required_cols rather than `list`, so the builtin `list` is not shadowed
# for the rest of the notebook.
required_cols = ['property_type', 'bed_type']
airbnb_df = airbnb_df.dropna(subset=required_cols)




# Replace the numeric MCI rank with its quartile label ("No-Low", "Low-Med",
# "Med-High", "High"). The labels are strings, which matches udf()'s default
# StringType return type.
udf_rank_categorization = udf(categorize_ranks)
airbnb_df = airbnb_df.withColumn('rank', udf_rank_categorization('rank'))




#One Hot Encoding
# Each categorical column below becomes a sparse vector column <name>_OHE.
# Spaces are first replaced by "_", so splitting on " " gives a one-element
# array. CountVectorizer then puts a count of 1 at that value's vocabulary
# index, i.e. a one-hot vector.
# NOTE(review): null values presumably stay null through replace/split —
# confirm CountVectorizer accepts them, or drop/fill nulls first.

OHE_List= ['property_type', 'room_type', 'bed_type', 'neighbourhood_cleansed', 'cancellation_policy', 'rank']

# Generate OHE for elements in the list above
for element in OHE_List:
  #Remove Spaces from categorical fields to prevent issues with encoding
  airbnb_df = replace_char(airbnb_df, element," ", "_")
  
  #Convert String to Array so that OHE can run (requires an array as an input)
  airbnb_df = convert_col_to_array(airbnb_df, element + "_array", element," ")
  
  #Initialize Count vectorizer
  elementVectorizer = initialize_Count_Vectorizer(element + "_array",element + "_OHE")
  
  #Fit a vectorizer model (learns the vocabulary of this column's values)
  elementVectorizer_model = elementVectorizer.fit(airbnb_df)
  
  #Transform Data (adds the <element>_OHE vector column)
  airbnb_df = elementVectorizer_model.transform(airbnb_df)
  
  #Drop Extraneous Cols.
  # NOTE(review): this rebinds the notebook-level columns_to_drop defined
  # earlier for the listings data.
  columns_to_drop = [element, element + "_array"]
  
  #Drop columns listed above
  airbnb_df = airbnb_df.drop(*columns_to_drop)
  
  
  
  
# display() is provided by the notebook environment.
display(airbnb_df)

In [2]:
# Here we load the data and define some variables
import math
import numpy as np

# Parse price strings such as "$1,200.00" into doubles. Both "$" and the
# thousands separator are stripped: dropping only the leading "$" leaves
# "1,200.00", which casts to null. The drop("label") is a no-op when no label
# column exists.
df = airbnb_df.drop("label")
df = df.withColumn("price", regexp_replace(col("price"), "[$,]", "").cast(DoubleType()))

# dataframe = spark.table("sp500_csv") # the data table
dataframe = df

# The columns/features of interest. price must stay first: the windowing
# helpers use column 0 as the prediction target.
cols = ['price','security_deposit']#,'bedrooms']
data_array = np.array(dataframe.select(cols).toPandas())

# Sequential (unshuffled) train/test split. The ratio is named split_ratio so it
# does not rebind pyspark's split() function used by the first cell's helpers.
split_ratio = 0.85
i_split = int(data_array.shape[0] * split_ratio)
data_train = data_array[:i_split,:]
data_test  = data_array[i_split:,:]
len_train  = len(data_train)
len_test   = len(data_test)

# Rows per sliding window: seq_len - 1 input rows plus 1 target row.
seq_len = 10

# Here we normalize the data to train
import numpy as np
def normalise_windows(window_data):
  """Normalise each window relative to its first non-zero value per column.

  For every window (2-D: rows are time steps, columns are features) and every
  column, each value p becomes p / base - 1. Here base is the first non-zero
  value in that column of that window. A column that is entirely zero in a
  window has no usable base and is left as all zeros.

  Args:
    window_data: iterable of 2-D array-likes of equal shape whose values are
      convertible to float.

  Returns:
    np.ndarray of shape (n_windows, rows, columns), in the same row/column
    layout as the input windows.
  """
  normalised_data = []
  for window in window_data:
    window = np.asarray(window, dtype=float)
    normalised_window = np.zeros_like(window)
    for col_i in range(window.shape[1]):
      column = window[:, col_i]
      nonzero_idx = np.flatnonzero(column)
      if nonzero_idx.size:
        normalised_window[:, col_i] = column / column[nonzero_idx[0]] - 1
    normalised_data.append(normalised_window)
  return np.array(normalised_data)



# Here we define functions to get the training/test data

def get_test_data(seq_len):
  """Build normalised (x, y) arrays from sliding windows over data_test.

  Each window spans `seq_len` consecutive rows and is normalised on its own.
  x holds the first seq_len - 1 rows of every window; y holds the last row's
  price (column 0), with shape (n_windows, 1).
  """
  starts = range(len_test - seq_len)
  windows = np.array([data_test[s:s + seq_len, :] for s in starts]).astype(float)
  normalised = normalise_windows(windows)
  return normalised[:, :-1, :], normalised[:, -1, [0]]


def get_train_data(seq_len):
  """Return all normalised training windows from data_train as (x, y) arrays.

  Every window of `seq_len` consecutive rows is normalised on its own. x holds
  the first seq_len - 1 rows of each window; y holds the last row's price
  (column 0).
  """
  xs, ys = [], []
  for start in range(len_train - seq_len):
    normalised = normalise_windows([data_train[start:start + seq_len, :]])
    xs.append(normalised[:, :-1])
    ys.append(normalised[:, -1, [0]])
  x = np.array(xs)[:, 0, :, :]
  y = np.array(ys)[:, 0, :]
  return x, y




# functions for training data and test data and generative training
def generate_train_batch(seq_len, batch_size=None):
  """Yield normalised (x, y) training batches from data_train, forever.

  Windows of `seq_len` rows start at every index 0 .. len_train - seq_len - 1.
  Each pass over those windows is cut into consecutive batches of at most
  `batch_size` windows; only the last batch of a pass may be smaller. Passes
  repeat indefinitely, as fit_generator requires. A pass therefore has
  ceil((len_train - seq_len) / batch_size) batches.

  Args:
    seq_len: rows per window. x gets the first seq_len - 1 rows; y gets the
      last row's price (column 0).
    batch_size: windows per batch. Defaults to the notebook-level
      `batch_size` setting.

  Yields:
    (x, y) with x of shape (b, seq_len - 1, n_features) and y of shape (b, 1).

  Raises:
    ValueError: on the first next() if data_train has too few rows for one
      window or batch_size is not positive.
  """
  if batch_size is None:
    batch_size = globals()["batch_size"]
  n_windows = len_train - seq_len
  if n_windows <= 0:
    raise ValueError("need more than seq_len training rows to build a window")
  if batch_size <= 0:
    raise ValueError("batch_size must be positive")
  while True:
    for start in range(0, n_windows, batch_size):
      stop = min(start + batch_size, n_windows)
      windows = np.array([data_train[i:i + seq_len, :] for i in range(start, stop)])
      normalised = normalise_windows(windows)
      yield normalised[:, :-1], normalised[:, -1, [0]]

In [3]:
# Here we define the model and some function to call the model

import os
import math
import numpy as np
import datetime as dt
from numpy import newaxis
from keras.layers import Dense, Activation, Dropout, LSTM
from keras.models import Sequential, load_model
from keras.callbacks import EarlyStopping, ModelCheckpoint, CSVLogger, LambdaCallback
from keras.optimizers import Adadelta

# Build Model

neurons = 100
dropout_rate = 0.2
# Output activation (alternatives tried: 'linear', 'relu').
# NOTE(review): tanh bounds predictions to (-1, 1). The normalised targets
# (p / base - 1) exceed 1 whenever a price is more than double its window's
# base price.
activation = 'tanh'
# Derived from the data cell, so the model's input shape always matches the
# windows built there: x keeps seq_len - 1 rows of len(cols) features.
input_timesteps = seq_len - 1
input_dim = len(cols)
save_dir = 'saved_models'  # NOTE(review): not used in the visible code
epochs = 1
batch_size = 128

def build_model():
  """Create and compile the stacked-LSTM regressor.

  Layers: LSTM (returns sequences) -> Dropout -> LSTM (returns sequences)
  -> LSTM (last step only) -> Dropout -> Dense(1, activation).
  The input shape is (input_timesteps, input_dim); the loss is MSE, optimised
  with Adadelta.
  """
  stack = [
    LSTM(neurons, input_shape=(input_timesteps, input_dim), return_sequences=True),
    Dropout(dropout_rate),
    LSTM(neurons, return_sequences=True),
    LSTM(neurons, return_sequences=False),
    Dropout(dropout_rate),
    Dense(1, activation=activation),
  ]
  net = Sequential(stack)
  # NOTE(review): `lr` is the legacy Keras argument name; newer Keras versions
  # expect `learning_rate`.
  adadelta = Adadelta(lr=10)
  net.compile(loss='mse', optimizer=adadelta)
  return net

model = build_model()
  
def train_generator(data_gen):
  """Fit the global `model` on batches drawn from `data_gen` for `epochs` epochs.

  One epoch is ceil((len_train - seq_len) / batch_size) steps. Per-epoch
  metrics are written to the CSV file 'save', and each batch index is printed
  as the batch starts. workers=0 runs the generator on the main thread.
  """
  n_windows = len_train - seq_len
  steps = math.ceil(n_windows / batch_size)
  callbacks = [
    CSVLogger(filename='save', separator=',', append=False),
    LambdaCallback(on_batch_begin=lambda batch, logs: print(batch)),
  ]
  model.fit_generator(data_gen, steps_per_epoch=steps, epochs=epochs, callbacks=callbacks, workers=0)

  
def train(x, y):
  """Fit the global `model` on in-memory arrays `x` and `y`.

  Uses the notebook-level `epochs` and `batch_size`. Per-epoch metrics are
  logged to 'save', and each batch index is printed as the batch starts.
  """
  print_batch = LambdaCallback(on_batch_begin=lambda batch, logs: print(batch))
  csv_log = CSVLogger(filename='save', separator=',', append=False)
  model.fit(x, y, epochs=epochs, batch_size=batch_size, callbacks=[csv_log, print_batch])
  
def predict(data):
  """Roll the global `model` forward seq_len steps from every seq_len-th window.

  Starting from windows 0, seq_len, 2*seq_len, ... of `data`:
  1. The model predicts the next normalised price.
  2. The frame drops its first row and gets a new last row. The new row's price
     (column 0) is the prediction; its other features are copied from the
     previous last row.
  3. Steps 1-2 repeat seq_len times.

  Args:
    data: array of shape (n_windows, timesteps, n_features), e.g. x from
      get_test_data.

  Returns:
    np.ndarray of shape (len(data) // seq_len, seq_len) holding the predicted
    normalised prices.
  """
  prediction_seqs = []
  for i in range(len(data) // seq_len):
    curr_frame = np.array(data[i * seq_len], dtype=float)
    predicted = []
    for _ in range(seq_len):
      next_price = model.predict(curr_frame[newaxis, :, :])[0, 0]
      predicted.append(next_price)
      next_row = curr_frame[-1].copy()
      next_row[0] = next_price
      curr_frame = np.vstack([curr_frame[1:], next_row])
    prediction_seqs.append(predicted)
  return np.array(prediction_seqs)


def eval_error(x_data, y_data):
  """Return the global `model`'s loss (MSE, as compiled) on x_data/y_data, evaluated in batches of 128."""
  return model.evaluate(x=x_data, y=y_data, batch_size=128, verbose=1)



In [4]:
# Run code to train the model.
# NOTE(review): numpy is already imported in earlier cells, and matplotlib (plt)
# is not used in the visible cells.
import numpy as np
import matplotlib.pyplot as plt
# Re-sets the window length. Keep it equal to the value used when preparing the
# data; the model expects seq_len - 1 input timesteps.
seq_len = 10
data_gen = generate_train_batch(seq_len) # generate train data (normalised (x, y) batches)
train_generator(data_gen) # training: fits the global model for `epochs` epochs

In [5]:
# Run code to test the model
x_test, y_test = get_test_data(seq_len) # create the test data with the training window length
prediction = predict(x_test) # predict the price 
# Stored as test_mse rather than eval_error, so the eval_error function is not
# overwritten and this cell can be re-run.
test_mse = eval_error(x_test,y_test) # evaluate the error
print('Evaluation MSE error on Test Data = ', test_mse)
