## PySpark Giriş

In [None]:
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

In [None]:
!pip install findspark

In [None]:
import findspark

In [None]:
findspark.init("/Users/krtdnc/spark/spark-3.5.0-bin-hadoop3")

In [None]:
from pyspark import SparkContext

## Spark Uygulamasının Başlatılması

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

In [None]:
spark = SparkSession.builder \
    .master("local") \
    .appName("pyspark_giris") \
    .getOrCreate()

sc = spark.sparkContext

In [None]:
sc

In [None]:
sc.version

In [None]:
sc.appName

In [None]:
sc.stop()

## Temel DataFrame İşlemleri

In [None]:
sc

In [None]:
spark_df = spark.read.csv("./churn.csv", header = True, inferSchema = True)

In [None]:
spark_df.printSchema()

In [None]:
type(spark_df)

In [None]:
spark_df.cache()

In [None]:
!pip install seaborn

In [None]:
import seaborn as sns
df = sns.load_dataset("diamonds")

In [None]:
type(df)

In [None]:
df.head()

In [None]:
spark_df.head() #Her pandas fonskiyonu spark_df lerde aynı çalışmaz ve farklı sonuçlar verebilir sparkın kendi fonksiyonlarını öğrenmek zorundayız.

In [None]:
spark_df.show(5, truncate = True)

In [None]:
spark_df.count()

In [None]:
spark_df.columns

In [None]:
spark_df.describe().show()

In [None]:
spark_df.describe("Age").show()

In [None]:
spark_df.select("Age","Names").show()

In [None]:
spark_df.filter(spark_df.Age>40).count()

In [None]:
spark_df.groupby("Churn").count().show()

In [None]:
spark_df.groupby("Churn").agg({"Age":"mean"}).show()

In [None]:
sc.stop()

## SQL İşlemleri

In [None]:
sc

In [None]:
spark_df.registerTempTable("tbl_df")

In [None]:
spark.sql("show databases").show()

In [None]:
spark.sql("show tables").show()

In [None]:
spark.sql("select Age from tbl_df").show(5)

In [None]:
spark.sql("select Churn, mean(Age) from tbl_df group by Churn").show()

In [None]:
sc.stop()

## Büyük Veri Görselleştirme

In [None]:
sc

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
sdf = spark_df.toPandas() 

# Matplotlib pandas dataframe üzerinden çalışır bu yüzden önce spark ortamındaki dataframei pandas dataframeine çevirmemiz gerekiyor.
# Ancak çok büyük verilerde bunu yapamayacağımızdan dolayı sadece ihtiyacımız olan kolonları çekmemiz gerekir. Aşağıda örnek veriyoruz.
# Local makinamızda büyük veriyi görselleştiremeyeceğimiz için indirgeme işleminden sonra görselleştirmemiz gerekir. Örn: [46]

In [None]:
sdf.head()

In [None]:
sns.barplot(x = "Churn", y = sdf.Churn.index, data = sdf)

In [None]:
a = spark_df.groupby("Churn").count().toPandas()

# Büyük veri indirgeme işlemi

In [None]:
a

In [None]:
sc.stop()

## Makine Öğrenmesi

In [None]:
sc

In [None]:
spark_df = spark.read.csv("./churn.csv", header = True, inferSchema = True)

In [None]:
spark_df.show(5)

In [None]:
spark_df = spark_df.toDF(*[c.lower() for c in spark_df.columns])

In [None]:
spark_df.show(5)

In [None]:
spark_df = spark_df.withColumnRenamed("_c0", "index")

In [None]:
spark_df.show(5)

In [None]:
spark_df.count()

In [None]:
len(spark_df.columns)

In [None]:
spark_df.columns

In [None]:
spark_df.describe().show()

In [None]:
spark_df.select("age","total_purchase","account_manager","years","num_sites","churn").describe().toPandas().transpose()

In [None]:
spark_df = spark_df.dropna()

In [None]:
spark_df = spark_df.withColumn("age_kare", spark_df.age**2)

In [None]:
spark_df.show(5)

In [None]:
# Bağımli değişkeni belirtme

In [None]:
from pyspark.ml.feature import StringIndexer

In [None]:
stringIndexer = StringIndexer(inputCol = "churn", outputCol = "label")
mod = stringIndexer.fit(spark_df)
indexed = mod.transform(spark_df)
spark_df = indexed.withColumn("label", indexed["label"].cast("integer"))

In [None]:
spark_df.show(5)

In [None]:
# Bağımsız değişkenleri ayarlama

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
spark_df.columns

In [None]:
bagimsiz_degiskenler = ["age", "total_purchase", "account_manager", "years", "num_sites"]

In [None]:
vectorAssembler = VectorAssembler(inputCols = bagimsiz_degiskenler, outputCol = "features")
va_df = vectorAssembler.transform(spark_df)

In [None]:
va_df.show(5)

In [None]:
final_df = va_df.select(["features", "label"])

In [None]:
final_df.show(5)

In [None]:
# Test _ Train

In [None]:
splits = final_df.randomSplit([0.70,0.30])
train_df = splits[0]
test_df = splits[1]

In [None]:
train_df

In [None]:
test_df

In [None]:
sc.stop()

## GBM ile Müşteri Terk Modellemesi

In [None]:
sc

In [None]:
from pyspark.ml.classification import GBTClassifier

In [None]:
gbm = GBTClassifier(maxIter = 10, featuresCol = "features", labelCol = "label")

In [None]:
gbm_model = gbm.fit(train_df)

In [None]:
y_pred = gbm_model.transform(test_df)

In [None]:
ac = y_pred.select("label", "prediction")

In [None]:
ac.filter(ac.label == ac.prediction).count()/ac.count()

In [None]:
# Model Tuning

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

evaluator = BinaryClassificationEvaluator()

paramGrid = (ParamGridBuilder()
             .addGrid(gbm.maxDepth, [2, 4, 6])
             .addGrid(gbm.maxBins, [20, 30])
             .addGrid(gbm.maxIter, [10, 20])
             .build())

cv = CrossValidator(estimator=gbm, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=10)

In [None]:
cvModel = cv.fit(train_df)

In [None]:
y_pred = cvModel.transform(test_df)

In [None]:
ac = y_pred.select("label","prediction")

In [None]:
ac.filter(ac.label == ac.prediction).count() / ac.count()

In [None]:
# Yeni Müşteri Terk Eder mi ? Etmez mi ?

In [None]:
import pandas as pd

names = pd.Series(["Ali Ahmetoğlu", "Taner Gün", "Berkay", "Polat Konak", "Kamil Atasoy"])
age = pd.Series([38, 43, 34, 50, 40])
total_purchase = pd.Series([30000, 10000, 6000, 30000, 100000])
account_manager = pd.Series([1, 0, 0, 1, 1])
years = pd.Series([20, 10, 3, 8, 30])
num_sites = pd.Series([30, 8, 8, 6, 50])

yeni_musteriler = pd.DataFrame({
    "names":names,
    "age":age,
    "total_purchase":total_purchase,
    "account_manager":account_manager,
    "years":years,
    "num_sites":num_sites})

yeni_musteriler.columns

In [None]:
yeni_sdf = spark.createDataFrame(yeni_musteriler)

In [None]:
yeni_sdf.show(5)

In [None]:
yeni_musteriler = vectorAssembler.transform(yeni_sdf)

In [None]:
results = cvModel.transform(yeni_musteriler)

In [None]:
results.select("names","prediction").show()

In [None]:
sc.stop()