In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, countDistinct, min, max, avg, when, sum

# Spark oturumu başlat (Eğer hazırda yoksa)
spark = SparkSession.builder.appName("EcommerceEDA").getOrCreate()

# Veriyi oku (csv yolunu kendine göre ayarla)
# header=True ve inferSchema=True önemli
df = spark.read.csv("data/2019-Oct.csv", header=True, inferSchema=True)

# 1. Veri Tiplerini ve Şemayı gör
df.printSchema()

# 2. Event Type Dağılımı (Target dengesini görmek için)
print("Event Type Dağılımı:")
df.groupBy("event_type").count().orderBy("count", ascending=False).show()

# 3. Kayıp Veri (Null) Kontrolü
print("Kayıp Veri Kontrolü (Category ve Brand):")
df.select([count(when(col(c).isNull(), c)).alias(c) for c in ["category_code", "brand"]]).show()

# 4. Session İstatistikleri (Aggregation Mantığı)
# Her session için başlangıç, bitiş ve toplam event sayısını bulalım
session_stats = df.groupBy("user_session").agg(
    count("*").alias("event_count"),
    min("event_time").alias("start_time"),
    max("event_time").alias("end_time"),
    countDistinct("product_id").alias("unique_products_viewed")
)

# Session istatistiklerinin özeti (Ortalama event sayısı, min, max vs.)
print("Session İstatistikleri Özeti:")
session_stats.select("event_count", "unique_products_viewed").describe().show()

# 5. Target Label Oluşturma Mantığına Bakış
# Hangi session'lar satın almayla bitmiş?
# purchase olayı geçen sessionları 1, geçmeyenleri 0 olarak etiketleyeceğiz.
# Sadece oranı görmek için basit bir işlem:
purchase_sessions = df.filter(col("event_type") == "purchase").select("user_session").distinct().count()
total_sessions = session_stats.count()

print(f"Toplam Session: {total_sessions}")
print(f"Satın Alma Olan Session: {purchase_sessions}")
print(f"Dönüşüm Oranı (Conversion Rate): %{purchase_sessions/total_sessions*100:.2f}")

                                                                                

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)

Event Type Dağılımı:


                                                                                

+----------+--------+
|event_type|   count|
+----------+--------+
|      view|40779399|
|      cart|  926516|
|  purchase|  742849|
+----------+--------+

Kayıp Veri Kontrolü (Category ve Brand):


                                                                                

+-------------+-------+
|category_code|  brand|
+-------------+-------+
|     13515609|6113008|
+-------------+-------+

Session İstatistikleri Özeti:


25/12/18 15:46:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 15:46:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 15:46:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 15:46:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 15:46:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 15:46:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 15:46:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 15:46:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 15:46:10 WARN RowBasedKeyValueBatch: Calling spill() on



25/12/18 15:46:37 WARN TaskMemoryManager: Failed to allocate a page (16777216 bytes), try again.
25/12/18 15:46:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 15:46:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 15:46:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 15:46:38 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 15:46:38 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 15:46:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 15:46:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 15:46:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBat



25/12/18 15:46:43 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 15:46:43 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 15:46:43 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

+-------+-----------------+----------------------+
|summary|      event_count|unique_products_viewed|
+-------+-----------------+----------------------+
|  count|          9244422|               9244422|
|   mean|4.591824561881749|    3.0056981388344237|
| stddev|6.766294212991895|      4.16973940975059|
|    min|                1|                     1|
|    max|             1159|                   868|
+-------+-----------------+----------------------+





Toplam Session: 9244422
Satın Alma Olan Session: 629560
Dönüşüm Oranı (Conversion Rate): %6.81


                                                                                

In [None]:
from pyspark.sql import SparkSession, Window, functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import time
import os

# 1. AYARLAR: Localde RAM'i Artır (Bu kısım çok önemli)
# Eğer 16GB RAM'in varsa driver'a 8GB veriyoruz.
spark = SparkSession.builder \
    .appName("EcommerceML_Optimized") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Eğer dosya varsa sil (Tekrar çalıştırılabilir olsun diye)
import shutil
if os.path.exists("intermediate_data.parquet"):
    shutil.rmtree("intermediate_data.parquet")

print("--- ADIM 1: Veri Hazırlama ve Diske Yazma (Checkpointing) ---")

# Veriyi Oku
df = spark.read.csv("2019-Nov.csv", header=True, inferSchema=True)
df_clean = df.fillna({'category_code': 'unknown', 'brand': 'unknown'})

# Window ve Filtreleme İşlemleri (Leakage-Free Mantık)
window_spec = Window.partitionBy("user_session")
df_marked = df_clean.withColumn(
    "purchase_timestamp", 
    F.min(F.when(F.col("event_type") == "purchase", F.col("event_time"))).over(window_spec)
)

df_no_leakage = df_marked.filter(
    (F.col("purchase_timestamp").isNull()) | 
    (F.col("event_time") <= F.col("purchase_timestamp"))
)

# Aggregation (Özellik Çıkarımı)
session_features = df_no_leakage.groupBy("user_session").agg(
    F.max(F.when(F.col("purchase_timestamp").isNotNull(), 1).otherwise(0)).alias("label"),
    F.count(F.when(F.col("event_type") == "view", 1)).alias("view_count"),
    F.count(F.when(F.col("event_type") == "cart", 1)).alias("cart_count"),
    (F.max("event_time").cast("long") - F.min("event_time").cast("long")).alias("session_duration"),
    F.avg("price").alias("avg_price"),
    F.max("price").alias("max_price"),
    F.countDistinct("product_id").alias("unique_items")
)

# KRİTİK ADIM: Hesaplanmış veriyi Parquet olarak diske yaz.
# CSV yerine Parquet kullanıyoruz çünkü çok daha hızlıdır ve veri tiplerini korur.
print("Veri işleniyor ve 'intermediate_data.parquet' olarak kaydediliyor... (Sabırlı ol)")
session_features.write.mode("overwrite").parquet("intermediate_data.parquet")
print("Kayıt tamamlandı! Lineage kırıldı.")

# --- ADIM 2: Model Eğitimi (Temiz Veri Üzerinden) ---
print("\n--- ADIM 2: ML Modeli Eğitimi ---")

# Artık hesaplama yükü yok, sadece diskten okuma yükü var.
df_ml = spark.read.parquet("intermediate_data.parquet")

# Null doldurma
df_ml = df_ml.fillna(0)

# Localde test yaparken 9 Milyon satır bazen çok gelir.
# Eğer yine patlarsa aşağıdaki satırın başındaki # işaretini kaldır (Sample al).
# df_ml = df_ml.sample(fraction=0.1, seed=42)  # Verinin %10'unu al

assembler = VectorAssembler(
    inputCols=['view_count', 'cart_count', 'session_duration', 'avg_price', 'max_price', 'unique_items'],
    outputCol="features"
)

data_vec = assembler.transform(df_ml).select("label", "features")

# Eğitim/Test Ayrımı
train_data, test_data = data_vec.randomSplit([0.8, 0.2], seed=42)

# Modeli Eğit
print("Model eğitiliyor...")
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=20, maxDepth=5)

start_time = time.time()
model = rf.fit(train_data)
end_time = time.time()

print(f"Eğitim Süresi: {end_time - start_time:.2f} saniye")

# Değerlendirme
predictions = model.transform(test_data)
binary_eval = BinaryClassificationEvaluator(metricName="areaUnderROC")
auc = binary_eval.evaluate(predictions)

print(f"AUC Score: {auc:.4f}")

# Spark'ı durdur
spark.stop()

--- ADIM 1: Veri Hazırlama ve Diske Yazma (Checkpointing) ---


25/12/18 15:51:23 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/Users/umut/Desktop/25-26 Fall/BIG DATA/project files/2019-Nov.csv.

25/12/18 18:48:27 WARN NettyRpcEnv: Ignored failure: java.util.concurrent.TimeoutException: Cannot receive any reply from 161.9.67.113:54011 in 10000 milliseconds


In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, countDistinct, min, max, avg, when, sum

# Spark oturumu başlat (Eğer hazırda yoksa)
spark = SparkSession.builder.appName("EcommerceEDA").getOrCreate()

# Veriyi oku (csv yolunu kendine göre ayarla)
# header=True ve inferSchema=True önemli
df = spark.read.csv("data/2019-Oct.csv", header=True, inferSchema=True)

# 1. Veri Tiplerini ve Şemayı gör
df.printSchema()

# 2. Event Type Dağılımı (Target dengesini görmek için)
print("Event Type Dağılımı:")
df.groupBy("event_type").count().orderBy("count", ascending=False).show()

# 3. Kayıp Veri (Null) Kontrolü
print("Kayıp Veri Kontrolü (Category ve Brand):")
df.select([count(when(col(c).isNull(), c)).alias(c) for c in ["category_code", "brand"]]).show()

# 4. Session İstatistikleri (Aggregation Mantığı)
# Her session için başlangıç, bitiş ve toplam event sayısını bulalım
session_stats = df.groupBy("user_session").agg(
    count("*").alias("event_count"),
    min("event_time").alias("start_time"),
    max("event_time").alias("end_time"),
    countDistinct("product_id").alias("unique_products_viewed")
)

# Session istatistiklerinin özeti (Ortalama event sayısı, min, max vs.)
print("Session İstatistikleri Özeti:")
session_stats.select("event_count", "unique_products_viewed").describe().show()

# 5. Target Label Oluşturma Mantığına Bakış
# Hangi session'lar satın almayla bitmiş?
# purchase olayı geçen sessionları 1, geçmeyenleri 0 olarak etiketleyeceğiz.
# Sadece oranı görmek için basit bir işlem:
purchase_sessions = df.filter(col("event_type") == "purchase").select("user_session").distinct().count()
total_sessions = session_stats.count()

print(f"Toplam Session: {total_sessions}")
print(f"Satın Alma Olan Session: {purchase_sessions}")
print(f"Dönüşüm Oranı (Conversion Rate): %{purchase_sessions/total_sessions*100:.2f}")

                                                                                

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)

Event Type Dağılımı:


                                                                                

+----------+--------+
|event_type|   count|
+----------+--------+
|      view|40779399|
|      cart|  926516|
|  purchase|  742849|
+----------+--------+

Kayıp Veri Kontrolü (Category ve Brand):


                                                                                

+-------------+-------+
|category_code|  brand|
+-------------+-------+
|     13515609|6113008|
+-------------+-------+

Session İstatistikleri Özeti:


25/12/18 00:40:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:40:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:40:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:40:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:40:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:40:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:40:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:40:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:40:06 WARN RowBasedKeyValueBatch: Calling spill() on

+-------+-----------------+----------------------+
|summary|      event_count|unique_products_viewed|
+-------+-----------------+----------------------+
|  count|          9244422|               9244422|
|   mean|4.591824561881749|    3.0056981388344237|
| stddev|6.766294212991889|     4.169739409750599|
|    min|                1|                     1|
|    max|             1159|                   868|
+-------+-----------------+----------------------+





Toplam Session: 9244422
Satın Alma Olan Session: 629560
Dönüşüm Oranı (Conversion Rate): %6.81


                                                                                

In [6]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType

# 1. TEMİZLİK (Cleaning)
# Boş kategorileri ve markaları 'unknown' ile dolduralım
df_clean = df.fillna({'category_code': 'unknown', 'brand': 'unknown'})

# 2. FEATURE ENGINEERING (Özellik Mühendisliği)
# Burası Spark'ın gücünü göstereceğimiz yer. Devasa bir GroupBy işlemi yapıyoruz.

session_features = df_clean.groupBy("user_session").agg(
    # --- Hedef Değişken (Label) ---
    # Eğer session içinde en az 1 tane 'purchase' varsa 1, yoksa 0
    F.max(F.when(F.col("event_type") == "purchase", 1).otherwise(0)).alias("label"),
    
    # --- Davranışsal Özellikler ---
    # 'view' sayısı
    F.count(F.when(F.col("event_type") == "view", 1)).alias("view_count"),
    # 'cart' sayısı
    F.count(F.when(F.col("event_type") == "cart", 1)).alias("cart_count"),
    
    # --- Zaman Bazlı Özellikler ---
    # Session süresi (Saniye cinsinden: Bitiş - Başlangıç)
    (F.max("event_time").cast("long") - F.min("event_time").cast("long")).alias("session_duration"),
    
    # --- Finansal Özellikler ---
    # Baktığı ürünlerin ortalama fiyatı (Null fiyatları 0 sayar)
    F.avg("price").alias("avg_price"),
    # Maksimum fiyat (Pahalı ürünlere mi bakıyor?)
    F.max("price").alias("max_price"),
    
    # --- Çeşitlilik ---
    # Kaç farklı ürüne bakmış?
    F.countDistinct("product_id").alias("unique_items")
)

# 3. VERİYİ KONTROL ETME & TEMİZLEME (Post-Processing)
# Bazen session duration 0 çıkabilir (sadece 1 event varsa). 
# Modelin şaşırmaması için çok aykırı değerleri filtreleyebiliriz ama şimdilik kalsın.

print("Oluşturulan Feature Tablosu:")
session_features.show(5)

# Verisetinin son dengesini görelim
print("Label Dağılımı:")
session_features.groupBy("label").count().show()

# Şemayı kontrol et (MLlib için numeric olmalılar)
session_features.printSchema()


Oluşturulan Feature Tablosu:


25/12/18 00:56:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:56:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:56:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:56:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:56:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:56:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:56:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:56:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:56:08 WARN RowBasedKeyValueBatch: Calling spill() on

+--------------------+-----+----------+----------+----------------+------------------+---------+------------+
|        user_session|label|view_count|cart_count|session_duration|         avg_price|max_price|unique_items|
+--------------------+-----+----------+----------+----------------+------------------+---------+------------+
|0361d9eb-993c-465...|    0|         2|         0|             336|            76.065|   100.65|           2|
|08bf8fbd-6234-4f9...|    0|        17|         0|            1217| 355.0035294117647|   501.81|          16|
|18d2bcd8-538e-4a7...|    0|        12|         0|             380|114.15916666666669|   167.83|          11|
|1afe870d-8e38-46b...|    0|         5|         0|             241|405.53599999999994|   1181.5|           4|
|23acc4df-4fb9-4a3...|    0|         1|         0|               0|             38.59|    38.59|           1|
+--------------------+-----+----------+----------+----------------+------------------+---------+------------+
only showi

25/12/18 00:57:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:57:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:57:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:57:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:57:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:57:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:57:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:57:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/12/18 00:57:32 WARN RowBasedKeyValueBatch: Calling spill() on

+-----+-------+
|label|  count|
+-----+-------+
|    1| 629560|
|    0|8614862|
+-----+-------+

root
 |-- user_session: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- view_count: long (nullable = false)
 |-- cart_count: long (nullable = false)
 |-- session_duration: long (nullable = true)
 |-- avg_price: double (nullable = true)
 |-- max_price: double (nullable = true)
 |-- unique_items: long (nullable = false)



                                                                                

In [11]:
from pyspark.sql import Window
from pyspark.sql import functions as F

# Veriyi tekrar okuyalım (Temiz başlangıç)
# df = spark.read.csv("2019-Nov.csv", header=True, inferSchema=True)
# df_clean = df.fillna({'category_code': 'unknown', 'brand': 'unknown'})

print("Veri hazırlanıyor (Leakage Önleyici Mantık ile)...")

# ADIM 1: Her session için 'Satın Alma Zamanını' bul
# Eğer satın almadıysa null döner.
window_spec = Window.partitionBy("user_session")

df_marked = df_clean.withColumn(
    "purchase_timestamp", 
    F.min(F.when(F.col("event_type") == "purchase", F.col("event_time"))).over(window_spec)
)

# ADIM 2: Geleceği Sil (Cut-off)
# Kural: 
# 1. Eğer purchase_timestamp NULL ise (satın almamış): Hepsini tut.
# 2. Eğer purchase_timestamp DOLU ise: Sadece event_time <= purchase_timestamp olanları tut.
#    (Yani satın aldıktan sonra siteyi gezmeye devam ettiyse o veriyi görmezden geliyoruz)

df_no_leakage = df_marked.filter(
    (F.col("purchase_timestamp").isNull()) | 
    (F.col("event_time") <= F.col("purchase_timestamp"))
)

# ADIM 3: Feature Engineering (Artık güvenli veri üzerinde)
# Bu sefer aggregation sonucunda oluşan özellikler, satın alma anına kadar olan özellikleri temsil eder.

session_features_safe = df_no_leakage.groupBy("user_session").agg(
    # Label: Satın alma zamanı sütunu doluysa 1, boşsa 0
    F.max(F.when(F.col("purchase_timestamp").isNotNull(), 1).otherwise(0)).alias("label"),
    
    # Featurelar (Sadece satın alma anına kadarki hareketler)
    F.count(F.when(F.col("event_type") == "view", 1)).alias("view_count"),
    F.count(F.when(F.col("event_type") == "cart", 1)).alias("cart_count"),
    
    # Süre: İlk girişten -> Satın almaya (veya çıkışa) kadar geçen süre
    (F.max("event_time").cast("long") - F.min("event_time").cast("long")).alias("session_duration"),
    
    F.avg("price").alias("avg_price"),
    F.max("price").alias("max_price"),
    F.countDistinct("product_id").alias("unique_items")
)

print("Güvenli tablo oluşturuldu. Örnek:")
session_features_safe.show(5)

# Bu noktadan sonra MLlib kodunu (VectorAssembler ve RandomForest)
# 'session_features' yerine 'session_features_safe' tablosu ile çalıştıracaksın.

Veri hazırlanıyor (Leakage Önleyici Mantık ile)...


ConnectionRefusedError: [Errno 61] Connection refused

In [12]:
import time
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# 1. Hazırlık (Vector Assembler)
print("1. Veri Vektörleştiriliyor...")

# Boşlukları 0 ile doldur (MLlib hata vermesin)
# avg_price null olabilir (hiçbir şeye bakmadıysa), 0 yapalım.
df_ml = session_features_safe.fillna(0)

assembler = VectorAssembler(
    inputCols=['view_count', 'cart_count', 'session_duration', 'avg_price', 'max_price', 'unique_items'],
    outputCol="features"
)

data_vec = assembler.transform(df_ml).select("label", "features")

# %80 Eğitim, %20 Test
train_data, test_data = data_vec.randomSplit([0.8, 0.2], seed=42)

# Cache mekanizması (Veri temiz olduğu için RAM'de tutmak hızlandırır)
# Cluster'da çok işe yarar.
# train_data.persist()

print(f"Eğitim Seti: {train_data.count()} satır")
print(f"Test Seti: {test_data.count()} satır")

# 2. Model Eğitimi (Random Forest)
print("\n2. Model Eğitimi Başlıyor (Random Forest)...")

# numTrees=20 (Başlangıç için)
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=20, maxDepth=5)

start_time = time.time()
model = rf.fit(train_data)
end_time = time.time()

training_duration = end_time - start_time
print(f"Model Eğitimi Bitti. Süre: {training_duration:.2f} saniye")

# 3. Değerlendirme
print("\n3. Sonuçlar Hesaplanıyor...")
predictions = model.transform(test_data)

# AUC Score (Dengesiz veri için en önemli metrik)
binary_eval = BinaryClassificationEvaluator(metricName="areaUnderROC")
auc = binary_eval.evaluate(predictions)

# F1, Precision, Recall
multi_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
acc = multi_eval.evaluate(predictions, {multi_eval.metricName: "accuracy"})
f1 = multi_eval.evaluate(predictions, {multi_eval.metricName: "f1"})
weighted_recall = multi_eval.evaluate(predictions, {multi_eval.metricName: "weightedRecall"})
# Pozitif sınıfın (1) Recall değeri çok önemlidir (Satın alanları yakalayabildik mi?)
# Bunu elle hesaplamak gerekebilir ama weightedRecall şimdilik fikir verir.

print("-" * 30)
print("LEAKAGE-FREE MODEL SONUÇLARI")
print("-" * 30)
print(f"Eğitim Süresi: {training_duration:.2f} saniye")
print(f"Accuracy     : {acc:.4f}")
print(f"AUC Score    : {auc:.4f}")
print(f"F1 Score     : {f1:.4f}")
print("-" * 30)

# Feature Importance (Hangi özellik daha kritikmiş?)
print("Özellik Önem Düzeyleri:")
import pandas as pd
feature_list = ['view_count', 'cart_count', 'session_duration', 'avg_price', 'max_price', 'unique_items']
imp_df = pd.DataFrame(list(zip(feature_list, model.featureImportances.toArray())), columns=["Feature", "Importance"])
print(imp_df.sort_values(by="Importance", ascending=False))

1. Veri Vektörleştiriliyor...


ConnectionRefusedError: [Errno 61] Connection refused