In [None]:
from pyspark.sql import SparkSession


spark = (
    SparkSession.builder.appName("PurchaseIntentionAnalysis")
    .remote("sc://192.168.1.7:15002")
    .config("spark.sql.ansi.enabled", "false")
    .config("spark.sql.repl.eagerEval.enabled", "true")
    .getOrCreate()
)

sessions_data = spark.read.csv(["/opt/spark/data/worker1/*.csv", "/opt/spark/data/worker2/*.csv"], header=True, inferSchema=True)
sessions_data.createOrReplaceTempView("sessions_data")
sessions_data.repartition(3)

sessions_data

In [None]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import FloatType, IntegerType, BooleanType, StringType

@udf(FloatType())
def ratio_duration_per_visit(visitc, duration):
  return 0 if visitc == 0 else duration / visitc
  
@udf(BooleanType())
def is_special_date(special_day):
  return special_day > 0

@udf(StringType())
def operating_system_label(os):
  match os:
    case 1:
      return 'OS_1'
    case 2:
      return 'OS_2'
    case 3:
      return 'OS_3'
    case _:
      return 'OS_Other'
    
@udf(StringType())
def region_label(region):
  return 'Region_1' if region == 1 else 'Region_Other'

@udf(StringType())
def traffic_type_label(traffic_type):
  return 'TrafficType_1_3' if 1 <= traffic_type <= 3 else 'TrafficType_Other'

@udf(IntegerType())
def month_number(month):
  month_mapping = {
    'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'June': 6,
    'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
  }
  return month_mapping[month]

sessions_data_fe = sessions_data.withColumns({
  "Administrative_Duration_Per_Visit": ratio_duration_per_visit(
    col("Administrative"), col("Administrative_Duration")
  ),
  "Informational_Duration_Per_Visit": ratio_duration_per_visit(
    col("Informational"), col("Informational_Duration")
  ),
  "ProductRelated_Duration_Per_Visit": ratio_duration_per_visit(
    col("ProductRelated"), col("ProductRelated_Duration")
  ),
  "Is_Special_Date": is_special_date(col("SpecialDay")),
  "OperatingSystems": operating_system_label(col("OperatingSystems")),
  "Region": region_label(col("Region")),
  "TrafficType": traffic_type_label(col("TrafficType")),
  "Month_Number": month_number(col("Month"))
})
sessions_data_fe

In [None]:
# only important features
sessions_data_fee = sessions_data_fe.select([
  "Revenue",
  "Administrative_Duration_Per_Visit",
  "Informational_Duration_Per_Visit",
  "ProductRelated_Duration_Per_Visit",
  "BounceRates",
  "ExitRates",
  "PageValues",
  "Is_Special_Date",
  "Month_Number",
  "OperatingSystems",
  "Region",
  "TrafficType",
  "Weekend"
])
sessions_data_fee