Reference: [PySpark API documentation](https://spark.apache.org/docs/latest/api/python/index.html)

In [None]:
%matplotlib inline

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.functions import explode
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import IndexToString
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import OneVsRest
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


## Load Data from PredictionIO (PIO)

In [None]:
from pypio.utils import new_string_array
# Fetch the 'train' events of 'HousePrices' (presumably the PIO app name) from
# the event store. The event-name list is converted with new_string_array using
# the Spark JVM gateway — presumably because find() expects a Java array.
train_event_names = new_string_array(['train'], sc._gateway)
train_event_df = p_event_store.find('HousePrices', event_names=train_event_names)

In [None]:
# Preview the first five raw events.
train_event_df.show(n=5)

In [None]:
def get_data_df(df):
    """Flatten the ``fields`` map of a PIO event DataFrame into typed columns.

    Every distinct key found in ``df.fields`` becomes one output column, in
    sorted key order. Keys listed in ``int_fields`` are cast to ``integer``;
    every other key is cast to ``string``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Event DataFrame with a map-typed ``fields`` column (e.g. the result of
        ``p_event_store.find``).

    Returns
    -------
    pyspark.sql.DataFrame
        One column per field key, named after the key.
    """
    # Fields to cast to integer; a set gives O(1) membership checks.
    int_fields = {
        "MSSubClass", "LotFrontage", "LotArea", "OverallQual", "OverallCond",
        "YearBuilt", "YearRemodAdd", "MasVnrArea", "BsmtFinSF1", "BsmtFinSF2",
        "BsmtUnfSF", "TotalBsmtSF", "1stFlrSF", "2ndFlrSF", "LowQualFinSF",
        "GrLivArea", "BsmtFullBath", "BsmtHalfBath", "FullBath", "HalfBath",
        "BedroomAbvGr", "KitchenAbvGr", "TotRmsAbvGrd", "Fireplaces",
        "GarageYrBlt", "GarageCars", "GarageArea", "WoodDeckSF", "OpenPorchSF",
        "EnclosedPorch", "3SsnPorch", "ScreenPorch", "PoolArea", "MiscVal",
        "MoSold", "YrSold", "SalePrice",
    }

    def get_field_type(name):
        """Return the Spark SQL type name that field ``name`` is cast to."""
        return 'integer' if name in int_fields else 'string'

    # Collect the distinct map keys from the DataFrame passed in (not from a
    # global), so the function works for any event DataFrame.
    field_names = sorted(
        row["key"]
        for row in df.select(explode("fields")).select("key").distinct().collect()
    )
    exprs = [col("fields").getItem(k).cast(get_field_type(k)).alias(k)
             for k in field_names]
    return df.select(*exprs)

# Turn the raw event rows into one typed column per house attribute.
train_data_df = get_data_df(df=train_event_df)

In [None]:
# Preview a single flattened row.
train_data_df.show(n=1)

## Data Exploration
Adapted from [Comprehensive data exploration with Python](https://www.kaggle.com/pmarcelino/comprehensive-data-exploration-with-python) on Kaggle.

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from scipy.stats import norm
from sklearn.preprocessing import StandardScaler
from scipy import stats

In [None]:
# Collect the Spark DataFrame to the driver as a pandas DataFrame for plotting.
df_train = train_data_df.toPandas()

In [None]:
# List the column names available for exploration.
df_train.columns

In [None]:
# Descriptive statistics of the sale price.
df_train.SalePrice.describe()

In [None]:
# Distribution plot of the raw sale price.
sns.distplot(a=df_train['SalePrice']);

In [None]:
# Shape statistics of the sale price distribution.
sale_price = df_train['SalePrice']
print("Skewness: {:f}".format(sale_price.skew()))
print("Kurtosis: {:f}".format(sale_price.kurt()))

In [None]:
# Scatter plot of SalePrice against above-ground living area.
var = 'GrLivArea'
data = df_train[['SalePrice', var]]
data.plot.scatter(x=var, y='SalePrice', ylim=(0, 800000));

In [None]:
# Scatter plot of SalePrice against total basement area.
var = 'TotalBsmtSF'
data = df_train[['SalePrice', var]]
data.plot.scatter(x=var, y='SalePrice', ylim=(0, 800000));

In [None]:
# Box plot of SalePrice for each OverallQual level.
var = 'OverallQual'
data = df_train[['SalePrice', var]]
f, ax = plt.subplots(figsize=(8, 6))
fig = sns.boxplot(x=var, y="SalePrice", data=data, ax=ax)
fig.axis(ymin=0, ymax=800000);

In [None]:
# Box plot of SalePrice per construction year; year labels are rotated so
# they stay readable.
var = 'YearBuilt'
data = df_train[['SalePrice', var]]
f, ax = plt.subplots(figsize=(16, 8))
fig = sns.boxplot(x=var, y="SalePrice", data=data, ax=ax)
fig.axis(ymin=0, ymax=800000);
plt.xticks(rotation=90);

In [None]:
#correlation matrix
# Restrict to numeric columns: get_data_df casts every non-integer field to
# string, and DataFrame.corr() raises on string columns in pandas >= 2.0
# (older pandas silently skipped them, which this reproduces).
corrmat = df_train.select_dtypes(include=[np.number]).corr()
f, ax = plt.subplots(figsize=(12, 9))
sns.heatmap(corrmat, vmax=.8, square=True, ax=ax);

In [None]:
#saleprice correlation matrix
k = 10 #number of variables for heatmap
# The k variables most correlated with SalePrice (SalePrice itself ranks first).
cols = corrmat.nlargest(k, 'SalePrice')['SalePrice'].index
# Use pandas' pairwise-complete correlation, like `corrmat` above. np.corrcoef
# would turn a column's whole row/column into NaN if it had any missing value;
# on complete data the two agree.
cm = df_train[cols].corr().values
sns.set(font_scale=1.25)
hm = sns.heatmap(cm, cbar=True, annot=True, square=True, fmt='.2f',
                 annot_kws={'size': 10},
                 yticklabels=cols.values, xticklabels=cols.values)
plt.show()

In [None]:
#scatterplot
sns.set()
cols = ['SalePrice', 'OverallQual', 'GrLivArea', 'GarageCars', 'TotalBsmtSF', 'FullBath', 'YearBuilt']
# Facet size is left at seaborn's default of 2.5; the `size=` keyword used
# previously is deprecated (renamed `height=`) in newer seaborn releases.
sns.pairplot(df_train[cols])
plt.show();

In [None]:
# TODO null values? Only real nulls are counted here; string-typed fields may
# carry literal missing-value markers instead of nulls — verify.
# Missing-value count and fraction per column, top 20 shown.
null_mask = df_train.isnull()
total = null_mask.sum().sort_values(ascending=False)
percent = (null_mask.sum() / null_mask.count()).sort_values(ascending=False)
missing_data = pd.concat([total, percent], axis=1, keys=['Total', 'Percent'])
missing_data.head(20)

In [None]:
# Histogram with a fitted normal curve, then a normal probability (Q-Q) plot
# on a new figure, to see how far SalePrice departs from normality.
sns.distplot(df_train.SalePrice, fit=norm);
fig = plt.figure()
res = stats.probplot(df_train.SalePrice, plot=plt)

In [None]:
# Log-transform SalePrice (df_train is rebound to a frame with the new column).
# NOTE: not idempotent — re-running this cell applies the log a second time.
df_train = df_train.assign(SalePrice=np.log(df_train['SalePrice']))

In [None]:
# Re-check normality after the transform: histogram with fitted normal
# curve, then a normal probability (Q-Q) plot on a new figure.
sns.distplot(df_train.SalePrice, fit=norm);
fig = plt.figure()
res = stats.probplot(df_train.SalePrice, plot=plt)

In [None]:
# Histogram with a fitted normal curve, then a normal probability (Q-Q) plot
# on a new figure, for the living-area feature.
sns.distplot(df_train.GrLivArea, fit=norm);
fig = plt.figure()
res = stats.probplot(df_train.GrLivArea, plot=plt)

In [None]:
# Log-transform GrLivArea (df_train is rebound to a frame with the new column).
# NOTE: not idempotent — re-running this cell applies the log a second time.
df_train = df_train.assign(GrLivArea=np.log(df_train['GrLivArea']))

In [None]:
# Re-check GrLivArea normality after the transform: histogram with fitted
# normal curve, then a normal probability (Q-Q) plot on a new figure.
sns.distplot(df_train.GrLivArea, fit=norm);
fig = plt.figure()
res = stats.probplot(df_train.GrLivArea, plot=plt)

In [None]:
#dealing with missing data
# Drop every column with more than one missing value (per the `missing_data`
# table), then drop the rows whose 'Electrical' value is missing.
# `columns=` replaces the positional axis argument `drop(..., 1)`, which
# pandas 2.0 no longer accepts.
df_train = df_train.drop(columns=missing_data[missing_data['Total'] > 1].index)
df_train = df_train.dropna(subset=['Electrical'])
df_train.isnull().sum().max() #just checking that there's no missing data missing...

In [None]:
#standardizing data
# Standardize SalePrice (log-transformed by an earlier cell) and print the
# 10 lowest and 10 highest z-scores to spot outliers.
# StandardScaler needs a 2-D input; select a one-column frame instead of
# `series[:, np.newaxis]`, which pandas >= 2.0 rejects.
saleprice_scaled = StandardScaler().fit_transform(df_train[['SalePrice']].values)
# Sort once and slice both tails.
saleprice_sorted = saleprice_scaled[saleprice_scaled[:, 0].argsort()]
low_range = saleprice_sorted[:10]
high_range = saleprice_sorted[-10:]
print('outer range (low) of the distribution:')
print(low_range)
print('\nouter range (high) of the distribution:')
print(high_range)

In [None]:
#bivariate analysis saleprice/grlivarea
# SalePrice is log-transformed by an earlier cell, so the raw-price
# ylim=(0, 800000) used before would squash every point onto the x-axis;
# let matplotlib choose the y-limits instead.
var = 'GrLivArea'
data = pd.concat([df_train['SalePrice'], df_train[var]], axis=1)
data.plot.scatter(x=var, y='SalePrice');

In [None]:
#deleting points
# Remove the two GrLivArea outliers identified in the source analysis
# (Kaggle Ids 1299 and 524, the two largest living areas).
# 'Id' is not in get_data_df's integer field list, so it presumably arrives
# as a string column and `df_train['Id'] == 1299` never matches (likely the
# old "wrong index" TODO). Comparing string forms works for str or int Ids,
# and the filter is idempotent on re-run.
outlier_ids = ['1299', '524']
df_train = df_train[~df_train['Id'].astype(str).isin(outlier_ids)]

In [None]:
#bivariate analysis saleprice/totalbsmtsf
# SalePrice is log-transformed by an earlier cell, so the raw-price
# ylim=(0, 800000) would flatten the plot; let matplotlib pick the limits.
var = 'TotalBsmtSF'
data = pd.concat([df_train['SalePrice'], df_train[var]], axis=1)
data.plot.scatter(x=var, y='SalePrice');

## Pandas Scatter Matrix of the Raw Data

In [None]:
# Fresh pandas copy of the untransformed Spark data (df_train above has been
# log-transformed and filtered). `data_df` is not defined in this notebook;
# the flattened Spark frame is `train_data_df`.
p_data_df = train_data_df.toPandas()

In [None]:
# Display the raw pandas frame (pandas truncates long frames under its default display options).
p_data_df

In [None]:
import matplotlib.pyplot as plt
from pandas.plotting import scatter_matrix
# Pairwise scatter plots of the numeric columns with KDE curves on the
# diagonal. NOTE(review): with up to 37 integer fields this is a very large
# grid and can be slow — consider passing a column subset.
scatter_axes = scatter_matrix(p_data_df, diagonal='kde', color='k', alpha=0.3)

plt.show()

## Train and Test

In [None]:
# Hold out ~10% of rows for evaluation. `data_df` is not defined in this
# notebook; split the flattened Spark frame instead. The fixed seed makes the
# split (and the reported test error) reproducible for the same input
# partitioning.
(train_df, test_df) = train_data_df.randomSplit([0.9, 0.1], seed=42)


In [None]:
# Build the classification pipeline:
# assemble features -> index label -> random forest -> map indices back to labels.
# NOTE(review): this cell does not match the HousePrices data as written:
#   * get_data_df produces no "target" column. The label is presumably meant
#     to be SalePrice, which is continuous and probably wants a regressor
#     rather than a classifier — confirm intent.
#   * `field_names` is only a local variable inside get_data_df (unless a
#     cell not shown defines it globally).
#   * None of the integer fields listed in get_data_df start with 'attr', so
#     the feature list below is probably empty.
#   TODO derive the feature columns from `train_df.columns`.
# The StringIndexer is fitted eagerly so its `labels` can be handed to
# IndexToString below; the fitted indexer is then reused as a pipeline stage.
labelIndexer = StringIndexer(inputCol="target", outputCol="label").fit(train_df)

featureAssembler = VectorAssembler(inputCols=[x for x in field_names if x.startswith('attr')],
                                   outputCol="features")
# Random forest with its hyperparameters spelled out explicitly.
clf = RandomForestClassifier(featuresCol="features", labelCol="label", predictionCol="prediction",
                             probabilityCol="probability", rawPredictionCol="rawPrediction",
                             maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
                             maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10,
                             impurity="gini", numTrees=20, featureSubsetStrategy="auto",
                             seed=None, subsamplingRate=1.0)
# Alternative classifiers, kept commented out for comparison:
# clf = DecisionTreeClassifier(featuresCol="features", labelCol="label", predictionCol="prediction",
#                              probabilityCol="probability", rawPredictionCol="rawPrediction",
#                              maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
#                              maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10,
#                              impurity="gini", seed=None)
# TODO MultilayerPerceptronClassifier is NPE...
# NOTE(review): NPE presumably means NullPointerException; `layers=None` below may be
# the cause, since an MLP needs explicit layer sizes — confirm.
# clf = MultilayerPerceptronClassifier(featuresCol="features", labelCol="label",
#                                      predictionCol="prediction", maxIter=100, tol=1e-6, seed=None,
#                                      layers=None, blockSize=128, stepSize=0.03, solver="l-bfgs",
#                                      initialWeights=None)
# TODO NPE...
# lr = LogisticRegression(featuresCol="features", labelCol="label", predictionCol="prediction",
#                         maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
#                         threshold=0.5, probabilityCol="probability", # thresholds=None,
#                         rawPredictionCol="rawPrediction", standardization=True, weightCol=None,
#                         aggregationDepth=2, family="auto")
# lr = LogisticRegression()
# clf = OneVsRest(classifier=lr)
# Map numeric predictions back to the original label strings.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)
pipeline = Pipeline(stages=[featureAssembler, labelIndexer, clf, labelConverter])


In [None]:
# Fit every pipeline stage on the training split.
model = pipeline.fit(dataset=train_df)


In [None]:
# Score the held-out split with the fitted pipeline.
predict_df = model.transform(dataset=test_df)


In [None]:
# Compare predicted labels with the true label for a few test rows.
# NOTE(review): depends on a "target" column that get_data_df does not produce.
preview_cols = ["predictedLabel", "target", "features"]
predict_df.select(preview_cols).show(n=5)


In [None]:
# Accuracy on the held-out split, reported as an error rate.
evaluator = MulticlassClassificationEvaluator(labelCol="label",
                                              predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predict_df)
test_error = 1.0 - accuracy
print("Test Error = %g" % test_error)
