In [1]:
import pandas as pd
import geopandas as gpd
from shapely.geometry import Polygon

def create_geometries_from_manual_edit(df, geom_col):
    """Build a point GeoDataFrame from a manually cleaned ``lat,lon`` column.

    Parameters
    ----------
    df : pandas.DataFrame
        Sample table. It is not modified; the result is built from a copy.
    geom_col : str
        Name of the column holding coordinates as ``"lat,lon"`` strings
        (for example ``"-14.4,28.0"``). Only the first two comma-separated
        parts are used; anything after the second part is ignored.

    Returns
    -------
    geopandas.GeoDataFrame
        Copy of ``df`` with added ``point_lat``, ``point_lon`` and
        ``point_location`` columns and an EPSG:4326 point geometry.

    Raises
    ------
    KeyError
        If ``geom_col`` is not a column of ``df``.
    """
    # Raise a KeyError if geom_col does not appear in the dataframe
    if geom_col not in df.columns:
        raise KeyError(f"{geom_col} is not a valid column of this dataframe.")

    # Work on a copy so the caller's dataframe does not silently gain columns
    df = df.copy()

    # Convert from string `lat,lon` to individual columns for lat and lon.
    # Picking parts by position (rather than expanding into a fixed number of
    # columns) keeps this working when a value has extra trailing parts.
    coordinate_parts = df[geom_col].str.split(",")
    df["point_lat"] = coordinate_parts.str[0]
    df["point_lon"] = coordinate_parts.str[1]

    # Add a point location column, specify as manual to differ from ODK original values
    df["point_location"] = "manual"

    # Convert to geopandas with point geometry (x is longitude, y is latitude)
    geom = gpd.points_from_xy(x=df.point_lon, y=df.point_lat, crs="EPSG:4326")
    gdf = gpd.GeoDataFrame(df, geometry=geom)

    return gdf


def create_geometries_from_ODK(df):
    """Build point and boundary geometries from the original ODK output.

    Rows with ``access_consent == "no"`` take their point from
    ``field_outside_corner``; rows with ``access_consent == "yes"`` take it
    from ``field_center`` and additionally get a polygon built from
    ``field_boundary``. Rows with any other consent value get no point
    coordinates and no polygon.

    Parameters
    ----------
    df : pandas.DataFrame
        Sample table with the columns ``KEY``, ``access_consent``,
        ``field_outside_corner``, ``field_center`` and ``field_boundary``.
        Points are ``"lat,lon"`` strings; a boundary is a ``"; "``-separated
        list of vertices, each written as ``"lat lon alt acc"``.
        The input is not modified.

    Returns
    -------
    geopandas.GeoDataFrame
        Indexed by ``KEY``, with added ``point_location``, ``point_latlon``,
        ``point_lat``, ``point_lon`` and ``field_boundary_polygon`` columns
        and an EPSG:4326 point geometry. For consenting rows the
        ``field_boundary`` column holds the list of vertex strings.
    """
    # Work on a copy with a clean 0..n-1 index. The original index is replaced
    # by KEY in the output anyway, and a unique index is needed because the
    # boundary vertices are grouped back into polygons by row label below.
    df = df.reset_index(drop=True)

    # identify rows with either center measurement or corner measurement
    corner_measurement = df["access_consent"] == "no"
    center_measurement = df["access_consent"] == "yes"

    # create a new column to specify location type
    df.loc[corner_measurement, "point_location"] = "outside_corner"
    df.loc[center_measurement, "point_location"] = "center"

    # create a column to store the lat,lon values
    df.loc[corner_measurement, "point_latlon"] = df.loc[
        corner_measurement, "field_outside_corner"
    ]
    df.loc[center_measurement, "point_latlon"] = df.loc[
        center_measurement, "field_center"
    ]

    # Convert from string `lat,lon` to individual columns for lat and lon.
    # The parts are assigned by index alignment, so each row keeps its own
    # coordinates and rows without a point simply stay missing.
    latlon_parts = df["point_latlon"].str.split(",")
    df["point_lat"] = latlon_parts.str[0]
    df["point_lon"] = latlon_parts.str[1]

    # Add boundaries for any locations with center points
    df.loc[center_measurement, "field_boundary"] = df.loc[
        center_measurement, "field_boundary"
    ].str.split("; ")

    # Separate out items where boundary exists to perform next steps
    df_withboundary = df.loc[center_measurement, ["KEY", "field_boundary"]]

    # Explode all boundaries to get one row per point in boundary, then keep
    # the lat and lon parts (altitude and accuracy are not needed)
    df_withboundary = df_withboundary.explode("field_boundary")
    vertex_parts = df_withboundary["field_boundary"].str.split(" ")
    df_withboundary["lat"] = vertex_parts.str[0]
    df_withboundary["lon"] = vertex_parts.str[1]
    df_withboundary = df_withboundary.drop(["field_boundary"], axis="columns")

    # Convert into Polygons using groupby: after the explode, every vertex of
    # a field shares that field's row label
    df_withboundary["field_boundary_polygon"] = df_withboundary.groupby(
        df_withboundary.index
    ).apply(lambda g: Polygon(gpd.points_from_xy(g["lon"], g["lat"])))

    # Drop duplicates so there is one polygon row per KEY
    df_withboundary = df_withboundary.drop(["lat", "lon"], axis="columns")
    df_withboundary = df_withboundary.drop_duplicates(subset="KEY")

    df = df.set_index("KEY").join(df_withboundary.set_index("KEY"), how="left")

    # Convert to geopandas with point geometry (x is longitude, y is latitude)
    geom = gpd.points_from_xy(x=df.point_lon, y=df.point_lat, crs="EPSG:4326")
    gdf = gpd.GeoDataFrame(df, geometry=geom)

    return gdf



In [31]:
import os
import geopandas as gpd
import pandas as pd
from shapely.geometry import Polygon


import os
import geopandas as gpd
import numpy as np
import pandas as pd
import json
import pickle
from datacube.utils import geometry
from deafrica_tools.classification import collect_training_data
from odc.io.cgroups import get_cpu_quota
from sklearn.preprocessing import LabelEncoder
# Display option: passing None removes the limit on how many columns are shown
# when a DataFrame is displayed, so wide tables are not truncated.
pd.set_option("display.max_columns", None)

  _numeric_index_types = (pd.Int64Index, pd.Float64Index, pd.UInt64Index)
  _numeric_index_types = (pd.Int64Index, pd.Float64Index, pd.UInt64Index)
  _numeric_index_types = (pd.Int64Index, pd.Float64Index, pd.UInt64Index)


In [3]:
# Location of the sample datasheet: a CSV file hosted in the crop-type GitHub repository
samples_file = "https://raw.githubusercontent.com/digitalearthafrica/crop-type/main/1_Prepare_samples_for_ML/data/datasheet.csv"
# Read the datasheet into a DataFrame (needs network access)
samples_df = pd.read_csv(samples_file)

In [5]:
# Remove prefixes from columns to improve readability, one prefix at a time
updated_columns = samples_df.columns
for prefix in ("data-", "consent_given-", "field_planted-"):
    updated_columns = updated_columns.str.replace(prefix, "")
samples_df.columns = updated_columns

# Missing values are stored as a single space; convert them to None
samples_df = samples_df.replace({" ": None})

# Shapefiles need dates as strings, so parse the day-first dates and write
# them back as YYYY-MM-DD strings
for date_column in ("start", "end"):
    parsed_dates = pd.to_datetime(samples_df[date_column], dayfirst=True)
    samples_df[date_column] = parsed_dates.dt.strftime("%Y-%m-%d")

In [9]:
# Create a dictionary to clean any remaining mismatched data.
# Keys are spellings found in the raw data; values are the label to use instead.
crop_dictionary = {
    "bananas": "banana",
    "groundnuts": "groundnut",
    "macadamia nuts": "macadamia",
    "macadamia nut": "macadamia",
    "ochra vegetables": "ochra",
    "okra": "ochra",
    # Previously mapped to itself (a no-op); the class labels use "soybean"
    "soyabean": "soybean",
    "sweet potatoes": "sweet potato",
    "water melon": "watermelon",
}

In [12]:
# Choose where the sample geometries come from. Exactly one of the two
# assignments below should be active.
#
# Option 1 - use with original ODK toolkit output (uncomment the next line):
# cleaned_geom_column = None
#
# Option 2 - use with modified ODK toolkit output. The column listed below
# must contain data in lat,lon format (i.e. -14.4,28.0)
cleaned_geom_column = "Cleaned_Coordinates"

In [13]:
if cleaned_geom_column is not None:
    # Return geodataframe containing point geometry, extracted from cleaned_geom_column
    points_gdf = create_geometries_from_manual_edit(samples_df, cleaned_geom_column)
    # Manually edited coordinates carry no field boundaries
    polygons_gdf = None

else:
    # Return cleaned samples from ECAAS ODK format, containing mix of points and polygons
    cleaned_samples_df = create_geometries_from_ODK(samples_df)

    # Rows that actually have a field boundary polygon
    has_boundary = cleaned_samples_df["field_boundary_polygon"].notna()

    # Create two geodataframes, one with point geometry and one with polygon geometry
    points_gdf = cleaned_samples_df.drop(columns=["field_boundary_polygon"]).copy()
    polygons_gdf = cleaned_samples_df.set_geometry(
        "field_boundary_polygon", drop=True
    ).copy()
    polygons_gdf = polygons_gdf.loc[has_boundary, :]

In [14]:
# Make sure the output folder exists; writing the files below fails otherwise
os.makedirs("data", exist_ok=True)

# Create dictionary with columns of interest and corresponding 10-character names
# (the short names are only applied to the shapefile exports)
col_rename_dict = {
    "start": "start",
    "end": "end",
    "field_fallow": "fallow",
    "primary_crop_type": "pri_type",
    "primary_crop": "pri_crop",
    "crop_development": "crop_dev",
    "multiple_crops": "multi_crop",
    "multiple_crops_percentage": "multi_per",
    "secondary_crop": "sec_crop",
    "geometry": "geometry",
}

# Export polygons (only available when the original ODK output was used)
if polygons_gdf is not None:
    polygon_columns = list(col_rename_dict)

    polygons_gdf[polygon_columns].to_file("data/cleaned_polygons.geojson")
    polygons_gdf[polygon_columns].rename(columns=col_rename_dict).to_file(
        "data/cleaned_polygons.shp"
    )

# Add additional column to specify the point location.
# This is done after the polygon export because it only applies to points.
col_rename_dict["point_location"] = "point_loc"
point_columns = list(col_rename_dict)

# Export points
points_gdf[point_columns].to_file("data/cleaned_points.geojson")
points_gdf[point_columns].rename(columns=col_rename_dict).to_file(
    "data/cleaned_points.shp"
)

In [26]:
# Point to cleaned data from previous step (the point GeoJSON export)
path = "./data/cleaned_points.geojson"
# Load input data with geopandas
input_data = gpd.read_file(path)

In [27]:
# Convert date fields to datetimes (the strings are stored year first)
for date_field in ("start", "end"):
    input_data[date_field] = pd.to_datetime(input_data[date_field], yearfirst=True)

In [28]:
# Identify rows with multiple crops or fallow fields
multiple_crop_condition = input_data["multiple_crops"] == "yes"
fallow_field_condition = input_data["field_fallow"] == "yes"

# Fallow fields are excluded from both datasets
not_fallow = ~fallow_field_condition

# Split datasets: fields with one crop and fields with several crops
single_crops = input_data.loc[~multiple_crop_condition & not_fallow].copy()
multiple_crops = input_data.loc[multiple_crop_condition & not_fallow].copy()

In [29]:
# Count the samples of each primary crop and attach that count to every row
samples_per_crop = single_crops.groupby("primary_crop")["primary_crop"].transform(
    "count"
)

# Keep only crops with at least 10 samples, and renumber the rows
has_enough_samples = samples_per_crop >= 10
single_crops_subset = single_crops[has_enough_samples].reset_index(drop=True).copy()

In [32]:
# Select field to label
field = "primary_crop"

# Fit label encoder to match classes to numeric labels
le = LabelEncoder()
le.fit(single_crops_subset[field])

# Get a list of the crop types
classes = list(le.classes_)

# Assign numeric label for each class
single_crops_subset["label"] = le.transform(single_crops_subset[field])

# Create a dictionary mapping classes to numeric labels.
# The labels are converted to plain ints so they can be written as JSON.
class_dictionary = {
    crop_class: int(numeric_label)
    for crop_class, numeric_label in zip(classes, le.transform(classes))
}
print("Class Dictionary:")
print(class_dictionary)

# Export class dictionary
with open("data/class_labels.json", "w", encoding="utf-8") as f:
    json.dump(class_dictionary, f, ensure_ascii=False, indent=4)

Class Dictionary:
{'beans': 0, 'cassava': 1, 'cotton': 2, 'groundnut': 3, 'maize': 4, 'millet': 5, 'sorghum': 6, 'soybean': 7, 'sunflower': 8, 'sweet potato': 9}


In [33]:
# Set a flag to convert to polygons:
use_polygons = True

# NOTE: this cell overwrites single_crops_subset with its own output, so
# running it a second time buffers the already-buffered geometries again.
if use_polygons:
    # Convert from lat,lon to EPSG:6933 (projection in metres)
    # so that the buffer distance below is expressed in metres
    single_crops_subset = single_crops_subset.to_crs("EPSG:6933")

    # Buffer geometry to get a square - only if trying to sample multiple pixels
    # cap_style=3 selects the square cap style; buffer_radius_m is the distance
    # from each point to the sides of its square
    buffer_radius_m = 15
    single_crops_subset.geometry = single_crops_subset.geometry.buffer(buffer_radius_m, cap_style=3)

In [36]:
# Show the earliest start date and the latest end date across the selected samples
single_crops_subset.start.min(), single_crops_subset.end.max()

(Timestamp('2022-04-11 00:00:00'), Timestamp('2022-04-19 00:00:00'))

In [43]:
# Earliest sample start date. Defined in this cell so that it runs on a fresh
# kernel instead of relying on a value set by a cell further down.
start_date = single_crops_subset.start.min()

# Query start: first day of the month of the earliest sample, moved back nine months
query_start_date = pd.Timestamp(
    year=start_date.year, month=start_date.month, day=1
) - pd.DateOffset(months=9)
query_start_date

Timestamp('2021-07-01 00:00:00')

In [45]:
# Earliest sample start date. Defined in this cell so that it runs on a fresh
# kernel instead of relying on a value set by a cell further down.
start_date = single_crops_subset.start.min()

# Query end: one minute before the first day of the month of the earliest
# sample, i.e. the last minute of the preceding month
query_end_date = pd.Timestamp(
    year=start_date.year, month=start_date.month, day=1
) - pd.DateOffset(minutes=1)
query_end_date

Timestamp('2022-03-31 23:59:00')

In [37]:
# Earliest start and latest end across the selected samples
start_date = single_crops_subset["start"].min()
end_date = single_crops_subset["end"].max()

# Both query bounds are measured from the first day of the month in which
# the earliest sample was collected
survey_month_start = pd.Timestamp(
    year=start_date.year, month=start_date.month, day=1
)

# The query covers the nine months up to the last minute before that month
query_start_date = survey_month_start - pd.DateOffset(months=9)
query_end_date = survey_month_start - pd.DateOffset(minutes=1)

print(f"Query start: {query_start_date}")
print(f"Query end: {query_end_date}")

Query start: 2021-07-01 00:00:00
Query end: 2022-03-31 23:59:00


In [None]:
# Write a general query
time = (query_start_date, query_end_date)
resolution = (-10, 10)
output_crs = "EPSG:6933"

query = {
    "time": time,
    "resolution": resolution,
    "output_crs": output_crs,
}

# Make sure the output folder exists; opening the file below fails otherwise
os.makedirs("results", exist_ok=True)

# Export query to pickle file for future re-use
with open('results/query.pickle', 'wb') as f:
    pickle.dump(query, f)

In [None]:
def apply_function_over_custom_times(ds, func, func_name, time_ranges):
    """Apply ``func`` to one time selection of ``ds`` per entry in ``time_ranges``.

    Parameters
    ----------
    ds : dataset-like
        Object supporting ``.sel(time=...)`` (an xarray dataset in this notebook).
    func : callable
        Called with each time selection; must return a dataset-like object.
    func_name : str
        Tag inserted into the output variable names.
    time_ranges : dict
        Maps a label to either a ``slice`` (selects that time range) or a
        single value (selects the nearest available time).

    Returns
    -------
    list
        One result per entry of ``time_ranges``, in the same order. Every
        variable is renamed to ``<variable>_<func_name>_<label>``. Results
        that still carry a ``time`` coordinate have their coordinates reset
        and the ``time`` and ``spatial_ref`` variables dropped.
    """

    def _process(label, selector):
        # A slice selects a range; anything else is matched to the nearest time
        if isinstance(selector, slice):
            subset = ds.sel(time=selector)
        else:
            subset = ds.sel(time=selector, method="nearest")

        result = func(subset)

        # Tag every variable with the function name and the time label
        new_names = {}
        for variable in list(result.keys()):
            new_names[variable] = f"{variable}_{func_name}_{label}"
        result = result.rename(name_dict=new_names)

        if "time" not in list(result.coords):
            return result

        return result.reset_coords().drop_vars(["time", "spatial_ref"])

    return [_process(label, selector) for label, selector in time_ranges.items()]


# Define functions to load features
def feature_layers(query):
    """Compute feature layers according to datacube query.

    Loads several products for the area and settings described by ``query``,
    summarises each over fixed time ranges, and merges everything into one
    dataset.

    Parameters
    ----------
    query : dict
        Keyword arguments passed on to the datacube load calls (for example
        ``time``, ``resolution`` and ``output_crs``). The dictionary itself is
        not modified: product-specific variations are made on copies.

    Returns
    -------
    The result of ``xr.merge`` over all per-product feature datasets:
    Sentinel-2 custom, annual and semiannual geomedians, Sentinel-1
    geomedians, fractional cover medians, rainfall means and slope.

    Notes
    -----
    NOTE(review): besides ``np``, this function expects ``datacube``,
    ``load_ard``, ``xr``, ``xr_geomedian``, ``geomedian_with_indices_wrapper``,
    ``indices_wrapper``, ``median_wrapper`` and ``mean_wrapper`` to be
    available in the notebook namespace. None of them is imported or defined
    in the cells shown here - confirm where they come from.
    """

    # Connect to datacube
    dc = datacube.Datacube(app="crop_type_ml")

    # ----------------- HARDCODE TIME RANGES FOR CUSTOM QUERIES -----------------
    # This means the function can be used without modifying the datacube query.
    # The time ranges are fixed here rather than being read from (and removed
    # from) the query.

    time_ranges = {
        "Q3_2021": slice("2021-08-01", "2021-10-31"),
        "Q4_2021": slice("2021-11-01", "2022-01-31"),
        "Q1_2022": slice("2022-02-01", "2022-04-30"),
    }

    # !!! FOR ZAMBIA, S1 DATA IS MISSING FOR HALF THE COUNTRY IN 2022 !!!
    # For that reason there is no Q1_2022 entry for Sentinel-1. The remaining
    # ranges match the ones in time_ranges.
    s1_time_ranges = {
        "Q3_2021": slice("2021-08-01", "2021-10-31"),
        "Q4_2021": slice("2021-11-01", "2022-01-31"),
    }

    # Single dates: the nearest available time step is selected for each
    annual_geomedian_times = {
        "annual_2021": "2021-01-01",
    }
    semiannual_geomedian_times = {
        "semiannual_2021_01": "2021-01-01",
        "semiannual_2021_06": "2021-06-01",
    }

    # ----------------- DEFINE MEASUREMENTS TO USE FOR EACH PRODUCT -----------------

    s2_measurements = [
        "blue",
        "green",
        "red",
        "nir",
        "swir_1",
        "swir_2",
        "red_edge_1",
        "red_edge_2",
        "red_edge_3",
    ]

    s2_geomad_measurements = s2_measurements + ["smad", "emad", "bcmad"]

    s1_measurements = ["vv", "vh"]

    fc_measurements = ["bs", "pv", "npv", "ue"]

    rainfall_measurements = ["rainfall"]

    slope_measurements = ["slope"]

    # ----------------- S2 CUSTOM GEOMEDIANS -----------------
    # These are designed to take the geomedian for every range in time_ranges

    ds = load_ard(
        dc=dc,
        products=["s2_l2a"],
        measurements=s2_measurements,
        group_by="solar_day",
        verbose=False,
        **query,
    )

    # Apply geomedian over time ranges and calculate band indices
    s2_geomad_list = apply_function_over_custom_times(
        ds, geomedian_with_indices_wrapper, "s2", time_ranges
    )

    # ----------------- S2 ANNUAL GEOMEDIAN -----------------

    # Update query to use annual_geomedian_times
    ds_annual_geomad_query = query.copy()
    query_times = list(annual_geomedian_times.values())
    ds_annual_geomad_query.update({"time": (query_times[0], query_times[-1])})

    # load s2 annual geomedian
    ds_s2_geomad = dc.load(
        product="gm_s2_annual",
        measurements=s2_geomad_measurements,
        **ds_annual_geomad_query,
    )

    # Calculate band indices
    s2_annual_list = apply_function_over_custom_times(
        ds_s2_geomad, indices_wrapper, "s2", annual_geomedian_times
    )

    # ----------------- S2 SEMIANNUAL GEOMEDIAN -----------------

    # Update query to use semiannual_geomedian_times
    ds_semiannual_geomad_query = query.copy()
    query_times = list(semiannual_geomedian_times.values())
    ds_semiannual_geomad_query.update({"time": (query_times[0], query_times[-1])})

    # load s2 semiannual geomedian
    ds_s2_semiannual_geomad = dc.load(
        product="gm_s2_semiannual",
        measurements=s2_geomad_measurements,
        **ds_semiannual_geomad_query,
    )

    # Calculate band indices
    s2_semiannual_list = apply_function_over_custom_times(
        ds_s2_semiannual_geomad, indices_wrapper, "s2", semiannual_geomedian_times
    )

    # ----------------- S1 CUSTOM GEOMEDIANS -----------------

    # Update query to suit Sentinel 1
    s1_query = query.copy()
    s1_query.update({"sat_orbit_state": "ascending"})

    # Load s1
    s1_ds = load_ard(
        dc=dc,
        products=["s1_rtc"],
        measurements=s1_measurements,
        group_by="solar_day",
        verbose=False,
        **s1_query,
    )

    # Apply geomedian
    s1_geomad_list = apply_function_over_custom_times(
        s1_ds, xr_geomedian, "s1_xrgm", s1_time_ranges
    )

    # -------- LANDSAT BIMONTHLY FRACTIONAL COVER -----------

    # Update query to suit fractional cover
    fc_query = query.copy()
    fc_query.update({"resampling": "bilinear", "measurements": fc_measurements})

    # load fractional cover
    ds_fc = dc.load(product="fc_ls", collection_category="T1", **fc_query)

    # Apply median
    fc_median_list = apply_function_over_custom_times(
        ds_fc, median_wrapper, "median", time_ranges
    )

    # -------- CHIRPS MONTHLY RAINFALL -----------

    # Update query to suit CHIRPS rainfall
    rainfall_query = query.copy()
    rainfall_query.update(
        {"resampling": "bilinear", "measurements": rainfall_measurements}
    )

    # Load rainfall and update no data values
    ds_rainfall = dc.load(product="rainfall_chirps_monthly", **rainfall_query)

    rainfall_nodata = -9999.0
    ds_rainfall = ds_rainfall.where(
        ds_rainfall.rainfall != rainfall_nodata, other=np.nan
    )

    # Apply mean
    rainfall_mean_list = apply_function_over_custom_times(
        ds_rainfall, mean_wrapper, "mean", time_ranges
    )

    # -------- DEM SLOPE -----------
    # The slope layer is loaded with a fixed time instead of the query's time range
    slope_query = query.copy()
    slope_query.update(
        {
            "resampling": "bilinear",
            "measurements": slope_measurements,
            "time": "2000-01-01",
        }
    )

    # Load slope data and update no data values and coordinates
    ds_slope = dc.load(product="dem_srtm_deriv", **slope_query)

    slope_nodata = -9999.0
    ds_slope = ds_slope.where(ds_slope != slope_nodata, np.nan)

    # Remove the length-one time dimension
    ds_slope = ds_slope.squeeze("time")

    # ----------------- FINAL MERGED XARRAY -----------------

    # Create a list to keep all items for final merge
    ds_list = []
    ds_list.extend(s2_geomad_list)
    ds_list.extend(s2_annual_list)
    ds_list.extend(s2_semiannual_list)
    ds_list.extend(s1_geomad_list)
    ds_list.extend(fc_median_list)
    ds_list.extend(rainfall_mean_list)
    ds_list.append(ds_slope)

    ds_final = xr.merge(ds_list)

    return ds_final