In [None]:
%pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=68a55085f9450acef79621cd857cee66c771851ebcaaf34dd3983c629851eb56
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
!pip3 install memory_profiler
%load_ext memory_profiler

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting memory_profiler
  Downloading memory_profiler-0.61.0-py3-none-any.whl (31 kB)
Installing collected packages: memory_profiler
Successfully installed memory_profiler-0.61.0


In [None]:
# spark libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, FloatType, StringType, TimestampType
from pyspark.sql import functions as F
from pyspark.sql.functions import col,isnan,when,count,lit
from pyspark.ml.feature import Imputer
from pyspark.sql import Window
from pyspark.sql.functions import month,year,dayofmonth, hour, minute, second


In [None]:
# Mount Google Drive at /content/drive (Colab-specific) so the CSV files read
# below are reachable through the local file system.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Create the Spark session (or reuse a running one) used by every later cell.
spark = (
    SparkSession.builder
    .appName('BigData')
    .getOrCreate()
)
spark  # echo the session so the notebook renders its summary

In [None]:
# Crop production table; the header row supplies column names and the column
# types are inferred from the file contents.
crop = (
    spark.read
    .options(delimiter=',')
    .csv(
        '/content/drive/MyDrive/vslab/final/irbidProd.csv',
        header=True,
        inferSchema=True,
    )
)

In [None]:
# Check the column names and the types that inferSchema assigned.
crop.printSchema()

root
 |-- Crop: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Season: string (nullable = true)
 |-- Production: double (nullable = true)



In [None]:
# Preview the first rows of the crop production table.
crop.show()

+---------+------+----+------+----------+
|     Crop|Region|Year|Season|Production|
+---------+------+----+------+----------+
| Tomatoes| Irbid|2017|Summer|   11124.5|
| Tomatoes| Irbid|2018|Summer|   15540.4|
| Tomatoes| Irbid|2019|Summer|    6292.4|
| Tomatoes| Irbid|2020|Summer|    5122.7|
| Tomatoes| Irbid|2021|Summer|    2968.9|
|     Okra| Irbid|2017|Summer|     319.0|
|     Okra| Irbid|2018|Summer|    1003.1|
|     Okra| Irbid|2019|Summer|    1808.7|
|     Okra| Irbid|2020|Summer|    1428.2|
|     Okra| Irbid|2021|Summer|     867.3|
|Onion dry| Irbid|2017|Summer|     294.0|
|Onion dry| Irbid|2018|Summer|     781.9|
|Onion dry| Irbid|2019|Summer|    5616.0|
|Onion dry| Irbid|2020|Summer|    6269.1|
|Onion dry| Irbid|2021|Summer|    7770.8|
| Tomatoes| Irbid|2017|Winter|    9332.2|
| Tomatoes| Irbid|2018|Winter|    1509.9|
| Tomatoes| Irbid|2019|Winter|       0.0|
| Tomatoes| Irbid|2020|Winter|    3060.0|
| Tomatoes| Irbid|2021|Winter|    1046.7|
+---------+------+----+------+----

In [None]:
# Summary statistics (count, mean, stddev, min, max) for every crop column.
crop.describe().show()

+-------+--------+------+-----------------+------+-----------------+
|summary|    Crop|Region|             Year|Season|       Production|
+-------+--------+------+-----------------+------+-----------------+
|  count|      30|    30|               30|    30|               30|
|   mean|    null|  null|           2019.0|  null|3672.026666666666|
| stddev|    null|  null|1.438389904456147|  null|3963.238883133542|
|    min|    Okra| Irbid|             2017|Summer|              0.0|
|    max|Tomatoes| Irbid|             2021|Winter|          15540.4|
+-------+--------+------+-----------------+------+-----------------+



In [None]:
# Explicit schema for the weather observations file; every column is nullable.
# NOTE(review): the numeric readings are declared as integers - confirm the CSV
# never stores fractional values in these columns.
weather_fields = [
    ('Region', StringType()),
    ('Date/Time', TimestampType()),
    ('AirDewPoint', IntegerType()),
    ('AirTemperature', IntegerType()),
    ('Humidity%', IntegerType()),
    ('ManualPresentWeather', StringType()),
    ('CloudType', StringType()),
    ('CloudsCover(Okta)', IntegerType()),
    ('CloudsCover%', IntegerType()),
    ('WindDirection(Degrees)', IntegerType()),
    ('WindSpeed', IntegerType()),
    ('WindType', StringType()),
]
irbidSchema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in weather_fields]
)

irbidDf = spark.read.csv(
    '/content/drive/MyDrive/vslab/final/Irbid.csv',
    header=True,
    schema=irbidSchema,
)

In [None]:
# Confirm the weather frame carries the explicitly declared schema.
irbidDf.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Date/Time: timestamp (nullable = true)
 |-- AirDewPoint: integer (nullable = true)
 |-- AirTemperature: integer (nullable = true)
 |-- Humidity%: integer (nullable = true)
 |-- ManualPresentWeather: string (nullable = true)
 |-- CloudType: string (nullable = true)
 |-- CloudsCover(Okta): integer (nullable = true)
 |-- CloudsCover%: integer (nullable = true)
 |-- WindDirection(Degrees): integer (nullable = true)
 |-- WindSpeed: integer (nullable = true)
 |-- WindType: string (nullable = true)



In [None]:
# Preview the first seven weather observations.
irbidDf.show(7)

+------+-------------------+-----------+--------------+---------+--------------------+---------+-----------------+------------+----------------------+---------+--------+
|Region|          Date/Time|AirDewPoint|AirTemperature|Humidity%|ManualPresentWeather|CloudType|CloudsCover(Okta)|CloudsCover%|WindDirection(Degrees)|WindSpeed|WindType|
+------+-------------------+-----------+--------------+---------+--------------------+---------+-----------------+------------+----------------------+---------+--------+
| Irbid|2017-01-01 06:00:00|          4|             5|       94|                null|  cumulus|                2|          25|                  null|        0|    calm|
| Irbid|2017-01-01 09:00:00|          7|             8|       91|                null|  cumulus|                2|          25|                   300|        3|  normal|
| Irbid|2017-01-01 12:00:00|          6|            11|       72|                null|  cumulus|                3|          38|                   250|

In [None]:
# irbidDf.write.csv('/content/drive/MyDrive/vslab/final/irbidviz.csv')

In [None]:
# Working name for the cleaning steps; later cells rebind irbid_weather while
# irbidDf keeps pointing at the frame as loaded.
irbid_weather=irbidDf

In [None]:
# Summary statistics per column; the count row shows how many non-null values
# each column holds.
irbid_weather.describe().show()

+-------+------+-----------------+------------------+-----------------+--------------------+------------+------------------+------------------+----------------------+------------------+--------+
|summary|Region|      AirDewPoint|    AirTemperature|        Humidity%|ManualPresentWeather|   CloudType| CloudsCover(Okta)|      CloudsCover%|WindDirection(Degrees)|         WindSpeed|WindType|
+-------+------+-----------------+------------------+-----------------+--------------------+------------+------------------+------------------+----------------------+------------------+--------+
|  count|  8043|              976|              8029|              976|                  42|         402|               399|              8042|                  5722|              8040|    8042|
|   mean|  null|6.844262295081967|18.917424336779174|49.91188524590164|                null|        null| 3.611528822055138| 2.252300422780403|    234.19259000349527| 1.728731343283582|    null|
| stddev|  null|6.6776571

In [None]:
# Remove the sparsely populated columns (fewer than 5000 non-null values each).
sparse_columns = [
    'AirDewPoint',
    'Humidity%',
    'ManualPresentWeather',
    'CloudType',
    'CloudsCover(Okta)',
]
irbid_weather = irbid_weather.drop(*sparse_columns)

In [None]:
# Check which columns remain after the drop.
irbid_weather.show(3)

+------+-------------------+--------------+------------+----------------------+---------+--------+
|Region|          Date/Time|AirTemperature|CloudsCover%|WindDirection(Degrees)|WindSpeed|WindType|
+------+-------------------+--------------+------------+----------------------+---------+--------+
| Irbid|2017-01-01 06:00:00|             5|          25|                  null|        0|    calm|
| Irbid|2017-01-01 09:00:00|             8|          25|                   300|        3|  normal|
| Irbid|2017-01-01 12:00:00|            11|          38|                   250|        3|  normal|
+------+-------------------+--------------+------------+----------------------+---------+--------+
only showing top 3 rows



In [None]:
# Count missing (NaN or null) entries in every numeric column, then print the
# total number of rows for comparison.
numeric_vals = [
    name
    for name, kind in irbid_weather.dtypes
    if kind not in ("string", "timestamp")
]
df_numeric = irbid_weather.select(numeric_vals)
missing_counts = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in df_numeric.columns
]
df_numeric.select(missing_counts).show()
print( df_numeric.count())

+--------------+------------+----------------------+---------+
|AirTemperature|CloudsCover%|WindDirection(Degrees)|WindSpeed|
+--------------+------------+----------------------+---------+
|            14|           1|                  2321|        3|
+--------------+------------+----------------------+---------+

8043


In [None]:
# Fill the few gaps in the nearly complete numeric columns with each column's
# mean; the imputed values overwrite the columns in place.
imputed_columns = ['AirTemperature', 'CloudsCover%', 'WindSpeed']
imptr = Imputer(
    strategy='mean',
    inputCols=imputed_columns,
    outputCols=imputed_columns,
)

irbid_weather = imptr.fit(irbid_weather).transform(irbid_weather)

In [None]:
# Missing wind directions become 0 instead of being imputed with a mean.
# NOTE(review): 0 doubles as the "no direction recorded" marker here - confirm
# it cannot be confused with a genuine 0-degree reading.
irbid_weather = irbid_weather.na.fill(0, subset=["WindDirection(Degrees)"])


In [None]:
# Re-run the missing-value count to verify that no NaN/null entries remain in
# the numeric columns after imputation and filling.
numeric_vals = [
    name
    for name, kind in irbid_weather.dtypes
    if kind not in ("string", "timestamp")
]
df_numeric = irbid_weather.select(numeric_vals)
missing_counts = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in df_numeric.columns
]
df_numeric.select(missing_counts).show()
print( df_numeric.count())

+--------------+------------+----------------------+---------+
|AirTemperature|CloudsCover%|WindDirection(Degrees)|WindSpeed|
+--------------+------------+----------------------+---------+
|             0|           0|                     0|        0|
+--------------+------------+----------------------+---------+

8043


In [None]:
# Derive year / month / day from the observation timestamp for the daily and
# seasonal roll-ups below. Parenthesised chaining is used instead of backslash
# continuations, which break silently when a trailing one is left behind.
irbid_weather = (
    irbid_weather
    .withColumn("year", year("Date/Time"))
    .withColumn("month", month("Date/Time"))
    .withColumn("day", dayofmonth("Date/Time"))
)

# Show the resulting DataFrame
irbid_weather.show()

+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+
|Region|          Date/Time|AirTemperature|CloudsCover%|WindDirection(Degrees)|WindSpeed|WindType|year|month|day|
+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+
| Irbid|2017-01-01 06:00:00|             5|          25|                     0|        0|    calm|2017|    1|  1|
| Irbid|2017-01-01 09:00:00|             8|          25|                   300|        3|  normal|2017|    1|  1|
| Irbid|2017-01-01 12:00:00|            11|          38|                   250|        3|  normal|2017|    1|  1|
| Irbid|2017-01-01 15:00:00|             9|          25|                     0|        0|    calm|2017|    1|  1|
| Irbid|2017-01-01 18:00:00|             6|           0|                     0|        0|    calm|2017|    1|  1|
| Irbid|2017-01-02 06:00:00|             7|          38|                   200|        3

In [None]:
# irbidP=irbid_weather.toPandas()
# irbidP.to_csv('irbidViz.csv', index=False)

In [None]:
# Label every observation with a season derived from its month:
#   months 6-9 -> 'Summer'; months 11, 12, 1, 2 -> 'Winter';
#   everything else -> 'Autumn/Spring'.
month_col = F.col("month")
is_summer = month_col.between(6, 9)
is_winter = (month_col <= 2) | (month_col >= 11)
season_label = (
    F.when(is_summer, 'Summer')
    .when(is_winter, 'Winter')
    .otherwise('Autumn/Spring')
)

irbid_weather = irbid_weather.withColumn("Season", season_label)

irbid_weather.show(3)

+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+------+
|Region|          Date/Time|AirTemperature|CloudsCover%|WindDirection(Degrees)|WindSpeed|WindType|year|month|day|Season|
+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+------+
| Irbid|2017-01-01 06:00:00|             5|          25|                     0|        0|    calm|2017|    1|  1|Winter|
| Irbid|2017-01-01 09:00:00|             8|          25|                   300|        3|  normal|2017|    1|  1|Winter|
| Irbid|2017-01-01 12:00:00|            11|          38|                   250|        3|  normal|2017|    1|  1|Winter|
+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+------+
only showing top 3 rows



In [None]:
# Flag each observation whose air temperature is 12 or below, or above 30
# (1 = extreme, 0 = otherwise).
air_temp = F.col("AirTemperature")
is_extreme_temp = (air_temp <= 12) | (air_temp > 30)
temp1 = irbid_weather.withColumn(
    "extrm_temps",
    F.when(is_extreme_temp, 1).otherwise(0),
)

temp1.show(3)
temp1.count()


+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+------+-----------+
|Region|          Date/Time|AirTemperature|CloudsCover%|WindDirection(Degrees)|WindSpeed|WindType|year|month|day|Season|extrm_temps|
+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+------+-----------+
| Irbid|2017-01-01 06:00:00|             5|          25|                     0|        0|    calm|2017|    1|  1|Winter|          1|
| Irbid|2017-01-01 09:00:00|             8|          25|                   300|        3|  normal|2017|    1|  1|Winter|          1|
| Irbid|2017-01-01 12:00:00|            11|          38|                   250|        3|  normal|2017|    1|  1|Winter|          1|
+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+------+-----------+
only showing top 3 rows



8043

In [None]:
# Collapse to one row per calendar day: the day is flagged when any of its
# observations was flagged (max of the 0/1 indicator).
extrm_temps_days = (
    temp1
    .groupBy('day', "month", "year", 'season')
    .agg(F.max('extrm_temps').alias('extrm_temps'))
    .orderBy('year', 'month', 'day', 'season', ascending=True)
)
extrm_temps_days.show(10)
extrm_temps_days.count()



+---+-----+----+------+-----------+
|day|month|year|season|extrm_temps|
+---+-----+----+------+-----------+
|  1|    1|2017|Winter|          1|
|  2|    1|2017|Winter|          1|
|  3|    1|2017|Winter|          1|
|  4|    1|2017|Winter|          1|
|  5|    1|2017|Winter|          1|
|  6|    1|2017|Winter|          1|
|  7|    1|2017|Winter|          1|
|  8|    1|2017|Winter|          1|
|  9|    1|2017|Winter|          1|
| 10|    1|2017|Winter|          1|
+---+-----+----+------+-----------+
only showing top 10 rows



2040

In [None]:
# Number of flagged days per (year, season).
extrm_temps_years = (
    extrm_temps_days
    .groupBy('year', 'season')
    .agg(F.sum('extrm_temps').alias("sum_extrm_temps"))
    .orderBy('year', 'season', ascending=True)
)
extrm_temps_years.show(10)
extrm_temps_years.count()

+----+-------------+---------------+
|year|       season|sum_extrm_temps|
+----+-------------+---------------+
|2017|Autumn/Spring|             41|
|2017|       Summer|             43|
|2017|       Winter|             60|
|2018|Autumn/Spring|             36|
|2018|       Summer|             60|
|2018|       Winter|             92|
|2019|Autumn/Spring|             75|
|2019|       Summer|             56|
|2019|       Winter|             93|
|2020|Autumn/Spring|             54|
+----+-------------+---------------+
only showing top 10 rows



19

In [None]:
def map_values(test, return_type="bigint"):
    """Build a Spark UDF that looks up a value by its (year, season) pair.

    Args:
        test: Mapping keyed by ``(year, season)`` tuples.
        return_type: Spark return type of the UDF, given as a ``DataType`` or
            a DDL type string. Defaults to ``"bigint"`` so that looked-up
            counts stay numeric; when no type is passed to ``F.udf`` the
            resulting column is a string column.

    Returns:
        A UDF that takes a year column and a season column and yields the
        mapped value, or null when the pair is not a key of ``test``.
    """
    def inner_map(year, season):
        # dict.get returns None for unknown pairs, which Spark renders as null.
        return test.get((year, season))
    return F.udf(inner_map, return_type)

In [None]:
# Bring the small per-(year, season) table to the driver as a lookup dict.
temp_values = dict(
    ((row['year'], row['season']), row['sum_extrm_temps'])
    for row in extrm_temps_years.collect()
)

In [None]:
# Start crop_worstcase from the crop table and attach, per row, the number of
# extreme-temperature days of that row's year and season.
mapping = map_values(temp_values)
extreme_temperature_count = mapping(F.col("year"), F.col("season"))
crop_worstcase = crop.withColumn("extreme_temperatures", extreme_temperature_count)
crop_worstcase.show()

+---------+------+----+------+----------+--------------------+
|     Crop|Region|Year|Season|Production|extreme_temperatures|
+---------+------+----+------+----------+--------------------+
| Tomatoes| Irbid|2017|Summer|   11124.5|                  43|
| Tomatoes| Irbid|2018|Summer|   15540.4|                  60|
| Tomatoes| Irbid|2019|Summer|    6292.4|                  56|
| Tomatoes| Irbid|2020|Summer|    5122.7|                  38|
| Tomatoes| Irbid|2021|Summer|    2968.9|                  70|
|     Okra| Irbid|2017|Summer|     319.0|                  43|
|     Okra| Irbid|2018|Summer|    1003.1|                  60|
|     Okra| Irbid|2019|Summer|    1808.7|                  56|
|     Okra| Irbid|2020|Summer|    1428.2|                  38|
|     Okra| Irbid|2021|Summer|     867.3|                  70|
|Onion dry| Irbid|2017|Summer|     294.0|                  43|
|Onion dry| Irbid|2018|Summer|     781.9|                  60|
|Onion dry| Irbid|2019|Summer|    5616.0|              

In [None]:
# Flag each observation whose wind speed is 5 or more (1 = bad wind, 0 = otherwise).
is_strong_wind = F.col("WindSpeed") >= 5
temp2 = temp1.withColumn(
    "bad_wind",
    F.when(is_strong_wind, 1).otherwise(0),
)

temp2.show(3)
temp2.count()

+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+------+-----------+--------+
|Region|          Date/Time|AirTemperature|CloudsCover%|WindDirection(Degrees)|WindSpeed|WindType|year|month|day|Season|extrm_temps|bad_wind|
+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+------+-----------+--------+
| Irbid|2017-01-01 06:00:00|             5|          25|                     0|        0|    calm|2017|    1|  1|Winter|          1|       0|
| Irbid|2017-01-01 09:00:00|             8|          25|                   300|        3|  normal|2017|    1|  1|Winter|          1|       0|
| Irbid|2017-01-01 12:00:00|            11|          38|                   250|        3|  normal|2017|    1|  1|Winter|          1|       0|
+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+------+-----------+--------+
only s

8043

In [None]:
# One row per calendar day: flagged when any observation that day had bad wind.
bad_wind_days = (
    temp2
    .groupBy('day', "month", "year", 'season')
    .agg(F.max('bad_wind').alias('bad_wind'))
    .orderBy('year', 'month', 'day', 'season', ascending=True)
)
bad_wind_days.show(3)
bad_wind_days.count()

+---+-----+----+------+--------+
|day|month|year|season|bad_wind|
+---+-----+----+------+--------+
|  1|    1|2017|Winter|       0|
|  2|    1|2017|Winter|       0|
|  3|    1|2017|Winter|       0|
+---+-----+----+------+--------+
only showing top 3 rows



2040

In [None]:
# Number of bad-wind days per (year, season).
bad_windperyear = (
    bad_wind_days
    .groupBy('year', 'season')
    .agg(F.sum('bad_wind').alias("sum_badwind_days"))
    .orderBy('year', 'season', ascending=True)
)
bad_windperyear.show()
bad_windperyear.count()

+----+-------------+----------------+
|year|       season|sum_badwind_days|
+----+-------------+----------------+
|2017|Autumn/Spring|               9|
|2017|       Summer|               1|
|2017|       Winter|              11|
|2018|Autumn/Spring|               4|
|2018|       Summer|               5|
|2018|       Winter|               5|
|2019|Autumn/Spring|               6|
|2019|       Summer|               1|
|2019|       Winter|               6|
|2020|Autumn/Spring|               8|
|2020|       Summer|               4|
|2020|       Winter|               2|
|2021|Autumn/Spring|               2|
|2021|       Summer|              11|
|2021|       Winter|               6|
|2022|Autumn/Spring|              22|
|2022|       Summer|              57|
|2022|       Winter|               8|
|2023|       Winter|              10|
+----+-------------+----------------+



19

In [None]:
# Lookup dict of bad-wind day counts keyed by (year, season).
wind_values = dict(
    ((row['year'], row['season']), row['sum_badwind_days'])
    for row in bad_windperyear.collect()
)

mapping = map_values(wind_values)

# Attach the seasonal bad-wind day count to every crop row.
harmful_wind_count = mapping(F.col("year"), F.col("season"))
crop_worstcase = crop_worstcase.withColumn("harmful_winds", harmful_wind_count)
crop_worstcase.show()


+---------+------+----+------+----------+--------------------+-------------+
|     Crop|Region|Year|Season|Production|extreme_temperatures|harmful_winds|
+---------+------+----+------+----------+--------------------+-------------+
| Tomatoes| Irbid|2017|Summer|   11124.5|                  43|            1|
| Tomatoes| Irbid|2018|Summer|   15540.4|                  60|            5|
| Tomatoes| Irbid|2019|Summer|    6292.4|                  56|            1|
| Tomatoes| Irbid|2020|Summer|    5122.7|                  38|            4|
| Tomatoes| Irbid|2021|Summer|    2968.9|                  70|           11|
|     Okra| Irbid|2017|Summer|     319.0|                  43|            1|
|     Okra| Irbid|2018|Summer|    1003.1|                  60|            5|
|     Okra| Irbid|2019|Summer|    1808.7|                  56|            1|
|     Okra| Irbid|2020|Summer|    1428.2|                  38|            4|
|     Okra| Irbid|2021|Summer|     867.3|                  70|           11|

In [None]:
# Temporary frame flagging observations whose cloud cover is above
# CLOUD_COVER_LIMIT_PCT percent; such values could affect crop production
# negatively (1 = too cloudy, 0 = otherwise).
CLOUD_COVER_LIMIT_PCT = 30
temp3 = temp2.withColumn(
    "clouds",
    F.when(F.col("CloudsCover%") > CLOUD_COVER_LIMIT_PCT, 1).otherwise(0),
)

temp3.show(3)
temp3.count()


+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+------+-----------+--------+------+
|Region|          Date/Time|AirTemperature|CloudsCover%|WindDirection(Degrees)|WindSpeed|WindType|year|month|day|Season|extrm_temps|bad_wind|clouds|
+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+------+-----------+--------+------+
| Irbid|2017-01-01 06:00:00|             5|          25|                     0|        0|    calm|2017|    1|  1|Winter|          1|       0|     0|
| Irbid|2017-01-01 09:00:00|             8|          25|                   300|        3|  normal|2017|    1|  1|Winter|          1|       0|     0|
| Irbid|2017-01-01 12:00:00|            11|          38|                   250|        3|  normal|2017|    1|  1|Winter|          1|       0|     1|
+------+-------------------+--------------+------------+----------------------+---------+--------+----+---

8043

In [None]:
# One row per calendar day: flagged when any observation that day was too cloudy.
bad_cloud_days = (
    temp3
    .groupBy('day', "month", "year", 'season')
    .agg(F.max('clouds').alias('clouds'))
    .orderBy('year', 'month', 'day', 'season', ascending=True)
)
bad_cloud_days.show(3)
bad_cloud_days.count()

+---+-----+----+------+------+
|day|month|year|season|clouds|
+---+-----+----+------+------+
|  1|    1|2017|Winter|     1|
|  2|    1|2017|Winter|     1|
|  3|    1|2017|Winter|     1|
+---+-----+----+------+------+
only showing top 3 rows



2040

In [None]:
# Number of too-cloudy days per (year, season). The sort key is spelled 'year'
# to match the grouped column exactly, so the cell does not depend on Spark's
# case-insensitive column resolution.
bad_cloudy_years = (
    bad_cloud_days
    .groupBy('year', 'season')
    .agg(F.sum('clouds').alias("sum_too_cloudy"))
    .orderBy('year', 'season', ascending=True)
)
bad_cloudy_years.show()
bad_cloudy_years.count()

+----+-------------+--------------+
|year|       season|sum_too_cloudy|
+----+-------------+--------------+
|2017|Autumn/Spring|            60|
|2017|       Summer|            14|
|2017|       Winter|            38|
|2018|Autumn/Spring|             1|
|2018|       Summer|             0|
|2018|       Winter|             0|
|2019|Autumn/Spring|             0|
|2019|       Summer|             0|
|2019|       Winter|             0|
|2020|Autumn/Spring|             0|
|2020|       Summer|             0|
|2020|       Winter|             0|
|2021|Autumn/Spring|             0|
|2021|       Summer|             0|
|2021|       Winter|             0|
|2022|Autumn/Spring|             0|
|2022|       Summer|             0|
|2022|       Winter|             1|
|2023|       Winter|             0|
+----+-------------+--------------+



19

In [None]:
# Lookup dict of too-cloudy day counts keyed by (year, season).
cloud_vals = dict(
    ((row['year'], row['season']), row['sum_too_cloudy'])
    for row in bad_cloudy_years.collect()
)

In [None]:
# Attach the seasonal too-cloudy day count to every crop row.
mapping = map_values(cloud_vals)
too_cloudy_count = mapping(F.col("year"), F.col("season"))
crop_worstcase = crop_worstcase.withColumn("too_cloudy", too_cloudy_count)
crop_worstcase.show(3)
crop_worstcase.count()

+--------+------+----+------+----------+--------------------+-------------+----------+
|    Crop|Region|Year|Season|Production|extreme_temperatures|harmful_winds|too_cloudy|
+--------+------+----+------+----------+--------------------+-------------+----------+
|Tomatoes| Irbid|2017|Summer|   11124.5|                  43|            1|        14|
|Tomatoes| Irbid|2018|Summer|   15540.4|                  60|            5|         0|
|Tomatoes| Irbid|2019|Summer|    6292.4|                  56|            1|         0|
+--------+------+----+------+----------+--------------------+-------------+----------+
only showing top 3 rows



30

In [None]:
# Flag each observation whose wind direction is 45 degrees or more
# (1 = harmful direction, 0 = otherwise).
is_harmful_direction = F.col("WindDirection(Degrees)") >= 45
temp4 = temp3.withColumn(
    "harmful_wind_direction",
    F.when(is_harmful_direction, 1).otherwise(0),
)

temp4.show(3)
temp4.count()

+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+------+-----------+--------+------+----------------------+
|Region|          Date/Time|AirTemperature|CloudsCover%|WindDirection(Degrees)|WindSpeed|WindType|year|month|day|Season|extrm_temps|bad_wind|clouds|harmful_wind_direction|
+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+------+-----------+--------+------+----------------------+
| Irbid|2017-01-01 06:00:00|             5|          25|                     0|        0|    calm|2017|    1|  1|Winter|          1|       0|     0|                     0|
| Irbid|2017-01-01 09:00:00|             8|          25|                   300|        3|  normal|2017|    1|  1|Winter|          1|       0|     0|                     1|
| Irbid|2017-01-01 12:00:00|            11|          38|                   250|        3|  normal|2017|    1|  1|Winter|          1|       0

8043

In [None]:
# One row per calendar day: flagged when any observation that day had a
# harmful wind direction.
wind_dir_days = (
    temp4
    .groupBy('day', "month", "year", 'season')
    .agg(F.max('harmful_wind_direction').alias('harmful_wind_direction'))
    .orderBy('year', 'month', 'day', 'season', ascending=True)
)
wind_dir_days.show(3)
wind_dir_days.count()

+---+-----+----+------+----------------------+
|day|month|year|season|harmful_wind_direction|
+---+-----+----+------+----------------------+
|  1|    1|2017|Winter|                     1|
|  2|    1|2017|Winter|                     1|
|  3|    1|2017|Winter|                     1|
+---+-----+----+------+----------------------+
only showing top 3 rows



2040

In [None]:
# Number of harmful-wind-direction days per (year, season).
wind_dir_years = (
    wind_dir_days
    .groupBy('year', 'season')
    .agg(F.sum('harmful_wind_direction').alias("sum_harmful_windDir"))
    .orderBy('year', 'season', ascending=True)
)
wind_dir_years.show()
wind_dir_years.count()

+----+-------------+-------------------+
|year|       season|sum_harmful_windDir|
+----+-------------+-------------------+
|2017|Autumn/Spring|                 84|
|2017|       Summer|                 47|
|2017|       Winter|                 46|
|2018|Autumn/Spring|                114|
|2018|       Summer|                115|
|2018|       Winter|                106|
|2019|Autumn/Spring|                117|
|2019|       Summer|                111|
|2019|       Winter|                115|
|2020|Autumn/Spring|                109|
|2020|       Summer|                 98|
|2020|       Winter|                106|
|2021|Autumn/Spring|                121|
|2021|       Summer|                116|
|2021|       Winter|                110|
|2022|Autumn/Spring|                122|
|2022|       Summer|                115|
|2022|       Winter|                115|
|2023|       Winter|                 31|
+----+-------------+-------------------+



19

In [None]:
# Lookup dict of harmful-wind-direction day counts keyed by (year, season).
dir_vals = dict(
    ((row['year'], row['season']), row['sum_harmful_windDir'])
    for row in wind_dir_years.collect()
)

In [None]:
# Attach the seasonal harmful-wind-direction day count to every crop row.
mapping = map_values(dir_vals)
wind_dir_count = mapping(F.col("year"), F.col("season"))
crop_worstcase = crop_worstcase.withColumn("wind_dir", wind_dir_count)
crop_worstcase.show(10)
crop_worstcase.count()

+--------+------+----+------+----------+--------------------+-------------+----------+--------+
|    Crop|Region|Year|Season|Production|extreme_temperatures|harmful_winds|too_cloudy|wind_dir|
+--------+------+----+------+----------+--------------------+-------------+----------+--------+
|Tomatoes| Irbid|2017|Summer|   11124.5|                  43|            1|        14|      47|
|Tomatoes| Irbid|2018|Summer|   15540.4|                  60|            5|         0|     115|
|Tomatoes| Irbid|2019|Summer|    6292.4|                  56|            1|         0|     111|
|Tomatoes| Irbid|2020|Summer|    5122.7|                  38|            4|         0|      98|
|Tomatoes| Irbid|2021|Summer|    2968.9|                  70|           11|         0|     116|
|    Okra| Irbid|2017|Summer|     319.0|                  43|            1|        14|      47|
|    Okra| Irbid|2018|Summer|    1003.1|                  60|            5|         0|     115|
|    Okra| Irbid|2019|Summer|    1808.7|

30

Building a 2022 feature DataFrame to use for predictions

In [None]:
# Reminder of the per-observation frame that carries all four 0/1 flag columns.
temp4.show(2)

+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+------+-----------+--------+------+----------------------+
|Region|          Date/Time|AirTemperature|CloudsCover%|WindDirection(Degrees)|WindSpeed|WindType|year|month|day|Season|extrm_temps|bad_wind|clouds|harmful_wind_direction|
+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+------+-----------+--------+------+----------------------+
| Irbid|2017-01-01 06:00:00|             5|          25|                     0|        0|    calm|2017|    1|  1|Winter|          1|       0|     0|                     0|
| Irbid|2017-01-01 09:00:00|             8|          25|                   300|        3|  normal|2017|    1|  1|Winter|          1|       0|     0|                     1|
+------+-------------------+--------------+------------+----------------------+---------+--------+----+-----+---+------+-----------+--------

In [None]:
# One row per calendar day carrying all four daily flags. The aggregates are
# deliberately left un-aliased: Spark names them max(<column>), which is how
# the seasonal roll-up refers to them. Sorted chronologically
# (year, month, day) like the other daily roll-ups.
for_pred_day = (
    temp4
    .groupBy('day', "month", "year", 'season')
    .agg(
        F.max('extrm_temps'),
        F.max('bad_wind'),
        F.max('clouds'),
        F.max('harmful_wind_direction'),
    )
    .orderBy('year', 'month', 'day', ascending=True)
)


In [None]:
# Sum the daily flags into per-(year, season) counts, using the same feature
# names as the columns added to crop_worstcase.
daily_flag_totals = [
    F.sum('max(extrm_temps)').alias('extreme_temperatures'),
    F.sum('max(bad_wind)').alias('harmful_winds'),
    F.sum('max(clouds)').alias('too_cloudy'),
    F.sum('max(harmful_wind_direction)').alias('wind_dir'),
]
for_pred = (
    for_pred_day
    .groupBy("year", 'season')
    .agg(*daily_flag_totals)
    .orderBy('year', 'season', ascending=True)
)

In [None]:
# Keep only the 2022 rows, excluding the 'Autumn/Spring' season.
is_2022 = F.col('year') == 2022
is_not_autumn_spring = F.col('season') != 'Autumn/Spring'
pred = for_pred.filter(is_2022 & is_not_autumn_spring)
pred.show()


+----+------+--------------------+-------------+----------+--------+
|year|season|extreme_temperatures|harmful_winds|too_cloudy|wind_dir|
+----+------+--------------------+-------------+----------+--------+
|2022|Summer|                  61|           57|         0|     115|
|2022|Winter|                  93|            8|         1|     115|
+----+------+--------------------+-------------+----------+--------+



In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# Index and one-hot encode the season label of the 2022 rows, then pack all
# predictors into a single 'features' vector.
# NOTE(review): the indexer/encoder here are fitted on `pred` only; the season
# encoding matches the training data only if both fits assign the same index
# to each season -- confirm.
season_indexer = StringIndexer(inputCol="season", outputCol="seasonIndex")
pred_index_model = season_indexer.fit(pred)
indexed_df = pred_index_model.transform(pred)

season_encoder = OneHotEncoder(inputCol="seasonIndex", outputCol="seasonVec")
pred_encoder_model = season_encoder.fit(indexed_df)
encoded_df = pred_encoder_model.transform(indexed_df)

pred_feature_cols = [
    "year",
    "seasonVec",
    "extreme_temperatures",
    "harmful_winds",
    "too_cloudy",
    "wind_dir",
]
assembler = VectorAssembler(inputCols=pred_feature_cols, outputCol="features")
assembled_df = assembler.transform(encoded_df)
assembled_df.show(truncate=False)

+----+------+--------------------+-------------+----------+--------+-----------+-------------+--------------------------------+
|year|season|extreme_temperatures|harmful_winds|too_cloudy|wind_dir|seasonIndex|seasonVec    |features                        |
+----+------+--------------------+-------------+----------+--------+-----------+-------------+--------------------------------+
|2022|Summer|61                  |57           |0         |115     |0.0        |(1,[0],[1.0])|[2022.0,1.0,61.0,57.0,0.0,115.0]|
|2022|Winter|93                  |8            |1         |115     |1.0        |(1,[],[])    |[2022.0,0.0,93.0,8.0,1.0,115.0] |
+----+------+--------------------+-------------+----------+--------+-----------+-------------+--------------------------------+



In [None]:

# Standard-scale the assembled 2022 feature vectors.
# NOTE(review): this scaler is fitted on the two 2022 rows themselves, so the
# result is not on the same scale as features scaled by a scaler fitted on
# training data. 'year' is constant across these rows and comes out as 0.0
# (first component of scaledFeatures in the output).
scaler = StandardScaler().setInputCol("features").setOutputCol("scaledFeatures")
scaler_model = scaler.fit(assembled_df)
scaled_pred = scaler_model.transform(assembled_df)
scaled_pred.show()

+----+------+--------------------+-------------+----------+--------+-----------+-------------+--------------------+--------------------+
|year|season|extreme_temperatures|harmful_winds|too_cloudy|wind_dir|seasonIndex|    seasonVec|            features|      scaledFeatures|
+----+------+--------------------+-------------+----------+--------+-----------+-------------+--------------------+--------------------+
|2022|Summer|                  61|           57|         0|     115|        0.0|(1,[0],[1.0])|[2022.0,1.0,61.0,...|[0.0,1.4142135623...|
|2022|Winter|                  93|            8|         1|     115|        1.0|    (1,[],[])|[2022.0,0.0,93.0,...|[0.0,0.0,4.110058...|
+----+------+--------------------+-------------+----------+--------+-----------+-------------+--------------------+--------------------+



# Prep Data for Predictions

In [None]:
# Cast the four weather-count columns of crop_worstcase to integer and show
# the resulting schema.
weather_count_cols = ['extreme_temperatures', 'harmful_winds', 'too_cloudy', 'wind_dir']
df = crop_worstcase
for count_col in weather_count_cols:
    df = df.withColumn(count_col, df[count_col].cast('integer'))
df.printSchema()


root
 |-- Crop: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Season: string (nullable = true)
 |-- Production: double (nullable = true)
 |-- extreme_temperatures: integer (nullable = true)
 |-- harmful_winds: integer (nullable = true)
 |-- too_cloudy: integer (nullable = true)
 |-- wind_dir: integer (nullable = true)



In [None]:
from pyspark.ml.feature import OneHotEncoder,StringIndexer

# Replace the Season label with a numeric index in a new 'SeasonIndex' column.
stringIndexer = StringIndexer().setInputCol("Season").setOutputCol("SeasonIndex")
crop_index_model = stringIndexer.fit(df)
indexed = crop_index_model.transform(df)

indexed.show()

+---------+------+----+------+----------+--------------------+-------------+----------+--------+-----------+
|     Crop|Region|Year|Season|Production|extreme_temperatures|harmful_winds|too_cloudy|wind_dir|SeasonIndex|
+---------+------+----+------+----------+--------------------+-------------+----------+--------+-----------+
| Tomatoes| Irbid|2017|Summer|   11124.5|                  43|            1|        14|      47|        0.0|
| Tomatoes| Irbid|2018|Summer|   15540.4|                  60|            5|         0|     115|        0.0|
| Tomatoes| Irbid|2019|Summer|    6292.4|                  56|            1|         0|     111|        0.0|
| Tomatoes| Irbid|2020|Summer|    5122.7|                  38|            4|         0|      98|        0.0|
| Tomatoes| Irbid|2021|Summer|    2968.9|                  70|           11|         0|     116|        0.0|
|     Okra| Irbid|2017|Summer|     319.0|                  43|            1|        14|      47|        0.0|
|     Okra| Irbid|2

In [None]:
# One-hot encode SeasonIndex into the sparse vector column 'season_numeric'.
oneHotEncoder = OneHotEncoder().setInputCol("SeasonIndex").setOutputCol("season_numeric")
crop_encoder_model = oneHotEncoder.fit(indexed)
encoded = crop_encoder_model.transform(indexed)

encoded.show()

+---------+------+----+------+----------+--------------------+-------------+----------+--------+-----------+--------------+
|     Crop|Region|Year|Season|Production|extreme_temperatures|harmful_winds|too_cloudy|wind_dir|SeasonIndex|season_numeric|
+---------+------+----+------+----------+--------------------+-------------+----------+--------+-----------+--------------+
| Tomatoes| Irbid|2017|Summer|   11124.5|                  43|            1|        14|      47|        0.0| (1,[0],[1.0])|
| Tomatoes| Irbid|2018|Summer|   15540.4|                  60|            5|         0|     115|        0.0| (1,[0],[1.0])|
| Tomatoes| Irbid|2019|Summer|    6292.4|                  56|            1|         0|     111|        0.0| (1,[0],[1.0])|
| Tomatoes| Irbid|2020|Summer|    5122.7|                  38|            4|         0|      98|        0.0| (1,[0],[1.0])|
| Tomatoes| Irbid|2021|Summer|    2968.9|                  70|           11|         0|     116|        0.0| (1,[0],[1.0])|
|     Ok

### Okra Predictions

In [None]:
# Keep only the okra rows of the encoded crop table.
is_okra = encoded['Crop'] == "Okra"
okra_df = encoded.where(is_okra)
okra_df.show()

+----+------+----+------+----------+--------------------+-------------+----------+--------+-----------+--------------+
|Crop|Region|Year|Season|Production|extreme_temperatures|harmful_winds|too_cloudy|wind_dir|SeasonIndex|season_numeric|
+----+------+----+------+----------+--------------------+-------------+----------+--------+-----------+--------------+
|Okra| Irbid|2017|Summer|     319.0|                  43|            1|        14|      47|        0.0| (1,[0],[1.0])|
|Okra| Irbid|2018|Summer|    1003.1|                  60|            5|         0|     115|        0.0| (1,[0],[1.0])|
|Okra| Irbid|2019|Summer|    1808.7|                  56|            1|         0|     111|        0.0| (1,[0],[1.0])|
|Okra| Irbid|2020|Summer|    1428.2|                  38|            4|         0|      98|        0.0| (1,[0],[1.0])|
|Okra| Irbid|2021|Summer|     867.3|                  70|           11|         0|     116|        0.0| (1,[0],[1.0])|
|Okra| Irbid|2017|Winter|      12.2|            

In [None]:
# Predictor columns for the okra model, in the order they are packed into the
# feature vector.
okra_features = [
    'Year',
    'season_numeric',
    'extreme_temperatures',
    'harmful_winds',
    'too_cloudy',
    'wind_dir',
]


In [None]:
from pyspark.ml.feature import StandardScaler, VectorAssembler


# Pack the okra predictor columns into a single 'features' vector column.
assembler = VectorAssembler().setInputCols(okra_features).setOutputCol('features')
vector_df = assembler.transform(okra_df)

vector_df.show()

+----+------+----+------+----------+--------------------+-------------+----------+--------+-----------+--------------+--------------------+
|Crop|Region|Year|Season|Production|extreme_temperatures|harmful_winds|too_cloudy|wind_dir|SeasonIndex|season_numeric|            features|
+----+------+----+------+----------+--------------------+-------------+----------+--------+-----------+--------------+--------------------+
|Okra| Irbid|2017|Summer|     319.0|                  43|            1|        14|      47|        0.0| (1,[0],[1.0])|[2017.0,1.0,43.0,...|
|Okra| Irbid|2018|Summer|    1003.1|                  60|            5|         0|     115|        0.0| (1,[0],[1.0])|[2018.0,1.0,60.0,...|
|Okra| Irbid|2019|Summer|    1808.7|                  56|            1|         0|     111|        0.0| (1,[0],[1.0])|[2019.0,1.0,56.0,...|
|Okra| Irbid|2020|Summer|    1428.2|                  38|            4|         0|      98|        0.0| (1,[0],[1.0])|[2020.0,1.0,38.0,...|
|Okra| Irbid|2021|Su

In [None]:
# Fit a StandardScaler on the okra feature vectors and add 'scaledFeatures'.
# NOTE(review): the scaler is fitted on all okra rows before the train/test
# split, so the held-out rows influence the scaling.
scaler = StandardScaler().setInputCol("features").setOutputCol("scaledFeatures")
scaler_model = scaler.fit(vector_df)
scaled_okra = scaler_model.transform(vector_df)


In [None]:
# Inspect the okra rows with the added 'scaledFeatures' column.
scaled_okra.show()

+----+------+----+------+----------+--------------------+-------------+----------+--------+-----------+--------------+--------------------+--------------------+
|Crop|Region|Year|Season|Production|extreme_temperatures|harmful_winds|too_cloudy|wind_dir|SeasonIndex|season_numeric|            features|      scaledFeatures|
+----+------+----+------+----------+--------------------+-------------+----------+--------+-----------+--------------+--------------------+--------------------+
|Okra| Irbid|2017|Summer|     319.0|                  43|            1|        14|      47|        0.0| (1,[0],[1.0])|[2017.0,1.0,43.0,...|[1353.04473318512...|
|Okra| Irbid|2018|Summer|    1003.1|                  60|            5|         0|     115|        0.0| (1,[0],[1.0])|[2018.0,1.0,60.0,...|[1353.71555357837...|
|Okra| Irbid|2019|Summer|    1808.7|                  56|            1|         0|     111|        0.0| (1,[0],[1.0])|[2019.0,1.0,56.0,...|[1354.38637397162...|
|Okra| Irbid|2020|Summer|    1428.

In [None]:
# Keep only the model inputs and the label, then split roughly 70/30 into
# training and test sets. The seed matches the tomato and onion splits, so the
# okra split (and every metric derived from it) is reproducible across
# re-runs.
final_data = scaled_okra.select('scaledFeatures', 'Production')
trainset, testset = final_data.randomSplit([0.7, 0.3], seed=42)
trainset.show()

+--------------------+----------+
|      scaledFeatures|Production|
+--------------------+----------+
|[1353.04473318512...|      12.2|
|[1353.04473318512...|     319.0|
|[1353.71555357837...|       0.0|
|[1354.38637397162...|    1808.7|
|[1355.05719436487...|    1428.2|
|[1355.72801475812...|     221.8|
+--------------------+----------+



In [None]:
# Inspect the held-out okra rows.
testset.show()

+--------------------+----------+
|      scaledFeatures|Production|
+--------------------+----------+
|[1353.71555357837...|    1003.1|
|[1354.38637397162...|       0.0|
|[1355.05719436487...|       0.0|
|[1355.72801475812...|     867.3|
+--------------------+----------+



In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
%%time
%memit
model_okra = LinearRegression(featuresCol='scaledFeatures', labelCol='Production', regParam=1.0, elasticNetParam=0.5)
model_okra = model_okra.fit(trainset)

peak memory: 135.74 MiB, increment: 0.01 MiB
CPU times: user 120 ms, sys: 17.6 ms, total: 138 ms
Wall time: 4.53 s


In [None]:
# Show the fitted okra model's coefficient vector and intercept.
okra_coefficients = str(model_okra.coefficients)
okra_intercept = str(model_okra.intercept)
print("Coefficients: " + okra_coefficients)
print("Intercept: " + okra_intercept)

Coefficients: [-3.571796521686801,641.451033395055,-177.3409534847533,-58.60511774125192,643.8094328843555,984.0309780586342]
Intercept: 1913.5324373576989


In [None]:
# Training-set fit of the okra model. These metrics are computed on the rows
# the model was fitted on, so they say little about generalisation; compare
# them with the test-set evaluation.
trainingSummary = model_okra.summary
okra_train_rmse = trainingSummary.rootMeanSquaredError
okra_train_r2 = trainingSummary.r2
print("RMSE: %f" % okra_train_rmse)
print("r2: %f" % okra_train_r2)

RMSE: 8.240277
r2: 0.999867


In [None]:
# Evaluate the okra model on the held-out rows.
result=model_okra.evaluate(testset)

In [None]:
# Test-set R^2 of the okra model. A negative value means the model predicts
# the held-out rows worse than simply predicting their mean.
result.r2

-0.6510314726872397

In [None]:
# Test-set RMSE of the okra model, in the units of Production.
result.rootMeanSquaredError

603.989736056423

In [None]:
# Re-display the scaled 2022 rows that predictions are made for.
scaled_pred.show()

+----+------+--------------------+-------------+----------+--------+-----------+-------------+--------------------+--------------------+
|year|season|extreme_temperatures|harmful_winds|too_cloudy|wind_dir|seasonIndex|    seasonVec|            features|      scaledFeatures|
+----+------+--------------------+-------------+----------+--------+-----------+-------------+--------------------+--------------------+
|2022|Summer|                  61|           57|         0|     115|        0.0|(1,[0],[1.0])|[2022.0,1.0,61.0,...|[0.0,1.4142135623...|
|2022|Winter|                  93|            8|         1|     115|        1.0|    (1,[],[])|[2022.0,0.0,93.0,...|[0.0,0.0,4.110058...|
+----+------+--------------------+-------------+----------+--------+-----------+-------------+--------------------+--------------------+



In [None]:
# Keep only the scaled feature vector of the 2022 rows as model input.
# NOTE(review): scaled_pred was produced by a scaler fitted on the 2022 rows
# alone, so these vectors are on a different scale from the 'scaledFeatures'
# the crop models are trained on -- verify before trusting predictions made
# from pred_2022.
pred_2022=scaled_pred.select('scaledFeatures')


In [None]:
# Predict 2022 okra production.
# The 2022 feature vectors are scaled with `scaler_model`, which at this point
# is the scaler fitted on the okra features, so the inputs are on the same
# scale as the data model_okra was trained on. Scaling the two 2022 rows with
# a scaler fitted on themselves (as pred_2022 does) maps the constant year to
# 0.0 and makes the prediction meaningless.
okra_pred_2022 = scaler_model.transform(assembled_df).select('scaledFeatures')
predictions = model_okra.transform(okra_pred_2022)
predictions.show()

+--------------------+------------------+
|      scaledFeatures|        prediction|
+--------------------+------------------+
|[0.0,1.4142135623...|2246.1859302154753|
|[0.0,0.0,4.110058...| 2081.603381510574|
+--------------------+------------------+



### Tomato Predictions

In [None]:
# Keep only the tomato rows of the encoded crop table.
is_tomato = encoded['Crop'] == "Tomatoes"
tomato_df = encoded.where(is_tomato)
tomato_df.show()

+--------+------+----+------+----------+--------------------+-------------+----------+--------+-----------+--------------+
|    Crop|Region|Year|Season|Production|extreme_temperatures|harmful_winds|too_cloudy|wind_dir|SeasonIndex|season_numeric|
+--------+------+----+------+----------+--------------------+-------------+----------+--------+-----------+--------------+
|Tomatoes| Irbid|2017|Summer|   11124.5|                  43|            1|        14|      47|        0.0| (1,[0],[1.0])|
|Tomatoes| Irbid|2018|Summer|   15540.4|                  60|            5|         0|     115|        0.0| (1,[0],[1.0])|
|Tomatoes| Irbid|2019|Summer|    6292.4|                  56|            1|         0|     111|        0.0| (1,[0],[1.0])|
|Tomatoes| Irbid|2020|Summer|    5122.7|                  38|            4|         0|      98|        0.0| (1,[0],[1.0])|
|Tomatoes| Irbid|2021|Summer|    2968.9|                  70|           11|         0|     116|        0.0| (1,[0],[1.0])|
|Tomatoes| Irbid

In [None]:
# Predictor columns for the tomato model, in the order they are packed into
# the feature vector.
tomato_features = [
    'Year',
    'season_numeric',
    'extreme_temperatures',
    'harmful_winds',
    'too_cloudy',
    'wind_dir',
]

In [None]:
# Pack the tomato predictor columns into a single 'features' vector column.
assembler = VectorAssembler().setInputCols(tomato_features).setOutputCol('features')
vector_df = assembler.transform(tomato_df)

vector_df.show()

+--------+------+----+------+----------+--------------------+-------------+----------+--------+-----------+--------------+--------------------+
|    Crop|Region|Year|Season|Production|extreme_temperatures|harmful_winds|too_cloudy|wind_dir|SeasonIndex|season_numeric|            features|
+--------+------+----+------+----------+--------------------+-------------+----------+--------+-----------+--------------+--------------------+
|Tomatoes| Irbid|2017|Summer|   11124.5|                  43|            1|        14|      47|        0.0| (1,[0],[1.0])|[2017.0,1.0,43.0,...|
|Tomatoes| Irbid|2018|Summer|   15540.4|                  60|            5|         0|     115|        0.0| (1,[0],[1.0])|[2018.0,1.0,60.0,...|
|Tomatoes| Irbid|2019|Summer|    6292.4|                  56|            1|         0|     111|        0.0| (1,[0],[1.0])|[2019.0,1.0,56.0,...|
|Tomatoes| Irbid|2020|Summer|    5122.7|                  38|            4|         0|      98|        0.0| (1,[0],[1.0])|[2020.0,1.0,38

In [None]:
# Fit a StandardScaler on the tomato feature vectors and add 'scaledFeatures'.
# NOTE(review): the scaler is fitted on all tomato rows before the train/test
# split, so the held-out rows influence the scaling.
scaler = StandardScaler().setInputCol("features").setOutputCol("scaledFeatures")
scaler_model = scaler.fit(vector_df)
scaled_tomato = scaler_model.transform(vector_df)


In [None]:
# Keep only the model inputs and the label, then make a seeded split of
# roughly 70/30 into training and test sets.
final_data = scaled_tomato.select('scaledFeatures', 'Production')
tomato_splits = final_data.randomSplit([0.7, 0.3], seed=42)
trainset, testset = tomato_splits
trainset.show()

+--------------------+----------+
|      scaledFeatures|Production|
+--------------------+----------+
|[1353.04473318512...|    9332.2|
|[1353.04473318512...|   11124.5|
|[1353.71555357837...|   15540.4|
|[1354.38637397162...|       0.0|
|[1354.38637397162...|    6292.4|
|[1355.05719436487...|    5122.7|
+--------------------+----------+



In [None]:
# Inspect the held-out tomato rows.
testset.show()

+--------------------+----------+
|      scaledFeatures|Production|
+--------------------+----------+
|[1353.71555357837...|    1509.9|
|[1355.05719436487...|    3060.0|
|[1355.72801475812...|    1046.7|
|[1355.72801475812...|    2968.9|
+--------------------+----------+



In [None]:
%%time
%memit
model_tomato = LinearRegression(featuresCol='scaledFeatures', labelCol='Production', regParam=0.75, elasticNetParam=0.1)
model_tomato = model_tomato.fit(trainset)

peak memory: 136.12 MiB, increment: 0.00 MiB
CPU times: user 125 ms, sys: 18.9 ms, total: 144 ms
Wall time: 2.21 s


In [None]:
# Show the fitted tomato model's coefficient vector and intercept.
tomato_coefficients = str(model_tomato.coefficients)
tomato_intercept = str(model_tomato.intercept)
print("Coefficients: " + tomato_coefficients)
print("Intercept: " + tomato_intercept)

Coefficients: [-6802.143705654144,5697.360642515419,-777.7902547739499,4118.2382541046245,-893.8205899902207,1611.9012013303911]
Intercept: 9202701.168144654


In [None]:
# Training-set fit of the tomato model. These metrics are computed on the rows
# the model was fitted on, so they say little about generalisation; compare
# them with the test-set evaluation.
trainingSummary = model_tomato.summary
tomato_train_rmse = trainingSummary.rootMeanSquaredError
tomato_train_r2 = trainingSummary.r2
print("RMSE: %f" % tomato_train_rmse)
print("r2: %f" % tomato_train_r2)

RMSE: 3.011791
r2: 1.000000


In [None]:
# Evaluate the tomato model on the held-out rows.
result=model_tomato.evaluate(testset)

In [None]:
# Test-set R^2 of the tomato model. A negative value means the model predicts
# the held-out rows worse than simply predicting their mean.
result.r2

-90.4974545388024

In [None]:
# Test-set RMSE of the tomato model, in the units of Production.
result.rootMeanSquaredError

8455.59620003052

In [None]:
%%time
%memit
predictions=model_tomato.transform(pred_2022)
predictions.show()

peak memory: 136.14 MiB, increment: 0.01 MiB
+--------------------+-----------------+
|      scaledFeatures|       prediction|
+--------------------+-----------------+
|[0.0,1.4142135623...|9215436.587875161|
|[0.0,0.0,4.110058...|9199191.220065072|
+--------------------+-----------------+

CPU times: user 116 ms, sys: 15.8 ms, total: 132 ms
Wall time: 1.77 s


### Onion Predictions



In [None]:
# Keep only the dry-onion rows of the encoded crop table.
is_onion = encoded['Crop'] == "Onion dry"
onion_df = encoded.where(is_onion)
onion_df.show()

+---------+------+----+------+----------+--------------------+-------------+----------+--------+-----------+--------------+
|     Crop|Region|Year|Season|Production|extreme_temperatures|harmful_winds|too_cloudy|wind_dir|SeasonIndex|season_numeric|
+---------+------+----+------+----------+--------------------+-------------+----------+--------+-----------+--------------+
|Onion dry| Irbid|2017|Summer|     294.0|                  43|            1|        14|      47|        0.0| (1,[0],[1.0])|
|Onion dry| Irbid|2018|Summer|     781.9|                  60|            5|         0|     115|        0.0| (1,[0],[1.0])|
|Onion dry| Irbid|2019|Summer|    5616.0|                  56|            1|         0|     111|        0.0| (1,[0],[1.0])|
|Onion dry| Irbid|2020|Summer|    6269.1|                  38|            4|         0|      98|        0.0| (1,[0],[1.0])|
|Onion dry| Irbid|2021|Summer|    7770.8|                  70|           11|         0|     116|        0.0| (1,[0],[1.0])|
|Onion d

In [None]:
# Predictor columns for the onion model, in the order they are packed into the
# feature vector.
onion_features = [
    'Year',
    'season_numeric',
    'extreme_temperatures',
    'harmful_winds',
    'too_cloudy',
    'wind_dir',
]

In [None]:
# Pack the onion predictor columns into a single 'features' vector column.
assembler = VectorAssembler().setInputCols(onion_features).setOutputCol('features')
vector_df = assembler.transform(onion_df)

vector_df.show()

+---------+------+----+------+----------+--------------------+-------------+----------+--------+-----------+--------------+--------------------+
|     Crop|Region|Year|Season|Production|extreme_temperatures|harmful_winds|too_cloudy|wind_dir|SeasonIndex|season_numeric|            features|
+---------+------+----+------+----------+--------------------+-------------+----------+--------+-----------+--------------+--------------------+
|Onion dry| Irbid|2017|Summer|     294.0|                  43|            1|        14|      47|        0.0| (1,[0],[1.0])|[2017.0,1.0,43.0,...|
|Onion dry| Irbid|2018|Summer|     781.9|                  60|            5|         0|     115|        0.0| (1,[0],[1.0])|[2018.0,1.0,60.0,...|
|Onion dry| Irbid|2019|Summer|    5616.0|                  56|            1|         0|     111|        0.0| (1,[0],[1.0])|[2019.0,1.0,56.0,...|
|Onion dry| Irbid|2020|Summer|    6269.1|                  38|            4|         0|      98|        0.0| (1,[0],[1.0])|[2020.0

In [None]:
# Fit a StandardScaler on the onion feature vectors and add 'scaledFeatures'.
# NOTE(review): the scaler is fitted on all onion rows before the train/test
# split, so the held-out rows influence the scaling.
scaler = StandardScaler().setInputCol("features").setOutputCol("scaledFeatures")
scaler_model = scaler.fit(vector_df)
scaled_onion = scaler_model.transform(vector_df)

In [None]:
# Keep only the model inputs and the label, then make a seeded split of
# roughly 70/30 into training and test sets.
final_data = scaled_onion.select('scaledFeatures', 'Production')
onion_splits = final_data.randomSplit([0.7, 0.3], seed=42)
trainset, testset = onion_splits
trainset.show()

+--------------------+----------+
|      scaledFeatures|Production|
+--------------------+----------+
|[1353.04473318512...|    5850.6|
|[1353.04473318512...|     294.0|
|[1353.71555357837...|     781.9|
|[1354.38637397162...|    2267.0|
|[1354.38637397162...|    5616.0|
|[1355.05719436487...|    6269.1|
+--------------------+----------+



In [None]:
# Inspect the held-out onion rows.
testset.show()

+--------------------+----------+
|      scaledFeatures|Production|
+--------------------+----------+
|[1353.71555357837...|    5466.7|
|[1355.05719436487...|    9465.7|
|[1355.72801475812...|    4721.0|
|[1355.72801475812...|    7770.8|
+--------------------+----------+



In [None]:
%%time
%memit
model_onion = LinearRegression(featuresCol='scaledFeatures', labelCol='Production', regParam=0.1, elasticNetParam=0.75)
model_onion = model_onion.fit(trainset)

peak memory: 136.16 MiB, increment: 0.00 MiB
CPU times: user 128 ms, sys: 18.2 ms, total: 146 ms
Wall time: 1.39 s


In [None]:
# Show the fitted onion model's coefficient vector and intercept.
onion_coefficients = str(model_onion.coefficients)
onion_intercept = str(model_onion.intercept)
print("Coefficients: " + onion_coefficients)
print("Intercept: " + onion_intercept)

Coefficients: [4555.118902990958,150.53257605332377,-455.5984455914365,-1855.8543419686457,5916.654374307759,2641.0627923686666]
Intercept: -6173038.522023802


In [None]:
# Training-set fit of the onion model. These metrics are computed on the rows
# the model was fitted on, so they say little about generalisation; compare
# them with the test-set evaluation.
trainingSummary = model_onion.summary
onion_train_rmse = trainingSummary.rootMeanSquaredError
onion_train_r2 = trainingSummary.r2
print("RMSE: %f" % onion_train_rmse)
print("r2: %f" % onion_train_r2)

RMSE: 1.271925
r2: 1.000000


In [None]:
# Evaluate the onion model on the held-out rows.
result=model_onion.evaluate(testset)

In [None]:
# Test-set R^2 of the onion model. A negative value means the model predicts
# the held-out rows worse than simply predicting their mean.
result.r2

-3.5322817747345656

In [None]:
# Test-set RMSE of the onion model, in the units of Production.
result.rootMeanSquaredError

4002.102715201569

In [None]:
%%time
%memit
predictions=model_onion.transform(pred_2022)
predictions.show()

peak memory: 136.18 MiB, increment: 0.00 MiB
+--------------------+------------------+
|      scaledFeatures|        prediction|
+--------------------+------------------+
|[0.0,1.4142135623...|-6177106.935743801|
|[0.0,0.0,4.110058...|-6166972.147214965|
+--------------------+------------------+

CPU times: user 98.8 ms, sys: 6.74 ms, total: 106 ms
Wall time: 1.16 s


In [None]:
# Stop the SparkSession created at the top of the notebook; cells that use
# `spark` or its DataFrames cannot be re-run after this without recreating it.
spark.stop()