In [None]:
# Bootstrap step: findspark.init() makes the pyspark package importable,
# so it has to run before the `pyspark` imports in the following cells.
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for this notebook: local master with
# 4 threads, and 1g configured for both executor and driver memory.
session_builder = (
    SparkSession.builder
    .master("local[4]")
    .appName("IrisClassification")
    .config("spark.executor.memory", "1g")
    .config("spark.driver.memory", "1g")
)
spark = session_builder.getOrCreate()

In [None]:
import os

# Location of the Iris CSV. The IRIS_CSV_PATH environment variable overrides
# the default, so the notebook is not tied to one machine's directory layout.
IRIS_CSV_PATH = os.environ.get("IRIS_CSV_PATH", "/home/alper/Spark/data/iris.csv")

# Read the comma-separated file, treating the first line as the header and
# letting Spark infer the column types.
iris = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("sep", ",")
    .csv(IRIS_CSV_PATH)
)

In [None]:
# Peek at the data: limit(5) already caps the result at five rows before it
# is converted to pandas for display.
iris.limit(5).toPandas()

In [None]:
# Total number of rows loaded from the CSV.
iris.count()

In [None]:
# Show the column names and the types produced by schema inference.
iris.printSchema()

In [None]:
# Column groups: the four numeric measurement columns (described and
# assembled into the feature vector further down) and the categorical column.
numeric = [
    "sepal_length",
    "sepal_width",
    "petal_length",
    "petal_width",
]
categorical = [
    "species",
]

In [None]:
# Summary statistics for the numeric columns, shown as a pandas table.
numeric_summary = iris.describe(numeric)
numeric_summary.toPandas()

In [None]:
from pyspark.sql import functions as F

In [None]:
# Class balance: number of rows for each species value.
species_counts = iris.groupBy("species").agg({"*": "count"})
species_counts.toPandas()

In [None]:
# Report, for every column, whether it contains at least one null value.
# All per-column null counts come from a single Spark aggregation pass
# (count of rows where the column is null) rather than one job per column.
null_count_exprs = [
    F.count(F.when(F.col(column).isNull(), 1)).alias(column)
    for column in iris.columns
]
null_counts = iris.agg(*null_count_exprs).first()

# `position` is only the 1-based running number printed in front of each column.
for position, (column, nulls) in enumerate(zip(iris.columns, null_counts), start=1):
    if nulls > 0:
        print(position, ". ", column, " has null.")
    else:
        print(position, ". ", column)

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

In [None]:
# Indexer that encodes the string column "species" into the numeric column "label".
species_indexer = StringIndexer(inputCol="species", outputCol="label")

In [None]:
# Fit the indexer on the data and add its output column to `iris`.
# `iris` is reassigned here, so on a re-run it already carries the indexer's
# output column; dropping it first (a no-op when the column is absent) keeps
# this cell re-runnable instead of failing on an existing output column.
iris_without_label = iris.drop(species_indexer.getOutputCol())
iris = species_indexer.fit(iris_without_label).transform(iris_without_label)

In [None]:
# Preview the first rows after indexing. The limit is applied in Spark so
# only five rows are collected to the driver, not the whole DataFrame.
iris.limit(5).toPandas()

In [None]:
# Assembler that packs the numeric columns into a single "features" vector column.
assembler = VectorAssembler(inputCols=numeric, outputCol="features")

In [None]:
# Add the assembled feature vector column to `iris`.
# `iris` is reassigned here, so on a re-run it already carries the assembler's
# output column; dropping it first (a no-op when the column is absent) keeps
# this cell re-runnable instead of failing on an existing output column.
iris = assembler.transform(iris.drop(assembler.getOutputCol()))

In [None]:
# Preview the first rows including the assembled features. The limit is
# applied in Spark so only five rows are collected to the driver.
iris.limit(5).toPandas()

In [None]:
# Random train/test split with weights 0.8 / 0.2, using a fixed seed.
split_weights = [0.8, 0.2]
train_df, test_df = iris.randomSplit(split_weights, seed=4242)

In [None]:
# Print the row count of each split.
for split_label, split_df in (("Train: ", train_df), ("Test: ", test_df)):
    print(split_label, split_df.count())

In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
# Logistic regression estimator reading the target from "label" and the
# inputs from the "features" vector column.
logreg_object = LogisticRegression(labelCol="label", featuresCol="features")

In [None]:
# Train the logistic regression on the training split only.
model = logreg_object.fit(train_df)

In [None]:
# Apply the fitted model to the held-out test split; the result is what the
# evaluator further down reads its "prediction" column from.
transformedDF = model.transform(test_df)

In [None]:
# Peek at the model output: limit(5) already caps the result at five rows
# before it is converted to pandas for display.
transformedDF.limit(5).toPandas()

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Evaluator comparing the "prediction" column against "label" using accuracy.
multi = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)

In [None]:
# Compute the evaluator's metric (configured as "accuracy") on the test-split predictions.
accuracy = multi.evaluate(transformedDF)

In [None]:
# Display the test-split accuracy as the cell output.
accuracy