In [1]:
# Gerekli Kütüphaneler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf # DataFrame işlemleri için
from pyspark.sql.types import IntegerType, FloatType, StringType, StructType, StructField # DataFrame şemaları için
from pyspark.ml.recommendation import ALS # MLlib'in DataFrame tabanlı ALS'si
from pyspark.ml.evaluation import RegressionEvaluator # Model değerlendirme için
import time
import pandas as pd # Küçük veri işlemleri ve kullanıcı girdisi için (opsiyonel)

# 1. SparkSession Başlatma
# Spark 2.0 ve sonrası için SparkSession ana giriş noktasıdır.
# MLlib'in DataFrame tabanlı API'sini kullanmak için SparkSession daha uygundur.
spark = (SparkSession.builder
    .appName("MovieLens32M Recommender")
    .config("spark.driver.memory", "4g") # Sürücü belleğini artırabiliriz (veri büyüklüğüne göre)
    .config("spark.executor.memory", "4g") # Yürütücü belleğini artırabiliriz
    .config("spark.sql.shuffle.partitions", "200") # Büyük veri için shuffle partition sayısını ayarlayabiliriz
    .master("local[*]") # Tüm yerel çekirdekleri kullan
    .getOrCreate()
)
sc = spark.sparkContext # Eski RDD tabanlı API'ler için SparkContext'e erişim
print(f"SparkSession başlatıldı. Spark Sürümü: {spark.version}")
print(f"SparkContext mevcut: {sc.appName}")

# 2. Dosya Yolları (Bu yolları kendi sisteminize göre güncelleyin)
# ml-32m klasörünün notebook dosyanızla aynı dizinde olduğunu varsayıyorum.
base_path = "./" # Veya tam yolunu verin
movies_filepath = base_path + "movies.csv"
ratings_filepath = base_path + "ratings.csv"
# tags_filepath = base_path + "tags.csv" # Bu örnekte kullanmayacağız ama yolu hazır
# links_filepath = base_path + "links.csv" # Bu örnekte kullanmayacağız ama yolu hazır

# 3. Veri Yükleme ve Ön İşleme

# Movies Verisini Yükleme (DataFrame olarak)
print(f"\n'{movies_filepath}' dosyasından filmler yükleniyor...")
# Şemayı manuel olarak tanımlamak, Spark'ın doğru tipleri çıkarmasına yardımcı olur
movie_schema = StructType([
    StructField("movieId", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("genres", StringType(), True)
])

movies_df = spark.read.csv(movies_filepath, header=True, schema=movie_schema, escape="\"") # escape="\"" başlıkta veya türlerde virgül varsa
movies_df.printSchema()
print(f"Toplam film sayısı: {movies_df.count()}")
movies_df.show(5, truncate=False)

# Film ID'lerini ve Başlıklarını bir sözlüğe alalım (kullanıcı arayüzü için)
# Büyük bir DataFrame için collect() dikkatli kullanılmalı, ancak film sayısı (~87k) yönetilebilir olabilir.
# Daha büyükse, bu sözlüğü broadcast edebilir veya join ile kullanabiliriz.
print("\nFilm ID ve başlıkları sözlüğü oluşturuluyor...")
try:
    movie_titles_list = movies_df.select("movieId", "title").collect()
    titles = {row.movieId: row.title for row in movie_titles_list}
    titles_to_ids = {row.title: row.movieId for row in movie_titles_list}
    print(f"{len(titles)} film başlığı sözlüğe eklendi.")
    print(f"Örnek: Film ID 1 -> {titles.get(1)}")
except Exception as e:
    print(f"Film başlıkları sözlüğü oluşturulurken hata: {e}")
    titles = {}
    titles_to_ids = {}

# Ratings Verisini Yükleme (DataFrame olarak)
print(f"\n'{ratings_filepath}' dosyasından derecelendirmeler yükleniyor...")
rating_schema = StructType([
    StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("rating", FloatType(), True),
    StructField("timestamp", IntegerType(), True) # Timestamp'i şimdilik integer olarak alalım
])

ratings_df_raw = spark.read.csv(ratings_filepath, header=True, schema=rating_schema)
ratings_df_raw.printSchema()
print(f"Toplam derecelendirme sayısı (ham): {ratings_df_raw.count()}") # Bu işlem zaman alabilir
ratings_df_raw.show(5)

# MLlib ALS için gerekli sütunları seçelim ve isimlerini değiştirelim (user, item, rating)
# userId -> user, movieId -> item
ratings_df = ratings_df_raw.select(
    col("userId").alias("user"),
    col("movieId").alias("item"),
    col("rating")
).cache() # Sık kullanılacağı için cache'leyelim

print("\nALS için hazırlanan ratings DataFrame'i:")
ratings_df.printSchema()
print(f"Toplam derecelendirme sayısı (işlenmiş): {ratings_df.count()}") # Bu da zaman alabilir
ratings_df.show(5)


# 4. Veri Setini Eğitim ve Test Olarak Ayırma
print("\nVeri seti eğitim ve test olarak bölünüyor (80% eğitim, 20% test)...")
(training_df, test_df) = ratings_df.randomSplit([0.8, 0.2], seed=42) # seed tekrarlanabilirlik için
print(f"Eğitim seti boyutu: {training_df.count()}")
print(f"Test seti boyutu: {test_df.count()}")
training_df.cache() # Eğitim setini de cache'leyelim
test_df.cache()


# 5. ALS Modelini Eğitme
# Spark ML (DataFrame tabanlı API) içindeki ALS'yi kullanacağız
print("\nALS modeli eğitiliyor...")
# Parametreleri belirleyelim (bunlar optimize edilebilir)
rank_param = 10       # Gizli faktör sayısı
max_iter_param = 10   # Maksimum iterasyon sayısı
reg_param = 0.1       # Regülarizasyon parametresi (lambda)
alpha_param = 1.0     # Örtük geri bildirim için alfa (bu örnekte explicit feedback kullanıyoruz)

als = ALS(userCol="user", itemCol="item", ratingCol="rating",
          rank=rank_param,
          maxIter=max_iter_param,
          regParam=reg_param,
          coldStartStrategy="drop", # Yeni kullanıcı/öğe için NaN döndürür, "nan" veya "drop"
          # nonnegative=True, # Faktörlerin negatif olmamasını sağlar (opsiyonel)
          implicitPrefs=False) # Derecelendirmelerimiz açık (explicit)

# Modeli eğitim verisiyle eğit
try:
    model = als.fit(training_df)
    print("ALS modeli başarıyla eğitildi.")
except Exception as e:
    print(f"ALS modeli eğitilirken hata: {e}")
    model = None # Hata durumunda modeli None yap

if not model:
    print("Model eğitilemediği için program sonlandırılıyor.")
    spark.stop()
    exit()

# ŞİMDİLİK BURAYA KADAR BİR BAŞLANGIÇ.
# Buradan sonra arkadaşından girdi alma, modeli güncelleme (veya yeni verilerle birleştirip yeniden eğitme),
# tahmin yapma ve tavsiye sunma adımlarını ekleyeceğiz.
# Dosyaya ekleme yapmak yerine, yeni derecelendirmeleri bir DataFrame'e dönüştürüp
# mevcut training_df ile `union` yaparak modeli yeniden eğitmek daha iyi bir pratik olacaktır.

25/06/30 15:14:08 WARN Utils: Your hostname, turan-Dell-G15-5511 resolves to a loopback address: 127.0.1.1; using 172.17.30.206 instead (on interface wlp0s20f3)
25/06/30 15:14:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/30 15:14:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


SparkSession başlatıldı. Spark Sürümü: 3.5.1
SparkContext mevcut: MovieLens32M Recommender

'./movies.csv' dosyasından filmler yükleniyor...
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

Toplam film sayısı: 86537
+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |


                                                                                

Toplam derecelendirme sayısı (ham): 33832162
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      1|   4.0|1225734739|
|     1|    110|   4.0|1225865086|
|     1|    158|   4.0|1225733503|
|     1|    260|   4.5|1225735204|
|     1|    356|   5.0|1225735119|
+------+-------+------+----------+
only showing top 5 rows


ALS için hazırlanan ratings DataFrame'i:
root
 |-- user: integer (nullable = true)
 |-- item: integer (nullable = true)
 |-- rating: float (nullable = true)



                                                                                

Toplam derecelendirme sayısı (işlenmiş): 33832162
+----+----+------+
|user|item|rating|
+----+----+------+
|   1|   1|   4.0|
|   1| 110|   4.0|
|   1| 158|   4.0|
|   1| 260|   4.5|
|   1| 356|   5.0|
+----+----+------+
only showing top 5 rows


Veri seti eğitim ve test olarak bölünüyor (80% eğitim, 20% test)...


                                                                                

Eğitim seti boyutu: 27063996


                                                                                

Test seti boyutu: 6768166

ALS modeli eğitiliyor...


25/06/30 15:14:37 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

ALS modeli başarıyla eğitildi.


In [None]:
# Gerekli kütüphaneler (ilk hücreden bazıları zaten import edilmiş olabilir)
from pyspark.sql.functions import lit, col
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
import pandas as pd

# --- Arkadaştan Girdi Alma ---

# İlk hücrede 'titles' ve 'titles_to_ids' sözlüklerinin oluşturulduğunu varsayıyoruz.
if 'titles_to_ids' not in globals() or not titles_to_ids:
    print("HATA: 'titles_to_ids' sözlüğü bulunamadı. Lütfen ilk hücreyi çalıştırdığınızdan emin olun.")
else:
    print("\n--- Arkadaş Girdileri ---")
    
    friend_user_id = None
    try:
        # Mevcut kullanıcı ID'lerinden daha büyük bir ID atamak iyi bir pratiktir.
        # İlk hücredeki ratings_df'ten max kullanıcı id'sini alabiliriz.
        max_user_id = ratings_df.selectExpr("max(user)").collect()[0][0]
        suggested_id = max_user_id + 1
        
        friend_user_id_str = input(f"Lütfen arkadaşının kullanıcı ID'sini gir (sayısal, önerilen: {suggested_id}): ")
        friend_user_id = int(friend_user_id_str)
    except (ValueError, TypeError):
        print("Geçersiz kullanıcı ID'si. Lütfen sayısal bir değer girin.")

    friend_new_ratings_list = []
    if friend_user_id is not None:
        print("\nLütfen arkadaşının izlediği filmleri ve verdiği puanları gir (1-5 arası).")
        print("Bitirmek için film adı yerine 'bitti' yaz.")

        while True:
            movie_title_input = input("Film Adı (örn: Toy Story (1995)): ")
            if movie_title_input.lower() == 'bitti':
                break
            
            movie_id = titles_to_ids.get(movie_title_input)
            if movie_id is None:
                print(f"  UYARI: '{movie_title_input}' filmi veri setinde bulunamadı. Lütfen tam adını doğru yazdığınızdan emin olun.")
                continue
                
            while True:
                try:
                    rating_input_str = input(f"  '{movie_title_input}' için puanın (1-5): ")
                    rating_input = float(rating_input_str)
                    if 1.0 <= rating_input <= 5.0:
                        # Spark DataFrame'ine uygun bir formatta (user, item, rating) listeye ekliyoruz
                        friend_new_ratings_list.append((friend_user_id, movie_id, rating_input))
                        print(f"    '{movie_title_input}' için puan {rating_input} eklendi.")
                        break
                    else:
                        print("  Geçersiz puan. Lütfen 1 ile 5 arasında bir değer girin.")
                except ValueError:
                    print("  Geçersiz puan formatı. Lütfen sayısal bir değer girin (örn: 4.5).")

    # --- Modeli Yeni Verilerle Güncelleme ve Tavsiye Yapma ---
    
    if friend_user_id is None or not friend_new_ratings_list:
        print("\nKullanıcı ID'si veya derecelendirme alınamadı. Tavsiye adımları atlanıyor.")
    else:
        # 1. Yeni derecelendirmelerden bir Spark DataFrame oluştur
        # YÖNTEM DEĞİŞİKLİĞİ: Dosyaya yazıp okumak yerine, doğrudan bellekte birleştirmek çok daha verimlidir.
        friend_schema = StructType([
            StructField("user", IntegerType(), True),
            StructField("item", IntegerType(), True),
            StructField("rating", FloatType(), True)
        ])
        friend_ratings_df = spark.createDataFrame(data=friend_new_ratings_list, schema=friend_schema)
        
        print("\nArkadaşınızın girdiği derecelendirmeler:")
        friend_ratings_df.show()

        # 2. Yeni derecelendirmeleri mevcut eğitim setine ekle
        print("Mevcut eğitim verisi yeni derecelendirmelerle birleştiriliyor...")
        combined_training_df = training_df.union(friend_ratings_df)
        combined_training_df.cache() # Yeni birleşim de cache'lenebilir.
        
        print(f"Eski eğitim seti boyutu: {training_df.count()}")
        print(f"Yeni birleşik eğitim seti boyutu: {combined_training_df.count()}")

        # 3. Modeli güncellenmiş veri setiyle YENİDEN EĞİT
        # İlk hücredeki 'als' nesnesini kullanarak yeniden fit ediyoruz.
        print("\nALS modeli güncel verilerle yeniden eğitiliyor...")
        try:
            # YÖNTEM DEĞİŞİKLİĞİ: Eski RDD tabanlı ALS.train() yerine modern DataFrame tabanlı .fit() kullanılıyor.
            updated_model = als.fit(combined_training_df)
            print("ALS modeli başarıyla yeniden eğitildi.")

            # 4. Arkadaşın için tavsiye oluştur
            # Önce arkadaşının izlemediği filmleri bulalım
            friend_watched_movies_df = friend_ratings_df.select("item").withColumnRenamed("item", "movieId")
            
            # Tüm filmlerden izlenenleri çıkar (bu bir "anti-join" işlemidir)
            movies_to_recommend_df = movies_df.join(
                friend_watched_movies_df,
                movies_df.movieId == friend_watched_movies_df.movieId,
                "left_anti" # Sadece soldaki df'te (movies_df) olup sağdakinde (izlenenler) olmayanları tutar
            ).select(col("movieId").alias("item"), lit(friend_user_id).alias("user"))

            print(f"\nArkadaşınız için {movies_to_recommend_df.count()} adet izlenmemiş film üzerinden tahmin yapılıyor...")

            # 5. İzlenmemiş tüm filmler için tahmin yap
            predictions = updated_model.transform(movies_to_recommend_df)

            # 6. En yüksek tahminli 10 filmi göster
            top_recommendations = predictions.join(movies_df, predictions.item == movies_df.movieId) \
                                             .orderBy(col("prediction").desc()) \
                                             .select("title", "genres", "prediction") \
                                             .limit(10)
            
            print(f"\n--- Arkadaşın ({friend_user_id}) İçin Top 10 Film Tavsiyesi ---")
            top_recommendations.show(truncate=False)

        except Exception as e:
            print(f"Model yeniden eğitilirken veya tavsiye oluşturulurken hata oluştu: {e}")

print("\nİnteraktif tavsiye işlemi tamamlandı.")


--- Arkadaş Girdileri ---


Lütfen arkadaşının kullanıcı ID'sini gir (sayısal, önerilen: 330976):  330978



Lütfen arkadaşının izlediği filmleri ve verdiği puanları gir (1-5 arası).
Bitirmek için film adı yerine 'bitti' yaz.


Film Adı (örn: Toy Story (1995)):  the Godfather


  UYARI: 'the Godfather' filmi veri setinde bulunamadı. Lütfen tam adını doğru yazdığınızdan emin olun.


Film Adı (örn: Toy Story (1995)):  The Dark Knight (2008)


  UYARI: 'The Dark Knight (2008)' filmi veri setinde bulunamadı. Lütfen tam adını doğru yazdığınızdan emin olun.
