# Advertising Sales Prediction using pyspark and mlflow

📌 In this section, we will predict advertising sales using pyspark and mlflow.

# Business Problem

📌 Here we need to build a model which predicts sales based on the money spent on different platforms for marketing. Click on this <a href="https://www.kaggle.com/datasets/ashydv/advertising-dataset">link</a> to review the data set and variables.

# Create Session in Spark

In [1]:
!pip install findspark
import findspark
findspark.init("/opt/manual/spark/")
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import SparkContext
spark = (SparkSession.builder
         .appName("Advertising_Sales_Prediction")
         .master("yarn")
           .config("spark.jars.packages","org.mlflow:mlflow-spark:1.12.1") \
         .enableHiveSupport()
         .getOrCreate())

[31mtwisted 18.7.0 requires PyHamcrest>=1.9.0, which is not installed.[0m
[33mYou are using pip version 10.0.1, however version 23.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [5]:
sc = spark.sparkContext
sc

# Import Necesaary Libraries

In [6]:
import pandas as pd
pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)
pd.set_option("display.width", 500)
pd.set_option("display.float_format", lambda x: '%.4f' % x)
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Import Dataset

In [8]:
df = (spark.read.format("csv")
      .option("header", True)
      .option("inferSchema", True)
      .load("file:///home/train/datasets/Advertising.csv")
     )
df.persist()

DataFrame[ID: int, TV: double, Radio: double, Newspaper: double, Sales: double]

In [9]:
df.show(5)

+---+-----+-----+---------+-----+
| ID|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
+---+-----+-----+---------+-----+
only showing top 5 rows



In [10]:
df.limit(5).toPandas()

Unnamed: 0,ID,TV,Radio,Newspaper,Sales
0,1,230.1,37.8,69.2,22.1
1,2,44.5,39.3,45.1,10.4
2,3,17.2,45.9,69.3,9.3
3,4,151.5,41.3,58.5,18.5
4,5,180.8,10.8,58.4,12.9


In [7]:
df.printSchema()

root
 |-- TV: double (nullable = true)
 |-- Radio: double (nullable = true)
 |-- Newspaper: double (nullable = true)
 |-- Sales: double (nullable = true)



In [11]:
print((df.count(), len(df.columns)))

(200, 5)


# Missing Value Analysis

In [12]:
df.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in df.columns]).toPandas()

Unnamed: 0,ID,TV,Radio,Newspaper,Sales
0,0,0,0,0,0


In [13]:
def null_count(dataframe, col_name):
    nc = dataframe.select(col_name).filter(
        (F.col(col_name) == "NA")|
        (F.col(col_name) == "") |
        (F.col(col_name).isNull())
    ).count()
    return nc

In [14]:
null_count(df, "Sales")

0

In [15]:
def show_all_null(dataframe):
    for col_name in dataframe.dtypes:
        nc = null_count(dataframe, col_name[0])
        if nc > 0:
            print("{} ===> {} , Ratio: {:.2f}".format(col_name[0], nc, (nc/dataframe.count())*100))
    if nc == 0:
        print("There is no null value") 

In [16]:
show_all_null(df)

There is no null value


# Analysis of Categorical and Numerical Variables

In [17]:
categorical_cols = []
numerical_cols = []
label_col = ["Sales"]
discarted_cols = []

In [18]:
def grab_cat_num_cols(dataframe):
    for col_name in dataframe.dtypes:
        if (col_name[0] not in label_col+discarted_cols):
            if col_name[1] == "string":
                categorical_cols.append(col_name[0])
            else:
                numerical_cols.append(col_name[0])
    return categorical_cols, numerical_cols

In [19]:
categorical_cols, numerical_cols = grab_cat_num_cols(df)

#Print Categorical and Numerical Variables
print(f"Observations: {df.count()}")
print(f"Variables: {len(df.columns)}")
print(f"Cat_cols: {len(categorical_cols)}")
print(f"Num_cols: {len(numerical_cols)}")

Observations: 200
Variables: 5
Cat_cols: 0
Num_cols: 4


In [20]:
# column check
if (len(df.columns) == (len(label_col) + len(discarted_cols) + len(categorical_cols) + len(numerical_cols))):
    print("column check is True")
else:
    print("There is a problem for column check")

column check is True


# Encoding Scaling

In [21]:
def find_binary_cols(dataframe, cat_cols):
    binary_cols = dataframe.select([col for col in cat_cols if dataframe.select(col).dtypes[0][1] == "string" and dataframe.select(col).distinct().count() == 2])
    return binary_cols

In [22]:
binary_cols = find_binary_cols(df, categorical_cols)
print(binary_cols.columns)

[]


In [23]:
my_dict = {}
string_indexer_objs = []
string_indexer_output_names = []
ohe_input_names = []
ohe_output_names = []

for col_name in categorical_cols:
    my_dict[col_name+"_index_obj"] = StringIndexer() \
    .setHandleInvalid("skip") \
    .setInputCol(col_name) \
    .setOutputCol(col_name+"_indexed")
    
    string_indexer_objs.append(my_dict.get(col_name+"_index_obj"))
    string_indexer_output_names.append(col_name+"_indexed")
    
    if col_name not in binary_cols.columns:
        ohe_input_names.append(col_name+"_indexed")
        ohe_output_names.append(col_name+"_ohe")

In [24]:
not_to_hot_coded = list(set(string_indexer_output_names).difference(set(ohe_input_names)))
print(not_to_hot_coded)

[]


In [25]:
encoder = OneHotEncoder().setInputCols(ohe_input_names).setOutputCols(ohe_output_names)
assembler = VectorAssembler().setHandleInvalid("skip").setInputCols(numerical_cols + not_to_hot_coded + ohe_output_names).setOutputCol("unscaled_features")
scaler = StandardScaler().setInputCol("unscaled_features").setOutputCol("features")

# Create Model

In [26]:
# split dataset
train_df, test_df = df.randomSplit([0.8, 0.2], seed=123)
print((train_df.count() , test_df.count()))

(159, 41)


In [27]:
# create Estimator
estimator = RandomForestRegressor(labelCol=label_col[0])

In [28]:
# create pipeline object
pipeline_obj = Pipeline().setStages(string_indexer_objs + [encoder, assembler, scaler, estimator])

In [29]:
# train model
pipeline_model = pipeline_obj.fit(train_df)

In [30]:
# prediction
transform_df = pipeline_model.transform(test_df)
transform_df.select("Sales", "prediction").show(5)

+-----+------------------+
|Sales|        prediction|
+-----+------------------+
|  9.3| 9.063982142857142|
| 11.8|  9.95860881845492|
|  9.2| 9.338970959595958|
|  9.7|   10.906285283738|
| 11.3|11.312310719989057|
+-----+------------------+
only showing top 5 rows



In [31]:
# evaluate model
evaluator = RegressionEvaluator(labelCol=label_col[0], metricName="r2")
print("R2: ", evaluator.evaluate(transform_df))

R2:  0.9371333569310685


In [32]:
evaluator = RegressionEvaluator(labelCol=label_col[0], metricName="rmse")
print("RMSE: ", evaluator.evaluate(transform_df))

RMSE:  1.3253366343420505


# Model Tuning

In [33]:
paramGrid = (ParamGridBuilder()
             .addGrid(estimator.maxDepth, [1,2,3,4])
             .addGrid(estimator.maxBins, [20, 30, 40])
             .addGrid(estimator.numTrees, [10, 20, 30, 40])
             .build())
cv = CrossValidator(estimator=pipeline_obj,
                    estimatorParamMaps=paramGrid,
                    evaluator=RegressionEvaluator(labelCol=label_col[0]),
                    numFolds=3)

In [34]:
cv_model = cv.fit(train_df)

In [35]:
# prediction
y_pred = cv_model.transform(test_df)
y_pred.select("Sales", "prediction").show(5)

+-----+------------------+
|Sales|        prediction|
+-----+------------------+
|  9.3| 9.228787923881672|
| 11.8|11.334805855438447|
|  9.2| 9.858650124691485|
|  9.7|10.557501713482626|
| 11.3|10.310562888775156|
+-----+------------------+
only showing top 5 rows



In [36]:
# evaluate model tuning
evaluator = RegressionEvaluator(labelCol=label_col[0], metricName="r2")
print("R2: ", evaluator.evaluate(y_pred))

R2:  0.9275788832749741


In [37]:
evaluator = RegressionEvaluator(labelCol=label_col[0], metricName="rmse")
print("RMSE: ", evaluator.evaluate(y_pred))

RMSE:  1.422488211634301


# MLFLOW

In [None]:
# import necessary libraries for mlflow
import pandas as pd
import os
import mlflow
from mlflow.models.signature import infer_signature
import mlflow.spark

In [None]:
mlflow.set_tracking_uri('http://localhost:5000/')
# print("mlflow_tracking_uri: " + mlflow.tracking.get_tracking_uri())

mlflow.set_experiment("Advertising_Prediction")

with mlflow.start_run(run_name="random-forest") as run:
    # log params
    mlflow.log_param("num_trees", estimator.getNumTrees())
    mlflow.log_param("max_depth", estimator.getMaxDepth())
    mlflow.log_param("max_bins", estimator.getMaxBins())
    
    pipelineModel = pipeline_obj.fit(train_df)
    
    # log metrics
    predDF = pipelineModel.transform(test_df)
    regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Sales")
    rmse = RegressionEvaluator.setMetricName("rmse").evaluate(predDF)
    r2 = RegressionEvaluator.setMetricName("r2").evaluate(predDF)
    mlflow.log_metrics({"rmse":rmse, "r2":r2})
    
    train = train_df.drop("Sales")
    predictions = predDF.drop("Sales","features")
    signature = infer_signature(train, predictions)
    
    # log model
    mlflow.spark.log_model(pipelineModel, "model", signature=signature)
    
    # log artifact
    rfModel = pipelineModel.stages[-1]
    pandasDF = (pd.DataFrame(list(zip(VectorAssembler.getInputCols(), rfModel.featureImportance)),
                            columns=["feature","importance"]).sort_values(by="importance", ascending=False))
    
    # save 
    pandasDF.to_csv("advetising-feature-importance.csv", index=False)
    mlflow.log_artifact("advetising-feature-importance.csv")