In [None]:
import sys
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [None]:
# Reuse the active Spark session if there is one, otherwise start a new one
# registered under the application name "SparkETL".
spark = (
    SparkSession.builder
    .appName("SparkETL")
    .getOrCreate()
)

In [None]:
# Load the raw listings CSV; the first row supplies column names and Spark
# infers each column's type from the data.
df = spark.read.csv(
    "s3://BUCKET_NAME/input/2017_StPaul_MN_Real_Estate.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Show the column names and the types inferred while reading the CSV
df.printSchema()

In [None]:
# Isolate the dependent variable (the closing sale price) in its own frame
Y_df = df.select('SalesClosePrice')

# Print the summary statistics table for that single column
Y_df.describe().show()

In [None]:
# List every column name available in the raw frame
print(df.columns)

## Find Correlations ##

In [None]:
# Collect the names of columns whose Spark dtype string begins with 'int' or
# 'double'. Other numeric dtypes (e.g. 'bigint', 'float') do not match these
# prefixes and are therefore left out.
numericcols = [
    name
    for name, dtype in df.dtypes
    if dtype.startswith(('int', 'double'))
]
print(numericcols)

In [None]:
# Drop the 'No.' column from the candidate features. Filtering (rather than
# list.remove) keeps this cell idempotent: re-running it no longer raises
# ValueError once the name is already gone.
numericcols = [name for name in numericcols if name != 'No.']

In [None]:
# The target itself must not be in the feature list. Filtering (rather than
# list.remove) keeps this cell idempotent: re-running it no longer raises
# ValueError once the name is already gone.
numericcols = [name for name in numericcols if name != 'SalesClosePrice']

In [None]:
# Show the current list of candidate feature columns
print(numericcols)

In [None]:
# Print the correlation between the sale price and every candidate feature.
# The loop variable is intentionally not named `col`: at module level that
# would overwrite pyspark.sql.functions.col, which the star import at the top
# of the notebook brings into scope.
for col_name in numericcols:
    print("Column: {} Corr: {} ".format(col_name, df.corr('SalesClosePrice', col_name)))

## Get Skewness ##

In [None]:
# Import skewness function
from pyspark.sql.functions import skewness

# Print the skewness of every candidate feature (one aggregation per column).
# The loop variable is intentionally not named `col`: at module level that
# would overwrite pyspark.sql.functions.col, which the star import at the top
# of the notebook brings into scope.
for col_name in numericcols:
    print("Column: {} Skewness: {} ".format(col_name, df.agg({col_name: 'skewness'}).collect()))



## Remove Outliers ##

In [None]:
# Row count before outlier removal, for comparison with the filtered frame
df.count()

In [None]:
# Standard deviation and mean of the sale price, fetched in a single Spark job
# instead of two separate aggregations. The column is referenced by its actual
# name so the lookup does not depend on case-insensitive resolution.
std_dev, mean_val = df.selectExpr(
    "stddev(SalesClosePrice)",
    "mean(SalesClosePrice)",
).first()

# Outlier fence: three standard deviations either side of the mean
h_bound = mean_val + (3 * std_dev)
l_bound = mean_val - (3 * std_dev)

# Keep only rows strictly inside the fence
df_withoutoutliers = df.where((df['SalesClosePrice'] < h_bound) & (df['SalesClosePrice'] > l_bound))

In [None]:
# Row count after outlier removal
df_withoutoutliers.count()

## Scaling ##

In [None]:
def min_max_scaler(df, cols_to_scale):
  """Add a min-max scaled copy of each requested column.

  For every name in ``cols_to_scale`` a new column ``scaled_<name>`` is
  added, computed as ``(value - min) / (max - min)`` from the column's own
  minimum and maximum over ``df``. The source columns are left untouched.

  A constant column (``max == min``) has no range to scale by; its scaled
  values are set to 0.0 instead of dividing by zero. Null inputs stay null.

  Args:
    df: DataFrame holding the columns to scale.
    cols_to_scale: iterable of column names to scale.

  Returns:
    The DataFrame with one extra ``scaled_<name>`` column per input name.

  Raises:
    ValueError: if a column has no non-null values, so min/max are undefined.
  """
  for col_name in cols_to_scale:
    # Bring the column's bounds back to the driver as plain Python scalars
    col_max = df.agg({col_name: 'max'}).collect()[0][0]
    col_min = df.agg({col_name: 'min'}).collect()[0][0]
    if col_max is None or col_min is None:
      raise ValueError(
          "Cannot min-max scale column '{}': it has no non-null values".format(col_name))
    new_column_name = 'scaled_' + col_name
    col_range = col_max - col_min
    if col_range == 0:
      # Degenerate range: multiplying by 0.0 yields 0.0 for every non-null
      # value while keeping nulls null, and avoids a division by zero.
      scaled = (df[col_name] - col_min) * 0.0
    else:
      scaled = (df[col_name] - col_min) / col_range
    # Create a new column based off the scaled data
    df = df.withColumn(new_column_name, scaled)
  return df

In [None]:
# Scale the days-on-market column of the outlier-free frame; the result gains
# a 'scaled_DAYSONMARKET' column alongside the original.
cols_to_scale = ['DAYSONMARKET']
df_scaled = min_max_scaler(df_withoutoutliers, cols_to_scale)


In [None]:
# Confirm the scaled column was added to the schema
df_scaled.printSchema()

In [None]:
# Summary statistics of the original, unscaled column
df_scaled.select(['DAYSONMARKET']).describe().show()

In [None]:
# Summary statistics of the scaled column, for comparison with the original
df_scaled.select(['scaled_DAYSONMARKET']).describe().show()

## IMPUTATION OF MISSING VALUES ##

In [None]:
# Mean of the scaled column, pulled back to the driver as a plain scalar
col_mean = df_scaled.agg({'scaled_DAYSONMARKET': 'mean'}).first()[0]

# Fill missing values in that one column with its mean
df_scaled = df_scaled.na.fill(col_mean, subset=['scaled_DAYSONMARKET'])

In [None]:
# Summary statistics of the scaled column after imputation
df_scaled.select(['scaled_DAYSONMARKET']).describe().show()

## LEFT SKEW ##

In [None]:
# Skewness of the sale price before any transformation
df.agg({'SALESCLOSEPRICE': 'skewness'}).collect()

In [None]:
# Add a log-transformed copy of the sale price. This is applied to `df`, i.e.
# the frame as loaded, not to the outlier-filtered or scaled frames.
# NOTE(review): a log transform compresses large values, which is the usual
# remedy for positive (right) skew; the section heading says LEFT - confirm
# which direction the price is actually skewed.
from pyspark.sql.functions import log
df = df.withColumn("log_SalesClosePrice", log(df['SALESCLOSEPRICE']))

In [None]:
# Skewness after the log transform, to compare with the untransformed value
df.agg({'log_SalesClosePrice': 'skewness'}).collect()

## RIGHT SKEW ##

In [None]:
# Print the skewness of the year-built column before transforming it
print(df.agg({'YEARBUILT': 'skewness'}).collect())

# Find the most recent build year; it is the pivot for the reflection below
max_year = df.agg({'YEARBUILT': 'max'}).collect()[0][0]

# Reflect the data around the max year: the newest year maps to 1 and older
# years map to progressively larger values
df = df.withColumn('Reflect_YearBuilt', (max_year + 1) - df['YEARBUILT'])

# Create a new column based on the reflected data: the reciprocal of its log.
# NOTE(review): rows where YEARBUILT equals max_year have a reflected value of
# 1, so log(...) is 0 and this becomes 1 / 0 - presumably Spark yields null
# (or raises under ANSI mode) for those rows; confirm this is acceptable.
df = df.withColumn('adj_yearbuilt', 1 / log(df['Reflect_YearBuilt']))

In [None]:
# Skewness of the adjusted year-built column, to compare with the raw value
print(df.agg({'adj_yearbuilt': 'skewness'}).collect())