In [7]:
import os

from pyspark import SparkConf, SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType, StringType, StructField, StructType

In [8]:
# Location of the Iris data file, resolved against the notebook's current working directory.
BASE_PATH = os.path.abspath('') + "/../data/iris/iris.data"

In [None]:
def linear_regression(sqlContext, seed=None):
    """Fit a linear regression on the Iris data and print its metrics.

    The four measurement columns are assembled into a ``features`` vector.
    The string ``species`` column is indexed into a numeric ``output`` label,
    because LinearRegression requires a numeric label. The data is split
    80/20. The model is fit on the training split, and RMSE / r2 are printed
    for both the training split and the held-out test split.

    Args:
        sqlContext: active ``SQLContext`` used to read the CSV at ``BASE_PATH``.
        seed: optional seed passed to ``randomSplit`` so the split is
            reproducible. ``None`` (the default) keeps a random split.
    """
    struct = StructType([
        StructField('sepal_length', FloatType(), True),
        StructField('sepal_width', FloatType(), True),
        StructField('petal_length', FloatType(), True),
        StructField('petal_width', FloatType(), True),
        StructField('species', StringType(), True)])
    # Read through the sqlContext we were given (there is no global `spark`).
    # dropna() discards rows with nulls (e.g. blank or malformed lines), which
    # would otherwise make VectorAssembler fail.
    iris_df = sqlContext.read.schema(struct).csv(BASE_PATH, header=False).dropna()

    # Input column names must match the schema field names above.
    vectorAssembler = VectorAssembler(
        inputCols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'],
        outputCol='features')
    # The schema has no numeric target, so encode `species` as a numeric index
    # and use it as the regression label `output`.
    labelIndexer = StringIndexer(inputCol='species', outputCol='output')

    viris_df = vectorAssembler.transform(iris_df)
    viris_df = labelIndexer.fit(viris_df).transform(viris_df)
    viris_df = viris_df.select(['features', 'output'])

    train_df, test_df = viris_df.randomSplit([0.8, 0.2], seed=seed)

    lr = LinearRegression(featuresCol='features', labelCol='output',
                          maxIter=10, regParam=0.3, elasticNetParam=0.8)
    lr_model = lr.fit(train_df)
    print("Coefficients: " + str(lr_model.coefficients))
    print("Intercept: " + str(lr_model.intercept))

    trainingSummary = lr_model.summary
    print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
    print("r2: %f" % trainingSummary.r2)

    # Score the held-out split as well; the original never used test_df.
    testSummary = lr_model.evaluate(test_df)
    print("Test RMSE: %f" % testSummary.rootMeanSquaredError)
    print("Test r2: %f" % testSummary.r2)

    train_df.describe().show()

In [None]:
def decision_tree(sqlContext, seed=None):
    """Train a decision-tree classifier on the Iris data and report its test error.

    The four measurement columns are assembled into ``features``. A pipeline
    then runs three stages:

    1. index ``species`` into ``indexedLabel``;
    2. index ``features`` into ``indexedFeatures``;
    3. fit a ``DecisionTreeClassifier``.

    The pipeline is trained on 80% of the rows and evaluated for accuracy on
    the remaining 20%.

    Args:
        sqlContext: active ``SQLContext`` used to read the CSV at ``BASE_PATH``.
        seed: optional seed passed to ``randomSplit`` so the split is
            reproducible. ``None`` (the default) keeps a random split.
    """
    struct = StructType([
        StructField('sepal_length', FloatType(), True),
        StructField('sepal_width', FloatType(), True),
        StructField('petal_length', FloatType(), True),
        StructField('petal_width', FloatType(), True),
        StructField('species', StringType(), True)])
    # Read through the sqlContext we were given (there is no global `spark`).
    # dropna() discards rows with nulls (e.g. blank or malformed lines).
    iris_df = sqlContext.read.schema(struct).csv(BASE_PATH, header=False).dropna()

    # VectorIndexer and the classifier consume a single vector column, so the
    # four numeric columns must be assembled into `features` first.
    assembler = VectorAssembler(
        inputCols=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'],
        outputCol='features')
    data = assembler.transform(iris_df)

    # Fit the indexers on the full dataset so every label/category present in
    # either split is known to the index.
    labelIndexer = StringIndexer(inputCol="species", outputCol="indexedLabel").fit(data)
    # Features with more than maxCategories distinct values are treated as continuous.
    featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=5).fit(data)

    (trainingData, testData) = data.randomSplit([0.8, 0.2], seed=seed)
    dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

    pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])
    model = pipeline.fit(trainingData)
    predictions = model.transform(testData)

    predictions.select("prediction", "indexedLabel", "features").show(5)

    evaluator = MulticlassClassificationEvaluator(
        labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print("Test Error = %g " % (1.0 - accuracy))

    # Stage index 2 is the fitted DecisionTreeClassifier in the pipeline above.
    treeModel = model.stages[2]
    # summary only
    print(treeModel)

In [None]:
# getOrCreate() reuses an already-running context, so re-running this cell does
# not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [None]:
# Run both models against the SQLContext created in the setup cell.
# (The second call previously referenced the undefined name `sql_Context`.)
linear_regression(sqlContext)
decision_tree(sqlContext)