In [1]:
# Location and format of the raw avocado dataset on DBFS.
file_location = "/FileStore/tables/avocado.csv"
file_type = "csv"

# Reader options: first row is a header, let Spark infer column types, comma-separated.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# These options only apply to the CSV reader; other formats ignore them.
reader_options = {
  "inferSchema": infer_schema,
  "header": first_row_is_header,
  "sep": delimiter,
}
df = (spark.read
      .format(file_type)
      .options(**reader_options)
      .load(file_location))

display(df)

Max,Date,AveragePrice,Total Volume,4046,4225,4770,Total Bags,Small Bags,Large Bags,XLarge Bags,type,year,region
0,2015-12-27T00:00:00.000+0000,1.33,64236.62,1036.74,54454.85,48.16,8696.87,8603.62,93.25,0.0,conventional,2015,Albany
1,2015-12-20T00:00:00.000+0000,1.35,54876.98,674.28,44638.81,58.33,9505.56,9408.07,97.49,0.0,conventional,2015,Albany
2,2015-12-13T00:00:00.000+0000,0.93,118220.22,794.7,109149.67,130.5,8145.35,8042.21,103.14,0.0,conventional,2015,Albany
3,2015-12-06T00:00:00.000+0000,1.08,78992.15,1132.0,71976.41,72.58,5811.16,5677.4,133.76,0.0,conventional,2015,Albany
4,2015-11-29T00:00:00.000+0000,1.28,51039.6,941.48,43838.39,75.78,6183.95,5986.26,197.69,0.0,conventional,2015,Albany
5,2015-11-22T00:00:00.000+0000,1.26,55979.78,1184.27,48067.99,43.61,6683.91,6556.47,127.44,0.0,conventional,2015,Albany
6,2015-11-15T00:00:00.000+0000,0.99,83453.76,1368.92,73672.72,93.26,8318.86,8196.81,122.05,0.0,conventional,2015,Albany
7,2015-11-08T00:00:00.000+0000,0.98,109428.33,703.75,101815.36,80.0,6829.22,6266.85,562.37,0.0,conventional,2015,Albany
8,2015-11-01T00:00:00.000+0000,1.02,99811.42,1022.15,87315.57,85.34,11388.36,11104.53,283.83,0.0,conventional,2015,Albany
9,2015-10-25T00:00:00.000+0000,1.07,74338.76,842.4,64757.44,113.0,8625.92,8061.47,564.45,0.0,conventional,2015,Albany


In [2]:
from pyspark.sql.functions import col, lit, unix_timestamp, datediff, to_date, from_unixtime, max, min, sum, count, explode, year, month, concat, lower, upper, approx_count_distinct, first
from pyspark.sql.types import ArrayType, LongType, TimestampType, FloatType
from pyspark.sql.window import Window
import seaborn as sns
import matplotlib.pyplot as plt
from pandas import melt

In [3]:
# Normalise types and names for the time-series analysis.
# - Date is read as a timestamp (see the cell-1 output), so unix_timestamp() can take it
#   directly. The former cast-to-string + "yyyyMMdd" pattern could never match the
#   "yyyy-MM-dd HH:mm:ss" string form, so DateTime was not a valid epoch value.
# - withColumn already names the column, so no .alias() is needed.
# - upper() alone gives the upper-cased region; the extra lower() was redundant.
transform = (df
            .withColumn("AveragePrice", col("AveragePrice").cast("double"))
            .withColumn("TotalVolume", col("Total Volume").cast("double"))
            .withColumn('DateTime', unix_timestamp(col('Date')))
            .withColumn('Region', upper(col("Region")))
            )

In [4]:
from pyspark.sql.types import DateType

# Truncate the Date timestamp to a calendar date.
# Reference the column via col() so it resolves against `transform` itself. df['Date']
# is an attribute of a different DataFrame: it only worked because `transform` derives
# from `df`, and it breaks on re-run once `df` is reassigned later in the notebook.
transform = transform.withColumn("Date", col("Date").cast(DateType()))

In [5]:
# Reduce to the fields the time series needs, splitting Date into year and month.
transform = transform.select(
  year("Date").alias('year'),
  month("Date").alias('month'),
  'Region',
  'AveragePrice',
  'Total Volume',
)
display(transform)

year,month,Region,AveragePrice,Total Volume
2015,12,ALBANY,1.33,64236.62
2015,12,ALBANY,1.35,54876.98
2015,12,ALBANY,0.93,118220.22
2015,12,ALBANY,1.08,78992.15
2015,11,ALBANY,1.28,51039.6
2015,11,ALBANY,1.26,55979.78
2015,11,ALBANY,0.99,83453.76
2015,11,ALBANY,0.98,109428.33
2015,11,ALBANY,1.02,99811.42
2015,10,ALBANY,1.07,74338.76


In [6]:
from pyspark.sql.functions import year, month, dayofmonth, quarter
from pyspark.sql import functions as F

# Roll the weekly rows up to one row per (year, month, Region).
# AveragePrice is already a per-week average, so it is averaged, not summed. Summing it
# would scale the "price" by the number of rows that fall into each month.
# Volumes are additive, so Total Volume is summed.
# NOTE(review): `transform` carries no Type column, so conventional and organic rows
# are pooled in these monthly figures. Confirm that is intended.
timeseries = (transform
              .groupBy('year', 'month', 'Region')
              .agg(F.avg(col('AveragePrice')).alias('AveragePrice'),
                   F.sum(col("Total Volume")).alias("Total Volume"))
              )

In [7]:
# For each region, attach an (approximate) count of the distinct (year, month) periods it covers.
windowRegionItem = Window.partitionBy("Region")
timeseries = timeseries.withColumn("yearmonth", concat("year", "month"))
timeseries = timeseries.withColumn("numMonths", approx_count_distinct("yearmonth").over(windowRegionItem))
# The helper key is only needed for the distinct count.
timeseries = timeseries.drop("yearmonth")

In [8]:
#display(timeseries.groupBy("Country").agg(count("MajorClassification").alias("numRows"),first("numMonths").alias("numMonths")))

In [9]:
# Distinct regions present in the monthly series; N is used to size the figures below.
distinctRegionRows = timeseries.select("Region").distinct().collect()
regionList = [row[0] for row in distinctRegionRows]
print("# Items in regionsList:", len(regionList))
N = len(regionList)

In [10]:
# Per-region scatter of monthly AveragePrice, one colour per year.
# FacetGrid creates and sizes its own figure (height/aspect per facet), so no separate
# plt.subplots() figure is created. The palette goes on the grid because the grid owns
# the hue mapping. `ci` is not passed: scatterplot does not use it, and newer seaborn
# rejects it.
ts = timeseries.orderBy("Region","year","month").toPandas()
gr = sns.FacetGrid(ts, col="Region", hue="year", palette="ch:r=-.2,d=.3_r",
                   col_wrap=4, height=2, aspect=3, sharey=False)
gr = gr.map(sns.scatterplot, "month", "AveragePrice")
gr.add_legend(title="year")
display(gr.fig)

In [11]:
# Single scatter of monthly AveragePrice for all regions together, coloured by year.
ts = timeseries.orderBy("Region","year","month").toPandas()
fig, ax = plt.subplots(figsize=(6.5, 6.5))
# despine() only edits the figure's spines and returns None, so its result is not kept.
sns.despine(fig, left=True, bottom=True)
gr = sns.scatterplot(x="month", y="AveragePrice",
                     hue="year",
                     palette="ch:r=-.2,d=.3_r",
                     sizes=(1, 8), linewidth=0,
                     data=ts, ax=ax)
# Display the figure; gr is only the Axes that the points were drawn on.
display(fig)

In [12]:
# Per-region point plot of monthly AveragePrice, one line per year with fixed colours.
ts = timeseries.orderBy("Region","year","month").toPandas()
pal = {2015:'orange',2016:'blue',2017:'green',2018:'yellow',}
# FacetGrid sizes its own figure from height/aspect per facet (and the N regions it wraps).
# A separate plt.subplots(figsize=(20, 4*N)) figure would just be an unused, very tall canvas.
g = sns.FacetGrid(ts, col="Region", col_wrap=4, height=2, aspect=3, sharey=False)
g = g.map(sns.pointplot, "month", "AveragePrice", "year", ci=None, legend="full", palette=pal, order=list(range(1, 13)))
g.add_legend(title="Year", label_order=['2015','2016','2017','2018'])
display(g.fig)

In [13]:
# One point plot per region (stacked vertically), one line per year.
# The original cell referenced countryList / Country / Month / SalesQty / Year, none of
# which exist in this notebook. It now uses the region series built above.
# This dict assigns colours to years.
pal = {2011:'fuchsia',2012:'dodgerblue',2013:'lime',2014:'black',2015:'orange',2016:'blue',2017:'green',2018:'yellow',2019:'red',2020:'purple',}
# Pull the monthly series to pandas once instead of running one Spark job per region.
ts_all = timeseries.orderBy("Region","year","month").toPandas()
N = len(regionList)
# squeeze=False keeps `axes` 2-D even when there is only one region.
fig, axes = plt.subplots(N, 1, figsize=(15, 4*N), squeeze=False)
for ax, region in zip(axes[:, 0], regionList):
  ts = ts_all[ts_all["Region"] == region]
  sns.pointplot(x='month', y='AveragePrice', hue='year', data=ts, ci=None,
                palette=pal, order=list(range(1, 13)), ax=ax)
  ax.legend(title="Year", loc='center right', bbox_to_anchor=(1.10, 0.5), ncol=1)
  ax.set_title(region)
# Lay out once, after all panels are drawn.
fig.tight_layout(pad=3.0)

display(fig)

In [14]:
# Print the query plan for ordering the raw data by AveragePrice (orderBy is an alias of sort).
df.orderBy("AveragePrice").explain()

In [15]:
# Register df as a temporary view so the spark.sql / %sql cells below can query it by name.
temp_table_name = "avocado_csv"
df.createOrReplaceTempView(name=temp_table_name)

In [16]:
# Highest AveragePrice via SQL on the temp view; head(1) returns the same one-row list as take(1).
spark.sql("Select max(AveragePrice) from avocado_csv").head(1)

In [17]:
from pyspark.sql.functions import max, min
# Lowest AveragePrice through the DataFrame API, as a one-row list.
df.select(min("AveragePrice")).head(1)

In [18]:
# Total of the `4046` column per region, smallest first.
# The column name is numeric, so it must be back-quoted. Unquoted, 4046 is an integer
# literal, and sum(4046) just returns 4046 * (row count) for every region.
# NOTE(review): `4046` presumably holds a volume figure rather than a bag count. The
# RegionBagTotal alias is kept unchanged but may be misleading; confirm.
maxsql = spark.sql("Select region, sum(`4046`) as RegionBagTotal from avocado_csv group by region order by RegionBagTotal asc")
maxsql.show()

In [19]:
# The following call takes all columns (df.columns) and casts them using Spark SQL to a numeric type (DoubleType).
#from pyspark.sql.functions import col  # for indicating a column using a string in the line below
#df = df.select([col(c).cast("double").alias(c) for c in df.columns])
#df.printSchema()

In [20]:
# Keep only the initial columns screened as potential features.
df = df.select('Date', 'AveragePrice', 'Total Bags', 'Small Bags', 'Large Bags', 'XLarge Bags', 'Type', 'Year', 'Region')

In [21]:
# Print the query plan for the feature-column projection.
df.select('Date', 'AveragePrice', 'Total Bags', 'Small Bags', 'Large Bags', 'XLarge Bags', 'Type', 'Year', 'Region').explain()

In [22]:
# Truncate the Date timestamp to a plain calendar date.
from pyspark.sql.types import DateType

df = df.withColumn("Date", col("Date").cast(DateType()))
display(df)

Date,AveragePrice,Total Bags,Small Bags,Large Bags,XLarge Bags,Type,Year,Region
2015-12-27,1.33,8696.87,8603.62,93.25,0.0,conventional,2015,Albany
2015-12-20,1.35,9505.56,9408.07,97.49,0.0,conventional,2015,Albany
2015-12-13,0.93,8145.35,8042.21,103.14,0.0,conventional,2015,Albany
2015-12-06,1.08,5811.16,5677.4,133.76,0.0,conventional,2015,Albany
2015-11-29,1.28,6183.95,5986.26,197.69,0.0,conventional,2015,Albany
2015-11-22,1.26,6683.91,6556.47,127.44,0.0,conventional,2015,Albany
2015-11-15,0.99,8318.86,8196.81,122.05,0.0,conventional,2015,Albany
2015-11-08,0.98,6829.22,6266.85,562.37,0.0,conventional,2015,Albany
2015-11-01,1.02,11388.36,11104.53,283.83,0.0,conventional,2015,Albany
2015-10-25,1.07,8625.92,8061.47,564.45,0.0,conventional,2015,Albany


In [23]:
#Identify and count null values
from pyspark.sql.functions import col
def null_value_count(df):
  """Return (column_name, null_count) pairs for every column of `df` that contains nulls.

  All null counts are computed in one aggregation pass over `df`, instead of one
  Spark job per column plus an unused row count. Columns without nulls are omitted,
  and pairs follow the order of `df.columns`.

  :param df: Spark DataFrame to inspect.
  :return: list of (str, int) tuples; empty when no column contains a null.
  """
  from pyspark.sql.functions import count, when
  if not df.columns:
    return []
  # count() skips nulls, so counting `when(isNull, 1)` counts exactly the null rows.
  counts_row = df.select([count(when(col(k).isNull(), 1)) for k in df.columns]).first()
  # Pair by position, not by name, so duplicate or odd column names cannot collide.
  return [(k, n) for k, n in zip(df.columns, counts_row) if n > 0]

In [24]:
# Run the null scan on df and keep the (column, null_count) pairs it finds.
null_columns_count_list = null_value_count(df=df)

In [25]:
# Show the null scan as a table. An explicit schema is required: the list is empty when
# no column has nulls, and Spark cannot infer column types from zero rows.
spark.createDataFrame(null_columns_count_list, "Column_With_Null_Value string, Null_Values_Count long").show()

In [26]:
%sql
-- Number of rows per avocado Type in the avocado_csv temp view.
SELECT Type, count(1)
FROM avocado_csv
GROUP BY Type

Type,count(1)
organic,9123
conventional,9126


In [27]:
# Build the modelling frame: split Date into year / month / day-of-month and keep the price, type, bag and region columns.
from pyspark.sql.functions import year, month, dayofmonth, quarter

date_parts = [
  year("Date").alias('year'),
  month("Date").alias('month'),
  dayofmonth("Date").alias('dayofmonth'),
]
dg = df.select(*date_parts, "AveragePrice", "Type", "Total Bags", "Small Bags", "Large Bags", "XLarge Bags", "Region")
display(dg)

year,month,dayofmonth,AveragePrice,Type,Total Bags,Small Bags,Large Bags,XLarge Bags,Region
2015,12,27,1.33,conventional,8696.87,8603.62,93.25,0.0,Albany
2015,12,20,1.35,conventional,9505.56,9408.07,97.49,0.0,Albany
2015,12,13,0.93,conventional,8145.35,8042.21,103.14,0.0,Albany
2015,12,6,1.08,conventional,5811.16,5677.4,133.76,0.0,Albany
2015,11,29,1.28,conventional,6183.95,5986.26,197.69,0.0,Albany
2015,11,22,1.26,conventional,6683.91,6556.47,127.44,0.0,Albany
2015,11,15,0.99,conventional,8318.86,8196.81,122.05,0.0,Albany
2015,11,8,0.98,conventional,6829.22,6266.85,562.37,0.0,Albany
2015,11,1,1.02,conventional,11388.36,11104.53,283.83,0.0,Albany
2015,10,25,1.07,conventional,8625.92,8061.47,564.45,0.0,Albany


In [28]:
# Encode Type numerically: conventional -> 0, organic -> 1.
# The replacement is limited to the Type column. An unrestricted DataFrame.replace()
# rewrites matching values in every string column (e.g. Region) as well.
dg = dg.replace({"conventional": "0", "organic": "1"}, subset=["Type"])

from pyspark.sql.types import FloatType, IntegerType

# Any Type value other than the two above does not parse as an integer and becomes null.
dg = dg.withColumn("Type",dg['Type'].cast(IntegerType()))
display(dg)

year,month,dayofmonth,AveragePrice,Type,Total Bags,Small Bags,Large Bags,XLarge Bags,Region
2015,12,27,1.33,0,8696.87,8603.62,93.25,0.0,Albany
2015,12,20,1.35,0,9505.56,9408.07,97.49,0.0,Albany
2015,12,13,0.93,0,8145.35,8042.21,103.14,0.0,Albany
2015,12,6,1.08,0,5811.16,5677.4,133.76,0.0,Albany
2015,11,29,1.28,0,6183.95,5986.26,197.69,0.0,Albany
2015,11,22,1.26,0,6683.91,6556.47,127.44,0.0,Albany
2015,11,15,0.99,0,8318.86,8196.81,122.05,0.0,Albany
2015,11,8,0.98,0,6829.22,6266.85,562.37,0.0,Albany
2015,11,1,1.02,0,11388.36,11104.53,283.83,0.0,Albany
2015,10,25,1.07,0,8625.92,8061.47,564.45,0.0,Albany


In [29]:
# Number of distinct regions reported in each year.
from pyspark.sql.functions import countDistinct

dgregions = (dg
             .groupBy("year")
             .agg(countDistinct("Region")))
dgregions.show()

In [30]:
# Drop the per-size bag columns. Small + Large + XLarge Bags add up to Total Bags (a
# feature), so they are redundant with it. They are not components of the label,
# which is AveragePrice.
dg = dg.drop("Small Bags","Large Bags","XLarge Bags")
display(dg)

year,month,dayofmonth,AveragePrice,Type,Total Bags,Region
2015,12,27,1.33,0,8696.87,Albany
2015,12,20,1.35,0,9505.56,Albany
2015,12,13,0.93,0,8145.35,Albany
2015,12,6,1.08,0,5811.16,Albany
2015,11,29,1.28,0,6183.95,Albany
2015,11,22,1.26,0,6683.91,Albany
2015,11,15,0.99,0,8318.86,Albany
2015,11,8,0.98,0,6829.22,Albany
2015,11,1,1.02,0,11388.36,Albany
2015,10,25,1.07,0,8625.92,Albany


In [31]:
from pyspark.sql.functions import avg, stddev, col
# Mean and standard deviation of AveragePrice for every (Region, year, Type) group.
dg1 = (dg
       .select("Region", "Type", "year", "month", "AveragePrice")
       .groupBy("Region", "year", "Type")
       .agg(avg(col("AveragePrice")).alias('mean'),
            stddev(col("AveragePrice")).alias('stdev')))

In [32]:
# Coefficient of variation (stdev relative to mean); rows with any null, e.g. a stdev that could not be computed, are dropped.
dg1 = dg1.withColumn('cov', col("stdev") / col("mean")).dropna()

In [33]:
# Show the per-(Region, year, Type) price statistics, including cov = stdev / mean.
display(dg1)

Region,year,Type,mean,stdev,cov
Charlotte,2015,0,1.1484615384615382,0.0930609038481665,0.0810309276641772
Denver,2017,0,1.2181132075471697,0.1966063678029469,0.1614023775333982
MiamiFtLauderdale,2016,1,1.4024999999999996,0.1185347636606174,0.0845167655334171
Midsouth,2017,0,1.325471698113208,0.163337381346518,0.1232296257845615
NorthernNewEngland,2015,1,1.833653846153846,0.1293123418302481,0.0705216756704027
NewYork,2016,1,2.138076923076924,0.210601737898891,0.0985005430000209
HarrisburgScranton,2018,1,1.4708333333333334,0.0596136551369556,0.040530530404729
Orlando,2016,0,1.1094230769230768,0.2155104607036508,0.1942545320955078
California,2016,1,1.6136538461538463,0.3098578348663373,0.1920224933029381
Syracuse,2015,1,1.7217307692307693,0.1326165215189601,0.0770251214004906


In [34]:
import matplotlib.pyplot as plt
import seaborn as sns

sns.set(rc={'figure.figsize':(11, 4)})
# Mean Total Bags per region and year. This must read from `dg`: the aggregated `dg1`
# frame only has Region/year/Type/mean/stdev/cov, so selecting "Total Bags" from it fails.
display(dg.select("year","Region","Total Bags").groupBy("Region","year").agg(avg(col("Total Bags")).alias('meanbags')))

In [35]:
# Price standard deviation for each (Region, year, Type) group, one row per group.
display(dg1.select(col("year"), col("Region"), col("stdev")))

year,Region,stdev
2015,Charlotte,0.0930609038481665
2017,Denver,0.1966063678029469
2016,MiamiFtLauderdale,0.1185347636606174
2017,Midsouth,0.163337381346518
2015,NorthernNewEngland,0.1293123418302481
2016,NewYork,0.210601737898891
2018,HarrisburgScranton,0.0596136551369556
2016,Orlando,0.2155104607036508
2016,California,0.3098578348663373
2015,Syracuse,0.1326165215189601


In [36]:
from pyspark.ml.feature import StringIndexer

# Encode the string Region column as a numeric RegionIndex so it can be a model feature.
indexer = StringIndexer(inputCol="Region", outputCol="RegionIndex")
regionIndexModel = indexer.fit(dg)
indexed = regionIndexModel.transform(dg)
indexed.show()

In [37]:
# Drop the original string Region column; the model uses its numeric RegionIndex instead.
dgws = indexed.drop("Region")

In [38]:
# Split the dataset randomly into 80% for training and 20% for testing.
# A fixed seed makes the split, and therefore the reported MAE, reproducible across runs.
# NOTE(review): a random split of weekly time-series rows lets the model see neighbouring
# weeks of the same region in training; consider a time-based split.
train, test = dgws.randomSplit([0.8, 0.2], seed=42)

In [39]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer

# Every column except the label becomes a model input.
featuresCols = list(dgws.columns)
featuresCols.remove('AveragePrice')
# Pack the feature columns into a single vector column, "rawFeatures".
vectorAssembler = VectorAssembler(outputCol="rawFeatures", inputCols=featuresCols)
# Detect the categorical features inside that vector and index them into "features".
vectorIndexer = VectorIndexer(outputCol="features", inputCol="rawFeatures")

In [40]:
from pyspark.ml.regression import GBTRegressor
# Gradient-boosted tree regressor: takes the "features" column and learns to predict AveragePrice.
gbt = GBTRegressor(labelCol="AveragePrice")

In [41]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
# Define a grid of hyperparameters to test:
#  - maxDepth: max depth of each decision tree in the GBT ensemble
#  - maxIter: iterations, i.e., number of trees in each GBT ensemble
#  - maxBins: fixed at 54 for every candidate
# To get the highest accuracy, try deeper trees (10 or higher) and more trees in the ensemble (>100).
paramGrid = ParamGridBuilder()\
  .addGrid(gbt.maxDepth, [2, 5])\
  .addGrid(gbt.maxIter, [20, 150])\
  .addGrid(gbt.maxBins,[54])\
  .build()
# Evaluation metric: mean absolute error between the label (AveragePrice) and the prediction.
evaluator = RegressionEvaluator(metricName="mae", labelCol=gbt.getLabelCol(), predictionCol=gbt.getPredictionCol())
# CrossValidator runs the model tuning. A fixed seed makes the fold assignment, and
# therefore the selected hyperparameters, reproducible across runs.
cv = CrossValidator(estimator=gbt, evaluator=evaluator, estimatorParamMaps=paramGrid, seed=42)

In [42]:
# Chain feature assembly, feature indexing and the cross-validated GBT into one estimator.
from pyspark.ml import Pipeline
pipeline = Pipeline().setStages([vectorAssembler, vectorIndexer, cv])

In [43]:
import mlflow

In [44]:
# Fit all pipeline stages (including the cross-validation search) on the training split.
pipelineModel = pipeline.fit(dataset=train)

In [45]:
# Apply the fitted pipeline to the held-out split to get a "prediction" column.
predictions = pipelineModel.transform(dataset=test)

In [46]:
# Label and prediction side by side, followed by the model's input features.
display(predictions.select(["AveragePrice", "prediction"] + featuresCols))

AveragePrice,prediction,year,month,dayofmonth,Type,Total Bags,RegionIndex
0.85,0.8304651931362397,2015,1,4,0,255504.05,42.0
0.94,0.9390538469739436,2015,1,4,0,36852.99,17.0
0.95,1.0136663926411156,2015,1,4,0,4498940.02,50.0
0.97,1.1633729778190338,2015,1,4,0,86680.79,4.0
0.97,1.0901239817484851,2015,1,4,0,111554.96,38.0
1.0,1.0529037837717978,2015,1,4,0,46815.79,2.0
1.01,1.0712225115419174,2015,1,4,0,7755.62,49.0
1.01,1.1227084174981423,2015,1,4,0,68870.22,33.0
1.05,1.0212224243315622,2015,1,4,0,39600.08,52.0
1.08,1.0972916533953894,2015,1,4,0,141136.68,0.0


In [47]:
# Score the held-out predictions with the same MAE evaluator used during cross-validation.
MAE = evaluator.evaluate(dataset=predictions)
print("MAE on our test set: %g" % (MAE,))

In [48]:
# Just the label and the model's prediction for each test row.
display(predictions.select(["AveragePrice", "prediction"]))

AveragePrice,prediction
0.85,0.8304651931362397
0.94,0.9390538469739436
0.95,1.0136663926411156
0.97,1.1633729778190338
0.97,1.0901239817484851
1.0,1.0529037837717978
1.01,1.0712225115419174
1.01,1.1227084174981423
1.05,1.0212224243315622
1.08,1.0972916533953894


In [49]:
# Per-region scatter of monthly AveragePrice, one colour per year, with a shared legend.
# FacetGrid creates and sizes its own figure, so no separate plt.subplots() call is made.
# The palette belongs on the grid, which owns the hue mapping. `ci` is not passed
# because scatterplot does not use it, and newer seaborn rejects it.
ts = timeseries.orderBy("Region","year","month").toPandas()
gr = sns.FacetGrid(ts, col="Region", hue="year", palette="ch:r=-.2,d=.3_r",
                   col_wrap=4, height=2, aspect=3, sharey=False)
gr = gr.map(sns.scatterplot, "month", "AveragePrice")
gr.add_legend(title="year", label_order=['2015','2016','2017','2018'])
display(gr.fig)