In [0]:
# importing spark library to start the session
from pyspark.sql import SparkSession
import random

In [0]:
#Building spark session to start it
spark = SparkSession.builder.appName("project").getOrCreate()

In [0]:
# Load the data set
data = spark.read.csv("/FileStore/tables/project.csv",inferSchema=True,header=True)
data.show()

In [0]:
#Schema of attributes
data.printSchema()

In [0]:
#Schema of attributes
data.columns

In [0]:
#Checking the missing values in all the attributes
from pyspark.sql.functions import *
from pyspark.sql.functions import when, count, col
df = data.select(['Country',
 'mkt_id',
 'Market',
 'CommodityGroup',
 'Commodity',
 'Year',
 'Month',
 'PriceTrend',
 'PEWI',
 'ALPS',
 'UnitOfMeasure',
 'PriceType',
 'Currency',
 'sn'])
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

In [0]:
#Dropping of Somalia country as it has a very volatile currency and it's affecting the entire prediction. 
data=data.where(data.Country!="Somalia")
data=data.where(data.Year!="1990")
data=data.where(data.Year!="1991")
data=data.where(data.Year!="1992")
data=data.where(data.Year!="1993")
data=data.where(data.Year!="1994")
data=data.where(data.Year!="1995")
data=data.where(data.Year!="1996")
data=data.where(data.Year!="1997")
data=data.where(data.Year!="1998")
data=data.where(data.Year!="1999")
data=data.where(data.Year!="2000")
data=data.where(data.Year!="2001")
data=data.where(data.Year!="2002")
data.show()

In [0]:
#Removing negative value in our dependent variable PriceTrend
import pyspark.sql.functions as F

data = data.withColumn("only_positive_price_trend", F.when(F.col("PriceTrend") > 0, F.col("PriceTrend")).otherwise(0))

In [0]:
data.show()

In [0]:
# dimensions of the dataframe
print("Number of Rows: ",data.count() ,"   Number of Columns: ", len(data.columns))

In [0]:
#All the PriceTrend are in the local currencies. Therefore we need to convert them into a single currency i.e. USD. Using the fixer.io API to scrape the latest rates of different currencies
import requests

url = 'http://data.fixer.io/api/latest?access_key=f874f54530d940862712d9c04ba6be4f&base=USD'
response = requests.get(url)
response=response.json()
print(response)

In [0]:
# importing the currency value of each currency in United States Dollar (USD)
rate = spark.read.csv("/FileStore/tables/currency_rate_USD.csv",inferSchema=True,header=True)
rate.show()

In [0]:
#Converting United States Dollar (USD) in Local Currency i.e. 1 Currency Rate = (1/Currency Rate)USD
import pyspark.sql.functions as F

rate = rate.withColumn("rate_USD_in_local_currency", 1/F.col("Rate ( 1USD = Currency Rate)"))
rate.show()

In [0]:
#Joining the data table and the rate conversion table by defining respective variable
ta = data.alias('ta')
tb = rate.alias('tb')

In [0]:
#using join to join the two tables
data = ta.join(tb, ta.Currency == tb.Currency)
data.show()

In [0]:
data.printSchema()

In [0]:
#Creating a new column to convert the existing attribute PriceTrend in USD
data = data.withColumn("price_trend_USD", F.col("only_positive_price_trend") * F.col("rate_USD_in_local_currency"))
data.show()

In [0]:
#Dropping the columns which are of no use now
columns_to_drop = ['UnitOfMeasure','Currency','sn','PriceTrend','only_positive_price_trend','Rate ( 1USD = Currency Rate)','rate_USD_in_local_currency']
data = data.drop(*columns_to_drop)
data.show()

In [0]:
#Statistical data description
data.describe().show()

In [0]:
# Grouping by CommodityGroup cloumn to check the available types of commodity groups
data.groupBy("CommodityGroup").count().orderBy("CommodityGroup").show()

In [0]:
# Converting CommodityGroup categorical variables into numerical variables
from pyspark.ml.feature import StringIndexer

df = sqlContext.createDataFrame(
    [(0, "cereals and tubers"), (1, "meat, fish and eggs"), (2, "milk and dairy"), (3, "miscellaneous food"), (4, "oil and fats"), (5, "pulses and nuts"),(6,"vegetables and fruits")],
    ["id", "CommodityGroup"])
indexer = StringIndexer(inputCol="CommodityGroup", outputCol="CommodityGroupIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()


In [0]:
#Joining Indexed Table with data table by defining respective variables
ta = data.alias('ta')
tb = indexed.alias('tb')

In [0]:
#using join to join the two tables
data = ta.join(tb, ['CommodityGroup'])
data.show()

In [0]:
# Grouping by ALPS cloumn to check the available types of Alerts
data.groupBy("ALPS").count().orderBy("ALPS").show()

In [0]:
# Converting ALPS categorical variables into numerical variables
df = sqlContext.createDataFrame(
    [(0, "Alert"), (1, "Crisis"), (2, "Normal"), (3, "Stress")],
    ["id", "ALPS"])
indexer = StringIndexer(inputCol="ALPS", outputCol="ALPSIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()

In [0]:
#Joining Indexed Table with data table by defining respective variables
ta = data.alias('ta')
tb = indexed.alias('tb')

In [0]:
#using join to join the two tables
data = ta.join(tb, ta.ALPS == tb.ALPS)
data.show()

In [0]:
# Grouping by PriceType column to check the available types of prices
data.groupBy("PriceType").count().orderBy("PriceType").show()

In [0]:
# Converting PriceType categorical variables into numerical variables
df = sqlContext.createDataFrame(
    [(0, "Farm Gate"), (1, "Retail"), (2, "Wholesale")],
    ["id", "PriceType"])
indexer = StringIndexer(inputCol="PriceType", outputCol="PriceTypeIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()

In [0]:
#Joining Indexed Table with data table by defining respective variables
ta = data.alias('ta')
tb = indexed.alias('tb')

In [0]:
#using join to join the two tables
data = ta.join(tb, ta.PriceType == tb.PriceType)
data.show()

In [0]:
# Grouping by mkt_id column to check the count of available types of markets
data.groupBy("mkt_id").count().orderBy("mkt_id").show()

In [0]:
#dropping of columns to prepare for predictions
columns_to_drop = ['Commodity', 'PriceTrend', 'Month','tb.CommodityGroup','id', 'ALPS','PriceType','Rate ( 1USD = Currency Rate)', 'rate_USD_in_local_currency']
data = data.drop(*columns_to_drop)
data.show()

In [0]:
data.describe().show()

#Assembling the attributes

In [0]:
#importing the assembly libraries
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [0]:
#Checking the columns
data.columns

In [0]:
#Creating a features attribute to assemble all input columns into one features column
assembler = VectorAssembler(
    inputCols=['mkt_id','Year','PEWI','CommodityGroupIndex',
 'ALPSIndex',
 'PriceTypeIndex'],
    outputCol="features")
data = assembler.transform(data)
data.show()

In [0]:
data.describe().show()

### Splitting the train and test data

In [0]:
#Splitting training and testing dataset
train_set, test_set = data.randomSplit([0.7,.3])

## Linear Regression

In [0]:
#importing the machine learning package
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Fit training data of linear regression model
lreg_model = LinearRegression(featuresCol='features', labelCol='price_trend_USD', predictionCol='prediction')
fitlreg = lreg_model.fit(train_set)

In [0]:
# Print the equation of the developed linear regression
print("Coefficients: {} Intercept: {}".format(fitlreg.coefficients,fitlreg.intercept))

In [0]:
#fitting testing data of linear regression model
test_pred = fitlreg.evaluate(test_set)
test_pred.residuals.show()

In [0]:
#Checking the prediction of the model on test dataset
test_prediction = fitlreg.transform(test_set)
test_prediction.show()

In [0]:
#Time-series plot of Year and its prediction
display(test_prediction.select("Year","prediction").groupBy("Year").agg(avg("prediction")))

Year,avg(prediction)
2003,3.174648585182398
2007,5.210286098353854
2018,5.619931162716901
2015,5.183040761569192
2006,7.520462958948662
2013,5.291867767930904
2014,5.472896143976189
2019,6.325893858035932
2004,1.111674307737377
2020,6.274771670644716


In [0]:
test_prediction.select("Country", "market", "Year","CommodityGroup", "features", "Price_trend_USD", "prediction").show()

In [0]:
#Creating a evaluator to calcuate Root Mean squared Error (RMSE) value using regression evaluator
evaluator = RegressionEvaluator(
    labelCol="price_trend_USD", predictionCol="prediction", metricName="rmse")

In [0]:
rmse = evaluator.evaluate(test_prediction)

In [0]:
#Printing RMSE value
print("Root Mean Squared Error (RMSE) for linear regression on test data = %s" % rmse)

In [0]:
print("Normalized Root Mean Squared Error for Linear Regresssor =", rmse/5.48)

#Decision Tree Regression

In [0]:
#importing ml packages for decision tree regressor
from pyspark.ml.regression import DecisionTreeRegressor

In [0]:
#Running Decision Tree Regression model
dtree_model = DecisionTreeRegressor(featuresCol='features', labelCol='price_trend_USD', predictionCol='prediction')

In [0]:
#fitting decision tree regression model on training dataset
fitdtree = dtree_model.fit(train_set)

In [0]:
#transforming the fit training dataset on the testing dataset
test_prediction = fitdtree.transform(test_set)
test_prediction.show()

In [0]:
#Time-series plot of Year and its prediction
display(test_prediction.select("Year","prediction").groupBy("Year").agg(avg("prediction")))

Year,avg(prediction)
2003,3.3402647765125746
2007,6.211451957304664
2018,5.208887410148361
2015,5.734079096873233
2006,5.862406573763498
2013,6.102258189208782
2014,6.226257801988551
2019,4.95121694963486
2004,2.1168276361490874
2020,4.190289655347351


In [0]:
#Checking the prediction value on the test dataset
test_prediction.select("Country", "market", "Year","CommodityGroup", "features", "Price_trend_USD", "prediction").show()

In [0]:
#Creating a evaluator to calcuate Root Mean squared Error (RMSE) value using regression evaluator
evaluator = RegressionEvaluator(
    labelCol="price_trend_USD", predictionCol="prediction", metricName="rmse")

In [0]:
#Calculating RMSE value
rmse = float(evaluator.evaluate(test_prediction))

In [0]:
#Printing RMSE value
print("Root Mean Squared Error (RMSE) for Decision Tree Regressor on test data = %g" % rmse)

In [0]:
print("Normalized Root Mean Squared Error for Decision Tree Regresssor =", rmse/5.48)

## Random Forest Regression

In [0]:
#importing ml pacakges for Random Forest Regression
from pyspark.ml.regression import RandomForestRegressor

In [0]:
#Running Random Forest Regression model
rforest_model = RandomForestRegressor(labelCol="price_trend_USD", featuresCol="features", predictionCol='prediction')

In [0]:
#fitting Random Forest Regression model on training dataset
fitrforest = rforest_model.fit(train_set)

In [0]:
#transforming the fit training dataset on the testing dataset
test_prediction = fitrforest.transform(test_set)
test_prediction.show()

In [0]:
#Time-series plot of Year and its prediction
display(test_prediction.select("Year","prediction").groupBy("Year").agg(avg("prediction")))

Year,avg(prediction)
2003,4.0080680875900425
2007,6.845389072342009
2018,4.752845392803069
2015,4.8949759013216205
2006,7.141068500942573
2013,5.740365940928743
2014,5.437616382984581
2019,4.6896644260737625
2004,2.7905248500583744
2020,4.088178328632401


In [0]:
#Checking the prediction value on the test dataset
test_prediction.select("Country", "market", "Year","CommodityGroup", "features", "Price_trend_USD", "prediction").show(10000)

In [0]:
#Creating a evaluator to calcuate Root Mean squared Error (RMSE) value using regression evaluator
evaluator = RegressionEvaluator(
    labelCol="price_trend_USD", predictionCol="prediction", metricName="rmse")

In [0]:
#Calculating RMSE value
rmse = float(evaluator.evaluate(test_prediction))

In [0]:
#Printing RMSE value
print("Root Mean Squared Error (RMSE) for Random Forest Regressor on test data = %g" % rmse)

In [0]:
print("Normalized Root Mean Squared Error for Random Forest Regresssor =", rmse/5.48)