# Kütüphanelerin İmport Edilmesi

In [1]:
import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand
import nltk
nltk.download('punkt')
nltk.download('stopwords')
import string
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
import numpy as np



# Spark Ortamının Hazırlanması

In [None]:
spark = SparkSession.builder.appName("simulator").master("local").config("spark.driver.memory", "40g") \
  .config("spark.executor.memory", "50g").getOrCreate()

In [None]:
import json

with open('apikey.json') as f:
    data = json.load(f)
    api_key = data['key']
    client_id = data['client_id']
    user_agent = data['user_agent']

with open('mongo.json') as m:
    data = json.load(m)
    conn_string = data["connection_string"]

In [None]:
df = spark.read.csv("turkish-subreddits.csv", header=True)
df.show()

+-----------------+
|       Subreddits|
+-----------------+
|        liseliler|
|        Hacettepe|
|          Yatirim|
|           kripto|
|           Finans|
|         Videoyun|
|  androidoyunclub|
|       burdurland|
|            Kafes|
|sariyerbelediyesi|
|      TurkeyJerky|
+-----------------+



In [None]:
df_with_random = df.withColumn("random", rand())
random_row_1 = df_with_random.orderBy("random").limit(1)
random_row_1.head()["Subreddits"]

'TurkeyJerky'

# Praw kütüphanesi kullanılarak reddit veri çekilmesi

In [None]:
import praw

In [None]:
# PRAW ile Reddit'e bağlan
reddit = praw.Reddit(
    client_id=client_id,
    client_secret=api_key,
    user_agent=user_agent
)


In [None]:
import requests as req
import time 
import random

import pymongo

In [None]:
mongo_client = pymongo.MongoClient(conn_string)

mydb = mongo_client["bigData"]

# Verinin geçeceği transform işlemi

In [None]:
def preprocess_text(text):
    text = text.translate(str.maketrans('', '', string.punctuation))
    
    text = text.lower()
    
    text = ' '.join(text.split())
    
    return text

In [None]:
def remove_stopwords(text):
    tokens = word_tokenize(text)
    
    stop_words = set(stopwords.words('turkish'))
    
    filtered_tokens = [token for token in tokens if token.lower() not in stop_words]
    
    filtered_text = ' '.join(filtered_tokens)
    
    return filtered_text

In [None]:
def Transformation(submission_data):
    # Eğer selftext değişkeni boşsa
    if not submission_data["selftext"]:
        # subreddit_name değişkenini selftext olarak kullan
        submission_data["selftext"] = submission_data["subreddit_name"]
    submission_data["selftext"] = remove_stopwords(preprocess_text(submission_data["selftext"]))
    submission_data["title"] = remove_stopwords(preprocess_text(submission_data["title"]))
    return submission_data

# Sentiment tahmini için Apiye istek atılması

In [None]:
def request_submission(submission_data):
    url = "http://192.168.0.159:8080/sentiment"  

    if not submission_data.get("selftext"):
        submission_data["selftext"] = submission_data["title"]

    body = {
        "text" : submission_data["selftext"]
    }

    
    # JSON verisini oluştur
    payload = json.dumps(body)
    
    # POST isteği gönder
    response = req.post(url, data=payload, headers={'Content-Type': 'application/json'})
    
    # Yanıtı kontrol et
    if response.status_code == 200:
        #print("Submission successfully posted!")
        response_data = response.json()  # Yanıtı JSON formatında al
        submission_data["sentiment"] = response_data.get("sentiment", -1)
    else:
        print(f"Failed to post submission. Status code: {response.status_code}, Response: {response.text}")

    
    return submission_data

# Main

In [None]:
while True:
    df_with_random = df.withColumn("random", rand())
    random_row_1 = df_with_random.orderBy("random").limit(1)
    subreddit_name = random_row_1.head()["Subreddits"]
    subreddit = reddit.subreddit(subreddit_name)
    
    random_hot_limit = random.randint(1, 100)

    random_hot_limit = random.randint(1, 100)
    hot_submissions = reddit.subreddit(subreddit_name).hot(limit=random_hot_limit)

    # Choose a random submission from the fetched results
    random_submission = random.choice(list(hot_submissions))

    # Extract relevant information from the submission
    submission_data = {
        "subreddit_name": subreddit_name,
        "title": random_submission.title,
        "permalink": random_submission.permalink,
        "selftext": random_submission.selftext  # Add selftext if available
    }
    print("-----önce------")
    print(submission_data)
    submission_data = request_submission(submission_data)
    print("-------------sonra--------------------")
    print(submission_data)
    submission_data = 
    # veritabanına ekleme
    collection = mydb[submission_data["subreddit_name"]]
    x = collection.insert_one(submission_data)
    time.sleep(3)

-----önce------
{'subreddit_name': 'TurkeyJerky', 'title': '%100 orjinal meme', 'permalink': '/r/TurkeyJerky/comments/1cmibl1/100_orjinal_meme/', 'selftext': ''}
-------------sonra--------------------
{'subreddit_name': 'TurkeyJerky', 'title': '%100 orjinal meme', 'permalink': '/r/TurkeyJerky/comments/1cmibl1/100_orjinal_meme/', 'selftext': '%100 orjinal meme', 'sentiment': 1}
-----önce------
{'subreddit_name': 'Finans', 'title': 'İşsizlik maaşı nasıl alınır? işsizlik ödeneğinden yararlanma şartları ne...', 'permalink': '/r/Finans/comments/16uif9c/işsizlik_maaşı_nasıl_alınır_işsizlik_ödeneğinden/', 'selftext': ''}
-------------sonra--------------------
{'subreddit_name': 'Finans', 'title': 'İşsizlik maaşı nasıl alınır? işsizlik ödeneğinden yararlanma şartları ne...', 'permalink': '/r/Finans/comments/16uif9c/işsizlik_maaşı_nasıl_alınır_işsizlik_ödeneğinden/', 'selftext': 'İşsizlik maaşı nasıl alınır? işsizlik ödeneğinden yararlanma şartları ne...', 'sentiment': 1}
-----önce------
{'subr

KeyboardInterrupt: 