In [1]:
import findspark

In [2]:
findspark.init("/opt/manual/spark")

In [3]:
import findspark
from pyspark.sql import SparkSession, functions as F
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import isnan, when, count, col, lit
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder

In [4]:
! ls -l /home/train/datasets | grep Advertising

-rw-rw-r--. 1 train train     4556 Jul 21  2020 Advertising.csv
-rw-rw-r--. 1 train train     4556 Oct 11 13:34 Advertising.csv.1


In [5]:
spark = SparkSession.builder \
.appName("Model") \
.master("local[*]") \
.getOrCreate()

In [6]:
df = spark.read.format("csv") \
.option("header",True) \
.option("inferSchema",True) \
.option("sep",",") \
.load("file:///home/train/datasets/Advertising.csv.1")

In [7]:
pd.set_option('display.max_columns',None)
pd.set_option('display.max_rows',None)

In [8]:
df.limit(6).toPandas()

Unnamed: 0,ID,TV,Radio,Newspaper,Sales
0,1,230.1,37.8,69.2,22.1
1,2,44.5,39.3,45.1,10.4
2,3,17.2,45.9,69.3,9.3
3,4,151.5,41.3,58.5,18.5
4,5,180.8,10.8,58.4,12.9
5,6,8.7,48.9,75.0,7.2


In [9]:
#Checking Schema of df
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- TV: double (nullable = true)
 |-- Radio: double (nullable = true)
 |-- Newspaper: double (nullable = true)
 |-- Sales: double (nullable = true)



In [10]:
df_count = df.count()
print(df_count)

200


In [11]:
# Null check
for col_name in df.dtypes:
    null_count = df.filter( (F.col(col_name[0]).isNull()) | (F.col(col_name[0]) == "")).count()
    if(  null_count > 0 ):
        print("{} {} type has {} null values % {}".format(col_name, col_type[1], null_count, (null_count/df_count * 100)))

# No null

In [12]:
df.limit(6).toPandas()

Unnamed: 0,ID,TV,Radio,Newspaper,Sales
0,1,230.1,37.8,69.2,22.1
1,2,44.5,39.3,45.1,10.4
2,3,17.2,45.9,69.3,9.3
3,4,151.5,41.3,58.5,18.5
4,5,180.8,10.8,58.4,12.9
5,6,8.7,48.9,75.0,7.2


In [13]:
# drop the columns that are not required
my_data = df.drop(*['Id'])
my_data.columns

['TV', 'Radio', 'Newspaper', 'Sales']

In [14]:
# get the dimensions of the data
(my_data.count() , len(my_data.columns))

(200, 4)

In [15]:
# get the summary of the numerical columns
my_data.select('TV', 'Radio', 'Newspaper','Sales').describe().show()

+-------+-----------------+------------------+------------------+------------------+
|summary|               TV|             Radio|         Newspaper|             Sales|
+-------+-----------------+------------------+------------------+------------------+
|  count|              200|               200|               200|               200|
|   mean|         147.0425|23.264000000000024|30.553999999999995|14.022500000000003|
| stddev|85.85423631490805|14.846809176168728| 21.77862083852283| 5.217456565710477|
|    min|              0.7|               0.0|               0.3|               1.6|
|    max|            296.4|              49.6|             114.0|              27.0|
+-------+-----------------+------------------+------------------+------------------+



In [16]:
my_data.limit(4).toPandas()

Unnamed: 0,TV,Radio,Newspaper,Sales
0,230.1,37.8,69.2,22.1
1,44.5,39.3,45.1,10.4
2,17.2,45.9,69.3,9.3
3,151.5,41.3,58.5,18.5


In [17]:
from pyspark.ml.feature import VectorAssembler

# specify the input and output columns of the vector assembler
assembler = VectorAssembler(inputCols=['TV',
                                       'Radio',
                                       'Newspaper'],
                           outputCol='features')

# fill the null values
my_data = my_data.fillna(0)

# transform the data
final_data = assembler.transform(my_data)

# view the transformed vector
final_data.select('features').show()

+-----------------+
|         features|
+-----------------+
|[230.1,37.8,69.2]|
| [44.5,39.3,45.1]|
| [17.2,45.9,69.3]|
|[151.5,41.3,58.5]|
|[180.8,10.8,58.4]|
|  [8.7,48.9,75.0]|
| [57.5,32.8,23.5]|
|[120.2,19.6,11.6]|
|    [8.6,2.1,1.0]|
| [199.8,2.6,21.2]|
|  [66.1,5.8,24.2]|
| [214.7,24.0,4.0]|
| [23.8,35.1,65.9]|
|   [97.5,7.6,7.2]|
|[204.1,32.9,46.0]|
|[195.4,47.7,52.9]|
|[67.8,36.6,114.0]|
|[281.4,39.6,55.8]|
| [69.2,20.5,18.3]|
|[147.3,23.9,19.1]|
+-----------------+
only showing top 20 rows



In [18]:
final_data.show(3)

+-----+-----+---------+-----+-----------------+
|   TV|Radio|Newspaper|Sales|         features|
+-----+-----+---------+-----+-----------------+
|230.1| 37.8|     69.2| 22.1|[230.1,37.8,69.2]|
| 44.5| 39.3|     45.1| 10.4| [44.5,39.3,45.1]|
| 17.2| 45.9|     69.3|  9.3| [17.2,45.9,69.3]|
+-----+-----+---------+-----+-----------------+
only showing top 3 rows



# Building Pipeline

In [20]:
from pyspark.ml import Pipeline

In [35]:
regressor = RandomForestRegressor(featuresCol = 'features', labelCol='Sales')
pipeline = Pipeline(stages=[assembler, regressor])

In [36]:
pipeline.write().overwrite().save("file:///home/train/atscale4/hadoop-hdfs/spark-ml")

In [37]:
pipelineModel = Pipeline.load('file:///home/train/atscale4/hadoop-hdfs/spark-ml')
paramGrid = ParamGridBuilder().addGrid(regressor.numTrees, [100, 500]).build()

# Cros Validation

In [53]:
crossval = CrossValidator(estimator=pipelineModel,
estimatorParamMaps = paramGrid,
evaluator= RegressionEvaluator(labelCol = 'Sales'),
                               numFolds=3)

In [54]:
train_data, test_data = my_data.randomSplit([0.8,0.2], seed=123)

In [55]:
cvModel= crossval.fit(train_data)

In [56]:
bestModel= cvModel.bestModel
for x in range(len(bestModel.stages)):
    print(bestModel.stages[x])

VectorAssembler_c496188117b3
RandomForestRegressionModel: uid=RandomForestRegressor_7421bf3ca989, numTrees=100, numFeatures=3


In [57]:
pred = cvModel.transform(test_data)
pred.select('Sales', 'prediction').show()

+-----+------------------+
|Sales|        prediction|
+-----+------------------+
|  5.3| 7.448946032412579|
|  4.8| 7.651781401225229|
|  5.9| 8.003419046048975|
|  9.3| 9.940877423552688|
|  7.6| 9.136102764069959|
|  8.8|10.012000098699954|
|  7.6| 9.767615138464292|
| 10.8|12.889870258744525|
| 10.4|11.742693721838409|
| 10.1|10.422298308044446|
| 11.6|13.273496424607647|
|  8.7| 9.525070529095137|
|  9.7| 9.588483236598222|
| 11.3|10.970151377921624|
|  9.4| 9.733690797040886|
| 12.9|13.174956793618462|
| 11.9|13.543082545723017|
| 13.2|13.266471916231332|
| 11.0|11.886007889797142|
| 10.3|11.420128241896634|
+-----+------------------+
only showing top 20 rows



In [58]:
eval = RegressionEvaluator(labelCol = 'Sales')
rmse = eval.evaluate(pred)
mse= eval.evaluate(pred, {eval.metricName: 'mse'})
mae= eval.evaluate(pred, {eval.metricName: 'mae'})
r2 = eval.evaluate(pred, {eval.metricName: 'r2'})

In [59]:
print('RMSE: %.3f' %rmse)
print('MSE: %.3f' %mse)
print('MAE: %.3f' %mae)
print('r2: %.3f' %r2)

RMSE: 1.699
MSE: 2.888
MAE: 1.394
r2: 0.910


In [60]:
test_data.limit(5).toPandas()

Unnamed: 0,TV,Radio,Newspaper,Sales
0,5.4,29.9,9.4,5.3
1,8.6,2.1,1.0,4.8
2,17.2,4.1,31.6,5.9
3,17.2,45.9,69.3,9.3
4,19.6,20.1,17.0,7.6


In [61]:
pred.limit(5).toPandas()

Unnamed: 0,TV,Radio,Newspaper,Sales,features,prediction
0,5.4,29.9,9.4,5.3,"[5.4, 29.9, 9.4]",7.448946
1,8.6,2.1,1.0,4.8,"[8.6, 2.1, 1.0]",7.651781
2,17.2,4.1,31.6,5.9,"[17.2, 4.1, 31.6]",8.003419
3,17.2,45.9,69.3,9.3,"[17.2, 45.9, 69.3]",9.940877
4,19.6,20.1,17.0,7.6,"[19.6, 20.1, 17.0]",9.136103


In [None]:
# Stopping spark 
spark.stop()