In [47]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/avocado-prices/avocado.csv


In [48]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,sum
spark = SparkSession.builder \
    .appName("avocado price pred") \
    .getOrCreate()
df = spark.read.csv('/kaggle/input/avocado-prices/avocado.csv', header=True, inferSchema=True)
df.show(5)

df.printSchema()


+---+----------+------------+------------+-------+---------+-----+----------+----------+----------+-----------+------------+----+------+
|_c0|      Date|AveragePrice|Total Volume|   4046|     4225| 4770|Total Bags|Small Bags|Large Bags|XLarge Bags|        type|year|region|
+---+----------+------------+------------+-------+---------+-----+----------+----------+----------+-----------+------------+----+------+
|  0|2015-12-27|        1.33|    64236.62|1036.74| 54454.85|48.16|   8696.87|   8603.62|     93.25|        0.0|conventional|2015|Albany|
|  1|2015-12-20|        1.35|    54876.98| 674.28| 44638.81|58.33|   9505.56|   9408.07|     97.49|        0.0|conventional|2015|Albany|
|  2|2015-12-13|        0.93|   118220.22|  794.7|109149.67|130.5|   8145.35|   8042.21|    103.14|        0.0|conventional|2015|Albany|
|  3|2015-12-06|        1.08|    78992.15| 1132.0| 71976.41|72.58|   5811.16|    5677.4|    133.76|        0.0|conventional|2015|Albany|
|  4|2015-11-29|        1.28|     51039.6

25/07/31 20:32:01 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Date, AveragePrice, Total Volume, 4046, 4225, 4770, Total Bags, Small Bags, Large Bags, XLarge Bags, type, year, region
 Schema: _c0, Date, AveragePrice, Total Volume, 4046, 4225, 4770, Total Bags, Small Bags, Large Bags, XLarge Bags, type, year, region
Expected: _c0 but found: 
CSV file: file:///kaggle/input/avocado-prices/avocado.csv


In [49]:
df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()
df.count()

25/07/31 20:32:01 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , Date, AveragePrice, Total Volume, 4046, 4225, 4770, Total Bags, Small Bags, Large Bags, XLarge Bags, type, year, region
 Schema: _c0, Date, AveragePrice, Total Volume, 4046, 4225, 4770, Total Bags, Small Bags, Large Bags, XLarge Bags, type, year, region
Expected: _c0 but found: 
CSV file: file:///kaggle/input/avocado-prices/avocado.csv


+---+----+------------+------------+----+----+----+----------+----------+----------+-----------+----+----+------+
|_c0|Date|AveragePrice|Total Volume|4046|4225|4770|Total Bags|Small Bags|Large Bags|XLarge Bags|type|year|region|
+---+----+------------+------------+----+----+----+----------+----------+----------+-----------+----+----+------+
|  0|   0|           0|           0|   0|   0|   0|         0|         0|         0|          0|   0|   0|     0|
+---+----+------------+------------+----+----+----+----------+----------+----------+-----------+----+----+------+



18249

In [50]:
df=df.drop('_c0')
df = df.withColumnRenamed("4046", "PLU_4046") \
       .withColumnRenamed("4225", "PLU_4225") \
       .withColumnRenamed("4770", "PLU_4770")


In [51]:
from pyspark.sql.functions import to_date,day,month,year,dayofweek

df = df.withColumn("Date", to_date("Date", "yyyy-MM-dd"))

df=df.withColumn('Day',day(col('Date')))\
     .withColumn('Month',month(col('Date')))\
      .withColumn('year',year(col('Date')))\
       .withColumn('Day_of_week',dayofweek(col('Date')))
df=df.drop('Date')

In [52]:
from pyspark.sql.functions import when

df = df.withColumn("PLU_4046_Ratio", col("PLU_4046") / col("Total Volume"))\
       .withColumn("PLU_4225_Ratio", col("PLU_4225") / col("Total Volume"))\
       .withColumn("PLU_4770_Ratio", col("PLU_4770") / col("Total Volume"))\
       .withColumn("SmallBag_Ratio", col("Small Bags") / col("Total Bags"))\
       .withColumn("LargeBag_Ratio", col("Large Bags") / col("Total Bags"))\
       .withColumn("XLargeBag_Ratio", col("XLarge Bags") / col("Total Bags"))

df = df.withColumn("Season",
    when(col("Month").isin(12, 1, 2), "Winter")
    .when(col("Month").isin(3, 4, 5), "Spring")
    .when(col("Month").isin(6, 7, 8), "Summer")
    .otherwise("Fall")
)


In [53]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, StringIndexer
from pyspark.ml.regression import RandomForestRegressor
df=df.dropna()
numeric_cols = [
    'Total Volume', 'PLU_4046', 'PLU_4225', 'PLU_4770',
    'Total Bags', 'Small Bags', 'Large Bags', 'XLarge Bags',
    'year', 'Day', 'Month', 'Day_of_week',
    'PLU_4046_Ratio', 'PLU_4225_Ratio', 'PLU_4770_Ratio',
    'SmallBag_Ratio', 'LargeBag_Ratio', 'XLargeBag_Ratio']

categorical_cols = ['type', 'region', 'Season']

indexers = [StringIndexer(inputCol=col, outputCol=col + '_encoded') for col in categorical_cols]
encoded_cols = [col + "_encoded" for col in categorical_cols]

assembler_input = numeric_cols + encoded_cols

assembler = VectorAssembler(inputCols=assembler_input, outputCol='assembled_features')

scaler = MinMaxScaler(inputCol='assembled_features', outputCol='features')

train_data, test_data = df.randomSplit([0.8, 0.2], seed=1)

rf = RandomForestRegressor(featuresCol='features', labelCol='AveragePrice', numTrees=50, maxDepth=10)

pipeline = Pipeline(stages=indexers + [assembler, scaler, rf])

model = pipeline.fit(train_data)

predictions = model.transform(test_data)

predictions.select('AveragePrice', 'prediction').show(10)


25/07/31 20:32:09 WARN DAGScheduler: Broadcasting large task binary with size 1132.8 KiB
25/07/31 20:32:11 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
25/07/31 20:32:13 WARN DAGScheduler: Broadcasting large task binary with size 3.8 MiB
25/07/31 20:32:14 WARN DAGScheduler: Broadcasting large task binary with size 1066.2 KiB
25/07/31 20:32:16 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
25/07/31 20:32:18 WARN DAGScheduler: Broadcasting large task binary with size 1809.8 KiB
                                                                                

+------------+------------------+
|AveragePrice|        prediction|
+------------+------------------+
|        0.49|0.7266116534072552|
|        0.53|0.7046132013173456|
|        0.53|0.6025084282093021|
|        0.53|0.6325070658803155|
|        0.53|0.7738557340867506|
|        0.56| 0.904388228252231|
|        0.56| 0.722211679047507|
|        0.56|0.7012874916133255|
|        0.57|0.7500310694025011|
|        0.57| 0.605569264428526|
+------------+------------------+
only showing top 10 rows



In [54]:
from pyspark.ml.regression import RandomForestRegressor

rmse_rf = evaluator_rmse.evaluate(predictions_rf)
mae_rf = evaluator_mae.evaluate(predictions_rf)
r2_rf = evaluator_r2.evaluate(predictions_rf)

print(f"RF RMSE: {rmse_rf:.4f}")
print(f"RF MAE: {mae_rf:.4f}")
print(f"RF R2: {r2_rf:.4f}")


RF RMSE: 0.1715
RF MAE: 0.1273
RF R2: 0.8218
