In [1]:
from pyspark.sql import SparkSession

In [2]:
# Reuse the active Spark session if one exists; otherwise start one named "ExampleApp".
spark: SparkSession = (
    SparkSession.builder
    .appName("ExampleApp")
    .getOrCreate()
)

In [3]:
# Load the listings dataset from its Parquet file into a Spark DataFrame.
df = (
    spark.read
    .parquet("data/2017_StPaul_MN_Real_Estate.parquet")
)

In [4]:
from pyspark.sql.functions import to_date
# Replace offmarketdate with a date parsed using the 'M/d/yyyy H:mm' pattern.
df = df.withColumn(
    'offmarketdate',
    to_date(df['offmarketdate'], 'M/d/yyyy H:mm'),
)

In [5]:
from datetime import timedelta

def train_test_split_date(df, split_col, test_days=45):
    """Calculate the date on which to split the data into train and test sets.

    The split date is the latest value found in ``split_col`` minus
    ``test_days`` days.

    Args:
        df: DataFrame-like object supporting ``agg({col: 'max'}).collect()``.
        split_col: Name of the date column used for the split.
        test_days: Number of days to subtract from the latest date.
            Defaults to 45.

    Returns:
        The maximum of ``split_col`` minus ``timedelta(days=test_days)``.
    """
    # Only the latest date is needed, so a single aggregation is run.
    max_date = df.agg({split_col: 'max'}).collect()[0][0]
    # Subtract an integer number of days from the last date in the dataset.
    return max_date - timedelta(days=test_days)

# Find the date to use in splitting test and train
split_date = train_test_split_date(df, 'offmarketdate')

# Create Sequential Test and Training Sets
train_df = df.where(df['offmarketdate'] < split_date)
# LISTDATE is only converted to a date in a later cell, so parse it here with
# the same 'M/d/yyyy H:mm' pattern to make the cutoff check a real date comparison.
# NOTE(review): assumes LISTDATE is still the raw, unparsed column at this point - confirm.
test_df = (
    df.where(df['offmarketdate'] >= split_date)
    .where(to_date(df['LISTDATE'], 'M/d/yyyy H:mm') <= split_date)
)

In [12]:
# Replace LISTDATE with a date parsed using the 'M/d/yyyy H:mm' pattern.
df = df.withColumn(
    'LISTDATE',
    to_date(df['LISTDATE'], 'M/d/yyyy H:mm'),
)

In [13]:
from pyspark.sql.functions import datediff, to_date, lit

# Fixed cutoff for the sequential split, expressed as a Spark date column
split_date = to_date(lit('2017-12-10'))

# Sequential test set: off market on/after the cutoff, listed on/before it
test_df = (
    df
    .filter(df['offmarketdate'] >= split_date)
    .filter(df['LISTDATE'] <= split_date)
)

# Keep the original DAYSONMARKET for later review, then overwrite DAYSONMARKET
# with the number of days between LISTDATE and the cutoff date
test_df = (
    test_df
    .withColumn('DAYSONMARKET_Original', test_df['DAYSONMARKET'])
    .withColumn('DAYSONMARKET', datediff(split_date, 'LISTDATE'))
)

# Compare the original and recalculated values side by side
test_df.select(
    'LISTDATE',
    'offmarketdate',
    'DAYSONMARKET_Original',
    'DAYSONMARKET',
).show()

+----------+-------------+---------------------+------------+
|  LISTDATE|offmarketdate|DAYSONMARKET_Original|DAYSONMARKET|
+----------+-------------+---------------------+------------+
|2017-10-06|   2018-01-24|                  110|          65|
|2017-09-18|   2017-12-12|                   82|          83|
|2017-11-07|   2017-12-12|                   35|          33|
|2017-10-30|   2017-12-11|                   42|          41|
|2017-07-14|   2017-12-19|                  158|         149|
|2017-10-25|   2017-12-20|                   45|          46|
|2017-12-07|   2017-12-23|                   16|           3|
|2017-11-22|   2017-12-16|                   24|          18|
|2017-10-27|   2017-12-13|                   47|          44|
|2017-09-29|   2017-12-12|                   12|          72|
|2017-11-28|   2017-12-11|                   13|          12|
|2017-09-09|   2018-01-17|                  119|          92|
|2017-11-18|   2017-12-15|                   26|          22|
|2017-12

In [15]:
# # Drop binary columns with too few observations

# obs_threshold = 30
# cols_to_remove = list()

# # Inspect first 10 binary columns in list
# for col in binary_cols[0:10]:
#   # Count the number of 1 values in the binary column
#   obs_count = df.agg({col:'sum'}).collect()[0][0]
#   # If less than our observation threshold, remove
#   if obs_count <= obs_threshold:
#     cols_to_remove.append(col)
    
# # Drop columns and print starting and ending dataframe shapes
# new_df = df.drop(*cols_to_remove)

# print('Rows: ' + str(df.count()) + ' Columns: ' + str(len(df.columns)))
# print('Rows: ' + str(new_df.count()) + ' Columns: ' + str(len(new_df.columns)))

In [20]:
# Naively Handling Missing and Categorical Values

# # Replace missing values
# df = df.fillna(-1, subset=['WALKSCORE', 'BIKESCORE'])

# # Create list of StringIndexers using list comprehension
# indexers = [StringIndexer(inputCol=col, outputCol=col+"_IDX")\
#             .setHandleInvalid("keep") for col in categorical_cols]
# # Create pipeline of indexers
# indexer_pipeline = Pipeline(stages=indexers)
# # Fit and Transform the pipeline to the original data
# df_indexed = indexer_pipeline.fit(df).transform(df)

# # Clean up redundant columns
# df_indexed = df_indexed.drop(*categorical_cols)
# # Inspect data transformations
# print(df_indexed.dtypes)

In [None]:
# from pyspark.ml.regression import GBTRegressor

# # Train a Gradient Boosted Trees (GBT) model.
# gbt = GBTRegressor(featuresCol="features",
#                            labelCol="SALESCLOSEPRICE",
#                            predictionCol="Prediction_Price",
#                            seed=42
#                            )

# # Train model.
# model = gbt.fit(train_df)

In [None]:
# from pyspark.ml.evaluation import RegressionEvaluator

# # Select columns to compute test error
# evaluator = RegressionEvaluator(labelCol="SALESCLOSEPRICE", 
#                                 predictionCol="Prediction_Price")
# # Dictionary of model predictions to loop over
# models = {'Gradient Boosted Trees': gbt_predictions, 'Random Forest Regression': rfr_predictions}
# for key, preds in models.items():
#   # Create evaluation metrics
#   rmse = evaluator.evaluate(preds, {evaluator.metricName: "rmse"})
#   r2 = evaluator.evaluate(preds, {evaluator.metricName: "r2"})

#   # Print Model Metrics
#   print(key + ' RMSE: ' + str(rmse))
#   print(key + ' R^2: ' + str(r2))

In [None]:
# Interpreting Results
# NOTE: The feature-importance array, `importances`, has already been created for you from model.featureImportances.toArray()

# # Convert feature importances to a pandas column
# fi_df = pd.DataFrame(importances, columns=['importance'])

# # Convert list of feature names to pandas column
# fi_df['feature'] = pd.Series(feature_cols)

# # Sort the data based on feature importance
# fi_df.sort_values(by=['importance'], ascending=False, inplace=True)

# # Inspect Results
# fi_df.head(10)

In [None]:
# # Saving & Loading Models


# from pyspark.ml.regression import RandomForestRegressionModel

# # Save model
# model.save('rfr_no_listprice')

# # Load model
# loaded_model = RandomForestRegressionModel.load('rfr_no_listprice')