In [1]:
%%spark
sc.install_pypi_package("numpy==1.26.4")
sc.install_pypi_package("pandas==1.5.3")
sc.install_pypi_package("pyarrow==12.0.1")
sc.install_pypi_package("scikit-learn==1.3.2")

# ihtiyacın varsa:
sc.install_pypi_package("lightgbm==4.3.0")
sc.install_pypi_package("xgboost==2.0.3")

# Kuruldu mu kontrol et
sc.list_packages()


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1755611270905_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting numpy==1.26.4
  Downloading numpy-1.26.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.2 MB)
Installing collected packages: numpy
Successfully installed numpy-1.26.4

Collecting pandas==1.5.3
  Downloading pandas-1.5.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.2 MB)
Installing collected packages: pandas
  Attempting uninstall: pandas
    Found existing installation: pandas 2.2.3
    Not uninstalling pandas at /usr/local/lib64/python3.9/site-packages, outside environment /mnt3/yarn/usercache/livy/appcache/application_1755611270905_0001/container_1755611270905_0001_01_000001/tmp/spark-a8deb592-c396-48bc-8063-496981e24fa3
    Can't uninstall 'pandas'. No files were found to uninstall.
Successfully installed pandas-1.5.3

Collecting pyarrow==12.0.1
  Downloading pyarrow-12.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (39.0 MB)
Installing collected packages: pyarrow
  Attempting uninstall: pyarrow
    Found existing installation: 

In [2]:
# PySpark oturumu: Hem PySpark kernelinde hem de EMR/Studio üzerinde güvenli.
from pyspark.sql import SparkSession
spark = SparkSession.builder\
    .appName("TrendyolSearchRanking")\
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")\
    .config("spark.sql.shuffle.partitions", "400")\
    .getOrCreate()

spark


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7fdcc96ec3a0>

## 2) Yol/Veri Kökü ve Güvenli Okuma

In [3]:
DATA_ROOT = "s3://melek-hackathon/raw/data"

from pyspark.sql.functions import col, lit, when, max as smax
from pyspark.sql.types import *

def p(path): 
    return f"{DATA_ROOT.rstrip('/')}/{path.lstrip('/')}"

# Dosya yolları (katalog)
PATHS = {
    "train_sessions":      p("train_sessions.parquet"),
    "test_sessions":       p("test_sessions.parquet"),
    "content_meta":        p("content/metadata.parquet"),
    "content_pricing":     p("content/price_rate_review_data.parquet"),
    "content_search":      p("content/search_log.parquet"),
    "content_sitewide":    p("content/sitewide_log.parquet"),
    "content_top_terms":   p("content/top_terms_log.parquet"),
    "user_meta":           p("user/metadata.parquet"),
    "user_search":         p("user/search_log.parquet"),
    "user_sitewide":       p("user/sitewide_log.parquet"),
    "user_fashion_search": p("user/fashion_search_log.parquet"),
    "user_fashion_site":   p("user/fashion_sitewide_log.parquet"),
    "term_search":         p("term/search_log.parquet"),
}

# Varlık kontrolü ve okunabilirlik
for k,v in PATHS.items():
    print(k, "→", v)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

train_sessions ? s3://melek-hackathon/raw/data/train_sessions.parquet
test_sessions ? s3://melek-hackathon/raw/data/test_sessions.parquet
content_meta ? s3://melek-hackathon/raw/data/content/metadata.parquet
content_pricing ? s3://melek-hackathon/raw/data/content/price_rate_review_data.parquet
content_search ? s3://melek-hackathon/raw/data/content/search_log.parquet
content_sitewide ? s3://melek-hackathon/raw/data/content/sitewide_log.parquet
content_top_terms ? s3://melek-hackathon/raw/data/content/top_terms_log.parquet
user_meta ? s3://melek-hackathon/raw/data/user/metadata.parquet
user_search ? s3://melek-hackathon/raw/data/user/search_log.parquet
user_sitewide ? s3://melek-hackathon/raw/data/user/sitewide_log.parquet
user_fashion_search ? s3://melek-hackathon/raw/data/user/fashion_search_log.parquet
user_fashion_site ? s3://melek-hackathon/raw/data/user/fashion_sitewide_log.parquet
term_search ? s3://melek-hackathon/raw/data/term/search_log.parquet

## 3) Veri Yükleme

In [4]:
# Temel oturum verileri
train = spark.read.parquet(PATHS["train_sessions"])
test  = spark.read.parquet(PATHS["test_sessions"])

# İçerik (ürün) tarafı
cmeta   = spark.read.parquet(PATHS["content_meta"])
cprice  = spark.read.parquet(PATHS["content_pricing"])
csearch = spark.read.parquet(PATHS["content_search"])
csite   = spark.read.parquet(PATHS["content_sitewide"])
cterms  = spark.read.parquet(PATHS["content_top_terms"])

# Kullanıcı tarafı
umeta   = spark.read.parquet(PATHS["user_meta"])
usearch = spark.read.parquet(PATHS["user_search"])
usite   = spark.read.parquet(PATHS["user_sitewide"])
ufash_s = spark.read.parquet(PATHS["user_fashion_search"])
ufash_w = spark.read.parquet(PATHS["user_fashion_site"])

# Terim tarafı
tsearch = spark.read.parquet(PATHS["term_search"])

print("Train rows:", train.count(), " Test rows:", test.count())


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Train rows: 2773805  Test rows: 2988697

## 4) Hafif Temizlik ve Tip Uyumları

In [5]:
from pyspark.sql.functions import to_timestamp, to_date, year, month, dayofweek, hour

# Zaman ve tip dönüşümleri
for df, ts_col in [(train, "ts_hour"), (test, "ts_hour")]:
    if ts_col in df.columns:
        df = df.withColumn(ts_col, to_timestamp(col(ts_col)))
    # temel zaman kırılımları
    df = df.withColumn("hour", hour(col(ts_col)))\
           .withColumn("dow", dayofweek(col(ts_col)))\
           .withColumn("month", month(col(ts_col)))
    if df is train:
        train = df
    else:
        test = df

# Eksik alan doldurma (özellikle string ve sayısal)
def safe_fill(df):
    str_cols  = [c for c,t in df.dtypes if t=='string']
    num_cols  = [c for c,t in df.dtypes if t in ('int','bigint','double','float','long','short')]
    df = df.fillna({c:"__na__" for c in str_cols}).fillna({c:0 for c in num_cols})
    return df

train = safe_fill(train)
test  = safe_fill(test)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 5) Zenginleştirme – Özellik (Feature) Türetme

Fikir:

- Ürün popülerliği / kalite sinyalleri (sitewide & search log oranları, puan/yorum, fiyat/indirim).
- Kullanıcı demografisi + geçmiş etkileşim yoğunluğu.
- Oturum saat/dow + terim ve ürün kategorilerinin “hashing” ile temsil edilmesi.
- (content_id, search_term) eşleşmesi: content_top_terms’ten CTR sinyali.

In [6]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import col, lit, when  # (opsiyonel, F.col vs. da kullanabilirsin)
# NOT: row_number artık F.row_number olarak çağrılacak


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
from pyspark.sql.window import Window

# ---- İçerik fiyat verisinde "en güncel" kaydı seç ----
# Uygun zaman kolonu hangisi ise onu bul:
time_col = next((c for c in ["update_date","updated_at","event_time","ts","date"]
                 if c in cprice.columns), None)

if time_col is not None:
    w_content = Window.partitionBy("content_id_hashed").orderBy(F.col(time_col).cast("timestamp").desc())
    cprice_last = (cprice
                   .withColumn("rn", F.row_number().over(w_content))
                   .where(F.col("rn")==1)
                   .drop("rn", time_col))
else:
    # Zaman kolonu yoksa direkt kullan (tek kayıt varsayımı / en güncel bilgi yok)
    cprice_last = cprice

# İndirim oranı
cprice_last = cprice_last.withColumn(
    "discount_rate",
    F.when(F.col("original_price") > 0,
           (F.col("original_price") - F.col("discounted_price")) / F.col("original_price"))
     .otherwise(F.lit(0.0))
)

# Sitewide/search ortalamaları (ürün)
def avg_all(df, key):
    num_cols = [c for c,t in df.dtypes if c not in [key,"date","content_id_hashed"] and t in ('double','float','int','bigint')]
    agg = [smax(c).alias(f"{c}_max") if c.endswith("_count") else F.avg(c).alias(f"{c}_avg") for c in num_cols]  # karışık ölçeklere güvenli yaklaşım
    return df.groupBy(key).agg(*agg)

import pyspark.sql.functions as F
csite_agg   = avg_all(csite, "content_id_hashed")
csearch_agg = avg_all(csearch, "content_id_hashed")

# (content_id, search_term) eşleşmesi: ilgili terim-ürün CTR
cterms_agg = cterms.groupBy("content_id_hashed","search_term_normalized")\
                   .agg(F.avg("total_search_click").alias("cterm_click_avg"),
                        F.avg("total_search_impression").alias("cterm_impr_avg"))\
                   .withColumn("cterm_ctr", when(col("cterm_impr_avg")>0, col("cterm_click_avg")/col("cterm_impr_avg")).otherwise(0.0))

# ---- Kullanıcı bazlı özetler ----
umeta2 = umeta.withColumn("user_age", when(col("user_birth_year")>1900, F.year(F.current_timestamp())-col("user_birth_year")).otherwise(None))\
              .drop("user_birth_year")

usite_agg = avg_all(usite, "user_id_hashed")\
            .withColumnRenamed("total_click_avg","user_total_click_avg")\
            .withColumnRenamed("total_cart_avg","user_total_cart_avg")\
            .withColumnRenamed("total_fav_avg","user_total_fav_avg")\
            .withColumnRenamed("total_order_avg","user_total_order_avg")

usearch_agg = avg_all(usearch, "user_id_hashed")\
            .withColumnRenamed("total_search_click_avg","user_search_click_avg")\
            .withColumnRenamed("total_search_impression_avg","user_search_impr_avg")

# ---- Terim bazlı genel popülerlik ----
tsearch_agg = tsearch.groupBy("search_term_normalized")\
                     .agg(F.avg("total_search_click").alias("term_click_avg"),
                          F.avg("total_search_impression").alias("term_impr_avg"))\
                     .withColumn("term_ctr", when(col("term_impr_avg")>0, col("term_click_avg")/col("term_impr_avg")).otherwise(0.0))

# ---- Eğitim setini zenginleştir ----
def enrich(sessions):
    df = sessions.alias("s")\
        .join(cmeta.alias("cm"),   on="content_id_hashed", how="left")\
        .join(cprice_last.alias("cp"), on="content_id_hashed", how="left")\
        .join(csite_agg.alias("cs"),   on="content_id_hashed", how="left")\
        .join(csearch_agg.alias("cse"),on="content_id_hashed", how="left")\
        .join(umeta2.alias("um"),     on="user_id_hashed", how="left")\
        .join(usite_agg.alias("us"),  on="user_id_hashed", how="left")\
        .join(usearch_agg.alias("ua"), on="user_id_hashed", how="left")\
        .join(tsearch_agg.alias("ts"), on="search_term_normalized", how="left")\
        .join(cterms_agg.alias("ct"), on=["content_id_hashed","search_term_normalized"], how="left")

    # recency: ürün oluşturulma tarihi ile oturum farkı (gün)
    df = df.withColumn("content_age_days", 
                       (F.unix_timestamp(col("s.ts_hour")) - F.unix_timestamp(col("cm.content_creation_date")))/86400.0)

    # güvenli doldurma
    df = safe_fill(df)
    return df

train_e = enrich(train)
test_e  = enrich(test)

print("train_e cols:", len(train_e.columns), " test_e cols:", len(test_e.columns))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

train_e cols: 48  test_e cols: 47

## 6) Özellik Dönüşümü (Hashing + Numerikler)

Büyük kardinaliteli kategorik alanlar için FeatureHasher kullanıyoruz; metrikte önemli search_term_normalized, leaf_category_name, level1/2 + user_gender.

In [8]:
from pyspark.ml.feature import FeatureHasher, VectorAssembler
from pyspark.ml.pipeline import Pipeline
import numpy as np

hash_cols = ["search_term_normalized","leaf_category_name","level1_category_name","level2_category_name","user_gender"]
num_cols = [
    "hour","dow","month",
    "discount_rate","original_price","selling_price","discounted_price",
    "content_rate_avg","content_rate_count","content_review_count","content_review_wth_media_count",
    "total_click_avg","total_cart_avg","total_fav_avg","total_order_avg",
    "user_total_click_avg","user_total_cart_avg","user_total_fav_avg","user_total_order_avg",
    "user_search_click_avg","user_search_impr_avg",
    "term_ctr","cterm_ctr","content_age_days"
]
# Bazı sütun adları join sonrası prefix almış olabilir; yoksa atlanır
num_cols = [c for c in num_cols if c in train_e.columns]

hasher = FeatureHasher(inputCols=[c for c in hash_cols if c in train_e.columns],
                       outputCol="hashed_cat", numFeatures=2**16)

num_assembler = VectorAssembler(inputCols=num_cols, outputCol="num_vec", handleInvalid="keep")
full_assembler = VectorAssembler(inputCols=["hashed_cat","num_vec"], outputCol="features")

fe_pipeline = Pipeline(stages=[hasher, num_assembler, full_assembler])
fe_model = fe_pipeline.fit(train_e)
train_f = fe_model.transform(train_e)
test_f  = fe_model.transform(test_e)

train_f.select("features").limit(1).show(truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                                                                                                                                                                                                                     

## 7) Eğitim/Doğrulama Bölme (Zamana göre)

In [9]:
# 7) Eğitim/Doğrulama Bölme (Zamana göre) — DÜZELTİLMİŞ

from pyspark.sql.functions import to_timestamp, unix_timestamp, col, lit

# ts_hour'ı garantiye al: (timestamp) -> (double: epoch seconds)
train_f = train_f.withColumn("ts_hour", to_timestamp(col("ts_hour")))
test_f  = test_f.withColumn("ts_hour", to_timestamp(col("ts_hour")))

train_f = train_f.withColumn("ts_hour_num", unix_timestamp(col("ts_hour")).cast("double"))
test_f  = test_f.withColumn("ts_hour_num",  unix_timestamp(col("ts_hour")).cast("double"))

# Son %15'i valid olacak şekilde eşik (relativeError 0.01 gibi küçük bir değer olmalı)
q = train_f.approxQuantile("ts_hour_num", [0.85], 0.01)[0]

# Bölme
train_tr = train_f.where(col("ts_hour_num") <  lit(q))
train_va = train_f.where(col("ts_hour_num") >= lit(q))

print("Train:", train_tr.count(), "Valid:", train_va.count())


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Train: 1796756 Valid: 977049

## 8) Modellerin Eğitimi (Click & Order)

In [12]:
%%spark
# 8) Hafif Mod: Downsample + Logistic Regression (kernel düşmeden eğit)

import pyspark.sql.functions as F
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import DoubleType

# ---- VectorUDT -> sınıf-1 olasılığı (getItem yerine güvenli UDF) ----
prob1_udf = F.udf(
    lambda v: float(v[1]) if (v is not None and getattr(v, "size", 0) > 1)
              else (float(v[0]) if v is not None else None),
    DoubleType()
)

# ---- Parametreler: ağır gelirse oranları daha da düşür ----
NEG_FRAC_CLICK = 0.02   # clicked=0 örnekleme oranı (örn. %2)
NEG_FRAC_ORDER = 0.01   # ordered=0 örnekleme oranı (örn. %1)
MAX_ITER_CLICK = 40
MAX_ITER_ORDER = 60
REG_CLICK = 0.05
REG_ORDER = 0.05

# ---- Etiketleri 0/1'e sabitle ----
def binarize(df, colname):
    return (df.fillna({colname: 0})
              .withColumn(
                  colname,
                  F.when(F.col(colname).cast("string").isin("1","true","True"), 1).otherwise(0)
              )
              .withColumn(colname, F.col(colname).cast("int")))

for nm in ["train_tr","train_va"]:
    df = globals()[nm]
    df = binarize(df, "clicked")
    df = binarize(df, "ordered")
    globals()[nm] = df

# ---- Downsample: tüm pozitifler + az sayıda negatif ----
click_pos = train_tr.filter("clicked = 1")
click_neg = train_tr.filter("clicked = 0").sample(False, NEG_FRAC_CLICK, seed=42)
train_tr_click = click_pos.unionByName(click_neg)

order_pos = train_tr.filter("ordered = 1")
order_neg = train_tr.filter("ordered = 0").sample(False, NEG_FRAC_ORDER, seed=42)
train_tr_order = order_pos.unionByName(order_neg)

# ---- Modeller: LR hızlı ve kararlı ----
lr_click = LogisticRegression(
    featuresCol="features", labelCol="clicked",
    predictionCol="click_pred", probabilityCol="click_prob",
    maxIter=MAX_ITER_CLICK, regParam=REG_CLICK, elasticNetParam=0.2
)
lr_order = LogisticRegression(
    featuresCol="features", labelCol="ordered",
    predictionCol="order_pred", probabilityCol="order_prob",
    maxIter=MAX_ITER_ORDER, regParam=REG_ORDER, elasticNetParam=0.3
)

# ---- Eğitim ----
model_click = lr_click.fit(train_tr_click)
model_order = lr_order.fit(train_tr_order)

# ---- Valid üstünde tahminler (DİKKAT: 'train_va_o' yok; 'train_va' kullanılacak) ----
va_click = (model_click.transform(train_va)
            .select("session_id","content_id_hashed","clicked","click_prob")
            .withColumn("p_click", prob1_udf(F.col("click_prob")))
            .drop("click_prob"))

va_order = (model_order.transform(train_va)
            .select("session_id","content_id_hashed","ordered","order_prob")
            .withColumn("p_order", prob1_udf(F.col("order_prob")))
            .drop("order_prob"))

# ---- Birleştir ----
valid_pred = (va_click
              .join(va_order, ["session_id","content_id_hashed"], "inner"))

print("valid sample:")
valid_pred.limit(5).show(truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

valid sample:
+----------------------+-----------------+-------+------------------+-------+-------------------+
|session_id            |content_id_hashed|clicked|p_click           |ordered|p_order            |
+----------------------+-----------------+-------+------------------+-------+-------------------+
|train_49f40a3ac0474cc4|fc95bba4457c8668 |0      |0.6297466734263741|0      |0.2904192710558313 |
|train_f01bb9475dc09f31|d55efbf3a0d5e6f2 |0      |0.5432484478672712|0      |0.14393191655600202|
|train_f2c19015490da3bc|495626dc3209b3b1 |0      |0.6337193178057551|0      |0.1890948270232825 |
|train_fe8caf7f3fe6304e|2a79277a5073f4df |0      |0.5827684089692162|0      |0.20195026685314632|
|train_cf1a1b1d7d639ac0|edad45fbfa885d54 |0      |0.552539113039862 |0      |0.17007590934965178|
+----------------------+-----------------+-------+------------------+-------+-------------------+

## 9) Oturum bazlı AUC ve final valid skoru (pandas’sız, hafif)

In [13]:
%%spark
# valid_pred: (session_id, content_id_hashed, clicked, ordered, p_click, p_order)
# yoksa 8. adımdan önceki hücreyi çalıştır.

import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType

# --- AUC hesaplayıcı (ties destekli, saf Python) ---
def _auc_from_lists(probs, labels):
    if probs is None or labels is None:
        return None
    n = len(probs)
    if n == 0:
        return None
    n1 = sum(1 for y in labels if int(y) == 1)
    n0 = n - n1
    if n1 == 0 or n0 == 0:
        return None

    # rank with average ties
    idx = list(range(n))
    idx.sort(key=lambda i: float(probs[i]))
    ranks = [0.0] * n
    r = 1
    i = 0
    while i < n:
        j = i
        pi = probs[idx[i]]
        while j + 1 < n and probs[idx[j + 1]] == pi:
            j += 1
        avg_rank = (r + (j + 1)) / 2.0
        for k in range(i, j + 1):
            ranks[idx[k]] = avg_rank
        r = j + 2
        i = j + 1

    sum_pos = sum(ranks[i] for i, y in enumerate(labels) if int(y) == 1)
    return float((sum_pos - n1 * (n1 + 1) / 2.0) / (n1 * n0))

from pyspark.sql.functions import udf
auc_udf = udf(_auc_from_lists, DoubleType())

# Click AUC: sadece pozitif click içeren oturumlar
click_groups = (valid_pred
    .groupBy("session_id")
    .agg(F.collect_list("p_click").alias("p"), F.collect_list("clicked").alias("y")))
click_auc_df = click_groups.withColumn("auc_click", auc_udf("p","y")).where(F.col("auc_click").isNotNull())
auc_click = click_auc_df.agg(F.avg("auc_click").alias("m")).first()["m"]

# Order AUC: sadece pozitif order içeren oturumlar
order_groups = (valid_pred
    .groupBy("session_id")
    .agg(F.collect_list("p_order").alias("p"), F.collect_list("ordered").alias("y")))
order_auc_df = order_groups.withColumn("auc_order", auc_udf("p","y")).where(F.col("auc_order").isNotNull())
auc_order = order_auc_df.agg(F.avg("auc_order").alias("m")).first()["m"]

final_valid_score = 0.3*float(auc_click or 0.0) + 0.7*float(auc_order or 0.0)
print(f"Validation → Click AUC={auc_click:.4f}  Order AUC={auc_order:.4f}  Final={final_valid_score:.4f}")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Validation ? Click AUC=0.5647  Order AUC=0.6280  Final=0.6090

## 10) Full-train ile yeniden eğitim (hafif – downsample + LR)

In [14]:
%%spark
# Aynı downsample stratejisiyle tüm train üstünde yeniden fit (test için daha iyi genelleme)

import pyspark.sql.functions as F
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import DoubleType

# Parametreler (gerekirse değiştir)
NEG_FRAC_CLICK = 0.02
NEG_FRAC_ORDER = 0.01
MAX_ITER_CLICK = 50
MAX_ITER_ORDER = 70
REG_CLICK = 0.05
REG_ORDER = 0.05

# Etiketleri garanti et (8. adımda yapıldıysa idempotent)
def binarize(df, colname):
    return (df.fillna({colname: 0})
              .withColumn(colname, F.when(F.col(colname).cast("string").isin("1","true","True"),1).otherwise(0).cast("int")))

train_f2 = binarize(train_f, "clicked")
train_f2 = binarize(train_f2, "ordered")

# Downsample + union
click_pos_all = train_f2.filter("clicked = 1")
click_neg_all = train_f2.filter("clicked = 0").sample(False, NEG_FRAC_CLICK, seed=13)
train_all_click = click_pos_all.unionByName(click_neg_all)

order_pos_all = train_f2.filter("ordered = 1")
order_neg_all = train_f2.filter("ordered = 0").sample(False, NEG_FRAC_ORDER, seed=13)
train_all_order = order_pos_all.unionByName(order_neg_all)

# Modeller
lr_click_full = LogisticRegression(
    featuresCol="features", labelCol="clicked",
    predictionCol="click_pred", probabilityCol="click_prob",
    maxIter=MAX_ITER_CLICK, regParam=REG_CLICK, elasticNetParam=0.2
)
lr_order_full = LogisticRegression(
    featuresCol="features", labelCol="ordered",
    predictionCol="order_pred", probabilityCol="order_prob",
    maxIter=MAX_ITER_ORDER, regParam=REG_ORDER, elasticNetParam=0.3
)

model_click_full = lr_click_full.fit(train_all_click)
model_order_full = lr_order_full.fit(train_all_order)

print("Full-train refit done.")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Full-train refit done.

## 11) Test tahmini ve oturum içi sıralama

In [15]:
%%spark
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

# Sınıf-1 olasılık çıkarıcı (vektörden)
from pyspark.sql.types import DoubleType
prob1_udf = F.udf(
    lambda v: float(v[1]) if (v is not None and getattr(v, "size", 0) > 1)
              else (float(v[0]) if v is not None else None),
    DoubleType()
)

# Tahminler
t_click = (model_click_full.transform(test_f)
           .select("session_id","content_id_hashed","click_prob")
           .withColumn("p_click", prob1_udf(F.col("click_prob")))
           .drop("click_prob"))

t_order = (model_order_full.transform(test_f)
           .select("session_id","content_id_hashed","order_prob")
           .withColumn("p_order", prob1_udf(F.col("order_prob")))
           .drop("order_prob"))

test_pred = (t_click.join(t_order, ["session_id","content_id_hashed"], "inner")
                    .withColumn("final_score", 0.3*F.col("p_click") + 0.7*F.col("p_order")))

# Gruplayıp skorla azalan sırala (pandas'sız)
def _join_sorted(cids, scores):
    pairs = sorted(zip(scores, cids), key=lambda x: float(x[0]) if x[0] is not None else -1e9, reverse=True)
    return " ".join([str(cid) for _, cid in pairs])

from pyspark.sql.functions import udf
join_sorted_udf = udf(_join_sorted, StringType())

pred_df = (test_pred.groupBy("session_id")
           .agg(join_sorted_udf(F.collect_list("content_id_hashed"),
                                F.collect_list("final_score")).alias("prediction")))

pred_df.limit(3).show(truncate=False)
print("Test sessions:", pred_df.count())


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## 12) Submission yazımı (CSV)

In [17]:
%%spark
# 12) SUBMISSION'u S3'e TEK CSV OLARAK YAZ

import pyspark.sql.functions as F

BASE  = "s3://melek-hackathon/submissions"
TMP   = f"{BASE}/_tmp_submission"
FINAL = f"{BASE}/submission.csv"

# 1) Geçici klasöre tek parça CSV yaz
(pred_df
 .orderBy("session_id")
 .coalesce(1)
 .write
 .mode("overwrite")
 .option("header", True)
 .csv(TMP))

# 2) part-*.csv'yi 'submission.csv' olarak yeniden adlandır (aynı bucket içinde rename)
hconf = sc._jsc.hadoopConfiguration()
Path  = sc._jvm.org.apache.hadoop.fs.Path
fs    = Path(BASE).getFileSystem(hconf)

# part dosyasını bul
for status in fs.listStatus(Path(TMP)):
    name = status.getPath().getName()
    if name.startswith("part-") and name.endswith(".csv"):
        part_path = status.getPath()
        break
else:
    raise Exception("part-*.csv bulunamadı.")

# varsa eski 'submission.csv'yi sil
final_path = Path(FINAL)
if fs.exists(final_path):
    fs.delete(final_path, True)

# rename → tek CSV dosyası
fs.rename(part_path, final_path)

# geçiciyi temizle
fs.delete(Path(TMP), True)

print("Submission yazıldı →", FINAL)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Submission yaz?ld? ? s3://melek-hackathon/submissions/submission.csv