 # Crop yield prediction using machine learning model 

Crop yield prediction is an important agricultural problem. The Agricultural yield primarily depends on weather conditions (rain, temperature, etc), pesticides and accurate information about history of crop yield is an important thing for making decisions related to agricultural risk management and future predictions. The basic ingredients that sustain humans are similar. We eat a lot of corn, wheat, rice and other simple crops. In this project the prediction of top 10 most consumed yields all over the world is established by applying machine learning techniques. It consist of 10 most consumed crops. It is a regression problem

These corps include :

Cassava, Maize, Plantains and others, Potatoes, Rice, paddy, Sorghum, Soybeans, Sweet potatoes, Wheat, Yams.

In [3]:
import findspark              # importing apache spark
findspark.init()

In [4]:
import pyspark
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.getOrCreate()             # building spark session

Crops Yield Data:

After importing required libraries, crops yield of ten most consumed crops around the world was downloaded from FAO webiste.The collected data include country, item, year starting from 1961 to 2016 and yield value.

In [8]:
df = spark.read.csv('yield_df.csv',header=True, inferSchema=True)
df.show()        # displaying top 20 rows

+---+-------+-----------+----+-----------+-----------------------------+-----------------+--------+
|_c0|   Area|       Item|Year|hg/ha_yield|average_rain_fall_mm_per_year|pesticides_tonnes|avg_temp|
+---+-------+-----------+----+-----------+-----------------------------+-----------------+--------+
|  0|Albania|      Maize|1990|      36613|                       1485.0|            121.0|   16.37|
|  1|Albania|   Potatoes|1990|      66667|                       1485.0|            121.0|   16.37|
|  2|Albania|Rice, paddy|1990|      23333|                       1485.0|            121.0|   16.37|
|  3|Albania|    Sorghum|1990|      12500|                       1485.0|            121.0|   16.37|
|  4|Albania|   Soybeans|1990|       7000|                       1485.0|            121.0|   16.37|
|  5|Albania|      Wheat|1990|      30197|                       1485.0|            121.0|   16.37|
|  6|Albania|      Maize|1991|      29068|                       1485.0|            121.0|   15.36|


In [9]:
df.show(5)        #displaying top 5 rows

+---+-------+-----------+----+-----------+-----------------------------+-----------------+--------+
|_c0|   Area|       Item|Year|hg/ha_yield|average_rain_fall_mm_per_year|pesticides_tonnes|avg_temp|
+---+-------+-----------+----+-----------+-----------------------------+-----------------+--------+
|  0|Albania|      Maize|1990|      36613|                       1485.0|            121.0|   16.37|
|  1|Albania|   Potatoes|1990|      66667|                       1485.0|            121.0|   16.37|
|  2|Albania|Rice, paddy|1990|      23333|                       1485.0|            121.0|   16.37|
|  3|Albania|    Sorghum|1990|      12500|                       1485.0|            121.0|   16.37|
|  4|Albania|   Soybeans|1990|       7000|                       1485.0|            121.0|   16.37|
+---+-------+-----------+----+-----------+-----------------------------+-----------------+--------+
only showing top 5 rows



displaying datatypes

In [10]:
df.dtypes           

[('_c0', 'int'),
 ('Area', 'string'),
 ('Item', 'string'),
 ('Year', 'int'),
 ('hg/ha_yield', 'int'),
 ('average_rain_fall_mm_per_year', 'double'),
 ('pesticides_tonnes', 'double'),
 ('avg_temp', 'double')]

Displaying  total count

In [11]:
df.count()    

28242

 We can drop invalid rows while reading the dataset by setting the read mode as “DROPMALFORMED”

In [12]:
df=spark.read.option('header',True).option('mode','DROPMALFORMED').csv('yield_df.csv')

 Filling null values with 0

In [13]:
df.fillna(value=0).show()      

+---+-------+-----------+----+-----------+-----------------------------+-----------------+--------+
|_c0|   Area|       Item|Year|hg/ha_yield|average_rain_fall_mm_per_year|pesticides_tonnes|avg_temp|
+---+-------+-----------+----+-----------+-----------------------------+-----------------+--------+
|  0|Albania|      Maize|1990|      36613|                       1485.0|            121.0|   16.37|
|  1|Albania|   Potatoes|1990|      66667|                       1485.0|            121.0|   16.37|
|  2|Albania|Rice, paddy|1990|      23333|                       1485.0|            121.0|   16.37|
|  3|Albania|    Sorghum|1990|      12500|                       1485.0|            121.0|   16.37|
|  4|Albania|   Soybeans|1990|       7000|                       1485.0|            121.0|   16.37|
|  5|Albania|      Wheat|1990|      30197|                       1485.0|            121.0|   16.37|
|  6|Albania|      Maize|1991|      29068|                       1485.0|            121.0|   15.36|


Checking null values or not


In [14]:
from pyspark.sql.functions import col,isnan, when, count
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
   ).show()

+---+----+----+----+-----------+-----------------------------+-----------------+--------+
|_c0|Area|Item|Year|hg/ha_yield|average_rain_fall_mm_per_year|pesticides_tonnes|avg_temp|
+---+----+----+----+-----------+-----------------------------+-----------------+--------+
|  0|   0|   0|   0|          0|                            0|                0|       0|
+---+----+----+----+-----------+-----------------------------+-----------------+--------+



Find count of null values for selected columns


In [15]:
df_Columns=["Area","Item"]
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_Columns]
   ).show()

+----+----+
|Area|Item|
+----+----+
|   0|   0|
+----+----+



We can select a specific column for analysis purpose, by passing argument count in the show we can select limited record

In [16]:
df.select('Area','Item','hg/ha_yield').show(5)

+-------+-----------+-----------+
|   Area|       Item|hg/ha_yield|
+-------+-----------+-----------+
|Albania|      Maize|      36613|
|Albania|   Potatoes|      66667|
|Albania|Rice, paddy|      23333|
|Albania|    Sorghum|      12500|
|Albania|   Soybeans|       7000|
+-------+-----------+-----------+
only showing top 5 rows



Describing the dataset using toPandas function

In [17]:
df.describe().toPandas()

Unnamed: 0,summary,_c0,Area,Item,Year,hg/ha_yield,average_rain_fall_mm_per_year,pesticides_tonnes,avg_temp
0,count,28242.0,28242,28242,28242.0,28242.0,28242.0,28242.0,28242.0
1,mean,14120.5,,,2001.544295729764,77053.33209404434,1149.055980454642,37076.909343529136,20.542626584519557
2,stddev,8152.90748751634,,,7.0519052853951205,84956.61289666739,709.8121499492227,59958.78466505776,6.312050836049751
3,min,0.0,Albania,Cassava,1990.0,100.0,1010.0,0.04,1.3
4,max,9999.0,Zimbabwe,Yams,2013.0,9999.0,92.0,999.0,9.99


It is used useful in retrieving all the elements of the row from each partition

In [18]:
df.groupBy('Area').count().show()     

+---------+-----+
|     Area|count|
+---------+-----+
|  Senegal|  138|
|   Sweden|   46|
|   Guyana|  124|
|  Eritrea|   80|
| Malaysia|   93|
|   Turkey|  625|
|   Malawi|  171|
|     Iraq|  276|
|  Germany|  270|
|   Rwanda|  207|
|    Sudan|   28|
|   France|  138|
|   Greece|  161|
|Sri Lanka|  184|
|  Algeria|  114|
|Argentina|  368|
|   Angola|  164|
|  Belgium|   39|
|  Ecuador|  621|
|    Qatar|   69|
+---------+-----+
only showing top 20 rows



Grouping any two column features 

In [19]:
df.groupBy("Area","Item").avg().collect()

[Row(Area='Greece', Item='Rice, paddy'),
 Row(Area='Madagascar', Item='Rice, paddy'),
 Row(Area='Albania', Item='Sorghum'),
 Row(Area='Central African Republic', Item='Plantains and others'),
 Row(Area='Croatia', Item='Maize'),
 Row(Area='India', Item='Sweet potatoes'),
 Row(Area='El Salvador', Item='Maize'),
 Row(Area='Niger', Item='Sweet potatoes'),
 Row(Area='Spain', Item='Sorghum'),
 Row(Area='Latvia', Item='Wheat'),
 Row(Area='El Salvador', Item='Plantains and others'),
 Row(Area='Angola', Item='Maize'),
 Row(Area='Jamaica', Item='Plantains and others'),
 Row(Area='Mozambique', Item='Maize'),
 Row(Area='Nepal', Item='Soybeans'),
 Row(Area='Zimbabwe', Item='Maize'),
 Row(Area='Bangladesh', Item='Sorghum'),
 Row(Area='Brazil', Item='Wheat'),
 Row(Area='Saudi Arabia', Item='Potatoes'),
 Row(Area='Argentina', Item='Cassava'),
 Row(Area='Iraq', Item='Sorghum'),
 Row(Area='Japan', Item='Yams'),
 Row(Area='Namibia', Item='Wheat'),
 Row(Area='Turkey', Item='Wheat'),
 Row(Area='Zimbabwe', 

importing different types and functions 

In [20]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

In [21]:
import pyspark.sql.functions as F

In [22]:
df.select('Area').show(5)

+-------+
|   Area|
+-------+
|Albania|
|Albania|
|Albania|
|Albania|
|Albania|
+-------+
only showing top 5 rows



filtering

In [23]:
df.filter(F.col('Item')=='Maize').show(5)     


+---+-------+-----+----+-----------+-----------------------------+-----------------+--------+
|_c0|   Area| Item|Year|hg/ha_yield|average_rain_fall_mm_per_year|pesticides_tonnes|avg_temp|
+---+-------+-----+----+-----------+-----------------------------+-----------------+--------+
|  0|Albania|Maize|1990|      36613|                       1485.0|            121.0|   16.37|
|  6|Albania|Maize|1991|      29068|                       1485.0|            121.0|   15.36|
| 12|Albania|Maize|1992|      24876|                       1485.0|            121.0|   16.06|
| 18|Albania|Maize|1993|      24185|                       1485.0|            121.0|   16.05|
| 23|Albania|Maize|1994|      25848|                       1485.0|            201.0|   16.96|
+---+-------+-----+----+-----------+-----------------------------+-----------------+--------+
only showing top 5 rows



multiple filtering

In [24]:
df.where(F.col('Item')=='Maize').where(F.col("Area")=="India").show(5)     

+-----+-----+-----+----+-----------+-----------------------------+-----------------+--------+
|  _c0| Area| Item|Year|hg/ha_yield|average_rain_fall_mm_per_year|pesticides_tonnes|avg_temp|
+-----+-----+-----+----+-----------+-----------------------------+-----------------+--------+
|10524|India|Maize|1990|      15178|                       1083.0|          75000.0|   25.58|
|10525|India|Maize|1990|      15178|                       1083.0|          75000.0|   26.88|
|10526|India|Maize|1990|      15178|                       1083.0|          75000.0|   25.79|
|10527|India|Maize|1990|      15178|                       1083.0|          75000.0|    24.1|
|10528|India|Maize|1990|      15178|                       1083.0|          75000.0|   25.25|
+-----+-----+-----+----+-----------+-----------------------------+-----------------+--------+
only showing top 5 rows



Renaming some columns 

In [25]:
df = df.withColumnRenamed("hg/ha_yield","Value") \
    .withColumnRenamed("average_rain_fall_mm_per_year","AvgRain") \
    .withColumnRenamed("pesticides_tonnes","PTonnes")
df.show()

+---+-------+-----------+----+-----+-------+-------+--------+
|_c0|   Area|       Item|Year|Value|AvgRain|PTonnes|avg_temp|
+---+-------+-----------+----+-----+-------+-------+--------+
|  0|Albania|      Maize|1990|36613| 1485.0|  121.0|   16.37|
|  1|Albania|   Potatoes|1990|66667| 1485.0|  121.0|   16.37|
|  2|Albania|Rice, paddy|1990|23333| 1485.0|  121.0|   16.37|
|  3|Albania|    Sorghum|1990|12500| 1485.0|  121.0|   16.37|
|  4|Albania|   Soybeans|1990| 7000| 1485.0|  121.0|   16.37|
|  5|Albania|      Wheat|1990|30197| 1485.0|  121.0|   16.37|
|  6|Albania|      Maize|1991|29068| 1485.0|  121.0|   15.36|
|  7|Albania|   Potatoes|1991|77818| 1485.0|  121.0|   15.36|
|  8|Albania|Rice, paddy|1991|28538| 1485.0|  121.0|   15.36|
|  9|Albania|    Sorghum|1991| 6667| 1485.0|  121.0|   15.36|
| 10|Albania|   Soybeans|1991| 6066| 1485.0|  121.0|   15.36|
| 11|Albania|      Wheat|1991|20698| 1485.0|  121.0|   15.36|
| 12|Albania|      Maize|1992|24876| 1485.0|  121.0|   16.06|
| 13|Alb

In [26]:
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,BooleanType,DateType, IntegerType, DoubleType


imported different functions for changing datatypes 

In [27]:
df = df.withColumn("Value",col("Value").cast(IntegerType())) \
    .withColumn("AvgRain",col("AvgRain").cast(DoubleType())) \
     .withColumn("_c0",col("_c0").cast(IntegerType())) \
     .withColumn("avg_temp",col("avg_temp").cast(DoubleType())) \
     .withColumn("Year",col("Year").cast(IntegerType())) \
    .withColumn("PTonnes",col("PTonnes").cast(DoubleType()))

In [28]:
df.dtypes

[('_c0', 'int'),
 ('Area', 'string'),
 ('Item', 'string'),
 ('Year', 'int'),
 ('Value', 'int'),
 ('AvgRain', 'double'),
 ('PTonnes', 'double'),
 ('avg_temp', 'double')]

In [29]:
df.show()

+---+-------+-----------+----+-----+-------+-------+--------+
|_c0|   Area|       Item|Year|Value|AvgRain|PTonnes|avg_temp|
+---+-------+-----------+----+-----+-------+-------+--------+
|  0|Albania|      Maize|1990|36613| 1485.0|  121.0|   16.37|
|  1|Albania|   Potatoes|1990|66667| 1485.0|  121.0|   16.37|
|  2|Albania|Rice, paddy|1990|23333| 1485.0|  121.0|   16.37|
|  3|Albania|    Sorghum|1990|12500| 1485.0|  121.0|   16.37|
|  4|Albania|   Soybeans|1990| 7000| 1485.0|  121.0|   16.37|
|  5|Albania|      Wheat|1990|30197| 1485.0|  121.0|   16.37|
|  6|Albania|      Maize|1991|29068| 1485.0|  121.0|   15.36|
|  7|Albania|   Potatoes|1991|77818| 1485.0|  121.0|   15.36|
|  8|Albania|Rice, paddy|1991|28538| 1485.0|  121.0|   15.36|
|  9|Albania|    Sorghum|1991| 6667| 1485.0|  121.0|   15.36|
| 10|Albania|   Soybeans|1991| 6066| 1485.0|  121.0|   15.36|
| 11|Albania|      Wheat|1991|20698| 1485.0|  121.0|   15.36|
| 12|Albania|      Maize|1992|24876| 1485.0|  121.0|   16.06|
| 13|Alb

Grouping different columns with their distinct feature values count 

In [30]:
import pyspark.sql.functions as func 
df.groupby('Area').agg(func.expr('count(Item)')\
                              .alias('Distinct_items')).show(10)

+--------+--------------+
|    Area|Distinct_items|
+--------+--------------+
| Senegal|           138|
|  Sweden|            46|
|  Guyana|           124|
| Eritrea|            80|
|Malaysia|            93|
|  Turkey|           625|
|  Malawi|           171|
|    Iraq|           276|
| Germany|           270|
|  Rwanda|           207|
+--------+--------------+
only showing top 10 rows



using agg function displaying mean value

In [31]:
df.agg({'Value':'mean'}).collect()

[Row(avg(Value)=77053.33209404434)]


converting categorical data into numerical data using stringindexer function

In [32]:
from pyspark.ml.feature import StringIndexer        # indexing according unique values

In [33]:
indexer=StringIndexer(inputCol='Item',
                      outputCol='ItemIndex')
df=indexer.fit(df).transform(df)
df.show(5)

+---+-------+-----------+----+-----+-------+-------+--------+---------+
|_c0|   Area|       Item|Year|Value|AvgRain|PTonnes|avg_temp|ItemIndex|
+---+-------+-----------+----+-----+-------+-------+--------+---------+
|  0|Albania|      Maize|1990|36613| 1485.0|  121.0|   16.37|      1.0|
|  1|Albania|   Potatoes|1990|66667| 1485.0|  121.0|   16.37|      0.0|
|  2|Albania|Rice, paddy|1990|23333| 1485.0|  121.0|   16.37|      3.0|
|  3|Albania|    Sorghum|1990|12500| 1485.0|  121.0|   16.37|      5.0|
|  4|Albania|   Soybeans|1990| 7000| 1485.0|  121.0|   16.37|      4.0|
+---+-------+-----------+----+-----+-------+-------+--------+---------+
only showing top 5 rows



In [34]:
indexer=StringIndexer(inputCol='Area',
                      outputCol='AreaIndex')
df1=indexer.fit(df).transform(df)
df1.show(5)

+---+-------+-----------+----+-----+-------+-------+--------+---------+---------+
|_c0|   Area|       Item|Year|Value|AvgRain|PTonnes|avg_temp|ItemIndex|AreaIndex|
+---+-------+-----------+----+-----+-------+-------+--------+---------+---------+
|  0|Albania|      Maize|1990|36613| 1485.0|  121.0|   16.37|      1.0|     71.0|
|  1|Albania|   Potatoes|1990|66667| 1485.0|  121.0|   16.37|      0.0|     71.0|
|  2|Albania|Rice, paddy|1990|23333| 1485.0|  121.0|   16.37|      3.0|     71.0|
|  3|Albania|    Sorghum|1990|12500| 1485.0|  121.0|   16.37|      5.0|     71.0|
|  4|Albania|   Soybeans|1990| 7000| 1485.0|  121.0|   16.37|      4.0|     71.0|
+---+-------+-----------+----+-----+-------+-------+--------+---------+---------+
only showing top 5 rows



displaying columns 

In [35]:
df.columns


['_c0',
 'Area',
 'Item',
 'Year',
 'Value',
 'AvgRain',
 'PTonnes',
 'avg_temp',
 'ItemIndex']

In [34]:
df.dtypes

[('_c0', 'int'),
 ('Area', 'string'),
 ('Item', 'string'),
 ('Year', 'int'),
 ('Value', 'int'),
 ('AvgRain', 'double'),
 ('PTonnes', 'double'),
 ('avg_temp', 'double'),
 ('ItemIndex', 'double')]




feature engineering using vectorassembler 

In [37]:
from pyspark.ml.feature import VectorAssembler

In [38]:
featureassembler=VectorAssembler(inputCols=['ItemIndex','Value'], outputCol='Independent feature')


In [39]:
output=featureassembler.transform(df1)
output.show()

+---+-------+-----------+----+-----+-------+-------+--------+---------+---------+-------------------+
|_c0|   Area|       Item|Year|Value|AvgRain|PTonnes|avg_temp|ItemIndex|AreaIndex|Independent feature|
+---+-------+-----------+----+-----+-------+-------+--------+---------+---------+-------------------+
|  0|Albania|      Maize|1990|36613| 1485.0|  121.0|   16.37|      1.0|     71.0|      [1.0,36613.0]|
|  1|Albania|   Potatoes|1990|66667| 1485.0|  121.0|   16.37|      0.0|     71.0|      [0.0,66667.0]|
|  2|Albania|Rice, paddy|1990|23333| 1485.0|  121.0|   16.37|      3.0|     71.0|      [3.0,23333.0]|
|  3|Albania|    Sorghum|1990|12500| 1485.0|  121.0|   16.37|      5.0|     71.0|      [5.0,12500.0]|
|  4|Albania|   Soybeans|1990| 7000| 1485.0|  121.0|   16.37|      4.0|     71.0|       [4.0,7000.0]|
|  5|Albania|      Wheat|1990|30197| 1485.0|  121.0|   16.37|      2.0|     71.0|      [2.0,30197.0]|
|  6|Albania|      Maize|1991|29068| 1485.0|  121.0|   15.36|      1.0|     71.0| 