In [5]:
# Importing packages and libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from pmdarima import auto_arima
from arch import arch_model
from numpy import median

# reading datasets
# 1. White maize
area_white_maize = pd.read_excel(
    r"C:\Users\seana\OneDrive - Agnify (Pty) Ltd\Ag.io\Crop Data\Area\Area Grown Maize.xlsx",
    sheet_name="Area WM (ha)",
)
production_white_maize = pd.read_excel(
    r"C:\Users\seana\OneDrive - Agnify (Pty) Ltd\Ag.io\Crop Data\Production\Production Maize.xlsx",
    sheet_name="Production WM (t)",
)
area_white_maize = area_white_maize.drop(area_white_maize.columns[[1, 2]], axis=1)
area_white_maize.rename(columns={"Production Region": "Season"}, inplace=True)
area_white_maize.set_index("Season", inplace=True)
production_white_maize.rename(columns={"Production Region": "Season"}, inplace=True)
production_white_maize.set_index("Season", inplace=True)
# 2. Yellow maize
area_yellow_maize = pd.read_excel(
    r"C:\Users\seana\OneDrive - Agnify (Pty) Ltd\Ag.io\Crop Data\Area\Area Grown Maize.xlsx",
    sheet_name="Area YM (ha)",
)
production_yellow_maize = pd.read_excel(
    r"C:\Users\seana\OneDrive - Agnify (Pty) Ltd\Ag.io\Crop Data\Production\Production Maize.xlsx",
    sheet_name="Production YM (t)",
)
area_yellow_maize = area_yellow_maize.drop(area_yellow_maize.columns[[1, 2]], axis=1)
area_yellow_maize.rename(columns={"Production Region": "Season"}, inplace=True)
area_yellow_maize.set_index("Season", inplace=True)
production_yellow_maize.rename(columns={"Production Region": "Season"}, inplace=True)
production_yellow_maize.set_index("Season", inplace=True)
# 3. Soyabean
area_soyabean = pd.read_excel(
    r"C:\Users\seana\OneDrive - Agnify (Pty) Ltd\Ag.io\Crop Data\Area\Area Grown Soyabean.xlsx"
)
production_soyabean = pd.read_excel(
    r"C:\Users\seana\OneDrive - Agnify (Pty) Ltd\Ag.io\Crop Data\Production\Production Soyabean.xlsx"
)
area_soyabean = area_soyabean.drop(area_soyabean.columns[[1, 2]], axis=1)
area_soyabean.rename(columns={"Production Region": "Season"}, inplace=True)
area_soyabean.set_index("Season", inplace=True)
production_soyabean.rename(columns={"Production Region": "Season"}, inplace=True)
production_soyabean.set_index("Season", inplace=True)
# 4. Sunflower
area_sunflower = pd.read_excel(
    r"C:\Users\seana\OneDrive - Agnify (Pty) Ltd\Ag.io\Crop Data\Area\Area Grown Sunflower.xlsx"
)
production_sunflower = pd.read_excel(
    r"C:\Users\seana\OneDrive - Agnify (Pty) Ltd\Ag.io\Crop Data\Production\Production Sunflower.xlsx"
)
area_sunflower = area_sunflower.drop(area_sunflower.columns[[1, 2]], axis=1)
area_sunflower.rename(columns={"Production Region": "Season"}, inplace=True)
area_sunflower.set_index("Season", inplace=True)
production_sunflower.rename(columns={"Production Region": "Season"}, inplace=True)
production_sunflower.set_index("Season", inplace=True)
# 5. Wheat
area_wheat = pd.read_excel(
    r"C:\Users\seana\OneDrive - Agnify (Pty) Ltd\Ag.io\Crop Data\Area\Area Grown Wheat.xlsx"
)
production_wheat = pd.read_excel(
    r"C:\Users\seana\OneDrive - Agnify (Pty) Ltd\Ag.io\Crop Data\Production\Production Wheat.xlsx"
)
area_wheat.rename(columns={"Production Region": "Season"}, inplace=True)
area_wheat.set_index("Season", inplace=True)
production_wheat.rename(columns={"Production Region": "Season"}, inplace=True)
production_wheat.set_index("Season", inplace=True)

# production dictionary
production_data = {
    "white maize": pd.DataFrame(production_white_maize),
    "yellow maize": pd.DataFrame(production_yellow_maize),
    "soyabean": pd.DataFrame(production_soyabean),
    "sunflower": pd.DataFrame(production_sunflower),
    "wheat": pd.DataFrame(production_wheat),
}

# area dictionary
area_data = {
    "white maize": pd.DataFrame(area_white_maize),
    "yellow maize": pd.DataFrame(area_yellow_maize),
    "soyabean": pd.DataFrame(area_soyabean),
    "sunflower": pd.DataFrame(area_sunflower),
    "wheat": pd.DataFrame(area_wheat),
}

# Dry Land Yield Estimate directory
dry_yield_estimates = {
    "white maize": [2.5, 3, 3.5, 4, 4.5, 5, 5.5, 6, 6.5, 7],
    "yellow maize": [2.5, 3, 3.5, 4, 4.5, 5, 5.5, 6, 6.5, 7],
    "soyabean": [0.75, 1, 1.25, 1.5, 1.75, 2, 2.25, 2.5],
    "sunflower": [0.75, 1, 1.25, 1.5, 1.75, 2, 2.25, 2.5],
    "wheat": [1, 1.5, 2, 2.5, 3, 3.5, 4],
}

# yield (t/ha) calculation with function for error handling


def yield_cal(prod, area):
    """Calculate yield (t/ha) with error handling.

    Arguments:
        prod (float): Total production.
        area (float): Area in hectares.

    Returns:
        float: Yield in tonnes per hectare, or 0 if production or area is 0.
    """
    if prod == 0 or area == 0:
        return 0
    else:
        return prod / area


# function to collect data based on province, crop, and number of years (max 34)
def data_collection(prov, crop, years):
    """Collects and calculates yield data for a given province, crop, and number of years.

    Arguments:
        prov (str): Province name.
        crop (str): Crop type.
        years (int): Number of years (maximum 34).

    Returns:
        pd.DataFrame: DataFrame containing yield information.

    Raises:
        ValueError: If `years` exceeds available data.
    """
    # production and area data selection based on crop
    area = area_data[crop]
    prod = production_data[crop]
    # yield calculation
    # Apply the function element-wise to the sunflower DataFrame
    yield_df = prod.applymap(lambda x: float(x)) / area.applymap(lambda x: float(x))
    yield_df = yield_df.applymap(lambda x: 0 if pd.isna(x) or x == float("inf") else x)
    # reset production region as column again
    yield_df.reset_index(inplace=True)

    # Calculate the average yield per season across all regions
    average_yield_per_season = yield_df.mean(axis=0)

    # Convert the result to a DataFrame
    average_yield_per_season_df = average_yield_per_season.reset_index()
    average_yield_per_season_df.columns = ["Season", "SA Average Yield"]

    # Calculate total production and total area per season
    total_production_per_season = prod.sum(axis=0)
    total_area_per_season = area.sum(axis=0)

    # Calculate the weighted average yield per season
    weighted_average_yield_per_season = (
        total_production_per_season / total_area_per_season
    )

    # Convert the result to a DataFrame
    weighted_average_yield_per_season_df = (
        weighted_average_yield_per_season.reset_index()
    )
    weighted_average_yield_per_season_df.columns = [
        "Season",
        "SA Weighted Average Yield",
    ]
    # splitting data for required seasons
    all_seasons = weighted_average_yield_per_season_df["Season"][:years]
    if years <= len(all_seasons):
        seasons = all_seasons[-years:]
    else:
        raise ValueError("The number of years exceeds the available seasons.")

    # Merge the average yield and weighted average yield DataFrames
    combined_yield_df = pd.merge(
        average_yield_per_season_df, weighted_average_yield_per_season_df, on="Season"
    )

    # Select the relevant seasons
    combined_yield_df = combined_yield_df[combined_yield_df["Season"].isin(seasons)]

    # Select the yields for the given province
    reg_yields = yield_df[yield_df["Season"] == prov][seasons]
    lr_data = reg_yields.transpose()
    lr_data.reset_index(inplace=True)
    lr_data.columns = ["Season", prov]

    # Merge the province yields with the combined yield DataFrame
    yield_df = pd.merge(combined_yield_df, lr_data, on="Season")
    return yield_df


def reg_data(data):
    """Collects data to perform regression analysis

    Arguments:
       data (df): Data frame c

    Returns:
        pd.DataFrame: DataFrame containing regression results.
    """
    if isinstance(data, list):
        data = pd.DataFrame(data, columns=["Values"])

    # Remove rows with 0 values
    data = data[data["Values"] != 0]

    # Initialize the result dictionary
    result = {"Slope": [], "Intercept": [], "Predicted Value": []}

    def perform_regression(data, period):
        X = np.arange(period).reshape(-1, 1)
        model = LinearRegression()
        model.fit(X, data)
        slope = model.coef_[0]
        intercept = model.intercept_
        predicted_value = model.predict([[period]])[0]
        return slope, intercept, predicted_value

    # Assuming 'Values' as the only column to process
    for column in data.columns:
        column_data = data[column].values
        years = len(column_data)
        slope, intercept, predicted_value = perform_regression(column_data, years)
        result["Slope"].append(slope)
        result["Intercept"].append(intercept)
        result["Predicted Value"].append(predicted_value)

    result_df = pd.DataFrame(result)
    return result_df


# Define function to perform regression analysis on data collected
def yield_regression_analysis(prov, crop, years):
    """Performs linear regression on yield data for a given province, crop, and number of years.

    Arguments:
        prov (str): Province name.
        crop (str): Crop type.
        years (int): Number of years (maximum 34).

    Returns:
        pd.DataFrame: DataFrame containing regression results for each data level.
    """
    yield_data = data_collection(prov, crop, years)
    seasons = yield_data["Season"]

    # Initialize the result dictionary
    result = {"Data Level": [], "Slope": [], "Intercept": [], "Predicted Value": []}

    def perform_regression(data, period):
        """Helper function to perform linear regression."""
        X = np.arange(period).reshape(-1, 1)
        model = LinearRegression()
        model.fit(X, data)
        slope = model.coef_[0]
        intercept = model.intercept_
        predicted_value = model.predict([[period]])[0]
        return slope, intercept, predicted_value

    columns = yield_data.columns.drop("Season")

    for column in columns:
        data = yield_data[column].values
        years = len(data)
        slope, intercept, predicted_value = perform_regression(data, years)
        result["Data Level"].append(column)
        result["Slope"].append(slope)
        result["Intercept"].append(intercept)
        result["Predicted Value"].append(predicted_value)

    result_df = pd.DataFrame(result)
    return result_df


def arima_fitting(prov, crop, years, training_size):
    """Fits an ARIMA model on yield data and forecasts future values.

    Arguments:
        prov (str): Province name.
        crop (str): Crop type.
        years (int): Number of years (maximum 34).
        training_size (float): Fraction of data used for training (0 < training_size < 1), using 0.85.

    Returns:
        ARIMA model summary.
    """
    collection = data_collection(prov, crop, years)
    collection["Season"] = collection.index
    train_size = int(len(collection) * training_size)
    train, test = collection.iloc[:train_size], collection.iloc[train_size:]
    model = auto_arima(
        train[prov],
        seasonal=False,
        trace=False,
        error_action="ignore",
        suppress_warnings=True,
        stepwise=True,
    )
    # Forecasting the test data range
    n_periods = len(test)
    forecast, conf_int = model.predict(n_periods=n_periods, return_conf_int=True)

    # Convert forecasts to a DataFrame for better visualization and understanding
    forecast_index = (
        test.index
    )  # Assuming you want to forecast for the same indices as the test set
    forecast_df = pd.DataFrame(
        {"Forecast": forecast, "Lower CI": conf_int[:, 0], "Upper CI": conf_int[:, 1]},
        index=forecast_index,
    )

    print(forecast_df)
    plt.figure(figsize=(10, 6))
    plt.plot(train.index, train[prov], label="Training Data")
    plt.plot(test.index, test[prov], label="Actual Data", color="green")
    plt.plot(
        forecast_df.index, forecast_df["Forecast"], label="Forecasted Data", color="red"
    )
    plt.fill_between(
        forecast_df.index,
        forecast_df["Lower CI"],
        forecast_df["Upper CI"],
        color="orange",
        alpha=0.3,
    )
    plt.title("Actual vs Forecasted Yields")
    plt.legend()
    plt.show()
    return model.summary()


## Function that calculates the industry AYE
def yield_consistancy(prov, crop, hist_yield, initial_est, years, training_size):
    """Calculates the industry based yield estimate and produce a yield consistancy score.

    Args:
        prov (str): Province name.
        crop (str): Crop type.
        hist yield (1xn df): historical yields.
        initial_est (float): initial yield estimate for the land.
        years (int): Number of years (maximum 34).
        training_size (float): Fraction of data used for training (0 < training_size < 1), using 0.85.

    Output:
        Resulting Yield Consistancy score.
    """
    # Collect the data
    data = data_collection(prov, crop, years)
    # Define regression analysis to store slope and predicted value over all years
    reg = yield_regression_analysis(prov, crop, years)
    reg_slope = reg[reg["Data Level"] == prov]["Slope"].iloc[0]
    reg_pred = reg[reg["Data Level"] == prov]["Predicted Value"].iloc[0]
    # Define regression analysis to store slope and predicted value over last 5 years
    reg_short = yield_regression_analysis(prov, crop, 5)
    reg_slope_short = reg_short[reg_short["Data Level"] == prov]["Slope"].iloc[0]
    reg_pred_short = reg_short[reg_short["Data Level"] == prov]["Predicted Value"].iloc[
        0
    ]
    # Historic yield data
    if not hist_yield:
        yc_3 = 0
        hist_yield = (
            "No farm level data warrents industry data considered. Hence, industry"
        )
        hist_slope = reg_slope_short
        hist_pred = reg_pred_short
    else:
        reg_data_hist = reg_data(hist_yield)
        hist_slope = round(reg_data_hist["Slope"].iloc[0], 2)
        hist_pred = round(reg_data_hist["Predicted Value"].iloc[0], 2)

    # ARIMA-GARCH fitting
    def arima_forecast(prov, crop, years, training_size, forecast_len):
        # Data collection based on inputs
        collection = data_collection(prov, crop, years)
        # Resetting index
        collection["Season"] = collection.index
        # Defining training and test data
        train_size = int(len(collection) * training_size)
        train, test = collection.iloc[:train_size], collection.iloc[train_size:]

        # Fitting the ARIMA model to the data
        model = auto_arima(
            train[prov],
            seasonal=False,
            trace=False,
            error_action="ignore",
            suppress_warnings=True,
            stepwise=True,
        )

        # Get residuals from the ARIMA model
        residuals = model.resid()

        # Fit GARCH model to the residuals
        garch_model = arch_model(residuals, vol="Garch", p=1, q=1)
        garch_fit = garch_model.fit(disp="off")

        # Fit EGARCH model
        egarch_model = arch_model(residuals, vol="EGARCH", p=1, q=1, dist="Normal")
        egarch_fit = egarch_model.fit(disp="off", update_freq=5)

        # Fit TGARCH model
        tgarch_model = arch_model(
            residuals, vol="Garch", p=1, q=1, dist="Normal", power=1.0, o=1
        )
        tgarch_fit = tgarch_model.fit(disp="off", update_freq=5)

        # Comparing models based on AIC, BIC, and log likelihood
        models = {"GARCH": garch_fit, "EGARCH": egarch_fit, "TGARCH": tgarch_fit}

        best_model = min(
            models.values(), key=lambda fit: (fit.aic, fit.bic, -fit.loglikelihood)
        )

        ##Forecasting ARIMA
        arima_forecast = model.predict(n_periods=forecast_len, return_conf_int=False)
        # Extract all fitted parameters from the EGARCH model
        params = best_model.params

        # Initialize variables to store forecasts
        simulated_residuals = np.zeros(forecast_len)
        volatility_forecast = np.zeros(forecast_len)

        # Check if 'resid' and 'conditional_volatility' are non-empty
        if not best_model.resid.empty and not best_model.conditional_volatility.empty:
            # Get the last fitted residual and variance (volatility) safely using iloc
            last_resid = best_model.resid.iloc[-1]
            last_variance = best_model.conditional_volatility.iloc[-1] ** 2

            # Simulate the next 'forecast_horizon' steps
            for t in range(forecast_len):
                # Calculate the next volatility using the EGARCH model formula
                variance_t = np.exp(
                    params["omega"]
                    + params["alpha[1]"]
                    * (np.abs(last_resid) / np.sqrt(last_variance) - np.sqrt(2 / np.pi))
                    + params["beta[1]"] * np.log(last_variance)
                )
                volatility_forecast[t] = np.sqrt(variance_t)

                # Simulate the next residual using the forecasted volatility
                simulated_residuals[t] = np.random.normal(0, volatility_forecast[t])

                # Update last_resid and last_variance for the next iteration
                last_resid = simulated_residuals[t]
                last_variance = variance_t

            # Step 3: Combine ARIMA Forecasts and Simulated EGARCH Residuals
            final_forecast = arima_forecast + simulated_residuals

            # Return the ARIMA forecast and the combined ARIMA + EGARCH forecast
            return arima_forecast, final_forecast

        else:
            print(
                f"GARCH fit residuals or conditional volatility series are empty. Check the model fitting process."
            )

    arm_gar, forcast_gar = arima_forecast(
        prov, crop, years, training_size, forecast_len=7
    )
    arima_forecast = arm_gar.iloc[-1]
    arima_gar_forecast = forcast_gar.iloc[-1]
    # Defining range for forecast
    dry_land_min = min(dry_yield_estimates[crop])
    dry_land_max = max(dry_yield_estimates[crop])
    min_yield = min(
        arima_forecast, arima_gar_forecast, reg_pred, reg_pred_short, dry_land_min
    )
    max_yield = max(
        arima_forecast, arima_gar_forecast, reg_pred, reg_pred_short, dry_land_max
    )
    med_yield = median([arima_forecast, arima_gar_forecast, reg_pred, reg_pred_short])
    # Limit 10% of industry standard either side
    yield_lim = 0.1
    min_lim = min_yield / (1 + yield_lim)
    max_lim = max_yield * (1 + yield_lim)
    # Print result if required, can comment out if needed
    print(
        f"Since you are based within the provincial boundaries of {prov} with {crop} planted, Farmers Friend generated an industry-based peer benchmark of {round(med_yield,2)} t/ha."
    )
    # Logic for acceotable YC calc:
    if initial_est < min_lim or initial_est > max_lim:
        print(
            f"Initial Yield Estimate exceeds acceptable bounds of dryland PCS for {crop} in {prov}. Re-evaluate your estimate."
        )
    else:
        # Acceptable IYE
        # Logic to assign points for yc_1 - yield estimates
        if initial_est <= med_yield:
            if initial_est > min_yield:
                yc_1 = round(25 * (initial_est / med_yield), 0)
            elif initial_est > min_lim:
                yc_1 = round(20 * (initial_est / min_yield), 0)
            else:
                yc_1 = 0
        else:
            if initial_est <= max_yield:
                m = -5 / (max_yield - med_yield)
                c = 20 - m * max_yield
                yc_1 = round(m * initial_est + c, 0)
            elif initial_est <= max_lim and initial_est > max_yield:
                m = -20 / (max_lim - max_yield)
                c = 20 - m * max_yield
                yc_1 = round(m * initial_est + c, 0)
            else:
                yc_1 = 0

        # Logic for trend analysis points yc_2, confirm with business

        if reg_slope > -0.25:
            if reg_slope_short < -0.25:
                yc_2 = 1
            elif reg_slope_short > 0:
                yc_2 = 5
            else:
                yc_2 = 3
        else:
            if reg_slope_short < -0.25:
                yc_2 = 0
            elif reg_slope_short > 0.25:
                yc_2 = 4
            else:
                yc_2 = 2

        # Logic for farmer historic yield, confirm with business
        if not hist_yield:
            yc_3 = 0
        else:
            if hist_slope < -0.25:
                yc_3 = 0
            elif hist_slope < 0.25:
                yc_3 = 2
            else:
                yc_3 = 4

        # Final Yield Consistancy score
        yc = yc_1 + yc_2 + yc_3

        print(f"For a farmer based in {prov} planning to grow {crop}:")
        print(f"A Yield Consistancy (YC) of {yc} out of a possible 35 points.")
        print(
            f"For an initial yield estimate of {initial_est} t/ha compared to area min {round(min_yield,2)} t/ha, max {round(max_yield,2)} t/ha. AYE = {round(med_yield,2)} t/ha with limits of {round(min_lim,2)} t/ha and {round(max_lim,2)} t/ha."
        )
        print(f"Industry benchmarks define a {yc_1} from a possible 25 points.")
        print(
            f"5 Year Historical Industry trend analysis sugests a predicted yield of {round(reg_pred_short,2)} t/ha and a slope of {round(reg_slope_short,2)} t/ha yields {yc_2} from a possible 5 points."
        )
        print(
            f"Historical farm data trend analysis: {hist_yield} sugests a predicted yield of {round(hist_pred,2)} t/ha and a slope of {round(hist_slope,2)} t/ha yields {yc_3} from a possible 5 points."
        )

In [10]:
yield_consistancy("Free State", "wheat", [5.1, 4.8, 5.5, 5, 6], 3.8, 34, 0.85)

Since you are based within the provincial boundaries of Free State with wheat planted, Farmers Friend generated an industry-based peer benchmark of 4.07 t/ha.
For a farmer based in Free State planning to grow wheat:
A Yield Consistancy (YC) of 30.0 out of a possible 35 points.
For an initial yield estimate of 3.8 t/ha compared to area min 1 t/ha, max 4.17 t/ha. AYE = 4.07 t/ha with limits of 0.91 t/ha and 4.59 t/ha.
Industry benchmarks define a 23.0 from a possible 25 points.
5 Year Historical Industry trend analysis sugests a predicted yield of 1.22 t/ha and a slope of 0.07 t/ha yields 5 from a possible 5 points.
Historical farm data trend analysis: [5.1, 4.8, 5.5, 5, 6] sugests a predicted yield of 5.88 t/ha and a slope of 0.2 t/ha yields 2 from a possible 5 points.
