In [4]:
import findspark

# Let findspark locate the Spark installation before pyspark is imported.
findspark.init()

from pyspark.sql import SparkSession

# Local session using 4 threads; memory sizes are passed as Spark config keys.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("preprocessOps")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

In [5]:
# Read the CSV with a header row, inferred column types and "," as separator.
# NOTE(review): the path is absolute and machine-specific.
df = (
    spark.read
    .options(header="True", inferSchema="True", sep=",")
    .csv("/home/taha/Downloads/simple_data.csv")
)

In [6]:
# Preview the first five rows; limit() keeps Spark from collecting the whole
# DataFrame to the driver just to display a handful of rows.
df.limit(5).toPandas()

Unnamed: 0,sirano,isim,yas,meslek,sehir,aylik_gelir
0,1,Cemal,35,Isci,Ankara,3500
1,2,Ceyda,42,Memur,Kayseri,4200
2,3,Timur,30,Müzisyen,Istanbul,9000
3,4,Burcu,29,Pazarlamaci,Ankara,4200
4,5,Yasemin,23,Pazarlamaci,Bursa,4800


### Veri setine etiket ekleme

In [7]:
# NOTE(review): this wildcard import also rebinds Python builtins such as
# sum/min/max/abs/round to their Spark column versions. The cells visible in
# this notebook only use col, count, desc and when.
from pyspark.sql.functions import *

In [8]:
# Add the target label: a monthly income above 7000 is "iyi" (good),
# everything else is "kötü" (bad).
df1 = df.withColumn(
    "ekonomik_durum",
    when(col("aylik_gelir") > 7000, "iyi").otherwise("kötü"),
)

# Preview five rows without collecting the full DataFrame to the driver.
df1.limit(5).toPandas()

Unnamed: 0,sirano,isim,yas,meslek,sehir,aylik_gelir,ekonomik_durum
0,1,Cemal,35,Isci,Ankara,3500,kötü
1,2,Ceyda,42,Memur,Kayseri,4200,kötü
2,3,Timur,30,Müzisyen,Istanbul,9000,iyi
3,4,Burcu,29,Pazarlamaci,Ankara,4200,kötü
4,5,Yasemin,23,Pazarlamaci,Bursa,4800,kötü


# 1. StringIndexer Aşaması


In [9]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler

# Spark 3.0 renamed OneHotEncoderEstimator to OneHotEncoder. Fall back to the
# new class under the old name so the cells below run on both 2.x and 3.x.
try:
    from pyspark.ml.feature import OneHotEncoderEstimator
except ImportError:
    from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator

In [10]:
# Indexer that maps the categorical "meslek" (occupation) column to numeric indices.
meslek_indexer = StringIndexer(inputCol="meslek", outputCol="meslek_index")

In [11]:
# Learn the occupation -> index mapping from df1 ...
meslek_indexer_model = meslek_indexer.fit(dataset=df1)
# ... and apply it, which adds the "meslek_index" column.
meslek_indexer_df = meslek_indexer_model.transform(dataset=df1)

In [12]:
# Here the categorical "meslek" column is indexed. The StringIndexer was first
# told which column to index, then fit() built a model from the DataFrame it
# was given, and finally transform() used that model to produce the updated
# DataFrame. Only the first 20 rows are brought to the driver for display.
meslek_indexer_df.limit(20).toPandas()

Unnamed: 0,sirano,isim,yas,meslek,sehir,aylik_gelir,ekonomik_durum,meslek_index
0,1,Cemal,35,Isci,Ankara,3500,kötü,6.0
1,2,Ceyda,42,Memur,Kayseri,4200,kötü,0.0
2,3,Timur,30,Müzisyen,Istanbul,9000,iyi,2.0
3,4,Burcu,29,Pazarlamaci,Ankara,4200,kötü,1.0
4,5,Yasemin,23,Pazarlamaci,Bursa,4800,kötü,1.0
5,6,Ali,33,Memur,Ankara,4250,kötü,0.0
6,7,Dilek,29,Pazarlamaci,Istanbul,7300,iyi,1.0
7,8,Murat,31,Müzisyen,Istanbul,12000,iyi,2.0
8,9,Ahmet,33,Doktor,Ankara,18000,iyi,3.0
9,10,Muhittin,46,Berber,Istanbul,12000,iyi,7.0


In [13]:
# Count rows per occupation, most frequent first. limit() is applied before
# toPandas() so only the top 20 groups are collected to the driver.
(
    df1.groupBy(col("meslek"))
    .agg(count("*").alias("sayi"))
    .sort(desc("sayi"))
    .limit(20)
    .toPandas()
)

Unnamed: 0,meslek,sayi
0,Memur,3
1,Müzisyen,3
2,Pazarlamaci,3
3,Doktor,2
4,Isci,1
5,Tuhafiyeci,1
6,Berber,1
7,Tornacı,1


In [14]:
# StringIndexer assigns index values according to the counts above: it finds
# the most frequently repeated categories and gives them the lowest indices.

In [15]:
# Indexer that maps the categorical "sehir" (city) column to numeric indices.
sehir_indexer = StringIndexer(inputCol="sehir", outputCol="sehir_index")

In [16]:
# Fit on the DataFrame that already carries "meslek_index" ...
sehir_indexer_model = sehir_indexer.fit(dataset=meslek_indexer_df)
# ... and append "sehir_index" to it.
sehir_indexer_df = sehir_indexer_model.transform(dataset=meslek_indexer_df)

In [17]:
# Show up to 20 rows; limit() avoids collecting the whole DataFrame to the driver.
sehir_indexer_df.limit(20).toPandas()

Unnamed: 0,sirano,isim,yas,meslek,sehir,aylik_gelir,ekonomik_durum,meslek_index,sehir_index
0,1,Cemal,35,Isci,Ankara,3500,kötü,6.0,0.0
1,2,Ceyda,42,Memur,Kayseri,4200,kötü,0.0,5.0
2,3,Timur,30,Müzisyen,Istanbul,9000,iyi,2.0,1.0
3,4,Burcu,29,Pazarlamaci,Ankara,4200,kötü,1.0,0.0
4,5,Yasemin,23,Pazarlamaci,Bursa,4800,kötü,1.0,2.0
5,6,Ali,33,Memur,Ankara,4250,kötü,0.0,0.0
6,7,Dilek,29,Pazarlamaci,Istanbul,7300,iyi,1.0,1.0
7,8,Murat,31,Müzisyen,Istanbul,12000,iyi,2.0,1.0
8,9,Ahmet,33,Doktor,Ankara,18000,iyi,3.0,0.0
9,10,Muhittin,46,Berber,Istanbul,12000,iyi,7.0,1.0


# 2. OneHotEncoderEstimator Aşaması

In [18]:
# The one-hot encoder is the consumer of the StringIndexer output: it turns
# each index column into a one-hot vector column.
encoder = OneHotEncoderEstimator(
    inputCols=["meslek_index", "sehir_index"],
    outputCols=["meslek_encoded", "sehir_encoded"],
)

In [19]:
# Fit the encoder on the indexed DataFrame ...
encoder_model = encoder.fit(dataset=sehir_indexer_df)
# ... then add the "meslek_encoded" and "sehir_encoded" vector columns.
encoder_df = encoder_model.transform(dataset=sehir_indexer_df)

In [20]:
# Preview five encoded rows without collecting the full DataFrame to the driver.
encoder_df.limit(5).toPandas()

Unnamed: 0,sirano,isim,yas,meslek,sehir,aylik_gelir,ekonomik_durum,meslek_index,sehir_index,meslek_encoded,sehir_encoded
0,1,Cemal,35,Isci,Ankara,3500,kötü,6.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0)","(1.0, 0.0, 0.0, 0.0, 0.0)"
1,2,Ceyda,42,Memur,Kayseri,4200,kötü,0.0,5.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0)"
2,3,Timur,30,Müzisyen,Istanbul,9000,iyi,2.0,1.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0, 0.0)"
3,4,Burcu,29,Pazarlamaci,Ankara,4200,kötü,1.0,0.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)"
4,5,Yasemin,23,Pazarlamaci,Bursa,4800,kötü,1.0,2.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 1.0, 0.0, 0.0)"


# 3. VectorAssembler Aşaması

In [21]:
# Combine the numeric columns and the one-hot vectors into one feature vector.
assembler = VectorAssembler(
    inputCols=["yas", "aylik_gelir", "meslek_encoded", "sehir_encoded"],
    outputCol="vectorized_features",
)

In [22]:
# Note: the purpose of fit() is to learn something from the data; the
# VectorAssembler has nothing to learn, so no fit() call is needed here.

In [23]:
# The assembler is applied directly with transform(); it adds "vectorized_features".
assembler_df = assembler.transform(dataset=encoder_df)

In [24]:
# Preview five assembled vectors; only those rows are collected to the driver.
assembler_df.select("vectorized_features").limit(5).toPandas()

Unnamed: 0,vectorized_features
0,"(35.0, 3500.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1..."
1,"(42.0, 4200.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0..."
2,"(30.0, 9000.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0..."
3,"(29.0, 4200.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0..."
4,"(23.0, 4800.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0..."


# 4. LabelIndexer Aşaması

In [25]:
# Indexer that converts the string target "ekonomik_durum" into a numeric "label".
label_indexer = StringIndexer(inputCol="ekonomik_durum", outputCol="label")

In [26]:
# Learn the target-string -> label mapping from the assembled DataFrame.
label_indexer_model = label_indexer.fit(dataset=assembler_df)

In [27]:
# Apply the fitted label indexer, adding the numeric "label" column.
label_indexer_df = label_indexer_model.transform(dataset=assembler_df)

In [28]:
# Preview five feature/label pairs without collecting the full DataFrame.
label_indexer_df.select("vectorized_features", "label").limit(5).toPandas()

Unnamed: 0,vectorized_features,label
0,"(35.0, 3500.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1...",0.0
1,"(42.0, 4200.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0...",0.0
2,"(30.0, 9000.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0...",1.0
3,"(29.0, 4200.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0...",0.0
4,"(23.0, 4800.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0...",0.0


In [29]:
# A value such as 35 strongly dominates features that lie between 0 and 1,
# so the feature vector has to be standardized.

# 5. StandardScaler Aşaması

In [30]:
# Scaler that reads "vectorized_features" and writes the scaled vector to
# "features". The original chain ended with a dangling "\" line continuation,
# which only parsed because a blank line happened to follow it.
scaler = StandardScaler(inputCol="vectorized_features", outputCol="features")

In [31]:
# Learn the scaling statistics from the labelled DataFrame.
scaler_model = scaler.fit(dataset=label_indexer_df)

In [32]:
# Apply the fitted scaler, adding the scaled "features" column.
scaler_df = scaler_model.transform(dataset=label_indexer_df)

In [33]:
# Preview five scaled feature vectors; only those rows reach the driver.
scaler_df.select("features").limit(5).toPandas()

Unnamed: 0,features
0,"(5.0082809740601215, 0.7723249167865694, 0.0, ..."
1,"(6.009937168872146, 0.9267899001438834, 2.4152..."
2,"(4.292812263480104, 1.9859783574511787, 0.0, 0..."
3,"(4.149718521364101, 0.9267899001438834, 0.0, 2..."
4,"(3.29115606866808, 1.0591884573072952, 0.0, 2...."


In [34]:
# The scaling has been applied; the large raw values are gone.

# 6. Train - Test Ayırma Aşaması

In [35]:
# 80/20 random split with a fixed seed.
# NOTE(review): the indexers, encoder and scaler above were all fit on the
# full DataFrame before this split, so test rows influenced them.
train_df, test_df = scaler_df.randomSplit(weights=[0.8, 0.2], seed=142)

In [36]:
# The seed is fixed so that the split matches the instructor's.

In [37]:
# Show up to 15 training rows; limit() avoids collecting the whole split.
train_df.limit(15).toPandas()

Unnamed: 0,sirano,isim,yas,meslek,sehir,aylik_gelir,ekonomik_durum,meslek_index,sehir_index,meslek_encoded,sehir_encoded,vectorized_features,label,features
0,1,Cemal,35,Isci,Ankara,3500,kötü,6.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(35.0, 3500.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1...",0.0,"(5.0082809740601215, 0.7723249167865694, 0.0, ..."
1,2,Ceyda,42,Memur,Kayseri,4200,kötü,0.0,5.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0)","(42.0, 4200.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0...",0.0,"(6.009937168872146, 0.9267899001438834, 2.4152..."
2,4,Burcu,29,Pazarlamaci,Ankara,4200,kötü,1.0,0.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(29.0, 4200.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0...",0.0,"(4.149718521364101, 0.9267899001438834, 0.0, 2..."
3,5,Yasemin,23,Pazarlamaci,Bursa,4800,kötü,1.0,2.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 1.0, 0.0, 0.0)","(23.0, 4800.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0...",0.0,"(3.29115606866808, 1.0591884573072952, 0.0, 2...."
4,7,Dilek,29,Pazarlamaci,Istanbul,7300,iyi,1.0,1.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0, 0.0)","(29.0, 7300.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0...",1.0,"(4.149718521364101, 1.610849112154845, 0.0, 2...."
5,8,Murat,31,Müzisyen,Istanbul,12000,iyi,2.0,1.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0, 0.0)","(31.0, 12000.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...",1.0,"(4.435906005596108, 2.6479711432682382, 0.0, 0..."
6,11,Hicaziye,47,Tuhafiyeci,Ankara,4800,kötü,5.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(47.0, 4800.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0...",0.0,"(6.725405879452164, 1.0591884573072952, 0.0, 0..."
7,12,Harun,43,Tornacı,Ankara,4200,kötü,4.0,0.0,"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(43.0, 4200.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0...",0.0,"(6.15303091098815, 0.9267899001438834, 0.0, 0...."
8,13,Hakkı,33,Memur,Çorum,3750,kötü,0.0,3.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 1.0, 0.0)","(33.0, 3750.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0...",0.0,"(4.722093489828115, 0.8274909822713244, 2.4152..."
9,14,Gülizar,37,Doktor,İzmir,14250,iyi,3.0,4.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 1.0)","(37.0, 14250.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ...",1.0,"(5.294468458292129, 3.1444657326310326, 0.0, 0..."


In [38]:
# Show up to 15 test rows; limit() avoids collecting the whole split.
test_df.limit(15).toPandas()

Unnamed: 0,sirano,isim,yas,meslek,sehir,aylik_gelir,ekonomik_durum,meslek_index,sehir_index,meslek_encoded,sehir_encoded,vectorized_features,label,features
0,3,Timur,30,Müzisyen,Istanbul,9000,iyi,2.0,1.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0, 0.0)","(30.0, 9000.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0...",1.0,"(4.292812263480104, 1.9859783574511787, 0.0, 0..."
1,6,Ali,33,Memur,Ankara,4250,kötü,0.0,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(33.0, 4250.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0...",0.0,"(4.722093489828115, 0.9378231132408343, 2.4152..."
2,9,Ahmet,33,Doktor,Ankara,18000,iyi,3.0,0.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0)","(33.0, 18000.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ...",1.0,"(4.722093489828115, 3.9719567149023574, 0.0, 0..."
3,10,Muhittin,46,Berber,Istanbul,12000,iyi,7.0,1.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0, 0.0)","(46.0, 12000.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0,"(6.58231213733616, 2.6479711432682382, 0.0, 0...."


# Basit Makine Öğrenmesi

In [39]:
from pyspark.ml.classification import LogisticRegression

In [40]:
# Logistic regression reading "features"/"label" and writing "prediction".
lr_object = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    predictionCol="prediction",
)

In [41]:
# Train the classifier on the training split.
lr_model = lr_object.fit(dataset=train_df)

In [42]:
# Score the held-out split; the result carries a "prediction" column.
sonuc_df = lr_model.transform(dataset=test_df)

In [43]:
# As noted earlier, fit() is the learning step: the model was built from
# train_df, and transform() then used that model on test_df to produce a new
# DataFrame of predictions (the usual learn, then apply-the-model pattern).
# Show up to five label/prediction pairs without collecting the full result.
sonuc_df.select("label", "prediction").limit(5).toPandas()

Unnamed: 0,label,prediction
0,1.0,1.0
1,0.0,0.0
2,1.0,1.0
3,1.0,1.0
