In [0]:
from pyspark.sql.functions import regexp_replace, col, lit, monotonically_increasing_id, when
from pyspark.sql.types import DateType, StructType, StructField, StringType, IntegerType, LongType, FloatType, BooleanType, ArrayType
from datetime import datetime
import re
from sklearn.impute import KNNImputer
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from sklearn.preprocessing import MinMaxScaler
import numpy as np
import pandas as pd
from pyspark.ml.feature import Imputer

In [0]:
class PCA:
    """
    Principal component analysis via eigendecomposition of the covariance matrix.

    fit() learns the top-k principal axes together with the per-feature training mean;
    transform() centers data with that same training mean and projects it onto the axes,
    so training and test data end up in the same coordinate system.
    """
    def __init__(self, k):
        """
        input:
            k - number of principal components to keep
        """
        self.k = k
        self.result = None  # (n_features, k) matrix of principal axes, set by fit()
        self.mean = None  # per-feature mean of the data passed to fit()
        self.eigenvalues = None  # variances along the kept axes, largest first

    def fit(self, X):
        """
        learns the principal axes of X (rows are samples, columns are features)
        output:
            the descriptive string 'PCA(n_components=k)'
        """
        X = np.asarray(X, dtype=float)
        self.mean = np.mean(X, axis=0)
        centered_X = X - self.mean
        cov_mat = np.cov(centered_X, rowvar=False, bias=True)
        # the covariance matrix is symmetric: eigh always returns real eigenpairs,
        # whereas eig may return complex values with tiny imaginary noise
        w, v = np.linalg.eigh(cov_mat)
        idx = np.argsort(w)[::-1][:self.k]
        self.eigenvalues = w[idx]
        # eigenvector signs are arbitrary; the sign flip is kept from the original implementation
        self.result = -v[:, idx]
        return 'PCA(n_components=' + str(self.k) + ')'

    def transform(self, X):
        """
        projects X onto the principal axes learned by fit()
        output:
            array of shape (n_samples, k)
        """
        # center with the *training* mean; X's own mean is only used as a fallback
        # when no mean was stored by fit()
        mean = getattr(self, "mean", None)
        if mean is None:
            mean = np.mean(X, axis=0)
        centered_X = X - mean
        transformed_X = np.matmul(centered_X, self.result)

        return transformed_X

In [0]:
class MyLogisticRegression:
    """
    binary logistic regression trained with full-batch gradient descent
    """
    def __init__(self, lr = 0.001, n_iters = 1000):
        """
        input:
            lr - learning rate of the gradient descent
            n_iters - number of gradient descent steps performed by fit()
        """
        self.lr = lr
        self.n_iters = n_iters
        self.weights, self.bias = None, None

    def _sigmoid(self, x):
        """
        takes in results of the linear prediction and returns the sigmoid 1 / (1 + e^-x)
        """
        # exp(-log(1 + e^-x)) computed through logaddexp: mathematically the same value, but
        # it neither overflows nor emits RuntimeWarnings for large negative x
        return np.exp(-np.logaddexp(0, -x))

    def fit(self, X, y):
        """
        takes in the dataset values as X (n_samples, n_features) and the 0/1 classification as y,
        and updates the coefficients; weights and bias are reset to zero before training
        """
        n_samples = X.shape[0]
        self.weights = np.zeros(X.shape[1])
        self.bias = 0

        for _ in range(self.n_iters):
            probability = self._sigmoid(np.dot(X, self.weights) + self.bias)
            error = probability - y

            # gradient of the mean log-loss
            dw = (1 / n_samples) * np.dot(X.T, error)
            db = (1 / n_samples) * np.sum(error)

            # updating weights and bias
            self.weights -= self.lr * dw
            self.bias -= self.lr * db

    def predict_proba(self, X):
        """
        takes in the dataset values and returns the probability of the positive class per row
        """
        linear_pred = np.dot(X, self.weights) + self.bias
        return self._sigmoid(linear_pred)

    def predict(self, X, threshold=0.5):
        """
        takes in the dataset values and threshold, and returns a list with 1 where the
        predicted probability is strictly above the threshold and 0 otherwise
        """
        probabilities = self.predict_proba(X)
        return [1 if i > threshold else 0 for i in probabilities]

In [0]:
def add_column(df, col_vals, fields, cols_to_drop = None):
    """
    adds (or replaces) columns of a spark dataframe from driver-side values, joined on "id"
    input:
        df - spark dataframe with an "id" column
        col_vals - list of rows [id, value_1, ..., value_n], values in the order of `fields`
        fields - list of [column name, spark data type] pairs describing value_1..value_n
        cols_to_drop - optional column name or list/tuple of names dropped from df before the join;
                       when None, the columns named in `fields` are dropped (i.e. replaced)
    output:
        df inner-joined with the new columns on "id" (ids missing from col_vals are dropped)
    """
    if fields:
        # computed once instead of re-dropping the same columns for every field
        if cols_to_drop is None:
            to_drop = [field[0] for field in fields]
        elif isinstance(cols_to_drop, (list, tuple)):
            to_drop = list(cols_to_drop)
        else:
            to_drop = [cols_to_drop]
        if to_drop:
            df = df.drop(*to_drop)

    final_fields = [StructField("id", IntegerType(), True)]
    final_fields += [StructField(field[0], field[1], True) for field in fields]

    schema = StructType(fields=final_fields)
    temp_df = spark.createDataFrame(col_vals, schema)
    return df.join(temp_df, on=["id"], how="inner")

In [0]:
def add_binary_cols(df, og_col, cols):
    """
    one-hot encodes a column into 0/1 integer columns
    input:
        df - a spark dataframe with an "id" column
        og_col - a name of a column as string
        cols - a list of values of og_col that each get their own column, named og_col + '_' + value
    output:
        a new dataframe without og_col; for each row, the column matching its og_col value is 1 and
        every other new column is 0 (values not listed in cols give all zeros)
    """
    fields = [[og_col + '_' + name, IntegerType()] for name in cols]

    # value -> position inside a new row (position 0 holds the id)
    position = {name: idx for idx, name in enumerate(cols, start=1)}

    id_to_value = df.select("id", og_col).rdd.collectAsMap()

    new_data = []
    for row_id in sorted(id_to_value.keys()):
        row = [row_id] + [0] * len(cols)
        value = id_to_value[row_id]
        if value in position:
            row[position[value]] = 1
        new_data.append(row)

    return add_column(df, new_data, fields, og_col)


In [0]:
def kmeans(df, cols, k, n_iters=10, seed=2023):
    """
    clusters the rows of df with k-means (euclidean distance) on the columns in `cols`,
    e.g. grouping houses by latitude/longitude, and returns the final centroids
    input:
        df - spark dataframe with an "id" column and the numeric columns in `cols`
        cols - list of column names used as coordinates
        k - number of clusters
        n_iters - number of assign/update iterations (default 10)
        seed - seed for sampling the initial centroids (default 2023)
    output:
        dict {cluster number: [0.0, c_1, ..., c_d]}; position 0 is a placeholder so that
        coordinate j lines up with row[j] of an ("id", *cols) row
    """
    rdd_df = df.select("id", *cols).rdd

    def nearest(row, centroids):
        """returns the key of the centroid closest to the row's coordinates row[1:]"""
        best_dist = float("inf")
        best = 0
        for num, cent in centroids.items():
            d = np.sqrt(sum((row[j] - cent[j]) ** 2 for j in range(1, len(row))))
            if d < best_dist:
                best_dist, best = d, num
        return best

    # creates a dictionary of centroids from k sampled rows (the id slot is zeroed)
    sample = rdd_df.takeSample(False, k, seed)
    cent_dict = {i: [0.0] + list(row)[1:] for i, row in enumerate(sample)}

    for _ in range(n_iters):
        # the current centroids are bound explicitly so every task sees the same snapshot
        snapshot = dict(cent_dict)
        # per cluster: (coordinate sums, number of rows), computed in a single shuffle
        stats = (
            rdd_df
            .map(lambda row, cents=snapshot: (nearest(row, cents), (list(row)[1:], 1)))
            .reduceByKey(lambda a, b: ([x + y for x, y in zip(a[0], b[0])], a[1] + b[1]))
            .collect()
        )
        # clusters that received no rows keep their previous centroid
        for cluster, (sums, count) in stats:
            cent_dict[cluster] = [0.0] + [s / count for s in sums]

    return cent_dict

In [0]:
def assign_location(df, centroids=None):
    """
    replaces the "latitude" and "longitude" columns with a "location" column holding, as a string,
    the number of the nearest (euclidean) centroid
    input:
        df - a spark dataframe with "id", "latitude" and "longitude" columns
        centroids - optional dict {cluster number: [0.0, latitude, longitude]} such as the one
                    returned by kmeans(); defaults to the centroids hard-coded below
    output:
        final_df - a spark dataframe with "location" column instead of "latitude" and "longitude"
    """
    if centroids is None:
        # fixed centroids, presumably from an earlier kmeans run on latitude/longitude (k=10);
        # hard-coded so every call uses the same clusters. Position 0 is a placeholder.
        centroids = {0: [0.0, 35.64644872143282, 139.64445828059533], 1: [0.0, 35.7012928578391, 139.6874795486678], 2: [0.0, 35.72620743526037, 139.8375019727198], 3: [0.0, 35.71471221537529, 139.6216737771336], 4: [0.0, 35.69828154103825, 139.71176844587723], 5: [0.0, 35.574037140065975, 139.7253905209628], 6: [0.0, 35.65121075414842, 139.72129169587166], 7: [0.0, 35.74263885066194, 139.71986369007038], 8: [0.0, 35.70728394607018, 139.7877636192585], 9: [0.0, 35.68004331676238, 139.41011747307735]}

    def nearest(row, cents):
        """returns the key of the centroid closest to (row[1], row[2]) = (latitude, longitude)"""
        best_dist = float("inf")
        best = 0
        for num, cent in cents.items():
            d = np.sqrt(sum((row[j] - cent[j]) ** 2 for j in range(1, 3)))
            if d < best_dist:
                best_dist, best = d, num
        return best

    # [id, cluster number as string] per row; the column is declared StringType below
    vals = sorted(
        df.select("id", "latitude", "longitude").rdd
        .map(lambda row: [row.id, str(nearest(row, centroids))])
        .collect()
    )
    schema = [['location', StringType()]]
    cols_to_drop = ["latitude", "longitude"]
    final_df = add_column(df, vals, schema, cols_to_drop)

    return final_df


    

In [0]:
def percentage_to_number(df, cols):
    """
    converts percentage strings such as "95%" into floats (95.0); "N/A" and nulls become null
    input:
        df - spark dataframe with an "id" column
        cols - list of names(string) of columns to change
    output:
        spark dataframe with the values of the columns in cols changed to floats
    """
    fields = [[name, FloatType()] for name in cols]

    def parse(raw):
        """returns the float behind a percentage string, or None for missing values"""
        if raw == "N/A" or raw is None:
            return None
        return float(raw.replace("%", ""))

    raw_rows = (
        df.select(*(["id"] + cols))
        .rdd.map(lambda r: [r["id"]] + [r[name] for name in cols])
        .collect()
    )
    vals_list = [[row[0]] + [parse(value) for value in row[1:]] for row in raw_rows]
    return add_column(df, vals_list, fields)

In [0]:
def clean_property_type(df):
    """
    cleans values in the "property_type" column: lower-cases them and keeps only the words
    after the first marker word, checking "in", then "entire", then "private"
    (e.g. "Private room in rental unit" -> "rental unit", "Entire home" -> "home");
    null values stay null
    input:
        df - a spark dataframe with "id" and "property_type" columns
    output:
        a spark dataframe with a cleaned "property_type" column
    """
    markers = ("in", "entire", "private")

    def clean(value):
        """returns the cleaned property type for a single value"""
        if value is None:
            return None
        lowered = value.lower()
        words = lowered.split(" ")
        for marker in markers:
            if marker in words:
                return ' '.join(words[words.index(marker) + 1:])
        return lowered

    vals_list = [
        [row.id, clean(row["property_type"])]
        for row in df.select("id", "property_type").collect()
    ]
    fields = [["property_type", StringType()]]
    return add_column(df, vals_list, fields)

In [0]:
def split_bathroom_col(df):
    """
    splits "bathrooms_text" into numeric private/shared bathroom counts,
    e.g. "1.5 shared baths" -> (private 0.0, shared 1.5), "Half-bath" -> (private 0.5, shared 0.0);
    nulls give 0.0 in both columns
    input:
        spark dataframe with "id" and "bathrooms_text" columns
    output:
        spark dataframe without bathrooms_text column and with 2 more columns (bathrooms_text_private and bathrooms_text_shared)
    """
    fields = [["bathrooms_text_private", FloatType()], ["bathrooms_text_shared", FloatType()]]

    def parse(text):
        """returns (private, shared) bathroom counts for one bathrooms_text value"""
        if text is None:
            return 0.0, 0.0
        tokens = text.lower().split(" ")
        amount = 0.5 if "half-bath" in tokens else float(tokens[0])
        if "shared" in tokens:
            return 0.0, amount
        return amount, 0.0

    id_to_text = df.select("id", "bathrooms_text").rdd.collectAsMap()
    new_data = [[row_id, *parse(id_to_text[row_id])] for row_id in sorted(id_to_text.keys())]

    return add_column(df, new_data, fields, "bathrooms_text")

In [0]:
class imputer:
    """
    a class for imputing null values in a spark dataframe, one column at a time
    """
    def __init__(self, df, schemas, mode):
        """
        input:
            df - spark dataframe with an "id" column
            schemas - list of [column name, spark data type] pairs to impute
            mode - "mean", "median", "most_frequent", or any other value, which is then
                   used as a constant fill value
        """
        self.df = df
        self.schemas = schemas
        self.col = None
        self.coltype = None
        self.mode = mode

    def _non_null_values(self):
        """
        collects the non-null values of column self.col to the driver, in collection order
        """
        col_name = self.col
        # compare against None explicitly: 0 / 0.0 / "" are real values and must be kept
        return [row[0] for row in self.df.select(col_name).collect() if row[0] is not None]

    def _mean(self):
        """
        calculates the mean of the non-null values of column self.col (computed by spark)
        """
        col_name = self.col  # local copy so the lambda does not capture self (and self.df)
        col_mean = self.df.select(col_name).filter(col(col_name).isNotNull()).rdd.flatMap(lambda x: [x[col_name]]).mean()
        return col_mean

    def _median(self):
        """
        calculates the median of the non-null values of column self.col
        (mean of the two middle values for an even count); None if there are no values
        """
        values = sorted(self._non_null_values())
        n = len(values)
        if n == 0:
            return None
        mid = n // 2
        if n % 2 == 0:
            return (values[mid - 1] + values[mid]) / 2
        return values[mid]

    def _most_frequent(self):
        """
        returns the most frequent non-null value of column self.col (on ties, the value seen
        first wins); None if there are no values
        """
        frequency_dict = {}
        for val in self._non_null_values():
            frequency_dict[val] = frequency_dict.get(val, 0) + 1
        if not frequency_dict:
            return None
        return max(frequency_dict, key=frequency_dict.get)

    def _fill_value(self):
        """
        returns the value that replaces nulls in column self.col according to self.mode;
        in "mean" mode non-FloatType columns get the mean rounded half up to an int
        """
        if self.mode == "mean":
            mean = self._mean()
            return mean if self.coltype == FloatType() else int(mean + 0.5)
        if self.mode == "median":
            return self._median()
        if self.mode == "most_frequent":
            return self._most_frequent()
        return self.mode

    def transform(self):
        """
        replaces null values of every column in self.schemas with values based on self.mode
        and returns the updated dataframe (also stored in self.df)
        """
        for schema in self.schemas:
            self.col = schema[0]
            self.coltype = schema[1]
            col_name = self.col
            vals_list = self.df.select("id", col_name).rdd.map(lambda x: [x.id, x[col_name]]).collect()
            fill = self._fill_value()
            vals_list = [[row_id, fill if val is None else val] for row_id, val in vals_list]
            self.df = add_column(self.df, vals_list, [schema])
        return self.df


In [0]:
def clean_amenities(amenities):
    """
    takes in an amenities column value (a bracketed list of quoted names, e.g. '["Wifi", "Body soap"]')
    and returns the amenity names lower-cased as a list of strings; a null value gives []
    """
    if amenities is None:
        return []
    # removes backslash-u escapes (p1) and words directly followed by a backslash (p2)
    p1= r'\\u\w+'
    p2 = r'\w+\\'
    pattern = f'({p1}|{p2})'
    new_amenities = amenities[1:-1].replace('"', '').replace('  ', ' ').lower()
    new_amenities = re.sub(pattern, '', new_amenities)
    new_amenities = new_amenities.split(", ")
    return new_amenities

def amenities_cols(df):
    """
    input:
        df - spark dataframe with "id" and "amenities" columns
    output:
        spark dataframe without the column 'amenities' and with one binary (0/1) column per
        amenity listed below, named amenities_<name with "-" and " " replaced by "_">
    """
    amenities = ["private entrance", "ethernet connection", "body soap", "bidet", "shower gel", "rice maker", "conditioner", "bathtub", "dining table", "freezer", "room-darkening shades", "elevator"]
    fields = [[f'amenities_{name.replace("-","_").replace(" ", "_")}', IntegerType()] for name in amenities]

    amenities_list = df.select("id", "amenities").rdd.map(lambda x: [x.id, clean_amenities(x.amenities)]).collect()

    new_data = []
    for row_id, row_amenities in amenities_list:
        present = set(row_amenities)  # set for O(1) membership checks
        new_data.append([row_id] + [1 if amenity in present else 0 for amenity in amenities])

    return add_column(df, new_data, fields, 'amenities')

In [0]:
class preprocessing:
    """
    runs the notebook's cleaning pipeline on a spark dataframe; every step replaces self.df.
    train_transform()/test_transform() expect the "location" column created by analysis_transform().
    """

    # columns removed before KNN imputation (shared so train and test yield the same features)
    _COLUMNS_TO_DROP = ['property_type','room_type','host_has_profile_pic','bathrooms_text','host_response_rate', 'host_acceptance_rate', 'amenities', 'host_id','host_response_time','host_verifications','longitude', 'latitude', 'minimum_nights','maximum_nights', 'minimum_minimum_nights','maximum_minimum_nights', 'minimum_maximum_nights', 'minimum_nights_avg_ntm','maximum_nights_avg_ntm','has_availability','availability_30', 'availability_60', 'availability_90', 'availability_365', 'number_of_reviews','number_of_reviews_ltm','number_of_reviews_l30d', 'license', 'instant_bookable', 'reviews_per_month', 'bathrooms_text_shared', 'bathrooms_text_private', 'maximum_maximum_nights']

    def __init__(self, df):
        """
        takes in a spark dataframe
        """
        self.df = df

    def analysis_transform(self, reference_date=None):
        """
        cleans property types, replaces latitude/longitude with a location cluster, splits the
        bathrooms text, turns date columns ("%d/%m/%Y" strings) into days passed since
        reference_date (default: now) and percentage columns into floats.
        returns the processed spark dataframe
        """
        self.df = clean_property_type(self.df)

        # change the "latitude" and "longitude" columns to "location" based on kmeans distribution
        self.df = assign_location(self.df)

        self.df = split_bathroom_col(self.df)

        # taken once on the driver so every row is measured against the same moment
        now = reference_date if reference_date is not None else datetime.now()

        def days_passed(date):
            """
            takes in a "%d/%m/%Y" date string and returns the number of days between it and now;
            None stays None
            """
            if date is None:
                return None
            return (now - datetime.strptime(date, "%d/%m/%Y")).days

        date_cols = ["host_since", "last_review", "first_review"]
        new_data = self.df.select("id", *date_cols).rdd.map(
            lambda row: [row.id] + [days_passed(row[name]) for name in date_cols]
        ).collect()
        schema = [[name, IntegerType()] for name in date_cols]
        self.df = add_column(self.df, new_data, schema)

        # turning percentage columns into floats
        percentage_cols = ["host_response_rate", "host_acceptance_rate"]
        self.df = percentage_to_number(self.df, percentage_cols)

        return self.df

    def _model_transform(self):
        """
        shared train/test pipeline: one-hot encodes, imputes review scores, drops unused columns
        and "id", fills remaining nulls with a 5-nearest-neighbours imputer fitted on this data
        (via pandas) and returns the result as a new spark dataframe
        """
        binary_cols = [['host_is_superhost', ['f', 't']], ['host_identity_verified', ['f', 't']]]
        for og_col, new_cols in binary_cols:
            self.df = add_binary_cols(self.df, og_col, new_cols)

        # split "location" column to binary cols in order to avoid logistic regression thinking there is hierarchy
        self.df = add_binary_cols(self.df, 'location', [str(num) for num in range(10)])

        # turning amenities with most likely chance to effect the price into binary columns
        self.df = amenities_cols(self.df)

        mean_schemas = [['review_scores_rating', FloatType()], ['review_scores_accuracy', FloatType()], ['review_scores_cleanliness', FloatType()], ['review_scores_value', FloatType()]]
        self.df = imputer(self.df, mean_schemas, "mean").transform()

        median_schemas = [['review_scores_checkin', FloatType()], ['review_scores_communication', FloatType()], ['review_scores_location', FloatType()]]
        self.df = imputer(self.df, median_schemas, "median").transform()

        self.df = self.df.drop(*self._COLUMNS_TO_DROP).drop('id')

        column_names = self.df.columns
        df_values = self.df.toPandas().values.tolist()
        imputed_data = KNNImputer(n_neighbors = 5).fit_transform(df_values)
        self.df = spark.createDataFrame(imputed_data.tolist(), column_names)
        return self.df

    def train_transform(self):
        """
        returns a processed spark dataframe for training
        """
        return self._model_transform()

    def test_transform(self):
        """
        returns a processed spark dataframe for testing; note that the KNN imputer is fitted
        on the test data itself
        """
        return self._model_transform()