In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName('test').getOrCreate()

# Read data from CSV file
df = spark.read.csv('avocado.csv', sep=',', header=True, inferSchema=True,
                         nullValue='NA')

df.show(5)

22/08/22 10:55:20 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: NA, Date, AveragePrice, Total Volume, 4046, 4225, 4770, Total Bags, Small Bags, Large Bags, XLarge Bags, type, year, region
 Schema: _c0, Date, AveragePrice, Total Volume, 4046, 4225, 4770, Total Bags, Small Bags, Large Bags, XLarge Bags, type, year, region
Expected: _c0 but found: NA
CSV file: file:///Users/wirarama/python/FGA/BPDFGA/avocado.csv
+---+-------------------+------------+------------+-------+---------+-----+----------+----------+----------+-----------+------------+----+------+
|_c0|               Date|AveragePrice|Total Volume|   4046|     4225| 4770|Total Bags|Small Bags|Large Bags|XLarge Bags|        type|year|region|
+---+-------------------+------------+------------+-------+---------+-----+----------+----------+----------+-----------+------------+----+------+
|  0|2015-12-27 00:00:00|        1.33|    64236.62|1036.74| 54454.85|48.16|   8696.87|   8603.62|     93.25|        0.0|

In [3]:
from pyspark.ml.feature import StringIndexer

# Create an indexer
indexer = StringIndexer(inputCol='region', outputCol='region_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(df)

# Indexer creates a new column with numeric index values
df_indexed = indexer_model.transform(df)

                                                                                

In [6]:
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler = VectorAssembler(inputCols=['Small Bags','Large Bags','XLarge Bags'],
                            outputCol='features')

# Consolidate predictor columns
df_assembled = assembler.transform(df_indexed)

# Check the resulting column
df_assembled.select('features', 'region_idx').show(5, truncate=False)

+--------------------+----------+
|features            |region_idx|
+--------------------+----------+
|[8603.62,93.25,0.0] |0.0       |
|[9408.07,97.49,0.0] |0.0       |
|[8042.21,103.14,0.0]|0.0       |
|[5677.4,133.76,0.0] |0.0       |
|[5986.26,197.69,0.0]|0.0       |
+--------------------+----------+
only showing top 5 rows



In [7]:
df_train, df_test = df_assembled.randomSplit([0.8, 0.2], seed=17)

# Check that training set has around 80% of records
training_ratio = df_train.count() / df_assembled.count()
print(training_ratio)

22/08/22 11:05:49 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: NA, Date, AveragePrice, Total Volume, 4046, 4225, 4770, Total Bags, Small Bags, Large Bags, XLarge Bags, type, year, region
 Schema: _c0, Date, AveragePrice, Total Volume, 4046, 4225, 4770, Total Bags, Small Bags, Large Bags, XLarge Bags, type, year, region
Expected: _c0 but found: NA
CSV file: file:///Users/wirarama/python/FGA/BPDFGA/avocado.csv


                                                                                

0.7967011891062524


In [9]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create a classifier object and fit to the training data
tree = DecisionTreeClassifier()
tree_model = tree.fit(df_train)

# Create predictions for the testing data and take a look at the predictions
prediction = tree_model.transform(df_test)
prediction.select('label', 'prediction', 'probability').show(5, False)

IllegalArgumentException: label does not exist. Available: _c0, Date, AveragePrice, Total Volume, 4046, 4225, 4770, Total Bags, Small Bags, Large Bags, XLarge Bags, type, year, region, region_idx, features