In [1]:
import pyspark
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session that every later cell relies on.
spark = (
    SparkSession.builder
    .appName("Regression")
    .getOrCreate()
)

# Size of the key set of Spark's executor memory status map.
# NOTE(review): the message labels this as "core(s)"; it looks like it counts
# executor entries rather than CPU cores — confirm before relying on it.
cores = (
    spark._jsc.sc()
    .getExecutorMemoryStatus()
    .keySet()
    .size()
)
print("You are working with", cores, "core(s)")
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/05 13:20:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
You are working with 1 core(s)


In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer

from pyspark.ml.stat import Correlation

from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

: 

In [None]:
# Folder holding the input files, relative to the notebook's working directory.
path = "Datasets/"

# Load the housing CSV: the first row is the header and column types are inferred.
df = spark.read.csv(
    path + 'housing.csv',
    header=True,
    inferSchema=True,
)

: 

In [None]:
# Preview the first six rows as a pandas frame for rich display.
df.limit(6).toPandas()

: 

In [None]:
# Show the column names and the types inferred while reading the CSV.
df.printSchema()

: 

In [None]:
# Row count, then column count, of the loaded frame.
print(df.count())
print(len(df.columns))

: 

In [None]:
# Drop rows that contain nulls, then display how many rows remain.
# Note: this rebinds `df`, so the unfiltered frame is no longer available afterwards.
df = df.na.drop()
df.count()

: 

In [None]:
# Columns used as model inputs, and the column the models will predict.
input_columns = ['total_bedrooms', 'population', 'households','median_income']
dependent_var = 'median_house_value'

: 

In [None]:
# Expose the dependent variable under the name 'label', which is the column
# name the later cells select and train on.
renamed = df.withColumnRenamed(dependent_var, 'label')

# Cast the label to float unless it is already an integer column.
# The type is checked with isinstance rather than by comparing str(dataType):
# the string form of a Spark type differs between PySpark versions
# ('IntegerType' vs 'IntegerType()'), which makes a string comparison
# silently always-true on newer releases.
if not isinstance(renamed.schema['label'].dataType, IntegerType):
    renamed = renamed.withColumn("label", renamed["label"].cast(FloatType()))

: 

In [None]:
# Split the input columns into numeric ones (used as-is) and string ones
# (replaced by a numeric index column named "<column>_num").
numeric_inputs = []
string_inputs = []

# Start from the renamed frame so `indexed` is always defined, even when
# there are no string columns (or only string columns).
indexed = renamed

for column in input_columns:
    # isinstance is used instead of comparing str(dataType), whose text
    # differs between PySpark versions.
    if isinstance(renamed.schema[column].dataType, StringType):
        new_col_name = column + "_num"
        indexer = StringIndexer(inputCol=column, outputCol=new_col_name)
        # Fit/transform the accumulated frame so that every previously added
        # index column is kept, and store the result in `indexed`.
        indexed = indexer.fit(indexed).transform(indexed)
        string_inputs.append(new_col_name)
    else:
        numeric_inputs.append(column)

: 

In [None]:
# Preview four rows of the frame produced by the indexing step.
indexed.limit(4).toPandas()

: 

In [None]:
# Display the input columns that were classified as numeric.
numeric_inputs

: 

In [None]:
# Display the names of the index columns created for string inputs.
string_inputs

: 

In [None]:
# Approximate 1st and 99th percentiles of every numeric input; used below as
# clipping bounds before a skewed column is transformed.
# NOTE(review): the relative error of 0.25 is very coarse for the 0.01/0.99
# quantiles — consider a smaller value if tighter bounds are wanted.
d = {}

# The loop variable is named `column` (not `col`) so it does not shadow
# pyspark.sql.functions.col, which is star-imported earlier in the notebook.
for column in numeric_inputs:
    d[column] = indexed.approxQuantile(column, [0.01, 0.99], 0.25)

for column in numeric_inputs:
    skew = indexed.agg(skewness(indexed[column])).collect()[0][0]
    if skew is None:
        # No skewness could be computed (e.g. no rows); leave the column alone.
        continue

    if skew > 1 or skew < -1:
        lower, upper = d[column]
        # Clip the column to [lower, upper]. All references use `indexed`,
        # the frame that is actually being transformed.
        clipped = (
            when(indexed[column] < lower, lower)
            .when(indexed[column] > upper, upper)
            .otherwise(indexed[column])
        )
        if skew > 1:
            # Right-skewed: log(x + 1) compresses the long upper tail.
            indexed = indexed.withColumn(column, log(clipped + 1))
            print(column+" has been treated for positive (right) skewness. (skew=", skew, ")")
        else:
            # Left-skewed: exp(x) stretches the upper end of the range.
            indexed = indexed.withColumn(column, exp(clipped))
            print(column+" has been treated for negative (left) skewness. (skew=",skew,")")


: 

In [None]:
# Preview four rows again to inspect the columns after the skewness treatment.
indexed.limit(4).toPandas()

: 

In [None]:
# Feature order: numeric inputs first, followed by the indexed string inputs.
features_list = [*numeric_inputs, *string_inputs]

# Pack the feature columns into a single vector column called 'features'.
assembler = VectorAssembler(
    inputCols=features_list,
    outputCol='features',
)

# Keep only the two columns the models need, then peek at five rows.
final_data = (
    assembler
    .transform(indexed)
    .select('features', 'label')
)
final_data.show(5)

: 

In [None]:
# Pearson correlation between the components of the 'features' vector.
# The result is collected to the driver; the matrix is the first field of the first row.
pearsonCorr = (
    Correlation
    .corr(final_data, 'features', 'pearson')
    .collect()[0][0]
)

# Convert the Spark matrix to an array so it can be iterated row by row.
array = pearsonCorr.toArray()

: 

In [None]:
# Print every correlation value, row by row, with a " " line between the
# values of a row. Iterating over the whole row (instead of hard-coding
# indexes 0..2) keeps the output complete for any number of features.
for item in array:
    for position, value in enumerate(item):
        if position:
            print(" ")
        print(value)

: 

In [None]:
# 70/30 train/test split. A fixed seed makes the split, and therefore every
# metric computed from it, reproducible across reruns of the notebook.
train, test = final_data.randomSplit([0.7, 0.3], seed=42)

: 

In [None]:
from pyspark.ml.regression import RandomForestRegressionModel
# Random forest with default hyper-parameters. The seed is pinned because the
# algorithm samples rows and features at random; without it the fitted model
# (and its RMSE) changes from run to run.
regressor = RandomForestRegressor(seed=42)
fitModel = regressor.fit(train)

: 

In [None]:
from pyspark.ml.evaluation import *
# Evaluator configured to report root mean squared error.
evaluator = RegressionEvaluator(metricName="rmse")

: 

In [None]:
# Score the held-out split with the currently fitted model.
predictions = fitModel.transform(test)

# Compute the evaluator's metric on those predictions and report it.
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

: 

In [None]:
# Fit a linear regression with default settings on the training split and
# report its coefficients, training diagnostics and test-set metrics.
# Note: this rebinds `regressor` and `fitModel` from the previous model.
regressor = LinearRegression()

fitModel = regressor.fit(train)

trainingSummary = fitModel.summary

print('\033[1m' + "Linear Regression Model Summary without cross validation:" + '\033[0m')
print(" ")
print("Intercept: %s" % str(fitModel.intercept))
print("")
coeff_array = fitModel.coefficients.toArray()
coeff_scores = [float(x) for x in coeff_array]

# Coefficients follow the order of the assembled feature vector, so they are
# paired with `features_list` (the assembler's input columns) rather than
# `input_columns`, whose names/order differ once string inputs are indexed.
result = spark.createDataFrame(list(zip(features_list, coeff_scores)), schema=['feature', 'coeff'])
# show() prints the table itself and returns None, so it is not wrapped in print().
result.orderBy(result["coeff"].desc()).show(truncate=False)

print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: (scaled loss + regularization) at each iteration \n %s" % str(trainingSummary.objectiveHistory))
print("")

print("Training RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("Training r2: %f" % trainingSummary.r2)
print("")

# Evaluate on the held-out split and report the same metrics for comparison.
test_results = fitModel.evaluate(test)
print("Test RMSE: %f" % test_results.rootMeanSquaredError)
print("Test r2: %f" % test_results.r2)


: 

: 

: 

: 

: 

: 