## Data Wrangling

### Introduction

This notebook is part of a capstone project for the Springboard Data Science Career Track.

The goal of this project is to develop a machine learning model to rank and predict the likelihood that an oil company will initiate a frac job in a county within the Permian Basin in the first quarter of 2024.

In [1]:
# Import statements

from collections import defaultdict
from http.client import IncompleteRead
from time import sleep
import concurrent.futures as cf
from datetime import datetime
import json
import random
import re
import tempfile
import warnings
from functools import lru_cache
from pathlib import Path
from typing import Optional
from urllib.request import urlopen
from urllib.error import URLError
from zipfile import ZipFile

import missingno as msno
import cartopy.crs as ccrs
from shapely import wkt
from shapely.geometry import Point
import geopandas as gpd
import geoviews as gv
import geoviews.tile_sources as gts
import colorcet as cc
import holoviews as hv
import hvplot.pandas  # noqa
import hvplot.dask  # noqa
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pyproj
from fiona.io import ZipMemoryFile
from pyvis.network import Network

# from sqlalchemy import create_engine
# from sqlalchemy.exc import SQLAlchemyError
from tqdm import tqdm
import dask
from dask import persist
import dask.dataframe as dd

# from dask.diagnostics import ProgressBar
from dask.distributed import Client
import panel as pn
import panel.widgets as pnw
from panel.template import FastListTemplate

hv.extension("bokeh")
gv.extension("bokeh")
pn.extension("tabulator", template="fast", sizing_mode="stretch_width")

In [2]:
# ignore all warnings
warnings.filterwarnings("ignore")
pd.set_option("display.max_columns", 40)

In [3]:
# Test initial print statement
print("CapstoneJourney begins!")

CapstoneJourney begins!


#### Constants
Let's start by defining some constants that will be used throughout this notebook.

Most of the data was first downloaded from external websites and then uploaded to a Google Cloud Storage bucket to ensure consistency and availability during the project. The table below gives a brief description of each dataset and a link to its original source.

#### Data Sources

The following table provides an overview of the data sources used in this project:

| Dataset Name | Source URL | Original Source | Description | Date Downloaded |
|--------------|------------|-----------------|-------------|-----------------|
| RegistryUpload Table | [link](https://fracfocus.org/data-download) | FracFocus | This table contains each disclosure’s header information such as the job date, API number, location, base water volume, and total vertical depth. | 2023-11-11 |
| RBDMSWells | [link](https://gisdata-occokc.opendata.arcgis.com/datasets/OCCOKC::rbdms-wells/about) | Oklahoma Corporation Commission | This table contains Oklahoma RBDMS statewide well data | 2023-11-23 |
| Wolfcamp Delaware Play Boundary | [link](https://www.eia.gov/maps/maps.htm)| EIA | Permian Basin, Delaware Sub-Basin: Wolfcamp play boundary (9/4/2018) | 2023-11-19 |
| Wolfcamp Midland Play Boundaries | [link](https://www.eia.gov/maps/maps.htm)| EIA | Wolfcamp A, B, C, and D play boundaries, Midland Basin (6/4/2020) | 2023-11-21 |
| ShalePlay Delaware | [link](https://www.eia.gov/maps/maps.htm)| EIA |Delaware play boundary (10/8/2019)  | 2023-11-21 |
| AboYeso GlorietaYeso Spraberry | [link](https://www.eia.gov/maps/maps.htm)| EIA | Abo-Yeso, Glorieta-Yeso, and Spraberry play boundaries (3/11/2016) | 2023-11-21 |
| NM SLO OilGas Leases | [link](https://www.nmstatelands.org/maps-gis/gis-data-download/)| New Mexico State Land Office | Active Oil and Gas Leases (11/07/2023) | 2023-11-21 |
| NM SLO Geologic Regions | [link](https://www.nmstatelands.org/maps-gis/gis-data-download/)| New Mexico State Land Office | Geologic Regions (01/04/2010) | 2023-11-21 |
| NM SLO STL Status Combined | [link](https://www.nmstatelands.org/maps-gis/gis-data-download/)| New Mexico State Land Office | New Mexico State Trust Lands By Subdivision (04/14/2022) | 2023-11-21 |
| Production Data Query Dump| [link](https://rrc.texas.gov/resource-center/research/data-sets-available-for-download/)| Railroad Commission of Texas | Production Data Query Dump (11/17/2023) | 2023-11-17 |
| All Layers By County | [link](https://rrc.texas.gov/resource-center/research/data-sets-available-for-download/)  | Railroad Commission of Texas | Map & Associated Data: Base Map, Wells, Surveys & Pipelines layers | 2023-11-17 |
| Oil & Gas Leases | [link](https://www.glo.texas.gov/land/land-management/gis/index.html) | Texas General Land Office | Active Leases (11/17/2023) | 2023-11-17 |
| Oil & Gas Units | [link](https://www.glo.texas.gov/land/land-management/gis/index.html) | Texas General Land Office | Active Units (11/17/2023) | 2023-11-17 |
| U.S. County Boundaries | [link](https://www2.census.gov/geo/tiger/TIGER2022/COUNTY/tl_2022_us_county.zip) | United States Census Bureau | County (2022-10-31). Data is downloaded directly in the code. | N/A |
| U.S. County FIPS Codes | [link](https://en.wikipedia.org/wiki/List_of_United_States_FIPS_codes_by_county) | Wikipedia | List of United States FIPS codes by county. Data is downloaded directly in the code. | N/A |

Each row in the table represents a different dataset. The columns are:

- **Dataset Name**: The name of the dataset.
- **Source URL**: The URL where the dataset can be downloaded. Click on "link" to access the webpage.
- **Original Source**: The original source of the data.
- **Description**: A brief description of the dataset.
- **Date Downloaded**: The date when the dataset was downloaded.

In [4]:
# Constants
# This cell generates lists of URLs to CSV files stored in a Google Cloud Storage bucket.
# The CSV files contain data from the FracFocus Chemical Disclosure Registry.

# Generate a list of URLs to the FracFocusRegistry CSV files.
# There are 24 files in total, named FracFocusRegistry_i.csv where i ranges from 1 to 24.
DATA_URLS1 = [
    f"https://storage.googleapis.com/mrprime_dataset/fracfocus/FracFocusRegistry_{i}.csv"
    for i in range(1, 25)
]

# Generate a list of URLs to the registryupload CSV files.
# There are 3 files in total, named registryupload_i.csv where i ranges from 1 to 3.
DATA_URLS2 = [
    f"https://storage.googleapis.com/mrprime_dataset/fracfocus/registryupload_{j}.csv"
    for j in range(1, 4)
]

# URL to the readme.txt file in the bucket.
DATA_README_URL = [
    "https://storage.googleapis.com/mrprime_dataset/fracfocus/readme.txt"
]

# URL to the OCC (Oklahoma) well data in the bucket
OCC_PARQUET_URL = "https://storage.googleapis.com/mrprime_dataset/capstone_journey/occ/rbdms_wells.parquet"

In [5]:
# Url for the shapefile for US counties from the Census Bureau's website.
CENSUS_COUNTY_MAP_URL = (
    "https://www2.census.gov/geo/tiger/TIGER2022/COUNTY/tl_2022_us_county.zip"
)
# Url for a Wikipedia page containing a table of FIPS codes for US counties.
FIPS_WIKI_URL = (
    "https://en.wikipedia.org/wiki/List_of_United_States_FIPS_codes_by_county"
)
# Bounds of the continental US in longitude and latitude.
USA_BOUNDS = (-124.77, 24.52, -66.95, 49.38)
# bounds of the continental US in Web Mercator coordinates.
USA_BOUNDS_MERCATOR = (-13874905.0, 2870341.0, -7453304.0, 6338219.0)

In [6]:
# URLs for the EIA play-boundary shapefiles (Wolfcamp Delaware, Wolfcamp Midland, Delaware, and Abo-Yeso/Glorieta-Yeso/Spraberry)
WOLFCAMP_ZIP_URL = "https://storage.googleapis.com/mrprime_dataset/capstone_journey/eia/Wolfcamp_Delaware_Play_Boundary.zip"
MIDLAND_ZIP_URL = "https://storage.googleapis.com/mrprime_dataset/capstone_journey/eia/Wolfcamp_Midland_Play_Boundaries_EIA.zip"
DELAWARE_ZIP_URL = "https://storage.googleapis.com/mrprime_dataset/capstone_journey/eia/ShalePlay_Delaware_EIA.zip"
ABOYESO_ZIP_URL = "https://storage.googleapis.com/mrprime_dataset/capstone_journey/eia/ShalePlays_AboYeso_GlorietaYeso_Spraberry_EIA.zip"
# PB_ZIP_URL = "https://storage.googleapis.com/mrprime_dataset/capstone_journey/eia/PermianBasin_Boundary_Structural_Tectonic.zip"

basins_url_list = [
    WOLFCAMP_ZIP_URL,
    MIDLAND_ZIP_URL,
    DELAWARE_ZIP_URL,
    ABOYESO_ZIP_URL,
    # PB_ZIP_URL,
]


# url for shapefiles of Polygon data set intended to delineate active oil and gas leases on New Mexico State Trust Lands.
NM_SLO_OIL_LEASE_URL = "https://storage.googleapis.com/mrprime_dataset/capstone_journey/nm_slo/OilGas_Leases.zip"

# url for shapefiles of Polygon layer created to highlight general boundaries of subsurface geologic basins and uplifts of New Mexico
NM_SLO_GEO_REGION_URL = "https://storage.googleapis.com/mrprime_dataset/capstone_journey/nm_slo/slo_GeologicRegions.zip"
# url for shapefiles of Polygons of New Mexico State Trust Lands by PLSS subdivision (quarter-quarter, lot, tract, or partial).
NM_SLO_STL_PLSS_URL = "https://storage.googleapis.com/mrprime_dataset/capstone_journey/nm_slo/slo_STLStatusCombined.zip"

nm_slo_url_list = [
    NM_SLO_OIL_LEASE_URL,
    NM_SLO_GEO_REGION_URL,
]  # , NM_SLO_STL_PLSS_URL]

# URL for the Production Data Query (PDQ) dump from the Railroad Commission of Texas
PDQ_URL = (
    "https://storage.googleapis.com/mrprime_dataset/capstone_journey/rrc/PDQ_DSV.zip"
)

In [7]:
# Define the county numbers whose RRC shapefile archives we want to load.
# Texas county FIPS codes are the odd numbers 001 through 507, so this list
# holds 254 three-digit codes, one for each Texas county.
county_nums = [str(i).zfill(3) for i in range(1, 508) if i % 2]

# Generate a list of URLs to the shapefile zip files stored in a Google Cloud Storage bucket.
# The zip files are named Shp{num}.zip, where {num} is a county number from the county_nums list.
SHP_ZIP_URLS = [
    f"https://storage.googleapis.com/mrprime_dataset/capstone_journey/rrc/all_layers_rrc_20231117/Shp{num}.zip"
    for num in county_nums
]
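As a quick sanity check on the county list, the sketch below rebuilds the same codes with a step of 2 in `range` and confirms the count. It assumes the archive numbers are the odd Texas county FIPS codes 001 to 507; the variable names are illustrative only.

```python
# Rebuild the county codes with a step of 2 instead of filtering on i % 2.
# Assumption: archive numbers are the odd Texas county FIPS codes 001-507.
county_nums_step = [f"{i:03d}" for i in range(1, 508, 2)]

# The filtered version used in the notebook, for comparison
county_nums_filtered = [str(i).zfill(3) for i in range(1, 508) if i % 2]

print(len(county_nums_step), county_nums_step[0], county_nums_step[-1])  # 254 001 507
print(county_nums_step == county_nums_filtered)  # True
```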

In [8]:
# URLs for the Texas General Land Office (GLO) active leases and active units geodatabases (state land)
GDB_ZIP_URLS = [
    "https://storage.googleapis.com/mrprime_dataset/capstone_journey/glo/GDB_ActiveLeases.zip",
    "https://storage.googleapis.com/mrprime_dataset/capstone_journey/glo/GDB_ActiveUnits.zip",
    # "https://storage.googleapis.com/mrprime_dataset/capstone_journey/glo/GDB_InactiveLeases.zip",
]

#### Function definitions
Next, let's define some functions that will be used throughout this notebook.

In [9]:
@lru_cache(maxsize=None)
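def get_county_data():
    """Downloads the Census TIGER/Line county shapefile once and caches it.

    lru_cache returns the same GeoDataFrame object on every call, so callers
    should .copy() it before modifying it.
    """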
    county = gpd.read_file(CENSUS_COUNTY_MAP_URL)[
        ["GEOID", "STATEFP", "COUNTYFP", "NAME", "geometry"]
    ]
    county.columns = county.columns.str.lower()
    return county

In [10]:
def read_csv_concurrent(urls_list):
    """Reads a list of CSV files concurrently"""
    # Create a thread pool
    with cf.ThreadPoolExecutor() as executor:
        # Use map to apply pd.read_csv to each URL
        results = list(tqdm(executor.map(pd.read_csv, urls_list), total=len(urls_list)))
    # Return the results
    return results
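`executor.map` returns results in the order of the input URLs even when later downloads finish first, which is why concatenating the output of `read_csv_concurrent` keeps the files in list order. The sketch below demonstrates this with a hypothetical `fake_download` function that sleeps for a decreasing amount of time in place of `pd.read_csv`, and contrasts it with `as_completed`, which the shapefile helpers use.

```python
import concurrent.futures as cf
import time


def fake_download(item):
    """Hypothetical stand-in for pd.read_csv: later items finish sooner."""
    name, delay = item
    time.sleep(delay)
    return name


items = [("file_1", 0.3), ("file_2", 0.2), ("file_3", 0.1)]

with cf.ThreadPoolExecutor() as executor:
    # map yields results in input order, regardless of completion order
    mapped = list(executor.map(fake_download, items))

with cf.ThreadPoolExecutor() as executor:
    futures = [executor.submit(fake_download, item) for item in items]
    # as_completed yields futures as they finish, so the order can differ
    completed = [f.result() for f in cf.as_completed(futures)]

print(mapped)  # ['file_1', 'file_2', 'file_3']
print(sorted(completed))
```

The shapefile helpers store results in a dictionary keyed by file name, so completion order does not matter there.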

In [11]:
def extract_specific_gdf_from_local_zip(
    zip_paths: list[str], regex_patterns: list[str]
) -> dict[str, gpd.GeoDataFrame]:
    """
    Reads shapefiles from a list of zip files and returns a dictionary
    where the keys are the names of the shapefiles and the values are GeoDataFrames.
    """
    # Initialize an empty dictionary to store the GeoDataFrames
    shp_dict = {}
    # compile the regex patterns
    patterns = [re.compile(pattern) for pattern in regex_patterns]

    # Loop over the list of zip file paths
    for zip_path in zip_paths:
        # Open the zip file
        with ZipFile(zip_path) as z:
            # Get the list of files in the zip file
            zip_contents = z.namelist()
            # Keep the shapefiles that match at least one pattern; any() lists
            # each file once even if it matches several patterns
            shp_files = [
                f
                for f in zip_contents
                if f.endswith(".shp") and any(p.search(f) for p in patterns)
            ]
            # read the shapefiles into GeoDataFrames
            for shp_file in shp_files:
                # Get the name of the shapefile
                shp_name = Path(shp_file).stem
                # Read the shapefile into a GeoDataFrame and add it to the dictionary
                shp_dict[shp_name] = gpd.read_file(f"zip://{zip_path}!{shp_file}")
    # Return the dictionary of GeoDataFrames
    return shp_dict
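Filtering with a nested comprehension over files and patterns lists a file once per matching pattern, so a shapefile that matches two patterns is read twice (the dictionary key is the same, so only the work is repeated). Using `any()` keeps each file at most once. The sketch below compares the two on hypothetical zip member names and patterns.

```python
import re

# Hypothetical zip member names and patterns, for illustration only
zip_contents = [
    "layers/wells.shp",
    "layers/wells.dbf",
    "layers/surveys.shp",
    "layers/pipelines.shp",
]
patterns = [re.compile(p) for p in [r"wells", r"^layers/w"]]

# Nested comprehension: one entry per (file, matching pattern) pair
nested = [
    f
    for f in zip_contents
    for pattern in patterns
    if pattern.search(f) and f.endswith(".shp")
]

# any(): each file appears at most once, in its original order
deduped = [
    f
    for f in zip_contents
    if f.endswith(".shp") and any(p.search(f) for p in patterns)
]

print(nested)   # ['layers/wells.shp', 'layers/wells.shp']
print(deduped)  # ['layers/wells.shp']
```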

In [12]:
def extract_matching_shp_files_from_zip_urls(
    zip_urls: list[str], regex_patterns: list[str]
) -> dict[str, gpd.GeoDataFrame]:
    """
    Reads shapefiles from a list of zip file urls and returns a dictionary
    where the keys are the names of the shapefiles and the values are GeoDataFrames.
    """
    # Initialize an empty dictionary to store the GeoDataFrames
    shp_dict = {}
    # compile the regex patterns
    patterns = [re.compile(pattern) for pattern in regex_patterns]

    # Loop over the list of zip file urls
    for zip_url in tqdm(zip_urls, desc="Processing zip files"):
        # download the zip file
        with urlopen(zip_url) as u:
            zip_data = u.read()
        # create a ZipMemoryFile from the zip data
        with ZipMemoryFile(zip_data) as z:
            # get the list of files in the zip file
            zip_files = z.listdir()
            # filter for shapefiles that match any of the patterns (each file listed once)
            shp_files = [
                f
                for f in zip_files
                if f.endswith(".shp") and any(p.search(f) for p in patterns)
            ]
            # read the shapefiles into GeoDataFrames
            for shp_file in shp_files:
                with z.open(shp_file) as f:
                    shp_dict[Path(shp_file).stem] = gpd.GeoDataFrame.from_features(
                        f, crs=f.crs
                    )
    # Return the dictionary of GeoDataFrames
    return shp_dict

In [13]:
def process_zip_url(
    zip_url: str, patterns: list[re.Pattern]
) -> Optional[dict[str, gpd.GeoDataFrame]]:
    """Downloads a zip file url and returns a dictionary of GeoDataFrames for shapefiles that match the patterns"""
    shp_dict = {}
    retries = 5
    for i in range(retries):
        try:
            with urlopen(zip_url) as u:
                zip_data = u.read()
            with ZipMemoryFile(zip_data) as z:
                zip_files = z.listdir()
                # keep each matching shapefile once, even if several patterns match
                shp_files = [
                    f
                    for f in zip_files
                    if f.endswith(".shp") and any(p.search(f) for p in patterns)
                ]
                for shp_file in shp_files:
                    with z.open(shp_file) as f:
                        shp_dict[Path(shp_file).stem] = gpd.GeoDataFrame.from_features(
                            f, crs=f.crs
                        )
            return shp_dict
        except (IncompleteRead, URLError) as e:
            print(f"Error: {e} on try {i+1} of {retries} for {zip_url}")
            if i < retries - 1:
                sleep(2)
                continue
            else:
                raise


def extract_matching_shp_files_from_zip_urls_concurrent(
    zip_urls: list[str], regex_patterns: list[str]
) -> dict[str, gpd.GeoDataFrame]:
    """Reads shapefiles from a list of zip file urls and returns a dictionary
    where the keys are the names of the shapefiles and the values are GeoDataFrames."""
    shp_dict = {}
    patterns = [re.compile(pattern) for pattern in regex_patterns]
    with cf.ThreadPoolExecutor() as executor:
        future_to_url = {
            executor.submit(process_zip_url, url, patterns): url for url in zip_urls
        }
        futures = tqdm(
            cf.as_completed(future_to_url),
            total=len(future_to_url),
            desc="Processing URLs",
            dynamic_ncols=True,
        )
        for future in futures:
            result = future.result()
            if result:
                shp_dict.update(result)
    return shp_dict
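`process_zip_url` waits a fixed 2 seconds between attempts. A common alternative is exponential backoff with a little random jitter, so that many threads retrying at the same time do not hit the server in lockstep. The sketch below is a generic helper under that assumption; `retry_with_backoff`, its parameters, and the `flaky` test function are hypothetical names, not part of the notebook.

```python
import random
import time


def retry_with_backoff(func, retries=5, base_delay=1.0, exceptions=(Exception,)):
    """Calls func(), retrying on the given exceptions with exponential backoff.

    Hypothetical helper: the delay before retry n (0-based) is
    base_delay * 2**n plus up to 10% random jitter.
    """
    for attempt in range(retries):
        try:
            return func()
        except exceptions as e:
            if attempt == retries - 1:
                raise  # out of attempts: propagate the last error
            delay = base_delay * 2**attempt
            delay += random.uniform(0, 0.1 * delay)
            print(f"Error: {e} on try {attempt + 1} of {retries}; retrying in {delay:.2f}s")
            time.sleep(delay)


# Usage with a hypothetical flaky function that fails twice, then succeeds
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = retry_with_backoff(flaky, retries=5, base_delay=0.01, exceptions=(ConnectionError,))
print(result, calls["n"])  # ok 3
```

In the notebook, `func` could wrap the download step and `exceptions` could be `(IncompleteRead, URLError)`, matching the errors `process_zip_url` already catches.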

In [14]:
def concat_gdf_from_dict(gdf_dict: dict[str, gpd.GeoDataFrame]) -> gpd.GeoDataFrame:
    """
    Given a dictionary of GeoDataFrames, returns a single GeoDataFrame
    with a new column indicating the source of the data.
    """
    # use a dictionary comprehension to create a new dictionary
    gdf_data = {k: gdf.assign(source_file=k) for k, gdf in gdf_dict.items()}
    # return the concatenated GeoDataFrame
    return pd.concat(gdf_data.values(), ignore_index=True)
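`pd.concat` also accepts a dictionary directly and uses its keys as the outer level of a MultiIndex, which is another way to record the source of each row. A minimal sketch with plain pandas DataFrames standing in for GeoDataFrames; the frames and keys are made up.

```python
import pandas as pd

# Hypothetical per-file frames keyed by shapefile name
frames = {
    "wells": pd.DataFrame({"api": ["001", "002"]}),
    "surveys": pd.DataFrame({"api": ["003"]}),
}

# Dictionary keys become the outer index level, named "source_file"
combined = (
    pd.concat(frames, names=["source_file"])
    .reset_index(level="source_file")  # move the key level into a column
    .reset_index(drop=True)  # replace the leftover per-frame index
)

print(combined)
```

Unlike the `assign` approach, this puts `source_file` as the first column rather than the last.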

In [15]:
def extract_gdfs_from_zip(zip_path: str) -> Optional[dict[str, gpd.GeoDataFrame]]:
    """
    Reads shapefiles from a zip file and returns a dictionary of GeoDataFrames.
    """
    gdfs = {}
    # Open the zip file
    with ZipFile(zip_path) as z:
        # Get the list of files in the zip file
        zip_contents = z.namelist()
        # Find the shapefiles
        shp_files = [f for f in zip_contents if f.endswith(".shp")]
        for shp_file in shp_files:
            # Read the shapefile into a GeoDataFrame
            gdf = gpd.read_file(f"zip://{zip_path}!{shp_file}")
            gdfs[shp_file] = gdf

    # If no shapefile was found, return None
    return gdfs if gdfs else None

In [16]:
def extract_gdfs_from_zip_url(zip_url: str) -> Optional[dict[str, gpd.GeoDataFrame]]:
    """
    Downloads a ZIP file from a URL, reads shapefiles from the ZIP file, and returns a dictionary of GeoDataFrames.
    """
    gdfs = {}
    # Open the URL
    with urlopen(zip_url) as u:
        # Read the content of the response into a byte stream
        zip_data = u.read()
        # Open the ZIP file from the byte stream
        with ZipMemoryFile(zip_data) as z:
            # Get the list of files in the ZIP file
            zip_contents = z.listdir()
            # Find the shapefiles
            shp_files = [f for f in zip_contents if f.endswith(".shp")]
            for shp_file in shp_files:
                # Read the shapefile into a GeoDataFrame
                with z.open(shp_file) as f:
                    gdf = gpd.GeoDataFrame.from_features(f, crs=f.crs)
                gdfs[Path(shp_file).stem] = gdf

    # If no shapefile was found, return None
    return gdfs if gdfs else None

In [17]:
def process_shp_url(zip_url: str):
    """Downloads a zip file url and returns a dictionary of GeoDataFrames for shapefiles that match the patterns"""
    shp_dict = {}
    with urlopen(zip_url) as u:
        zip_data = u.read()
    with ZipMemoryFile(zip_data) as z:
        zip_files = z.listdir()
        shp_files = [f for f in zip_files if f.endswith(".shp")]
        for shp_file in shp_files:
            with z.open(shp_file) as f:
                shp_dict[Path(shp_file).stem] = gpd.GeoDataFrame.from_features(
                    f, crs=f.crs
                )
    return shp_dict


def extract_gdfs_from_zip_url_concurrent(
    zip_urls: list[str],
) -> dict[str, gpd.GeoDataFrame]:
    """Reads shapefiles from a list of zip file urls and returns a dictionary
    where the keys are the names of the shapefiles and the values are GeoDataFrames."""
    shp_dict = {}
    with cf.ThreadPoolExecutor() as executor:
        future_to_url = {executor.submit(process_shp_url, url): url for url in zip_urls}
        futures = tqdm(
            cf.as_completed(future_to_url),
            total=len(future_to_url),
            desc="Processing URLs",
            dynamic_ncols=True,
        )
        for future in futures:
            shp_dict.update(future.result())
    return shp_dict

In [18]:
def read_gdb_from_zip(gdb_zips_list: list[str]):
    """Reads a list of zip files containing geodatabases and returns a dictionary of GeoDataFrames"""
    # initialize an empty dictionary
    gdb_dict = {}
    # loop through each zip file
    for gdb_zip in gdb_zips_list:
        with ZipFile(gdb_zip, "r") as z:
            # get list of files in zip
            files = z.namelist()
            # filter for gdb folders
            gdb_folders = [f for f in files if f.endswith(".gdb/")]
            # if there is a gdb folder in the zip file
            if gdb_folders:
                # get it and read it into a GeoDataFrame
                gdb_folder = gdb_folders[0]
                gdb_dict[Path(gdb_folder).stem] = gpd.read_file(
                    f"zip://{gdb_zip}!{gdb_folder}"
                ).to_crs("EPSG:4269")
    # return the dictionary of GeoDataFrames
    return gdb_dict

In [19]:
def read_gdb_from_zip_url(gdb_urls_list: list[str]):
    """Reads a list of zip file urls containing geodatabases and returns a dictionary of GeoDataFrames"""
    # initialize an empty dictionary
    gdb_dict = {}
    # loop through each zip file
    for gdb_url in gdb_urls_list:
        # create a temporary directory
        with tempfile.TemporaryDirectory() as tmp_dir:
            # download the zip file
            with urlopen(gdb_url) as u, open(f"{tmp_dir}/data.zip", "wb") as f_out:
                f_out.write(u.read())
            # extract the zip file
            with ZipFile(f"{tmp_dir}/data.zip", "r") as zip_ref:
                zip_ref.extractall(tmp_dir)
            # get the list of extracted files
            extracted_files = list(Path(tmp_dir).iterdir())
            # filter for gdb folders
            gdb_folders = [f for f in extracted_files if f.suffix == ".gdb"]
            # if there is a gdb folder in the extracted files
            if gdb_folders:
                # get it and read it into a GeoDataFrame
                gdb_folder = gdb_folders[0]
                gdb_dict[Path(gdb_folder).stem] = gpd.read_file(gdb_folder).to_crs(
                    "EPSG:4269"
                )
    # return the dictionary of GeoDataFrames
    return gdb_dict

In [20]:
def process_gdb_url(gdb_url):
    """Downloads a zip file url containing a geodatabase and returns a GeoDataFrame"""
    with tempfile.TemporaryDirectory() as tmp_dir:
        # download the zip file
        with urlopen(gdb_url) as u, open(f"{tmp_dir}/data.zip", "wb") as f_out:
            f_out.write(u.read())
        # extract the zip file
        with ZipFile(f"{tmp_dir}/data.zip", "r") as zip_ref:
            zip_ref.extractall(tmp_dir)
        # get the list of extracted files
        extracted_files = list(Path(tmp_dir).iterdir())
        # filter for gdb folders
        gdb_folders = [f for f in extracted_files if f.suffix == ".gdb"]
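        # if there is a gdb folder in the extracted files
        if gdb_folders:
            # get it and read it into a GeoDataFrame
            gdb_folder = gdb_folders[0]
            return Path(gdb_folder).stem, gpd.read_file(gdb_folder)
        # otherwise raise, so the caller reports a clear error instead of failing to unpack None
        raise FileNotFoundError(f"No .gdb folder found in {gdb_url}")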


def read_gdb_from_zip_url_concurrent(gdb_urls_list: list[str]):
    """Reads a list of zip file urls containing geodatabases and returns a dictionary of GeoDataFrames"""
    # initialize an empty dictionary
    gdb_dict = {}
    # create a ThreadPoolExecutor
    with cf.ThreadPoolExecutor() as executor:
        # submit the process_gdb_url function for each url and gather the results
        future_to_url = {
            executor.submit(process_gdb_url, url): url for url in gdb_urls_list
        }
        for future in cf.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                key, data = future.result()
                gdb_dict[key] = data
            except Exception as exc:
                print(f"{url} generated an exception: {exc}")
    # return the dictionary of GeoDataFrames
    return gdb_dict

In [21]:
# Function definitions
def pascal_to_snake(name: str):
    """Converts a string from PascalCase to snake_case"""
    # (?<=[A-Za-z0-9]) - positive lookbehind for any alphanumeric character
    # (?=[A-Z][a-z]) - positive lookahead for any uppercase followed by lowercase
    pattern = re.compile(r"(?<=[A-Za-z0-9])(?=[A-Z][a-z])")
    name = pattern.sub("_", name).lower()
    return name
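A quick check of the regex on a few of the FracFocus column names from the registry's `info()` output. The function is repeated so the snippet runs on its own.

```python
import re


def pascal_to_snake(name: str):
    """Converts a string from PascalCase to snake_case (same regex as above)"""
    pattern = re.compile(r"(?<=[A-Za-z0-9])(?=[A-Z][a-z])")
    return pattern.sub("_", name).lower()


for col in ["pKey", "JobStartDate", "APINumber", "TVD", "TotalBaseNonWaterVolume"]:
    print(f"{col} -> {pascal_to_snake(col)}")
# pKey -> p_key
# JobStartDate -> job_start_date
# APINumber -> api_number
# TVD -> tvd
# TotalBaseNonWaterVolume -> total_base_non_water_volume
```

Because the lookahead requires an uppercase letter followed by a lowercase one, a trailing acronym is not split: a hypothetical `StateAPI` column would become `stateapi`.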

In [22]:
def plot_statistics_table_nonmissing_hbar(df):
    """Plots the percentage of non-missing values in each column of a pandas or Dask DataFrame"""
    # Calculate the fraction of non-missing values in each column
    missing_data_percent = (df.notnull().mean()).rename("Percent")

    # Count the non-missing values and convert the fraction to a percentage
    if isinstance(df, dd.DataFrame):
        non_missing_count, missing_data_percent = dask.compute(
            df.count().rename("Count"), (missing_data_percent * 100)
        )
    else:
        missing_data_percent = missing_data_percent * 100
        non_missing_count = df.notnull().sum().rename("Count")

    # Concatenate the two DataFrames along the columns
    non_missing_data = pd.concat([missing_data_percent, non_missing_count], axis=1)

    # Create a horizontal bar plot of the percentage of non-missing data
    hbar_plot = non_missing_data.hvplot.barh(
        y="Percent",
        width=800,
        height=600,
        title="Percentage of Non-Missing Data in Each Column",
        ylabel="",
        xlabel="",
        xaxis="bare",
        hover_cols="all",
    ).opts(
        active_tools=["box_zoom"],
        toolbar="above",
    )

    return hbar_plot

In [23]:
def unify_crs(
    dataframe: pd.DataFrame,
    lon_col: str = "longitude",
    lat_col: str = "latitude",
    crs_col: str = "crs",
    final_crs: str = "EPSG:4269",
):
    """
    Given a DataFrame with lon/lat or x/y coordinates,
    converts the coordinates to a unified crs and combines
    into a single GeoDataframe with a geometry column.
    """

    # Define the main columns that will be used for the conversion
    main_cols = [lon_col, lat_col, crs_col]

    # Get the other columns in the dataframe
    other_cols = list(set(dataframe.columns) - set(main_cols))

    # Create a subframe with only the main columns
    subframe = dataframe[main_cols]

    # Create a list of GeoDataFrames, each with a different CRS
    geo_dfs = [
        gpd.GeoDataFrame(
            # Use the data for this CRS
            data=data,
            # Create a geometry column from the lon/lat columns
            geometry=gpd.points_from_xy(x=data[lon_col].values, y=data[lat_col].values),
            # Set the CRS for this GeoDataFrame
            crs=pyproj.CRS(crs_val),
        # Convert the GeoDataFrame to the final CRS
        ).to_crs(final_crs)
        # Do this for each unique CRS in the subframe
        for crs_val, data in subframe.groupby(crs_col)
    ]

    # Merge the GeoDataFrames back together and return the result
    return pd.merge(
        # Concatenate the GeoDataFrames
        pd.concat(geo_dfs, sort=True),
        # Add the other columns back in
        dataframe[other_cols],
        # Merge on the index
        left_index=True,
        right_index=True,
    )

In [24]:
# @lru_cache(maxsize=3)
def get_background_map(bgcolor="black", alpha=0.5):
    """Returns a GeoViews background map"""
    return gts.CartoLight().opts(bgcolor=bgcolor, alpha=alpha)


def platecaree_to_mercator_vectorised(x, y):
    """Use Cartopy to convert PlateCarree coordinates to Mercator"""
    return ccrs.GOOGLE_MERCATOR.transform_points(ccrs.PlateCarree(), x, y)[:, :2]
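For reference, the spherical Web Mercator projection (EPSG:3857) that `ccrs.GOOGLE_MERCATOR` represents maps longitude λ and latitude φ to metres with x = R·λ and y = R·ln(tan(π/4 + φ/2)), where R = 6,378,137 m. A pure-Python sketch of that formula, useful for sanity-checking the output of `platecaree_to_mercator_vectorised` or the hard-coded `USA_BOUNDS_MERCATOR`; `lonlat_to_web_mercator` is a hypothetical helper name.

```python
import math

# Web Mercator (EPSG:3857) uses a sphere with the WGS84 semi-major axis
EARTH_RADIUS_M = 6378137.0


def lonlat_to_web_mercator(lon_deg: float, lat_deg: float) -> tuple[float, float]:
    """Forward spherical Web Mercator: degrees in, metres out."""
    x = EARTH_RADIUS_M * math.radians(lon_deg)
    y = EARTH_RADIUS_M * math.log(math.tan(math.pi / 4 + math.radians(lat_deg) / 2))
    return x, y


# Convert the corners of USA_BOUNDS (min lon, min lat, max lon, max lat)
usa_bounds = (-124.77, 24.52, -66.95, 49.38)
x_min, y_min = lonlat_to_web_mercator(usa_bounds[0], usa_bounds[1])
x_max, y_max = lonlat_to_web_mercator(usa_bounds[2], usa_bounds[3])
print(round(x_min), round(y_min), round(x_max), round(y_max))
```

The minimum x computed this way is about -13.89 million metres, while `USA_BOUNDS_MERCATOR` hard-codes -13,874,905, so it is worth confirming which extent the maps are meant to use.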

In [25]:
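def format_in_000(num):
    """Formats a number with a magnitude word (thousand, million, ...), e.g. 1234 -> '1.23 thousand'"""
    for unit in ["", "thousand", "million", "billion", "trillion"]:
        if abs(num) < 1000.0:
            # rstrip() drops the trailing space when there is no unit
            return f"{num:.2f} {unit}".rstrip()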
        num /= 1000.0
    return f"{num:.2f} quadrillion"

In [26]:
def split_datetime(df, column):
    """Splits a datetime column into year, month, and day columns"""
    # remove '_date' from the column name
    column_stem = column.replace("_date", "") if "_date" in column else column
    try:
        datetime_series = pd.to_datetime(df[column], errors="coerce")
        if datetime_series.isna().any():
            print(f"Errors occurred during conversion of column {column}.")
        df[column_stem + "_year"] = datetime_series.dt.year
        df[column_stem + "_month"] = datetime_series.dt.month
        df[column_stem + "_day"] = datetime_series.dt.day
    except KeyError:
        print(f"Column {column} not found in the DataFrame.")
    except Exception as e:
        print(f"An error occurred: {e}")

### Load data

The first dataset is from FracFocus. It comes with a readme file that contains the data dictionary for the dataset. Let's have a look at both.

Readme file with data dictionary

In [None]:
# get readme data (decoded as Windows-1252)
with urlopen(DATA_README_URL[0]) as response:
    readme = response.read().decode("windows-1252")
display(readme)

In [None]:
# print() renders escape characters such as \n and \t, which display() shows literally in the string's repr
print(readme)

In [41]:
# Read the three registryupload CSV files concurrently and concatenate them into one DataFrame
df_list = read_csv_concurrent(DATA_URLS2)

dfs = pd.concat(df_list, ignore_index=True)

100%|██████████| 3/3 [00:04<00:00,  1.64s/it]


In [42]:
registry_df = dfs.copy()
registry_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 213883 entries, 0 to 213882
Data columns (total 21 columns):
 #   Column                   Non-Null Count   Dtype  
---  ------                   --------------   -----  
 0   pKey                     213883 non-null  object 
 1   JobStartDate             213868 non-null  object 
 2   JobEndDate               213883 non-null  object 
 3   APINumber                213883 non-null  object 
 4   StateNumber              213883 non-null  int64  
 5   CountyNumber             213883 non-null  int64  
 6   OperatorName             213883 non-null  object 
 7   WellName                 213883 non-null  object 
 8   Latitude                 213883 non-null  float64
 9   Longitude                213883 non-null  float64
 10  Projection               213883 non-null  object 
 11  TVD                      183743 non-null  float64
 12  TotalBaseWaterVolume     183714 non-null  float64
 13  TotalBaseNonWaterVolume  163574 non-null  float64
 14  Stat

Looking at the missing values, most of them are in the `TVD`, `TotalBaseWaterVolume` and `TotalBaseNonWaterVolume` columns. One reason may be found in the data limitations listed in the terms of use on the FracFocus website, which state:
-  Disclosures submitted using the FracFocus 1.0 format (January, 2011 to May 31, 2013) will contain only header data. 
-  Disclosures submitted using the FracFocus 2.0 format (November 2012 to present) will contain both header and chemical data. NOTE: Between November, 2012 and May 31, 2013 disclosures in both 1.0 and 2.0 formats were submitted to the system. 
-  After May 31, 2013 only disclosures submitted in the 2.0 format were accepted.
-  Data submitted appears as it was submitted by the operator or operator’s authorized agent. FracFocus does not warrant the data in any way.
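If the FracFocus 1.0 header-only format is the cause, the share of missing `TVD` values should be much higher for jobs that started before mid-2013. A minimal sketch of that check on a made-up frame with the same column names; on the real data it would run on `registry_df`.

```python
import pandas as pd

# Made-up rows for illustration; the real check would use registry_df
df = pd.DataFrame(
    {
        "JobStartDate": ["2012-03-01", "2012-07-15", "2014-02-01", "2015-05-20"],
        "TVD": [None, None, 8500.0, 9100.0],
    }
)

start_year = pd.to_datetime(df["JobStartDate"], errors="coerce").dt.year
# Percentage of rows with a missing TVD, per job start year
missing_tvd_pct = df["TVD"].isna().groupby(start_year).mean().mul(100)

print(missing_tvd_pct)
```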

In [None]:
# plot the missing data
plot_statistics_table_nonmissing_hbar(registry_df)

In [None]:
# Calculate the percentage of non-missing values in each column
missing_data_percent = (registry_df.notna().mean() * 100).rename("Percent")

# Create a DataFrame of the counts of non-missing values
non_missing_count = registry_df.notna().sum().rename("Count")

# Concatenate the two DataFrames along the columns
non_missing_data = pd.concat([missing_data_percent, non_missing_count], axis=1)

# Create a horizontal bar plot of the percentage of non-missing data
barh_plot = non_missing_data.hvplot.barh(
    y="Percent",
    width=800,
    height=600,
    title="Percentage of Non-Missing Data in Each Column",
    ylabel="",
    xlabel="",
    xaxis="bare",
    hover_cols="all",
).opts(
    active_tools=["box_zoom"],
    toolbar="above",
)

barh_plot

In [None]:
# Look at some of the rows of the dataframe
display(registry_df.head(3))
display(registry_df.sample(5, random_state=628))
display(registry_df.tail(3))

From our first look at a few sample rows, some things stand out immediately:
1. The dataset may be in chronological order, and the `JobStartDate`/`JobEndDate` values at both extremes may be incorrect.
2. `StateNumber` `42` (the API state code for Texas) may dominate the dataset: 4 of the 5 rows drawn at random from the 200k+ rows have a `StateNumber` of `42`.
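Both hunches can be checked directly instead of inferred from a handful of rows. A sketch on made-up data with the same column names: `is_monotonic_increasing` tests the chronological-order idea, `min()`/`max()` expose implausible extreme dates, and `value_counts(normalize=True)` gives each state's share of rows.

```python
import pandas as pd

# Made-up rows; on the real data these would be registry_df columns
df = pd.DataFrame(
    {
        "JobStartDate": ["2011-01-10", "2013-06-01", "2018-09-30", "2023-10-01", "2016-04-12"],
        "StateNumber": [42, 42, 30, 42, 42],
    }
)

dates = pd.to_datetime(df["JobStartDate"], errors="coerce")
# True only if every date is >= the one before it (row order is chronological)
is_chronological = dates.is_monotonic_increasing
print(is_chronological)  # False
print(dates.min(), dates.max())

# Share of rows per state code, largest first
state_share = df["StateNumber"].value_counts(normalize=True)
print(state_share)
```

On the real column, it is safer to run the monotonicity test on `dates.dropna()`, since `NaT` values can make it return `False`.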



### Data Cleaning

Before we jump into cleaning the data in the columns, let's make the column names more Pythonic by converting them from PascalCase to snake_case.


In [43]:
registry_df.columns = [pascal_to_snake(col) for col in registry_df.columns]
registry_df.info(memory_usage="deep")

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 213883 entries, 0 to 213882
Data columns (total 21 columns):
 #   Column                       Non-Null Count   Dtype  
---  ------                       --------------   -----  
 0   p_key                        213883 non-null  object 
 1   job_start_date               213868 non-null  object 
 2   job_end_date                 213883 non-null  object 
 3   api_number                   213883 non-null  object 
 4   state_number                 213883 non-null  int64  
 5   county_number                213883 non-null  int64  
 6   operator_name                213883 non-null  object 
 7   well_name                    213883 non-null  object 
 8   latitude                     213883 non-null  float64
 9   longitude                    213883 non-null  float64
 10  projection                   213883 non-null  object 
 11  tvd                          183743 non-null  float64
 12  total_base_water_volume      183714 non-null  float64
 13 

Next, we can remove the columns that contain only null values. These are the last 2 columns in the dataframe, `source` and `dtmod`. We can also drop the `total_base_non_water_volume` column, since we are unlikely to need it.


In [44]:
registry_df = registry_df.drop(
    columns=["source", "dtmod", "total_base_non_water_volume"]
)
registry_df.info(memory_usage="deep")

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 213883 entries, 0 to 213882
Data columns (total 18 columns):
 #   Column                   Non-Null Count   Dtype  
---  ------                   --------------   -----  
 0   p_key                    213883 non-null  object 
 1   job_start_date           213868 non-null  object 
 2   job_end_date             213883 non-null  object 
 3   api_number               213883 non-null  object 
 4   state_number             213883 non-null  int64  
 5   county_number            213883 non-null  int64  
 6   operator_name            213883 non-null  object 
 7   well_name                213883 non-null  object 
 8   latitude                 213883 non-null  float64
 9   longitude                213883 non-null  float64
 10  projection               213883 non-null  object 
 11  tvd                      183743 non-null  float64
 12  total_base_water_volume  183714 non-null  float64
 13  state_name               213881 non-null  object 
 14  coun

Next, we will fix some of the dtypes of the columns.
- Both the `job_start_date` and `job_end_date` columns are `object` dtypes. We will parse them as datetimes, drop the time component, and keep them in `YYYY-MM-DD` format.
- We can also split each date into its year, month, and day components. This may come in handy for feature engineering later on.
- The `projection` column is an `object` dtype. It can be converted to a string dtype and shortened to `crs`, since it holds the Coordinate Reference System used for the values in the `latitude` and `longitude` columns. We will dig into CRSs later on.
- The `federal_well` and `indian_well` columns are both boolean columns. They may be more aptly named `is_federal_well` and `is_indian_well`, respectively.
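The `split_datetime` helper called below is also defined outside this section. This is a minimal hypothetical sketch of what it might do, inferred only from its visible effects: it adds `<prefix>_year`, `<prefix>_month`, and `<prefix>_day` columns in place, leaves the original column untouched, and prints a message when some values fail to parse.

```python
import pandas as pd


def split_datetime(df: pd.DataFrame, col: str) -> None:
    """Hypothetical sketch: add <prefix>_year/_month/_day columns parsed from `col`, in place."""
    parsed = pd.to_datetime(df[col], errors="coerce")  # unparseable values become NaT
    # Report values that were present but could not be parsed
    if (parsed.isna() & df[col].notna()).any():
        print(f"Errors occurred during conversion of column {col}.")
    prefix = col.removesuffix("_date")  # "job_start_date" -> "job_start"
    df[f"{prefix}_year"] = parsed.dt.year
    df[f"{prefix}_month"] = parsed.dt.month
    df[f"{prefix}_day"] = parsed.dt.day


demo = pd.DataFrame({"job_start_date": ["2019-11-19 00:00:00", "not a date"]})
split_datetime(demo, "job_start_date")
print(demo)
```

Any NaT in the parsed column makes the year/month/day columns come out as `float64` with NaN. That is consistent with the `float64` dtypes in the `info()` output below.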

In [45]:
# Use the function on 'job_start_date' and 'job_end_date'
split_datetime(registry_df, "job_start_date")
split_datetime(registry_df, "job_end_date")
registry_df[[col for col in registry_df.columns if re.search("start|end", col)]].info(
    memory_usage="deep"
)
# count the rows that still have a null in any of the date columns
registry_df[
    registry_df[[col for col in registry_df.columns if re.search("start|end", col)]]
    .isna()
    .any(axis=1)
].shape

Errors occurred during conversion of column job_start_date.
Errors occurred during conversion of column job_end_date.
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 213883 entries, 0 to 213882
Data columns (total 8 columns):
 #   Column           Non-Null Count   Dtype  
---  ------           --------------   -----  
 0   job_start_date   213868 non-null  object 
 1   job_end_date     213883 non-null  object 
 2   job_start_year   213866 non-null  float64
 3   job_start_month  213866 non-null  float64
 4   job_start_day    213866 non-null  float64
 5   job_end_year     213882 non-null  float64
 6   job_end_month    213882 non-null  float64
 7   job_end_day      213882 non-null  float64
dtypes: float64(6), object(2)
memory usage: 41.4 MB


(17, 24)

In [46]:
# Parse 'job_start_date' as datetime (unparseable values become NaT),
# then store it as a 'YYYY-MM-DD' string (NaT becomes NaN)
registry_df["job_start_date"] = pd.to_datetime(
    registry_df["job_start_date"], errors="coerce"
).dt.strftime("%Y-%m-%d")

# Same for 'job_end_date'
registry_df["job_end_date"] = pd.to_datetime(
    registry_df["job_end_date"], errors="coerce"
).dt.strftime("%Y-%m-%d")

# drop rows with null values in 'job_start_date' and 'job_end_date'
# registry_df = registry_df.dropna(subset=["job_start_date", "job_end_date"])


# Rename some columns for clarity
registry_df.rename(
    columns={
        "federal_well": "is_federal_well",
        "indian_well": "is_indian_well",
        "projection": "crs",
    },
    inplace=True,
)

# Display the information of the DataFrame
registry_df.info(memory_usage="deep")

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 213883 entries, 0 to 213882
Data columns (total 24 columns):
 #   Column                   Non-Null Count   Dtype  
---  ------                   --------------   -----  
 0   p_key                    213883 non-null  object 
 1   job_start_date           213866 non-null  object 
 2   job_end_date             213882 non-null  object 
 3   api_number               213883 non-null  object 
 4   state_number             213883 non-null  int64  
 5   county_number            213883 non-null  int64  
 6   operator_name            213883 non-null  object 
 7   well_name                213883 non-null  object 
 8   latitude                 213883 non-null  float64
 9   longitude                213883 non-null  float64
 10  crs                      213883 non-null  object 
 11  tvd                      183743 non-null  float64
 12  total_base_water_volume  183714 non-null  float64
 13  state_name               213881 non-null  object 
 14  coun

Next, we will look at the `api_number` column.
We learned from the readme that
> APINumber - The American Petroleum Institute well identification number formatted as follows xx-xxx-xxxxx0000 Where: 
> - First two digits represent the state, 
> - second three digits represent the county, 
> - third 5 digits represent the well.

Theoretically, we could just grab the first two characters of the `APINumber` and use them as the state number, based on the definition above. In practice that would not be a good idea, and here is why.<br>
Although the column is called `APINumber`, it is an identifier rather than a number. A leading `0` is a meaningful character and cannot be omitted from the value. If the value is ever stored as an integer, that leading `0` is silently dropped and every remaining digit shifts one position to the left. Let's look at some of the rows with single-digit state numbers.

Right now, 
- the `api_number` column is an `object` dtype. A `string` dtype would be a better option, since an `object` column can hold mixed types. We can also shorten that column name to `api`.
- the `state_number` and `county_number` columns are both `int64` dtypes. They are codes rather than quantities, so a `string` dtype, which preserves leading zeros, may be a better fit.
- `state_code` and `county_code` may be better names for the `state_number` and `county_number` columns, respectively.


In [None]:
# rows where the state_number is a single digit
registry_df[
    (registry_df["state_number"] == 3) | (registry_df["state_number"] == 5)
].sample(5, random_state=628)

Some rows' `api_number` values have a leading `0`, which is correct, but others do not. The rows without the leading `0` are 13 characters long instead of 14. We can left-pad with `0` where needed until every API number is 14 characters long.

In [None]:
# Check the number of characters in the api_number column
registry_df["api_number"].astype("string").str.len().value_counts()

Most are 14 characters long, but some are 13 characters long, like the ones we saw above without the leading `0`. Let's assume the ones with 13 characters are missing the leading `0` and not something else.
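The following small illustration shows why padding has to come before slicing. It uses a hypothetical 13-character API number that lost its leading `0` (intended state code `05`) alongside a normal 14-character one.

```python
import pandas as pd

# Hypothetical API numbers: the first lost its leading zero, the second is already 14 characters
api = pd.Series(["5123456780000", "42317428660000"], dtype="string")

raw_states = api.str[:2].tolist()  # slicing the raw values misreads the first state code
padded = api.str.zfill(14)         # left-pad with "0" up to 14 characters
padded_states = padded.str[:2].tolist()
print(raw_states, padded_states)   # ['51', '42'] ['05', '42']
```

`zfill` only pads values shorter than the target width, so the values that are already 14 characters long pass through unchanged.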

In [47]:
# Create 'api' from 'api_number' as a string, left-padded with zeros to 14 characters
registry_df["api"] = registry_df["api_number"].astype("string").str.zfill(14)

# Convert 'state_number' to string and pad it with zeros to make it 2 characters long
registry_df["state_code"] = registry_df["state_number"].astype("string").str.zfill(2)

# Convert 'county_number' to string and pad it with zeros to make it 3 characters long
registry_df["county_code"] = registry_df["county_number"].astype("string").str.zfill(3)

In [48]:
# check which rows may have the api with the first two digits not matching the state number
api_state_mismatch_mask = registry_df["state_code"] != registry_df["api"].str[0:2]
# api_state_mismatch_mask

In [None]:
# check which rows may have the api with the first two digits not matching the state number
registry_df[api_state_mismatch_mask][
    ["api_number", "api", "state_code", "state_name", "county_code", "county_name"]
]

We expected 2 rows here. When we checked the length of the `api_number` column above, 1 row had 10 characters and another had 12. These values presumably lack trailing digits rather than a leading `0`, so `zfill` padded them on the wrong side. Since it is only two rows, this should be an easy fix.

In [49]:
# On mismatched rows, strip the zeros added on the left and right-pad with zeros to 14 characters instead
registry_df.loc[api_state_mismatch_mask, "api"] = (
    registry_df.loc[api_state_mismatch_mask, "api"].str.lstrip("0").str.ljust(14, "0")
)
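The following shows what the repair above does to a hypothetical 10-character API number that is missing its `0000` suffix.

```python
import pandas as pd

raw = pd.Series(["3512345678"], dtype="string")   # hypothetical 10-character API, no suffix
wrong = raw.str.zfill(14)                          # zeros added on the left
fixed = wrong.str.lstrip("0").str.ljust(14, "0")   # zeros moved to the right
print(wrong.iloc[0], fixed.iloc[0])                # 00003512345678 35123456780000
```

A caveat on this design: `lstrip("0")` would also remove a genuine leading zero, as in state codes `01` through `09`. It is only safe here because the two affected rows were inspected by hand.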

In [None]:
# check which rows may have the api with the first two digits not matching the state number
registry_df[api_state_mismatch_mask][
    ["api_number", "api", "state_code", "state_name", "county_code", "county_name"]
]

In [50]:
# check which rows have digits 3-5 of the api not matching the county code
api_county_mismatch_mask = registry_df["county_code"] != registry_df["api"].str[2:5]
registry_df[api_county_mismatch_mask]

Unnamed: 0,p_key,job_start_date,job_end_date,api_number,state_number,county_number,operator_name,well_name,latitude,longitude,crs,tvd,total_base_water_volume,state_name,county_name,ff_version,is_federal_well,is_indian_well,job_start_year,job_start_month,job_start_day,job_end_year,job_end_month,job_end_day,api,state_code,county_code


The `state_name` column should not have many more than 50 distinct values, given that there are 50 states in the US. Yet if we check the number of unique values in the `state_name` column, we see 95. This comes from variation in the way the `state_name` value was entered. Although less obvious, we can assume the same holds for the `county_name` column. Luckily, the `api` encodes both the state number and the county number. With it we can:
1. validate that these corresponding columns match, and
2. ensure that the `state_name` and `county_name` columns are correct. It is important to note that:
> The state codes used in an API number are DIFFERENT from another standard which is the Federal Information Processing Standard (FIPS) state code established in 1987 by NIST. ([source](https://en.wikipedia.org/wiki/API_well_number#State_code))
In [None]:
print(
    f'Number of different values in state_name column: {registry_df["state_name"].nunique()}'
)
print(
    f'Number of different values in state_number column: {registry_df["state_number"].nunique()}'
)

In [51]:
# group by state_code and find the mode of the state_name
state_code_mode = (
    registry_df.groupby("state_code")["state_name"]
    .apply(lambda x: x.mode().iloc[0])
    .reset_index()
)
state_code_mode = state_code_mode.rename(columns={"state_name": "state"})
state_code_mode

Unnamed: 0,state_code,state
0,1,Alabama
1,3,Arkansas
2,4,California
3,5,Colorado
4,11,Idaho
5,12,Illinois
6,13,Indiana
7,15,Kansas
8,16,Kentucky
9,17,Louisiana
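The mode-per-code step above works as a majority vote: whichever spelling of the state name appears most often for a given `state_code` wins. Here is a toy example with hypothetical spelling variants.

```python
import pandas as pd

# Hypothetical inconsistent entries for the same API state code
toy = pd.DataFrame(
    {
        "state_code": ["42", "42", "42", "35", "35"],
        "state_name": ["Texas", "TX", "Texas", "Oklahoma", "oklahoma"],
    }
)

# Most frequent state_name per state_code; Series.mode returns sorted values,
# so a tie resolves to the value that sorts first
modes = toy.groupby("state_code")["state_name"].agg(lambda s: s.mode().iloc[0])
print(modes.to_dict())
```

Two things to keep in mind with this approach. Ties are broken alphabetically rather than by correctness. A code whose names are all missing would make `mode()` empty and `.iloc[0]` raise an `IndexError`.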


In [52]:
registry_df = registry_df.merge(state_code_mode, on="state_code")
registry_df.sample(3, random_state=628)

Unnamed: 0,p_key,job_start_date,job_end_date,api_number,state_number,county_number,operator_name,well_name,latitude,longitude,crs,tvd,total_base_water_volume,state_name,county_name,ff_version,is_federal_well,is_indian_well,job_start_year,job_start_month,job_start_day,job_end_year,job_end_month,job_end_day,api,state_code,county_code,state
192605,d45ffa9c-25cc-42a3-92ab-09fcc74d032f,2018-11-13,2018-11-18,35083244260000,35,83,"chisholm Oil and Gas Operating, LLC",King Ranch #18-4-32 1H,36.000705,-97.653121,NAD27,6569.0,12481014.0,Oklahoma,Logan,3,False,False,2018.0,11.0,13.0,2018.0,11.0,18.0,35083244260000,35,83,Oklahoma
286,e3a3d6d1-d409-4e7c-ad57-703cbff32eef,2011-02-14,2011-02-14,42097341930000,42,97,"EOG Resources, Inc.",Herbert Unit #3H,33.536889,-97.474433,NAD27,,,Texas,Cooke,1,False,False,2011.0,2.0,14.0,2011.0,2.0,14.0,42097341930000,42,97,Texas
47360,702c7c38-053f-4b28-930d-b75fd0d1623d,2014-12-07,2015-01-07,42255337360000,42,255,Encana Oil & Gas (USA) Inc.,Charger 9H,29.037556,-97.934511,NAD27,10395.0,14672682.0,Texas,Karnes,2,False,False,2014.0,12.0,7.0,2015.0,1.0,7.0,42255337360000,42,255,Texas


We will focus our efforts on the most recent 10 years. Although more data is usually better, data from too far in the past may mislead whatever model we build, since unconventional drilling practices have since taken over the industry. We will also focus on one specific area, the Permian Basin. The Permian Basin has been instrumental in the shale boom and is currently the most active area of exploration and production in the US.
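`job_start_date` now holds zero-padded `YYYY-MM-DD` strings, so a plain string comparison orders them chronologically. Dates that failed to parse are NaN, and pandas treats `NaN >= "2013-01-01"` as `False`, so those rows fall out of the filter. Here is a small check with hypothetical values.

```python
import numpy as np
import pandas as pd

dates = pd.Series(["2012-12-31", "2013-01-01", "2021-06-08", np.nan], dtype="object")
mask = dates >= "2013-01-01"  # lexicographic order matches date order for ISO strings
print(mask.tolist())
```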

In [53]:
# create a mask for jobs starting from 2013 onwards
post_2012_mask = registry_df["job_start_date"] >= "2013-01-01"
registry_df_post_2012 = registry_df[post_2012_mask].copy()

# find all the rows with null values
null_mask = registry_df_post_2012.isna().any(axis=1)
registry_df_post_2012[null_mask]

Unnamed: 0,p_key,job_start_date,job_end_date,api_number,state_number,county_number,operator_name,well_name,latitude,longitude,crs,tvd,total_base_water_volume,state_name,county_name,ff_version,is_federal_well,is_indian_well,job_start_year,job_start_month,job_start_day,job_end_year,job_end_month,job_end_day,api,state_code,county_code,state
61454,7534cb0b-9f85-4ca2-a21c-31be328456f8,2017-04-10,2017-04-10,42173374020000,42,173,"Cinnabar Energy, LTD.",Thomas 4101HD,31.996472,-101.568598,NAD27,9310.0,,Texas,Glasscock,3,False,False,2017.0,4.0,10.0,2017.0,4.0,10.0,42173374020000,42,173,Texas
61965,c57e59c6-d132-4a13-bb0b-8fb3e58ce918,2017-05-08,2017-05-08,42077352710000,42,77,Lane Operating Company,Dillard A Unit No. 1,33.591273,-98.138934,NAD27,4600.0,,Texas,Clay,3,False,False,2017.0,5.0,8.0,2017.0,5.0,8.0,42077352710000,42,77,Texas
62626,a2ad142e-6a38-44ed-8ebc-8572e43236b7,2017-06-13,2017-06-13,42237401310000,42,237,"Blakenergy Operating, LLC",Garner #2,33.434316,-98.227896,NAD27,6350.0,,Texas,Jack,3,False,False,2017.0,6.0,13.0,2017.0,6.0,13.0,42237401310000,42,237,Texas
84840,fcbf3967-867e-4e4e-af18-d83e7a82c604,2020-04-19,2020-04-19,42461378800000,42,461,COG Operating LLC,Powell 36 7,31.523085,-102.118329,NAD27,,,Texas,Upton,1,False,False,2020.0,4.0,19.0,2020.0,4.0,19.0,42461378800000,42,461,Texas
89052,ead37751-a53f-4ed9-99df-8cac1ebf1e15,2021-05-23,2021-05-23,42173346750000,42,173,Berry Petroleum,Talon #4,31.950785,-101.775302,NAD83,,,Texas,Glasscock,1,False,False,2021.0,5.0,23.0,2021.0,5.0,23.0,42173346750000,42,173,Texas
89305,4753a32e-39cb-4994-b69b-8acb36597463,2021-06-08,2021-06-08,42115334560000,42,115,Pioneer Natural Resources,Echols 10 #1,32.531683,-102.096109,NAD27,,,Texas,Dawson,1,False,False,2021.0,6.0,8.0,2021.0,6.0,8.0,42115334560000,42,115,Texas
98083,b76c7cc4-f1c1-4711-9168-ba819f41d070,2022-10-16,2022-10-27,42479446890000,42,479,Lewis Energy Group,HAMILTON NO. 34H,27.95851,-99.567909,WGS84,10456.0,,Texas,Webb,3,False,False,2022.0,10.0,16.0,2022.0,10.0,27.0,42479446890000,42,479,Texas
98087,99463560-ccb5-4c07-9537-8abdc731208b,2022-10-16,2022-10-27,42479446880000,42,479,Lewis Energy Group,HAMILTON NO. 33H,27.95851,-99.567956,WGS84,10437.0,,Texas,Webb,3,False,False,2022.0,10.0,16.0,2022.0,10.0,27.0,42479446880000,42,479,Texas
139253,0f8e944c-87d7-4d84-8d56-4b8a5f1cba94,2022-05-28,2022-06-08,33610338000000,33,610,Hunt Oil Company,TRULSON 156-90-11-14H-3,48.355506,-102.212124,NAD83,8872.4,13496802.0,North Dakota,,3,False,False,2022.0,5.0,28.0,2022.0,6.0,8.0,33610338000000,33,610,North Dakota
139256,6e5d8284-29d1-4b3d-a72c-729d95c327de,2022-05-28,2022-06-09,33610338100000,33,610,Hunt Oil Company,PALERMO 156-90-2-31H-5,48.355506,-102.21233,NAD83,8782.65,14820967.0,North Dakota,,3,False,False,2022.0,5.0,28.0,2022.0,6.0,9.0,33610338100000,33,610,North Dakota


Let's see how many NaNs remain in each column.

In [None]:
registry_df_post_2012.info(memory_usage="deep")
registry_df_post_2012.isna().sum()

Looking at the `county_number` for the rows with a NaN in the `county_name` column, we can see why the name is missing. Those numbers are most likely incorrect, since states like `North Dakota` and `Arkansas` do not have county numbers as large as `610` or `729`. However, we can still try to impute the correct values by cross-referencing other sources or by using the `latitude` and `longitude` values.


In [None]:
# the rows with a null value for the county_name column
registry_df_post_2012[registry_df_post_2012["county_name"].isna()]

In [54]:
# get the index of one of the rows with a null value for the county_name column (3rd one down)
index_vanorsdale = registry_df_post_2012.query("api_number == '03729439000000'").index

#### Oklahoma Corporation Commission (OCC)

A quick web search shows that WFD Oil Corporation operates in Oklahoma. We also learn that the well name is `VANORSDOL`, the well number is `#1-29`, and the API number is `3503729439` `0000`. With this, we can correct the data that was entered incorrectly in FracFocus.

The values we are looking for are listed in the table below. We could enter them manually, but we will pull them through code instead by querying the API number in the OCC data on Oklahoma wells.

| Column Name | Value |
| --- | --- |
| API | 3503729439 |
| WELL_NAME | VANORSDOL |
| WELL_NUM | #1-29 |
| OPERATOR | WFD OIL CORPORATION |
| WELLSTATUS | AC |
| WELLTYPE | OIL |
| SH_LAT | 35.749381 |
| SH_LON | -96.370355 |
| COUNTY | CREEK |


In [55]:
# Read the OCC wells Parquet file
occ_wells = pd.read_parquet(OCC_PARQUET_URL)
# Convert the WKT strings back into shapely geometries
occ_wells["geometry"] = occ_wells["geometry"].apply(wkt.loads)


# Convert the DataFrame to a GeoDataFrame, specifying the CRS
occ_wells = gpd.GeoDataFrame(
    occ_wells, geometry="geometry", crs=occ_wells["crs"].iloc[0]
)
occ_wells.info()
# look at 1 sample row of the dataframe
occ_wells.sample()

<class 'geopandas.geodataframe.GeoDataFrame'>
RangeIndex: 445575 entries, 0 to 445574
Data columns (total 27 columns):
 #   Column             Non-Null Count   Dtype   
---  ------             --------------   -----   
 0   objectid           445575 non-null  int64   
 1   api                445575 non-null  float64 
 2   well_browse_link   445575 non-null  object  
 3   well_records_docs  445575 non-null  object  
 4   well_name          445568 non-null  object  
 5   well_num           445570 non-null  object  
 6   operator           445575 non-null  object  
 7   wellstatus         443813 non-null  object  
 8   welltype           365021 non-null  object  
 9   symbol_class       445575 non-null  object  
 10  sh_lat             437073 non-null  float64 
 11  sh_lon             437073 non-null  float64 
 12  county             445575 non-null  object  
 13  section            445454 non-null  float64 
 14  township           445450 non-null  object  
 15  range              445575 

Unnamed: 0,objectid,api,well_browse_link,well_records_docs,well_name,well_num,operator,wellstatus,welltype,symbol_class,sh_lat,sh_lon,county,section,township,range,qtr4,qtr3,qtr2,qtr1,pm,footage_ew,ew,footage_ns,ns,geometry,crs
343803,343804,3513101000.0,http://wellbrowse.occ.ok.gov/Webforms/WellInfo...,https://public.occ.ok.gov/OGCDWebLink/Search.a...,ENGERSOL,#1,OTC/OCC NOT ASSIGNED,PA,DRY,PLUGGED,36.32894,-95.44452,ROGERS,1.0,21N,17E,,NE,NE,SE,IM,0.0,,0.0,,POINT (-95.44452 36.32894),EPSG:4326


In [56]:
# Convert the float 'api' column to string via int64 so it has no trailing '.0'
occ_wells["api"] = occ_wells["api"].astype("int64").astype("string")
occ_wells.sample()

Unnamed: 0,objectid,api,well_browse_link,well_records_docs,well_name,well_num,operator,wellstatus,welltype,symbol_class,sh_lat,sh_lon,county,section,township,range,qtr4,qtr3,qtr2,qtr1,pm,footage_ew,ew,footage_ns,ns,geometry,crs
168044,168045,3507120763,http://wellbrowse.occ.ok.gov/Webforms/WellInfo...,https://public.occ.ok.gov/OGCDWebLink/Search.a...,BICENTENNIAL,#1,JEFFRIES PUMPING SERVICE INC,AC,OIL,OIL,36.829925,-96.96361,KAY,10.0,27N,03E,,SE,NW,SE,IM,990.0,W,1650.0,S,POINT (-96.96361 36.82993),EPSG:4326
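The OCC `api` column is stored as `float64`, so converting it straight to a string keeps the float formatting. Casting to `int64` first, as in the cell above, yields a clean identifier. A 10-digit integer is well within the range `float64` represents exactly, so no digits are lost. This quick check uses the Vanorsdol API.

```python
import pandas as pd

# Vanorsdol API as it would be stored in a float64 column
api_float = pd.Series([3503729439.0])

direct = api_float.astype("string").iloc[0]                   # float formatting survives
via_int = api_float.astype("int64").astype("string").iloc[0]  # clean 10-digit identifier
print(direct, via_int)
```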


In [57]:
# make a copy of the well_name column
registry_df_post_2012["well"] = registry_df_post_2012["well_name"].copy()

In [58]:
registry_df_post_2012[registry_df_post_2012["api"].str.contains("11533124")]

Unnamed: 0,p_key,job_start_date,job_end_date,api_number,state_number,county_number,operator_name,well_name,latitude,longitude,crs,tvd,total_base_water_volume,state_name,county_name,ff_version,is_federal_well,is_indian_well,job_start_year,job_start_month,job_start_day,job_end_year,job_end_month,job_end_day,api,state_code,county_code,state,well


In [59]:
# query the OCC wells for VANORSDOL by (case-insensitive) well name and API number
# occ_wells[occ_wells["well_name"].str.contains("vanors", case=False, na=False)]
vanorsdol_row = occ_wells.query(
    'well_name.fillna("").str.contains("vanors", case=False) & (api == "3503729439")',
)[
    [
        "api",
        "well_name",
        "well_num",
        "operator",
        "sh_lat",
        "sh_lon",
        "county",
    ]
].rename(
    columns={"sh_lat": "latitude", "sh_lon": "longitude"}
)
vanorsdol_row["well"] = (
    vanorsdol_row["well_name"].str.title()
    + " "
    + vanorsdol_row["well_num"].astype("string")
)
index_vanorsdol = vanorsdol_row.index

columns_to_replace = ["api", "well", "latitude", "longitude"]
for col in columns_to_replace:
    registry_df_post_2012.loc[index_vanorsdale, col] = vanorsdol_row.loc[
        index_vanorsdol, col
    ].values

# check that the values have been replaced
registry_df_post_2012.loc[index_vanorsdale, columns_to_replace]

Unnamed: 0,api,well,latitude,longitude
178445,3503729439,Vanorsdol #1-29,35.749381,-96.370355


In [None]:
# adjust the api column to 14 characters again
registry_df_post_2012["api"] = registry_df_post_2012["api"].str.ljust(14, "0")

registry_df_post_2012[registry_df_post_2012["county_name"].isna()]

Now, with `latitude` and `longitude` coordinates for all 4 rows with missing `county_name`, let's find out which counties they belong to spatially.



### Geodataframe

We can get the boundary coordinates for all the counties in the US from the [census.gov](https://www.census.gov/) website. We saved the URL for this as `CENSUS_COUNTY_MAP_URL` at the top of the notebook.

In [60]:
# get the US county boundary data
county_gdf = get_county_data()
county_gdf.sample(3)

Unnamed: 0,geoid,statefp,countyfp,name,geometry
1313,48167,48,167,Galveston,"POLYGON ((-95.23298 29.46616, -95.23294 29.466..."
1692,8107,8,107,Routt,"POLYGON ((-106.65274 40.38897, -106.65274 40.3..."
184,19027,19,27,Carroll,"POLYGON ((-94.74490 42.20952, -94.74389 42.209..."


Since the county dataframe does not include the state name, we will scrape the FIPS table from Wikipedia and merge the two tables for convenience.

In [61]:
fips_df = pd.read_html(FIPS_WIKI_URL)[1]
fips_df.columns = ["geoid", "county", "state"]
fips_df["geoid"] = fips_df["geoid"].astype("string").str.zfill(5)
fips_df.sample(3)

Unnamed: 0,geoid,county,state
1842,36015,Chemung County,New York
1870,36071,Orange County,New York
889,19187,Webster County,Iowa


In [62]:
county_fips_gdf = county_gdf.merge(fips_df, on="geoid")
county_fips_gdf.sample(3, random_state=628)

Unnamed: 0,geoid,statefp,countyfp,name,geometry,county,state
824,31045,31,45,Dawes,"POLYGON ((-102.77315 42.52564, -102.77298 42.5...",Dawes County,Nebraska
3058,51149,51,149,Prince George,"POLYGON ((-77.13939 37.12645, -77.14097 37.125...",Prince George County,Virginia
2315,22015,22,15,Bossier,"POLYGON ((-93.84522 32.95043, -93.84486 32.951...",Bossier Parish,Louisiana


Quick note about `GeoDataFrames`: they have an active geometry column (named `geometry` by default) that holds the geometric objects. This column is what enables `geopandas` to perform spatial operations, and it carries attributes such as `.crs`, the coordinate reference system.
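Here is a toy version of the spatial join used in this section. It has two hypothetical square "counties" and one hypothetical well location, all in `EPSG:4269`. It shows the active geometry column, the CRS attribute, and a left `sjoin` with `predicate="intersects"`.

```python
import geopandas as gpd
from shapely.geometry import Point, box

# Two hypothetical square "counties" side by side (lon/lat degrees)
counties = gpd.GeoDataFrame(
    {"name": ["West", "East"]},
    geometry=[box(-102, 31, -101, 32), box(-101, 31, -100, 32)],
    crs="EPSG:4269",
)
# One hypothetical well location that falls inside "East"
wells = gpd.GeoDataFrame(
    {"well": ["Example 1H"]},
    geometry=[Point(-100.5, 31.5)],
    crs="EPSG:4269",
)

print(wells.geometry.name, wells.crs)  # active geometry column and its CRS
joined = wells.sjoin(counties, how="left", predicate="intersects")
print(joined[["well", "name"]])
```

With `intersects`, a point lying exactly on a shared county boundary matches both polygons, so the left join would return two rows for that well.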


Commonly used datums in North America are NAD27, NAD83, and WGS84. More info [here](https://webhelp.esri.com/arcgisdesktop/9.3/index.cfm?TopicName=Projection_basics_the_GIS_professional_needs_to_know).<br>

The county geodataframe uses `EPSG:4269`, the EPSG code for the NAD83 geographic coordinate system. Let's create a geodataframe from the `latitude` and `longitude` values we have and transform all of the points to the same CRS.
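The `unify_crs` helper is defined outside this section. Below is a minimal hypothetical sketch of the idea. It groups rows by their `crs` label, builds points in that CRS, reprojects each group to one target CRS, and restores the original row order. The label-to-EPSG mapping (`NAD27` to `EPSG:4267`, `NAD83` to `EPSG:4269`, `WGS84` to `EPSG:4326`) is an assumption about how the FracFocus labels should be read. For reference, the notebook output below shows the NAD27 Vanorsdol point moving from (-96.370355, 35.749381) to about (-96.37064, 35.74946), which is consistent with a datum shift.

```python
import geopandas as gpd
import pandas as pd

# Assumed mapping from FracFocus projection labels to EPSG codes
CRS_TO_EPSG = {"NAD27": "EPSG:4267", "NAD83": "EPSG:4269", "WGS84": "EPSG:4326"}


def unify_crs_sketch(df: pd.DataFrame, crs_col: str = "crs", target: str = "EPSG:4269") -> gpd.GeoDataFrame:
    """Hypothetical sketch: reproject each CRS group of lon/lat points to one target CRS."""
    parts = []
    for label, group in df.groupby(crs_col):
        gdf = gpd.GeoDataFrame(
            group,
            geometry=gpd.points_from_xy(group["longitude"], group["latitude"]),
            crs=CRS_TO_EPSG[label],
        )
        parts.append(gdf.to_crs(target))
    # Recombine the groups and restore the original row order
    return gpd.GeoDataFrame(pd.concat(parts), crs=target).loc[df.index]


demo = pd.DataFrame(
    {
        "crs": ["NAD27", "NAD83", "WGS84"],
        "latitude": [35.749381, 48.355506, 27.95851],
        "longitude": [-96.370355, -102.212124, -99.567909],
    }
)
unified = unify_crs_sketch(demo)
print(unified[["crs", "geometry"]])
```

The exact NAD27 shift depends on which transformation grids are available to PROJ, so treat the transformed coordinates as approximate.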

In [None]:
county_fips_gdf.crs

In [63]:
# ensures each row of the geodataframe is in the same CRS
registry_gdf = unify_crs(registry_df_post_2012, crs_col="crs")

In [64]:
# We can now perform a spatial join on the 2 GeoDataFrames,
# filtering first for rows with a null county_name

joined_gdf = (
    registry_gdf[registry_gdf["county_name"].isna()]
    .sjoin(county_fips_gdf.drop(columns=["county"]), how="left", predicate="intersects")
    .drop(columns=["index_right"])
)
joined_gdf

Unnamed: 0,crs,geometry,latitude,longitude,p_key,tvd,is_indian_well,state_left,state_name,state_number,well,operator_name,county_number,county_name,api,county_code,job_end_date,job_start_year,job_start_day,api_number,job_start_month,job_start_date,is_federal_well,well_name,total_base_water_volume,ff_version,job_end_year,job_end_month,job_end_day,state_code,geoid,statefp,countyfp,name,state_right
178445,NAD27,POINT (-96.37064 35.74946),35.749381,-96.370355,692d9381-748e-4e5f-b83f-30f868f18882,2442.0,False,Arkansas,Arkansas,3,Vanorsdol #1-29,WFD Oil Corporation,729,,3503729439,729,2019-11-19,2019.0,19.0,3729439000000,11.0,2019-11-19,False,Vanorsdale,22134.0,3,2019.0,11.0,19.0,3,40037,40,37,Creek,Oklahoma
139253,NAD83,POINT (-102.21212 48.35551),48.355506,-102.212124,0f8e944c-87d7-4d84-8d56-4b8a5f1cba94,8872.4,False,North Dakota,North Dakota,33,TRULSON 156-90-11-14H-3,Hunt Oil Company,610,,33610338000000,610,2022-06-08,2022.0,28.0,33610338000000,5.0,2022-05-28,False,TRULSON 156-90-11-14H-3,13496802.0,3,2022.0,6.0,8.0,33,38061,38,61,Mountrail,North Dakota
139256,NAD83,POINT (-102.21233 48.35551),48.355506,-102.21233,6e5d8284-29d1-4b3d-a72c-729d95c327de,8782.65,False,North Dakota,North Dakota,33,PALERMO 156-90-2-31H-5,Hunt Oil Company,610,,33610338100000,610,2022-06-09,2022.0,28.0,33610338100000,5.0,2022-05-28,False,PALERMO 156-90-2-31H-5,14820967.0,3,2022.0,6.0,9.0,33,38061,38,61,Mountrail,North Dakota
206589,NAD83,POINT (-101.80852 32.40914),32.409144,-101.808518,87ea1a50-ab40-4956-8051-d39bf139ae53,8262.0,False,Utah,Utah,43,Rhea 1-6 Unit 1 #133,Endeavor Energy Resources,317,,43317428660000,317,2020-12-01,2020.0,11.0,43317428660000,11.0,2020-11-11,False,Rhea 1-6 Unit 1 #133,17519292.0,3,2020.0,12.0,1.0,43,48317,48,317,Martin,Texas


- The `North Dakota` `county_number` should be `061` (Mountrail County), not `610`.
- The `Utah` `county_number` was actually correct. The error was in the state number, which should have been `42`, not `43`. This error is significant because, according to the data dictionary:
> APINumber - The American Petroleum Institute well identification number formatted as follows xx-xxx-xxxxx0000 Where: First two digits represent the state, second three digits represent the county, third 5 digits represent the well.<br>

This means the `api` number is also incorrect. It should be `42317428660000` (<u><b>42</b></u>-317-42866-0000) instead of `43317428660000` (<u><b>43</b></u>-317-42866-0000).<br>
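The `xx-xxx-xxxxx0000` layout makes it easy to rebuild or split an API number by position. The two hypothetical helpers below do exactly that, using the Rhea 1-6 correction as the example.

```python
def build_api(state_code: str, county_code: str, well_code: str, suffix: str = "0000") -> str:
    """Hypothetical helper: 2 state + 3 county + 5 well digits + 4-digit suffix, zero-padded."""
    return f"{state_code:0>2}{county_code:0>3}{well_code:0>5}{suffix}"


def split_api(api: str) -> dict:
    """Hypothetical helper: split a 14-character API number into its components."""
    return {"state": api[:2], "county": api[2:5], "well": api[5:10], "suffix": api[10:]}


print(split_api("43317428660000"))      # the state component is the wrong part
print(build_api("42", "317", "42866"))  # 42317428660000
```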


In [None]:
state_code_mode

In [65]:
# Correct these values, starting from a copy of county_name as the new county column
registry_gdf["county"] = registry_gdf["county_name"].copy()

# replace the county_code and county columns with the values from the joined_gdf
registry_gdf.loc[joined_gdf.index, "county"] = joined_gdf["name"]
registry_gdf.loc[joined_gdf.index, "county_code"] = joined_gdf["countyfp"]

# change the api column of the last row in joined_gdf to 42317428660000 instead of 43317428660000
# registry_gdf.loc[joined_gdf.index[-1], "api"] = "42317428660000"
registry_gdf["api"] = registry_gdf["api"].replace("43317428660000", "42317428660000")
# correct the state_code values for where the api was changed
registry_gdf["state_code"] = registry_gdf["api"].str[0:2]
# Create a mapping from 'state_code' to 'state'
state_mapping = state_code_mode.set_index("state_code")["state"].to_dict()

# Use the mapping to update the 'state' column in 'registry_gdf'
registry_gdf["state"] = registry_gdf["state_code"].map(state_mapping)


# check that the values have been replaced
registry_gdf[registry_gdf["county_name"].isna()]

Unnamed: 0,crs,geometry,latitude,longitude,p_key,tvd,is_indian_well,state,state_name,state_number,well,operator_name,county_number,county_name,api,county_code,job_end_date,job_start_year,job_start_day,api_number,job_start_month,job_start_date,is_federal_well,well_name,total_base_water_volume,ff_version,job_end_year,job_end_month,job_end_day,state_code,county
178445,NAD27,POINT (-96.37064 35.74946),35.749381,-96.370355,692d9381-748e-4e5f-b83f-30f868f18882,2442.0,False,Oklahoma,Arkansas,3,Vanorsdol #1-29,WFD Oil Corporation,729,,3503729439,37,2019-11-19,2019.0,19.0,3729439000000,11.0,2019-11-19,False,Vanorsdale,22134.0,3,2019.0,11.0,19.0,35,Creek
139253,NAD83,POINT (-102.21212 48.35551),48.355506,-102.212124,0f8e944c-87d7-4d84-8d56-4b8a5f1cba94,8872.4,False,North Dakota,North Dakota,33,TRULSON 156-90-11-14H-3,Hunt Oil Company,610,,33610338000000,61,2022-06-08,2022.0,28.0,33610338000000,5.0,2022-05-28,False,TRULSON 156-90-11-14H-3,13496802.0,3,2022.0,6.0,8.0,33,Mountrail
139256,NAD83,POINT (-102.21233 48.35551),48.355506,-102.21233,6e5d8284-29d1-4b3d-a72c-729d95c327de,8782.65,False,North Dakota,North Dakota,33,PALERMO 156-90-2-31H-5,Hunt Oil Company,610,,33610338100000,61,2022-06-09,2022.0,28.0,33610338100000,5.0,2022-05-28,False,PALERMO 156-90-2-31H-5,14820967.0,3,2022.0,6.0,9.0,33,Mountrail
206589,NAD83,POINT (-101.80852 32.40914),32.409144,-101.808518,87ea1a50-ab40-4956-8051-d39bf139ae53,8262.0,False,Texas,Utah,43,Rhea 1-6 Unit 1 #133,Endeavor Energy Resources,317,,42317428660000,317,2020-12-01,2020.0,11.0,43317428660000,11.0,2020-11-11,False,Rhea 1-6 Unit 1 #133,17519292.0,3,2020.0,12.0,1.0,42,Martin


In [None]:
registry_gdf.columns

Another way to impute the missing `county_name` values would have been to match on `well_name` in the `registry_df_post_2012` dataframe, assuming the other wells on the same pad have the correct `state_code` and `state` values. However, this would not have worked for `Vanorsdol`, as there is only one well with that name.

In [None]:
registry_df_post_2012[
    registry_df_post_2012["well_name"].str.contains("vanors", case=False, na=False)
]

In [None]:
# show other wells with a similar name to rhea 1-6
registry_df_post_2012[
    registry_df_post_2012["well"].str.contains("Rhea 1-6", case=False, na=False)
]

In [None]:
registry_df_post_2012[registry_df_post_2012["well"].str.contains("Trulson", case=False)]

In [None]:
# county_fips_gdf[county_fips_gdf["state"].isin(permian_states)]
registry_df_post_2012[registry_df_post_2012["well"].str.contains("palermo", case=False)]

In [66]:
# uppercase the well column to reduce naming variation among wells on the same pad
registry_gdf["well"] = registry_gdf["well"].str.upper()
# uppercase the operator names to reduce variation among entries for the same operator
registry_gdf["operator"] = registry_gdf["operator_name"].str.upper()

In [None]:
registry_gdf.columns

Just as we did for those 4 wells, we can use the coordinates to find the `county` and `state` that every well belongs to. Let's see which wells have a different `county` and `state` than their coordinates suggest.

In [67]:
# create 2 new columns, lat and lon in the registry_gdf for corrections
registry_gdf["lat"] = registry_gdf["latitude"].copy()
registry_gdf["lon"] = registry_gdf["longitude"].copy()

# Check which other wells may have a state value which did not agree with the spatial join result
registry_county_gdf = registry_gdf.sjoin(
    county_fips_gdf, how="left", predicate="intersects"
).drop(columns=["index_right"])

trimmed_column_set = [
    "api",
    "well",
    "state_left",
    "state_right",
    "county_left",
    "county_right",
    "county_code",
    "countyfp",
    "operator",
    "lon",
    "lat",
    "geometry",
]

# rows where the reported state disagrees with the spatial-join state (NaN in state_right also counts)
mismatch_geo = registry_county_gdf[
    registry_county_gdf["state_left"] != registry_county_gdf["state_right"]
]
print(f"Number of rows: {len(mismatch_geo)}")

Number of rows: 155


Rows with NaN in the `state_right` column are those whose `geometry` did not fall inside any county polygon. For the Oklahoma (OK) wells, we can look up the `api` number in the Oklahoma Corporation Commission (OCC) data and check whether the FracFocus coordinates match the OCC coordinates.
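Because `state_left != state_right` is also `True` whenever `state_right` is NaN, the 155 rows above mix two different problems: points that fell outside every county polygon, and points that landed in a different state. A small sketch that labels the two cases separately, assuming the same column names as the spatial join output; the function name and demo rows are illustrative.

```python
import numpy as np
import pandas as pd


def classify_state_mismatch(df: pd.DataFrame) -> pd.Series:
    """Label rows as 'match', 'no_match' (no county polygon hit) or 'different_state'."""
    no_match = df["state_right"].isna()
    # Only compare states where the spatial join actually found a county
    different = ~no_match & (df["state_left"] != df["state_right"])
    labels = np.select(
        [no_match, different], ["no_match", "different_state"], default="match"
    )
    return pd.Series(labels, index=df.index, name="mismatch_type")


demo = pd.DataFrame(
    {
        "state_left": ["Texas", "Oklahoma", "Texas"],
        "state_right": ["Texas", None, "Arkansas"],
    }
)
print(classify_state_mismatch(demo).value_counts())
```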

In [68]:
# get the rows with Oklahoma in the state_left column (copy to avoid SettingWithCopyWarning)
mismatch_geo_ok = mismatch_geo.query('state_left.str.contains("Oklahoma")').copy()
print(f"Number of rows from OK state: {len(mismatch_geo_ok)}")
if len(mismatch_geo_ok) > 0:
    display(mismatch_geo_ok.sample(1, random_state=628))
else:
    print("No rows from OK state")

Number of rows from OK state: 27


Unnamed: 0,crs,geometry,latitude,longitude,p_key,tvd,is_indian_well,state_left,state_name,state_number,well,operator_name,county_number,county_name,api,county_code,job_end_date,job_start_year,job_start_day,api_number,job_start_month,job_start_date,is_federal_well,well_name,total_base_water_volume,ff_version,job_end_year,job_end_month,job_end_day,state_code,county_left,operator,lat,lon,geoid,statefp,countyfp,name,county_right,state_right
193302,NAD27,POINT (-98.91895 32.72479),32.724655,-98.918603,3e82e6e4-3d1e-4a10-90ad-7079fdcebb4c,14476.0,False,Oklahoma,Oklahoma,35,TRIPLE J 1H-5-8,Citizen Energy II,39,Custer,35039225490000,39,2019-04-02,2019.0,26.0,35039225490000,3.0,2019-03-26,False,Triple J 1H-5-8,15273846.0,3,2019.0,4.0,2.0,35,Custer,CITIZEN ENERGY II,32.724655,-98.918603,48429,48,429,Stephens,Stephens County,Texas


In [69]:
# Store the original index in a new column
mismatch_geo_ok["original_index"] = mismatch_geo_ok.index
# truncate the 14-digit api to the 10-digit format used in the occ_wells dataframe
mismatch_geo_ok["ok_api"] = mismatch_geo_ok["api"].str[:10]  # ok for Oklahoma
# drop the api column
mismatch_geo_ok.drop(columns=["api"], inplace=True)
# rename the api column in occ_wells to ok_api
occ_wells.rename(columns={"api": "ok_api"}, inplace=True)
# look for the api in the occ_wells dataframe and merge that row to mismatch_geo_ok
joined_mismatch_ok = mismatch_geo_ok.merge(
    occ_wells[
        ["ok_api", "well_name", "well_num", "operator", "sh_lat", "sh_lon", "county"]
    ],
    how="left",
    on="ok_api",
)
joined_mismatch_ok[["geometry", "latitude", "longitude", "sh_lat", "sh_lon"]]

Unnamed: 0,geometry,latitude,longitude,sh_lat,sh_lon
0,POINT (-98.84517 37.97053),37.9705,-98.8448,36.970261,-97.844927
1,POINT (-99.95937 33.74175),33.741644,-99.95899,35.40731,-99.99887
2,POINT (-98.39757 37.59893),37.598889,-98.39722,36.955708,-97.835542
3,POINT (-98.40035 37.60004),37.6,-98.4,36.955708,-97.835542
4,POINT (-97.69883 37.04772),37.047667,-97.6985,36.10746,-98.88069
5,POINT (-98732102099.00000 36.20187),36.201866,-98732100000.0,,
6,POINT (35.82785 -97.77623),-97.776229,35.82785,35.827851,-97.776229
7,POINT (36.18891 -98.07005),-98.070053,36.18891,36.188967,-98.070053
8,POINT (36.11700 -98.01788),-98.017878,36.117,36.117041,-98.018222
9,POINT (36.14611 -97.98831),-97.988311,36.14611,36.146174,-97.89667
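Rows 6 to 9 in the output above have the FracFocus latitude and longitude swapped (a "latitude" near -97), and row 5 has an impossible longitude. A hedged sketch for catching both problems with simple range checks; the function name is hypothetical, and it only swaps when the longitude value would itself be a valid latitude.

```python
import numpy as np


def fix_swapped_coords(lat, lon):
    """Swap lat/lon where latitude is out of range but longitude would be a valid latitude.

    Returns (fixed_lat, fixed_lon, swapped_mask, invalid_mask).
    """
    lat = np.asarray(lat, dtype=float)
    lon = np.asarray(lon, dtype=float)
    swapped = (np.abs(lat) > 90) & (np.abs(lon) <= 90)
    fixed_lat = np.where(swapped, lon, lat)
    fixed_lon = np.where(swapped, lat, lon)
    # Anything still out of range needs another source (e.g. the OCC coordinates)
    invalid = (np.abs(fixed_lat) > 90) | (np.abs(fixed_lon) > 180)
    return fixed_lat, fixed_lon, swapped, invalid


# latitude/longitude values from rows 6 and 5 of the output above
lat, lon, swapped, invalid = fix_swapped_coords(
    [-97.776229, 36.201866], [35.82785, -98732100000.0]
)
print(lat, lon, swapped, invalid)
```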


In [70]:
# replace lat and lon with the OCC surface-hole coordinates (sh_lat, sh_lon) where available
joined_mismatch_ok.loc[
    joined_mismatch_ok["sh_lat"].notna(), "lat"
] = joined_mismatch_ok["sh_lat"]
joined_mismatch_ok.loc[
    joined_mismatch_ok["sh_lon"].notna(), "lon"
] = joined_mismatch_ok["sh_lon"]

joined_mismatch_ok.set_index("original_index", inplace=True)

# occ_wells[occ_wells["api"].isin(mismatch_geo_ok["ok_api"])][
#     ["api", "well_name", "well_num", "operator", "sh_lat", "sh_lon", "county"]
# ]
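The two `.loc` assignments in cell 70 keep the FracFocus value wherever the OCC surface-hole coordinate is missing. The same "prefer the OCC value, otherwise keep the original" rule can also be written with `Series.fillna`. A small equivalent sketch using the values from rows 0 and 5 of the output above:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame(
    {
        "lat": [37.9705, 36.201866],
        "lon": [-98.8448, -98732100000.0],
        "sh_lat": [36.970261, np.nan],
        "sh_lon": [-97.844927, np.nan],
    }
)

# Use the OCC surface-hole coordinate when present, otherwise keep the original value
df["lat"] = df["sh_lat"].fillna(df["lat"])
df["lon"] = df["sh_lon"].fillna(df["lon"])
print(df[["lat", "lon"]])
```

Row 5 keeps its broken longitude because the OCC had no match for it, which is consistent with a few Oklahoma rows still mismatching after the update.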

In [71]:
# Update 'lat' and 'lon' in the original DataFrame
registry_gdf.loc[joined_mismatch_ok.index, "lat"] = joined_mismatch_ok["lat"]
registry_gdf.loc[joined_mismatch_ok.index, "lon"] = joined_mismatch_ok["lon"]
# registry_gdf


# Update 'geometry' in the original DataFrame
registry_gdf.loc[joined_mismatch_ok.index, "geometry"] = [
    Point(xy) for xy in zip(joined_mismatch_ok.lon, joined_mismatch_ok.lat)
]
# Check which other wells may have a state value which did not agree with the spatial join result again
registry_county_gdf = registry_gdf.sjoin(
    county_fips_gdf, how="left", predicate="intersects"
).drop(columns=["index_right"])

# mismatch_geo = registry_county_gdf[registry_county_gdf["state_right"].isna()]

mismatch_geo = registry_county_gdf[
    registry_county_gdf["state_left"] != registry_county_gdf["state_right"]
]
print(f"Number of rows with mismatched state values: {mismatch_geo.shape[0]}")
# mismatch_geo

Number of rows with mismatched state values: 130


Correcting the Oklahoma coordinates reduced the mismatched rows from 155 to 130, so 25 of the 27 Oklahoma rows had incorrect coordinates. We can do the same for Texas. The `state_left` column is the state from the FracFocus data, and the `state_right` column is the state from the `county_fips_gdf` geodataframe.

In [72]:
# query for the wells in Texas
mismatch_geo_tx = mismatch_geo[trimmed_column_set].query(
    'state_left.str.contains("Texas") | state_right.str.contains("Texas")'
)
# store the original index in a new column
mismatch_geo_tx["original_index"] = mismatch_geo_tx.index

# RRC api format: 3-digit county code + 5-digit unique well id (characters 2-9 of the API-14)
mismatch_geo_tx["tx_api"] = mismatch_geo_tx["api"].str[2:10]
mismatch_geo_tx.loc[
    :,
    [
        "api",
        "well",
        "state_left",
        "state_right",
        "county_left",
        "county_right",
        "county_code",
        "countyfp",
    ],
]
# len(mismatch_geo_tx)

Unnamed: 0,api,well,state_left,state_right,county_left,county_right,county_code,countyfp
18718,42177327040000,DOROTHY SPRINGS 1H,Texas,Arkansas,Gonzales,Clark County,177,019
18906,42283334450000,GUTIERREZ LEYENDER 3H,Texas,,La Salle,,283,
18977,42419317080000,SMITH I 1H,Texas,Louisiana,Shelby,Sabine Parish,419,085
19199,42283334250000,HUBBARD B UNIT NO. 1H,Texas,,LA SALLE,,283,
19200,42283334270000,RAMSEY FAULKNER B NO. 1H,Texas,,LA SALLE,,283,
...,...,...,...,...,...,...,...,...
141853,30025409130000,ESDU #30,New Mexico,Texas,Lea,Jeff Davis County,025,243
148157,30015469200000,PALE RIDE W0OB STATE COM #1H,New Mexico,Texas,Eddy,Culberson County,015,109
199347,17011210040000,IRBY #3,Louisiana,Texas,Beauregard,Walker County,011,471
211333,04222382800000,"CHALK, G. O.-D- 29",California,Texas,State Waters - San Nicolas Offshore Islands,Howard County,222,227
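The slice `str[2:10]` relies on the layout of the 14-digit API well number: a 2-digit state code, a 3-digit county code, a 5-digit unique well identifier, then a 2-digit sidetrack code and a 2-digit event code. The RRC shapefiles store only the county code plus the unique identifier, which is why characters 2 to 9 line up with the RRC `api` field. A small helper that splits an API-14 string into those parts (the function name is hypothetical):

```python
def split_api14(api: str) -> dict:
    """Split a 14-digit API well number into its components."""
    api = str(api).strip()
    if len(api) != 14 or not api.isdigit():
        raise ValueError(f"expected 14 digits, got {api!r}")
    return {
        "state": api[0:2],
        "county": api[2:5],
        "well_id": api[5:10],
        "sidetrack": api[10:12],
        "event": api[12:14],
        "rrc_api": api[2:10],  # county + unique id, as used for tx_api above
    }


# API of DOROTHY SPRINGS 1H from the output above
print(split_api14("42177327040000"))
```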


#### Texas Railroad Commission (RRC)


Land survey data, bottom-hole well data, and surface well data were downloaded from the RRC website and then uploaded to GCP for more reliable access. There are 254 zip files, one for each county in Texas. Each zip file contains several file types, mostly the components of shapefiles, covering layers that range from airport lines to offshore survey polygons.
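The helper `extract_matching_shp_files_from_zip_urls_concurrent` is defined elsewhere in the notebook. Its selection step can be sketched with the standard library: list the members of a zip archive and keep the `.shp` files whose name matches one of the regex patterns. The archive below is built in memory with made-up member names so the sketch runs offline. It uses `re.fullmatch` on the file stem, which is stricter than a plain `re.search`, and it omits the part where the real helper presumably reads each match into a GeoDataFrame.

```python
import io
import re
from pathlib import PurePosixPath
from zipfile import ZipFile

PATTERNS = [r"surv\d{3}p", r"well\d{3}s", r"well\d{3}b"]


def matching_shp_members(zip_bytes: bytes, patterns: list[str]) -> list[str]:
    """Return .shp members of a zip archive whose file stem matches any pattern."""
    compiled = [re.compile(p) for p in patterns]
    with ZipFile(io.BytesIO(zip_bytes)) as zf:
        return [
            name
            for name in zf.namelist()
            if name.lower().endswith(".shp")
            and any(rx.fullmatch(PurePosixPath(name).stem) for rx in compiled)
        ]


# Build a small in-memory archive with hypothetical member names
buffer = io.BytesIO()
with ZipFile(buffer, "w") as zf:
    for member in ["surv003p.shp", "surv003p.dbf", "well003s.shp", "well003b.shp", "airp003l.shp"]:
        zf.writestr(member, b"")

print(matching_shp_members(buffer.getvalue(), PATTERNS))
```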

In [73]:
# regex patterns to identify which shapefiles to extract:
# survey polygons (surv###p), surface well locations (well###s) and bottom-hole well locations (well###b)
patterns = [r"surv\d{3}p", r"well\d{3}s", r"well\d{3}b"]

# extract the matching shapefiles from the RRC zip files (data saved from the RRC website)
shp_dict = extract_matching_shp_files_from_zip_urls_concurrent(SHP_ZIP_URLS, patterns)

Processing URLs:   6%|▌         | 14/254 [00:29<06:01,  1.51s/it]

Error: IncompleteRead(2944078 bytes read, 1253737 more expected) on try 1 of 5 for https://storage.googleapis.com/mrprime_dataset/capstone_journey/rrc/all_layers_rrc_20231117/Shp013.zip


Processing URLs:  11%|█         | 28/254 [01:06<08:24,  2.23s/it]

Error: IncompleteRead(6764282 bytes read, 268354 more expected) on try 1 of 5 for https://storage.googleapis.com/mrprime_dataset/capstone_journey/rrc/all_layers_rrc_20231117/Shp009.zip


Processing URLs:  22%|██▏       | 56/254 [02:08<06:10,  1.87s/it]

Error: IncompleteRead(6257034 bytes read, 775602 more expected) on try 2 of 5 for https://storage.googleapis.com/mrprime_dataset/capstone_journey/rrc/all_layers_rrc_20231117/Shp009.zip


Processing URLs:  26%|██▋       | 67/254 [02:50<13:40,  4.39s/it]

Error: IncompleteRead(9614590 bytes read, 1108490 more expected) on try 1 of 5 for https://storage.googleapis.com/mrprime_dataset/capstone_journey/rrc/all_layers_rrc_20231117/Shp127.zip


Processing URLs:  40%|████      | 102/254 [04:17<04:17,  1.70s/it]

Error: IncompleteRead(1571598 bytes read, 393540 more expected) on try 1 of 5 for https://storage.googleapis.com/mrprime_dataset/capstone_journey/rrc/all_layers_rrc_20231117/Shp207.zip


Processing URLs:  52%|█████▏    | 133/254 [05:36<02:56,  1.46s/it]

Error: IncompleteRead(7881826 bytes read, 468189 more expected) on try 1 of 5 for https://storage.googleapis.com/mrprime_dataset/capstone_journey/rrc/all_layers_rrc_20231117/Shp167.zip


Processing URLs:  77%|███████▋  | 196/254 [08:00<01:31,  1.57s/it]

Error: IncompleteRead(3192875 bytes read, 970911 more expected) on try 1 of 5 for https://storage.googleapis.com/mrprime_dataset/capstone_journey/rrc/all_layers_rrc_20231117/Shp373.zip


Processing URLs:  80%|████████  | 204/254 [08:30<02:10,  2.61s/it]

Error: IncompleteRead(10174464 bytes read, 1311031 more expected) on try 1 of 5 for https://storage.googleapis.com/mrprime_dataset/capstone_journey/rrc/all_layers_rrc_20231117/Shp389.zip
Error: IncompleteRead(6188957 bytes read, 1267646 more expected) on try 1 of 5 for https://storage.googleapis.com/mrprime_dataset/capstone_journey/rrc/all_layers_rrc_20231117/Shp355.zip


Processing URLs:  84%|████████▍ | 214/254 [09:04<01:24,  2.11s/it]

Error: IncompleteRead(30927390 bytes read, 338911 more expected) on try 1 of 5 for https://storage.googleapis.com/mrprime_dataset/capstone_journey/rrc/all_layers_rrc_20231117/Shp201.zip


Processing URLs: 100%|██████████| 254/254 [11:40<00:00,  2.76s/it]
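The log above shows several `IncompleteRead` errors that succeeded on a later attempt ("try 1 of 5"). A minimal retry-with-backoff sketch in that spirit, assuming the download is wrapped in a zero-argument callable. The function name, the exponential backoff, and the jitter are illustrative choices, not necessarily what the notebook's helper does.

```python
import random
import time
from http.client import IncompleteRead
from urllib.error import URLError


def retry(fetch, max_tries: int = 5, base_delay: float = 1.0, sleep=time.sleep):
    """Call fetch() until it succeeds, retrying on transient network errors."""
    for attempt in range(1, max_tries + 1):
        try:
            return fetch()
        except (IncompleteRead, URLError) as exc:
            if attempt == max_tries:
                raise  # give up and surface the last error
            print(f"Error: {exc!r} on try {attempt} of {max_tries}")
            # Exponential backoff plus a little jitter to avoid retrying in lockstep
            sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, base_delay))


# Usage with a stand-in fetch that fails twice before succeeding
calls = {"n": 0}


def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise IncompleteRead(b"partial", 100)
    return b"zip bytes"


print(retry(flaky_fetch, sleep=lambda seconds: None))
```

Passing `sleep` as a parameter keeps the sketch testable without real waiting.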


In [None]:
import sys


def deep_getsizeof(obj):
    """Recursively find size of object and its elements"""
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(deep_getsizeof(k) + deep_getsizeof(v) for k, v in obj.items())
    elif isinstance(obj, (list, tuple)):
        size += sum(deep_getsizeof(x) for x in obj)
    return size


# estimate the in-memory size of the extracted shapefile dictionary
total_size_in_bytes = deep_getsizeof(shp_dict)
total_size_in_mb = total_size_in_bytes / 1e6
print(f"The total size of the dictionary is {total_size_in_mb:.2f} MB.")
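For a dictionary of DataFrames, a more direct estimate is to sum `DataFrame.memory_usage(deep=True)` over the values. A short sketch, assuming every value is a pandas (or GeoPandas) DataFrame; the helper name and demo frames are illustrative. Geometry columns are likely under-counted this way, because the shapely objects they hold are not fully measured.

```python
import pandas as pd


def frames_memory_mb(frames: dict) -> float:
    """Total deep memory usage, in MB, of a dict of DataFrames."""
    total_bytes = sum(int(df.memory_usage(deep=True).sum()) for df in frames.values())
    return total_bytes / 1e6


demo_frames = {
    "surv001p": pd.DataFrame({"abstract_n": ["A-1", "A-2"]}),
    "well001s": pd.DataFrame({"symnum": [4, 11]}),
}
print(f"{frames_memory_mb(demo_frames):.4f} MB")
```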

In [74]:
# split shp_dict by layer type using the regex patterns defined above
surv_dict = {k: v for k, v in shp_dict.items() if re.search(patterns[0], k)}
swell_dict = {k: v for k, v in shp_dict.items() if re.search(patterns[1], k)}
bwell_dict = {k: v for k, v in shp_dict.items() if re.search(patterns[2], k)}

#### Texas RRC land survey data

In [75]:
# SYMNUM is a number that indicates the well type; collapse the codes into fewer, broader bins
well_symnum_dict = {
    2: "Permitted Location",
    3: "Dry Hole",
    4: "Oil/Gas",  # oil
    5: "Oil/Gas",  # gas
    6: "Oil/Gas",  # oil/ gas
    7: "Plugged/Shut-in",  # oil
    8: "Plugged/Shut-in",  # gas
    9: "Canceled Location",
    10: "Plugged/Shut-in",
    11: "Injection/Disposal",
    12: "Core Test",
    17: "Storage",  # oil
    18: "Storage",  # gas
    19: "Plugged/Shut-in",  # oil
    20: "Plugged/Shut-in",  # gas
    21: "Injection/Disposal",  # oil
    22: "Injection/Disposal",  # gas
    23: "Injection/Disposal",  # oil/ gas
    73: "Brine Mining",
    74: "Water Supply",
    75: "Water Supply",  # oil
    76: "Water Supply",  # gas
    77: "Water Supply",  # oil/ gas
    86: "Horizontal",  # Horizontal Well Surface Location
    87: "Horizontal",  # Directional/Sidetrack Well Surface Location
    88: "Storage",
    103: "Storage",  # oil/gas
}
# well_symnum_dict
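`Series.map` returns NaN for any code missing from the dictionary, which is why the cells below chain `.fillna("Other")`. A tiny illustration with a subset of the dictionary and a made-up code `999`:

```python
import pandas as pd

# A small subset of well_symnum_dict
symnum_to_type = {4: "Oil/Gas", 5: "Oil/Gas", 86: "Horizontal", 87: "Horizontal"}

symnum = pd.Series([4, 86, 999, 5])  # 999 is a hypothetical code not in the mapping
well_type = symnum.map(symnum_to_type).fillna("Other")
print(well_type.tolist())
```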

In [76]:
# Concatenate the GeoDataFrames in surv_dict into a single GeoDataFrame
surv_data_gdf = concat_gdf_from_dict(surv_dict)

# Convert the column names to snake case for consistency
surv_data_gdf.columns = [pascal_to_snake(col) for col in surv_data_gdf.columns]
# add a column for the 3-digit county code taken from the source file name
surv_data_gdf["county_code"] = surv_data_gdf["source_file"].str.extract(r"(\d{3})")

# Display a sample of 3 rows from the DataFrame
display(surv_data_gdf.sample(3))

# Display information about the DataFrame, including the number of non-null entries in each column
surv_data_gdf.info(memory_usage="deep")

Unnamed: 0,geometry,abstract_n,level1_sur,level2_blo,level3_sur,level4_sur,abstract_l,scrap_file,source_file,county_code
257253,"POLYGON ((-98.91351 32.64414, -98.92465 32.644...",429913,TE&L CO,,3397,,A-913,,surv429p,429
99712,"POLYGON ((-99.01243 28.89601, -99.02427 28.885...",16331,AB&M,,9,,A-31,,surv163p,163
264823,"POLYGON ((-100.73387 30.44265, -100.73377 30.4...",4351568,HE&WT RR CO,C,164,"WORD, O T",A-1568,,surv435p,435


<class 'geopandas.geodataframe.GeoDataFrame'>
RangeIndex: 311248 entries, 0 to 311247
Data columns (total 10 columns):
 #   Column       Non-Null Count   Dtype   
---  ------       --------------   -----   
 0   geometry     311248 non-null  geometry
 1   abstract_n   311127 non-null  object  
 2   level1_sur   310857 non-null  object  
 3   level2_blo   139664 non-null  object  
 4   level3_sur   221353 non-null  object  
 5   level4_sur   108712 non-null  object  
 6   abstract_l   311240 non-null  object  
 7   scrap_file   9271 non-null    object  
 8   source_file  311248 non-null  object  
 9   county_code  311248 non-null  object  
dtypes: geometry(1), object(9)
memory usage: 142.1 MB


#### Texas RRC surface well data

In [77]:
# Concatenate the GeoDataFrames in swell_dict into a single GeoDataFrame
swell_data_gdf = concat_gdf_from_dict(swell_dict)

# Convert the column names to snake case for consistency
swell_data_gdf.columns = [pascal_to_snake(col) for col in swell_data_gdf.columns]
# get the county code from the source_file column
swell_data_gdf["county_code"] = swell_data_gdf["source_file"].str.extract(r"(\d{3})")
# map the dictionary to the SYMNUM column and fill the rare values with 'Other'
swell_data_gdf["well_type"] = (
    swell_data_gdf["symnum"].map(well_symnum_dict).fillna("Other")
)

# Display a sample of 3 rows from the DataFrame
display(swell_data_gdf.sample(3))

# Display information about the DataFrame, including the number of non-null entries in each column
swell_data_gdf.info(memory_usage="deep")
# shp_dict = extract_specific_gdf_from_zip_url(shp_zip_urls, patterns)

Unnamed: 0,geometry,surface_id,symnum,api,reliab,long27,lat27,long83,lat83,wellid,source_file,county_code,well_type
1056544,POINT (-100.12863 32.40754),1242518,4,44134365,55,-100.128625,32.407541,-100.129001,32.407664,34365.0,well441s,441,Oil/Gas
534459,POINT (-101.31300 32.10108),1179120,11,22736155,40,-101.312997,32.101082,-101.313394,32.101209,36155.0,well227s,227,Injection/Disposal
73671,POINT (-97.71920 29.73028),208710,7,55,50,-97.719198,29.730285,-97.719476,29.730511,,well055s,55,Plugged/Shut-in


<class 'geopandas.geodataframe.GeoDataFrame'>
RangeIndex: 1359173 entries, 0 to 1359172
Data columns (total 13 columns):
 #   Column       Non-Null Count    Dtype   
---  ------       --------------    -----   
 0   geometry     1359173 non-null  geometry
 1   surface_id   1359173 non-null  int64   
 2   symnum       1359173 non-null  int64   
 3   api          1359173 non-null  object  
 4   reliab       1359173 non-null  object  
 5   long27       1359173 non-null  float64 
 6   lat27        1359173 non-null  float64 
 7   long83       1359173 non-null  float64 
 8   lat83        1359173 non-null  float64 
 9   wellid       994978 non-null   object  
 10  source_file  1359173 non-null  object  
 11  county_code  1359173 non-null  object  
 12  well_type    1359173 non-null  object  
dtypes: float64(4), geometry(1), int64(2), object(6)
memory usage: 549.2 MB


In [109]:
swell_data_gdf["well_type"].value_counts()

well_type
Plugged/Shut-in       392946
Oil/Gas               353912
Dry Hole              287253
Horizontal            153381
Permitted Location     88198
Injection/Disposal     40512
Canceled Location      39465
Water Supply            1227
Storage                  783
Core Test                763
Other                    632
Brine Mining             101
Name: count, dtype: int64

In [78]:
# plot the well types on a horizontal bar chart
swell_data_gdf["well_type"].value_counts().hvplot.barh(
    height=400,
    width=600,
    title="Surface Well Types Count",
    xlabel="",
    ylabel="",
    xaxis="bare",
    hover_cols="all",
).opts(
    active_tools=["box_zoom"],
    toolbar="above",
)

In [None]:
# plot the dry holes on a map
dry_hole_gdf = swell_data_gdf[swell_data_gdf["well_type"] == "Dry Hole"]

# convert the lon/lat coordinates to Web Mercator (vectorised)
dry_hole_mer = platecaree_to_mercator_vectorised(
    dry_hole_gdf["geometry"].x, dry_hole_gdf["geometry"].y
)
dry_hole_coords = pd.DataFrame(dry_hole_mer, columns=["x", "y"])

bg_map = get_background_map()
# plot the dry holes on a map
bg_map * gv.Points(
    dry_hole_coords.reset_index(), ["x", "y"], ["index"], crs=ccrs.GOOGLE_MERCATOR
).opts(width=800, height=600, title="Dry Hole Locations", size=1)
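`platecaree_to_mercator_vectorised` is defined elsewhere in the notebook. A sketch of what such a function can look like, using the spherical Web Mercator formulas (EPSG:3857, radius 6,378,137 m). It returns an `(n, 2)` array so that `pd.DataFrame(..., columns=["x", "y"])` works as in the cell above. The name `lonlat_to_web_mercator` is hypothetical, and the notebook's helper may differ.

```python
import numpy as np

EARTH_RADIUS_M = 6378137.0  # WGS84 semi-major axis used by Web Mercator


def lonlat_to_web_mercator(lon, lat) -> np.ndarray:
    """Vectorised lon/lat (degrees) to Web Mercator x/y (metres), shape (n, 2)."""
    lon = np.asarray(lon, dtype=float)
    lat = np.asarray(lat, dtype=float)
    x = EARTH_RADIUS_M * np.radians(lon)
    y = EARTH_RADIUS_M * np.log(np.tan(np.pi / 4 + np.radians(lat) / 2))
    return np.column_stack([x, y])


# a well location from the FracFocus output earlier in the notebook
print(lonlat_to_web_mercator([-102.21212], [48.35551]))
```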

In [None]:
# create function to plot the well types on a map
def plot_well_types(gdf, well_type):
    """Plots the well types on a map"""
    # filter the GeoDataFrame for the well_type
    well_type_gdf = gdf[gdf["well_type"] == well_type]
    # convert the lon/lat coordinates to Web Mercator (vectorised)
    well_type_mer = platecaree_to_mercator_vectorised(
        well_type_gdf["geometry"].x, well_type_gdf["geometry"].y
    )
    well_type_coords = pd.DataFrame(well_type_mer, columns=["x", "y"])
    # plot the well types on a map
    return get_background_map() * gv.Points(
        well_type_coords.reset_index(),
        ["x", "y"],
        ["index"],
        crs=ccrs.GOOGLE_MERCATOR,
    ).opts(width=800, height=600, title=f"{well_type} Locations", size=1).opts(
        active_tools=["box_zoom"],
        toolbar="above",
        xaxis="bare",
        yaxis="bare",
        tools=["hover"],
    )


from functools import partial

# create a partial function for the plot_well_types function
plot_well_types_partial = partial(plot_well_types, swell_data_gdf)

plot_well_types_partial("Plugged/Shut-in")

In [None]:
def plot_well_types(well_type, size):
    """Plots the well types on a map"""
    # filter the swell_data_gdf for the well_type
    well_type_gdf = swell_data_gdf[swell_data_gdf["well_type"] == well_type]
    # vectorize the coordinates
    well_type_mer = platecaree_to_mercator_vectorised(
        well_type_gdf["geometry"].x, well_type_gdf["geometry"].y
    )
    well_type_coords = pd.DataFrame(well_type_mer, columns=["x", "y"])
    # plot the well types on a map
    return get_background_map() * gv.Points(
        well_type_coords.reset_index(),
        ["x", "y"],
        ["index"],
        crs=ccrs.GOOGLE_MERCATOR,
    ).opts(width=800, height=600, title=f"{well_type} Locations", size=size).opts(
        active_tools=["box_zoom"],
        toolbar="above",
        xaxis="bare",
        yaxis="bare",
        tools=["hover"],
    )


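# create the interactive plot, with the well type options taken from the data
well_types = sorted(swell_data_gdf["well_type"].unique())
pn.interact(plot_well_types, well_type=well_types, size=(1, 10)).show()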

Launching server at http://localhost:56143


<panel.io.server.Server at 0x1b59b3e3970>



#### Texas RRC bottom hole data

In [82]:
# Concatenate the GeoDataFrames in bwell_dict into a single GeoDataFrame
bwell_data_gdf = concat_gdf_from_dict(bwell_dict)

# Convert the column names to snake case for consistency
bwell_data_gdf.columns = [pascal_to_snake(col) for col in bwell_data_gdf.columns]
# get the county code from the source_file column
bwell_data_gdf["county_code"] = bwell_data_gdf["source_file"].str.extract(r"(\d{3})")

# map the dictionary to the SYMNUM column and fill the rare values with 'Other'
bwell_data_gdf["well_type"] = (
    bwell_data_gdf["symnum"].map(well_symnum_dict).fillna("Other")
)
# Display a sample of 3 rows from the DataFrame
display(bwell_data_gdf.sample(3))

# Display information about the DataFrame, including the number of non-null entries in each column
bwell_data_gdf.info(memory_usage="deep")

Unnamed: 0,geometry,bottom_id,surface_id,symnum,apinum,reliab,api10,api,long27,lat27,long83,lat83,out_fips,cwellnum,radioact,wellid,stcode,source_file,county_code,well_type
956685,POINT (-102.70667 30.88163),275973,275973,3,4237135055,15,37135055,37135055,-102.706672,30.881631,-102.70712,30.881808,N,3,,35055.0,,well371b,371,Dry Hole
176607,POINT (-99.27824 31.55920),473219,473219,3,42083,15,83,83,-99.278238,31.5592,-99.27857,31.559356,N,3,,,,well083b,83,Dry Hole
595793,POINT (-101.27394 35.65187),1031978,1031978,4,4223300399,15,23300399,23300399,-101.273942,35.651871,-101.274381,35.651914,N,2,,399.0,,well233b,233,Oil/Gas


<class 'geopandas.geodataframe.GeoDataFrame'>
RangeIndex: 1375390 entries, 0 to 1375389
Data columns (total 20 columns):
 #   Column       Non-Null Count    Dtype   
---  ------       --------------    -----   
 0   geometry     1375390 non-null  geometry
 1   bottom_id    1375390 non-null  int64   
 2   surface_id   1375390 non-null  int64   
 3   symnum       1375390 non-null  int64   
 4   apinum       1375390 non-null  object  
 5   reliab       1375390 non-null  object  
 6   api10        1375390 non-null  object  
 7   api          1375390 non-null  object  
 8   long27       1375390 non-null  float64 
 9   lat27        1375390 non-null  float64 
 10  long83       1375390 non-null  float64 
 11  lat83        1375390 non-null  float64 
 12  out_fips     1375311 non-null  object  
 13  cwellnum     1342491 non-null  object  
 14  radioact     66484 non-null    object  
 15  wellid       1011289 non-null  object  
 16  stcode       169650 non-null   object  
 17  source_file  137539

In [108]:
bwell_data_gdf["well_type"].value_counts()

well_type
Oil/Gas               467417
Plugged/Shut-in       410482
Dry Hole              293056
Permitted Location    113777
Canceled Location      44086
Injection/Disposal     42881
Water Supply            1262
Storage                  847
Core Test                763
Other                    709
Brine Mining             110
Name: count, dtype: int64

#### Statewide API Data
These files contain descriptive information about the oil and gas wells maintained in the Railroad Commission of Texas (RRC) mapping system, including API number, survey name, well number, plug date, completion date, lease name, and lease ID. The files are organized by map area, specifically by USGS 7.5-minute quadrangle, and are sorted by API number within each quadrangle. They are intended to be used alongside the location information extracted from the well mapping system. Records match the API numbers identified in the quad maps and may not represent all known API numbers in a map area.

In [144]:
import glob

dbf_paths = glob.glob("../data/rrc_dbf/*.dbf")

dbfs = [gpd.read_file(file_path) for file_path in tqdm(dbf_paths)]

dbf_df = pd.concat(dbfs, ignore_index=True)

100%|██████████| 250/250 [05:26<00:00,  1.31s/it]


In [146]:
dbf_df.info()

dbf_df.sample(3)

<class 'geopandas.geodataframe.GeoDataFrame'>
RangeIndex: 1471905 entries, 0 to 1471904
Data columns (total 20 columns):
 #   Column      Non-Null Count    Dtype   
---  ------      --------------    -----   
 0   ABSTRACT    707806 non-null   object  
 1   APINUM      1471905 non-null  object  
 2   BLOCK       456972 non-null   object  
 3   COMPLETION  1471905 non-null  object  
 4   FIELD_NAME  1118264 non-null  object  
 5   LEASE_NAME  1278963 non-null  object  
 6   GAS_RRCID   1471905 non-null  object  
 7   OIL_GAS_CO  1053499 non-null  object  
 8   ON_OFF_SCH  1053499 non-null  object  
 9   OPERATOR    1307889 non-null  object  
 10  PERMIT_NUM  1471904 non-null  object  
 11  PLUG_DATE   1471905 non-null  object  
 12  REFER_TO_A  1471905 non-null  object  
 13  SECTION     544908 non-null   object  
 14  SURVEY      1041458 non-null  object  
 15  TOTAL_DEPT  1471905 non-null  object  
 16  WELLID      1324454 non-null  object  
 17  QUADNUM     1281256 non-null  object  

Unnamed: 0,ABSTRACT,APINUM,BLOCK,COMPLETION,FIELD_NAME,LEASE_NAME,GAS_RRCID,OIL_GAS_CO,ON_OFF_SCH,OPERATOR,PERMIT_NUM,PLUG_DATE,REFER_TO_A,SECTION,SURVEY,TOTAL_DEPT,WELLID,QUADNUM,OBJECTID_1,geometry
1303210,,46102471,F,19600627,MCELROY,"MCELROY, J. T., CONS.",4161,O,Y,CHEVRON U. S. A. INC.,0,20030715,0,194.0,CCSD&RGNG,3160,583 C,3102131,779646,
973013,,35532879,,0,FLOUR BLUFF (MASSIVES),"WEBB, L. A.",151261,G,Y,HEADINGTON OIL COMPANY,404883,20230320,0,47.0,"FLOUR BLUFF & ENCINAL FARM AND GARDEN TRACTS, ...",6650,18,2797424,1363157,
212338,,7900038,,0,SLAUGHTER,"SLAUGHTER, NW. (SAN ANDRES) UNIT",19316,O,Y,SCOUT ENERGY MANAGEMENT LLC,0,20191219,0,,"LGE. 58, MARTIN CNTY SCHOOL LAND",4985,38,3302312,1320335,
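The sample above shows `APINUM` values of different lengths: `46102471` has eight digits (county `461` plus well `02471`), while `7900038` has most likely lost the leading zero of county `079`. A hedged sketch for normalising these into the 8-digit RRC form and a 10-digit API with the Texas API state code `42`, assuming the values are strings of digits; the function name is hypothetical.

```python
import pandas as pd


def normalize_rrc_apinum(apinum: pd.Series) -> pd.DataFrame:
    """Left-pad RRC APINUM values to 8 digits and derive the county code and API10."""
    api8 = apinum.astype(str).str.strip().str.zfill(8)
    return pd.DataFrame(
        {
            "api8": api8,
            "county_code": api8.str[:3],
            "api10": "42" + api8,  # Texas uses 42 as its API state code
        }
    )


# APINUM values from the sample above
print(normalize_rrc_apinum(pd.Series(["46102471", "35532879", "7900038"])))
```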


#### Matching the mismatched wells

In [83]:
swell_columns = ["api", "lat83", "long83", "well_type"]
mismatch_columns = ["tx_api", "lon", "lat", "original_index"]

In [84]:
mismatch_geo_tx[mismatch_columns].merge(
    swell_data_gdf[swell_columns], how="left", left_on="tx_api", right_on="api"
)

Unnamed: 0,tx_api,lon,lat,original_index,api,lat83,long83,well_type
0,17732704,-92.950000,34.090000,18718,17732704,29.446890,-97.349024,Horizontal
1,28333445,-99.247072,25.177289,18906,28333445,28.177606,-99.247488,Horizontal
2,41931708,-93.614330,31.610560,18977,41931708,31.609584,-93.948177,Horizontal
3,28333425,-98.682000,26.172000,19199,28333425,28.338389,-99.185575,Horizontal
4,28333427,-98.682000,26.172000,19200,28333427,28.338362,-99.185606,Horizontal
...,...,...,...,...,...,...,...,...
73,02540913,-104.746472,30.737142,141853,,,,
74,01546920,-104.105383,31.138073,148157,,,,
75,01121004,-95.523224,30.509602,199347,,,,
76,22238280,-101.261797,32.103056,211333,,,,


In [85]:
joined_mismatch_tx = mismatch_geo_tx[mismatch_columns].merge(
    swell_data_gdf[swell_columns], how="left", left_on="tx_api", right_on="api"
)

# set the index to the original_index column
joined_mismatch_tx.set_index("original_index", inplace=True)

# replace lat and lon with the RRC lat83 and long83 values where they are not NaN
joined_mismatch_tx.loc[joined_mismatch_tx["lat83"].notna(), "lat"] = joined_mismatch_tx[
    "lat83"
]
joined_mismatch_tx.loc[
    joined_mismatch_tx["long83"].notna(), "lon"
] = joined_mismatch_tx["long83"]

# Update 'lat' and 'lon' in the original DataFrame


registry_gdf.loc[joined_mismatch_tx.index, "lat"] = joined_mismatch_tx["lat"]
registry_gdf.loc[joined_mismatch_tx.index, "lon"] = joined_mismatch_tx["lon"]


# Update 'geometry' in the original DataFrame
registry_gdf.loc[joined_mismatch_tx.index, "geometry"] = [
    Point(xy) for xy in zip(joined_mismatch_tx.lon, joined_mismatch_tx.lat)
]

In [92]:
# Repeat the spatial join with registry_gdf and county_fips_gdf
registry_county_gdf_2 = registry_gdf.sjoin(
    county_fips_gdf, how="left", predicate="intersects"
).drop(columns=["index_right"])

mismatched_geo_2 = registry_county_gdf_2[
    registry_county_gdf_2["state_left"] != registry_county_gdf_2["state_right"]
]
print(f"Number of rows with mismatched state values: {mismatched_geo_2.shape[0]}")

Number of rows with mismatched state values: 77


Another check is for rows where the `county_code` is an even number. Although the first two digits of an API number are not the state FIPS code (Texas is `42` in the API scheme but `48` in FIPS), the next three digits do follow the county FIPS code. County FIPS codes are almost exclusively odd numbers, so an even value in the `county_code` column is most likely incorrect.

In [99]:
# rows whose county_code is an even number
mismatched_geo_2[mismatched_geo_2["county_code"].astype(int) % 2 == 0][
    trimmed_column_set
]

Unnamed: 0,api,well,state_left,state_right,county_left,county_right,county_code,countyfp,operator,lon,lat,geometry
210599,4228337070000,PAVLICEK UNIT 2H,California,Texas,State Waters - Santa Cruz Offshore Islands,Lavaca County,228,285,PENN VIRGINIA CORPORATION,-97.149289,29.53873,POINT (-97.14929 29.53873)
211666,4218530868000,NGR FIVELAND 1H,California,Texas,State Waters - San Clemente Offshore Islands,Grimes County,218,185,"NEW GULF RESOURCES, LLC",-96.152663,30.653893,POINT (-96.15266 30.65389)
212374,4226131591000,MRS SK EAST 229,California,Texas,State Waters - Santa Catalina Offshore Islands,Kenedy County,226,261,"HEADINGTON ENERGY PARTNERS, LLC",-97.8676,27.0249,POINT (-97.86760 27.02490)
212761,4216334169000,WHITWELL G 119H,California,Texas,State Waters - Anacapa Offshore Islands,Frio County,216,163,"TRINITY OPERATING (USG), LLC",-99.3206,28.7281,POINT (-99.32060 28.72810)
212762,4216334169000,WHITWELL G 119H,California,Texas,State Waters - Anacapa Offshore Islands,Frio County,216,163,"TRINITY OPERATING (USG), LLC",-99.3206,28.7281,POINT (-99.32060 28.72810)
212765,4228336790000,MARTINEZ NORTH D 102H,California,Texas,State Waters - Santa Cruz Offshore Islands,Dimmit County,228,127,"TRINITY OPERATING (USG), LLC",-99.4011,28.6184,POINT (-99.40110 28.61840)
212766,4228336790000,MARTINEZ NORTH D 102H,California,Texas,State Waters - Santa Cruz Offshore Islands,Dimmit County,228,127,"TRINITY OPERATING (USG), LLC",-99.4011,28.6184,POINT (-99.40110 28.61840)
212767,4228336792000,MARTINEZ NORTH D 103H,California,Texas,State Waters - Santa Cruz Offshore Islands,Dimmit County,228,127,"TRINITY OPERATING (USG), LLC",-99.4011,28.6184,POINT (-99.40110 28.61840)
212768,4228336789000,MARTINEZ NORTH D 101H,California,Texas,State Waters - Santa Cruz Offshore Islands,Dimmit County,228,127,"TRINITY OPERATING (USG), LLC",-99.4012,28.6183,POINT (-99.40120 28.61830)
212769,4228336789000,MARTINEZ NORTH D 101H,California,Texas,State Waters - Santa Cruz Offshore Islands,Dimmit County,228,127,"TRINITY OPERATING (USG), LLC",-99.4012,28.6183,POINT (-99.40120 28.61830)
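A vectorised version of the even-code check that also tolerates missing or non-numeric codes: `pd.to_numeric(..., errors="coerce")` turns them into NaN, and `NaN % 2 == 0` evaluates to `False`, so such rows are simply not flagged instead of raising. The helper name is illustrative; the demo uses county codes from the table above plus two bad values.

```python
import pandas as pd


def even_county_code_mask(codes: pd.Series) -> pd.Series:
    """True where a county code is an even number (likely wrong for a FIPS county code)."""
    numeric = pd.to_numeric(codes, errors="coerce")  # bad values become NaN
    return numeric % 2 == 0


codes = pd.Series(["228", "285", "218", "abc", None])
print(even_county_code_mask(codes).tolist())
```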


In [None]:
# find wells whose coordinates do not fall inside any county polygon
registry_county_gdf = registry_gdf.sjoin(
    county_fips_gdf, how="left", predicate="intersects"
).drop(columns=["index_right"])
mismatch_geo = registry_county_gdf[registry_county_gdf["state_right"].isna()]
print(f"Number of rows with no matching county: {mismatch_geo.shape[0]}")
if mismatch_geo.shape[0] > 0:
    display(mismatch_geo.sample(min(3, mismatch_geo.shape[0])))
else:
    print("No rows without a matching county")
# display(mismatch_geo.sample(3))

# drop the rows with null values in the state_right column of mismatch_geo from the registry_gdf
print(f"Number of rows before dropping: {registry_gdf.shape[0]}")
registry_gdf.drop(mismatch_geo.index, inplace=True)

In [None]:
print(f"Number of rows after dropping: {registry_gdf.shape[0]}")

In [None]:
registry_gdf.sjoin(county_fips_gdf, how="left", predicate="intersects").drop(
    columns=["index_right"]
)[trimmed_column_set]

In [None]:
# get the rows where the state_left and state_right columns do not match
trimmed_joined_gdf = registry_gdf.sjoin(
    county_fips_gdf, how="left", predicate="intersects"
).drop(columns=["index_right"])[trimmed_column_set]
state_mismatch_mask = (
    trimmed_joined_gdf["state_left"] != trimmed_joined_gdf["state_right"]
)
state_mismatch_gdf = trimmed_joined_gdf[state_mismatch_mask]
print(f"Number of rows with mismatched state values: {state_mismatch_gdf.shape[0]}")

In [None]:
state_mismatch_gdf.sample(3)

In [None]:
# list the columns to keep
columns_we_want = [
    "api",
    "well",
    "state_code",
    "state_left",
    "state_right",
    "county_code",
    "county_left",
    "countyfp",
    "name",
    "operator",
    "lat",
    "lon",
    "geoid",
    "geometry",
]
registry_county_gdf = registry_gdf.sjoin(
    county_fips_gdf, how="left", predicate="intersects"
).drop(columns=["index_right"])[columns_we_want]

print(f"Number of rows: {registry_county_gdf.shape[0]}")
registry_county_gdf[
    registry_county_gdf["state_left"] != registry_county_gdf["state_right"]
][columns_we_want].sample(3)

In [None]:
# Get the rows where state_left is California but state_right is Texas.
# These are wells whose geometry falls in Texas even though the API number
# (original dataset) pointed to California, so the API needs to be rebuilt.
cali_tx_query = registry_county_gdf.query(
    'state_left == "California" & state_right == "Texas"'
).copy()  # copy so the column assignments below don't write to a view
# Rebuild the API: Texas state code "42" + the county code from the spatial join
# (countyfp) + the last 9 digits of the original API (well id, sidetrack, event)
cali_tx_query["state_code"] = "42"
cali_tx_query["county_code"] = cali_tx_query["countyfp"]
cali_tx_query["well_id"] = cali_tx_query["api"].str[-9:]
cali_tx_query["api"] = (
    cali_tx_query["state_code"]
    + cali_tx_query["county_code"]
    + cali_tx_query["well_id"]
)

# put api, county_code and state_code into the registry_gdf
registry_gdf.loc[
    cali_tx_query.index, ["api", "county_code", "state_code"]
] = cali_tx_query[["api", "county_code", "state_code"]].values
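The slicing above (and `str[2:10]` for `tx_api` later on) relies on the layout of a 14-digit API well number: a 2-digit state code, a 3-digit county code, a 5-digit unique well identifier, then 2-digit sidetrack and event-sequence codes. The sketch below shows that decomposition in plain Python. It assumes the API is already a zero-padded 14-character string. The helper names and the sample API are made up for illustration and are not part of the notebook.

```python
from typing import NamedTuple


class Api14(NamedTuple):
    """Components of a 14-digit API well number."""
    state: str      # 2 digits, e.g. "42" for Texas, "30" for New Mexico
    county: str     # 3 digits
    well: str       # 5 digits, unique within the county
    sidetrack: str  # 2 digits
    event: str      # 2 digits


def split_api14(api: str) -> Api14:
    """Split a 14-character API string into its components (hypothetical helper)."""
    if len(api) != 14 or not api.isdigit():
        raise ValueError(f"expected 14 digits, got {api!r}")
    return Api14(api[0:2], api[2:5], api[5:10], api[10:12], api[12:14])


def rebuild_api14(api: str, state: str, county: str) -> str:
    """Keep the last 9 digits (well + sidetrack + event) and swap in a new state/county."""
    if len(state) != 2 or len(county) != 3:
        raise ValueError("state must be 2 digits and county 3 digits")
    return state + county + api[-9:]


def rrc_api8(api: str) -> str:
    """County + unique well digits (api[2:10]), the key used for tx_api."""
    return api[2:10]


# Usage with a made-up API: move a California-coded well to Texas county 301
fixed = rebuild_api14("04123456780000", "42", "301")
print(fixed, split_api14(fixed), rrc_api8(fixed))
```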

In [None]:
registry_county_gdf.loc[
    registry_county_gdf["state_left"] != registry_county_gdf["state_right"],
    columns_we_want,
]

In [None]:
# swell_data_gdf["well_type"].value_counts()
# well_data_gdf[well_data_gdf["well_type"] == "Other"]["symnum"].value_counts()
# look for the mismatch_geo_tx APIs in swell_data_gdf and get those rows

mismatch_geo_tx_sw = swell_data_gdf[
    swell_data_gdf["api"].isin(mismatch_geo_tx["tx_api"])
].copy()
# convert the long83 and lat83 columns to geometry Points
mismatch_geo_tx_sw["geometry"] = gpd.points_from_xy(
    mismatch_geo_tx_sw["long83"], mismatch_geo_tx_sw["lat83"]
)
# create a GeoDataFrame from the mismatch_geo_tx_sw dataframe
mismatch_geo_tx_sw = gpd.GeoDataFrame(
    mismatch_geo_tx_sw, geometry="geometry", crs="EPSG:4269"
)
# plot the mismatch_geo_tx_sw GeoDataFrame
# bg_map * gv.Points(mismatch_geo_tx_sw).opts(height=500, width=800, tools=["hover"])

mismatch_geo_tx_sw.sjoin(county_fips_gdf, how="left", predicate="intersects")

In [None]:
# check the lengths of the api values
display(swell_data_gdf["api"].astype("string").str.len().value_counts())

# drop the rows with the api number length of 3
swell_data_gdf.drop(
    swell_data_gdf[swell_data_gdf["api"].astype("string").str.len() == 3].index,
    inplace=True,
)

In [None]:
bg_map = get_background_map()

ok_counties = county_fips_gdf.query('state.str.contains("Oklahoma")').copy()
ok_counties.to_crs("EPSG:3857", inplace=True)

tx_counties = county_fips_gdf.query('state.str.contains("Texas")').copy()
tx_counties.to_crs("EPSG:3857", inplace=True)

ok_polys = gv.Polygons(ok_counties, crs=ccrs.GOOGLE_MERCATOR).opts(
    projection=ccrs.GOOGLE_MERCATOR,
    line_color="black",
    fill_alpha=0,
    height=500,
    width=500,
)
bg_map * ok_polys

In [None]:
# Define a list of states that are in the Permian Basin.
nm_tx = ["New Mexico", "Texas"]

# Filter the county_fips_gdf DataFrame to include only the counties in the Permian states.
counties_nm_tx_gdf = county_fips_gdf[county_fips_gdf["state"].isin(nm_tx)]

# Perform a spatial join between the registry_gdf and counties_nm_tx_gdf DataFrames.
# This will add the data from counties_nm_tx_gdf to registry_gdf for matching locations.
# After the join, drop the 'index_right' column as it's not needed.
registry_nm_tx_gdf = registry_gdf.sjoin(counties_nm_tx_gdf).drop(
    columns=["index_right"]
)
registry_nm_tx_gdf.sample(3)

In [None]:
registry_nm_tx_gdf.info()

In [None]:
registry_nm_tx_gdf["operator_name"].nunique()

In [None]:
# create a pivot table with the number of frac job records (API entries) per operator per year
operator_year_count = registry_nm_tx_gdf.pivot_table(
    index="operator_name", columns="job_start_year", values="api", aggfunc="count"
)
# See which operators were active every year
# operator_year_count[operator_year_count.count(axis=1) == 11]

# See who has been active for the last 5 years
operator_active_5y = operator_year_count.loc[
    ~operator_year_count.iloc[:, -5:].isna().any(axis=1)
].fillna(0)

# do some styler table formatting from pandas
style = operator_active_5y.style.background_gradient(
    cmap="cet_CET_L2_r", axis=1, vmin=0, vmax=operator_year_count.max().max()
)
# Format the numbers in the table as integers
style = style.format("{:.0f}")
print(f"Number of operators active for the last 5 years: {len(operator_active_5y)}")
style
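The pivot-table filter above keeps operators with at least one job in every one of the last five year columns. The same rule can be sketched in plain Python with a counter keyed by `(operator, year)`. `active_every_year` and the sample records are hypothetical. Note one difference: the sketch checks the last *N calendar years* ending at `last_year`, while `iloc[:, -5:]` checks the last five year *columns* present in the pivot. The two agree only if no year is missing from the data.

```python
from collections import Counter
from typing import Iterable


def active_every_year(
    jobs: Iterable[tuple[str, int]], last_year: int, n_years: int = 5
) -> dict[str, dict[int, int]]:
    """Return {operator: {year: job_count}} for operators with at least one job
    in each of the n_years ending at last_year (inclusive)."""
    counts = Counter(jobs)  # (operator, year) -> number of jobs; missing keys read as 0
    years = range(last_year - n_years + 1, last_year + 1)
    operators = {op for op, _ in counts}
    return {
        op: {y: counts[(op, y)] for y in years}
        for op in sorted(operators)
        if all(counts[(op, y)] > 0 for y in years)
    }


# Usage with made-up records: "A" is active 2019-2023, "B" only in 2022-2023
jobs = [("A", y) for y in range(2019, 2024)] + [("A", 2023), ("B", 2022), ("B", 2023)]
print(active_every_year(jobs, last_year=2023))
```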

In [None]:
# Create a mask for rows where the first 2 characters of the api number are not 42 nor 30.
# This is done to filter out rows that do not belong to the states we are interested in (Texas and New Mexico).
api_mask = ~registry_nm_tx_gdf["api"].str[0:2].isin(["42", "30"])

# Apply the mask to the registry_nm_tx_gdf DataFrame to get the rows that match the condition.
mismatch_state = registry_nm_tx_gdf[api_mask]

# Display selected columns from the mismatch_state DataFrame.
# These columns provide information about the well, its location, and the job start date.
display(
    mismatch_state[
        [
            "api",
            "api_number",
            "state_right",
            "state_name",
            "state_number",
            "well_name",
            "operator_name",
            "county_name",
            "latitude",
            "longitude",
            "geometry",
            "crs",
            "job_start_date",
            # "county",
            "countyfp",
        ]
    ]
)
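The mask above treats the first two API digits as a state code (42 for Texas, 30 for New Mexico). A slightly stricter check compares each prefix with the state name produced by the spatial join, so that a Texas-coded well falling inside New Mexico is also flagged. The sketch below does this with a hypothetical `api_state_mismatches` helper. The mapping covers only the two Permian states, and any other prefix is reported as a mismatch.

```python
from typing import Iterable

# API state codes for the two Permian Basin states
API_STATE_CODES = {"42": "Texas", "30": "New Mexico"}


def api_state_mismatches(
    rows: Iterable[tuple[str, str]],
) -> list[tuple[str, str, str]]:
    """Return (api, expected_state, joined_state) for rows whose API prefix does not
    match the state from the spatial join. Unknown prefixes map to an empty string."""
    mismatches = []
    for api, joined_state in rows:
        expected = API_STATE_CODES.get(api[:2], "")
        if expected != joined_state:
            mismatches.append((api, expected, joined_state))
    return mismatches


# Usage with made-up (api, state_right) pairs
rows = [
    ("42123456780000", "Texas"),
    ("30015123450000", "New Mexico"),
    ("04123456780000", "Texas"),       # California prefix, Texas location
    ("42123456790000", "New Mexico"),  # Texas prefix, New Mexico location
]
for api, expected, joined in api_state_mismatches(rows):
    print(api, repr(expected), joined)
```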

In [27]:
# get a background map
bg_map = get_background_map()

In [None]:
# Convert the coordinates to Mercator
mercator_coords = platecaree_to_mercator_vectorised(
    registry_nm_tx_gdf["geometry"].x, registry_nm_tx_gdf["geometry"].y
)

# Round the coordinates and create a DataFrame
mer_points = pd.DataFrame(np.round(mercator_coords), columns=["x", "y"])

# Create a Points object for plotting
gpoints = gv.Points(
    mer_points.reset_index(), ["x", "y"], ["index"], crs=ccrs.GOOGLE_MERCATOR
).opts(height=600, width=800, color="skyblue", size=1, tools=["hover"])

# Create a layout with the background map and the points
layout = bg_map * gpoints
layout
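`platecaree_to_mercator_vectorised` is defined earlier in the notebook and is not shown here. Given its use with `ccrs.GOOGLE_MERCATOR`, it presumably performs the standard spherical Web Mercator (EPSG:3857) forward projection. The sketch below shows that projection in NumPy under that assumption. `lonlat_to_web_mercator` is a hypothetical name, and this is not necessarily the notebook's implementation. It returns an `(n, 2)` array, which is the shape `pd.DataFrame(..., columns=["x", "y"])` expects.

```python
import numpy as np

# Web Mercator treats the Earth as a sphere with the WGS84 semi-major axis
EARTH_RADIUS_M = 6378137.0
# Latitude limit where the projected square map ends (y diverges at the poles)
MAX_LAT = 85.05112878


def lonlat_to_web_mercator(lon, lat):
    """Project longitude/latitude in degrees to Web Mercator x/y in meters.

    Works element-wise on scalars or array-likes and returns an (n, 2) array
    of [x, y] pairs. Latitudes are clipped to +/- MAX_LAT.
    """
    lon = np.asarray(lon, dtype=float)
    lat = np.clip(np.asarray(lat, dtype=float), -MAX_LAT, MAX_LAT)
    x = EARTH_RADIUS_M * np.radians(lon)
    y = EARTH_RADIUS_M * np.log(np.tan(np.pi / 4 + np.radians(lat) / 2))
    return np.column_stack([x, y])


# Usage: two points roughly inside the Permian Basin
print(np.round(lonlat_to_web_mercator([-103.0, -102.0], [31.5, 32.0])))
```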

### Map files taken from the EIA website.


### Formations

In [None]:
# Use the function 'extract_gdfs_from_zip_url_concurrent' to get GeoDataFrames from the URLs in 'basins_url_list'
# This function concurrently downloads and extracts GeoDataFrames from the given URLs
basins_dict = extract_gdfs_from_zip_url_concurrent(basins_url_list)

# Display the keys of 'basins_dict' to see the names of the basins
display(basins_dict.keys())

# Concatenate the GeoDataFrames in 'basins_dict' into a single GeoDataFrame using the function 'concat_gdf_from_dict'
basins_gdf = concat_gdf_from_dict(basins_dict)

# Convert the column names of 'basins_gdf' to snake case for consistency
# The function 'pascal_to_snake' is used to convert PascalCase or camelCase to snake_case
basins_gdf.columns = [pascal_to_snake(col) for col in basins_gdf.columns]

In [None]:
print(f"Number of sub basins/ geodataframes: {len(basins_dict)}\n")

for k, gdf in basins_dict.items():
    print(f"{k}| Shape:{gdf.shape}| CRS:{gdf.crs.to_string()}")
    display(gdf.sample())
    print()

In [None]:
# Plot shale plays and basin boundaries of the different formations in the Permian Basin
bg_map * basins_gdf.hvplot(
    geo=True,
    alpha=0.5,
    title="Shale Plays in the Permian Basin",
    legend=True,
    by="shale_play",
    muted_alpha=0.01,
).opts(
    tools=["hover", "tap"],
    legend_position="right",
    height=600,
    width=800,
)

In [None]:
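from holoviews import opts

# Dissolve the geometries of the basins_gdf GeoDataFrame into a single geometry
shale_plays_gdf = basins_gdf[["geometry"]].dissolve()
# get the counties in the Permian Basin; test against the single dissolved geometry
# so intersects() compares every county with it instead of aligning row-by-row on
# the index (assumes both layers share the same CRS)
permian_counties = county_fips_gdf.intersects(shale_plays_gdf.geometry.iloc[0])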
# plot the counties in the Permian Basin
bg_map * gv.Polygons(county_fips_gdf[permian_counties]).opts(
    title="Counties in the Permian Basin",
    tools=["hover"],
    height=600,
    width=800,
    alpha=0.5,
    color="skyblue",
)
# plot an outline of the Permian Basin over the counties
overlay = (
    bg_map
    * gv.Polygons(county_fips_gdf[permian_counties])
    * gv.Path(shale_plays_gdf)
    # * gpoints
)

overlay.opts(
    opts.Polygons(alpha=0.5, cmap=["#73d2ff"], line_color="gray"),
    opts.Path(alpha=0.5, color="black"),
    opts.Overlay(tools=["hover"], height=600, width=800),
    opts.Points(color="crimson"),
)


# plot to confirm that the geometries have been dissolved
# bg_map * gv.Polygons(shale_plays_gdf).opts(
#     # geo=True,
#     title="Dissolved Shale play in the Permian Basin",
#     height=600,
#     width=800,
# )

#### State land leases

#### New Mexico:


In [None]:
# Use the extract_gdfs_from_zip_url_concurrent function to download and extract GeoDataFrames
# from the shapefile zip files at the URLs in nm_slo_url_list. The function returns a dictionary
# where the keys are the names of the shapefiles and the values are the corresponding GeoDataFrames.
nm_slo_dict = extract_gdfs_from_zip_url_concurrent(nm_slo_url_list)

# Display the keys of the nm_slo_dict dictionary. These are the names of the shapefiles
# that were downloaded and extracted.
nm_slo_dict.keys()

In [None]:
# land_map_dict = extract_gdfs_from_zip_url_concurrent(shp_url_list)
# land_map_dict.keys()

In [None]:
# sample the gdfs in the dictionary
for k, gdf in nm_slo_dict.items():
    print(f"{k}| Shape:{gdf.shape}| CRS:{gdf.crs.to_string()}")
    display(gdf.sample(3))
    # info() prints its summary directly and returns None, so it isn't wrapped in display()
    gdf.info()

In [None]:
# Create 2 separate gdfs instead of concatenating them as they have distinct columns
nm_slo_gdfs = list(nm_slo_dict.values())
# first one is the geologic regions
nm_slo_geo = nm_slo_gdfs[0]
# scrub the columns
nm_slo_geo.columns = [pascal_to_snake(col) for col in nm_slo_geo.columns]

# Define a dictionary of shared options for the polygon plots
poly_opts = dict(
    alpha=0.8,
    height=600,
    width=800,
    line_width=0,
    line_color="lightgray",
    tools=["hover"],
)


# Adjust opts for this plot
poly_opts_copy = poly_opts.copy()
poly_opts_copy["line_width"] = 1

# plot the geologic regions gdf
bg_map * gv.Polygons(nm_slo_geo.to_crs("EPSG:4269"), vdims=["label"]).opts(
    **poly_opts_copy, cmap=["#73d2ff"] * 256, title="New Mexico Geologic Regions"
)

In [None]:
# Second one is the oil and gas leases on New Mexico State Trust Lands
nm_slo_lease = nm_slo_gdfs[1]
# scrub the columns
nm_slo_lease.columns = [pascal_to_snake(col) for col in nm_slo_lease.columns]
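# create a new column for the area of each lease, in the units of the layer's CRS
# (assumed here to be a projected CRS in meters)
nm_slo_lease["area"] = nm_slo_lease["geometry"].area
# total area per ogrid_nam, broadcast back to every row of that group and
# divided by 1e6 to convert square meters to square kilometers
nm_slo_lease["ogrid_area"] = (
    nm_slo_lease.groupby("ogrid_nam")["area"].transform("sum") / 1e6
)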

# plot the oil and gas leases gdf for New Mexico State Trust Lands
bg_map * gv.Polygons(
    nm_slo_lease.to_crs("EPSG:4269"), vdims=["ogrid_nam", "ogrid_area"]
).opts(
    **poly_opts,
    cnorm="eq_hist",
    colorbar=True,
    title="Oil and Gas Leases on New Mexico State Trust Lands"
)
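`groupby(...).transform("sum")` returns one value per row: the total for that row's group. This is why `ogrid_area` can sit alongside each individual lease polygon and be shown in its hover tooltip. The plain-Python sketch below reproduces that behavior, including the division by 1e6 that converts m² to km². The names and numbers are hypothetical, and the conversion is only meaningful if the layer's CRS uses meters.

```python
from collections import defaultdict


def group_total_km2(names: list[str], areas_m2: list[float]) -> list[float]:
    """For each row, return the summed area (km^2) of all rows sharing its name,
    mimicking df.groupby(name)[area].transform("sum") / 1e6."""
    totals: defaultdict[str, float] = defaultdict(float)
    for name, area in zip(names, areas_m2):
        totals[name] += area
    return [totals[name] / 1e6 for name in names]


# Usage: operator "A" holds two leases, "B" holds one (made-up areas in m^2)
print(group_total_km2(["A", "B", "A"], [1.5e6, 3e6, 0.5e6]))
```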

In [None]:
# list(land_map_dict.values())
# [gdf['geometry'] for gdf in land_map_dict.values()]
random_color_list = [
    "#" + "".join([random.choice("0123456789ABCDEF") for j in range(6)])
    for i in range(len(nm_slo_dict))
]
plots = []
new_map = get_background_map()
plots.append(new_map)
for color, (name, gdf) in zip(random_color_list, nm_slo_dict.items()):
    # Add new column with the name of the shapefile for the hover tool
    gdf["label"] = name
    # give each shapefile layer its own random fill color
    plot = gv.Polygons(gdf.to_crs("EPSG:4269"), vdims=["label"]).opts(
        tools=["hover"], height=600, width=800, alpha=0.5, color=color, title=""
    )
    plots.append(plot)

overlay = hv.Overlay(plots)
overlay

In [None]:
nm_slo_lease.info()

In [None]:
# Concatenate the GeoDataFrames in swell_dict into a single GeoDataFrame
swell_data_gdf = concat_gdf_from_dict(swell_dict)

# Convert the column names to snake case for consistency
swell_data_gdf.columns = [pascal_to_snake(col) for col in swell_data_gdf.columns]
# get the county code from the source_file column
swell_data_gdf["county_code"] = swell_data_gdf["source_file"].str.extract(r"(\d{3})")

# Display a sample of 3 rows from the DataFrame
display(swell_data_gdf.sample(3))

# Display information about the DataFrame, including the number of non-null entries in each column
swell_data_gdf.info()

In [None]:
# add a column to surv_data_gdf with the county number,
# taken from the 3-digit number in the source_file column
surv_data_gdf["county_number"] = surv_data_gdf["source_file"].str.extract(r"(\d{3})")

# using just the geometry and the county_number columns, intersect with the permian basin gdf
surv_permian_gdf = surv_data_gdf[["geometry", "county_number"]].sjoin(
    shale_plays_gdf[["geometry"]], how="inner", predicate="intersects"
)
# see which counties are in the permian basin
pb_county_numbers = surv_permian_gdf["county_number"].unique().tolist()

# plot the survey lines in the permian basin
bg_map * gv.Polygons(
    surv_permian_gdf.to_crs("EPSG:4269"), vdims=["county_number"]
).opts(
    tools=["hover"],
    height=600,
    width=800,
    alpha=0.5,
    line_width=0,
    title="Permian Basin Survey Lines",
)

# surv_data_gdf.sample(3)

In [None]:
# see how land survey polygon data looks on map
pb_plot = shale_plays_gdf.hvplot(geo=True, color="red", alpha=0.5, line_width=0).opts(
    height=600, width=800
)
survey_plot = surv_data_gdf.hvplot(geo=True, color="blue", alpha=0.5, line_width=0)

# bg_map * survey_plot * pb_plot

In [None]:
# Get the intersection of the survey polygons and the Permian Basin polygon
survey_pb_gdf = gpd.overlay(surv_data_gdf, shale_plays_gdf, how="intersection")
survey_pb_gdf.info()

In [None]:
# survey_pb_gdf.explore()
# surv_data_gdf.sample(100)

In [None]:
# spatial join of registry_gdf (from FracFocus) and surv_data_gdf
registry_join_gdf = gpd.sjoin(
    registry_gdf[
        [
            "geometry",
            "api",
            "operator_name",
            "well_name",
            "state",
            "county_name",
            "county_number",
        ]
    ],
    surv_data_gdf,
).drop(columns=["index_right"])


registry_join_gdf.sort_values(by="api")

registry_join_gdf.county_name.value_counts()

# create a tx_api column (3-digit county code + 5-digit well id) from the api column
registry_join_gdf["tx_api"] = registry_join_gdf["api"].str[2:10]

# merge the well_type column from swell_data_gdf into registry_join_gdf on tx_api
swell_data_gdf["tx_api"] = swell_data_gdf["api"].copy()

registry_join_gdf = (
    registry_join_gdf.merge(swell_data_gdf[["tx_api", "well_type"]], on="tx_api")
    .drop(columns=["scrap_file", "level4_sur"])
    .rename(columns={"level2_blo": "block"})
)
# registry_join_gdf.explore()
# plot polygons using geoviews
# bg_map * gv.Polygons(registry_join_gdf.to_crs("EPSG:4269"), vdims=["well_type"]).opts(
#     **poly_opts, color="well_type", title="Well Types in the Permian Basin"
# )

bg_map * registry_join_gdf.hvplot(
    geo=True,
    by="well_type",
    alpha=0.8,
    legend="right",
    width=800,
    height=600,
    size=1,
    muted_alpha=0.01,
    tools=["box_select"],
)

### Geodatabase files taken from the Texas GLO (General Land Office)

These files contain the active Oil and Gas Leases managed by the Texas GLO and the active Oil and Gas Units, which are oil and gas pooling agreements managed by the Texas GLO.

In [None]:
# get the geodataframe of the active leases
# active_gdb_dict = read_gdb_from_zip_url(gdb_zip_urls)

# get the geodataframe of the active leases using concurrent futures
active_gdb_dict = read_gdb_from_zip_url_concurrent(GDB_ZIP_URLS)

In [None]:
active_gdb_dict.keys()

In [None]:
for k, gdf in active_gdb_dict.items():
    print(f"{k}| Shape:{gdf.shape}| CRS:{gdf.crs.to_string()}")
    display(gdf.sample(3))
    print()

In [None]:
# Read in the active lease geodatabase
active_leases_gdf = active_gdb_dict["OAG_Leases_Active"]
# clean column names
active_leases_gdf.columns = [pascal_to_snake(col) for col in active_leases_gdf.columns]
active_leases_gdf.describe(include="all").T.sort_values(
    by="unique", ascending=False
).fillna("")

In [None]:
# get the columns with the date in it using regex
date_cols = [col for col in active_leases_gdf.columns if re.search(r"date", col)]
# add any other columns that should be dates
date_cols.extend(["lease_input"])

date_cols

In [None]:
# convert the date columns to datetime
active_leases_gdf[date_cols] = pd.concat(
    [pd.to_datetime(active_leases_gdf[col]) for col in date_cols], axis=1
)
# active_leases_gdf[date_cols] = active_leases_gdf[date_cols].fillna(
#     pd.Timestamp("1900-06-28")
# )

In [None]:
# columns of interest for this analysis
columns_of_interest = date_cols + [
    "county",
    "geometry",
    "land_type",
    "primary_term_year",
    "original_lessee",
    "lessor",
    "field_name",
    "lease_type",
    "lease_status",
    "lease_number",
]

In [None]:
active_leases_gdf[columns_of_interest].info()
active_leases_gdf[active_leases_gdf[columns_of_interest].isna().any(axis=1)][
    columns_of_interest
].sort_values(by="effective_date", ascending=False)

In [None]:
active_leases_gdf_trimmed = active_leases_gdf[columns_of_interest]

active_leases_gdf_trimmed["lease_type"].unique()

In [None]:
non_date_cols = list(set(columns_of_interest) - set(date_cols))
pd.concat(
    [
        active_leases_gdf[non_date_cols],
        active_leases_gdf[date_cols].astype(
            str
        ),  # the .explore() does not work with NaT in datetime columns
    ],
    axis=1,
).explore()

In [None]:
# Read in the active units geodatabase
active_units_gdf = active_gdb_dict["OAG_Units_Active"]
# clean column names
active_units_gdf.columns = [pascal_to_snake(col) for col in active_units_gdf.columns]
active_units_gdf.describe(include="all").T.sort_values(
    by="unique", ascending=False
).fillna("")

In [None]:
active_units_gdf.field_name.unique().tolist()

In [None]:
# active_units_gdf.explore()

### Dask

#### Production Data Query Dump from RRC

| Table | Description |
|---|---|
| GP_COUNTY | General purpose table that stores county information. |
| GP_DATE_RANGE_CYCLE | General purpose table of the PDQ data range (Jan. 1993 to the current production report month/year). |
| GP_DISTRICT | General purpose table that contains district information. |
| OG_COUNTY_CYCLE | Contains production report data reported by lease and month (YYYYMM), aggregated by the county in which the wells are located. This is an estimate only, based on allowables and potentials. |
| OG_COUNTY_LEASE_CYCLE | Contains production report data reported by lease and month (YYYYMM), aggregated by lease and county in which the wells are located. This is an estimate only, based on allowables and potentials. |
| OG_DISTRICT_CYCLE | Contains production report data reported by lease and month (YYYYMM), aggregated by the completion district for the lease ID. |
| OG_FIELD_CYCLE | Contains production report data reported by lease and month (YYYYMM), aggregated by the field in which the well(s) for the lease are completed. |
| OG_FIELD_DW | Table of field identifying data. |
| OG_LEASE_CYCLE | Contains production report data reported by lease and month (YYYYMM). |
| OG_LEASE_CYCLE_DISP | Contains production report disposition data reported by lease and month (YYYYMM). |
| OG_OPERATOR_CYCLE | Contains production report data reported by lease and month (YYYYMM), aggregated by the operator of the lease. |
| OG_OPERATOR_DW | This table contains identifying operator information. |
| OG_REGULATORY_LEASE_DW | This table contains identifying lease information. |
| OG_SUMMARY_MASTER_LARGE | Summary table (used for query purposes at the operator level). |
| OG_SUMMARY_ONSHORE_LEASE | Summary table (used for query purposes on the leases in onshore counties). |
| OG_WELL_COMPLETION | This table contains identifying well completion information. |

Load the data from GCS by downloading the zip file to disk and then unzipping it.

In [28]:
from google.cloud import storage


def download_blob(bucket_name, source_blob_name, destination_file_name):
    """Downloads a blob from the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"

    # The ID of your GCS object
    # source_blob_name = "storage-object-name"

    # The path to which the file should be downloaded
    # destination_file_name = "local/path/to/file"

    storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)

    # Construct a client side representation of a blob.
    # Note `Bucket.blob` differs from `Bucket.get_blob` as it doesn't retrieve
    # any content from Google Cloud Storage. As we don't need additional data,
    # using `Bucket.blob` is preferred here.
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)

    print(
        "Downloaded storage object {} from bucket {} to local file {}.".format(
            source_blob_name, bucket_name, destination_file_name
        )
    )

In [None]:
bucket_name = "mrprime_dataset"
blob_name = "capstone_journey/rrc/PDQ_DSV.zip"
dl_file = Path("../data/PDQ_DSV.zip")

# Create the download directory (and any missing parents) if it doesn't exist
dl_file.parent.mkdir(parents=True, exist_ok=True)

download_blob(bucket_name, blob_name, str(dl_file))

# Create a new directory with the same name as the stem of the zip file
extract_dir = dl_file.parent / dl_file.stem
extract_dir.mkdir(exist_ok=True)

# Extract every member of the zip file into that directory
with ZipFile(dl_file, "r") as zip_ref:
    zip_ref.extractall(extract_dir)

Read the processed data as Parquet files directly from GCS. Dask reads the `gs://` paths through `gcsfs`, so the files do not need to be downloaded to local disk first.

In [100]:
from google.cloud import storage

# Create a client
client = storage.Client()

# Define the bucket name and the prefix
bucket_name = "mrprime_dataset"
prefix = "capstone_journey/rrc/processed/"

# Get the bucket
bucket = client.get_bucket(bucket_name)

# Get the blobs (files) in the bucket that match the prefix
blobs = bucket.list_blobs(prefix=prefix)

# Collect the unique table directory names (one Parquet dataset directory per table)
data_tables = set(
    blob.name.rsplit("/", 2)[1] for blob in blobs if "_DATA_TABLE.parquet" in blob.name
)

# Load each data table into a separate DataFrame
dfs = {
    data_table: dd.read_parquet(
        f"gs://{bucket_name}/{prefix}{data_table}/*.parquet", engine="pyarrow"
    )
    for data_table in tqdm(data_tables)
}

100%|██████████| 16/16 [00:50<00:00,  3.14s/it]
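The `rsplit("/", 2)[1]` expression assumes that each table is stored as a Parquet *directory* directly under the prefix, such as `<prefix><TABLE>_DATA_TABLE.parquet/<part file>`. Under that layout, the second-to-last path component is the table name. The sketch below isolates that parsing step on plain strings so it can be checked without GCS access. The blob and part-file names are made up, and `table_dirs` is a hypothetical helper.

```python
from typing import Iterable


def table_dirs(
    blob_names: Iterable[str], marker: str = "_DATA_TABLE.parquet"
) -> set[str]:
    """Return the parent directory name of every blob whose path contains `marker`."""
    return {name.rsplit("/", 2)[1] for name in blob_names if marker in name}


# Usage with made-up blob names
names = [
    "capstone_journey/rrc/processed/GP_COUNTY_DATA_TABLE.parquet/part.0.parquet",
    "capstone_journey/rrc/processed/GP_COUNTY_DATA_TABLE.parquet/part.1.parquet",
    "capstone_journey/rrc/processed/OG_FIELD_DW_DATA_TABLE.parquet/part.0.parquet",
    "capstone_journey/rrc/processed/README.txt",
]
print(sorted(table_dirs(names)))
```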


In [101]:
dfs.keys()

dict_keys(['OG_FIELD_DW_DATA_TABLE.parquet', 'OG_DISTRICT_CYCLE_DATA_TABLE.parquet', 'OG_LEASE_CYCLE_DATA_TABLE.parquet', 'OG_WELL_COMPLETION_DATA_TABLE.parquet', 'OG_REGULATORY_LEASE_DW_DATA_TABLE.parquet', 'OG_COUNTY_CYCLE_DATA_TABLE.parquet', 'OG_SUMMARY_ONSHORE_LEASE_DATA_TABLE.parquet', 'OG_COUNTY_LEASE_CYCLE_DATA_TABLE.parquet', 'GP_DATE_RANGE_CYCLE_DATA_TABLE.parquet', 'OG_LEASE_CYCLE_DISP_DATA_TABLE.parquet', 'OG_OPERATOR_DW_DATA_TABLE.parquet', 'GP_DISTRICT_DATA_TABLE.parquet', 'GP_COUNTY_DATA_TABLE.parquet', 'OG_OPERATOR_CYCLE_DATA_TABLE.parquet', 'OG_FIELD_CYCLE_DATA_TABLE.parquet', 'OG_SUMMARY_MASTER_LARGE_DATA_TABLE.parquet'])

In [102]:
# shorten the keys for brevity (drop the ".parquet" suffix)
dfs = {k.split(".")[0]: v for k, v in dfs.items()}
data_table_list = list(dfs.keys())
display(data_table_list)
dfs[data_table_list[0]].head()

['OG_FIELD_DW_DATA_TABLE',
 'OG_DISTRICT_CYCLE_DATA_TABLE',
 'OG_LEASE_CYCLE_DATA_TABLE',
 'OG_WELL_COMPLETION_DATA_TABLE',
 'OG_REGULATORY_LEASE_DW_DATA_TABLE',
 'OG_COUNTY_CYCLE_DATA_TABLE',
 'OG_SUMMARY_ONSHORE_LEASE_DATA_TABLE',
 'OG_COUNTY_LEASE_CYCLE_DATA_TABLE',
 'GP_DATE_RANGE_CYCLE_DATA_TABLE',
 'OG_LEASE_CYCLE_DISP_DATA_TABLE',
 'OG_OPERATOR_DW_DATA_TABLE',
 'GP_DISTRICT_DATA_TABLE',
 'GP_COUNTY_DATA_TABLE',
 'OG_OPERATOR_CYCLE_DATA_TABLE',
 'OG_FIELD_CYCLE_DATA_TABLE',
 'OG_SUMMARY_MASTER_LARGE_DATA_TABLE']

Unnamed: 0,FIELD_NO,FIELD_NAME,DISTRICT_NO,DISTRICT_NAME,FIELD_CLASS,FIELD_H2S_FLAG,FIELD_MANUAL_REV_FLAG,WILDCAT_FLAG,O_DERIVED_RULE_TYPE_CODE,G_DERIVED_RULE_TYPE_CODE,O_RESCIND_DT,G_RESCIND_DT,O_SALT_DOME_FLAG,G_SALT_DOME_FLAG,O_OFFSHORE_CODE,G_OFFSHORE_CODE,O_DONT_PERMIT,G_DONT_PERMIT,O_NOA_MAN_REV_RULE,G_NOA_MAN_REV_RULE,O_COUNTY_NO,G_COUNTY_NO,O_DISCOVERY_DT,G_DISCOVERY_DT,O_SCHED_REMARKS,G_SCHED_REMARKS,O_COMMENTS,G_COMMENTS,CREATE_BY,CREATE_DT,MODIFY_BY,MODIFY_DT
0,8000,WILDCAT-COMMISSION USE ONLY,4,4,O,N,,N,SW,,,,N,N,L,,Y,N,,,47.0,,11-AUG-04,,,,,,FLU390 mainframe,19-OCT-23,,
1,8001,WILDCAT,4,4,B,Y,N,Y,SW,SW,,,N,N,L,L,N,N,,,273.0,,01-MAR-72,,,,,,FLU390 mainframe,19-OCT-23,,
2,8666,WILDCAT-COMMISSION USE ONLY,4,4,O,N,,N,SW,,,,N,N,L,,Y,N,,,47.0,,11-AUG-04,,,,,,FLU390 mainframe,19-OCT-23,,
3,8997,WILDCAT-COMMISSION USE ONLY,4,4,O,N,,N,SW,,,,N,N,L,,Y,N,,,355.0,,11-AUG-04,,,,,,FLU390 mainframe,19-OCT-23,,
4,8998,WILDCAT-COMMISSION USE ONLY,4,4,O,N,,N,SW,,,,N,N,L,,Y,N,,,215.0,,11-AUG-04,,,,,,FLU390 mainframe,19-OCT-23,,


In [103]:
# tables = [
#     "GP_COUNTY_DATA_TABLE",
#     "GP_DISTRICT_DATA_TABLE",
#     "GP_DATE_RANGE_CYCLE_DATA_TABLE",
#     "OG_COUNTY_CYCLE_DATA_TABLE",
#     "OG_COUNTY_LEASE_CYCLE_DATA_TABLE",
#     "OG_LEASE_CYCLE_DATA_TABLE",
#     "OG_LEASE_CYCLE_DISP_DATA_TABLE",
#     "OG_DISTRICT_CYCLE_DATA_TABLE",
#     "OG_FIELD_CYCLE_DATA_TABLE",
#     "OG_OPERATOR_CYCLE_DATA_TABLE",
#     "OG_FIELD_DW_DATA_TABLE",
#     "OG_OPERATOR_DW_DATA_TABLE",
#     "OG_REGULATORY_LEASE_DW_DATA_TABLE",
#     "OG_WELL_COMPLETION_DATA_TABLE",
#     "OG_SUMMARY_MASTER_LARGE_DATA_TABLE",
#     "OG_SUMMARY_ONSHORE_LEASE_DATA_TABLE",
# ]


short_table = [item.replace("_DATA_TABLE", "").lower() for item in data_table_list]


table_zip = zip(short_table, data_table_list)


# Map each short table name to its Dask DataFrame from dfs
data_tables_dict = {k: dfs[v] for k, v in table_zip}
list(data_tables_dict.keys())

['og_field_dw',
 'og_district_cycle',
 'og_lease_cycle',
 'og_well_completion',
 'og_regulatory_lease_dw',
 'og_county_cycle',
 'og_summary_onshore_lease',
 'og_county_lease_cycle',
 'gp_date_range_cycle',
 'og_lease_cycle_disp',
 'og_operator_dw',
 'gp_district',
 'gp_county',
 'og_operator_cycle',
 'og_field_cycle',
 'og_summary_master_large']

In [29]:
# general purpose table paths
gp_county_path = Path("../data/PDQ_DSV/GP_COUNTY_DATA_TABLE.dsv")
gp_district_path = Path("../data/PDQ_DSV/GP_DISTRICT_DATA_TABLE.dsv")
gp_date_range_path = Path("../data/PDQ_DSV/GP_DATE_RANGE_CYCLE_DATA_TABLE.dsv")


# production data estimates based on potentials and allowables paths
og_county_cycle_path = Path("../data/PDQ_DSV/OG_COUNTY_CYCLE_DATA_TABLE.dsv")
og_county_lease_cycle_path = Path(
    "../data/PDQ_DSV/OG_COUNTY_LEASE_CYCLE_DATA_TABLE.dsv"
)

# production disposition data by lease and month paths
og_lease_cycle_disp_path = Path("../data/PDQ_DSV/OG_LEASE_CYCLE_DISP_DATA_TABLE.dsv")


# production report data by lease paths
og_lease_cycle_path = Path("../data/PDQ_DSV/OG_LEASE_CYCLE_DATA_TABLE.dsv")

# production data by lease aggregated by another feature
og_district_cycle_path = Path("../data/PDQ_DSV/OG_DISTRICT_CYCLE_DATA_TABLE.dsv")
og_field_cycle_path = Path("../data/PDQ_DSV/OG_FIELD_CYCLE_DATA_TABLE.dsv")
og_operator_cycle_path = Path("../data/PDQ_DSV/OG_OPERATOR_CYCLE_DATA_TABLE.dsv")


# identifying tables paths
og_field_dw_path = Path("../data/PDQ_DSV/OG_FIELD_DW_DATA_TABLE.dsv")
og_operator_dw_path = Path("../data/PDQ_DSV/OG_OPERATOR_DW_DATA_TABLE.dsv")
og_regulatory_lease_dw_path = Path(
    "../data/PDQ_DSV/OG_REGULATORY_LEASE_DW_DATA_TABLE.dsv"
)
# well completion data
og_well_completion_path = Path("../data/PDQ_DSV/OG_WELL_COMPLETION_DATA_TABLE.dsv")


# summary tables
og_summary_master_large_path = Path(
    "../data/PDQ_DSV/OG_SUMMARY_MASTER_LARGE_DATA_TABLE.dsv"
)
og_summary_onshore_lease_path = Path(
    "../data/PDQ_DSV/OG_SUMMARY_ONSHORE_LEASE_DATA_TABLE.dsv"
)

pdq_dsv_paths = [
    gp_county_path,
    gp_district_path,
    gp_date_range_path,
    og_county_cycle_path,
    og_county_lease_cycle_path,
    og_lease_cycle_path,
    og_lease_cycle_disp_path,
    og_district_cycle_path,
    og_field_cycle_path,
    og_operator_cycle_path,
    og_field_dw_path,
    og_operator_dw_path,
    og_regulatory_lease_dw_path,
    og_well_completion_path,
    og_summary_master_large_path,
    og_summary_onshore_lease_path,
]

In [30]:
# def get_csv_dtypes(path, sep="}", nrows=10000):
#     """Reads in a csv file and returns the dtypes of the columns"""
#     data = pd.read_csv(path, sep=sep, nrows=nrows)
#     return data.dtypes.to_dict()
from convert_to_parquet import get_csv_dtypes_for_all_files
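`get_csv_dtypes_for_all_files` comes from the project's local `convert_to_parquet` module, which is not shown here. The commented-out helper above infers dtypes by reading the first 10,000 rows of each `}`-delimited DSV file with pandas. The stdlib-only sketch below illustrates the same idea under a simplifying assumption: every column can be classified as int, float, or string from a sample. `infer_dsv_dtypes` is a hypothetical name, and its results may differ from pandas on edge cases such as quoting or dates.

```python
import csv
import io
from typing import Iterable

# Order from narrowest to widest type; a column is widened as needed
_RANK = {"int64": 0, "float64": 1, "object": 2}


def _classify(value: str) -> str:
    """Return the narrowest dtype name that can represent a single field."""
    try:
        int(value)
        return "int64"
    except ValueError:
        pass
    try:
        float(value)
        return "float64"
    except ValueError:
        return "object"


def infer_dsv_dtypes(
    lines: Iterable[str], sep: str = "}", nrows: int = 10000
) -> dict[str, str]:
    """Infer a dtype name per column from the header and first `nrows` data rows.

    Empty fields count as missing. Because pandas stores missing integers as
    float64 by default, an integer column with blanks is reported as float64.
    """
    reader = csv.reader(lines, delimiter=sep)
    header = next(reader)
    dtypes = {col: "int64" for col in header}
    has_missing = dict.fromkeys(header, False)
    for i, row in enumerate(reader):
        if i >= nrows:
            break
        for col, value in zip(header, row):
            if value == "":
                has_missing[col] = True
                continue
            kind = _classify(value)
            if _RANK[kind] > _RANK[dtypes[col]]:
                dtypes[col] = kind
    for col in header:
        if has_missing[col] and dtypes[col] == "int64":
            dtypes[col] = "float64"
    return dtypes


# Usage on a tiny made-up sample shaped like OG_FIELD_DW
sample = io.StringIO("FIELD_NO}FIELD_NAME}O_COUNTY_NO\n8000}WILDCAT}47.0\n8001}WILDCAT}\n")
print(infer_dsv_dtypes(sample))
```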

In [31]:
def format_in_B(num):
    """Formats a byte count as a human-readable string using 1024-based units."""
    for unit in ["", "k", "M", "G", "T"]:
        if abs(num) < 1024:
            return f"{num:3.2f} {unit}B"
        num /= 1024
    # after five divisions by 1024 the value is in petabytes
    return f"{num:.1f} PB"

In [32]:
def get_table_overview_from_frame(table_name_dict, table_name):
    """Returns an overview of the data in a dask dataframe"""

    # get the dask dataframe from the dictionary
    ddf = table_name_dict[table_name]
    # get the number of rows (a lazy Dask scalar until computed)
    num_rows = ddf.shape[0]
    # get the in-memory size (deep=True includes the contents of object columns)
    size = ddf.memory_usage(deep=True).sum()
    # get the column names
    columns = ddf.columns.tolist()
    # get the column dtypes
    dtypes = ddf.dtypes.to_list()

    # compute values and put these properties in 1 row of a dataframe
    num_rows, size = dask.compute(num_rows, size)
    # create a dataframe with the properties
    df = pd.DataFrame(
        {
            "table": [table_name],
            "rows": [f"{num_rows:,}"],
            "cols": [len(columns)],
            "columns": [columns],
            "size": [format_in_B(size)],
            "col_dtypes": [dtypes],
        }
    )
    return df

In [33]:
def get_table_overview(path, path_name, column_types=None):
    """
    Returns an overview of the data in the file at the given path.

    The overview is a one-row pandas DataFrame with the table variable name, file
    name, number of rows and columns, size on disk, column names and dtypes, file
    path and extension, and a small JSON sample of the data. It uses Dask to count
    the rows, which makes it suitable for large files.

    Parameters:
    path (Path or str): The path to the "}"-delimited data file.
    path_name (str): The name of the variable holding the path, used as a label.
    column_types (dict, optional): Column -> dtype overrides passed to the reader.
        Columns whose dtype Dask fails to infer are added as "object" (the dict
        is updated in place).

    Returns:
    pd.DataFrame: A one-row DataFrame containing the file overview.
    """
    path = Path(path)
    if column_types is None:
        column_types = {}
    # Re-read with widened dtypes until Dask stops reporting dtype mismatches
    while True:
        try:
            ddf = dd.read_csv(path, sep="}", dtype=column_types)
            num_rows = len(ddf)
            break
        except ValueError as e:
            match = re.search(r"dtype=\{'(.*?)': '(.*?)'", str(e))
            # Re-raise errors we can't fix, so the loop can't spin forever
            if not match or column_types.get(match.group(1)) == "object":
                raise
            column_name = match.group(1)
            column_types[column_name] = "object"

    big_sample_data = pd.read_csv(path, sep="}", nrows=10000, dtype=column_types)
    if num_rows < 4:
        sample_data = big_sample_data.sample(num_rows)
    else:
        sample_data = big_sample_data.sample(3)
    # columns = sample_data.columns.tolist()

    # column_dtypes = ddf.dtypes.tolist()
    column_dtypes = ddf.dtypes.to_dict()
    file_size = format_in_B(path.stat().st_size)

    overview = pd.DataFrame(
        {
            "table_var": [path_name],
            "file_name": [path.stem],
            "num_rows": [f"{num_rows:,}"],
            "num_cols": [len(column_dtypes)],
            "size_on_disk": [file_size],
            "cols": [list(column_dtypes.keys())],
            "col_dtypes": [list(column_dtypes.values())],
            "file_path": [path],
            "file_extension": [path.suffix],
            "sample_data": [sample_data.to_json()],
        },
    )
    return overview
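`get_table_overview` (and `display_table_overview` further down) fixes one mismatched column per failed read, because its regex only captures the first `'column': 'dtype'` pair in the error's `dtype={...}` hint. Below is a minimal sketch of a variant that widens every flagged column at once and caps the number of retries. It assumes the error message contains a hint shaped like the one the regex expects; the sample message and the names `columns_from_dtype_hint`, `read_with_retries` and `fake_reader` are hypothetical, and real Dask messages may be worded differently. With Dask, `reader` would need to both read and compute, for example `lambda t: len(dd.read_csv(path, sep="}", dtype=t))`, since the mismatch surfaces at compute time.

```python
import re

# Hypothetical error text modelled on the dtype={...} hint that the regex in
# get_table_overview expects; real Dask messages may be worded differently.
SAMPLE_ERROR = (
    "Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.\n"
    "...\n"
    "dtype={'LEASE_NO': 'object',\n"
    "       'WELL_NO': 'object'}\n"
)

DTYPE_HINT = re.compile(r"dtype=\{(.*?)\}", re.DOTALL)
COLUMN_DTYPE_PAIR = re.compile(r"'([^']+)':\s*'([^']+)'")


def columns_from_dtype_hint(message):
    """Return every column named in a dtype={...} hint (an empty list if none)."""
    hint = DTYPE_HINT.search(message)
    if hint is None:
        return []
    return [name for name, _ in COLUMN_DTYPE_PAIR.findall(hint.group(1))]


def read_with_retries(reader, column_types, max_retries=10):
    """Call reader(column_types), widening flagged columns to "object" on each retry."""
    column_types = dict(column_types)  # work on a copy; never mutate the caller's dict
    for _ in range(max_retries):
        try:
            return reader(column_types), column_types
        except ValueError as err:
            flagged = columns_from_dtype_hint(str(err))
            if not flagged:
                raise  # not a dtype mismatch this helper knows how to fix
            for name in flagged:
                column_types[name] = "object"
    raise RuntimeError(f"read still failing after {max_retries} retries")


def fake_reader(column_types):
    """Stand-in for a read that fails until both columns are read as object."""
    if column_types.get("LEASE_NO") != "object" or column_types.get("WELL_NO") != "object":
        raise ValueError(SAMPLE_ERROR)
    return "ok"


result, final_types = read_with_retries(fake_reader, {})
print(result, final_types)  # ok {'LEASE_NO': 'object', 'WELL_NO': 'object'}
```

Copying `column_types` inside the helper is a deliberate choice: it sidesteps the shared-mutable-default pitfall, and the caller gets the widened dtypes back explicitly.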

In [34]:
def get_table_description(path, column_types=None):
    """
    Returns a lazy Dask DataFrame with the descriptive statistics of the data in
    the file at the given path, plus a final "dtype" row with each column's dtype.
    """
    column_types = column_types or {}
    columns_needed = pd.read_csv(path, sep="}", nrows=1).columns.tolist()
    # keep only the dtype overrides for columns present in this file
    column_types_needed = {
        k: column_types[k] for k in columns_needed if k in column_types
    }
    # Read the CSV file into a Dask dataframe
    ddf = dd.read_csv(path, sep="}", dtype=column_types_needed)

    # descriptive statistics for all columns, numeric and non-numeric
    desc_ddf = ddf.describe(include="all")
    # one-row frame holding the dtype of each column of the source table
    num_dtypes = dd.from_pandas(
        pd.DataFrame(ddf.dtypes.apply(lambda x: x.name), columns=["dtype"]).T,
        npartitions=1,
    )
    # append the dtype row; the result is a single lazy Dask DataFrame
    desc_ddf = dd.concat([desc_ddf, num_dtypes])

    return desc_ddf

In [35]:
def get_table_description_from_frame(table_name_dict, table_name):
    """Returns a lazy Dask DataFrame with descriptive statistics for the table, plus a "dtype" row."""
    # get the dask dataframe from the dictionary
    ddf = table_name_dict[table_name]
    # descriptive statistics for all columns, numeric and non-numeric
    desc_ddf = ddf.describe(include="all")
    # one-row frame holding the dtype of each column of the source table
    ddf_dtypes = dd.from_pandas(
        pd.DataFrame(ddf.dtypes.apply(lambda x: x.name), columns=["dtype"]).T,
        npartitions=1,
    )
    # append the dtype row; the result is a single lazy Dask DataFrame
    desc_ddf = dd.concat([desc_ddf, ddf_dtypes])
    return desc_ddf

In [36]:
def get_data_for_plots(ddf):
    """takes in a dask dataframe and returns the df data to be used for plotting"""
    # define order for the columns
    column_order = [
        "dtype",
        "count",
        "unique",
        "top",
        "freq",
        "mean",
        "std",
        "min",
        "25%",
        "50%",
        "75%",
        "max",
    ]
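    # compute the (small) describe table and transpose it to one row per column;
    # reindex so statistics a table lacks (e.g. no numeric columns) become NaN
    computed_ddf = ddf.compute()
    return computed_ddf.T.reindex(columns=column_order).sort_values(
        by="dtype", ascending=False
    )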


def plot_statistics_table_nonmissing_hbar(ddf, title=""):
    """Returns a panel Row with a bar plot of non-missing fractions and a statistics table."""

    # options for the horizontal bar plot
    hbar_opts = dict(
        title=title,
        ylabel="",
        xlabel="",
        xaxis="bare",
        tools=["hover"],
    )
    df = get_data_for_plots(ddf)
    # turn the count column into an int dtype column
    df["count"] = pd.to_numeric(df["count"]).astype(int)
    # round the numeric statistics to integers for display; non-numeric or
    # missing statistics become 0
    float_cols = ["mean", "std", "min", "25%", "50%", "75%", "max"]
    for col in float_cols:
        df[col] = (
            pd.to_numeric(df[col], errors="coerce").fillna(0).round().astype(int)
        )
    # fraction of non-missing values, relative to the most complete column
    df["fraction_nonmissing"] = round(df["count"] / df["count"].max(), 4)

    # (1 + number of rows) x 33 px, shared by the plot and the table
    element_height = (1 + df.shape[0]) * 33

    table_panel = pnw.Tabulator(df.iloc[::-1], height=element_height)

    barh_plot = df.hvplot.barh(
        y="fraction_nonmissing", height=element_height, **hbar_opts
    ).opts(active_tools=["box_zoom"], toolbar="above")
    barh_panel = pn.panel(barh_plot)

    # return a panel row with the tabulator table and the bar plot
    return pn.Row(
        barh_panel,
        table_panel,
        sizing_mode="stretch_width",
    )
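The `fraction_nonmissing` column divides each column's non-null count by the largest count in the table, so it is relative to the most complete column rather than to the total number of rows: a column that is itself partly empty still scores 1.0 if no other column is more complete. The stdlib sketch below, with made-up counts and a hypothetical `fraction_nonmissing` helper, contrasts the two denominators.

```python
def fraction_nonmissing(counts, total_rows=None):
    """Fraction of non-missing values per column.

    By default the denominator is the largest count (as in the cell above);
    pass total_rows to measure against the true number of rows instead.
    """
    denominator = total_rows if total_rows is not None else max(counts.values())
    return {col: round(n / denominator, 4) for col, n in counts.items()}


# made-up non-null counts for a table that actually has 100 rows
example_counts = {"LEASE_NO": 90, "WELL_NO": 45, "FIELD_NAME": 60}
print(fraction_nonmissing(example_counts))
# {'LEASE_NO': 1.0, 'WELL_NO': 0.5, 'FIELD_NAME': 0.6667}
print(fraction_nonmissing(example_counts, total_rows=100))
# {'LEASE_NO': 0.9, 'WELL_NO': 0.45, 'FIELD_NAME': 0.6}
```

The relative version is handy for comparing columns within one table; the absolute version is the one to use when a column's true completeness matters.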

In [37]:
def display_table_overview_from_frame(
    persisted_overviews, table_name_dict, table_name, nrows=15, column_types={}
):
    """Prints a table's name and shape, then displays its first `nrows` rows."""
    over_view_table = persisted_overviews[table_name]

    # print the table name framed by ~*~, followed by its shape
    num_rows = over_view_table["rows"].values[0]
    num_cols = over_view_table["cols"].values[0]
    print(f"~*~ {table_name} ~*~")
    print(f"{num_rows} rows, {num_cols} columns")

    # get ddf from the dictionary
    ddf = table_name_dict[table_name]
    # display nrows of the dask dataframe
    display(ddf.head(nrows))

In [38]:
def display_table_overview(path, nrows=15, column_types=None):
    """
    Displays an overview of the data in the file at the given path.

    This function prints the file name, the number of rows, the file size and
    the number of columns, then displays the first `nrows` rows of the data.
    It uses Dask to compute the number of rows, which makes it suitable for
    large files.

    Parameters:
    path (Path): The path to the "}"-delimited data file.
    nrows (int): The number of rows to display.
    column_types (dict, optional): Column dtype overrides; columns that Dask
        reports as mismatched are added as "object".
    """
    column_types = {} if column_types is None else column_types
    # make a decorative line roughly as wide as the file-name line
    decor_length = len(path.name) + 8
    decor = "~*~" * (decor_length // 3)

    print(f"{decor}")
    print(f"File: {path.name}")
    print(f"{decor}")

    while True:
        try:
            ddf = dd.read_csv(path, sep="}", dtype=column_types)
            print(f"Number of rows: {len(ddf):,}")
            break
        except ValueError as e:
            # parse the column name from Dask's dtype-mismatch message
            match = re.search(r"dtype=\{'(.*?)': '(.*?)'", str(e))
            if not match:
                raise  # not a dtype mismatch; avoid looping forever
            column_name = match.group(1)
            print(f"Problematic column: {column_name}")
            # read the problematic column as "object" on the next attempt
            column_types[column_name] = "object"

    column_dtypes = ddf.dtypes.to_dict()
    print(f"File size: {format_in_B(path.stat().st_size)}")
    print(f"Number of columns: {len(column_dtypes)}")
    print()

    print(f"Sample of data in {path.name}:")
    display(pd.read_csv(path, sep="}", nrows=nrows, dtype=column_types))

In [39]:
# define a function that combines get_table_description, plot_statistics_table_nonmissing_hbar
# returns a panel card with the table and the bar plot
def get_table_card(path, column_types={}):
    """returns a panel card with the table and the bar plot"""
    results = get_table_description(path, column_types)
    bar_table = plot_statistics_table_nonmissing_hbar(results, title=path.stem)
    return pn.Card(bar_table, title=path.stem)

In [40]:
def get_table_card_from_frame(table_name_dict, table_name):
    """returns a panel card with the table and the bar plot"""
    results = get_table_description_from_frame(table_name_dict, table_name)
    bar_table = plot_statistics_table_nonmissing_hbar(results, title=table_name)
    return pn.Card(bar_table, title=table_name)

In [None]:
# build a dictionary of column dtypes covering all of the data files
csv_dtypes = get_csv_dtypes_for_all_files()

# # loop through the paths and get the dtypes of the columns
# for fpath in pdq_dsv_paths:
#     temp_dict = get_csv_dtypes(fpath)
#     # only update values if the key is not in the dictionary
#     for k, v in temp_dict.items():
#         if k not in csv_dtypes:
#             csv_dtypes[k] = v

# csv_dtypes = {k: v.name for k, v in csv_dtypes.items()}
# # csv_dtypes

In [None]:
# the table variable name is the path stem before "_DATA_TABLE", in lowercase
# pattern = r"(\w+)_DATA_TABLE"

# # pair each path in pdq_dsv_paths with its table variable name
# variable_pairs = [
#     (my_path, re.match(pattern, my_path.stem.upper()).group(1).lower())
#     for my_path in pdq_dsv_paths
#     if re.match(pattern, my_path.stem.upper())
# ]
# variable_pairs

#### Overview list of data tables

In [None]:
# build a one-row overview for each data table
# overview_dfs_list = [
#     get_table_overview(*pair, column_types=csv_dtypes) for pair in tqdm(variable_pairs)
# ]
# concatenate the overview rows into a single dataframe
# overview_dfs = pd.concat(overview_dfs_list, ignore_index=True)

In [None]:
# overview_dfs.iloc[0]

##### Dask Client 

Some of the tables are as large as 11 GB, which is why we use Dask. Dask handles large data files by splitting them into partitions (chunks), so the whole dataset never has to be loaded into memory at once. It also runs computations in parallel where possible and lets us manage the computational resources through a `Client` object. Dask can do much more, but that is outside the scope of this project.
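The core idea, processing data one chunk at a time and combining partial results, can be shown without Dask. The sketch below uses only the standard library: it streams a small in-memory `}`-delimited sample in fixed-size chunks and folds per-chunk partial sums into a running total, which loosely mirrors how Dask aggregates across partitions. The helper names and sample columns are illustrative, not part of the project.

```python
import csv
import io
from itertools import islice


def chunked_rows(lines, chunk_size, delimiter="}"):
    """Yield lists of at most chunk_size parsed rows, reading lazily from lines."""
    reader = csv.DictReader(lines, delimiter=delimiter)
    while True:
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            return
        yield chunk


def sum_column_in_chunks(lines, column, chunk_size=2):
    """Count rows and sum one column, holding only one chunk of rows at a time."""
    n_rows, total = 0, 0.0
    for chunk in chunked_rows(lines, chunk_size):
        # partial result for this chunk, folded into the running totals
        n_rows += len(chunk)
        total += sum(float(row[column]) for row in chunk)
    return n_rows, total


# small in-memory stand-in for a "}"-delimited file; column names are illustrative
sample_file = io.StringIO(
    "LEASE_NO}LEASE_OIL_PROD_VOL\n1}10\n2}20\n3}30\n4}40\n5}50\n"
)
print(sum_column_in_chunks(sample_file, "LEASE_OIL_PROD_VOL"))  # (5, 150.0)
```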

In [104]:
# Create a Dask client with specified configuration
# This client will use 4 workers, each with 1 thread and a memory limit of 3GB
# The Dask diagnostic dashboard will be served at the address ":8788"
client = Client(
    n_workers=4, threads_per_worker=1, memory_limit="3GB", dashboard_address=":8788"
)

# Print the link to the Dask diagnostic dashboard
# This link can be used to monitor the progress and performance of Dask computations
print(client.dashboard_link)

http://127.0.0.1:8788/status
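A quick back-of-the-envelope check helps explain why the largest tables cannot simply be persisted on this cluster: `memory_limit` applies per worker, so 4 workers at 3 GB each give about 12 GB of worker memory, less than the roughly 20 GB in-memory size reported for the biggest lease-cycle tables in the overview below. The sketch also estimates a partition count, assuming a target of about 100 MB per partition (a commonly cited Dask rule of thumb, used here as an assumption). The helper names are hypothetical.

```python
import math


def cluster_memory_gb(n_workers: int, memory_limit_gb: int) -> int:
    """Total worker memory: each worker gets its own memory_limit."""
    return n_workers * memory_limit_gb


def partitions_needed(table_mb: int, target_partition_mb: int = 100) -> int:
    """Number of partitions if each holds about target_partition_mb of data."""
    return math.ceil(table_mb / target_partition_mb)


total_gb = cluster_memory_gb(n_workers=4, memory_limit_gb=3)
largest_table_mb = 19_880  # og_county_lease_cycle, ~19.88 GB in memory per the overview
print(total_gb)                             # 12
print(largest_table_mb / 1000 > total_gb)   # True: too big to hold in worker memory at once
print(partitions_needed(largest_table_mb))  # 199
```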


In [105]:
persisted_overviews = {}

for table_name in tqdm(data_tables_dict.keys()):
    # get_table_overview_from_frame already computes its values and returns a
    # small pandas DataFrame, so it can be stored as-is (dask.persist passes
    # non-Dask objects through unchanged)
    persisted_overviews[table_name] = get_table_overview_from_frame(
        data_tables_dict, table_name
    )

persisted_overviews_list = [
    persisted_overviews[table_name] for table_name in data_tables_dict
]
overview_dfs_from_frame = pd.concat(persisted_overviews_list, ignore_index=True)
overview_dfs_from_frame

100%|██████████| 16/16 [08:10<00:00, 30.63s/it]


| | table | rows | cols | columns | size | col_dtypes |
|--|--|--|--|--|--|--|
| 0 | og_field_dw | 65803 | 32 | [FIELD_NO, FIELD_NAME, DISTRICT_NO, DISTRICT_N... | 15.76 MB | [int64, string, int64, string, string, string,... |
| 1 | og_district_cycle | 4823 | 9 | [DISTRICT_NO, CYCLE_YEAR, CYCLE_MONTH, CYCLE_Y... | 367.38 kB | [int64, int64, int64, int64, string, int64, in... |
| 2 | og_lease_cycle | 70708285 | 32 | [OIL_GAS_CODE, DISTRICT_NO, LEASE_NO, CYCLE_YE... | 19.69 GB | [string, int64, int64, int64, int64, int64, in... |
| 3 | og_well_completion | 796550 | 16 | [OIL_GAS_CODE, DISTRICT_NO, LEASE_NO, WELL_NO,... | 92.11 MB | [string, int64, int64, string, int64, int64, i... |
| 4 | og_regulatory_lease_dw | 532918 | 12 | [OIL_GAS_CODE, DISTRICT_NO, LEASE_NO, DISTRICT... | 68.75 MB | [string, int64, int64, string, string, int64, ... |
| 5 | og_county_cycle | 223715 | 24 | [COUNTY_NO, DISTRICT_NO, CYCLE_YEAR, CYCLE_MON... | 42.22 MB | [int64, int64, int64, int64, int64, int64, flo... |
| 6 | og_summary_onshore_lease | 1040422 | 10 | [OIL_GAS_CODE, DISTRICT_NO, LEASE_NO, OPERATOR... | 130.68 MB | [string, int64, int64, int64, int64, int64, in... |
| 7 | og_county_lease_cycle | 68969454 | 33 | [OIL_GAS_CODE, DISTRICT_NO, LEASE_NO, CYCLE_YE... | 19.88 GB | [string, int64, int64, int64, int64, int64, in... |
| 8 | gp_date_range_cycle | 1 | 5 | [OLDEST_PROD_CYCLE_YEAR_MONTH, NEWEST_PROD_CYC... | 58.00 B | [int64, int64, int64, string, string] |
| 9 | og_lease_cycle_disp | 44002513 | 52 | [OIL_GAS_CODE, DISTRICT_NO, LEASE_NO, CYCLE_YE... | 19.09 GB | [string, int64, int64, int64, int64, int64, in... |
9,og_lease_cycle_disp,44002513,52,"[OIL_GAS_CODE, DISTRICT_NO, LEASE_NO, CYCLE_YE...",19.09 GB,"[string, int64, int64, int64, int64, int64, in..."


##### Table 0 - County

General purpose table that stores county information.

The table below maps `DISTRICT_NO`, the number the RRC system uses for lease reporting, to the corresponding RRC `DISTRICT_NAME`.

| DISTRICT_NO (RRC value) | DISTRICT_NAME |
|--|--|
| 01 | 01 |
| 02 | 02 |
| 04 | 04 |
| 05 | 05 |
| 06 | 06 |
| 07 | 6E (oil only) |
| 08 | 7B |
| 10 | 08 |
| 11 | 8A |
| 12 | 8B (this value is not used) |
| 13 | 09 |
| 14 | 10 |
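For lookups outside the dataframes, the same mapping can be kept as a plain dictionary. This small sketch simply transcribes the table above; numbers not listed there are left out, so they raise a `KeyError`. The names `DISTRICT_NO_TO_NAME` and `district_name_for` are hypothetical; in the notebook the equivalent mapping is also derived from the data as `gp_district_name` further down.

```python
# DISTRICT_NO -> RRC district name, transcribed from the table above
DISTRICT_NO_TO_NAME = {
    1: "01",
    2: "02",
    4: "04",
    5: "05",
    6: "06",
    7: "6E",
    8: "7B",
    10: "08",
    11: "8A",
    12: "8B",  # listed in the table as not used
    13: "09",
    14: "10",
}


def district_name_for(district_no) -> str:
    """Look up the RRC district name for an int or zero-padded string district number."""
    return DISTRICT_NO_TO_NAME[int(district_no)]


print(district_name_for(7))     # 6E
print(district_name_for("10"))  # 08
```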

In [None]:

display_table_overview_from_frame(persisted_overviews, data_tables_dict, "gp_county")
get_table_card_from_frame(data_tables_dict, "gp_county")

In [None]:
# read in the dask dataframe
# gp_county_ddf = dd.read_csv(file_path_0, sep="}", dtype=csv_dtypes)
gp_county_ddf = data_tables_dict["gp_county"]

gp_county_ddf.head()

In [None]:
# convert the column names to snake case
gp_county_ddf.columns = [pascal_to_snake(col) for col in gp_county_ddf.columns]
# convert the Y/N flag columns into boolean is_onshore and is_onshore_assc_cnty columns
gp_county_ddf["is_onshore"] = gp_county_ddf["on_shore_flag"].map(
    {"Y": True, "N": False}
)
gp_county_ddf["is_onshore_assc_cnty"] = gp_county_ddf["onshore_assc_cnty_flag"].map(
    {"Y": True, "N": False}
)
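`pascal_to_snake` is defined earlier in the notebook and is not shown here. Assuming it inserts an underscore at lower-to-upper case boundaries and lowercases the result, a minimal regex sketch (hypothetical name `to_snake_case`) looks like the following. For the RRC's UPPER_SNAKE headers such as `ON_SHORE_FLAG` it gives the same result as the `str.lower` renaming used in some of the cells below.

```python
import re

# zero-width match between a lowercase letter/digit and an uppercase letter
CASE_BOUNDARY = re.compile(r"(?<=[a-z0-9])(?=[A-Z])")


def to_snake_case(name: str) -> str:
    """Insert '_' at case boundaries, then lowercase (existing underscores are kept)."""
    return CASE_BOUNDARY.sub("_", name).lower()


print(to_snake_case("ON_SHORE_FLAG"))   # on_shore_flag
print(to_snake_case("CycleYearMonth"))  # cycle_year_month
print(to_snake_case("P5LastFiledDt"))   # p5_last_filed_dt
```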

# create a small table to link the district_no to the district_name
gp_district_name = (
    gp_county_ddf[["district_no", "district_name"]]
    .drop_duplicates()
    .sort_values(by="district_no")
    .reset_index(drop=True)
    .compute()
)
gp_district_name

In [None]:
# drop some of the extra columns
column_to_drop = [
    "county_fips_code",
    "district_name",
    "on_shore_flag",
    "onshore_assc_cnty_flag",
]
columns_to_keep = [col for col in gp_county_ddf.columns if col not in column_to_drop]
gp_county_ddf = gp_county_ddf[columns_to_keep]
gp_county_ddf.info(memory_usage="deep")
gp_county_ddf.head()

In [None]:
# get the number of counties in each district
district_county_count = (
    gp_county_ddf.groupby("district_no")
    .county_no.nunique()
    .rename("county_count")
    .compute()
)

district_county_count.hvplot.bar(
    title="Count of Counties in each District", xlabel="", ylabel="", hover_cols="all"
).opts(
    toolbar="above",
    active_tools=["box_zoom"],
)

In [None]:
# gp_county_ddf["is_onshore_assc_cnty"] = gp_county_ddf["is_onshore_assc_cnty"].astype(
#     int
# )
# gp_county_ddf["is_onshore"] = gp_county_ddf["is_onshore"].astype(int)

counts = (
    gp_county_ddf.groupby(["district_no", "is_onshore", "is_onshore_assc_cnty"])
    .count()
    .reset_index()
)

In [None]:
def plot_with_labels(var_1, var_2, col1, col2):
    """Bar plot of county counts per district where col1 == var_1 and col2 == var_2."""
    # filter the counts to the selected combination of the two flags
    data = counts[(counts[col1] == var_1) & (counts[col2] == var_2)]

    # Create a bar plot
    bar_plot = data.hvplot.bar(
        x="district_no",
        y="county_no",
        stacked=True,
        xlabel="",
        yaxis="bare",
        title=f"Count of Counties in each District No. where {col1}={var_1} & {col2}={var_2}",
    )

    # Create a list to hold the text elements
    texts = []

    # Loop over the DataFrame and create a text element for each row
    for row in data.itertuples():
        # The position of the text is the center of the bar
        x = getattr(row, "district_no")
        y = getattr(row, "county_no") / 2
        # The text is the height of the bar
        text = str(getattr(row, "county_no"))
        # Create the text element and add it to the list
        texts.append(hv.Text(x, y, text).opts(text_color="white"))

    # Overlay the text elements on the plot
    labelled_plot = bar_plot * hv.Overlay(texts)

    return labelled_plot


# Create a DynamicMap with the 'plot_with_labels' function; both flags are
# booleans, so each widget offers exactly the values False and True
dynamic_map = hv.DynamicMap(
    lambda var_1, var_2: plot_with_labels(
        var_1, var_2, "is_onshore", "is_onshore_assc_cnty"
    ),
    kdims=[
        hv.Dimension("var_1", label="is_onshore", values=[False, True]),
        hv.Dimension("var_2", label="is_onshore_assc_cnty", values=[False, True]),
    ],
)
dynamic_map

In [None]:
# gp_county_ddf[gp_county_ddf["is_onshore"] == 0].compute()
gp_county_ddf[gp_county_ddf["is_onshore_assc_cnty"] == 1].compute()

In [None]:
# Double check that each county is only in 1 district
district_counties_dict = (
    gp_county_ddf.compute().groupby("district_no")["county_no"].apply(set).to_dict()
)
# district_counties_dict[1]

keys = list(district_counties_dict.keys())
found_common = False

# Convert the values in district_counties_dict to sets
district_counties_dict = {k: set(v) for k, v in district_counties_dict.items()}

# Iterate over each key in the list
for i in range(len(keys)):
    # Get the key and values for the current index
    key1 = keys[i]
    values1 = district_counties_dict[key1]

    # Iterate over all subsequent keys in the list
    for j in range(i + 1, len(keys)):
        # Get the key and values for the subsequent index
        key2 = keys[j]
        values2 = district_counties_dict[key2]

        # Check if any value from the first pair is in the values of the second pair
        common_values = values1.intersection(values2)
        if common_values:
            print(f"Key {key1} has common values with key {key2}: {common_values}")
            found_common = True
if not found_common:
    print("No common values found")

# district_counties_dict
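An equivalent check that avoids comparing every pair of districts is to invert the mapping: collect the set of districts for each county in one pass and report counties with more than one. Below is a stdlib sketch with made-up `(district_no, county_no)` pairs and a hypothetical helper name; on the real data the pairs would come from the `district_no` and `county_no` columns of `gp_county_ddf`.

```python
from collections import defaultdict


def counties_in_multiple_districts(pairs):
    """Return {county_no: districts} for counties that appear in more than one district."""
    districts_by_county = defaultdict(set)
    for district_no, county_no in pairs:
        districts_by_county[county_no].add(district_no)
    return {
        county: districts
        for county, districts in districts_by_county.items()
        if len(districts) > 1
    }


# made-up pairs: county 5 is (wrongly) listed under districts 1 and 2
sample_pairs = [(1, 3), (1, 5), (2, 5), (2, 7), (1, 3)]
print(counties_in_multiple_districts(sample_pairs))      # {5: {1, 2}}
print(counties_in_multiple_districts([(1, 3), (2, 7)]))  # {}
```

This runs in a single pass over the rows instead of checking every pair of districts, and it reports the offending county directly.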

##### Table 1 - District

General purpose table that contains district information.

It includes the phone number of each district office and the town where the office is located.

In [None]:

display_table_overview_from_frame(persisted_overviews, data_tables_dict, "gp_district")
get_table_card_from_frame(data_tables_dict, "gp_district")

##### Table 2 - Date Range Cycle

General purpose table holding the date range covered by the PDQ data (January 1993 through the current production report month/year).

In [None]:

data_tables_dict["gp_date_range_cycle"].head()

##### Table 3 - County Cycle

Contains production report data reported by lease and month (YYYYMM), aggregated by the county in which the wells are located. These values are estimates only, based on allowables and potentials.

In [None]:

display_table_overview_from_frame(
    persisted_overviews, data_tables_dict, "og_county_cycle"
)
get_table_card_from_frame(data_tables_dict, "og_county_cycle")

In [None]:
# drop every column that contains at least one null value
og_county_cycle_ddf = data_tables_dict["og_county_cycle"]
has_nulls = og_county_cycle_ddf.isnull().any().compute()
og_county_cycle_ddf = og_county_cycle_ddf.drop(
    columns=has_nulls[has_nulls].index.tolist()
)

In [None]:

# convert the column names to snake case
og_county_cycle_ddf.columns = [
    pascal_to_snake(col) for col in og_county_cycle_ddf.columns
]
# create column is_gas (1 for gas, 0 for oil) from the oil_gas_code column
og_county_cycle_ddf["is_gas"] = (
    og_county_cycle_ddf["oil_gas_code"].map({"G": True, "O": False}).astype("int8")
)

# keep every column except the names, the separate month/year columns and the raw code
columns_to_keep = [
    col
    for col in og_county_cycle_ddf.columns
    if col
    not in ["district_name", "county_name", "cycle_month", "cycle_year", "oil_gas_code"]
]
og_county_cycle_ddf = og_county_cycle_ddf[columns_to_keep]
# keep only rows from January 2013 onward (cycle_year_month is YYYYMM)
og_county_cycle_ddf = og_county_cycle_ddf[og_county_cycle_ddf.cycle_year_month > 201300]

# lazy row count and in-memory size of the filtered table
num_rows = og_county_cycle_ddf.shape[0]
num_cols = og_county_cycle_ddf.shape[1]
size = og_county_cycle_ddf.memory_usage(deep=True).sum()

# compute both lazy values in a single pass
num_rows, size = dask.compute(num_rows, size)

print(f"Shape of og_county_cycle_ddf: ({num_rows:,}, {num_cols})")
print(f"Size of og_county_cycle_ddf: {format_in_B(size)}")
og_county_cycle_ddf.head()
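`cycle_year_month` stores year and month as a single integer `YYYYMM`, so `cycle_year_month > 201300` keeps every month from January 2013 (`201301`) onward, since `201300` is not a real month. A short sketch with hypothetical helpers shows how the encoding works and that `>= yyyymm(2013, 1)` is an equivalent, more explicit filter.

```python
def yyyymm(year: int, month: int) -> int:
    """Encode a year and month as a YYYYMM integer."""
    return year * 100 + month


def split_yyyymm(value: int) -> tuple:
    """Decode a YYYYMM integer into (year, month)."""
    return divmod(value, 100)


cycles = [201212, 201301, 201307, 202312]
print([c for c in cycles if c > 201300])            # [201301, 201307, 202312]
print([c for c in cycles if c >= yyyymm(2013, 1)])  # [201301, 201307, 202312]
print(split_yyyymm(201307))                         # (2013, 7)
```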

##### Table 4 - County Lease Cycle

Contains production report data reported by lease and month (YYYYMM), aggregated by lease and by the county in which the wells are located. These values are estimates only, based on allowables and potentials.

In [None]:
display_table_overview_from_frame(
    persisted_overviews, data_tables_dict, "og_county_lease_cycle"
)
get_table_card_from_frame(data_tables_dict, "og_county_lease_cycle")

##### Table 5 - Lease Cycle

Contains production report data reported by lease and month (YYYYMM).

In [None]:
display_table_overview_from_frame(
    persisted_overviews, data_tables_dict, "og_lease_cycle"
)
get_table_card_from_frame(data_tables_dict, "og_lease_cycle")

In [None]:
# read in the dask dataframe
og_lease_cycle_ddf = data_tables_dict["og_lease_cycle"]
# og_lease_cycle_ddf = dd.read_csv(file_path_5, sep="}", dtype=csv_dtypes)
# convert the column names to lower case
og_lease_cycle_ddf = og_lease_cycle_ddf.rename(columns=str.lower)
# create column is_gas (1 for gas, 0 for oil) from the oil_gas_code column
og_lease_cycle_ddf["is_gas"] = (
    og_lease_cycle_ddf["oil_gas_code"].map({"G": True, "O": False}).astype("int8")
)

# drop the code/name columns, the separate cycle month/year columns and the
# standalone district_no/lease_no columns (lease_no_district_no is kept)
pattern = re.compile(r"_code$|_name$|cycle_month|cycle_year$|^district_no|lease_no$")

# list the columns after creating is_gas so that the new column is kept
columns_to_keep = [
    col for col in og_lease_cycle_ddf.columns if not pattern.search(col)
]
og_lease_cycle_ddf = og_lease_cycle_ddf[columns_to_keep]
# keep only rows from January 2013 onward (cycle_year_month is YYYYMM)
og_lease_cycle_ddf = og_lease_cycle_ddf[og_lease_cycle_ddf.cycle_year_month > 201300]

# lazy shape
num_rows = og_lease_cycle_ddf.shape[0]
num_cols = og_lease_cycle_ddf.shape[1]
# lease_no is only unique within a district, so count the combined key
num_lease_districts = og_lease_cycle_ddf.lease_no_district_no.nunique()
# in-memory size of the filtered table
mem_size = og_lease_cycle_ddf.memory_usage(deep=True).sum()

# compute all three values in a single pass
num_rows, num_lease_districts, mem_size = dask.compute(
    num_rows, num_lease_districts, mem_size
)
print(f"Shape of og_lease_cycle_ddf: ({num_rows:,}, {num_cols})")
print(f"Number of unique lease_no_district_no: {num_lease_districts:,}")
print(f"Memory size of og_lease_cycle_ddf: {format_in_B(mem_size)}")
og_lease_cycle_ddf.head()
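The `^` and `$` anchors in `pattern` are what let the combined columns survive: `lease_no_district_no` neither ends in `lease_no` nor starts with `district_no`, and `cycle_year_month` does not end in `cycle_year`. A quick check against a hypothetical list of lowercase column names (not all of which appear in this table) makes the effect visible.

```python
import re

drop_pattern = re.compile(r"_code$|_name$|cycle_month|cycle_year$|^district_no|lease_no$")

# hypothetical column names, for illustration only
example_columns = [
    "oil_gas_code", "district_no", "lease_no", "operator_name",
    "cycle_year", "cycle_month", "cycle_year_month",
    "lease_no_district_no", "lease_oil_prod_vol", "is_gas",
]
kept = [col for col in example_columns if not drop_pattern.search(col)]
print(kept)
# ['cycle_year_month', 'lease_no_district_no', 'lease_oil_prod_vol', 'is_gas']
```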

In [None]:
# recover the two-digit district_no from the end of the combined key
og_lease_cycle_ddf["district_no"] = (
    og_lease_cycle_ddf["lease_no_district_no"].astype(str).str[-2:]
)
# total production volumes by district
og_lease_cycle_ddf.groupby("district_no")[
    [
        "lease_oil_prod_vol",
        "lease_gas_prod_vol",
        "lease_cond_prod_vol",
        "lease_csgd_prod_vol",
    ]
].sum().compute()
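Taking the last two characters of `lease_no_district_no` only recovers the district if the key was built by appending a two-digit, zero-padded district number to the lease number, as the Table 6 cell below does. Under that assumption the key can also be split arithmetically with `divmod`, which returns integers instead of strings. The helper names are hypothetical.

```python
def make_lease_district_key(lease_no: int, district_no: int) -> int:
    """Append the zero-padded two-digit district number to the lease number."""
    return int(f"{lease_no}{district_no:02d}")


def split_lease_district_key(key: int) -> tuple:
    """Inverse of make_lease_district_key: returns (lease_no, district_no)."""
    return divmod(key, 100)


example_key = make_lease_district_key(12345, 8)
print(example_key)                            # 1234508
print(str(example_key)[-2:])                  # 08  (what the cell above extracts)
print(split_lease_district_key(example_key))  # (12345, 8)
```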

##### Table 6 - Lease Cycle Disposition

Contains production report disposition data reported by lease and month (YYYYMM).



In [None]:
display_table_overview_from_frame(
    persisted_overviews, data_tables_dict, "og_lease_cycle_disp"
)
get_table_card_from_frame(data_tables_dict, "og_lease_cycle_disp")

In [None]:
# read in the dask dataframe
og_lease_cycle_disp_ddf = data_tables_dict["og_lease_cycle_disp"]
# og_lease_cycle_disp_ddf = dd.read_csv(file_path_6, sep="}", dtype=csv_dtypes)
# convert the column names to snake case
og_lease_cycle_disp_ddf.columns = [
    pascal_to_snake(col) for col in og_lease_cycle_disp_ddf.columns
]
# columns_list = og_lease_cycle_disp_ddf.columns.tolist()

# og_lease_cycle_disp_ddf = og_lease_cycle_disp_ddf.rename(columns=str.lower)
# create column is_gas based on the oil_gas_code column
og_lease_cycle_disp_ddf["is_gas"] = (
    og_lease_cycle_disp_ddf["oil_gas_code"].map({"G": True, "O": False}).astype("int8")
)
# combined key: lease_no followed by the zero-padded two-digit district_no
og_lease_cycle_disp_ddf["lease_no_district_no"] = (
    og_lease_cycle_disp_ddf["lease_no"].astype(str)
    + og_lease_cycle_disp_ddf["district_no"].astype(str).str.zfill(2)
).astype(int)

# Define the regex pattern
pattern = re.compile(r"_code$|_name$|cycle_month|cycle_year$|^district_no|lease_no$")

# Filter the columns
columns_to_keep = [
    col for col in og_lease_cycle_disp_ddf.columns if not pattern.search(col)
]
og_lease_cycle_disp_ddf = og_lease_cycle_disp_ddf[columns_to_keep]
# keep only rows from January 2013 onward (cycle_year_month is YYYYMM)
og_lease_cycle_disp_ddf = og_lease_cycle_disp_ddf[
    og_lease_cycle_disp_ddf.cycle_year_month > 201300
]

num_rows = og_lease_cycle_disp_ddf.shape[0]
num_cols = og_lease_cycle_disp_ddf.shape[1]
# count unique lease/district combinations (lease_no is only unique within a district)
num_lease_districts = og_lease_cycle_disp_ddf["lease_no_district_no"].nunique()
memory_usage = og_lease_cycle_disp_ddf.memory_usage(index=True, deep=True).sum()
num_rows, num_lease_districts, memory_usage = dask.compute(
    num_rows, num_lease_districts, memory_usage
)
print(f"Shape of og_lease_cycle_disp_ddf: ({num_rows:,}, {num_cols})")
print(f"Number of unique lease_no_district_no: {num_lease_districts:,}")
print(f"Total memory usage: {format_in_B(memory_usage)}")
# og_lease_cycle_disp_ddf.info(memory_usage="deep")
og_lease_cycle_disp_ddf.head()

In [None]:
# og_lease_cycle_disp_ddf.memory_usage(index=True, deep=True).compute()
og_lease_cycle_disp_ddf.dtypes

##### Table 7 - District Cycle

Contains production report data reported by lease and month (YYYYMM) aggregated by the completion district for the lease ID.


In [None]:
display_table_overview_from_frame(
    persisted_overviews, data_tables_dict, "og_district_cycle"
)
get_table_card_from_frame(data_tables_dict, "og_district_cycle")

In [None]:
# read in the dask dataframe
og_district_cycle_ddf = data_tables_dict["og_district_cycle"]
# og_district_cycle_ddf = dd.read_csv(file_path_7, sep="}", dtype=csv_dtypes)
# convert the column names to lower case
og_district_cycle_ddf = og_district_cycle_ddf.rename(columns=str.lower)
# get list of columns
columns_list = og_district_cycle_ddf.columns.tolist()

# drop the columns we don't want from columns list
columns_list = [
    col
    for col in columns_list
    if col not in ["district_name", "cycle_month", "cycle_year"]
]
# drop the column from the dataframe
og_district_cycle_ddf = og_district_cycle_ddf[columns_list]

# keep only rows from January 2013 onward (cycle_year_month is YYYYMM)
og_district_cycle_ddf = og_district_cycle_ddf[
    og_district_cycle_ddf.cycle_year_month > 201300
]
num_rows = og_district_cycle_ddf.shape[0]
memory_usage = og_district_cycle_ddf.memory_usage(index=True, deep=True).sum()
# compute the number of rows and memory usage
num_rows, memory_usage = dask.compute(num_rows, memory_usage)
# show first few rows and shape
print(f"Shape of og_district_cycle_ddf: ({num_rows:,}, {len(columns_list)})")
print(f"Total memory usage: {format_in_B(memory_usage)}")

og_district_cycle_ddf.head()

##### Table 8 - Field Cycle

Contains production report data reported by lease and month (YYYYMM) aggregated by the field in which the well(s) for the lease are completed.


In [None]:
display_table_overview_from_frame(
    persisted_overviews, data_tables_dict, "og_field_cycle"
)
get_table_card_from_frame(data_tables_dict, "og_field_cycle")

In [None]:
# read in the dask dataframe
og_field_cycle_ddf = data_tables_dict["og_field_cycle"]
# og_field_cycle_ddf = dd.read_csv(file_path_8, sep="}", dtype=csv_dtypes)

# convert the column names to lower case
og_field_cycle_ddf = og_field_cycle_ddf.rename(columns=str.lower)

# drop the columns we don't need
columns_to_keep = [
    col
    for col in og_field_cycle_ddf.columns
    if col not in ["district_name", "county_name", "cycle_month", "cycle_year"]
]
og_field_cycle_ddf = og_field_cycle_ddf[columns_to_keep]

# keep only rows from January 2013 onward (cycle_year_month is YYYYMM)
og_field_cycle_ddf = og_field_cycle_ddf[og_field_cycle_ddf.cycle_year_month > 201300]

# drop the rows with null values in the field_name column
og_field_cycle_ddf = og_field_cycle_ddf.dropna(subset=["field_name"])

# lazy row count and in-memory size, computed together in one pass
num_rows = og_field_cycle_ddf.shape[0]
memory_usage = og_field_cycle_ddf.memory_usage(index=True, deep=True).sum()
num_rows, memory_usage = dask.compute(num_rows, memory_usage)

# show the shape (after dropping columns) and the first few rows
print(
    f"Shape of og_field_cycle_ddf: ({num_rows:,}, {len(columns_to_keep)})\n"
    f"Total memory usage: {format_in_B(memory_usage)}"
)
og_field_cycle_ddf.head()

##### Table 9 - Operator Cycle

Contains production report data reported by lease and month (YYYYMM) aggregated by the operator of the lease.


In [None]:
display_table_overview_from_frame(
    persisted_overviews, data_tables_dict, "og_operator_cycle"
)
get_table_card_from_frame(data_tables_dict, "og_operator_cycle")

In [None]:
# read in the dask dataframe
# og_operator_cycle_ddf = dd.read_csv(file_path_9, sep="}", dtype=csv_dtypes)
og_operator_cycle_ddf = data_tables_dict["og_operator_cycle"]
# convert the column names to lower case
og_operator_cycle_ddf = og_operator_cycle_ddf.rename(columns=str.lower)
# drop the 'cycle_month' and 'cycle_year' columns
columns_to_keep = [
    col
    for col in og_operator_cycle_ddf.columns
    if col not in ["cycle_month", "cycle_year"]
]
og_operator_cycle_ddf = og_operator_cycle_ddf[columns_to_keep]
# keep only rows from January 2013 onward (cycle_year_month is YYYYMM)
og_operator_cycle_ddf = og_operator_cycle_ddf[
    og_operator_cycle_ddf.cycle_year_month > 201300
]
# drop the rows with null values for the operator_name column
og_operator_cycle_ddf = og_operator_cycle_ddf.dropna(subset=["operator_name"])
# show first few rows and get shape
num_rows = og_operator_cycle_ddf.shape[0]
num_cols = og_operator_cycle_ddf.shape[1]
memory_usage = og_operator_cycle_ddf.memory_usage(index=True, deep=True).sum()
num_rows, memory_usage = dask.compute(num_rows, memory_usage)
print(
    f"Shape of og_operator_cycle_ddf: ({num_rows} , {num_cols})\nTotal memory usage: {format_in_B(memory_usage)}"
)
og_operator_cycle_ddf.head()

In [None]:
# create column for the total production to filter out the non producing operators
og_operator_cycle_ddf["total_oper_prod_vol"] = (
    og_operator_cycle_ddf["oper_oil_prod_vol"]
    + og_operator_cycle_ddf["oper_gas_prod_vol"]
    + og_operator_cycle_ddf["oper_cond_prod_vol"]
    + og_operator_cycle_ddf["oper_csgd_prod_vol"]
)
og_operator_cycle_ddf["is_producing"] = og_operator_cycle_ddf["total_oper_prod_vol"] > 0
producing_operator_nos = (
    og_operator_cycle_ddf[og_operator_cycle_ddf["is_producing"]]["operator_no"]
    .unique()
    .values
)
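Whether missing volumes propagate matters for the `is_producing` flag: adding columns with `+` turns the total into NaN as soon as any one volume is NaN, and `NaN > 0` is `False`, whereas a NaN-skipping row-wise sum (`sum(axis=1)`, which skips NaN by default in pandas and Dask) still counts the other volumes. The plain-Python sketch below uses made-up volumes for a single operator-month.

```python
import math

nan = float("nan")
# made-up monthly volumes: oil, gas, condensate, casinghead gas
month_volumes = [120.0, nan, 0.0, 35.0]

plus_total = sum(month_volumes)  # like chaining +: NaN propagates
nan_skipping_total = sum(v for v in month_volumes if not math.isnan(v))

print(plus_total > 0)          # False, because nan > 0 is False
print(nan_skipping_total > 0)  # True (the total is 155.0)
```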

##### Table 10 - Field DW


Table of field identifying data.



In [None]:
# file_path_10 = overview_dfs.loc[10, "file_path"]
# display_table_overview(file_path_10, column_types=csv_dtypes)
# get_table_description(file_path_10, column_types=csv_dtypes).compute().T
# get_table_card(file_path_10, column_types=csv_dtypes)
display_table_overview_from_frame(persisted_overviews, data_tables_dict, "og_field_dw")
get_table_card_from_frame(data_tables_dict, "og_field_dw")

In [None]:
# read in the dask dataframe
# og_field_dw_df = dd.read_csv(file_path_10, sep="}", dtype=csv_dtypes)
og_field_dw_ddf = data_tables_dict["og_field_dw"]

# can load the whole dataframe into memory as it is small
og_field_dw_df = og_field_dw_ddf.compute()

# convert the column names to lower case
og_field_dw_df = og_field_dw_df.rename(columns=str.lower)

columns_list = og_field_dw_df.columns.tolist()

# keep the columns whose names contain none of these substrings (drops the all-NaN columns and district_name)
substrings = {"modify", "remarks", "rev_rule", "create", "district_name"}
columns_list = [
    col for col in columns_list if not any(substring in col for substring in substrings)
]

# convert field_no to an int then zfill(8)
og_field_dw_df["field_no"] = (
    og_field_dw_df["field_no"].astype(int).astype(str).str.zfill(8)
)


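# convert the o_discovery_dt and g_discovery_dt strings (format "%d-%b-%y") to datetime
def parse_dates(date_string):
    """Parse a DD-Mon-YY string, moving two-digit years that land in the future back a century."""
    if pd.isnull(date_string):
        return pd.NaT
    dt = datetime.strptime(date_string, "%d-%b-%y")
    # %y maps 00-68 to 2000-2068, so e.g. a 1955 discovery first parses as 2055
    if dt.year > datetime.now().year:
        dt = dt.replace(year=dt.year - 100)
    return dt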


og_field_dw_df["o_discovery_dt"] = og_field_dw_df["o_discovery_dt"].apply(parse_dates)
og_field_dw_df["g_discovery_dt"] = og_field_dw_df["g_discovery_dt"].apply(parse_dates)


# keep only the selected columns
og_field_dw_df = og_field_dw_df[columns_list]
# get the number of rows and memory usage
num_rows = og_field_dw_df.shape[0]
memory_usage = og_field_dw_df.memory_usage(index=True, deep=True).sum()

# show the shape and a random sample of rows
print(f"Shape of og_field_dw_df: {og_field_dw_df.shape}")
print(f"Total memory usage: {format_in_B(memory_usage)}")
og_field_dw_df.sample(15)
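The `field_no` padding and the row-wise `parse_dates` helper above can also be written in a vectorized, NaN-tolerant form. The sketch below is an alternative under stated assumptions, not the notebook's own code: `pad_field_no` and `parse_two_digit_year_dates` are hypothetical names, the raw dates are assumed to stay in `%d-%b-%y` format, and 2023 is kept as the cutoff year. The nullable `Int64` dtype lets a missing `field_no` pass through as `<NA>` instead of raising, which `astype(int)` would do.

```python
import pandas as pd


def pad_field_no(s: pd.Series, width: int = 8) -> pd.Series:
    """Hypothetical helper: zero-pad numeric field numbers, keeping missing values as <NA>."""
    # Int64 (nullable) drops the trailing '.0' of float inputs without failing on NaN
    return s.astype("Int64").astype("string").str.zfill(width)


def parse_two_digit_year_dates(s: pd.Series, cutoff_year: int = 2023) -> pd.Series:
    """Hypothetical helper: parse DD-Mon-YY strings, moving years after cutoff_year back a century."""
    # %y maps 00-68 to 2000-2068, so '30' is first read as 2030
    dt = pd.to_datetime(s, format="%d-%b-%y", errors="coerce")
    in_future = dt.dt.year > cutoff_year  # NaT rows compare as False
    # Shift only the rows that landed in the future; everything else is kept as-is
    return dt.where(~in_future, dt - pd.DateOffset(years=100))
```

Assuming the raw column still holds strings, usage would look like `og_field_dw_df["o_discovery_dt"] = parse_two_digit_year_dates(og_field_dw_df["o_discovery_dt"])`.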

##### Table 11 - Operator DW

This table contains identifying operator information.



In [None]:
# file_path_11 = overview_dfs.loc[11, "file_path"]
# display_table_overview(file_path_11, column_types=csv_dtypes)
# # get_table_description(file_path_11, column_types=csv_dtypes).compute().T
# get_table_card(file_path_11, column_types=csv_dtypes)
display_table_overview_from_frame(
    persisted_overviews, data_tables_dict, "og_operator_dw"
)
get_table_card_from_frame(data_tables_dict, "og_operator_dw")

In [None]:
# read in the dask dataframe
# og_operator_dw_ddf = dd.read_csv(file_path_11, sep="}", dtype=csv_dtypes)
og_operator_dw_ddf = data_tables_dict["og_operator_dw"]

# load in as pandas dataframe as it is small
og_operator_dw_df = og_operator_dw_ddf.compute()


# convert the column names to lower case
og_operator_dw_df.columns = [pascal_to_snake(col) for col in og_operator_dw_df.columns]

# keep every column except the all-NaN and bookkeeping ones (modify/efile/record/create)
pattern = re.compile(r"modify|efile|record|create")
columns_to_keep = [col for col in og_operator_dw_df.columns if not pattern.search(col)]

# convert p5_last_filed_dt to datetime
og_operator_dw_df["p5_last_filed_dt"] = pd.to_datetime(
    og_operator_dw_df["p5_last_filed_dt"].astype(str), errors="coerce"
)

# keep only the selected columns (copy so the assignment below modifies a real frame)
og_operator_dw_df = og_operator_dw_df[columns_to_keep].copy()

# strip surrounding whitespace from the p5_status_code values
og_operator_dw_df["p5_status_code"] = og_operator_dw_df["p5_status_code"].str.strip()

# get the number of rows and memory usage
num_rows = og_operator_dw_df.shape[0]
memory_usage = og_operator_dw_df.memory_usage(index=True, deep=True).sum()

# show first few rows and shape
print(f"Shape of og_operator_dw_df: ({num_rows} , {len(columns_to_keep)})")
print(f"Total memory usage: {format_in_B(memory_usage)}")
og_operator_dw_df.head(30)

In [None]:
# lookup table of operator_no -> operator_name, excluding Railroad Commission entries
rrc_pattern = "RAILROAD COMMISSION"
operator_no_name = og_operator_dw_df[
    ~og_operator_dw_df["operator_name"].str.contains(rrc_pattern, na=False)
][["operator_no", "operator_name"]]


operator_no_name
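Because `operator_no_name` is meant as a one-to-one lookup, `Series.map` over an indexed copy of it is an alternative to the `merge` used later to attach operator names. The sketch below runs on made-up rows (the operator numbers and names are illustrative only) and assumes `operator_no` is unique after filtering. It passes `na=False` so a missing `operator_name` counts as a non-match instead of leaving NaN in the boolean mask.

```python
import pandas as pd

# Hypothetical sample rows standing in for og_operator_dw_df
operators = pd.DataFrame(
    {
        "operator_no": [101, 102, 103, 104],
        "operator_name": ["ACME OIL", "RAILROAD COMMISSION DIST 08", None, "BETA GAS"],
    }
)

# Drop Railroad Commission rows; na=False treats missing names as non-matches
mask = operators["operator_name"].str.contains("RAILROAD COMMISSION", na=False, regex=False)
name_by_no = operators.loc[~mask].set_index("operator_no")["operator_name"]

# Attach names to a production table without a merge; unknown numbers become NaN
production = pd.DataFrame({"operator_no": [104, 101, 102], "oil_vol": [5.0, 7.5, 1.0]})
production["operator_name"] = production["operator_no"].map(name_by_no)
```

Unlike an inner `merge`, `map` keeps every production row and leaves unmatched operators as NaN, which makes gaps in the lookup easy to spot.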

In [None]:
producing_operator_nos_list = producing_operator_nos.compute().tolist()
og_operator_dw_df[og_operator_dw_df["operator_no"].isin(producing_operator_nos_list)]

# get the operator_name of the producing_operator_nos
# filtered_df = og_operator_dw_ddf[
#     og_operator_dw_ddf["operator_no"].isin(producing_operator_nos)
# ].persist()

# filtered_df.compute()

##### Table 12 - Regulatory Lease DW

This table contains identifying lease information.

In [None]:

display_table_overview_from_frame(
    persisted_overviews, data_tables_dict, "og_regulatory_lease_dw"
)
get_table_card_from_frame(data_tables_dict, "og_regulatory_lease_dw")

In [None]:
# get the dask dataframe
# og_regulatory_lease_dw_ddf = dd.read_csv(file_path_12, sep="}", dtype=csv_dtypes)
og_regulatory_lease_dw_ddf = data_tables_dict["og_regulatory_lease_dw"]
# convert the column names to lower case
og_regulatory_lease_dw_ddf.columns = [
    pascal_to_snake(col) for col in og_regulatory_lease_dw_ddf.columns
]
# create column is_gas based on the oil_gas_code column
og_regulatory_lease_dw_ddf["is_gas"] = og_regulatory_lease_dw_ddf["oil_gas_code"].map(
    {"G": True, "O": False}
)
# drop the name columns and oil_gas_code (now encoded as is_gas)
pattern = re.compile(r"field_name|operator_name|district_name|oil_gas_code")
columns_to_keep = [
    col for col in og_regulatory_lease_dw_ddf.columns if not pattern.search(col)
]
# keep only the selected columns
og_regulatory_lease_dw_ddf = og_regulatory_lease_dw_ddf[columns_to_keep]
# get the number of rows and memory usage
num_rows = og_regulatory_lease_dw_ddf.shape[0]
memory_usage = og_regulatory_lease_dw_ddf.memory_usage(index=True, deep=True).sum()
num_rows, memory_usage = dask.compute(num_rows, memory_usage)
# show first few rows and shape
print(
    f"Shape of og_regulatory_lease_dw_ddf: ({num_rows} , {len(columns_to_keep)})\nTotal memory usage: {format_in_B(memory_usage)}"
)
og_regulatory_lease_dw_ddf.head()

In [None]:
og_regulatory_lease_dw_df = og_regulatory_lease_dw_ddf.compute()


counts_df = (
    og_regulatory_lease_dw_df.groupby(["district_no", "is_gas"])["lease_no"]
    .count()
    .reset_index()
    .set_index("district_no")
)
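`counts_df` holds one row per `(district_no, is_gas)` pair. When a side-by-side comparison is easier to read, `pd.crosstab` arranges the same counts as a district-by-type matrix. A minimal sketch on made-up rows (district and lease numbers are illustrative only); note that `crosstab` counts rows, which matches `count()` as long as `lease_no` has no missing values.

```python
import pandas as pd

# Hypothetical lease rows standing in for og_regulatory_lease_dw_df
leases = pd.DataFrame(
    {
        "district_no": [8, 8, 8, 9, 9, 10],
        "lease_no": [1, 2, 3, 4, 5, 6],
        "is_gas": [False, True, False, True, True, False],
    }
)

# Rows: districts; columns: is_gas False/True; cells: number of leases
lease_matrix = pd.crosstab(leases["district_no"], leases["is_gas"])
```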

In [None]:
counts_df[counts_df["is_gas"] == 1]["lease_no"].hvplot(kind="bar")

In [None]:
pn.state.kill_all_servers()

In [None]:
import hvplot.pandas


def plot_data(gas):
    filtered_data = og_regulatory_lease_dw_df[
        og_regulatory_lease_dw_df["is_gas"] == gas
    ]
    counts_df = filtered_data.groupby("district_no")["lease_no"].count().reset_index()
    counts_df.columns = ["district_no", "lease_count"]
    return counts_df.hvplot.bar(
        x="district_no",
        y="lease_count",
        title=f"Count of Leases in each District where is_gas={gas}",
    ).opts(
        toolbar="above",
        active_tools=["box_zoom"],
    )


is_gas_slider = pn.widgets.IntSlider(name="is_gas", start=0, end=1, step=1, value=0)
ibars = pn.panel(pn.bind(plot_data, gas=is_gas_slider), width=880)

pn.Card(is_gas_slider, ibars)

In [None]:
import hvplot.dask
import hvplot.pandas

# reuse the pandas frame computed above instead of recomputing the dask graph
lease_count = (
    og_regulatory_lease_dw_df.groupby(["district_no", "is_gas"])["lease_no"]
    .count()
    .reset_index()
)


# DynamicMap: bar chart of lease counts per district, labelled with each count, driven by an is_gas selector
def lease_plot_with_labels(var1, col1):
    # Filter the data for the given 'is_gas' value
    data = lease_count[(lease_count[col1] == var1)]

    # Create a bar plot
    bar_plot = data.hvplot.bar(
        x="district_no",
        y="lease_no",
        stacked=True,
        xlabel="",
        yaxis="bare",
        title=f"Count of Leases in each District No. where {col1}={var1}",
        hover_cols="all",
    )

    # Create a list to hold the text elements
    texts = []

    # Loop over the DataFrame and create a text element for each row
    for row in data.itertuples():
        # anchor the label at the bar's x position, 1% above the top of the bar
        x = getattr(row, "district_no")
        y = getattr(row, "lease_no") * 1.01
        # The text is the height of the bar
        text = str(getattr(row, "lease_no"))
        # Create the text element and add it to the list
        texts.append(hv.Text(x, y, text, valign="bottom").opts(text_color="black"))

    # Overlay the text elements on the plot
    labelled_plot = bar_plot * hv.Overlay(texts)

    return labelled_plot


lease_dmap = hv.DynamicMap(
    lambda var1: lease_plot_with_labels(var1, "is_gas"),
    # restrict the widget to the two valid values instead of a continuous 0-1 slider
    kdims=[hv.Dimension("var1", label="is_gas", values=[0, 1])],
)
pn.panel(lease_dmap).show()

##### Table 13 - Well Completion

This table contains identifying well completion information.

In [106]:
display_table_overview_from_frame(
    persisted_overviews, data_tables_dict, "og_well_completion"
)
get_table_card_from_frame(data_tables_dict, "og_well_completion")

~*~ og_well_completion ~*~
796,550 rows, 16 columns


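| | OIL_GAS_CODE | DISTRICT_NO | LEASE_NO | WELL_NO | API_COUNTY_CODE | API_UNIQUE_NO | ONSHORE_ASSC_CNTY | DISTRICT_NAME | COUNTY_NAME | OIL_WELL_UNIT_NO | WELL_ROOT_NO | WELLBORE_SHUTIN_DT | WELL_SHUTIN_DT | WELL_14B2_STATUS_CODE | WELL_SUBJECT_14B2_FLAG | WELLBORE_LOCATION_CODE |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | G | 9 | 67641 | 2 | 435 | 31091 | 435 | 7C | SUTTON | | 348113 | 0 | 0 | N | N | L |
| 1 | G | 9 | 67642 | 2 | 435 | 31080 | 435 | 7C | SUTTON | | 348114 | 0 | 0 | N | N | L |
| 2 | G | 9 | 68026 | 1 | 435 | 31282 | 435 | 7C | SUTTON | | 352816 | 0 | 0 | N | N | L |
| 3 | G | 9 | 68027 | 2 | 435 | 31280 | 435 | 7C | SUTTON | | 352817 | 201207 | 201207 | Y | A | L |
| 4 | G | 9 | 68028 | 3 | 435 | 31281 | 435 | 7C | SUTTON | | 352818 | 0 | 0 | N | N | L |
| 5 | G | 9 | 68088 | 2 | 435 | 31255 | 435 | 7C | SUTTON | | 351880 | 0 | 0 | N | N | L |
| 6 | G | 9 | 68123 | 0012S | 435 | 31048 | 435 | 7C | SUTTON | | 352819 | 0 | 0 | N | N | L |
| 7 | G | 9 | 68133 | 2 | 435 | 31275 | 435 | 7C | SUTTON | | 351882 | 0 | 0 | N | N | L |
| 8 | G | 9 | 68225 | 2 T | 435 | 31170 | 435 | 7C | SUTTON | | 350031 | 0 | 200703 | N | N | L |
| 9 | G | 9 | 68247 | 2 | 435 | 31245 | 435 | 7C | SUTTON | | 352469 | 0 | 0 | N | N | L |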



In [107]:
# get the dask dataframe
# og_well_completion_ddf = dd.read_csv(file_path_13, sep="}", dtype=csv_dtypes)
og_well_completion_ddf = data_tables_dict["og_well_completion"]
# convert the column names to lower case
og_well_completion_ddf.columns = [
    pascal_to_snake(col) for col in og_well_completion_ddf.columns
]
og_well_completion_ddf["is_gas"] = og_well_completion_ddf["oil_gas_code"].map(
    {"G": True, "O": False}
)

# drop the *_name columns and oil_gas_code (now encoded as is_gas)
pattern = re.compile(r"_name|oil_gas_code")
columns_to_keep = [
    col for col in og_well_completion_ddf.columns if not pattern.search(col)
]
# keep only the selected columns
og_well_completion_ddf = og_well_completion_ddf[columns_to_keep]

# create column lease_no_district_no which combines the lease_no and district_no
og_well_completion_ddf["lease_no_district_no"] = og_well_completion_ddf[
    "lease_no"
].astype(str) + og_well_completion_ddf["district_no"].astype(str).str.zfill(2)

# create column tx_api which combines the api_county_code and api_unique_no
og_well_completion_ddf["tx_api"] = og_well_completion_ddf["api_county_code"].astype(
    str
).str.zfill(3) + og_well_completion_ddf["api_unique_no"].astype(str).str.zfill(5)
# count how many completion rows share each tx_api and each lease key
og_well_completion_ddf["tx_api_count"] = og_well_completion_ddf.groupby("tx_api")[
    "tx_api"
].transform("count")
og_well_completion_ddf["lease_no_district_no_count"] = og_well_completion_ddf.groupby(
    "lease_no_district_no"
)["lease_no_district_no"].transform("count")


# get the number of rows and memory usage
num_rows = og_well_completion_ddf.shape[0]
memory_usage = og_well_completion_ddf.memory_usage(index=True, deep=True).sum()
num_rows, memory_usage = dask.compute(num_rows, memory_usage)
# show first few rows and shape
print(
    f"Shape of og_well_completion_ddf: ({num_rows}, {len(og_well_completion_ddf.columns)})"
)
print(f"Total memory usage: {format_in_B(memory_usage)}")
og_well_completion_ddf.head()

Shape of og_well_completion_ddf: (796550, 18)
Total memory usage: 106.11 MB


| | district_no | lease_no | well_no | api_county_code | api_unique_no | onshore_assc_cnty | oil_well_unit_no | well_root_no | wellbore_shutin_dt | well_shutin_dt | well_14b2_status_code | well_subject_14b2_flag | wellbore_location_code | is_gas | lease_no_district_no | tx_api | tx_api_count | lease_no_district_no_count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 9 | 67641 | 2 | 435 | 31091 | 435 | | 348113 | 0 | 0 | N | N | L | True | 6764109 | 43531091 | 1 | 1 |
| 1 | 9 | 67642 | 2 | 435 | 31080 | 435 | | 348114 | 0 | 0 | N | N | L | True | 6764209 | 43531080 | 1 | 1 |
| 2 | 9 | 68026 | 1 | 435 | 31282 | 435 | | 352816 | 0 | 0 | N | N | L | True | 6802609 | 43531282 | 1 | 1 |
| 3 | 9 | 68027 | 2 | 435 | 31280 | 435 | | 352817 | 201207 | 201207 | Y | A | L | True | 6802709 | 43531280 | 1 | 1 |
| 4 | 9 | 68028 | 3 | 435 | 31281 | 435 | | 352818 | 0 | 0 | N | N | L | True | 6802809 | 43531281 | 1 | 1 |


In [None]:
og_well_completion_ddf[
    og_well_completion_ddf.lease_no_district_no == "313711"
].compute().sort_values(["tx_api_count", "tx_api"], ascending=False)
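The `lease_no_district_no` and `tx_api` keys built in the `og_well_completion_ddf` cell are plain string concatenations, so a lease or well that appears on several completion rows ends up with a count above 1. The sketch below rebuilds both keys and their counts in pandas on a few made-up rows; `add_completion_keys` is a hypothetical helper, and the padding widths (2-digit district, 3-digit county code, 5-digit unique number) simply mirror that cell.

```python
import pandas as pd


def add_completion_keys(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper mirroring the key construction for og_well_completion."""
    out = df.copy()
    # Lease key: lease number followed by the zero-padded 2-digit district number
    out["lease_no_district_no"] = out["lease_no"].astype(str) + out["district_no"].astype(
        str
    ).str.zfill(2)
    # Well key: 3-digit county code followed by the 5-digit unique API number
    out["tx_api"] = out["api_county_code"].astype(str).str.zfill(3) + out[
        "api_unique_no"
    ].astype(str).str.zfill(5)
    # Number of completion rows sharing each key, broadcast back to every row
    for key in ("tx_api", "lease_no_district_no"):
        out[f"{key}_count"] = out.groupby(key)[key].transform("count")
    return out


sample = pd.DataFrame(
    {
        "district_no": [9, 9, 3],
        "lease_no": [67641, 67641, 137],
        "api_county_code": [435, 435, 1],
        "api_unique_no": [31091, 31092, 5],
    }
)
keyed = add_completion_keys(sample)
```

Because the district part always occupies the last two characters, the lease key stays unambiguous even though `lease_no` itself is not padded.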

##### Table 14 - Summary Master Large

Summary table used for operator-level queries.

In [None]:

display_table_overview_from_frame(
    persisted_overviews, data_tables_dict, "og_summary_master_large"
)
get_table_card_from_frame(data_tables_dict, "og_summary_master_large")

##### Table 15 - Summary Onshore Lease

Summary table used for queries on leases in onshore counties.

In [None]:

display_table_overview_from_frame(
    persisted_overviews, data_tables_dict, "og_summary_onshore_lease"
)
get_table_card_from_frame(data_tables_dict, "og_summary_onshore_lease")

In [None]:
# load the onshore lease summary table
og_summary_onshore_lease_ddf = data_tables_dict["og_summary_onshore_lease"]
# convert the column names to lower case
og_summary_onshore_lease_ddf.columns = [
    pascal_to_snake(col) for col in og_summary_onshore_lease_ddf.columns
]
# create a lease_no_district_no column
og_summary_onshore_lease_ddf["lease_no_district_no"] = og_summary_onshore_lease_ddf[
    "lease_no"
].astype(str) + og_summary_onshore_lease_ddf["district_no"].astype(str).str.zfill(2)
# get the number of rows and memory usage
num_rows = og_summary_onshore_lease_ddf.shape[0]
memory_usage = og_summary_onshore_lease_ddf.memory_usage(index=True, deep=True).sum()
num_rows, memory_usage = dask.compute(num_rows, memory_usage)
# show first few rows and shape
print(
    f"Shape of og_summary_onshore_lease_ddf: ({num_rows}, {len(og_summary_onshore_lease_ddf.columns)})"
)
print(f"Total memory usage: {format_in_B(memory_usage)}")
og_summary_onshore_lease_ddf.head()

In [None]:
# active leases are those whose latest reported cycle (cycle_year_month_max) is 202311
og_summary_onshore_lease_ddf[
    og_summary_onshore_lease_ddf["cycle_year_month_max"] == 202311
].compute()

Top-ranked operators in Texas by monthly production volume

From the aggregated operator cycle table

In [None]:
# find the top operator for each production volume column, month by month
# keep cycles from January 2013 onward (cycle_year_month is encoded as YYYYMM)
og_operator_cycle_ddf_20XX = og_operator_cycle_ddf[
    og_operator_cycle_ddf.cycle_year_month > 201300
]
# the production volume columns to rank operators on
prod_columns = [
    "oper_oil_prod_vol",
    "oper_gas_prod_vol",
    "oper_cond_prod_vol",
    "oper_csgd_prod_vol",
]

In [None]:
# len(og_operator_cycle_ddf_2023)
# og_operator_cycle_ddf_2023.head()
duplicate_operators = (
    og_operator_cycle_ddf_20XX.groupby(["cycle_year_month", "operator_name"])
    .size()
    .compute()
)

# Check if any operator_name appears more than once in the same cycle_year_month
any_duplicates = bool((duplicate_operators > 1).any())

print(
    f"Any operator_name appears more than once in the same cycle_year_month: {any_duplicates}"
)

og_operator_cycle_ddf_20XX["total_prod_vol"] = og_operator_cycle_ddf_20XX[
    prod_columns
].sum(axis=1)
og_operator_cycle_ddf_20XX.loc[
    (og_operator_cycle_ddf_20XX["cycle_year_month"] == 202308)
    & (og_operator_cycle_ddf_20XX["total_prod_vol"] != 0)
].compute()
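The duplicate check above materializes every group size before testing it. On a pandas frame (for example after `.compute()`), `DataFrame.duplicated` answers the same yes/no question directly. The sketch below uses made-up rows; it also shows that `sum(axis=1)` skips missing volumes, whereas a chain of `+` operators (as used for `total_oper_prod_vol`) yields NaN whenever any term is missing.

```python
import pandas as pd

# Hypothetical rows standing in for og_operator_cycle_ddf_20XX
cycle = pd.DataFrame(
    {
        "cycle_year_month": [202307, 202307, 202308, 202308],
        "operator_name": ["ACME OIL", "BETA GAS", "ACME OIL", "ACME OIL"],
        "oper_oil_prod_vol": [10.0, 0.0, 5.0, 2.0],
        "oper_gas_prod_vol": [1.0, 3.0, None, 4.0],
    }
)

# True if any (month, operator) pair occurs on more than one row
has_duplicates = bool(
    cycle.duplicated(subset=["cycle_year_month", "operator_name"]).any()
)

# Row-wise total that treats a missing volume as 0
cycle["total_prod_vol"] = cycle[["oper_oil_prod_vol", "oper_gas_prod_vol"]].sum(axis=1)
```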

In [None]:
# keep cycles up to and including 202308
og_operator_cycle_ddf_20XX = og_operator_cycle_ddf_20XX[
    og_operator_cycle_ddf_20XX["cycle_year_month"] <= 202308
]

# group by month
grouped_og_operator = og_operator_cycle_ddf_20XX.groupby("cycle_year_month")


# for each product column, get the operator with the largest volume in the group
def get_topn(df):
    result = {}
    for col in prod_columns:
        topn = df.nlargest(1, col)[["operator_name"] + prod_columns].copy()
        topn["prod_column"] = col
        result[col] = topn
    return pd.concat(result.values(), keys=result.keys())


# Apply the function to each group
topn_og_operator = grouped_og_operator.apply(
    get_topn,
    meta=pd.DataFrame(
        {
            "operator_name": pd.Series(dtype="object"),
            "oper_oil_prod_vol": pd.Series(dtype="float64"),
            "oper_gas_prod_vol": pd.Series(dtype="float64"),
            "oper_cond_prod_vol": pd.Series(dtype="float64"),
            "oper_csgd_prod_vol": pd.Series(dtype="float64"),
            "prod_column": pd.Series(dtype="object"),
        }
    ),
)

# Compute the result and sort by cycle_year_month
result = topn_og_operator.compute().sort_values(
    by=["cycle_year_month", "prod_column"], ascending=False
)
result.reset_index().drop(columns=["level_1", "level_2"])
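`get_topn` calls `nlargest` once per product column inside every monthly group. An alternative is to reshape to long format with `melt` and take the head of each `(month, product)` group after a single sort. The sketch below is a pandas version on made-up rows; `top_operator_per_month` is a hypothetical name, and unlike `get_topn` it keeps only the ranked volume rather than all four volume columns.

```python
import pandas as pd


def top_operator_per_month(df: pd.DataFrame, prod_cols: list, n: int = 1) -> pd.DataFrame:
    """Hypothetical helper: top-n operators per (cycle_year_month, product column)."""
    long = df.melt(
        id_vars=["cycle_year_month", "operator_name"],
        value_vars=prod_cols,
        var_name="prod_column",
        value_name="prod_vol",
    )
    # Stable sort so tied volumes keep their original row order
    long = long.sort_values("prod_vol", ascending=False, kind="mergesort")
    top = long.groupby(["cycle_year_month", "prod_column"]).head(n)
    return top.sort_values(["cycle_year_month", "prod_column"], ascending=False).reset_index(
        drop=True
    )


cycle = pd.DataFrame(
    {
        "cycle_year_month": [202307, 202307, 202308, 202308],
        "operator_name": ["ACME OIL", "BETA GAS", "ACME OIL", "BETA GAS"],
        "oper_oil_prod_vol": [10.0, 20.0, 30.0, 5.0],
        "oper_gas_prod_vol": [100.0, 50.0, 40.0, 80.0],
    }
)
top = top_operator_per_month(cycle, ["oper_oil_prod_vol", "oper_gas_prod_vol"])
```

Raising `n` returns the top-n operators per month and product without changing the rest of the pipeline.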

From the lease cycle table

In [None]:
# compare result to if we had worked from the lease production table

# create a total production column to filter out non-producing lease records
og_lease_cycle_ddf["total_lease_prod_vol"] = (
    og_lease_cycle_ddf["lease_oil_prod_vol"]
    + og_lease_cycle_ddf["lease_gas_prod_vol"]
    + og_lease_cycle_ddf["lease_cond_prod_vol"]
    + og_lease_cycle_ddf["lease_csgd_prod_vol"]
)

# keep lease records up to 202308 that have non-zero total production
filtered_og_lease_cycle_ddf = og_lease_cycle_ddf.loc[
    (og_lease_cycle_ddf.cycle_year_month <= 202308)
    & (og_lease_cycle_ddf["total_lease_prod_vol"] > 0)
]

# group by the operator and the cycle_year_month and sum the prod_vol columns
grouped_og_operator_no = (
    filtered_og_lease_cycle_ddf[
        [
            "cycle_year_month",
            "operator_no",
            "lease_oil_prod_vol",
            "lease_gas_prod_vol",
            "lease_cond_prod_vol",
            "lease_csgd_prod_vol",
        ]
    ]
    .groupby(["cycle_year_month", "operator_no"])[
        [
            "lease_oil_prod_vol",
            "lease_gas_prod_vol",
            "lease_cond_prod_vol",
            "lease_csgd_prod_vol",
        ]
    ]
    .sum()
    .reset_index()
)

In [None]:
# active leases are those with non-zero production in 202308
active_leases = (
    filtered_og_lease_cycle_ddf[["cycle_year_month", "lease_no_district_no"]][
        filtered_og_lease_cycle_ddf["cycle_year_month"] == 202308
    ]["lease_no_district_no"]
    .unique()
    .compute()
)

# keep the full production history of the active leases
active_og_lease_cycle_ddf = filtered_og_lease_cycle_ddf[
    filtered_og_lease_cycle_ddf["lease_no_district_no"].isin(active_leases)
]

In [None]:
persisted_active_og_lease_cycle_ddf = active_og_lease_cycle_ddf.persist()

In [None]:
lease_prod_cols = [
    "lease_oil_prod_vol",
    "lease_gas_prod_vol",
    "lease_cond_prod_vol",
    "lease_csgd_prod_vol",
]
filtered_og_lease_cycle_ddf[
    filtered_og_lease_cycle_ddf["lease_no_district_no"] == 313711
][lease_prod_cols + ["cycle_year_month", "operator_no"]].compute()

In [None]:
lease_prod_cols = [
    "lease_oil_prod_vol",
    "lease_gas_prod_vol",
    "lease_cond_prod_vol",
    "lease_csgd_prod_vol",
]
active_lease_sums = (
    persisted_active_og_lease_cycle_ddf.groupby("lease_no_district_no")[lease_prod_cols]
    .sum()
    .compute()
)
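The active-lease steps above (drop zero-production rows, find the leases that produced in 202308, then sum each lease's history) can be sketched end to end in pandas. `active_lease_totals` is a hypothetical helper and the lease keys below are made up. One deliberate difference: it builds the row total with `sum(axis=1)`, which treats a missing volume as 0, while the `+` chain used for `total_lease_prod_vol` drops any row with a missing volume.

```python
import pandas as pd


def active_lease_totals(df: pd.DataFrame, prod_cols: list, month: int) -> pd.DataFrame:
    """Hypothetical helper: cumulative volumes for leases with production in `month`."""
    # Row-wise total; sum(axis=1) treats a missing volume as 0 rather than NaN
    total = df[prod_cols].sum(axis=1)
    producing = df[total > 0]
    # Leases that produced something in the reference month
    active = producing.loc[
        producing["cycle_year_month"] == month, "lease_no_district_no"
    ].unique()
    # Sum each active lease's full producing history
    return (
        producing[producing["lease_no_district_no"].isin(active)]
        .groupby("lease_no_district_no")[prod_cols]
        .sum()
    )


lease_cycle = pd.DataFrame(
    {
        "lease_no_district_no": ["A", "A", "B", "B", "C"],
        "cycle_year_month": [202307, 202308, 202307, 202308, 202308],
        "lease_oil_prod_vol": [10.0, 5.0, 7.0, 0.0, 0.0],
        "lease_gas_prod_vol": [0.0, 1.0, 0.0, 0.0, 2.0],
    }
)
totals = active_lease_totals(
    lease_cycle, ["lease_oil_prod_vol", "lease_gas_prod_vol"], 202308
)
```

Lease `B` produced in 202307 but not in 202308, so it is excluded even though part of its history is non-zero.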

In [None]:
# top 10 active leases by cumulative volume for each product column
for col in lease_prod_cols:
    display(active_lease_sums.nlargest(10, col))
# active_lease_sums.nlargest(10, "lease_oil_prod_vol")

In [None]:
computed_grouped_og_operator_no = grouped_og_operator_no.compute()

lease_prod_cols = [
    "lease_oil_prod_vol",
    "lease_gas_prod_vol",
    "lease_cond_prod_vol",
    "lease_csgd_prod_vol",
]

# create an empty list to store the max rows
max_rows = []
# for each product column, find the operator with the largest volume in each cycle_year_month
for col in lease_prod_cols:
    computed_grouped_og_operator_no["prod_column"] = col
    max_index = computed_grouped_og_operator_no.groupby("cycle_year_month")[
        col
    ].idxmax()

    max_rows.append(computed_grouped_og_operator_no.loc[max_index])

result = pd.concat(max_rows)
# merge the operator_no_name table to get the operator_name
result.merge(operator_no_name, on="operator_no").sort_values(
    by=["cycle_year_month", "prod_column"], ascending=False
)

In [None]:
info = client.scheduler_info()  # get scheduler info

workers = info["workers"]  # get the workers

print(f"Number of workers: {len(workers)}")

In [None]:
client.close()