In [1]:
import pandas as pd
import numpy as np
import os 
import warnings
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.colors import LinearSegmentedColormap
import plotly.graph_objects as go
import plotly.io as pio
import pyspark

In [2]:
from pyspark.shell import spark

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.0
      /_/

Using Python version 3.9.12 (main, Apr  4 2022 05:22:27)
Spark context Web UI available at http://10.67.197.115:4040
Spark context available as 'sc' (master = local[*], app id = local-1701399491493).
SparkSession available as 'spark'.


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import QuantileDiscretizer, StringIndexer, OneHotEncoder, StandardScaler, RobustScaler
from pyspark.ml.feature import PolynomialExpansion, VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

In [4]:
PATH_LOCAL = 'datasets/'
PATH_REMOTE = '/datasets/'                    
CR = '\n'                                    
RANDOM_STATE = RANDOM_SEED = RS = 77          
TEST_FRAC = 0.2                              
N_TRIALS = 10                                 
N_CV = 5                                      
MAX_ITER = 1000                              
DEGREE_POLYNOMIAL = 5                        

In [5]:
def custom_read_spark(spark_session, file_name, file_format='csv', separator=',', header='true'):

    path_local = f'{PATH_LOCAL}{file_name}'
    path_remote = f'{PATH_REMOTE}{file_name}'
    
    if os.path.exists(path_local):
        return spark_session.read.load(
                                       path_local,
                                       inferSchema=True,
                                       format=file_format,
                                       sep=separator,
                                       header=header,
                                      )

    elif os.path.exists(path_remote):
        return spark_session.read.load(
                                       path_remote,
                                       inferSchema=True,
                                       format=file_format,
                                       sep=separator,
                                       header=header,
                                      )

    else:
        print(f'File "{file_name}" not found at the specified path.')

In [6]:
def var_name(var):
    return [name for name in globals() if globals()[name] is var][0]

In [7]:
class RandomGridBuilder: 
  
    def __init__(self, num_models, seed=None):
        self._param_grid = {}
        self.num_models = num_models
        self.seed = seed
    
    def addDistr(self, param, distr_generator):
        if 'pyspark.ml.param.Param' in str(type(param)):
            self._param_grid[param] = distr_generator
        else:
            raise TypeError('param must be an instance of Param')

        return self
  
    def build(self):    
        param_map = []
        for n in range(self.num_models):
            if self.seed:
                np.random.seed(self.seed + n)
                random.seed(self.seed + n)
            param_dict = {}
            for param, distr in self._param_grid.items():
                param_dict[param] = distr()
            param_map.append(param_dict)

        return param_map

In [8]:
class f:
    BOLD = "\033[1m"
    ITALIC = "\033[3m"
    END = "\033[0m"

In [9]:
PLOT_DPI = 150  
sns.set_style('whitegrid', {'axes.facecolor': '0.98', 'grid.color': '0.9', 'axes.edgecolor': '1.0'})
plt.rc(
       'axes',
       labelweight='bold',
       titlesize=16,
       titlepad=10,
      )

pio.templates['my_theme'] = go.layout.Template(
                                               layout_autosize=True,
                                               # width=900,
                                               layout_height=200,
                                               layout_legend_orientation="h",
                                               layout_margin=dict(t=40, b=40),         # (l=0, r=0, b=0, t=0, pad=0)
                                               layout_template='seaborn',
                                              )
pio.templates.default = 'my_theme'

CMAP_SYMMETRIC = LinearSegmentedColormap.from_list('', ['steelblue', 'aliceblue', 'steelblue'])

In [10]:
pd.options.display.max_colwidth = 100
pd.options.display.max_rows = 500
pd.options.display.max_columns = 100
pd.options.display.float_format = '{:.3f}'.format
pd.options.display.colheader_justify = 'left'

In [11]:
warnings.filterwarnings('ignore')

In [12]:
#Spark session open
spark = (
         SparkSession
         .builder
         .master('local[*]')
         .config('spark.task.cpus', '8')
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.executor.cores", '8')
         .config("spark.dynamicAllocation.minExecutors","1")
         .config("spark.dynamicAllocation.maxExecutors","16")
         .config("spark.driver.memory", "4g")
         .config("spark.executor.memory", "1g")
         .getOrCreate()
        )

spark

In [13]:
data = spark.read.format('csv')\
          .option('header','true')\
          .option('inferSchema', 'true')\
          .option('timestamp', 'true')\
          .load('housing.csv')

In [14]:
data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [15]:
data.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 3 rows



In [16]:
data.describe().toPandas().set_index('summary')

Unnamed: 0_level_0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
summary,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


In [17]:
def count_null(df):
    print(f'missing values:{CR}')

    for column in data.columns:
        is_null = F.isnull(F.col(column)) | F.isnan(F.col(column))
        print(f'{column:25} {data.filter(is_null).count():5d}')

In [18]:
num_feature_list = []

for feature in data.dtypes:
    if feature[1] in ['double','float','int']:
        num_feature_list.append(feature[0])

print(f'Correlations to {f.BOLD}total_bedrooms{f.END}:')
for feature in num_feature_list:
    print(f"  {feature:<25} {data.corr('total_bedrooms', feature):>6.1%}")

Correlations to [1mtotal_bedrooms[0m:
  longitude                   6.8%
  latitude                   -6.5%
  housing_median_age        -31.7%
  total_rooms                92.0%
  total_bedrooms            100.0%
  population                 86.6%
  households                 96.7%
  median_income              -0.7%
  median_house_value          4.9%


In [19]:
df_stats_before = data.describe().toPandas().set_index('summary').dropna(axis=1).astype('float')      

In [20]:
data = (QuantileDiscretizer(
                            numBucketsArray=[10,10,10],
                            inputCols=['total_rooms','population','households'],
                            outputCols=['total_rooms_discrete','population_discrete','households_discrete'],
                            relativeError=0.01,
                            handleInvalid='error',
                           )
        .fit(data)
        .transform(data)
       )

In [21]:
data.select('total_rooms','total_rooms_discrete',
            'population','population_discrete',
            'households','households_discrete').sample(0.001).show(3)

+-----------+--------------------+----------+-------------------+----------+-------------------+
|total_rooms|total_rooms_discrete|population|population_discrete|households|households_discrete|
+-----------+--------------------+----------+-------------------+----------+-------------------+
|      862.0|                 0.0|     994.0|                3.0|     213.0|                1.0|
|     1999.0|                 4.0|     967.0|                3.0|     320.0|                3.0|
|     2377.0|                 5.0|    1068.0|                4.0|     435.0|                5.0|
+-----------+--------------------+----------+-------------------+----------+-------------------+
only showing top 3 rows



In [22]:
count_null(data)

missing values:

longitude                     0
latitude                      0
housing_median_age            0
total_rooms                   0
total_bedrooms              207
population                    0
households                    0
median_income                 0
median_house_value            0
ocean_proximity               0
total_rooms_discrete          0
population_discrete           0
households_discrete           0


In [23]:
window = Window().partitionBy('households_discrete')

data = data.withColumn('total_bedrooms', F.coalesce(F.col('total_bedrooms'),
                                                    F.percentile_approx(F.col('total_bedrooms'), 0.5).over(window)
                                                   )
                      )

In [24]:
count_null(data)

missing values:

longitude                     0
latitude                      0
housing_median_age            0
total_rooms                   0
total_bedrooms                0
population                    0
households                    0
median_income                 0
median_house_value            0
ocean_proximity               0
total_rooms_discrete          0
population_discrete           0
households_discrete           0


In [25]:
data = data.drop('total_rooms_discrete','population_discrete','households_discrete')

In [26]:
df_stats_after = data.describe().toPandas().set_index('summary').dropna(axis=1).astype('float')    

In [27]:
df_chng = (df_stats_after - df_stats_before) / df_stats_before

df_chng.style.format('{:.3%}', na_rep='-').applymap(lambda value: 'color:red;' if abs(value) > 0.0001 else None)

Unnamed: 0_level_0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
summary,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
count,0.000%,0.000%,0.000%,0.000%,1.013%,0.000%,0.000%,0.000%,0.000%
mean,0.000%,0.000%,0.000%,0.000%,-0.027%,0.000%,0.000%,-0.000%,0.000%
stddev,0.000%,0.000%,0.000%,0.000%,-0.198%,0.000%,0.000%,0.000%,0.000%
min,-0.000%,0.000%,0.000%,0.000%,0.000%,0.000%,0.000%,0.000%,0.000%
max,-0.000%,0.000%,0.000%,0.000%,0.000%,0.000%,0.000%,0.000%,0.000%


In [28]:
data = data.withColumn('rooms_div_population', F.col('total_rooms') / F.col('population'))

In [29]:
target = 'median_house_value'

In [30]:
num_feature_list = []
cat_feature_list = []

for feature in data.dtypes:
    if feature[1] in ['double','float','int']:
        num_feature_list.append(feature[0])
    elif feature[1] in ['string']:
        cat_feature_list.append(feature[0])

num_feature_list = list(set(num_feature_list) - set([target]))     # exclusion of the target feature from the lists
cat_feature_list = list(set(cat_feature_list) - set([target]))     # universal option – almost ready function

In [31]:
num_feature_list

['total_rooms',
 'latitude',
 'housing_median_age',
 'population',
 'median_income',
 'total_bedrooms',
 'longitude',
 'rooms_div_population',
 'households']

In [32]:
cat_feature_list

['ocean_proximity']

In [33]:
data_train, data_test = data.randomSplit([1-TEST_FRAC, TEST_FRAC], seed=RS)

print(f'train samples: {data_train.count():6d}')
print(f'test samples:  {data_test.count():6d}')

train samples:  16484
test samples:    4156


In [34]:
def crteate_initial_num_vector():
    
    return VectorAssembler(
                           inputCols=num_feature_list,
                           outputCol='num_features'
                          )

In [35]:
def create_scaler():
    
    return RobustScaler(
                        inputCol='num_features',
                        outputCol='num_features_scaled'
                       )

In [36]:
def create_polynomial(degree=2):
    
    return PolynomialExpansion(
                               degree=degree,
                               inputCol='num_features_scaled',
                               outputCol='num_features_poly'
                              )

In [37]:
def create_string_indexer():
    
    return StringIndexer(
                         inputCols=cat_feature_list,
                         outputCols=[f'{feature}_indexer' for feature in cat_feature_list]
                        )

In [38]:
def create_ohe():
    
    return OneHotEncoder(
                         inputCols=[f'{feature}_indexer' for feature in cat_feature_list],
                         outputCols=[f'{feature}_ohe' for feature in cat_feature_list]
                        )

In [39]:
def create_KMeans_prepare(feature_list, outputCol_name):
    
    return VectorAssembler(
                           inputCols=feature_list,
                           outputCol=outputCol_name
                          )

In [40]:
def create_KMeans_clusters(featuresCol_name, predictionCol_name, k_clusters=5):
    
    return KMeans(
                  k=k_clusters,
                  featuresCol=featuresCol_name,
                  predictionCol=predictionCol_name,
                  seed=RS,
                 )

In [41]:
def create_cat_features_encoded_vector():
    
    return VectorAssembler(
                           inputCols=[f'{feature}_ohe' for feature in cat_feature_list] + ['cluster_group'],
                           outputCol='cat_features_encoded'
                          )

In [42]:
def create_final_feature_vector(switch='all'):
    
    if switch == 'all':
        return VectorAssembler(inputCols=['cat_features_encoded','num_features_poly'],
                               outputCol='features')
    
    elif switch == 'num_only':
        return VectorAssembler(inputCols=['num_features_poly'],
                               outputCol='features')

In [43]:
def create_estimator(estimator_name, features_col_name):
    
    if estimator_name == 'LinearRegression':
        
        return LinearRegression(
                                labelCol=target,
                                featuresCol=features_col_name,
                                standardization=False,
                                maxIter=MAX_ITER,
                               )

In [44]:
def create_params_grid(estimator):
    
    return (RandomGridBuilder(N_TRIALS)
            .addDistr(estimator.regParam, lambda: np.random.rand())
            .addDistr(estimator.elasticNetParam, lambda: np.random.rand())
            .build()
           )

In [45]:
def create_crossvalidator(estimator, params_grid):

    return CrossValidator(
                          estimator=estimator,
                          estimatorParamMaps=params_grid,
                          evaluator=RegressionEvaluator(labelCol=target, predictionCol='prediction',  metricName='r2'),
                          numFolds=N_CV,
                          parallelism=16,
                          seed=RS,
                         )

In [46]:
# modelling pipeline
def create_pipeline(estimator_name, features_col_name, switch='all'):  
    
    initial_num_vector = crteate_initial_num_vector()
    scaler = create_scaler()
    polynomial = create_polynomial(degree=DEGREE_POLYNOMIAL)
    
    string_indexer = create_string_indexer()
    ohe = create_ohe()
    kmeans_prepare = create_KMeans_prepare(['latitude','longitude', 'median_income'], 'KMeans_group')
    kmeans_clusters = create_KMeans_clusters('KMeans_group', 'cluster_group', k_clusters=6)
    cat_features_encoded_vector = create_cat_features_encoded_vector()
    
    final_feature_vector = create_final_feature_vector(switch=switch)

    estimator = create_estimator(estimator_name, features_col_name)
    params_grid = create_params_grid(estimator)
    crossvalidator = create_crossvalidator(estimator, params_grid)
    
    return Pipeline(stages=[
                            initial_num_vector,            
                            scaler,                        
                            polynomial,                    
                            string_indexer,                
                            ohe,                            
                            kmeans_prepare,                
                            kmeans_clusters,              
                            cat_features_encoded_vector,   
                            final_feature_vector,          
                            crossvalidator,                
                           ]
                   )

In [47]:
%%time

# for all features
pipeline_num_cat = create_pipeline('LinearRegression', 'features', switch='all').fit(data_train)

CPU times: total: 1.98 s
Wall time: 16min 49s


In [48]:
%%time

# only for numeric initial features
pipeline_num_only = create_pipeline('LinearRegression', 'features', switch='num_only').fit(data_train)

CPU times: total: 2.34 s
Wall time: 17min 15s


In [49]:
model_num_cat = pipeline_num_cat.stages[-1].bestModel
model_num_only = pipeline_num_only.stages[-1].bestModel

In [50]:
def prediction_results(pipeline):
    print(f'{CR}Predictions for LinearRegression with {f.BOLD}{(var_name(pipeline))}{f.END}{CR}')
    pipeline.transform(data_test).select('median_house_value', 'prediction').sample(0.01).show(5)    

In [51]:
prediction_results(pipeline_num_cat)
prediction_results(pipeline_num_only)


Predictions for LinearRegression with [1mpipeline_num_cat[0m

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          187800.0|218794.16224384448|
|          410500.0| 323311.5854588663|
|          325800.0| 325088.2302925545|
|          341700.0|303315.40402677236|
|          304000.0|  273694.135845541|
+------------------+------------------+
only showing top 5 rows


Predictions for LinearRegression with [1mpipeline_num_only[0m

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          264200.0| 323915.3100826172|
|          321600.0|  337954.140164135|
|          112200.0| 128404.3318612026|
|          151000.0|182657.01790653635|
|          208100.0| 225823.3863415392|
+------------------+------------------+
only showing top 5 rows



In [52]:
def model_metric(pipeline, metric_dict={'r2':3}):
    predicted = pipeline.transform(data_test)
    
    print(f'{CR}{f.BOLD}{var_name(pipeline)}{f.END} metrics:')
    
    for metric_name, metric_precission in metric_dict.items():
        metric = RegressionEvaluator(labelCol=target, predictionCol='prediction', metricName=metric_name).evaluate(predicted)
        print(f'{metric_name.upper():4} = {metric :{metric_precission}}')

model_metric(pipeline_num_cat, {'r2':'0.3f','rmse':'0.0f','mae':'0.0f'})
model_metric(pipeline_num_only, {'r2':'0.3f','rmse':'0.0f','mae':'0.0f'})


[1mpipeline_num_cat[0m metrics:
R2   = 0.739
RMSE = 59711
MAE  = 42842

[1mpipeline_num_only[0m metrics:
R2   = 0.735
RMSE = 60210
MAE  = 43338
