In [35]:
import os
import pickle
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import OneHotEncoder
import joblib
import pandas as pd
import numpy as np

# Directory the process was started from. Presumably this is the value handed
# to Model(...), whose constructor reads its files from a 'data' sub-directory
# of the path it is given -- TODO confirm against the calling cell.
current_working_directory = os.getcwd()

class Model:
  def __init__(self, current_working_directory):
    """Load the lookup tables, scalers, encoder and model used by ``predict``.

    Args:
      current_working_directory: Directory (str) that contains a ``data``
        sub-directory holding the pickle, gzip and CSV files read below.

    Raises:
      OSError: If one of the pickle files opened here cannot be read.
        Errors raised by pickle, joblib or pandas while loading are not
        caught and propagate to the caller.
    """
    def data_path(filename):
      # os.path.join copes with an empty or slash-terminated base directory,
      # which plain '+' concatenation turned into '/data/...' or '//data/...'.
      return os.path.join(current_working_directory, 'data', filename)

    # list required arguments and their types
    self.__list_args = ['postcode', 'year', 'month', 'property_landed']
    self.__list_argtypes = ['list of str', 'int', 'int', 'list of int']

    # filenames of every artefact, all relative to the data directory
    self.__dict_filename = data_path('year_postcode_big_dict.pickle')
    self.__colnames_filename = data_path('colnames.pickle')
    self.__pred_model_filename = data_path('pred_model.pickle')
    self.__spi_full_data_filename = data_path('spi_full_data.csv')
    self.__defactorized_price_unit_log_scaler_filename = data_path('defactorized_price_unit_log.gz')
    self.__lat_scaler_filename = data_path('lat.gz')
    self.__long_scaler_filename = data_path('long.gz')
    self.__mrt_500m_exist_scaler_filename = data_path('mrt_500m_exist.gz')
    self.__mrt_1km_exist_scaler_filename = data_path('mrt_1km_exist.gz')
    self.__mrt_500m_1y_scaler_filename = data_path('mrt_500m_1y.gz')
    self.__mrt_1km_1y_scaler_filename = data_path('mrt_1km_1y.gz')
    self.__mrt_500m_3y_scaler_filename = data_path('mrt_500m_3y.gz')
    self.__mrt_1km_3y_scaler_filename = data_path('mrt_1km_3y.gz')
    self.__mrt_500m_5y_scaler_filename = data_path('mrt_500m_5y.gz')
    self.__mrt_1km_5y_scaler_filename = data_path('mrt_1km_5y.gz')
    self.__schools_500m_scaler_filename = data_path('schools_500m.gz')
    self.__schools_1km_scaler_filename = data_path('schools_1km.gz')
    self.__malls_500m_scaler_filename = data_path('malls_500m.gz')
    self.__malls_1km_scaler_filename = data_path('malls_1km.gz')
    self.__onehotencoder_filename = data_path('onehotencoder.gz')

    # NOTE(security): pickle.load and joblib.load execute code embedded in
    # the file, so the data directory must only contain trusted artefacts.

    # get year-postcode big dictionary
    with open(self.__dict_filename, 'rb') as handle:
      self.__year_postcode_big_dict = pickle.load(handle)

    # Each row reads "<postal district>, <sector>, <sector>, ..."; the rows
    # are inverted below into a sector -> district lookup.
    self.__lines = ("""
        01 ,01, 02, 03, 04, 05, 06
        02 ,07, 08
        03, 14, 15, 16
        04 ,09, 10
        05 ,11, 12, 13
        06 ,17
        07 ,18, 19
        08, 20, 21
        09, 22, 23
        10, 24, 25, 26, 27
        11 ,28, 29, 30
        12 ,31, 32, 33
        13 ,34, 35, 36, 37
        14 ,38, 39, 40, 41
        15 ,42, 43, 44, 45
        16 ,46, 47, 48
        17 ,49, 50, 81
        18 ,51, 52
        19 ,53, 54, 55, 82
        20 ,56, 57
        21 ,58, 59
        22 ,60, 61, 62, 63, 64
        23 ,65, 66, 67, 68
        24 ,69, 70, 71
        25 ,72, 73
        26 ,77, 78
        27 ,75, 76
        28 ,79, 80
        """).strip().split('\n')
    self.__mapping_dict = {}
    for line in self.__lines:
      district, *sectors = (int(part.strip()) for part in line.split(','))
      for sector in sectors:
        self.__mapping_dict[sector] = district

    # get list of column names in the order they appear in the model's training df
    with open(self.__colnames_filename, 'rb') as handle:
      self.__colnames = pickle.load(handle)
    # The target column is not a model input; drop it when it is listed.
    pred_var = "defactorized_price_unit_log_norm"
    if pred_var in self.__colnames:
      self.__colnames.remove(pred_var)

    # get predictive model
    with open(self.__pred_model_filename, 'rb') as handle:
      self.__pred_model = pickle.load(handle)

    # get spi full data
    self.__spi_full_data = pd.read_csv(self.__spi_full_data_filename)

    # get scalers
    self.__defactorized_price_unit_log_scaler = joblib.load(self.__defactorized_price_unit_log_scaler_filename)
    self.__lat_scaler = joblib.load(self.__lat_scaler_filename)
    self.__long_scaler = joblib.load(self.__long_scaler_filename)
    self.__mrt_500m_exist_scaler = joblib.load(self.__mrt_500m_exist_scaler_filename)
    self.__mrt_1km_exist_scaler = joblib.load(self.__mrt_1km_exist_scaler_filename)
    self.__mrt_500m_1y_scaler = joblib.load(self.__mrt_500m_1y_scaler_filename)
    self.__mrt_1km_1y_scaler = joblib.load(self.__mrt_1km_1y_scaler_filename)
    self.__mrt_500m_3y_scaler = joblib.load(self.__mrt_500m_3y_scaler_filename)
    self.__mrt_1km_3y_scaler = joblib.load(self.__mrt_1km_3y_scaler_filename)
    self.__mrt_500m_5y_scaler = joblib.load(self.__mrt_500m_5y_scaler_filename)
    self.__mrt_1km_5y_scaler = joblib.load(self.__mrt_1km_5y_scaler_filename)
    self.__schools_500m_scaler = joblib.load(self.__schools_500m_scaler_filename)
    self.__schools_1km_scaler = joblib.load(self.__schools_1km_scaler_filename)
    self.__malls_500m_scaler = joblib.load(self.__malls_500m_scaler_filename)
    self.__malls_1km_scaler = joblib.load(self.__malls_1km_scaler_filename)

    # get one-hot encoder
    self.__onehotencoder = joblib.load(self.__onehotencoder_filename)

  def __get_info(self, postcodes, year, col_name, year_postcode_big_dict):
    ### gets information (latitude, longitude, postal district, market segment, facility no) for each postcode and a given year from
    ### pre-computed dictionary containing info by given year and postcode
    postcode_dict = self.__year_postcode_big_dict.get(year)
    val_dicts = list(map(postcode_dict.get, postcodes))
    vals = list(map(lambda x: x.get(col_name), val_dicts))
    return vals

  def __get_district_from_postal(self, postal_code):
    # gets a postal code as integer or string
    # outputs the postal district (int)
    first_two_digits = int(str(postal_code)[:-4])
    return self.__mapping_dict.get(first_two_digits, "Unknown")

  def __get_post_district(self, postcodes):
    # gets a list of postcodes
    # outputs a list of postal districts (int)
    post_district_list = list(map(lambda x: self.__get_district_from_postal(x), postcodes))
    return post_district_list

  def __get_segment_from_district(self, post_district):
    if post_district in [9, 10, 11]:
      return 'Core Central Region'
    elif post_district < 9 or post_district in [12, 13, 14, 15, 20]:
      return 'Rest of Central Region'
    else:
      return 'Outside Central Region'

  def __get_market_segment(self, post_district_list):
    # gets a list of integers of postal districts
    # outputs a list of market segments
    market_segment_list = list(map(lambda x: self.__get_segment_from_district(x), post_district_list))
    return market_segment_list

  def __scale_var(self, var, var_norm, df, scaler):
    column = df[var].values.reshape(-1, 1)
    normalized_column = scaler.transform(column)
    df = df.drop(var, axis=1)
    return df.assign(**{var_norm: normalized_column})

  def __unscale_var(self, var_norm, var, df, scaler):
    column = df[var_norm].values.reshape(-1, 1)
    unnormalized_column = scaler.inverse_transform(column)
    df = df.drop(var_norm, axis=1)
    return df.assign(**{var: unnormalized_column})

  def __dummify_vars(self, vars, df, encoder):
    df_num = df[df.columns.difference(vars)]
    df_cat = df[vars]
    df_encoded_array = encoder.transform(df_cat)
    categorical_columns = [f'{col}_{cat}' for i, col in enumerate(df_cat.columns) for cat in encoder.categories_[i]]
    df_encoded = pd.DataFrame(df_encoded_array, columns = categorical_columns)
    df = df_num.join(df_encoded)
    return df

  def __exp_var(self, var, var_exp, df):
    column = df[var].values.reshape(-1, 1)
    exp_column = np.exp(column)
    df = df.drop(var, axis=1)
    return df.assign(**{var_exp: exp_column})

  def __multiply_spi(self, price_var, refactorized_price_var, spi_var, df):
    price_column = df[price_var].values.reshape(-1, 1)
    spi_column = df[spi_var].values.reshape(-1, 1)
    refactorized_price_column = np.multiply(price_column, spi_column)
    df = df.drop(price_var, axis=1)
    df = df.drop(spi_var, axis=1)
    return df.assign(**{refactorized_price_var: refactorized_price_column})

  def __time_to_units(self, df, ref_year):
    df['time'] = (df['year'] - ref_year) + (df['month'] * 1/12)
    df = df.drop(columns=['year', 'month'])
    return df

  def __check_num_args(self, args, num_args):
    try:
      if len(args) != num_args:
        raise ValueError()
    except ValueError:
      print('Please input ' + str(num_args) + ' arguments (use required_args method to list the arguments needed).')

  def __check_all_types_correct(self, objs, obj_types, descriptors, obj_type_strings):
    try:
      objs_with_wrong_types = []
      objs_with_wrong_type_strings = []
      for obj, obj_type, descriptor, obj_type_string in zip(objs, obj_types, descriptors, obj_type_strings):
        if (type(obj_type) is list):
          if not (type(obj) in obj_type):
            objs_with_wrong_types.append(descriptor)
            objs_with_wrong_type_strings.append(obj_type_string)
        elif not (type(obj) is obj_type):
          objs_with_wrong_types.append(descriptor)
          objs_with_wrong_type_strings.append(obj_type_string)
      if len(objs_with_wrong_types) > 0:
        raise ValueError()
    except ValueError:
      print('The following objects are of wrong types: ' + ', '.join(objs_with_wrong_types) + '.')
      print('They should be of respective types: ' + ', '.join(objs_with_wrong_type_strings) + '.')

  def __check_all_lengths_equal(self, lsts):
    try:
      it = iter(lsts)
      the_len = len(next(it))
      if not all(len(l) == the_len for l in it):
        raise ValueError()
    except ValueError:
      print('Not all lists have same length.')

  def __check_all_lst_types_correct(self, lsts, lst_types, lst_names, lst_type_strings):
    try:
      lsts_with_wrong_types = []
      lsts_with_wrong_type_strings = []
      for lst, lst_type, lst_name, lst_type_string in zip(lsts, lst_types, lst_names, lst_type_strings):
        if (type(lst_type) is list):
          if not all(type(elt) in lst_type for elt in lst):
            lsts_with_wrong_types.append(lst_name)
            lsts_with_wrong_type_strings.append(lst_type_string)
        elif not all(type(elt) is lst_type for elt in lst):
          lsts_with_wrong_types.append(lst_name)
          lsts_with_wrong_type_strings.append(lst_type_string)
      if len(lsts_with_wrong_types) > 0:
        raise ValueError()
    except ValueError:
      print('The following lists have some elements with wrong types: ' + ', '.join(lsts_with_wrong_types) + '.')
      print('They should contain elements of respective types: ' + ', '.join(lsts_with_wrong_type_strings) + '.')

  def __check_year_correct(self, year, valid_years, valid_years_descriptor):
    try:
      if year not in valid_years:
        raise ValueError()
    except ValueError:
      print('Please input a year in the range ' + valid_years_descriptor + '.')

  def __check_postcodes_exist(self, postcodes, postcode_dict):
    try:
      postcodes_notin_markers = map(lambda x: x not in postcode_dict, postcodes)
      nonexistent_postcodes = []
      for postcode, notin_marker in zip(postcodes, postcodes_notin_markers):
        if notin_marker:
          nonexistent_postcodes.append(postcode)
      if nonexistent_postcodes:
        raise ValueError()
    except ValueError:
      print('The following postcodes are invalid: ' + ', '.join(nonexistent_postcodes) + '.')

  def __check_month_correct(self, month, valid_months, valid_months_descriptor):
    try:
      if month not in valid_months:
        raise ValueError()
    except ValueError:
      print('Please input a month in the range ' + valid_months_descriptor + '.')

  def __check_members(self, lst, valid_members, variable_descriptor):
    try:
      invalid_member_markers = map(lambda x: x not in valid_members, lst)
      invalid_members = []
      for member, invalid_member_marker in zip(lst, invalid_member_markers):
        if invalid_member_marker:
          invalid_members.append(member)
      if invalid_members:
        raise ValueError()
    except ValueError:
      print('The following ' + variable_descriptor + ' are invalid: ' + ', '.join(invalid_members) + '.')
      print('Please input ' + variable_descriptor + ' from the following: ' + ', '.join(valid_members) + '.')

  def __get_predictions(self, postcodes, year, month, property_landed):
    lat = self.__get_info(postcodes, year, "latitude", self.__year_postcode_big_dict)
    long = self.__get_info(postcodes, year, "longitude", self.__year_postcode_big_dict)
    malls_1km = self.__get_info(postcodes, year, "malls_1km", self.__year_postcode_big_dict)
    malls_500m = self.__get_info(postcodes, year, "malls_500m", self.__year_postcode_big_dict)
    mrt_1km_1y = self.__get_info(postcodes, year, "mrt_1km_1y", self.__year_postcode_big_dict)
    mrt_1km_3y = self.__get_info(postcodes, year, "mrt_1km_3y", self.__year_postcode_big_dict)
    mrt_1km_5y = self.__get_info(postcodes, year, "mrt_1km_5y", self.__year_postcode_big_dict)
    mrt_1km_exist = self.__get_info(postcodes, year, "mrt_1km_exist", self.__year_postcode_big_dict)
    mrt_500m_1y = self.__get_info(postcodes, year, "mrt_500m_1y", self.__year_postcode_big_dict)
    mrt_500m_3y = self.__get_info(postcodes, year, "mrt_500m_3y", self.__year_postcode_big_dict)
    mrt_500m_5y = self.__get_info(postcodes, year, "mrt_500m_5y", self.__year_postcode_big_dict)
    mrt_500m_exist = self.__get_info(postcodes, year, "mrt_500m_exist", self.__year_postcode_big_dict)
    schools_1km = self.__get_info(postcodes, year, "schools_1km", self.__year_postcode_big_dict)
    schools_500m = self.__get_info(postcodes, year, "schools_500m", self.__year_postcode_big_dict)
    postal_district = self.__get_post_district(postcodes)
    market_segment = self.__get_market_segment(postal_district)
    length = len(postcodes)
    year = [year] * length
    month = [month] * length
    data = {"lat": lat,
            "long": long,
            "malls_1km": malls_1km,
            "malls_500m": malls_500m,
            "mrt_1km_1y": mrt_1km_1y,
            "mrt_1km_3y": mrt_1km_3y,
            "mrt_1km_5y": mrt_1km_5y,
            "mrt_1km_exist": mrt_1km_exist,
            "mrt_500m_1y": mrt_500m_1y,
            "mrt_500m_3y": mrt_500m_3y,
            "mrt_500m_5y": mrt_500m_5y,
            "mrt_500m_exist": mrt_500m_exist,
            "schools_1km": schools_1km,
            "schools_500m": schools_500m,
            "postal_district": postal_district,
            "property_landed": property_landed,
            "market_segment": market_segment,
            "year": year,
            "month": month}
    df = pd.DataFrame(data)
    df = self.__time_to_units(df, 2019)
    df = self.__scale_var('lat', 'lat_norm', df, self.__lat_scaler)
    df = self.__scale_var('long', 'long_norm', df, self.__long_scaler)
    df = self.__scale_var('mrt_500m_exist', 'mrt_500m_exist_norm', df, self.__mrt_500m_exist_scaler)
    df = self.__scale_var('mrt_1km_exist', 'mrt_1km_exist_norm', df, self.__mrt_1km_exist_scaler)
    df = self.__scale_var('mrt_500m_1y', 'mrt_500m_1y_norm', df, self.__mrt_500m_1y_scaler)
    df = self.__scale_var('mrt_1km_1y', 'mrt_1km_1y_norm', df, self.__mrt_1km_1y_scaler)
    df = self.__scale_var('mrt_500m_3y', 'mrt_500m_3y_norm', df, self.__mrt_500m_3y_scaler)
    df = self.__scale_var('mrt_1km_3y', 'mrt_1km_3y_norm', df, self.__mrt_1km_3y_scaler)
    df = self.__scale_var('mrt_500m_5y', 'mrt_500m_5y_norm', df, self.__mrt_500m_5y_scaler)
    df = self.__scale_var('mrt_1km_5y', 'mrt_1km_5y_norm', df, self.__mrt_1km_5y_scaler)
    df = self.__scale_var('schools_500m', 'schools_500m_norm', df, self.__schools_500m_scaler)
    df = self.__scale_var('schools_1km', 'schools_1km_norm', df, self.__schools_1km_scaler)
    df = self.__scale_var('malls_500m', 'malls_500m_norm', df, self.__malls_500m_scaler)
    df = self.__scale_var('malls_1km', 'malls_1km_norm', df, self.__malls_1km_scaler)
    cat_vars = ['postal_district', 'market_segment']
    df = self.__dummify_vars(cat_vars, df, self.__onehotencoder)
    df = df[self.__colnames]
    defactorized_price_unit_log_norm = self.__pred_model.predict(df)
    df.insert(0, "postcode", postcodes)
    df = df.assign(defactorized_price_unit_log_norm = defactorized_price_unit_log_norm)
    # on defactorized_price_unit_log_norm, do unscale_var -> np.exp -> multiply by appropriate spi values to get prices
    df = self.__unscale_var("defactorized_price_unit_log_norm", "defactorized_price_unit_log", df, self.__defactorized_price_unit_log_scaler)
    df = self.__exp_var("defactorized_price_unit_log", "defactorized_price_unit", df)
    df = df.assign(market_segment = market_segment)
    df = df.assign(year = year)
    df = df.assign(month = month)
    df = pd.merge(df, self.__spi_full_data, how = "left", on = ['year', 'month', 'property_landed', 'market_segment'])
    df = self.__multiply_spi("defactorized_price_unit", "price_unit", "spi", df)
    return df

  def __get_predictions_lean(self, postcodes, year, month, property_landed):
    final_colnames = self.__list_args + ["price_unit"]
    df = self.__get_predictions(postcodes, year, month, property_landed)
    df = df[final_colnames]
    return df

  def predict(self, *args):
    # run checks for valid inputs
    req_objs = ["postcode", "year", "month", "property_landed"]
    num_args = len(req_objs)
    self.__check_num_args(args, num_args)

    postcodes, year, month, property_landed = args
    objs = [postcodes, year, month, property_landed]
    obj_types = [list, int, int, list]
    descriptors = ["postcode", "year", "month", "property_landed"]
    obj_type_strings = ['list', 'int', 'int', 'list']
    self.__check_all_types_correct(objs, obj_types, descriptors, obj_type_strings)

    lsts = [postcodes, property_landed]
    self.__check_all_lengths_equal(lsts)
    lst_types = [str, int]
    lst_names = ["postcode", "property_landed"]
    lst_type_strings = ['str', 'int']
    self.__check_all_lst_types_correct(lsts, lst_types, lst_names, lst_type_strings)

    valid_years = list(range(2019, 2051))
    self.__check_year_correct(year, valid_years, '2019 to 2050')

    postcode_dict = self.__year_postcode_big_dict.get(year)
    self.__check_postcodes_exist(postcodes, postcode_dict)

    valid_months = list(range(1, 13))
    valid_months_descriptor = '1 to 12'
    self.__check_month_correct(month, valid_months, valid_months_descriptor)

    # add check for property_landed

    return self.__get_predictions_lean(postcodes, year, month, property_landed)

  def required_args(self):
    print('The required arguments are: ' + ', '.join(list(map(lambda x: x[0] + ' (' + x[1] + ')', zip(self.__list_args, self.__list_argtypes)))) + '.')

In [36]:
# Print usage instructions for the Model class defined above.
print("To use the model, create an instance of Model with the path of the current working directory (a string) as argument, where the required model data files are in a subdirectory named data. Then use the predict method in this Model instance to produce predictions (in a DataFrame) for the given inputs. Use the required_args method to print the arguments required for the predict method.")


To use the model, create an instance of Model with the path of the current working directory (a string) as argument, where the required model data files are in a subdirectory named data. Then use the predict method in this Model instance to produce predictions (in a DataFrame) for the given inputs. Use the required_args method to print the arguments required for the predict method.
