In [1]:
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) the Spark session used throughout this notebook.
spark = SparkSession.builder.appName('LREX').getOrCreate()
# NOTE: the previous approach, getExecutorMemoryStatus().keySet().size(), counts
# block managers (driver + executors), not CPU cores, and relied on the private
# `_jsc` handle. defaultParallelism is the public, documented figure for how many
# tasks Spark runs in parallel by default (in local[*] mode this is the local core
# count, unless spark.default.parallelism overrides it).
cores = spark.sparkContext.defaultParallelism
print(f'I am working with {cores} cores (default parallelism)')
spark

I am working with 1 cores


In [3]:
from pyspark.ml.regression import LinearRegression

In [4]:
# Load the sample dataset with the libsvm reader (it produces `label` and `features` columns).
training = spark.read.load('./data/sample_linear_regression_data.txt', format='libsvm')

In [5]:
# Peek at the first few rows of the loaded data.
training.show(n=5)

+-------------------+--------------------+
|              label|            features|
+-------------------+--------------------+
| -9.490009878824548|(10,[0,1,2,3,4,5,...|
| 0.2577820163584905|(10,[0,1,2,3,4,5,...|
| -4.438869807456516|(10,[0,1,2,3,4,5,...|
|-19.782762789614537|(10,[0,1,2,3,4,5,...|
| -7.966593841555266|(10,[0,1,2,3,4,5,...|
+-------------------+--------------------+
only showing top 5 rows



In [27]:
# Spark DataFrames have no `.shape`; report (row count, column count) instead.
n_rows, n_cols = training.count(), len(training.columns)
print(f'shape : {(n_rows, n_cols)}')

shape : (501, 2)


In [6]:
# Linear regression estimator. The column names are spelled out explicitly;
# they match the columns produced by the libsvm reader.
lr = LinearRegression(
    labelCol='label',
    featuresCol='features',
    predictionCol='prediction',
)

In [7]:
# Fit on the full dataset (no hold-out), so the summary metrics below are in-sample.
model = lr.fit(dataset=training)

In [8]:
# Fitted weights, one per feature (the feature vectors here have 10 entries).
model.coefficients

DenseVector([0.0073, 0.8314, -0.8095, 2.4412, 0.5192, 1.1535, -0.2989, -0.5129, -0.6197, 0.6956])

In [9]:
# Fitted bias (intercept) term of the full-data model.
model.intercept

0.14228558260358093

In [11]:
# Training summary of the full-data model: metrics are computed on the same
# rows the model was fitted on, so they measure fit rather than generalization.
tr_summary = model.summary

In [15]:
# In-sample fit quality. An r2 close to 0 means the features explain very
# little of the label variance in this sample dataset.
train_metrics = (
    ('r2', tr_summary.r2),
    ('rmse', tr_summary.rootMeanSquaredError),
)
for metric_name, metric_value in train_metrics:
    print(f'{metric_name} : {metric_value}')

r2 : 0.027839179518600154
rmse : 10.16309157133015


In [16]:
# Reuse the DataFrame already loaded from './data/sample_linear_regression_data.txt'
# instead of re-reading the same file: avoids a second scan and keeps the
# data path in a single place. Spark DataFrames are immutable, so sharing is safe.
data = training

In [20]:
# 70/30 train/test split. Without a seed, every kernel restart produced a
# different split, so the counts and metrics below were not reproducible.
SPLIT_SEED = 42
train_data, test_data = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [26]:
# Peek at the first rows of the training split.
train_data.show(n=5)

+-------------------+--------------------+
|              label|            features|
+-------------------+--------------------+
|-28.571478869743427|(10,[0,1,2,3,4,5,...|
|-26.736207182601724|(10,[0,1,2,3,4,5,...|
|-23.487440120936512|(10,[0,1,2,3,4,5,...|
|-22.949825936196074|(10,[0,1,2,3,4,5,...|
|-22.837460416919342|(10,[0,1,2,3,4,5,...|
+-------------------+--------------------+
only showing top 5 rows



In [25]:
# Summary statistics of the training split (only the `label` column is summarized).
train_stats = train_data.describe()
train_stats.show()

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|                345|
|   mean|0.22575295177918767|
| stddev|  10.26241690172677|
|    min|-28.571478869743427|
|    max| 26.903524792043335|
+-------+-------------------+



In [28]:
# Summary statistics of the test split, for comparison with the training split.
test_stats = test_data.describe()
test_stats.show()

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|                156|
|   mean| 0.3257469971530153|
| stddev| 10.472444420450312|
|    min|-28.046018037776633|
|    max|  27.78383192005107|
+-------+-------------------+



In [29]:
# Refit the same estimator on the training split only.
c_model = lr.fit(dataset=train_data)

In [30]:
# Score the held-out split; the result exposes residuals and error metrics.
test_results = c_model.evaluate(dataset=test_data)

In [32]:
# First few residuals on the test split.
test_results.residuals.show(n=5)

+-------------------+
|          residuals|
+-------------------+
| -27.28004679581195|
|-28.323003878807537|
|-22.699629770257186|
| -20.38430093968298|
|-18.895477403911602|
+-------------------+
only showing top 5 rows



In [34]:
# Hold-out RMSE of the train-split model; compare with the in-sample rmse printed earlier.
test_results.rootMeanSquaredError

10.314706424266657

In [35]:
# Keep only the feature vectors to mimic unlabeled data arriving for prediction.
unl_data = test_data.select(test_data['features'])

In [36]:
# Append a `prediction` column for each unlabeled row.
pred = c_model.transform(dataset=unl_data)

In [37]:
# Peek at the first few predictions.
pred.show(n=5)

+--------------------+-------------------+
|            features|         prediction|
+--------------------+-------------------+
|(10,[0,1,2,3,4,5,...| -0.765971241964686|
|(10,[0,1,2,3,4,5,...| 1.5175204503244646|
|(10,[0,1,2,3,4,5,...|-0.8112543200657862|
|(10,[0,1,2,3,4,5,...| 0.6015381500684441|
|(10,[0,1,2,3,4,5,...| 0.6202638379069733|
+--------------------+-------------------+
only showing top 5 rows

