# MODIS - Rice Pixel Classification

## Install packages

In [None]:
!pip install zkyhaxpy
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q findspark

!mkdir spark
!cd spark
!wget https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz -P /content/spark
!tar xf /content/spark/spark-3.3.1-bin-hadoop3.tgz

import findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

findspark.init()

In [None]:
# Install the remaining Python dependencies in one go.
# zkyhaxpy is also installed by the first cell of this notebook; it is listed again here unchanged.
!pip install zkyhaxpy rasterio utm geopandas ipython-autotime gcsfs h2o

## Import libraries

In [None]:
## for all ##
from zkyhaxpy import io_tools, pd_tools, np_tools, console_tools, timer_tools, json_tools, dict_tools, colab_tools, gcp_tools
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm
import os
import matplotlib.pyplot as plt
import seaborn as sns
import logging
import re
import itertools
from sklearn.model_selection import train_test_split
import h2o
from h2o.automl import H2OAutoML

%load_ext autotime

In [None]:
# Colab session setup through the zkyhaxpy `colab_tools` helpers.
# NOTE(review): presumably this mounts Google Drive under /content/drive (later
# cells write below /content/drive/MyDrive) and authenticates the session to GCP
# so the gsutil cells can read the bucket — confirm against zkyhaxpy.
colab_tools.mount_drive()
colab_tools.authen_gcp()

In [None]:
# Download the `tha_adm_rtsd_itos_20210121_shp.zip` archive from data.humdata.org into the current directory.
# NOTE(review): the file name suggests Thailand administrative-boundary shapefiles; no later cell visible here reads them — confirm it is still needed.
!wget https://data.humdata.org/dataset/d24bdc45-eb4c-4e3d-8b16-44db02667c27/resource/d0c722ff-6939-4423-ac0d-6501830b1759/download/tha_adm_rtsd_itos_20210121_shp.zip

In [None]:
# Extract the downloaded archive into the current working directory.
!unzip tha_adm_rtsd_itos_20210121_shp.zip

## Define paths

In [None]:
# Local folder that receives the per-month prediction files copied from the bucket.
folder_prediction = '/temp/rice_pixel_v2'
# Google Drive folder where the rendered rice-area map images are saved.
folder_viz_rice_area_map = '/content/drive/MyDrive/!UNBDH2022-Multiverse-Of-Data/unbdh2022_multiverse_of_data/viz/rice_area_map'
# NOTE(review): presumably creates both folders if they do not exist yet — confirm against zkyhaxpy.io_tools.
io_tools.create_folders(folder_prediction, folder_viz_rice_area_map)

## Define functions

In [None]:
from numba import jit
# Compiled by numba in nopython mode.
@jit(nopython=True)
def arr_row_col_vals_to_raster(arr_row_col_vals, height=None, width=None, dtype=None, novalue=-999):
    '''
    Convert an array of pixel values (row|col|val_1|val_2|val_3|...|val_n) into an array of raster n layers.

    Parameters
    ----------
    arr_row_col_vals : 2-D array
        One record per pixel. Column 0 is the row index, column 1 is the column
        index and every remaining column is the value for one output layer.
        At least one value column is required.
    height, width : optional
        Size of the output raster. When omitted, each is derived as the largest
        row / column index found in the input plus one. Either way the value
        must be integral (asserted) and is cast to int.
    dtype : optional
        Passed straight to np.empty for the output array.
        NOTE(review): the resulting dtype when this is left as None depends on
        np.empty / numba defaults — confirm.
    novalue : optional
        Fill value for cells that receive no pixel (default -999).

    Returns
    -------
    Array of shape (n_layers, height, width) holding the pixel values, with
    `novalue` everywhere else.

    Raises
    ------
    AssertionError
        If there is no value column, if height/width is not integral, or if the
        number of cells in layer 0 that differ from `novalue` is not equal to
        the number of input records. The last check fails when two records
        share the same (row, col) or when a first-layer value equals `novalue`.
    '''
    n_pixels = arr_row_col_vals.shape[0]
    n_layers = arr_row_col_vals.shape[1] - 2

    assert(n_layers >= 1)
    
    # Derive the raster height from the largest row index when not supplied.
    if height == None:
        height = np.max(arr_row_col_vals[:, 0]) + 1
    assert(height == int(height))
    height = int(height)

    # Derive the raster width from the largest column index when not supplied.
    if width == None:
        width = np.max(arr_row_col_vals[:, 1]) + 1
    assert(width == int(width))
    width = int(width)


    # Allocate the output and pre-fill every cell with the no-data marker.
    # arr_raster = np.full(shape=(n_layers, height, width), fill_value=-9, dtype=dtype)
    arr_raster = np.empty(shape=(n_layers, height, width), dtype=dtype)
    arr_raster[:, :, :] = novalue

    for pixel_id in range(n_pixels):
        # The first two columns are the target cell; cast them to integer indices.
        row, col = arr_row_col_vals[pixel_id, 0:2]
        row = np.int32(row)
        col = np.int32(col)
        # for layer_id in range(n_layers):
        #     arr_raster[layer_id, row, col] = arr_row_col_vals[pixel_id, layer_id+2]
        
        # Write all layer values of this record into the cell at once.
        arr_raster[:, row, col] = arr_row_col_vals[pixel_id, 2:]
    # Sanity check that the loop visited every record.
    # NOTE(review): `pixel_id` is only bound by the loop, so an empty input reaches this line without it — confirm empty inputs never occur.
    assert(n_pixels == pixel_id + 1)    
    # Count the cells of the first layer that no longer hold the no-data marker.
    converted_pixels = np.where(arr_raster[0]!=novalue, 1, 0).sum()    
    assert(converted_pixels==n_pixels)
    return arr_raster


# Execute

In [None]:
# Recursively copy the `rice_pixel_v2` prediction folder from the bucket into /temp
# (`-m` parallel transfer, `-n` skips files that already exist locally).
!gsutil -m cp -r -n gs://unbdh2022-multiverseofdata-dev/prediction/rice_pixel_v2 /temp

In [None]:
# Collect the prediction file paths and put them in ascending (lexicographic) order.
list_files = sorted(io_tools.get_list_files_re(folder_prediction))

In [None]:
# Copy the MODIS reference GeoTIFF from the bucket to /tmp; it is read below to build the background array.
!gsutil cp gs://unbdh2022-multiverseofdata-dev/modis/reference/mod250m16d-ndvi-reproj.tif /tmp

In [None]:
import rasterio

# Build the background array from band 1 of the reference raster:
# cells whose band value is 0 become NaN, every other cell becomes 0.
# NOTE(review): presumably the 0-valued cells lie outside Thailand — confirm.
path_reference_raster = '/tmp/mod250m16d-ndvi-reproj.tif'
with rasterio.open(path_reference_raster) as ds:
    arr_thailand = np.where(ds.read(1) == 0, np.nan, 0)

# Raster dimensions, reused as the target size for every prediction map.
h, w = arr_thailand.shape

In [None]:
# Quick visual check of the background array (NaN / 0 values) built in the previous cell.
arr_thailand

In [None]:
# One summary record (year_month, rice pixel count, estimated production) per prediction file.
list_dict_prediction = []
RICE_YIELD = 0.42
# The DataFrame index encodes the pixel position as row * 10000 + col.
PIXEL_INDEX_STRIDE = 10000

for path_df_prediction in list_files:
    dict_prediction = {}
    file_name = os.path.basename(path_df_prediction)
    print(f'Visualizing {file_name}...')

    # The year-month token is the last 7 characters before the first dot of the
    # *file name*. Parsing the base name (not the full path) keeps this correct
    # even when a directory in the path contains a dot.
    year_month = file_name.split('.')[0][-7:]
    dict_prediction['year_month'] = year_month

    df_prediction = pd.read_parquet(path_df_prediction)
    print(df_prediction.predict.mean())

    # Decode the raster position from the index and put row/col first, which is
    # the column layout arr_row_col_vals_to_raster expects.
    df_prediction['row'] = (df_prediction.index // PIXEL_INDEX_STRIDE).astype(np.int32)
    df_prediction['col'] = (df_prediction.index % PIXEL_INDEX_STRIDE).astype(np.int32)
    df_prediction = df_prediction.reindex(columns=['row', 'col', 'predict', 'p0', 'p1'])

    # Rasterise and keep only the first layer (the `predict` column).
    arr_tmp = arr_row_col_vals_to_raster(df_prediction.values, h, w)[0]

    # Collapse the map to three values:
    #    1 -> predicted rice
    #    0 -> not rice, but the reference raster has data there
    #   -1 -> everything else (no prediction / predicted 0 where the reference is empty)
    arr_tmp = np.where(arr_tmp == -999, np.nan, arr_tmp)
    arr_tmp = np.where(arr_tmp == 0, np.nan, arr_tmp)
    arr_tmp = np.where(np.isnan(arr_tmp), arr_thailand, arr_tmp)
    arr_tmp = np.where(np.isnan(arr_tmp), -1, arr_tmp)

    n_rice_pixels = int((arr_tmp == 1).sum())
    dict_prediction['n_rice_pixels'] = n_rice_pixels
    # NOTE(review): 25 is presumably the area of one pixel in the unit
    # RICE_YIELD is expressed per — confirm and promote to a named constant.
    rice_production_mmt = n_rice_pixels * 25 * RICE_YIELD / 1e6
    dict_prediction['rice_production_mmt'] = rice_production_mmt

    scale = 1
    fig = plt.figure(figsize=(6*scale, 10*scale))
    plt.imshow(arr_tmp, cmap='RdYlGn', vmin=-4, vmax=2)
    plt.title(f'rice area map: {year_month} ({n_rice_pixels:,d}px, {rice_production_mmt:.2f}M MT)')
    plt.axis('off')
    # Save before show(): the figure may be cleared once it has been displayed.
    plt.savefig(os.path.join(folder_viz_rice_area_map, f'rice_area_map_{year_month}.jpg'))
    plt.show()
    # Release the figure explicitly so a long list of files cannot pile up open figures.
    plt.close(fig)

    list_dict_prediction.append(dict_prediction)

In [None]:
# One row per prediction file: year_month, n_rice_pixels, rice_production_mmt.
df_prediction_summary = pd.DataFrame(list_dict_prediction)
df_prediction_summary

In [None]:
# Persist the per-file summary to Google Drive (the index is not written).
df_prediction_summary.to_csv('/content/drive/MyDrive/!UNBDH2022-Multiverse-Of-Data/unbdh2022_multiverse_of_data/result/df_prediction_summary.csv', index=False)

# Ad hoc - prepare data for finding Pseudo Rice Yield per pixel

In [None]:
# Download the FAOSTAT "Production_Crops_Livestock" bulk archive (normalized layout)
# and extract it into the current working directory; the extracted CSV is read by Spark below.
!wget 'https://fenixservices.fao.org/faostat/static/bulkdownloads/Production_Crops_Livestock_E_All_Data_(Normalized).zip'
!unzip 'Production_Crops_Livestock_E_All_Data_(Normalized).zip'


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


# Start (or reuse) a single-node local Spark session named "Colab",
# with the Spark UI bound to port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [None]:
# Read the FAOSTAT CSV. No schema inference is requested, so Spark loads every
# column (including `Year` and `Value`) as a string.
sdf = spark.read.csv(
    'Production_Crops_Livestock_E_All_Data_(Normalized).csv',
    header='true')

# Drop columns that are not needed downstream.
sdf = sdf.drop('Area Code (M49)',
    'Item Code (CPC)',
    'Year Code',
    'Flag')

# Rename columns to short snake_case names.
# (Parenthesised chain instead of trailing backslashes: the original ended with
# a dangling `\` that only parsed because a blank line happened to follow it.)
sdf = (
    sdf
    .withColumnRenamed('Area Code', 'country_cd')
    .withColumnRenamed('Area', 'country_nm')
    .withColumnRenamed('Item Code', 'item_cd')
    .withColumnRenamed('Item', 'item_nm')
    .withColumnRenamed('Element Code', 'element_cd')
    .withColumnRenamed('Element', 'element_nm')
    .withColumnRenamed('Year', 'year')
    .withColumnRenamed('Unit', 'unit')
    .withColumnRenamed('Value', 'value')
)

# Lower-case the item name so the comparisons below are case-insensitive.
sdf = sdf.withColumn('item_nm', lower(col('item_nm')))

# Add rice & wheat flags.
sdf = sdf.withColumn('rice_f', sdf.item_nm == 'rice')
sdf = sdf.withColumn('wheat_f', sdf.item_nm == 'wheat')

# Keep only rice & wheat rows.
sdf = sdf.filter(sdf.rice_f | sdf.wheat_f)

# Convert to pandas, cached on Drive as parquet so the Spark collect only runs once.
path_df_rice_wheat_production = '/content/drive/MyDrive/!UNBDH2022-Multiverse-Of-Data/unbdh2022_multiverse_of_data/data/df_rice_wheat_production.parquet'
if os.path.exists(path_df_rice_wheat_production):
    df = pd.read_parquet(path_df_rice_wheat_production)
else:
    df = sdf.toPandas()
    df.to_parquet(path_df_rice_wheat_production)
df_production = df.copy()

# Keep only the rice "Production" element for Thailand.
df_thailand_rice_production = df_production[
    (df_production.rice_f == 1)
    & (df_production.element_nm == 'Production')
    & (df_production.country_nm == 'Thailand')
].copy()

# Both columns arrive as strings (see the read above), so cast them explicitly:
# - float64, because float32 cannot represent values in the tens of millions exactly;
# - int year, so the yearly table can be joined on its index with tables keyed
#   by integer years (a string index would not line up with an int index).
df_thailand_rice_production['value'] = df_thailand_rice_production['value'].astype(np.float64)
df_thailand_rice_production['year'] = df_thailand_rice_production['year'].astype(int)

# Aggregate to one row per year.
df_thai_rice_production_vol_yearly = df_thailand_rice_production.groupby(['year']).agg(production_vol_mt = ('value', 'sum'))
df_thai_rice_production_vol_yearly

In [None]:
# Split 'YYYY-MM' once; column 0 is the year part, column 1 the month part.
year_month_parts = df_prediction_summary['year_month'].str.split('-', expand=True)

# Keep only the December record of each year, index it by integer year and
# drop the helper columns together with the estimated production.
df_rice_pixels_yearly = (
    df_prediction_summary
    .assign(
        month=year_month_parts[1],
        year=year_month_parts[0].astype(int),
    )
    .loc[lambda frame: frame['month'] == '12']
    .set_index('year')
    .drop(columns=['year_month', 'month', 'rice_production_mmt'])
)
df_rice_pixels_yearly

In [None]:
# Inspect the index (values and dtype) of the yearly production table; it is the join key of the merge below.
df_thai_rice_production_vol_yearly.index

In [None]:
# Join the yearly rice-pixel counts with the yearly production volume on the year index.
# The two tables come from different sources (file names vs. a CSV read as
# strings), so both indexes are normalised to int first: an int index and a
# string index would not line up in an index-on-index merge.
pixels_by_year = df_rice_pixels_yearly.copy()
pixels_by_year.index = pixels_by_year.index.astype(int)
production_by_year = df_thai_rice_production_vol_yearly.copy()
production_by_year.index = production_by_year.index.astype(int)

df_rice_pixels_to_production_yearly = pixels_by_year.merge(production_by_year, left_index=True, right_index=True)
df_rice_pixels_to_production_yearly.to_csv('/content/drive/MyDrive/!UNBDH2022-Multiverse-Of-Data/unbdh2022_multiverse_of_data/data/df_rice_pixels_to_production_yearly.csv')