In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline, PipelineModel

In [3]:

# Create (or reuse) the notebook's SparkSession, running Spark in local mode
# ("local[2]") with the executor/driver memory settings given below.
spark = (
    SparkSession.builder
    .appName("PipelineOps")
    .master("local[2]")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

In [6]:
# Load the CSV into a Spark DataFrame: the first line is treated as the header,
# column types are inferred from the data, and fields are comma-separated.
# NOTE(review): the path is an absolute, machine-specific Windows path, so the
# notebook only runs where this exact file exists.
df = (
    spark.read
    .options(header="True", inferSchema="True", sep=",")
    .csv("C:\\Users\\tbura\\data_sets\\simple_data_clear.csv")
)

In [7]:
# Preview the first five rows. The limit is applied on the Spark side before
# converting, so only those rows are collected to the driver instead of the
# whole DataFrame being materialised in pandas just to display its head.
df.limit(5).toPandas()

Unnamed: 0,sirano,isim,yas,meslek,sehir,aylik_gelir
0,1,Cemal,35,Isci,Ankara,3500
1,2,Ceyda,42,Memur,Kayseri,4200
2,3,Timur,30,Müzisyen,Istanbul,9000
3,4,Burcu,29,Pazarlamaci,Ankara,4200
4,5,Yasemin,23,Pazarlamaci,Bursa,4800


## VERİ SETİNE ETİKET EKLEME

In [10]:
# Add the string label column "ekonomik_durum": rows whose monthly income
# ("aylik_gelir") is above 7000 are tagged "iyi"; every other row (any row for
# which the comparison is not true) falls through to "kötü".
df1 = df.withColumn(
    "ekonomik_durum",
    F.when(df["aylik_gelir"] > 7000, "iyi").otherwise("kötü"),
)

## VERİ SETİNİ TRAIN-TEST AYIRMA

In [9]:
# Randomly split the labelled data into train and test sets using 0.8 / 0.2
# weights; a fixed seed is passed so the split is intended to be repeatable.
train_df, test_df = df1.randomSplit(weights=[0.8, 0.2], seed=142)

## Pipeline için nesneleri oluşturma

In [12]:
# Index the categorical "meslek" (occupation) column into "meslek_index".
# handleInvalid="skip": rows with a value the fitted indexer cannot handle are
# dropped at transform time rather than raising an error.
meslek_indexer = StringIndexer(
    inputCol="meslek",
    outputCol="meslek_index",
    handleInvalid="skip",
)

In [13]:
# Index the categorical "sehir" (city) column into "sehir_index".
# handleInvalid="skip": rows with a value the fitted indexer cannot handle are
# dropped at transform time rather than raising an error.
sehir_indexer = StringIndexer(
    inputCol="sehir",
    outputCol="sehir_index",
    handleInvalid="skip",
)

In [14]:
# One-hot encode both index columns in a single stage; each input column is
# paired positionally with its output column.
# NOTE(review): OneHotEncoderEstimator is the Spark 2.x name; newer Spark
# releases expose this estimator as OneHotEncoder — confirm the target version.
encoder = OneHotEncoderEstimator(
    inputCols=["meslek_index", "sehir_index"],
    outputCols=["meslek_encoded", "sehir_encoded"],
)

In [15]:
# Combine the numeric columns and the one-hot encoded categoricals into a
# single vector column, "vectorized_features".
# NOTE(review): "aylik_gelir" is included as a feature, but the label
# "ekonomik_durum" is derived directly from aylik_gelir > 7000 earlier in this
# notebook, which looks like target leakage — confirm this is intentional.
assembler = VectorAssembler(
    inputCols=["yas", "aylik_gelir", "meslek_encoded", "sehir_encoded"],
    outputCol="vectorized_features",
)

In [16]:
# Turn the string label "ekonomik_durum" into the numeric "label" column that
# the classifier consumes. No handleInvalid is set here, so StringIndexer's
# default applies.
label_indexer = StringIndexer(
    inputCol="ekonomik_durum",
    outputCol="label",
)

In [17]:

# Scale the assembled feature vector with StandardScaler (default settings)
# and write the result to "features", the column the classifier reads.
scaler = StandardScaler(
    inputCol="vectorized_features",
    outputCol="features",
)

In [18]:
# Logistic regression classifier: reads the scaled "features" vector and the
# numeric "label", and writes its output class to "prediction".
lr_object = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    predictionCol="prediction",
)

## Pipeline

In [19]:
# Chain every stage into one Pipeline. Stages run in list order, so each stage
# must come after the stages that produce the columns it reads.
pipeline_nesnesi = Pipeline(
    stages=[
        meslek_indexer,   # meslek -> meslek_index
        sehir_indexer,    # sehir -> sehir_index
        encoder,          # *_index -> *_encoded
        assembler,        # numeric + encoded -> vectorized_features
        label_indexer,    # ekonomik_durum -> label
        scaler,           # vectorized_features -> features
        lr_object,        # features + label -> prediction
    ]
)

In [20]:
# Fit all pipeline stages on the training split; the result is the fitted
# pipeline model used for scoring below.
pipeline_modeli = pipeline_nesnesi.fit(dataset=train_df)

In [21]:
# Score the held-out test split and preview true label vs. predicted class.
# The limit is applied on the Spark side before converting, so at most five
# rows are collected to the driver instead of every prediction being pulled
# into pandas just to display the head.
(
    pipeline_modeli.transform(test_df)
    .select("label", "prediction")
    .limit(5)
    .toPandas()
)

Unnamed: 0,label,prediction
0,1.0,1.0
1,0.0,0.0
2,1.0,1.0
