# Metehan Ayhan - ApacheSpark, Big Data

In [12]:
#pip install pyspark

In [13]:
# PySpark için gerekli modülleri içe aktarıyoruz
from pyspark import SparkConf, SparkContext
import collections

# SparkContext oluşturuluyor, Spark uygulaması burada çalışacak
sc = SparkContext()

# Bir RDD (Resilient Distributed Dataset) oluşturuluyor, paralel olarak işlenecek veri
rdd = sc.parallelize([3, 4, 56, 7, 4, 2])

# RDD üzerindeki her elemanı karesi alınarak yeni bir RDD oluşturuluyor
sq = rdd.map(lambda x: x * x)

# RDD'deki veriler toplanarak bir liste halinde yazdırılıyor
print(sq.collect())

# SparkContext sonlandırılıyor, tüm kaynaklar serbest bırakılıyor
sc.stop()


[9, 16, 3136, 49, 16, 4]


In [20]:
# Bu proje, 1800.csv dosyasındaki hava durumu verilerini analiz eder ve her hava durumu istasyonunda kaydedilen
# en düşük sıcaklıkları bulur. PySpark kullanarak verilerin filtrelenmesi, dönüştürülmesi ve azaltılması gibi temel
# büyük veri işlemlerini gösteren bir örnektir.

# PySpark için gerekli modülleri içe aktarıyoruz
from pyspark import SparkConf, SparkContext
import collections

# Spark uygulaması için bir SparkConf nesnesi oluşturuluyor
# 'local' olarak çalışacağı belirtiliyor ve uygulama adı 'MinTemperatures' olarak ayarlanıyor
conf = SparkConf().setMaster("local").setAppName("MinTemperatures")

# SparkContext oluşturuluyor, Spark uygulaması burada çalışacak
sc = SparkContext(conf = conf)

# Her satırı parçalayıp gerekli bilgileri (istasyon ID'si, giriş türü, sıcaklık) döndüren bir fonksiyon tanımlıyoruz
def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]  # İstasyon kimliği
    entryType = fields[2]  # Giriş türü (TMIN, TMAX, vs.)
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0  # Santigrat'tan Fahrenheit'a sıcaklık dönüşümü
    return (stationID, entryType, temperature)

# '1800.csv' dosyasındaki veriler bir RDD olarak yükleniyor
lines = sc.textFile("1800.csv")

# Satırlar parseLine fonksiyonu kullanılarak parçalanıyor
parsedLines = lines.map(parseLine)

# Yalnızca minimum sıcaklık girişlerini (TMIN) filtreliyoruz
minTemps = parsedLines.filter(lambda x: "TMIN" in x[1])

# İstasyon kimliği ve sıcaklık çiftleri oluşturuyoruz
stationTemps = minTemps.map(lambda x: (x[0], x[2]))

# Aynı istasyon için kaydedilen en düşük sıcaklıkları buluyoruz
minTemps = stationTemps.reduceByKey(lambda x, y: min(x,y))

# Sonuçları topluyoruz
results = minTemps.collect()

# Her istasyonun kimliği ve en düşük sıcaklık değeri ekrana yazdırılıyor
for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))

# SparkContext sonlandırılıyor, tüm kaynaklar serbest bırakılıyor
sc.stop()


ITE00100554	5.36F
EZE00100082	7.70F


In [21]:
from pyspark import SparkConf, SparkContext
import collections

conf = SparkConf().setMaster("local").setAppName("MaxTemperatures")
sc = SparkContext(conf = conf)

def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

lines = sc.textFile("1800.csv")
parsedLines = lines.map(parseLine)
minTemps = parsedLines.filter(lambda x: "TMAX" in x[1])
stationTemps = minTemps.map(lambda x: (x[0], x[2]))
minTemps = stationTemps.reduceByKey(lambda x, y: max(x,y))
results = minTemps.collect();

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))
sc.stop()

ITE00100554	90.14F
EZE00100082	90.14F


In [28]:
# Bu proje, bir çizgi roman veri setindeki süper kahramanların birlikte yer aldıkları sayıları analiz eder
# ve en çok ortak görünüme sahip süper kahramanı bulur. PySpark kullanarak büyük veri işlemleri,
# eşlemeler ve azaltma işlemleri gibi temel veri işleme tekniklerini gösteren bir örnektir.

from pyspark import SparkConf, SparkContext

# Bir satırdaki süper kahramanların sayısını ve bu süper kahramanın ID'sini döndüren bir fonksiyon tanımlıyoruz
def countCoOccurences(line):
    elements = line.split()
    return (int(elements[0]), len(elements) - 1)

# Süper kahraman isimlerini ve ID'lerini döndüren bir fonksiyon tanımlıyoruz
def parseNames(line):
    fields = line.split('\"')
    return (int(fields[0]), fields[1].encode("utf8"))

# Spark uygulaması için bir SparkConf nesnesi oluşturuluyor
# 'local' olarak çalışacağı belirtiliyor ve uygulama adı 'PolpularHero' olarak ayarlanıyor
conf = SparkConf().setMaster("local").setAppName("PolpularHero")

# SparkContext oluşturuluyor, Spark uygulaması burada çalışacak
sc = SparkContext(conf = conf)

# 'Marvel-names.txt' dosyasındaki süper kahraman isimleri bir RDD olarak yükleniyor
names = sc.textFile("Marvel-names.txt")
namesRdd = names.map(parseNames)

# 'Marvel-graph.txt' dosyasındaki süper kahraman ilişkileri bir RDD olarak yükleniyor
lines = sc.textFile("Marvel-graph.txt")

# Her satırdaki süper kahraman ID'si ve bu kahramanın birlikte yer aldığı diğer kahraman sayısını içeren çiftler oluşturuluyor
pairings = lines.map(countCoOccurences)

# Her süper kahraman için birlikte göründüğü kahramanların toplam sayısı hesaplanıyor
totalFriendsByCharacter = pairings.reduceByKey(lambda x, y : x + y)

# Toplam arkadaş sayısını anahtar olarak kullanarak, ID ile birlikte sıralama için çiftler oluşturuluyor
flipped = totalFriendsByCharacter.map(lambda xy : (xy[1], xy[0]))

# En çok arkadaşa sahip süper kahraman bulunuyor
mostPopular = flipped.max()

# Bu süper kahramanın ismi, ID'si kullanılarak bulunuyor
mostPopularName = namesRdd.lookup(mostPopular[1])[0]

# En popüler süper kahramanın ismi ve toplam arkadaş sayısı ekrana yazdırılıyor
print(str(mostPopularName) + " is the most popular superhero, with " + str(mostPopular[0]) + \
      " co-appearances.")

# SparkContext sonlandırılıyor, tüm kaynaklar serbest bırakılıyor
sc.stop()


b'CAPTAIN AMERICA' is the most popular superhero, with 1933 co-appearances.


In [30]:
# Bu proje, Pima Kızılderililerinin diyabet verisini kullanarak bir lojistik regresyon modeli eğitir ve test eder.
# Projede PySpark kullanılarak veri işleme, model eğitimi, tahmin ve değerlendirme işlemleri yapılmaktadır.

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Adım 1: Spark oturumu başlatılıyor
# SparkSession, Spark SQL ve DataFrame API'si ile çalışmak için temel nesnedir
spark = SparkSession.builder.appName("PimaIndianClassification").getOrCreate()

# Adım 2: Veri kümesini yükleme
# Pima Kızılderililerine ait diyabet veri seti CSV formatında yükleniyor
# 'inferSchema=True' ile veri tipleri otomatik olarak belirleniyor ve 'header=True' ile ilk satır başlık olarak kabul ediliyor
data = spark.read.csv("pima-indians-diabetes.csv", inferSchema=True, header=True)

# Adım 3: Eğitim için veriyi hazırlama
# Model eğitimi için gerekli özellik sütunlarını bir araya getiriyoruz
feature_columns = data.columns[:-1]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Özellik sütunları birleştiriliyor ve 'features' adlı yeni bir sütun oluşturuluyor
# Sonrasında sadece 'features' ve 'Outcome' (etiket) sütunları seçiliyor
data = assembler.transform(data).select("features", "Outcome")

# Adım 4: Veriyi eğitim ve test kümelerine ayırma
# Veriyi rastgele olarak %70 eğitim ve %30 test olacak şekilde ikiye ayırıyoruz
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

# Adım 5: Lojistik regresyon modeli eğitimi
# Lojistik regresyon modeli oluşturuluyor, etiket ve özellik sütunları belirleniyor
lr = LogisticRegression(labelCol="Outcome", featuresCol="features")

# Model, eğitim verisi üzerinde eğitiliyor
model = lr.fit(train_data)

# Adım 6: Test verisi üzerinde tahmin yapma
# Eğitimden sonra model, test verisi üzerinde tahmin yapıyor
predictions = model.transform(test_data)

# Adım 7: Modelin değerlendirilmesi
# Modelin performansı, binary sınıflandırma için değerlendiriliyor
evaluator = BinaryClassificationEvaluator(labelCol="Outcome")

# Modelin doğruluğu hesaplanıyor ve ekrana yazdırılıyor
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)


Accuracy: 0.8546265328874024
