<a href="https://colab.research.google.com/github/mirklys/little-projects/blob/main/crop-yield-prediction/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Crop Yield Prediction

## Pre-setup

In [None]:
import os
import gzip
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from google.colab import drive
from functools import reduce
import zipfile
import re

# installing extra packages
try:
    from pyspark.sql import SparkSession
    from pyspark.sql import DataFrame
    from pyspark.sql.functions import col, regexp_replace, when, lit, sum
    import rasterio
except:
    !pip install pyspark
    !pip install rasterio
    from pyspark.sql import SparkSession
    from pyspark.sql import DataFrame
    from pyspark.sql.functions import col, regexp_replace, when, lit, sum
    import rasterio

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
# this is space for changing to gpu/tpu

In [None]:
drive.mount('/content/gdrive', force_remount=True)
PATH_MAIN = '/content/gdrive/MyDrive/Projects/Crop Yield Prediction'
os.chdir(PATH_MAIN)

Mounted at /content/gdrive


## Loading the data
The data folder contains files with different file extensions; many of the files must be extracted, some are .gz, some are .zip. 

In [None]:
# Data folder contains: crop-yield, soil-properties, weather data
crop_yield_files = os.listdir('./data/crop-yield')
crop_yield_files = ['./data/crop-yield/' + f for f in crop_yield_files if not f.startswith('./data/crop-yield/')]


crop_yield_dfs = [spark.read.option("header", "true").csv(PATH_MAIN+f[1:]) for f in crop_yield_files]
# concatenate all DataFrames into a single DataFrame
crop_yield_df = reduce(lambda df1, df2: df1.unionAll(df2), crop_yield_dfs)

In [None]:
# https://www.ncei.noaa.gov/pub/data/ghcn/daily/
weather_files = os.listdir('./data/weather')
weather_files = ['./data/weather/' + f for f in weather_files if not f.startswith('./data/weather/')]
weather_dfs = {}

for wf in weather_files[:1]:
    weather_dfs[wf[15:19]] = spark.read \
          .option("header", "false") \
          .option("inferSchema", "true") \
          .option("compression", "gzip") \
          .csv(wf)

weather_dfs = {year: df.withColumn('year', lit(year)) for year, df in weather_dfs.items()}
weather_df = reduce(lambda df1, df2: df1.unionAll(df2), weather_dfs.values())

In [None]:
soil_properties_files = os.listdir('./data/soil-properties')
soil_properties_files = ['./data/soil-properties/' + f for f in soil_properties_files if not f.startswith('./data/soil-properties/')]
soil_properties_df = pd.DataFrame()
with zipfile.ZipFile(soil_properties_files[0], 'r') as zip_ref:
    zip_ref.extractall('./data/soil-properties/data/')
soil_properties_files_data = os.listdir('./data/soil-properties/data')
soil_properties_files_data = ['./data/soil-properties/data/' + f for f in soil_properties_files_data if not f.startswith('./data/soil-properties/data/')]
for spfd in soil_properties_files_data:
    if spfd.endswith('.zip'):
        with zipfile.ZipFile(spfd, 'r') as zip_ref:
            filename = os.path.splitext(os.path.basename(spfd))[0]
            zip_ref.extractall('./data/soil-properties/data/{}/'.format(filename))

In [None]:
soil_properties_files_data = os.listdir('./data/soil-properties/data')
soil_properties_files_data = ['./data/soil-properties/data/' + f for f in soil_properties_files_data if not f.startswith('./data/soil-properties/data/')]

In [None]:
soil_porosity_path = './data/soil-properties/data/por_gNATSGO/por_gNATSGO/por_gNATSGO_US.tif'
with rasterio.open(soil_porosity_path) as src:
    # Get the number of bands in the raster
    soil_porosity_data = src.read(1)
print('Number of bands in the raster:', soil_porosity_data)

## Analyzing Crop Yield

In [None]:
crop_yield_df = crop_yield_df.drop("Week Ending","Ag District","Ag District Code", 'County',
                   'County ANSI', 'Zip Code', 'Region', 'WaterShed',
                   'Geo Level', 'watershed_code', 'Program', 'Period')
crop_yield_df = crop_yield_df.filter(~col("Data Item").rlike(r"\bSALES\b|\bOPERATIONS\b|\bPRODUCTION\b"))
crop_yield_df = crop_yield_df.withColumn("Value", regexp_replace("Value", "\(H\)|\(D\)", "null"))

In [None]:
crop_yield_df.show()

+----+------+--------+----------+------------------+--------------------+--------------+--------------------+------+------+
|Year|Period|   State|State ANSI|         Commodity|           Data Item|        Domain|     Domain Category| Value|CV (%)|
+----+------+--------+----------+------------------+--------------------+--------------+--------------------+------+------+
|2021|  YEAR| ALABAMA|        01| FIELD CROP TOTALS|FIELD CROP TOTALS...|ORGANIC STATUS|ORGANIC STATUS: (...| 1,270|  85.5|
|2021|  YEAR| ALABAMA|        01|               HAY|HAY, (EXCL ALFALF...|ORGANIC STATUS|ORGANIC STATUS: (...| 1,270|  85.5|
|2021|  YEAR| ALABAMA|        01|               HAY|HAY, ORGANIC - AC...|ORGANIC STATUS|ORGANIC STATUS: (...| 1,270|  85.5|
|2021|  YEAR| ARIZONA|        04|         CHICKPEAS|CHICKPEAS, ORGANI...|ORGANIC STATUS|ORGANIC STATUS: (...|  null|   (D)|
|2021|  YEAR| ARIZONA|        04|            COTTON|COTTON, ORGANIC -...|ORGANIC STATUS|ORGANIC STATUS: (...|  null|   (D)|
|2021|  

In [None]:
crop_yield_df.select('Period').distinct().rdd.flatMap(lambda x: x).collect()

['YEAR', 'END OF DEC']

In [None]:
crop_total_df = crop_yield_df.filter(col('Commodity')=='FIELD CROP TOTALS')
crop_total_df = crop_total_df.drop('Commodity', 'Data Item', 'Domain',
                                   'Domain Category')

#crop_total_df = crop_total_df.withColumnRenamed("Value", "Total Harvested") \
#       .withColumn("Total Harvested", col("Total Harvested") * 4046.856422)
crop_total_df = crop_total_df.withColumnRenamed("Value", "total_harvested") \
                .withColumn("total_harvested",
                            when(col("total_harvested").rlike("\d+"), 
                                 regexp_replace(col("total_harvested"), ",", "") \
                                 .cast("integer") * 4046.86) \
                                 .otherwise(col("total_harvested")))
crop_total_df = crop_total_df.groupBy("Year", "State").agg(sum("total_harvested").alias("total_harvested"))

In [None]:
crop_total_df.show(500)