In [1]:
from __future__ import print_function
import findspark, pyspark
findspark.init()
findspark.find()

'C:\\Spark\\spark-3.0.0-bin-hadoop2.7\\'

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer

In [3]:
if __name__ == '__main__':
    spark = SparkSession\
        .builder\
        .appName("OneHotEncoder")\
        .getOrCreate()

In [4]:
data = spark.createDataFrame(
    [
        (0, "Good"),
        (1, "Bad"),
        (2, "Average"),
        (3, "Good"),
        (4, "Bad"),
        (5, "Average"),
    ], ['Id', "Rating"]
)
data.show()

+---+-------+
| Id| Rating|
+---+-------+
|  0|   Good|
|  1|    Bad|
|  2|Average|
|  3|   Good|
|  4|    Bad|
|  5|Average|
+---+-------+



In [5]:
df = spark.read.csv("../Advertising.csv", header=True)
df.show()

+---+-----+-----+---------+-----+
|_c0|   TV|radio|newspaper|sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
|  6|  8.7| 48.9|       75|  7.2|
|  7| 57.5| 32.8|     23.5| 11.8|
|  8|120.2| 19.6|     11.6| 13.2|
|  9|  8.6|  2.1|        1|  4.8|
| 10|199.8|  2.6|     21.2| 10.6|
| 11| 66.1|  5.8|     24.2|  8.6|
| 12|214.7|   24|        4| 17.4|
| 13| 23.8| 35.1|     65.9|  9.2|
| 14| 97.5|  7.6|      7.2|  9.7|
| 15|204.1| 32.9|       46|   19|
| 16|195.4| 47.7|     52.9| 22.4|
| 17| 67.8| 36.6|      114| 12.5|
| 18|281.4| 39.6|     55.8| 24.4|
| 19| 69.2| 20.5|     18.3| 11.3|
| 20|147.3| 23.9|     19.1| 14.6|
+---+-----+-----+---------+-----+
only showing top 20 rows



In [6]:
df.columns

['_c0', 'TV', 'radio', 'newspaper', 'sales']

In [7]:
type(df)

pyspark.sql.dataframe.DataFrame

In [8]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- TV: string (nullable = true)
 |-- radio: string (nullable = true)
 |-- newspaper: string (nullable = true)
 |-- sales: string (nullable = true)



In [9]:
from pyspark.sql.functions import col
new_data = df.select(*(col(c).cast("float").alias(c) for c in df.columns))
new_data

DataFrame[_c0: float, TV: float, radio: float, newspaper: float, sales: float]

In [10]:
new_data.printSchema()

root
 |-- _c0: float (nullable = true)
 |-- TV: float (nullable = true)
 |-- radio: float (nullable = true)
 |-- newspaper: float (nullable = true)
 |-- sales: float (nullable = true)



In [11]:
list((col(c).cast("float").alias(c) for c in df.columns))

[Column<b'CAST(_c0 AS FLOAT) AS `_c0`'>,
 Column<b'CAST(TV AS FLOAT) AS `TV`'>,
 Column<b'CAST(radio AS FLOAT) AS `radio`'>,
 Column<b'CAST(newspaper AS FLOAT) AS `newspaper`'>,
 Column<b'CAST(sales AS FLOAT) AS `sales`'>]

In [12]:
new_data.show()

+----+-----+-----+---------+-----+
| _c0|   TV|radio|newspaper|sales|
+----+-----+-----+---------+-----+
| 1.0|230.1| 37.8|     69.2| 22.1|
| 2.0| 44.5| 39.3|     45.1| 10.4|
| 3.0| 17.2| 45.9|     69.3|  9.3|
| 4.0|151.5| 41.3|     58.5| 18.5|
| 5.0|180.8| 10.8|     58.4| 12.9|
| 6.0|  8.7| 48.9|     75.0|  7.2|
| 7.0| 57.5| 32.8|     23.5| 11.8|
| 8.0|120.2| 19.6|     11.6| 13.2|
| 9.0|  8.6|  2.1|      1.0|  4.8|
|10.0|199.8|  2.6|     21.2| 10.6|
|11.0| 66.1|  5.8|     24.2|  8.6|
|12.0|214.7| 24.0|      4.0| 17.4|
|13.0| 23.8| 35.1|     65.9|  9.2|
|14.0| 97.5|  7.6|      7.2|  9.7|
|15.0|204.1| 32.9|     46.0| 19.0|
|16.0|195.4| 47.7|     52.9| 22.4|
|17.0| 67.8| 36.6|    114.0| 12.5|
|18.0|281.4| 39.6|     55.8| 24.4|
|19.0| 69.2| 20.5|     18.3| 11.3|
|20.0|147.3| 23.9|     19.1| 14.6|
+----+-----+-----+---------+-----+
only showing top 20 rows



In [13]:
### Data Collection
from pyspark.sql.functions import col, isnan, when, count

In [14]:
new_data.select([count(when(col(c).isNull(), c)).alias(c) for c in new_data.columns]).show()

+---+---+-----+---------+-----+
|_c0| TV|radio|newspaper|sales|
+---+---+-----+---------+-----+
|  0|  0|    0|        0|    0|
+---+---+-----+---------+-----+



In [15]:
from pyspark.ml.feature import Imputer

In [16]:
imputer = Imputer(
    inputCols=['TV', 'radio', 'newspaper'],
    outputCols=['TV', 'radio', 'newspaper'],
)
imputer

Imputer_b0d01071404a

In [17]:
model = imputer.fit(new_data)
model

ImputerModel: uid=Imputer_b0d01071404a, strategy=mean, missingValue=NaN, numInputCols=3, numOutputCols=3

In [18]:
imputed_data = model.transform(new_data)
imputed_data

DataFrame[_c0: float, TV: float, radio: float, newspaper: float, sales: float]

In [19]:
### Checking for NaN type values in columns
imputed_data.select([count(when(col(c).isNull(), c)).alias(c) for c in imputed_data.columns]).show()

+---+---+-----+---------+-----+
|_c0| TV|radio|newspaper|sales|
+---+---+-----+---------+-----+
|  0|  0|    0|        0|    0|
+---+---+-----+---------+-----+



In [20]:
features = imputed_data.drop('TV')
features

DataFrame[_c0: float, radio: float, newspaper: float, sales: float]

In [21]:
new_data.show()

+----+-----+-----+---------+-----+
| _c0|   TV|radio|newspaper|sales|
+----+-----+-----+---------+-----+
| 1.0|230.1| 37.8|     69.2| 22.1|
| 2.0| 44.5| 39.3|     45.1| 10.4|
| 3.0| 17.2| 45.9|     69.3|  9.3|
| 4.0|151.5| 41.3|     58.5| 18.5|
| 5.0|180.8| 10.8|     58.4| 12.9|
| 6.0|  8.7| 48.9|     75.0|  7.2|
| 7.0| 57.5| 32.8|     23.5| 11.8|
| 8.0|120.2| 19.6|     11.6| 13.2|
| 9.0|  8.6|  2.1|      1.0|  4.8|
|10.0|199.8|  2.6|     21.2| 10.6|
|11.0| 66.1|  5.8|     24.2|  8.6|
|12.0|214.7| 24.0|      4.0| 17.4|
|13.0| 23.8| 35.1|     65.9|  9.2|
|14.0| 97.5|  7.6|      7.2|  9.7|
|15.0|204.1| 32.9|     46.0| 19.0|
|16.0|195.4| 47.7|     52.9| 22.4|
|17.0| 67.8| 36.6|    114.0| 12.5|
|18.0|281.4| 39.6|     55.8| 24.4|
|19.0| 69.2| 20.5|     18.3| 11.3|
|20.0|147.3| 23.9|     19.1| 14.6|
+----+-----+-----+---------+-----+
only showing top 20 rows



In [22]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=features.columns,
    outputCol='Features'
)
assembler

VectorAssembler_6d419a380400

In [23]:
output = assembler.transform(imputed_data)
output = output.select("Features", "TV")
output

DataFrame[Features: vector, TV: float]

In [24]:
train_df, test_df = output.randomSplit([0.7,0.3])
train_df.show()
test_df.show()

+--------------------+-----+
|            Features|   TV|
+--------------------+-----+
|[2.0,39.299999237...| 44.5|
|[3.0,45.900001525...| 17.2|
|[4.0,41.299999237...|151.5|
|[5.0,10.800000190...|180.8|
|[6.0,48.900001525...|  8.7|
|[7.0,32.799999237...| 57.5|
|[8.0,19.600000381...|120.2|
|[9.0,2.0999999046...|  8.6|
|[11.0,5.800000190...| 66.1|
|[13.0,35.09999847...| 23.8|
|[15.0,32.90000152...|204.1|
|[17.0,36.59999847...| 67.8|
|[18.0,39.59999847...|281.4|
|[20.0,23.89999961...|147.3|
|[21.0,27.70000076...|218.4|
|[22.0,5.099999904...|237.4|
|[25.0,12.60000038...| 62.3|
|[26.0,3.5,19.5,12.0]|262.9|
|[27.0,29.29999923...|142.9|
|[28.0,16.70000076...|240.1|
+--------------------+-----+
only showing top 20 rows

+--------------------+-----+
|            Features|   TV|
+--------------------+-----+
|[1.0,37.799999237...|230.1|
|[10.0,2.599999904...|199.8|
|[12.0,24.0,4.0,17...|214.7|
|[14.0,7.599999904...| 97.5|
|[16.0,47.70000076...|195.4|
|[19.0,20.5,18.299...| 69.2|
|[23.0,15.8999996

In [25]:
from pyspark.ml.regression import LinearRegression
lin_reg = LinearRegression(featuresCol="Features", labelCol="TV")
linear_model= lin_reg.fit(train_df)

In [26]:
print("Coefficients:", str(linear_model.coefficients))
print("Intercepts:", str(linear_model.intercept))

Coefficients: [0.007915034610343038,-3.4152020025899787,0.11360930428435628,18.485926203263432]
Intercepts: -35.946419096341515


In [30]:
trainSummary = linear_model.summary
print("RMSE: ", trainSummary.rootMeanSquaredError)
print("R2: ", trainSummary.r2)

RMSE:  34.942352626383936
R2:  0.8386002929969353


In [33]:
### Prediction
predictions = linear_model.transform(test_df)
predictions.select("prediction", "TV", "Features").show()

+------------------+-----+--------------------+
|        prediction|   TV|            Features|
+------------------+-----+--------------------+
| 251.3676024996724|230.1|[1.0,37.799999237...|
|153.61254851264596|199.8|[10.0,2.599999904...|
|204.29325935892302|214.7|[12.0,24.0,4.0,17...|
|118.34032410914128| 97.5|[14.0,7.599999904...|
| 221.3697555995658|195.4|[16.0,47.70000076...|
|105.16234531267452| 69.2|[19.0,20.5,18.299...|
|19.090122455781824| 13.2|[23.0,15.89999961...|
|196.03504920284257|228.3|[24.0,16.89999961...|
|136.00633218636074| 95.7|[35.0,1.399999976...|
|63.871919290525824| 43.1|[39.0,26.70000076...|
| 252.6560125680585|293.6|[43.0,27.70000076...|
| 177.1818734675995|206.9|[44.0,8.399999618...|
| 130.6217660635421| 89.7|[47.0,9.899999618...|
|253.67789360608543|239.9|[48.0,41.5,18.5,2...|
| 129.8876225093335|100.4|[52.0,9.600000381...|
|244.34001914629346|216.4|[53.0,41.70000076...|
|240.71886208075523|198.9|[56.0,49.40000152...|
|239.37465422061496|210.8|[59.0,49.59999

In [34]:
test_df.select("Features")

DataFrame[Features: vector]

In [35]:
predictions = linear_model.transform(test_df)
predictions.show()

+--------------------+-----+------------------+
|            Features|   TV|        prediction|
+--------------------+-----+------------------+
|[1.0,37.799999237...|230.1| 251.3676024996724|
|[10.0,2.599999904...|199.8|153.61254851264596|
|[12.0,24.0,4.0,17...|214.7|204.29325935892302|
|[14.0,7.599999904...| 97.5|118.34032410914128|
|[16.0,47.70000076...|195.4| 221.3697555995658|
|[19.0,20.5,18.299...| 69.2|105.16234531267452|
|[23.0,15.89999961...| 13.2|19.090122455781824|
|[24.0,16.89999961...|228.3|196.03504920284257|
|[35.0,1.399999976...| 95.7|136.00633218636074|
|[39.0,26.70000076...| 43.1|63.871919290525824|
|[43.0,27.70000076...|293.6| 252.6560125680585|
|[44.0,8.399999618...|206.9| 177.1818734675995|
|[47.0,9.899999618...| 89.7| 130.6217660635421|
|[48.0,41.5,18.5,2...|239.9|253.67789360608543|
|[52.0,9.600000381...|100.4| 129.8876225093335|
|[53.0,41.70000076...|216.4|244.34001914629346|
|[56.0,49.40000152...|198.9|240.71886208075523|
|[59.0,49.59999847...|210.8|239.37465422