>(#scrollTo=hDodfHbJsR-c)

>>[Setup](#scrollTo=BU47dap013Xn)

>>[Data Integration, Cleansing, and Exploration](#scrollTo=RL54JZqXYZjy)

>>[Feature Engineering](#scrollTo=p6NuyT0hZT7d)



In [None]:
import json
import numpy as np
import pandas as pd

In [None]:
# Mount Google Drive at /content/drive; the project files used by later cells are read from there
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Setup

In [None]:
# install PySpark
! pip install pyspark >& /dev/null

In [None]:
! mkdir drive/MyDrive/IST718/final_project/ -p

In [None]:
# Start (or reuse) the Spark session that the Spark cells below run on
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('Final-project-SEO')
    .getOrCreate()
)

## Data Integration, Cleansing, and Exploration


Import the SEO keyword data, explicitly defining the schema.

In [None]:
from pyspark.sql.types import StructType, StringType, IntegerType

# Explicit schema: the keyword and the SERP feature list are strings, the three
# search metrics are nullable integers.
schema = (
    StructType()
    .add("Keyword", StringType(), True)
    .add("Volume", IntegerType(), True)
    .add("Global volume", IntegerType(), True)
    .add("Traffic potential", IntegerType(), True)
    .add("SERP Features", StringType(), True)
)

# Absolute path, consistent with the other cells that read from the mounted Drive;
# the previous relative path only resolved while the working directory was /content.
seo_df = spark.read.csv(
    "/content/drive/MyDrive/IST718/final_project/seo_data.csv", header=True, schema=schema
)

print("There are", seo_df.count(), "records in the seo data set.")
seo_df.show()

There are 150000 records in the seo data set.
+--------------------+------+-------------+-----------------+--------------------+
|             Keyword|Volume|Global volume|Traffic potential|       SERP Features|
+--------------------+------+-------------+-----------------+--------------------+
|     resume template|    90|           90|             2900|People also ask,I...|
|cover letter temp...|    50|           60|               60|Image pack,Sitelinks|
|    invoice template|   150|          300|               70|Image pack,People...|
|business plan tem...|    40|           60|            20000|Featured snippet,...|
|roblox shirt temp...|    20|           20|             NULL|                NULL|
|resignation lette...|    30|           80|             1400|People also ask,S...|
|            template|    20|           20|             NULL|                NULL|
|letter of recomme...|    20|           20|               40|Shopping results,...|
|       meme template|    30|          15

In [None]:
# Distinct keywords as a plain Python list.
# The rows are sorted so the list order is reproducible between runs: the cached
# embeddings are matched to keywords purely by position, and distinct() alone gives
# no ordering guarantee. Null keywords are dropped because there is no text to embed.
# NOTE: embeddings cached before this ordering was introduced must be re-generated.
queries_col = (
    seo_df.select('Keyword')
    .dropna()
    .distinct()
    .orderBy('Keyword')
    .collect()
)
all_queries = [row['Keyword'] for row in queries_col]

In [None]:
# Read the cluster (category) names that keywords are later assigned to
cluster_categories_path = "/content/drive/MyDrive/IST718/final_project/cluster_categories.json"
with open(cluster_categories_path, mode="r", encoding="utf-8") as f:
    cluster_names = json.load(f)

## Faiss clustering

In [None]:
# encode data
!pip install -U sentence-transformers
from sentence_transformers import SentenceTransformer

Collecting sentence-transformers
  Downloading sentence_transformers-3.0.1-py3-none-any.whl (227 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/227.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━[0m [32m174.1/227.1 kB[0m [31m4.9 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m227.1/227.1 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch>=1.11.0->sentence-transformers)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch>=1.11.0->sentence-transformers)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch>=1.11.0->sentence-transformers)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (

  from tqdm.autonotebook import tqdm, trange


In [None]:
# Embed the keywords and the cluster names with the same pretrained model,
# so both sets of vectors can be compared directly.
encoder = SentenceTransformer("all-mpnet-base-v2")
query_embeddings, cluster_name_embeddings = (
    encoder.encode(texts) for texts in (all_queries, cluster_names)
)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.6k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]



config.json:   0%|          | 0.00/571 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/438M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/363 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

1_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

In [None]:
# Cache both embedding arrays so the encoding step does not have to be repeated.
# The arrays are written back-to-back into one file and must be read in the same order.
with open("/content/drive/MyDrive/IST718/final_project/embeddings.npy", "wb") as f:
    for embedding_array in (query_embeddings, cluster_name_embeddings):
        np.save(f, embedding_array)

In [None]:
# Reload the cached embeddings; the two arrays come back in the order they were written
# (keyword embeddings first, then cluster-name embeddings).
with open("/content/drive/MyDrive/IST718/final_project/embeddings.npy", "rb") as f:
    query_embeddings, cluster_name_embeddings = np.load(f), np.load(f)

In [None]:
# cluster data
!apt install libomp-dev
!pip install faiss-cpu

import faiss

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  libomp-14-dev libomp5-14
Suggested packages:
  libomp-14-doc
The following NEW packages will be installed:
  libomp-14-dev libomp-dev libomp5-14
0 upgraded, 3 newly installed, 0 to remove and 45 not upgraded.
Need to get 738 kB of archives.
After this operation, 8,991 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 libomp5-14 amd64 1:14.0.0-1ubuntu1.1 [389 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 libomp-14-dev amd64 1:14.0.0-1ubuntu1.1 [347 kB]
Get:3 http://archive.ubuntu.com/ubuntu jammy/universe amd64 libomp-dev amd64 1:14.0-55~exp2 [3,074 B]
Fetched 738 kB in 1s (544 kB/s)
Selecting previously unselected package libomp5-14:amd64.
(Reading database ... 121913 files and directories currently installed.)
Preparing to unpack .../libomp5-14_1%3a14

In [None]:
def assign_to_clusters_faiss(
    input_embeddings, assign_cluster_embeddings, threshold=None,
    queries=None, names=None,
):
    """
    Assign each input embedding to its nearest cluster embedding.

    Parameters
    ----------
    input_embeddings : array-like, one row per query.
    assign_cluster_embeddings : array-like, one row per cluster.
    threshold : float or None
        When given, a query is only assigned if its distance to the nearest
        cluster is strictly below this value. ``faiss.IndexFlatL2`` reports
        *squared* L2 distances, so the threshold is compared on that scale.
    queries : sequence or None
        Query texts aligned row-for-row with ``input_embeddings``. Defaults to
        the notebook-level ``all_queries``.
    names : sequence or None
        Cluster names aligned row-for-row with ``assign_cluster_embeddings``.
        Defaults to the notebook-level ``cluster_names``.

    Returns
    -------
    (clustered_output, unassigned_output)
        ``clustered_output`` maps every cluster name to the list of queries
        assigned to it (possibly empty). ``unassigned_output`` is a list of
        dicts with keys ``"query"``, ``"distance"`` and ``"cluster"`` (the
        nearest cluster that was rejected by the threshold).

    Raises
    ------
    ValueError
        If the number of queries or names does not match the number of rows in
        the corresponding embedding matrix.
    """
    if queries is None:
        queries = all_queries
    if names is None:
        names = cluster_names

    # Faiss expects C-contiguous float32 matrices
    input_embeddings = np.ascontiguousarray(input_embeddings, dtype="float32")
    assign_cluster_embeddings = np.ascontiguousarray(
        assign_cluster_embeddings, dtype="float32"
    )

    # Labels are matched to embeddings by position only, so a length mismatch
    # would silently mislabel the results.
    if len(queries) != len(input_embeddings):
        raise ValueError(
            f"{len(queries)} queries but {len(input_embeddings)} input embeddings"
        )
    if len(names) != len(assign_cluster_embeddings):
        raise ValueError(
            f"{len(names)} cluster names but {len(assign_cluster_embeddings)} cluster embeddings"
        )

    # Exact (brute-force) L2 index over the cluster vectors
    dimension = assign_cluster_embeddings.shape[1]
    index = faiss.IndexFlatL2(dimension)
    index.add(assign_cluster_embeddings)

    # Nearest single cluster for every input vector
    k = 1
    distances, indices = index.search(input_embeddings, k)

    clustered_output = {name: [] for name in names}
    unassigned_output = []

    for query, best_distance, best_cluster_idx in zip(
        queries, distances[:, 0], indices[:, 0]
    ):
        nearest_cluster = names[best_cluster_idx]
        # Assign when no threshold is set or the nearest cluster is close enough
        if threshold is None or best_distance < threshold:
            clustered_output[nearest_cluster].append(query)
        else:
            unassigned_output.append(
                {
                    "query": query,
                    "distance": best_distance,
                    "cluster": nearest_cluster,
                },
            )

    return clustered_output, unassigned_output

In [None]:
class NpEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays to built-in Python types."""

    def default(self, obj):
        """Return a JSON-serialisable equivalent of ``obj``.

        NumPy booleans, integers and floats become ``bool``, ``int`` and ``float``;
        arrays become (nested) lists. Anything else is passed to the base class,
        which raises ``TypeError``.
        """
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)

In [None]:
# Assign every keyword to its nearest cluster name; keywords whose nearest cluster
# is not closer than the threshold are returned separately.
# NOTE(review): faiss.IndexFlatL2 reports squared L2 distances — confirm 1.412 was chosen on that scale.
assigned_clusters, unassigned_clusters = assign_to_clusters_faiss(
    input_embeddings=query_embeddings,
    assign_cluster_embeddings=cluster_name_embeddings,
    threshold=1.412,
)

In [None]:
# Persist both clustering results as JSON; NpEncoder converts the NumPy values
# (e.g. the distances) into plain Python numbers.
json_outputs = (
    (
        "/content/drive/MyDrive/IST718/final_project/assigned_cluster_to_queries.json",
        assigned_clusters,
    ),
    (
        "/content/drive/MyDrive/IST718/final_project/unassigned_queries_data.json",
        unassigned_clusters,
    ),
)
for output_path, payload in json_outputs:
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, cls=NpEncoder)

In [None]:
# Reload the raw keyword table (as pandas) and the saved cluster assignments,
# so the cells below can run without repeating the clustering step.
seo_csv_path = "/content/drive/MyDrive/IST718/final_project/seo_data.csv"
with open(seo_csv_path, "r") as csv_file:
    seo_df_pandas = pd.read_csv(csv_file)

assigned_clusters_path = (
    "/content/drive/MyDrive/IST718/final_project/assigned_cluster_to_queries.json"
)
with open(assigned_clusters_path, "r", encoding="utf-8") as f:
    assigned_clusters = json.load(f)

In [None]:
# Build one summary row per cluster: its keywords (highest volume first),
# the keyword count, and the summed volume.

# Keyword -> volume lookup; if a keyword appears more than once, the last row wins
volume_lookup = dict(zip(seo_df_pandas['Keyword'], seo_df_pandas['Volume']))

flattened_data = []
for cluster_name, queries in assigned_clusters.items():
    # Keywords missing from the lookup count as volume 0
    queries_sorted_by_volume = sorted(
        queries, key=lambda q: volume_lookup.get(q, 0), reverse=True
    )
    flattened_data.append(
        {
            "cluster_name": cluster_name,
            # str() guards against non-string keywords before joining
            "queries": ", ".join(map(str, queries_sorted_by_volume)),
            "count": len(queries),
            "total_msv": sum(volume_lookup.get(q, 0) for q in queries),
        }
    )

# Create DataFrame
clustered_df = pd.DataFrame(flattened_data)

clustered_df.head()

Unnamed: 0,cluster_name,queries,count,total_msv
0,Content Scheduler,"home watch checklist template, timeboxing temp...",855,98790
1,Advertisement,"social media advertisement template, blank adv...",118,9710
2,Banner,"linkedin banner template 2017, etsy banner tem...",1453,157220
3,Flyer,"flyer template free download, toastmasters fly...",1036,88940
4,Logo,"nfl logo template, world series logo template,...",289,34550


In [None]:
# Save the per-cluster summary to the project folder on Drive, without the pandas index column
clustered_df.to_csv("/content/drive/MyDrive/IST718/final_project/clustered_queries_volume.csv", index=False)

## Feature Engineering

In [None]:
import pyspark.sql.functions as fn

# Fix non-sensical data (the data has been shuffled to protect business assets,
# so there are rows where the volume column is greater than the traffic potential column).

# Both column means in a single aggregation (one Spark job instead of two),
# computed before any value is replaced.
mean_volume, mean_traffic_potential = seo_df.agg(
    fn.mean('volume'), fn.mean('traffic potential')
).first()

# Replace nonsense volume values with the mean. NULL volumes are left as they are:
# the comparison evaluates to NULL, so the `otherwise` branch applies.
seo_df = seo_df.withColumn(
    'volume',
    fn.when(fn.col('volume') > fn.col('traffic potential'), mean_volume)
      .otherwise(fn.col('volume')),
)

# Round volume and cast as int
seo_df = seo_df.withColumn('volume', fn.round(fn.col('volume')).cast(IntegerType()))

# Repeat for traffic potential. The comparison uses the volume column as updated above.
seo_df = seo_df.withColumn(
    'traffic potential',
    fn.when(fn.col('volume') > fn.col('traffic potential'), mean_traffic_potential)
      .otherwise(fn.col('traffic potential')),
)

# Fill NULL traffic potential values with the mean
seo_df = seo_df.withColumn(
    'traffic potential',
    fn.when(fn.col('traffic potential').isNull(), mean_traffic_potential)
      .otherwise(fn.col('traffic potential')),
)

# Round traffic potential and cast as int
seo_df = seo_df.withColumn(
    'traffic potential', fn.round(fn.col('traffic potential')).cast(IntegerType())
)

# Create new Opportunity column: Traffic potential minus Volume
seo_df = seo_df.withColumn("Opportunity", fn.expr("`Traffic potential` - Volume"))

In [None]:
# Fill NULL values of the SERP Features column by sampling from the observed
# feature distribution, so the current distribution is preserved.
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql import Window
from pyspark import SparkContext
import random


def random_features(distribution, count=1):
    """Draw ``count`` feature names at random, weighted by ``distribution``.

    ``distribution`` maps feature name -> weight. Returns a list of ``count``
    names, sampled with replacement by ``random.choices``.
    """
    features = list(distribution.keys())
    probabilities = list(distribution.values())
    return random.choices(features, probabilities, k=count)


# Spark UDF wrapper around random_features that returns an array of strings
# (not called in the cells shown in this section).
# Random UDFs must be flagged non-deterministic; otherwise Spark assumes repeated
# calls give the same result and may eliminate or duplicate invocations.
random_features_udf = fn.udf(random_features, ArrayType(StringType())).asNondeterministic()

# One row per (keyword, feature): split the comma-separated list and explode it.
# Rows whose SERP Features is NULL produce no exploded rows, so they do not
# contribute to the distribution.
feature_distribution_df = seo_df.withColumn('feature', fn.explode(fn.split(fn.col('SERP Features'), ',')))

# Relative frequency of every feature
feature_distribution = feature_distribution_df.groupBy('feature').count().withColumn(
    'probability', fn.col('count') / fn.sum('count').over(Window.partitionBy()))

# Local dictionary {feature: probability} collected to the driver
feature_distribution_dict = feature_distribution.select('feature', 'probability').rdd.collectAsMap()

# Broadcast the feature distribution dictionary to the worker nodes so the UDF
# below can read it wherever it runs
sc = SparkContext.getOrCreate()
broadcasted_distribution = sc.broadcast(feature_distribution_dict)


def interpolate_features():
    """Return one feature name sampled from the broadcast feature distribution."""
    features = list(broadcasted_distribution.value.keys())
    probabilities = list(broadcasted_distribution.value.values())
    return random.choices(features, probabilities)[0]


# String-returning UDF for interpolation, flagged non-deterministic for the same
# reason as above.
# NOTE: seo_df is evaluated lazily, so each Spark action that recomputes it may
# draw different values for the NULL rows unless the DataFrame is cached or checkpointed.
interpolate_features_udf = fn.udf(interpolate_features, StringType()).asNondeterministic()

# Interpolate values into NULL cells in the "SERP Features" column of seo_df
seo_df = seo_df.withColumn('SERP Features',
                           fn.when(fn.col('SERP Features').isNull(),
                                  interpolate_features_udf())
                            .otherwise(fn.col('SERP Features')))

In [None]:
# Split the 'SERP Features' column into a temporary array column
seo_df = seo_df.withColumn("features_array", fn.split(fn.col("SERP Features"), ","))

# Collect all distinct feature names to the driver
distinct_features = [
    row["feature"]
    for row in seo_df.selectExpr("explode(features_array) as feature").distinct().collect()
]

# One 0/1 indicator column per feature, added in a single select: calling
# withColumn once per feature in a loop adds a projection each time and grows the query plan.
one_hot_columns = [
    fn.when(fn.array_contains(fn.col("features_array"), feature), 1).otherwise(0).alias(feature)
    for feature in distinct_features
]

# Append the indicator columns and drop the temporary 'features_array' column
seo_df = seo_df.select("*", *one_hot_columns).drop("features_array")

seo_df.show(truncate=False)

+---------------------------------+------+-------------+-----------------+----------------------------------------------------------------------------+-----------+----------+--------------+-------------+-----------+------+---------+----------------+----------+---------------+----------------+--------------+-------+---------+----------+---------------+------+
|Keyword                          |volume|Global volume|traffic potential|SERP Features                                                               |Opportunity|Bottom ads|Knowledge card|Video preview|Top stories|Videos|Thumbnail|Shopping results|Image pack|People also ask|Featured snippet|Paid sitelinks|Top ads|Sitelinks|Local pack|Knowledge panel|Tweets|
+---------------------------------+------+-------------+-----------------+----------------------------------------------------------------------------+-----------+----------+--------------+-------------+-----------+------+---------+----------------+----------+---------------+--

## Modeling

In [None]:
# Baseline linear regression: regress volume on the SERP feature indicator columns
# plus standardized traffic potential, to see which features have the strongest coefficients.
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# 70/30 train/test split with a fixed seed
training_df, testing_df = seo_df.randomSplit([0.7, 0.3], seed=42)

# Pipeline stages
serp_indicator_cols = [
    'Bottom ads', 'Knowledge card', 'Video preview', 'Top stories',
    'Videos', 'Thumbnail', 'Shopping results', 'Image pack', 'People also ask',
    'Featured snippet', 'Paid sitelinks', 'Top ads', 'Sitelinks', 'Local pack',
    'Knowledge panel', 'Tweets',
]
categorical_assembler = VectorAssembler(inputCols=serp_indicator_cols, outputCol='features_cat')
continuous_assembler = VectorAssembler(inputCols=['traffic potential'], outputCol='feature_cont')
scaler = StandardScaler(
    inputCol='feature_cont', outputCol='scaled_feature', withStd=True, withMean=True
)
# The scaled continuous feature comes first, followed by the indicator columns
combined_assembler = VectorAssembler(
    inputCols=['scaled_feature', 'features_cat'], outputCol='features_all'
)
lr1 = LinearRegression(featuresCol='features_all', labelCol='volume')

# Assemble the pipeline and fit it on the training split
pipeline1 = Pipeline(
    stages=[categorical_assembler, continuous_assembler, scaler, combined_assembler, lr1]
)
model1 = pipeline1.fit(training_df)

# Score the held-out split and compute its mean squared error
lr_predictions1 = model1.transform(testing_df)

evaluator = RegressionEvaluator(labelCol='volume', predictionCol='prediction', metricName='mse')

lr_test_mse = evaluator.evaluate(lr_predictions1)

In [None]:
# The fitted LinearRegression model is the last pipeline stage; its summary holds the training statistics
lr_model = model1.stages[-1]
trainingSummary = lr_model.summary

# Feature names in the order they were assembled into 'features_all'
feature_cols = ['traffic potential', 'Bottom ads', 'Knowledge card', 'Video preview', 'Top stories',
                'Videos', 'Thumbnail', 'Shopping results', 'Image pack', 'People also ask',
                'Featured snippet', 'Paid sitelinks', 'Top ads', 'Sitelinks', 'Local pack',
                'Knowledge panel', 'Tweets']

separator = "-" * 30

# Formatted model summary
print("Linear Regression Model Summary:")
print(separator)
print(f"R-squared (R2): {trainingSummary.r2:.4f}")
print(f"Intercept: {lr_model.intercept:.2f}")

print("\nCoefficients:")
for name, coef in zip(feature_cols, lr_model.coefficients):
    print(f"  {name}: {coef:.2f}")

# The last p-value / standard error is dropped so only the per-feature entries are printed
print("\nP-values:")
for name, p_value in zip(feature_cols, trainingSummary.pValues[:-1]):
    print(f"  {name}: {p_value:.4f}")

print("\nStandard Errors:")
for name, std_err in zip(feature_cols, trainingSummary.coefficientStandardErrors[:-1]):
    print(f"  {name}: {std_err:.2f}")
print(separator)

Linear Regression Model Summary:
------------------------------
R-squared (R2): 0.0023
Intercept: 61.49

Coefficients:
  traffic potential: 33.78
  Bottom ads: -15.86
  Knowledge card: -9.76
  Video preview: 3.48
  Top stories: -13.55
  Videos: -13.16
  Thumbnail: 16.61
  Shopping results: -20.26
  Image pack: 2.06
  People also ask: 41.81
  Featured snippet: -22.19
  Paid sitelinks: 11.45
  Top ads: -32.86
  Sitelinks: 3.66
  Local pack: -10.46
  Knowledge panel: 54.78
  Tweets: 297.50

P-values:
  traffic potential: 0.0000
  Bottom ads: 0.7564
  Knowledge card: 0.9492
  Video preview: 0.9190
  Top stories: 0.7353
  Videos: 0.1231
  Thumbnail: 0.0125
  Shopping results: 0.2220
  Image pack: 0.7321
  People also ask: 0.0000
  Featured snippet: 0.2474
  Paid sitelinks: 0.8335
  Top ads: 0.5316
  Sitelinks: 0.5606
  Local pack: 0.9900
  Knowledge panel: 0.2261
  Tweets: 0.3840

Standard Errors:
  traffic potential: 2.59
  Bottom ads: 51.13
  Knowledge card: 153.30
  Video preview: 34.23


In [None]:
# Report the MSE stored in lr_test_mse, rounded to a whole number
print(f'Linear regression test MSE: {lr_test_mse:.0f}')

Linear regression test MSE: 371667


Extremely low R-squared value. Let's see if we can improve on this. We do have a few features that have particularly notable p-values. Let's focus on those.

In [None]:
# Reduced linear regression: keep only two indicator columns plus traffic potential
selected_indicator_cols = ['Thumbnail', 'People also ask']
categorical_assembler = VectorAssembler(inputCols=selected_indicator_cols, outputCol='features_cat')
continuous_assembler = VectorAssembler(inputCols=['traffic potential'], outputCol='feature_cont')
scaler = StandardScaler(
    inputCol='feature_cont', outputCol='scaled_feature', withStd=True, withMean=True
)
combined_assembler = VectorAssembler(
    inputCols=['scaled_feature', 'features_cat'], outputCol='features_all'
)
lr1 = LinearRegression(featuresCol='features_all', labelCol='volume')

# Assemble the pipeline and fit it on the training split
pipeline2 = Pipeline(
    stages=[categorical_assembler, continuous_assembler, scaler, combined_assembler, lr1]
)
model2 = pipeline2.fit(training_df)

# Score the held-out split; this overwrites lr_test_mse from the previous model
lr_predictions2 = model2.transform(testing_df)

lr_test_mse = evaluator.evaluate(lr_predictions2)

In [None]:
# The fitted LinearRegression model is the last pipeline stage; its summary holds the training statistics
lr_model2 = model2.stages[-1]
trainingSummary2 = lr_model2.summary

# Feature names in the order they were assembled into 'features_all'
feature_cols = ['traffic potential', 'Thumbnail', 'People also ask']

separator = "-" * 30

# Formatted model summary
print("Linear Regression 2 Model Summary:")
print(separator)
print(f"R-squared (R2): {trainingSummary2.r2:.4f}")
print(f"Intercept: {lr_model2.intercept:.2f}")

# zip stops at the last feature name, so any trailing entry without a name is not printed
print("\nCoefficients:")
for name, coef in zip(feature_cols, lr_model2.coefficients):
    print(f"  {name}: {coef:.2f}")

print("\nP-values:")
for name, p_value in zip(feature_cols, trainingSummary2.pValues):
    print(f"  {name}: {p_value:.4f}")

print("\nStandard Errors:")
for name, std_err in zip(feature_cols, trainingSummary2.coefficientStandardErrors):
    print(f"  {name}: {std_err:.2f}")
print(separator)

Linear Regression 2 Model Summary:
------------------------------
R-squared (R2): 0.0022
Intercept: 64.65

Coefficients:
  traffic potential: 33.76
  Thumbnail: 14.49
  People also ask: 40.71

P-values:
  traffic potential: 0.0000
  Thumbnail: 0.0193
  People also ask: 0.0000

Standard Errors:
  traffic potential: 2.58
  Thumbnail: 6.19
  People also ask: 5.39
------------------------------


In [None]:
# Report the MSE stored in lr_test_mse, rounded to a whole number
print(f'Linear regression test MSE: {lr_test_mse:.0f}')

Linear regression test MSE: 371668


Cut model size substantially without sacrificing performance, although it's still an extremely small r-squared. Let's try some other model options.

In [None]:
from pyspark.ml.regression import GBTRegressor

# Gradient-boosted tree regressor on the same assembled feature vector and label
gbt = GBTRegressor(featuresCol='features_all', labelCol='volume')

# Reuse the feature-preparation stages defined earlier and append the regressor
gbt_stages = [categorical_assembler, continuous_assembler, scaler, combined_assembler, gbt]
pipeline_gbt = Pipeline(stages=gbt_stages)
model_gbt = pipeline_gbt.fit(training_df)

# Score the held-out split with the current `evaluator`
gbt_predictions = model_gbt.transform(testing_df)
gbt_test_mse = evaluator.evaluate(gbt_predictions)

In [None]:
# The fitted GBT model is the last pipeline stage
gbt_model = model_gbt.stages[-1]
feature_cols = ['traffic potential', 'Thumbnail', 'People also ask']

separator = "-" * 30

# Pair every feature name with its importance and list the most important first
importance_by_feature = list(zip(feature_cols, gbt_model.featureImportances.toArray()))
sorted_importances = sorted(importance_by_feature, key=lambda pair: pair[1], reverse=True)

print("GBTRegressor Model Summary:")
print(separator)
print("Feature Importances:")
for feature, importance in sorted_importances:
    print(f"  {feature}: {importance:.4f}")
print(f"Number of Trees: {gbt_model.getNumTrees}")
print(f"Total Number of Nodes: {sum([tree.numNodes for tree in gbt_model.trees])}")
print(separator)

GBTRegressor Model Summary:
------------------------------
Feature Importances:
  traffic potential: 0.9280
  People also ask: 0.0379
  Thumbnail: 0.0340
Number of Trees: 20
Total Number of Nodes: 1084
------------------------------


In [None]:
# Report the MSE stored in gbt_test_mse, rounded to a whole number
print(f'Gradient-Boosting Tree regression test MSE: {gbt_test_mse:.0f}')

Gradient-Boosting Tree regression test MSE: 367662


In [None]:
# Create a second, separate evaluator for R-squared. It gets its own name so the
# MSE `evaluator` defined earlier is not silently replaced if cells are re-run.
r2_evaluator = RegressionEvaluator(labelCol="volume", predictionCol="prediction", metricName="r2")

# Calculate R2 on test data
r2 = r2_evaluator.evaluate(gbt_predictions)
print(f"R-squared (R2) on test data = {r2:.4f}")

R-squared (R2) on test data = 0.0132


We significantly improved the model, although we are still getting a low R-squared value in the grand scheme of things. Let's try an alternate approach: bucketize the volume column to see if we can create a model that predicts volume tiers.

In [None]:
from pyspark.ml.feature import Bucketizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Bucket boundaries for the volume tiers: five buckets, with cut points at 50, 100, 150 and 200
splits = [-float("inf"), 50, 100, 150, 200, float("inf")]

# Turns 'volume' into a bucket index stored in the 'label' column
bucketizer = Bucketizer(splits=splits, inputCol="volume", outputCol="label")

# Classifier that predicts the bucket index from the assembled feature vector
logistic_regression = LogisticRegression(featuresCol='features_all', labelCol='label')

# Feature preparation, then bucketizing, then the classifier
pipeline_lr = Pipeline(
    stages=[
        categorical_assembler,
        continuous_assembler,
        scaler,
        combined_assembler,
        bucketizer,
        logistic_regression,
    ]
)

# Fit on the training split and predict on the held-out split
model_lr = pipeline_lr.fit(training_df)
predictions_lr = model_lr.transform(testing_df)

# Accuracy on the held-out split (replaces the earlier regression `evaluator`)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions_lr)
print(f"Logistic Regression Model Accuracy: {accuracy:.4f}")

# Intercepts and per-class coefficient rows of the fitted classifier (last pipeline stage)
lr_model = model_lr.stages[-1]
print("Intercept:", lr_model.interceptVector)
print("Coefficients:")
for class_idx, class_coefs in enumerate(lr_model.coefficientMatrix.toArray()):
    print(f"  Class {class_idx}: {class_coefs}")

Logistic Regression Model Accuracy: 0.5351
Intercept: [1.9470265020031052,1.3052516526526012,-1.139353158577698,-1.3068907360116067,-0.806034260066402]
Coefficients:
  Class 0: [-0.07990479 -0.21700577 -0.42744536]
  Class 1: [-0.04870704  0.05634628 -0.26173621]
  Class 2: [0.02464433 0.00910982 0.07833042]
  Class 3: [0.02917589 0.01837014 0.18335094]
  Class 4: [0.07479161 0.13317953 0.42750021]


In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

# MulticlassMetrics works on an RDD of (prediction, label) pairs
predictionAndLabels = predictions_lr.select("prediction", "label").rdd
metrics = MulticlassMetrics(predictionAndLabels)

# Confusion matrix as a NumPy array
# NOTE(review): per the MulticlassMetrics documentation, predicted classes are in columns — confirm.
confusion_matrix = metrics.confusionMatrix().toArray()

print("Confusion Matrix:")
print(confusion_matrix)



Confusion Matrix:
[[2.4014e+04 3.0000e+00 0.0000e+00 0.0000e+00 1.6000e+01]
 [1.4784e+04 0.0000e+00 0.0000e+00 0.0000e+00 7.0000e+00]
 [1.6280e+03 0.0000e+00 0.0000e+00 0.0000e+00 2.0000e+00]
 [1.4210e+03 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00]
 [3.0110e+03 0.0000e+00 0.0000e+00 0.0000e+00 1.0000e+01]]


Better than random guessing, but still not ideal: the confusion matrix shows the model predicts the lowest volume tier for almost every keyword. Let's try a Random Forest classifier.

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest classifier predicting the volume tier ('label') from the assembled features
rf = RandomForestClassifier(featuresCol='features_all', labelCol='label')

# The bucketizer stage is required: it is what creates the 'label' column from 'volume'.
# training_df / testing_df do not contain 'label', so without it the fit fails
# on a fresh kernel.
pipeline = Pipeline(
    stages=[categorical_assembler, continuous_assembler, scaler, combined_assembler, bucketizer, rf]
)

# Fit the pipeline to the training data
model = pipeline.fit(training_df)

# Make predictions on the testing data
predictions = model.transform(testing_df)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Random Forest Model Accuracy: {accuracy:.4f}")


Random Forest Model Accuracy: 0.5884


In [None]:
# Build an RDD of (prediction, label) pairs for MulticlassMetrics,
# casting both values to Python floats
preds_and_labels = predictions.select(['prediction', 'label']).rdd
preds_and_labels = preds_and_labels.map(lambda row: (float(row['prediction']), float(row['label'])))
metrics = MulticlassMetrics(preds_and_labels)

# Print the confusion matrix
confusion_matrix = metrics.confusionMatrix().toArray()
print("Confusion Matrix:")
print(confusion_matrix)

<class 'type'>




Confusion Matrix:
[[1.4886e+04 1.1230e+03 0.0000e+00 0.0000e+00 0.0000e+00]
 [6.9900e+03 2.8010e+03 0.0000e+00 0.0000e+00 0.0000e+00]
 [1.1010e+03 3.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00]
 [9.5700e+02 2.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00]
 [2.0880e+03 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00]]
