# Imports

In [0]:
from pyspark.sql.types import DateType, FloatType, IntegerType
from pyspark.sql.functions import desc, col
from dateutil.relativedelta import relativedelta
import datetime
import pandas as pd
import numpy as np



# Input Parameters

In [0]:
# basin_of_interest = "GULF COAST EAST"
# flowunit_of_interest = "HAYNESVILLE"
# desired_minimum_date_for_producing_wells = "2022-04-01"
# desired_maximum_date_for_producing_wells = "2034-04-01"
# wip_well_date = "2023-10-01"
# buffer_days_for_rig_movement = 10
# cutoff_first_prod_date_for_wip_wells = "2023-10-01"
# cutoff_first_prod_date_for_wip_wells = datetime.datetime.strptime(cutoff_first_prod_date_for_wip_wells, "%Y-%m-%d").date()
# desired_first_prod_date_for_wip_wells ="2023-10-01"
# desired_first_prod_date_for_wip_wells = datetime.datetime.strptime(desired_first_prod_date_for_wip_wells, "%Y-%m-%d").date()
# cutoff_date_for_new_wells = "2023-10-01"
# current_date = '2024-04-29'
# desired_minimum_firstproddate_for_completed_drilled_wells = datetime.datetime.strptime("2020-01-01", "%Y-%m-%d").date()
# scenario_id = "1"

In [0]:
basin_of_interest = dbutils.widgets.get("basin_of_interest")
flowunit_of_interest = dbutils.widgets.get("flowunit_of_interest")
desired_minimum_date_for_producing_wells = dbutils.widgets.get(
    "desired_minimum_date_for_producing_wells"
)

desired_maximum_date_for_producing_wells = dbutils.widgets.get(
    "desired_maximum_date_for_producing_wells"
)

wip_well_date = dbutils.widgets.get("desired_wip_wells_date")

buffer_days_for_rig_movement = int(dbutils.widgets.get("buffer_days_for_rig_movement"))

cutoff_first_prod_date_for_wip_wells = dbutils.widgets.get(
    "cutoff_first_prod_date_for_wip_wells"
)

cutoff_first_prod_date_for_wip_wells = datetime.datetime.strptime(
    cutoff_first_prod_date_for_wip_wells, "%Y-%m-%d"
).date()

desired_first_prod_date_for_wip_wells = dbutils.widgets.get(
    "desired_first_prod_date_for_wip_wells"
)

desired_first_prod_date_for_wip_wells = datetime.datetime.strptime(
    desired_first_prod_date_for_wip_wells, "%Y-%m-%d"
).date()

cutoff_date_for_new_wells = dbutils.widgets.get("cutoff_date_for_new_wells")
current_date = dbutils.widgets.get("current_date")

desired_minimum_firstproddate_for_completed_drilled_wells = dbutils.widgets.get(
    "desired_minimum_firstproddate_for_completed_drilled_wells"
)

desired_minimum_firstproddate_for_completed_drilled_wells = datetime.datetime.strptime(
    desired_minimum_firstproddate_for_completed_drilled_wells, "%Y-%m-%d"
).date()
scenario_id = dbutils.widgets.get("scenario_id")

# TypeCurve dataframe

In [0]:
typecurve_df_actual = spark.sql(
    f"""
  SELECT
  *
EXCEPT(LateralLength_FT, fu_median_ll, tca_median_ll),
  COALESCE(LateralLength_FT, tca_median_ll, fu_median_ll) as LateralLength_FT
FROM(
    SELECT
      API10,
      API14,
      typeCurveArea,
      FlowUnit_Analog,
      ReservoirGoldConsolidated,
      basinQuantum,
      LateralLength_FT,
      spudDate,
      FirstProdDate,
      OperatorGold,
      WellStatus,
      HoleDirection,
      PERCENTILE_CONT(0.5) WITHIN GROUP (
        ORDER BY
          LateralLength_FT
      ) OVER () AS fu_median_ll,
      PERCENTILE_CONT(0.5) WITHIN GROUP (
        ORDER BY
          LateralLength_FT
      ) OVER(PARTITION BY typeCurveArea) AS tca_median_ll
    FROM
      produced.analog_well_selection
    WHERE
      recentWell = "true"
      AND flowUnit_Analog = '{flowunit_of_interest}'
  )
"""
)
typecurve_df_actual.createOrReplaceTempView("analog_wells")
typecurve_df = typecurve_df_actual.toPandas()

# WIP wells

In [0]:
class WIPDownloader:
    def __init__(
        self,
        rig_data_table: str,
        basin_of_interest: str,
        analog_wells_view,
        flowunit_of_interest: str,
    ):
        """
        Initializes the RigDownloader object with necessary parameters.

        Parameters:
        - rig_data_table (str): Table name for rig data.
        - rig_data_columns (list): Columns to select for rig data.
        - basin_of_interest (str): Basin of interest.
        """
        self.rig_data_table1 = rig_data_table
        self.basin_of_interest = basin_of_interest
        self.analog_wells_view = analog_wells_view
        self.flowunit_of_interest = flowunit_of_interest

    def download_wip_data(self, spark, typecurve_df, cutoff_date) -> pd.DataFrame:
        """
        Downloads rig data from PySpark and returns it as a Pandas DataFrame.

        Parameters:
        - spark: PySpark SparkSession.

        Returns:
        - pd.DataFrame: Rig data as Pandas DataFrame.
        """

        rig_data = spark.sql(
            f"""
                  SELECT rigs.date, 
                    rigs.API10, 
                    rigs.operator as OperatorGold, 
                    rigs.BasinQuantum, 
                    ana.LateralLength_FT, 
                    ana.spudDate as wip_spud_date,
                    ana.typeCurveArea,
                    ana.FlowUnit_Analog,
                    ana.ReservoirGoldConsolidated
                  FROM {self.rig_data_table1} rigs
                  INNER JOIN {self.analog_wells_view} ana
                  on ana.API10 = rigs.API10
                  WHERE
                  rigs.BasinQuantum = '{self.basin_of_interest}'
                  AND ana.flowUnit_Analog = '{self.flowunit_of_interest}'
                  AND rigs.date = '{cutoff_date}'
                  """
        ).toPandas()

        rig_data = rig_data.groupby("API10", as_index=False).first()
        return rig_data

In [0]:
rigtable = "produced.private_rigs_history"
well_completion_table = "analog_wells"
wipdownload = WIPDownloader(
    rigtable, basin_of_interest, well_completion_table, flowunit_of_interest
)
wip_well_data = wipdownload.download_wip_data(spark, typecurve_df, wip_well_date)

# Adding Drilling Wells

In [0]:
drilling_wells = typecurve_df[typecurve_df.WellStatus == "DRILLING"]
drilling_wells = drilling_wells[
    ~drilling_wells.API10.isin(wip_well_data.API10.unique())
]

drilling_wells["date"] = None
drilling_wells["BasinQuantum"] = basin_of_interest
drilling_wells.rename({"spudDate": "wip_spud_date"}, inplace=True, axis=1)
drilling_wells = drilling_wells[wip_well_data.columns]

wip_well_data = pd.concat([wip_well_data, drilling_wells])

In [0]:
opr_tca_df = spark.sql(
    f"SELECT * FROM produced.operator_cycle_times where scenario_id = '{scenario_id}'  "
).toPandas()
final_df = spark.sql(
    f"select * from produced.api_level_cycle_times where scenario_id = '{scenario_id}' "
).toPandas()

In [0]:
# merge cycle time information on operator typecurve level to wip data
wip_well_data = pd.merge(
    wip_well_data, opr_tca_df, on=["OperatorGold", "typeCurveArea"], how="left"
)

In [0]:
wip_well_data["time_taken_spud_to_rigrelease"] = wip_well_data[
    "time_taken_spud_to_rigrelease"
].fillna(final_df["time_taken_spud_to_rigrelease"].median())

wip_well_data["time_taken_spud_to_completion"] = wip_well_data[
    "time_taken_spud_to_completion"
].fillna(final_df["time_taken_spud_to_completion"].median())

wip_well_data["time_taken_completion_to_firstprod"] = wip_well_data[
    "time_taken_completion_to_firstprod"
].fillna(final_df["time_taken_completion_to_firstprod"].median())

# Will Try to use Pandas for below code

In [0]:
def drilling_and_scheduler_wip(
    wip_df,
    buffer_days_for_rig_movement,
    cutoff_first_prod_date_for_wip_wells,
    desired_first_prod_date_for_wip_wells,
):
    """
    Performs scheduling for WIP wells, calculating rig release, completion, and first production dates.

    Parameters:
    - wip_df (pd.DataFrame): DataFrame containing WIP well data.
    - buffer_days (int): Buffer days to be added.
    - user_input_for_present_date (format: YYYY-MM-DD)): makeing First prod date to present date
    Returns:
    - pd.DataFrame: DataFrame with updated scheduling information.
    """

    for i in range(len(wip_df)):
        if wip_df["wip_spud_date"].iloc[i] is not None:

            wip_df.at[i, "wip_rig_release_date"] = wip_df.at[
                i, "wip_spud_date"
            ] + pd.to_timedelta(wip_df.at[i, "time_taken_spud_to_rigrelease"])

            wip_df.at[i, "wip_completion_date"] = wip_df.at[
                i, "wip_spud_date"
            ] + pd.to_timedelta(wip_df.at[i, "time_taken_spud_to_completion"])

            first_prod_date = wip_df.at[i, "wip_spud_date"] + pd.to_timedelta(
                wip_df.at[i, "time_taken_spud_to_completion"]
            )

            # if first prod date comes in the past, we will make it today
            if first_prod_date < cutoff_first_prod_date_for_wip_wells:
                first_prod_date = desired_first_prod_date_for_wip_wells

            wip_df.loc[i, "wip_firstprod_date"] = first_prod_date

    return wip_df

In [0]:
wip_well_data["wip_rig_release_date"] = None
wip_well_data["wip_completion_date"] = None
wip_well_data["wip_firstprod_date"] = None

wip_well_data = drilling_and_scheduler_wip(
    wip_well_data,
    buffer_days_for_rig_movement,
    cutoff_first_prod_date_for_wip_wells,
    desired_first_prod_date_for_wip_wells,
)

# removing all wip wells for which we don't have type curve information
wip_well_data = wip_well_data.dropna(subset=["typeCurveArea"], axis=0)

# converting first prod dates to first day of the month
wip_well_data["wip_firstprod_date_1"] = (
    pd.to_datetime(wip_well_data["wip_firstprod_date"])
    .dt.to_period("M")
    .dt.to_timestamp()
)

# Type Curve Production estimation

In [0]:
class TCAProductionDownloader:
    """
    Class for downloading TCA production data using PySpark.
    """

    def __init__(
        self, tca_production_table: str, basin_of_interest: str, tca_production_columns
    ):
        """
        Initialize the TCAProductionDownloader.

        Parameters:
        - tca_production_table (str): Name of the TCA production table.
        - basin_of_interest (str): Basin of interest for filtering data.
        """
        self.tca_production_table_name = tca_production_table
        self.basin_of_interest = basin_of_interest
        self.tca_production_columns = tca_production_columns

    def get_tca_production_data(self, spark) -> pd.DataFrame:
        """
        Get TCA production data from the specified table.

        Parameters:
        - spark: PySpark session.

        Returns:
        - pd.DataFrame: Produced DataFrame.
        """
        typecurve_qcast_forecast = (
            spark.table(self.tca_production_table_name)
            .select(*self.tca_production_columns)
            .filter(col("BasinQuantum") == self.basin_of_interest)
            .toPandas()
        )
        return typecurve_qcast_forecast

    def download_final_data(self, spark):
        """
        Download final TCA production data.

        Parameters:
        - spark: PySpark session.

        Returns:
        - pd.DataFrame: Produced DataFrame.
        """
        final_df = self.get_tca_production_data(spark)

        # converting gas unit from MCF to BCF
        # converting water and oil unit from BBL to MBO
        final_df["gas_combined_monthly"] = final_df["gas_combined_monthly"] / 10**6
        final_df["oil_combined_monthly"] = final_df["oil_combined_monthly"] / 10**3
        final_df["water_combined_monthly"] = (
            final_df["water_combined_monthly"] / 10**3
        )

        final_df.rename(
            {
                "gas_combined_monthly": "Gas_BCF",
                "oil_combined_monthly": "Oil_MBO",
                "water_combined_monthly": "Water_MBO",
                "index": "producing_month",
            },
            axis=1,
            inplace=True,
        )
        return final_df

In [0]:
tcatable = "produced.typecurve_qcast_forecast"
tca_production_columns = (
    "api_list",
    "BasinQuantum",
    "typeCurveArea",
    "date",
    "index",
    "oil_combined_monthly",
    "gas_combined_monthly",
    "water_combined_monthly",
    "normalization_value",
)
tcadownload = TCAProductionDownloader(
    tcatable, basin_of_interest, tca_production_columns
)
tca_produnction_df = tcadownload.download_final_data(spark)


The conversion of DecimalType columns is inefficient and may take a long time. Column names: [oil_combined_monthly, gas_combined_monthly, water_combined_monthly, normalization_value] If those columns are not necessary, you may consider dropping them or converting to primitive types before the conversion.



# Inventories Production

In [0]:
inventory_rigs_df = spark.sql(
    f"select * from produced.inventory_drilling_scheduling_table where Status = 'Drilled' and scenario_id = '{scenario_id}' "
).toPandas()

# converting inventory_firstprod_date to first day of month
inventory_rigs_df["inventory_firstprod_date"] = (
    pd.to_datetime(inventory_rigs_df["inventory_firstprod_date"])
    .dt.to_period("M")
    .dt.to_timestamp()
)

# merging inventory data with tca prod data
inventory_production = pd.merge(
    inventory_rigs_df,
    tca_produnction_df[
        ["typeCurveArea", "producing_month", "Oil_MBO", "Gas_BCF", "Water_MBO"]
    ],
    on="typeCurveArea",
    how="left",
)

inventory_production = inventory_production[
    (inventory_production.producing_month.notnull())
    & (inventory_production.inventory_firstprod_date.notnull())
]

# calculate production date based on inventory first production date and producing month
inventory_production["production_date"] = (
    (inventory_production["inventory_firstprod_date"].dt.to_period("M"))
    + inventory_production["producing_month"]
    - 1
).dt.to_timestamp()

inventory_production.drop_duplicates(inplace=True)

# WIP well production

In [0]:
# merging wip data and tca production data to have
wip_well_production = pd.merge(
    wip_well_data,
    tca_produnction_df[
        ["typeCurveArea", "producing_month", "Oil_MBO", "Gas_BCF", "Water_MBO"]
    ],
    on="typeCurveArea",
    how="left",
    validate="many_to_many",
)

wip_well_production = wip_well_production[wip_well_production.producing_month.notnull()]

# getting production month for next 50 years based on the first prod date for each wip well
wip_well_production["production_date"] = (
    (pd.to_datetime(wip_well_production["wip_firstprod_date"]).dt.to_period("M"))
    + wip_well_production["producing_month"].astype(int)
    - 1
).dt.to_timestamp()

# Producing Wells 1

In [0]:
class ProducingWellDownloader:
    """
    Class for downloading producing well data using PySpark.
    """

    def __init__(
        self,
        qcast_forecast_table: str,
        qcast_forecast_table_columns,
        basin_of_interest: str,
    ):
        """
        Initialize the ProducingWellDownloader.

        Parameters:
        - qcast_forecast_table (str): Name of the QCAST forecast table.
        - tca_data_table (str): Name of the TCS data table.
        - prod_com_table (str): Name of the production completion table.
        - basin_of_interest (str): Basin of interest for filtering data.
        """
        self.qcast_forecast_table = qcast_forecast_table
        self.qcast_forecast_table_columns = qcast_forecast_table_columns
        self.basin_of_interest = basin_of_interest

    def download_producing_data(self, spark, start_date, end_date) -> pd.DataFrame:
        """
        Download producing data from QCAST forecast table.

        Parameters:
        - spark: PySpark session.

        Returns:
        - pd.DataFrame: Produced DataFrame.
        """
        qcast_forecast = (
            spark.table(self.qcast_forecast_table)
            .select(*self.qcast_forecast_table_columns)
            .filter(
                (col("BasinQuantum") == self.basin_of_interest)
                & (
                    col("date").cast(DateType())
                    < datetime.datetime.strptime(end_date, "%Y-%m-%d")
                )
                & (
                    col("date").cast(DateType())
                    > datetime.datetime.strptime(start_date, "%Y-%m-%d")
                )
            )
            .toPandas()
        )
        return qcast_forecast

    def download_final_data(
        self, spark, typecurve_df, cutoff_min_date, cutoff_max_date, merge_how="left"
    ):
        """
        Retrieve the ultimate dataset by combining information from production, TCS, and production completion data. In this process, we extract operator and lateral length values for production from TCS data and subsequently populate any missing values from SPR data.

        Parameters:
        - spark: PySpark session.
        - typecurve_df: typecurve infomration for api10.
        - merge_how (str): Type of merge to be performed (default: 'left').

        Returns:
        - pd.DataFrame: Produced DataFrame.
        """

        df = self.download_producing_data(spark, cutoff_min_date, cutoff_max_date)
        df.rename({"api": "API10"}, inplace=True, axis=1)

        df = pd.merge(
            df,
            typecurve_df[
                [
                    "API10",
                    "typeCurveArea",
                    "FlowUnit_Analog",
                    "LateralLength_FT",
                    "ReservoirGoldConsolidated",
                    "OperatorGold",
                ]
            ],
            on="API10",
            how="inner",
        )

        df["gas_combined_monthly"] = df["gas_combined_monthly"] / 10**6
        df["oil_combined_monthly"] = df["oil_combined_monthly"] / 10**3
        df["water_combined_monthly"] = df["water_combined_monthly"] / 10**3

        df.rename(
            {
                "gas_combined_monthly": "Gas_BCF",
                "oil_combined_monthly": "Oil_MBO",
                "water_combined_monthly": "Water_MBO",
                "index": "producing_month",
            },
            axis=1,
            inplace=True,
        )

        return df

In [0]:
pdptable = "produced.autodca_qcast_forecast"
pdptable_col = (
    "api",
    "date",
    "index",
    "oil_combined_monthly",
    "gas_combined_monthly",
    "water_combined_monthly",
)
pdpdownload = ProducingWellDownloader(pdptable, pdptable_col, basin_of_interest)
pdp_df = pdpdownload.download_final_data(
    spark,
    typecurve_df,
    desired_minimum_date_for_producing_wells,
    desired_maximum_date_for_producing_wells,
)


The conversion of DecimalType columns is inefficient and may take a long time. Column names: [oil_combined_monthly, gas_combined_monthly, water_combined_monthly] If those columns are not necessary, you may consider dropping them or converting to primitive types before the conversion.



In [0]:
# removing future api
current_date = datetime.datetime.strptime(current_date, "%Y-%m-%d").date()

api_to_remove = pdp_df[(pdp_df.producing_month == 1) & (pdp_df.date >= current_date)][
    "API10"
].unique()
pdp_df = pdp_df[~pdp_df.API10.isin(api_to_remove)]

# Producing Wells 2

In [0]:
class ProducingWellDownloader2:
    """
    Class for downloading producing well data using PySpark.
    """

    def __init__(
        self,
        merge_production_table: str,
        merge_production_table_columns,
        pdp_df1_api_list,
        analog_well_table,
        basin_of_interest: str,
    ):
        """
        Initialize the ProducingWellDownloader.

        Parameters:
        - qcast_forecast_table (str): Name of the QCAST forecast table.
        - tca_data_table (str): Name of the TCS data table.
        - prod_com_table (str): Name of the production completion table.
        - basin_of_interest (str): Basin of interest for filtering data.
        """
        self.merge_production_table = merge_production_table
        self.merge_production_table_columns = merge_production_table_columns
        self.pdp_df1_api_list = pdp_df1_api_list
        self.basin_of_interest = basin_of_interest
        self.analog_well_table = analog_well_table

    def download_producing_data(self, spark, cutoff_date, current_date) -> pd.DataFrame:
        """
        Download producing data from QCAST forecast table.

        Parameters:
        - spark: PySpark session.

        Returns:
        - pd.DataFrame: Produced DataFrame.
        """

        query = f"""
                SELECT prod.API10, 
                    prod.Date, 
                    prod.Water_BBL / 1000 AS Water_MBO, 
                    prod.Oil_BBL / 1000 AS Oil_MBO, 
                    prod.Gas_MCF / 1000000 AS Gas_BCF, 
                    prod.NormalizedMonth as producing_month,
                    ana.typeCurveArea,
                    ana.FlowUnit_Analog,
                    ana.LateralLength_FT,
                    ana.ReservoirGoldConsolidated,
                    ana.OperatorGold
                FROM {self.merge_production_table} prod
                INNER JOIN {self.analog_well_table} ana
                ON ana.api10 = prod.api10
                AND ana.wellStatus = "PRODUCING"
                WHERE date > '{cutoff_date}' 
                    AND ana.API10 NOT IN {self.pdp_df1_api_list}
            """

        pdp_df2 = spark.sql(query).toPandas()
        return pdp_df2

In [0]:
columns = ("API10", "Date", "Water_BBL", "Oil_BBL", "Gas_MCFD", "NormalizedMonth")
merge_prod_table = "produced.merge_production"
analog_well_table = "analog_wells"
api_list = tuple(pdp_df.API10.unique().tolist())
downloader = ProducingWellDownloader2(
    merge_prod_table, columns, api_list, analog_well_table, basin_of_interest
)
pdp_df2 = downloader.download_producing_data(
    spark, desired_minimum_date_for_producing_wells, current_date
)


The conversion of DecimalType columns is inefficient and may take a long time. Column names: [Water_MBO, Oil_MBO, Gas_BCF] If those columns are not necessary, you may consider dropping them or converting to primitive types before the conversion.



In [0]:
temp1 = pdp_df2.loc[pdp_df2.groupby("API10")["producing_month"].idxmax()]
temp1 = pd.merge(
    temp1,
    tca_produnction_df[
        ["typeCurveArea", "producing_month", "Oil_MBO", "Gas_BCF", "Water_MBO"]
    ],
    on=("typeCurveArea", "producing_month"),
    how="left",
    suffixes=("_actual", "_tca_table"),
    validate="many_to_one",
)

In [0]:
temp1[
    [
        "Gas_BCF_tca_table",
        "Gas_BCF_actual",
        "Water_MBO_actual",
        "Water_MBO_tca_table",
        "Oil_MBO_actual",
        "Oil_MBO_tca_table",
    ]
] = temp1[
    [
        "Gas_BCF_tca_table",
        "Gas_BCF_actual",
        "Water_MBO_actual",
        "Water_MBO_tca_table",
        "Oil_MBO_actual",
        "Oil_MBO_tca_table",
    ]
].astype(
    float
)

In [0]:
# getting Error factors for gas, water and oil
temp1["Gas_error_factor"] = np.where(
    temp1["Gas_BCF_tca_table"] != 0,
    temp1["Gas_BCF_actual"] / temp1["Gas_BCF_tca_table"],
    None,
)
temp1["Water_error_factor"] = np.where(
    temp1["Water_MBO_tca_table"] != 0,
    temp1["Water_MBO_actual"] / temp1["Water_MBO_tca_table"],
    None,
)
temp1["Oil_error_factor"] = np.where(
    temp1["Oil_MBO_tca_table"] != 0,
    temp1["Oil_MBO_actual"] / temp1["Oil_MBO_tca_table"],
    None,
)


# getting first prod date for each api
temp1["Date"] = pd.to_datetime(temp1["Date"])
temp1["first_prod_date"] = temp1.apply(
    lambda row: row["Date"] - pd.DateOffset(months=row["producing_month"] - 1), axis=1
)

In [0]:
# Filter out APIs with first_prod_date greater than or equal to current_date
current_date = pd.to_datetime(current_date)
# Ensure first_prod_date is in datetime format
temp1["first_prod_date"] = pd.to_datetime(temp1["first_prod_date"])

api_to_remove = temp1[temp1.first_prod_date >= current_date]["API10"].unique()
temp1 = temp1[~temp1.API10.isin(api_to_remove)]

In [0]:
temp1 = temp1.drop(
    columns=[
        "Water_MBO_actual",
        "Oil_MBO_actual",
        "Gas_BCF_actual",
        "producing_month",
        "Oil_MBO_tca_table",
        "Gas_BCF_tca_table",
        "Water_MBO_tca_table",
    ]
)

# getting production values from typecurve production table
temp2 = pd.merge(
    temp1,
    tca_produnction_df[
        ["typeCurveArea", "producing_month", "Oil_MBO", "Gas_BCF", "Water_MBO"]
    ],
    on="typeCurveArea",
    how="left",
)

In [0]:
# correcting values of gas, oil and water based on error factor

temp2[["Oil_MBO", "Gas_BCF", "Water_MBO"]] = temp2[
    ["Oil_MBO", "Gas_BCF", "Water_MBO"]
].astype(float)


# Multiply 'Oil_MBO' with 'Oil_error_factor' only when 'Oil_error_factor' is not None
temp2["Oil_MBO"] = temp2["Oil_MBO"] * temp2["Oil_error_factor"].where(
    temp2["Oil_error_factor"].notna(), temp2["Oil_MBO"]
)

# Multiply 'Water_MBO' with 'Water_error_factor' only when 'Water_error_factor' is not None
temp2["Water_MBO"] = temp2["Water_MBO"] * temp2["Water_error_factor"].where(
    temp2["Water_error_factor"].notna(), temp2["Water_MBO"]
)

# Multiply 'Gas_BCF' with 'Gas_error_factor' only when 'Gas_error_factor' is not None
temp2["Gas_BCF"] = temp2["Gas_BCF"] * temp2["Gas_error_factor"].where(
    temp2["Gas_error_factor"].notna(), temp2["Gas_BCF"]
)

# Removing rows which has producing_month as null
temp2 = temp2[temp2.producing_month.notnull()]

# getting production date for each month from first prod date and producing month
temp2["producing_month"] = temp2["producing_month"].astype(int)
temp2["production_date"] = (
    (temp2["first_prod_date"].dt.to_period("M")) + temp2["producing_month"] - 1
).dt.to_timestamp()

In [0]:
pdp_df2.rename({"Date": "production_date"}, inplace=True, axis=1)

temp3 = pd.merge(
    temp2,
    pdp_df2[["API10", "production_date", "Water_MBO", "Oil_MBO", "Gas_BCF"]],
    on=("API10", "production_date"),
    how="left",
    validate="one_to_one",
    suffixes=("_predicted", ""),
)

temp3["Water_MBO"] = temp3["Water_MBO"].fillna(temp3["Water_MBO_predicted"])
temp3["Oil_MBO"] = temp3["Oil_MBO"].fillna(temp3["Oil_MBO_predicted"])
temp3["Gas_BCF"] = temp3["Gas_BCF"].fillna(temp3["Gas_BCF_predicted"])
temp3.rename({"production_date": "date"}, inplace=True, axis=1)
temp3 = temp3[pdp_df.columns]
temp3 = temp3[temp3.date >= desired_minimum_date_for_producing_wells]

# removing future api
temp3["date"] = temp3["date"].dt.date
api_to_remove = temp3[(temp3.producing_month == 1) & (temp3.date >= current_date)][
    "API10"
].unique()
temp3 = temp3[~temp3.API10.isin(api_to_remove)]

pdp_df = pd.concat([pdp_df, temp3])


Comparison of Timestamp with datetime.date is deprecated in order to match the standard library behavior. In a future version these will be considered non-comparable. Use 'ts == pd.Timestamp(date)' or 'ts.date() == date' instead.



In [0]:
pdp_df = pdp_df[~pdp_df.API10.isin(wip_well_data.API10.unique())]

# New Wells 

In [0]:
class NewWellDownloader:
    """
    Class for downloading well data and TCA data using PySpark.
    """

    def __init__(
        self,
        analog_well_table: str,
        merge_production: str,
        basin_of_interest: str,
        flowunit_of_interest: str,
        producing_wells,
    ):
        """
        Initialize the NewWellDownloader.

        Parameters:
        - well_data_table (str): Name of the well data table.
        - tca_data_table (str): Name of the TCA data table.
        - basin_of_interest (str): Basin of interest for filtering data.
        """
        self.analog_well_table = analog_well_table
        self.merge_production = merge_production
        self.basin_of_interest = basin_of_interest
        self.flowunit_of_interest = flowunit_of_interest
        self.producing_wells = producing_wells

    def download_well_data(
        self, spark, current_date, cutoff_date_for_new_wells
    ) -> pd.DataFrame:
        """
        Download well data based on specified criteria.

        Parameters:
        - spark: PySpark session.

        Returns:
        - pd.DataFrame: Produced DataFrame.
        """
        well_data_df = spark.sql(
            f"""
            SELECT
                ana.API10,
                ana.HoleDirection,
                ana.LateralLength_FT,
                ana.BasinQuantum,
                ana.OperatorGold,
                cast(TRUNC(ana.FirstProdDate, 'month') as DATE) as FirstProdDate,
                ana.typeCurveArea,
                ana.FlowUnit_Analog,
                ana.ReservoirGoldConsolidated,
                prod.Gas_MCF/1000000 AS Gas_BCF,
                prod.Oil_BBL AS Oil_MBO,
                prod.Water_BBL AS Water_MBO,
                prod.NormalizedMonth AS producing_month,
                prod.Date,
                DATE_FORMAT(prod.Date, 'yyyy-MM') as newwell_production_month_year
            FROM
            {self.analog_well_table} ana
            LEFT JOIN {self.merge_production} prod 
                ON ana.api10 = prod.api10
            WHERE ana.WellStatus in ("PRODUCING")
            AND ana.api10 not in {self.producing_wells}
                AND ana.HoleDirection = "H"
                AND ana.FirstProdDate <= '{current_date}'
                AND ana.FirstProdDate >= '{cutoff_date_for_new_wells}'
            """
        )
        return well_data_df.toPandas()

In [0]:
new_wells_table = "analog_wells"
supporting_table = "produced.merge_production"
producing_wells_api = tuple(pdp_df.API10.unique())
newwelldownload = NewWellDownloader(
    new_wells_table,
    supporting_table,
    basin_of_interest,
    flowunit_of_interest,
    producing_wells_api,
)
new_wells_production = newwelldownload.download_well_data(
    spark, current_date, cutoff_date_for_new_wells
)


The conversion of DecimalType columns is inefficient and may take a long time. Column names: [Gas_BCF, Oil_MBO, Water_MBO] If those columns are not necessary, you may consider dropping them or converting to primitive types before the conversion.



In [0]:
# filtering api for which we have production data
new_wells_production1 = new_wells_production[
    new_wells_production.producing_month.notnull()
]

In [0]:
# filtering api for which we do not have production data
new_wells_production2 = new_wells_production[
    new_wells_production.producing_month.isnull()
][
    [
        "API10",
        "LateralLength_FT",
        "BasinQuantum",
        "OperatorGold",
        "FirstProdDate",
        "typeCurveArea",
        "FlowUnit_Analog",
        "ReservoirGoldConsolidated",
    ]
]


# merging inventory data with tca prod data
new_wells_production2 = pd.merge(
    new_wells_production2,
    tca_produnction_df[
        ["typeCurveArea", "producing_month", "Oil_MBO", "Gas_BCF", "Water_MBO"]
    ],
    on="typeCurveArea",
    how="left",
)

new_wells_production2 = new_wells_production2[
    (new_wells_production2.producing_month.notnull())
    & (new_wells_production2.FirstProdDate.notnull())
]

# converting inventory_firstprod_date to first day of month
new_wells_production2["FirstProdDate"] = (
    pd.to_datetime(new_wells_production2["FirstProdDate"])
    .dt.to_period("M")
    .dt.to_timestamp()
)

new_wells_production2["producing_month"] = new_wells_production2[
    "producing_month"
].astype(int)
# calculate production date based on inventory first production date and producing month

new_wells_production2["production_date"] = (
    (new_wells_production2["FirstProdDate"].dt.to_period("M"))
    + new_wells_production2["producing_month"]
    - 1
).dt.to_timestamp()

new_wells_production2.drop_duplicates(inplace=True)

In [0]:
temp1 = new_wells_production1.loc[
    new_wells_production1.groupby("API10")["producing_month"].idxmax()
]
temp1 = pd.merge(
    temp1,
    tca_produnction_df[
        ["typeCurveArea", "producing_month", "Oil_MBO", "Gas_BCF", "Water_MBO"]
    ],
    on=("typeCurveArea", "producing_month"),
    how="left",
    suffixes=("_actual", "_tca_table"),
    validate="many_to_one",
)

In [0]:
temp1[
    [
        "Gas_BCF_tca_table",
        "Gas_BCF_actual",
        "Water_MBO_actual",
        "Water_MBO_tca_table",
        "Oil_MBO_actual",
        "Oil_MBO_tca_table",
    ]
] = temp1[
    [
        "Gas_BCF_tca_table",
        "Gas_BCF_actual",
        "Water_MBO_actual",
        "Water_MBO_tca_table",
        "Oil_MBO_actual",
        "Oil_MBO_tca_table",
    ]
].astype(
    float
)

In [0]:
# getting Error factors for gas, water and oil
temp1["Gas_error_factor"] = np.where(
    temp1["Gas_BCF_tca_table"] != 0,
    temp1["Gas_BCF_actual"] / temp1["Gas_BCF_tca_table"],
    None,
)
temp1["Water_error_factor"] = np.where(
    temp1["Water_MBO_tca_table"] != 0,
    temp1["Water_MBO_actual"] / temp1["Water_MBO_tca_table"],
    None,
)
temp1["Oil_error_factor"] = np.where(
    temp1["Oil_MBO_tca_table"] != 0,
    temp1["Oil_MBO_actual"] / temp1["Oil_MBO_tca_table"],
    None,
)


# getting first prod date for each api
temp1["Date"] = pd.to_datetime(temp1["Date"])
temp1["first_prod_date"] = temp1.apply(
    lambda row: row["Date"] - pd.DateOffset(months=row["producing_month"] - 1), axis=1
)

In [0]:
# Filter out APIs with first_prod_date greater than or equal to current_date
current_date = pd.to_datetime(current_date)
# Ensure first_prod_date is in datetime format
temp1["first_prod_date"] = pd.to_datetime(temp1["first_prod_date"])

api_to_remove = temp1[temp1.first_prod_date >= current_date]["API10"].unique()
temp1 = temp1[~temp1.API10.isin(api_to_remove)]

In [0]:
temp1 = temp1.drop(
    columns=[
        "Water_MBO_actual",
        "Oil_MBO_actual",
        "Gas_BCF_actual",
        "producing_month",
        "Oil_MBO_tca_table",
        "Gas_BCF_tca_table",
        "Water_MBO_tca_table",
    ]
)

# getting production values from typecurve production table
temp2 = pd.merge(
    temp1,
    tca_produnction_df[
        ["typeCurveArea", "producing_month", "Oil_MBO", "Gas_BCF", "Water_MBO"]
    ],
    on="typeCurveArea",
    how="left",
)

In [0]:
# correcting values of gas, oil and water based on error factor

temp2[["Oil_MBO", "Gas_BCF", "Water_MBO"]] = temp2[
    ["Oil_MBO", "Gas_BCF", "Water_MBO"]
].astype(float)


# Multiply 'Oil_MBO' with 'Oil_error_factor' only when 'Oil_error_factor' is not None
temp2["Oil_MBO"] = temp2["Oil_MBO"] * temp2["Oil_error_factor"].where(
    temp2["Oil_error_factor"].notna(), temp2["Oil_MBO"]
)

# Multiply 'Water_MBO' with 'Water_error_factor' only when 'Water_error_factor' is not None
temp2["Water_MBO"] = temp2["Water_MBO"] * temp2["Water_error_factor"].where(
    temp2["Water_error_factor"].notna(), temp2["Water_MBO"]
)

# Multiply 'Gas_BCF' with 'Gas_error_factor' only when 'Gas_error_factor' is not None
temp2["Gas_BCF"] = temp2["Gas_BCF"] * temp2["Gas_error_factor"].where(
    temp2["Gas_error_factor"].notna(), temp2["Gas_BCF"]
)

# Removing rows which has producing_month as null
temp2 = temp2[temp2.producing_month.notnull()]

# getting production date for each month from first prod date and producing month
temp2["producing_month"] = temp2["producing_month"].astype(int)
temp2["production_date"] = (
    (temp2["first_prod_date"].dt.to_period("M")) + temp2["producing_month"] - 1
).dt.to_timestamp()

In [0]:
new_wells_production1.rename({"Date": "production_date"}, inplace=True, axis=1)

temp3 = pd.merge(
    temp2,
    new_wells_production1[
        ["API10", "production_date", "Water_MBO", "Oil_MBO", "Gas_BCF"]
    ],
    on=("API10", "production_date"),
    how="left",
    validate="one_to_one",
    suffixes=("_predicted", ""),
)

temp3["Water_MBO"] = temp3["Water_MBO"].fillna(temp3["Water_MBO_predicted"])
temp3["Oil_MBO"] = temp3["Oil_MBO"].fillna(temp3["Oil_MBO_predicted"])
temp3["Gas_BCF"] = temp3["Gas_BCF"].fillna(temp3["Gas_BCF_predicted"])
temp3.rename({"production_date": "date"}, inplace=True, axis=1)
temp3 = temp3[pdp_df.columns]
temp3 = temp3[temp3.date >= desired_minimum_date_for_producing_wells]

# removing future api
temp3["date"] = temp3["date"].dt.date
api_to_remove = temp3[(temp3.producing_month == 1) & (temp3.date >= current_date)][
    "API10"
].unique()
temp3 = temp3[~temp3.API10.isin(api_to_remove)]



A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


Comparison of Timestamp with datetime.date is deprecated in order to match the standard library behavior. In a future version these will be considered non-comparable. Use 'ts == pd.Timestamp(date)' or 'ts.date() == date' instead.



In [0]:
# concatinating both new wells dataframe

new_wells_production = pd.concat([new_wells_production2, temp3])

# Completed and Drilled Wells

In [0]:
Completed_drilled_wells = typecurve_df[
    (typecurve_df.WellStatus.isin(["COMPLETED", "DRILLED"]))
    & (~typecurve_df.API10.isin(pdp_df.API10.unique()))
    & (~typecurve_df.API10.isin(new_wells_production.API10.unique()))
    & (~typecurve_df.API10.isin(wip_well_data.API10.unique()))
    & (typecurve_df.spudDate.notnull())
]

In [0]:
# Converting columns to appropriate datatypes
Completed_drilled_wells["spudDate"] = pd.to_datetime(
    Completed_drilled_wells["spudDate"]
)



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



In [0]:
desired_minimum_firstproddate_for_completed_drilled_wells = pd.to_datetime(
    desired_minimum_firstproddate_for_completed_drilled_wells
)

# Filter the DataFrame
Completed_drilled_wells = Completed_drilled_wells[
    Completed_drilled_wells.spudDate
    >= desired_minimum_firstproddate_for_completed_drilled_wells
]

In [0]:
null_indices = Completed_drilled_wells.index[
    Completed_drilled_wells["FirstProdDate"].isnull()
]

for i, idx in enumerate(null_indices):
    # Calculate the month offset based on the pattern

    month_offset = i // 2
    new_date = current_date + relativedelta(months=month_offset)
    Completed_drilled_wells.at[idx, "FirstProdDate"] = new_date

In [0]:
# merging completed drilling wells data and tca production data to have
Completed_drilled_wells = pd.merge(
    Completed_drilled_wells,
    tca_produnction_df[
        ["typeCurveArea", "producing_month", "Oil_MBO", "Gas_BCF", "Water_MBO"]
    ],
    on="typeCurveArea",
    how="left",
    validate="many_to_many",
)

Completed_drilled_wells = Completed_drilled_wells[
    Completed_drilled_wells.producing_month.notnull()
]

# getting production month for next 50 years based on the first prod date for each wip well
Completed_drilled_wells["production_date"] = (
    (pd.to_datetime(Completed_drilled_wells["FirstProdDate"]).dt.to_period("M"))
    + Completed_drilled_wells["producing_month"].astype(int)
    - 1
).dt.to_timestamp()


Comparison of Timestamp with datetime.date is deprecated in order to match the standard library behavior. In a future version these will be considered non-comparable. Use 'ts == pd.Timestamp(date)' or 'ts.date() == date' instead.


Comparison of Timestamp with datetime.date is deprecated in order to match the standard library behavior. In a future version these will be considered non-comparable. Use 'ts == pd.Timestamp(date)' or 'ts.date() == date' instead.



# Total Production

In [0]:
def total_production_estimation(
    inventory, producing, wip, new, Completed_drilled_wells
):
    """
    Combine data from different well statuses into a single DataFrame.

    Parameters:
    - inventory (pd.DataFrame): DataFrame for inventory wells.
    - producing (pd.DataFrame): DataFrame for producing wells.
    - wip (pd.DataFrame): DataFrame for wells in progress.
    - new (pd.DataFrame): DataFrame for new wells.

    Returns:
    - pd.DataFrame: Combined DataFrame.
    """
    # Rename and select columns for inventory wells
    inventory.rename(
        {"lateralLength_ft": "LateralLength_FT", "flowUnit": "FlowUnit_Analog"},
        axis=1,
        inplace=True,
    )
    inventory = inventory[
        [
            "entityID",
            "LateralLength_FT",
            "typeCurveArea",
            "FlowUnit_Analog",
            "OperatorGold",
            "ReservoirGoldConsolidated",
            "production_date",
            "producing_month",
            "Oil_MBO",
            "Gas_BCF",
            "Water_MBO",
        ]
    ]
    inventory["forecastType"] = "inventory_wells"

    # Rename and select columns for producing wells
    producing.rename(
        {"API10": "entityID", "date": "production_date"}, axis=1, inplace=True
    )
    producing = producing[
        [
            "entityID",
            "LateralLength_FT",
            "typeCurveArea",
            "FlowUnit_Analog",
            "OperatorGold",
            "ReservoirGoldConsolidated",
            "production_date",
            "producing_month",
            "Oil_MBO",
            "Gas_BCF",
            "Water_MBO",
        ]
    ]
    producing["forecastType"] = "producing_wells"

    # Rename and select columns for wells in progress
    wip.rename(
        {"API10": "entityID", "lateralLength_Ft": "LateralLength_FT"},
        axis=1,
        inplace=True,
    )
    wip = wip[
        [
            "entityID",
            "LateralLength_FT",
            "typeCurveArea",
            "FlowUnit_Analog",
            "OperatorGold",
            "ReservoirGoldConsolidated",
            "production_date",
            "producing_month",
            "Oil_MBO",
            "Gas_BCF",
            "Water_MBO",
        ]
    ]
    wip["forecastType"] = "wip_wells"

    # Rename and select columns for new wells
    new.rename({"API10": "entityID"}, axis=1, inplace=True)
    new = new[
        [
            "entityID",
            "LateralLength_FT",
            "typeCurveArea",
            "FlowUnit_Analog",
            "OperatorGold",
            "ReservoirGoldConsolidated",
            "production_date",
            "producing_month",
            "Oil_MBO",
            "Gas_BCF",
            "Water_MBO",
        ]
    ]
    new["forecastType"] = "new_wells"

    Completed_drilled_wells.rename({"API10": "entityID"}, axis=1, inplace=True)
    Completed_drilled_wells = Completed_drilled_wells[
        [
            "entityID",
            "LateralLength_FT",
            "typeCurveArea",
            "FlowUnit_Analog",
            "OperatorGold",
            "ReservoirGoldConsolidated",
            "production_date",
            "producing_month",
            "Oil_MBO",
            "Gas_BCF",
            "Water_MBO",
        ]
    ]
    Completed_drilled_wells["forecastType"] = "drilled_completed"

    # Concatenate all DataFrames into a single DataFrame
    final_df = pd.concat(
        [inventory, wip, new, producing, Completed_drilled_wells], ignore_index=True
    )

    return final_df

In [0]:
final_df = total_production_estimation(
    inventory_production,
    pdp_df,
    wip_well_production,
    new_wells_production,
    Completed_drilled_wells,
)

final_df = final_df[
    final_df.production_date <= pd.Timestamp(desired_maximum_date_for_producing_wells)
]



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/

# Normalization of oil and gas values based on Lateral length

In [0]:
def denormalize_vectorized(
    eur_normalized, lateral_length_ft, qcast_nom_lateral_ft, error_factor
):
    """
    Vectorized denormalization for predictions based on lateral length, QCAST nominal lateral length, and error factor.

    Parameters:
    - eur_normalized (pd.Series): Normalized prediction values.
    - lateral_length_ft (pd.Series): Actual lateral lengths.
    - qcast_nom_lateral_ft (float): QCAST nominal lateral length.
    - error_factor (float): Error factor for denormalization.

    Returns:
    - pd.Series: Denormalized predictions.
    """
    return (
        eur_normalized
        * lateral_length_ft
        / (
            lateral_length_ft
            + (qcast_nom_lateral_ft - lateral_length_ft) * error_factor
        )
    )


# Assume 'final_df' is your DataFrame and it has the necessary columns
qcast_nom_lateral_ft = 10000
error_factor = 0.8
columns_to_denormalize = ["Oil_MBO", "Gas_BCF", "Water_MBO"]

# Vectorized denormalization for each column
for column in columns_to_denormalize:
    final_df[column] = denormalize_vectorized(
        final_df[column].astype(float),
        final_df["LateralLength_FT"],
        qcast_nom_lateral_ft,
        error_factor,
    )

In [0]:
final_df["production_date"] = pd.to_datetime(final_df["production_date"])
final_df["production_month_year"] = final_df["production_date"].dt.to_period("M")

final_df["Gas_BCFD"] = final_df["Gas_BCF"] / 30.4375

final_df["producing_month"] = final_df["producing_month"].astype(int)
final_df["production_month_year"] = final_df["production_month_year"].astype(str)

final_df["basin"] = basin_of_interest
final_df["scenario_id"] = scenario_id


Comparison of Timestamp with datetime.date is deprecated in order to match the standard library behavior. In a future version these will be considered non-comparable. Use 'ts == pd.Timestamp(date)' or 'ts.date() == date' instead.


Comparison of Timestamp with datetime.date is deprecated in order to match the standard library behavior. In a future version these will be considered non-comparable. Use 'ts == pd.Timestamp(date)' or 'ts.date() == date' instead.



# Creating Tables

In [0]:
spark.sql(
    f"""
          delete from produced.production_estimation_model where scenario_id = "{scenario_id}"
          """
)

Out[671]: DataFrame[num_affected_rows: bigint]

In [0]:
(
    spark.createDataFrame(final_df)
    .withColumn("LateralLength_FT", col("LateralLength_FT").cast(FloatType()))
    .withColumn("producing_month", col("producing_month").cast(IntegerType()))
    .withColumn("Oil_MBO", col("Oil_MBO").cast(FloatType()))
    .withColumn("Gas_BCF", col("Gas_BCF").cast(FloatType()))
    .withColumn("Water_MBO", col("Water_MBO").cast(FloatType()))
    .withColumn("Gas_BCFD", col("Gas_BCFD").cast(FloatType()))
    .write.format("delta")
    .option("mergeSchema", "true")
    .mode("append")
    .saveAsTable(f"produced.production_estimation_model")
)