In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this analysis, with explicit
# driver/executor memory settings.
spark = (
    SparkSession.builder
    .appName("ML_analysis")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "16g")
    .getOrCreate()
)

In [None]:
# Read the tab-separated training export (header row, inferred schema).
# Note: the file name says "before_normalize", i.e. this is the
# pre-normalization data.
df = (
    spark.read
    .option("delimiter", "\t")
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/home/yl9709/combined_training_before_normalize.txt")
)

from pyspark.sql.functions import col

# Keep only the modelling columns, in this order. Every column is cast to int
# except `weekday`, which is passed through with its inferred type.
uncast_columns = {"weekday"}
selected_columns = [
    "numClick", "region", "city", "adExchange", "width",
    "height", "floorPrice", "weekday", "hour", "payingPrice",
]
df = df.select(*[
    col(name) if name in uncast_columns else col(name).cast("int")
    for name in selected_columns
])

# Preview the first five rows.
df.show(5)

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

# Tag every row of `df` with a unique id before the train/test split and the
# feature transformations, so rows can still be identified afterwards.
df = df.withColumn(colName="unique_id", col=monotonically_increasing_id())

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Feature pipeline: index + one-hot encode the categorical columns, then
# assemble them with the numeric columns into a single `features` vector.
categoricalColumns = ['region', 'city', 'adExchange', 'weekday']
numericColumns = ['width', 'height', 'floorPrice', 'hour']
stages = []

for categoricalCol in categoricalColumns:
    # handleInvalid="keep": StringIndexer's default ("error") raises during
    # transform() when the test split holds a category (or NULL) that never
    # appeared in the training split. "keep" routes such values to one extra
    # index instead of aborting the whole evaluation.
    stringIndexer = StringIndexer(
        inputCol=categoricalCol,
        outputCol=categoricalCol + "Index",
        handleInvalid="keep",
    )
    encoder = OneHotEncoder(
        inputCols=[stringIndexer.getOutputCol()],
        outputCols=[categoricalCol + "classVec"],
    )
    stages += [stringIndexer, encoder]

# Assemble the encoded categorical vectors and the raw numeric columns.
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericColumns
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, GBTClassifier, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# 70/30 train/test split with a fixed seed.
train_df, test_df = df.randomSplit(weights=[0.7, 0.3], seed=42)

# Shared evaluator: area under the ROC curve, computed from each model's
# `rawPrediction` column against the `numClick` label.
evaluator = BinaryClassificationEvaluator(
    labelCol="numClick",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)


In [None]:

# Logistic Regression: feature stages + classifier, fit on the training split
# and scored on the test split.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="numClick",
    maxIter=10,
)
pipeline_lr = Pipeline(stages=[*stages, lr])
model_lr = pipeline_lr.fit(train_df)
predictions_lr = model_lr.transform(test_df)

auc_lr = evaluator.evaluate(predictions_lr)
print("Logistic Regression AUC: ", auc_lr)

In [None]:
# Gradient-Boosted Trees: feature stages + classifier, fit on the training
# split and scored on the test split.
gbt = GBTClassifier(
    featuresCol="features",
    labelCol="numClick",
    maxIter=10,
)
pipeline_gbt = Pipeline(stages=[*stages, gbt])
model_gbt = pipeline_gbt.fit(train_df)
predictions_gbt = model_gbt.transform(test_df)

auc_gbt = evaluator.evaluate(predictions_gbt)
print("Gradient Boosting AUC: ", auc_gbt)

In [None]:
# Linear SVM: feature stages + classifier, fit on the training split and
# scored on the test split.
svm = LinearSVC(
    featuresCol="features",
    labelCol="numClick",
    maxIter=10,
)
pipeline_svm = Pipeline(stages=[*stages, svm])
model_svm = pipeline_svm.fit(train_df)
predictions_svm = model_svm.transform(test_df)

auc_svm = evaluator.evaluate(predictions_svm)
print("SVM AUC: ", auc_svm)

In [None]:
# Attach the GBT prediction to the test rows.
#
# `predictions_gbt` is `model_gbt.transform(test_df)`, so it already carries
# every `test_df` column (including `unique_id`) plus the model outputs.
# Selecting from it keeps each row paired with its own prediction.
#
# The previous version joined `test_df` to the predictions on `features`, but
# `test_df` has no `features` column (the pipeline only adds it to the
# transformed output), and a feature vector is not a row key anyway: rows with
# identical features would be multiplied by the join. It also overwrote the
# existing `unique_id` with freshly generated ids.
base_columns = [c for c in test_df.columns if c != "prediction"]  # safe to re-run
test_df = predictions_gbt.select(*base_columns, "prediction")

In [None]:
# Sum payingPrice over the test rows to size the budget.
# TODO: open question from the original notebook -- should this be the paying
# price or the bidding price?
# The maximum spend in the simulation will be less than this total.

# Use the `F` alias rather than `from pyspark.sql.functions import sum`, which
# would replace Python's builtin `sum` for every later cell in the notebook.
from pyspark.sql import functions as F

total_paying_price = (
    test_df
    .agg(F.sum("payingPrice").alias("total_paying_price"))
    .collect()[0]["total_paying_price"]
)
print("Total Paying Price: ", total_paying_price)

In [None]:
# (avg winning price / average predicted click rate) gives a rough scale for
# the alpha search range.

# Use the `F` alias rather than importing `avg, max, min` by name, which would
# replace Python's builtin `max` and `min` for every later cell.
from pyspark.sql import functions as F

# The click rate used here is the model's `prediction` column, the same value
# the bidding simulation multiplies by alpha. The frame built in this notebook
# has no `clickRate` column, so aggregating on that name cannot resolve.
results = test_df.agg(
    F.avg("payingPrice").alias("avg_paying_price"),
    F.avg("prediction").alias("avg_click_rate"),
    F.max("payingPrice").alias("max_paying_price"),
).collect()[0]

# Guard the division: the average is None on an empty frame and 0 when the
# model predicts no clicks at all; the ratio is undefined in both cases.
avg_click_rate = results["avg_click_rate"]
ratio_avg = results["avg_paying_price"] / avg_click_rate if avg_click_rate else None

print("Ratio of average payingPrice to average clickRate:", ratio_avg)
# Only the maximum paying price is computed here, so label it as such.
print("Max payingPrice:", results["max_paying_price"])

# bid price = click through rate * alpha

### Define the Bidding Function:
(1) Bidding Price = α × Predicted Click Rate
<br>
<br>(2) Implement Binary Search: Use binary search to find the optimal α while staying within the budget.
<br>
<br>(3) Simulation Function: Create a function to simulate bidding based on a given α and calculate the total clicks won and the total cost.
<br>
<br>(4) Optimization Loop: Use the binary search to adjust α to maximize clicks while staying under the budget.

In [None]:
# Simulate bidding for a given alpha - returns the total clicks and total cost.
def simulateBidding(alpha, test_df, budget, roi_by_hour=None):
    """Replay the auctions in `test_df` for one alpha and tally the outcome.

    Each row is bid on at ``alpha * prediction * roi_multiplier``, where the
    multiplier is looked up by the row's ``hour``. The bid wins when it is
    strictly greater than the row's winning price (``payingPrice / 1000.0``).
    A win adds that winning price to the spend and the row's ``numClick`` to
    the click total.

    Args:
        alpha: Scaling factor applied to the predicted click rate.
        test_df: Object with a ``collect()`` method returning rows (e.g. a
            Spark DataFrame), or an already-collected iterable of rows. Rows
            must expose ``prediction``, ``payingPrice``, ``hour`` and
            ``numClick`` as attributes.
        budget: Not used by the simulation itself; the spend is returned
            uncapped so the caller can compare it against the budget.
        roi_by_hour: Optional mapping of hour -> ROI multiplier. When omitted,
            the notebook-level ``roi_dict`` is used if it has been defined.
            Hours without an entry (or no table at all) use a multiplier of 1.

    Returns:
        Tuple ``(total_clicks, total_spent)``.
    """
    if roi_by_hour is None:
        # The function used to read the global `roi_dict` unconditionally and
        # raised NameError whenever that table had not been defined yet.
        roi_by_hour = globals().get("roi_dict", {})

    # Accept pre-collected rows so a caller can collect once and reuse them.
    rows = test_df.collect() if hasattr(test_df, "collect") else test_df

    total_spent = 0
    total_clicks = 0
    for row in rows:
        winning_price = row.payingPrice / 1000.0
        roi_multiplier = roi_by_hour.get(row.hour, 1)
        bid_price = alpha * (row.prediction * roi_multiplier)
        if bid_price > winning_price:
            total_spent += winning_price
            total_clicks += row.numClick

    return total_clicks, total_spent


# Use binary search to find the alpha that maximizes clicks while staying under the budget
def binarySearch(low, high, test_df, budget):
    """Bisect ``[low, high]`` for an alpha whose simulated spend fits the budget.

    Each step simulates the midpoint with ``simulateBidding``. If the spend
    exceeds ``budget`` the upper bound moves down; otherwise the lower bound
    moves up and the midpoint is remembered if it won strictly more clicks
    than any earlier in-budget midpoint. The search stops once the interval is
    no wider than 0.01.

    Assumes simulated spend does not decrease as alpha grows, which is what
    makes bisection valid here.

    Args:
        low: Lower bound of the alpha search range.
        high: Upper bound of the alpha search range.
        test_df: Object with a ``collect()`` method returning rows (e.g. a
            Spark DataFrame), or anything else ``simulateBidding`` accepts.
        budget: Maximum allowed total spend, in the units ``simulateBidding``
            reports.

    Returns:
        The remembered alpha, or 0 if no in-budget midpoint won any click.
    """
    if hasattr(test_df, "collect"):
        # The rows are the same for every alpha, so collect them once here
        # instead of once per bisection step inside simulateBidding.
        collected_rows = test_df.collect()

        class _CollectedRows:
            """Minimal stand-in exposing the already-collected rows."""

            def collect(self):
                return collected_rows

        test_df = _CollectedRows()

    alpha = 0
    max_clicks = 0

    while high - low > 0.01:
        mid = (low + high) / 2
        clicks, spend = simulateBidding(mid, test_df, budget)
        if spend > budget:
            high = mid
        else:
            low = mid
            if clicks > max_clicks:
                max_clicks = clicks
                alpha = mid

    return alpha

In [None]:
# Search alpha in [0, 100]; the bounds and the budget are placeholders to be
# tuned later.
# NOTE(review): simulateBidding accumulates spend as payingPrice / 1000.0. If
# 50980984 is a raw payingPrice total, this budget is on a different scale
# than the simulated spend -- confirm the units.
search_budget = 50980984/2
alpha = binarySearch(low=0, high=100, test_df=test_df, budget=search_budget)

In [None]:
# hour, ROI
# 