In [None]:
from pyspark import SparkContext,SQLContext,SparkConf,StorageLevel
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
# Build (or reuse) the Spark session. Without getOrCreate() `spark` would only be
# a Builder object, not a usable SparkSession. The app name matches the one the
# session-creation cell further down asks for.
spark = SparkSession.builder.config(conf=SparkConf()).appName("MLwithSpark").getOrCreate()

In [None]:
!pip install pyspark

## Set up SparkContext and SparkSession

In [None]:
# Load our Pkgs
# findspark.init() has to run before any pyspark import in this cell
# (see the note on the `import pyspark` line below).
import findspark
findspark.init()

from pyspark import SparkContext

import pyspark.sql.functions as f

# FPGrowth: frequent-pattern mining and association-rule learning
from pyspark.ml.fpm import FPGrowth

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession


In [None]:
# Load Pkgs 
from pyspark.sql import SparkSession

In [None]:
# Spark: obtain the session for this notebook (created if none exists yet)
spark = (
    SparkSession.builder
    .appName("MLwithSpark")
    .getOrCreate()
)

## Load data

In [None]:
# Read the mushrooms CSV: the first row is the header and column types are inferred
df = spark.read.options(header=True, inferSchema=True).csv("../../data/Bai2/mushrooms.csv")

## Import modules

In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

## Preprocess feature columns

In [None]:
# Feature columns: every column after the first one
in_cols = df.columns[1:]
in_cols

In [None]:
# Label-encode every feature column: each string category becomes a numeric
# index stored in a new "<column> encode" column.
for feature_name in in_cols:
    encoded_name = feature_name + " encode"
    # Skip columns that were already encoded so that re-running this cell does
    # not fail on an output column that already exists.
    if encoded_name in df.columns:
        continue
    indexer_model = StringIndexer(inputCol=feature_name, outputCol=encoded_name).fit(df)
    df = indexer_model.transform(df)

In [None]:
# Index the first column (the target) into a numeric 'label' column
label_indexer = (
    StringIndexer()
    .setInputCol(df.columns[0])
    .setOutputCol('label')
    .fit(df)
)
df = label_indexer.transform(df)

In [None]:
# Keep the target plus the encoded version of every feature column.
# The list is derived from in_cols so it stays in sync with the columns produced
# by the label-encoding step instead of repeating all the names by hand.
df1 = df.select(['label'] + [name + ' encode' for name in in_cols])

In [None]:
# show() prints the summary table itself and returns None, so it must not be
# wrapped in print() (that would emit a stray "None" after the table).
df1.describe().show()

In [None]:
# Exclude the "veil-type encode" feature from the modelling frame
# NOTE(review): presumably dropped because it carries no information — confirm against the describe() output.
df1 = df1.select([name for name in df1.columns if name != "veil-type encode"])

In [None]:
# Assemble every column after the leading 'label' column into one 'features' vector
vec_assembler = VectorAssembler(
    outputCol='features',
    inputCols=df1.columns[1:],
)

In [None]:
# Apply the assembler: adds the 'features' vector column to the encoded frame
main_df = vec_assembler.transform(dataset=df1)

In [None]:
# Preview the first five assembled rows
main_df.show(n=5)

In [None]:
# Keep only the two columns the classifiers need
main_df = main_df.select('features', 'label')

## 1. Train-test split

In [None]:
# 80/20 train/validation split with a fixed seed
df_train, df_val = main_df.randomSplit(weights=[0.8, 0.2], seed=2022)

## 2.  Decision tree model

In [None]:
# Fit a decision tree on the training split; `dtc` ends up holding the fitted model
dtc = DecisionTreeClassifier(labelCol="label", featuresCol="features").fit(df_train)

In [None]:
# Score the validation split with the decision tree and preview three rows
pred = dtc.transform(dataset=df_val)
pred.show(n=3)

## 3. Random Forest Model

In [None]:
# Fit a random forest on the training split; `rdc` ends up holding the fitted model.
# The seed is pinned (same value as the train/validation split) so that the
# forest's random sampling is repeatable from run to run.
rdc = RandomForestClassifier(featuresCol='features', labelCol='label', seed=2022).fit(df_train)

In [None]:
# Score the validation split with the random forest and preview three rows
pred1 = rdc.transform(dataset=df_val)
pred1.show(n=3)

## 4. Evaluate models

### Decision Tree

In [None]:
# metricName must be set explicitly: the evaluator's default metric is F1,
# so without it the number printed below would not be the accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
acc = evaluator.evaluate(pred)
print("Decision Tree Prediction Accuracy: ", acc)

### Random Forest

In [None]:
# metricName must be set explicitly: the evaluator's default metric is F1,
# so without it the number printed below would not be the accuracy.
evaluator1 = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
acc1 = evaluator1.evaluate(pred1)
print("Random Forest Prediction Accuracy: ", acc1)

## 5. Use a Pipeline to chain the above steps into a single workflow

In [None]:
# Reload the raw data from the same location used for the initial load, so the
# whole notebook can run end-to-end in a single environment.
# NOTE(review): this replaces a hard-coded absolute Google Drive path — confirm
# the relative path is the intended data location.
df = spark.read.csv("../../data/Bai2/mushrooms.csv",header=True,inferSchema=True)

In [None]:
# Feature columns: every column after the first one (the "class" target)
in_cols = df.columns[1:]

# One StringIndexer per feature column (string category -> numeric index)
string_indexers = [
    StringIndexer(inputCol=name, outputCol=name + '_index')
    for name in in_cols
]

# One OneHotEncoder per indexed column; dropLast=False is passed so the last
# category level is not dropped from the encoded vector
onehot_encoders = [
    OneHotEncoder(inputCol=name + '_index', outputCol=name + '_onehot', dropLast=False)
    for name in in_cols
]

# Gather all one-hot columns into a single 'features' vector column
onehot_cols = [name + '_onehot' for name in in_cols]
feat_assembler = VectorAssembler(outputCol='features', inputCols=onehot_cols)

# Index the target column (the first column) into a numeric 'label' column
label_indexer = StringIndexer(outputCol='label', inputCol=df.columns[0])

# Chain the stages in order: indexers -> encoders -> assembler -> label indexer
pipeline = Pipeline(
    stages=[*string_indexers, *onehot_encoders, feat_assembler, label_indexer]
)

# Fit the pipeline on the full frame, apply it, and cache the transformed result
mushrooms_trans = pipeline.fit(df).transform(df).cache()

In [None]:
# 80/20 train/validation split of the pipeline output, with a fixed seed
mushrooms_train, mushrooms_val = mushrooms_trans.randomSplit(weights=[0.8, 0.2], seed=2022)

In [None]:
# Fit both classifiers on the pipeline-transformed training split.
dtc_pipeline = DecisionTreeClassifier(featuresCol="features", labelCol="label").fit(mushrooms_train)
# The seed is pinned (same value as the train/validation split) so that the
# forest's random sampling is repeatable from run to run.
rdc_pipeline = RandomForestClassifier(featuresCol='features', labelCol='label', seed=2022).fit(mushrooms_train)

In [None]:
# Score the validation split with each fitted model
pred_pipeline = dtc_pipeline.transform(dataset=mushrooms_val)
pred1_pipeline = rdc_pipeline.transform(dataset=mushrooms_val)

In [None]:
# metricName must be set explicitly on both evaluators: the default metric is
# F1, while these values are reported as accuracy.

# Decision Tree
evaluator_pipeline = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
acc_dtc = evaluator_pipeline.evaluate(pred_pipeline)

# Random Forest
evaluator1_pipeline = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
acc_rdc = evaluator1_pipeline.evaluate(pred1_pipeline)

In [None]:
# Report the validation scores of both pipeline-based models
# (label spelling corrected: "Decision", not "Decionsion").
print("Decision Tree Prediction Accuracy: ", acc_dtc)
print("Random Forest Prediction Accuracy: ", acc_rdc)