In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=d6917673d93a05a31f0815b67573490f1b7698c8b9c8f866991cde98633d85e5
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
# Start (or reuse) the Spark session used by the rest of the notebook.
session_builder = SparkSession.builder.appName("Sales Prediction")
spark = session_builder.getOrCreate()

# Read the advertising CSV, treating the first row as the header and letting
# Spark infer the column types, then preview the rows.
csv_path = "/content/advertising.csv"
data = spark.read.csv(csv_path, header=True, inferSchema=True)
data.show()

+-----+-----+---------+-----+
|   TV|Radio|Newspaper|Sales|
+-----+-----+---------+-----+
|230.1| 37.8|     69.2| 22.1|
| 44.5| 39.3|     45.1| 10.4|
| 17.2| 45.9|     69.3| 12.0|
|151.5| 41.3|     58.5| 16.5|
|180.8| 10.8|     58.4| 17.9|
|  8.7| 48.9|     75.0|  7.2|
| 57.5| 32.8|     23.5| 11.8|
|120.2| 19.6|     11.6| 13.2|
|  8.6|  2.1|      1.0|  4.8|
|199.8|  2.6|     21.2| 15.6|
| 66.1|  5.8|     24.2| 12.6|
|214.7| 24.0|      4.0| 17.4|
| 23.8| 35.1|     65.9|  9.2|
| 97.5|  7.6|      7.2| 13.7|
|204.1| 32.9|     46.0| 19.0|
|195.4| 47.7|     52.9| 22.4|
| 67.8| 36.6|    114.0| 12.5|
|281.4| 39.6|     55.8| 24.4|
| 69.2| 20.5|     18.3| 11.3|
|147.3| 23.9|     19.1| 14.6|
+-----+-----+---------+-----+
only showing top 20 rows



In [16]:
# Report how many rows were loaded.
row_count = data.count()
print(row_count)

200


In [17]:
# Show the column names and the types inferred while reading the CSV.
data.printSchema()

root
 |-- TV: double (nullable = true)
 |-- Radio: double (nullable = true)
 |-- Newspaper: double (nullable = true)
 |-- Sales: double (nullable = true)



In [18]:
# Drop rows containing any missing value and KEEP the result. DataFrames are
# immutable, so calling na.drop() without assigning it back leaves `data`
# uncleaned for every later cell.
data = data.na.drop()
data.show()

+-----+-----+---------+-----+
|   TV|Radio|Newspaper|Sales|
+-----+-----+---------+-----+
|230.1| 37.8|     69.2| 22.1|
| 44.5| 39.3|     45.1| 10.4|
| 17.2| 45.9|     69.3| 12.0|
|151.5| 41.3|     58.5| 16.5|
|180.8| 10.8|     58.4| 17.9|
|  8.7| 48.9|     75.0|  7.2|
| 57.5| 32.8|     23.5| 11.8|
|120.2| 19.6|     11.6| 13.2|
|  8.6|  2.1|      1.0|  4.8|
|199.8|  2.6|     21.2| 15.6|
| 66.1|  5.8|     24.2| 12.6|
|214.7| 24.0|      4.0| 17.4|
| 23.8| 35.1|     65.9|  9.2|
| 97.5|  7.6|      7.2| 13.7|
|204.1| 32.9|     46.0| 19.0|
|195.4| 47.7|     52.9| 22.4|
| 67.8| 36.6|    114.0| 12.5|
|281.4| 39.6|     55.8| 24.4|
| 69.2| 20.5|     18.3| 11.3|
|147.3| 23.9|     19.1| 14.6|
+-----+-----+---------+-----+
only showing top 20 rows



In [19]:
# Bucket Sales into three classes for classification:
#   0 -> Sales below 10, 1 -> Sales from 10 to 20 inclusive, 2 -> everything else.
sales = data["Sales"]
is_low = sales < 10
is_mid = (sales >= 10) & (sales <= 20)
label_col = when(is_low, 0).when(is_mid, 1).otherwise(2)
data = data.withColumn("label", label_col)

In [20]:
from pyspark.ml.feature import VectorAssembler
# Pack the three ad-spend columns into a single "features" vector column.
assembler = VectorAssembler(inputCols=["TV", "Radio", "Newspaper"], outputCol="features")
# Remove any "features" column left over from a previous execution of this
# cell before transforming: transform() fails if the output column already
# exists, which made this cell crash when re-run. Dropping a column that is
# not present is a no-op, so the first run is unaffected.
data = assembler.transform(data.drop("features"))

In [21]:

# 80/20 train/test split. The seed is fixed so that the split, and therefore
# every metric reported below, can be reproduced on a re-run.
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

In [22]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Fit a logistic-regression classifier (capped at 10 iterations) on the
# training split and score the held-out split.
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)
lr_model = lr.fit(train_data)
lr_predictions = lr_model.transform(test_data)

# One evaluator is shared by the model cells; its metric is switched before
# each evaluation.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

evaluator.setMetricName("accuracy")
lr_accuracy = evaluator.evaluate(lr_predictions)
evaluator.setMetricName("f1")
lr_f1 = evaluator.evaluate(lr_predictions)

print(f"Logistic Regression Accuracy: {lr_accuracy*100:.2f}%")
print(f"Logistic Regression F1 Score: {lr_f1:.2f}")


Logistic Regression Accuracy: 85.11%
Logistic Regression F1 Score: 0.85


In [23]:
from pyspark.ml.classification import RandomForestClassifier

# Fit a random forest on the training split and score the held-out split.
# The forest is a randomized learner, so the seed is fixed to make the
# reported metrics repeatable between runs.
rf = RandomForestClassifier(featuresCol="features", labelCol="label", seed=42)
rf_model = rf.fit(train_data)
rf_predictions = rf_model.transform(test_data)

# Reuses the `evaluator` created in the logistic-regression cell.
rf_accuracy = evaluator.setMetricName("accuracy").evaluate(rf_predictions)
rf_f1 = evaluator.setMetricName("f1").evaluate(rf_predictions)

print(f"Random Forest Accuracy: {rf_accuracy*100:.2f}%")
print(f"Random Forest F1 Score: {rf_f1:.2f}")


Random Forest Accuracy: 89.36%
Random Forest F1 Score: 0.89


In [24]:
from pyspark.ml.classification import DecisionTreeClassifier

# Fit a single decision tree on the training split and score the held-out split.
# NOTE: `df` here is the classifier, not a DataFrame; the name is kept so any
# other cell that refers to it keeps working.
df = DecisionTreeClassifier(featuresCol="features", labelCol="label")
df_model = df.fit(train_data)
df_predictions = df_model.transform(test_data)

# Reuses the `evaluator` created in the logistic-regression cell.
df_accuracy = evaluator.setMetricName("accuracy").evaluate(df_predictions)
df_f1 = evaluator.setMetricName("f1").evaluate(df_predictions)

# These lines previously printed "Random Forest", a copy-paste slip that
# mislabeled the decision tree's results.
print(f"Decision Tree Accuracy: {df_accuracy*100:.2f}%")
print(f"Decision Tree F1 Score: {df_f1:.2f}")


Random Forest Accuracy: 89.36%
Random Forest F1 Score: 0.89


In [26]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Fit a Naive Bayes classifier on the training split and predict the held-out split.
nr = NaiveBayes(featuresCol="features", labelCol="label")
nr_model = nr.fit(train_data)
nr_predictions = nr_model.transform(test_data)

# Build a fresh evaluator for this cell and switch its metric before each evaluation.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

evaluator.setMetricName("accuracy")
nr_accuracy = evaluator.evaluate(nr_predictions)
evaluator.setMetricName("f1")
nr_f1 = evaluator.evaluate(nr_predictions)

print(f"Naive Bayes Accuracy: {nr_accuracy*100:.2f}%")
print(f"Naive Bayes F1 Score: {nr_f1:.2f}")


Naive Bayes Accuracy: 51.06%
Naive Bayes F1 Score: 0.53


In [27]:
# Shut down the Spark session now that all models have been evaluated.
spark.stop()