In [97]:
import pandas as pd
import numpy as np
from pyspark.sql.functions import col
from waispark.core import create_spark_session
from deltautils.delta_core import TableType, DeltaMetaTags, CompositeOperationTags
from waispark.delta_lake import read_from_delta
import mlnbddata.path as pathd
from pyspark.sql import functions as F

In [2]:
# Show complete cell contents and every column when a DataFrame is displayed.
# Full option keys are used on purpose: pandas resolves abbreviated keys such as
# "max_colwidth" by pattern matching, which may become ambiguous if a future
# pandas release adds an option with a similar name.
pd.set_option("display.max_colwidth", None)
pd.set_option("display.max_columns", None)

In [3]:
# Spark session (created with is_local=True) used by the Spark cells of this notebook.
spark_app_name = "best-logs-name-notebook-IV"
spark = create_spark_session(spark_app_name, is_local=True)

/opt/spark/bin/load-spark-env.sh: line 68: ps: command not found


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-126279c0-2fdf-4228-8215-ea66f709b366;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 285ms :: artifacts dl 9ms
	:: modules in use:
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |

### Workspace definition

In [4]:
# Study to inspect: owning company, workspace (study) id and run id.
company = "Schlumberger"
workspace_id = "b8a46742-f715-4023-aac2-d8499bbd707f"
input_run = "1"

# Data root and the helper that turns a DataDescriptor into a path (fs.getPath).
basepath = "/hpda/"
fs = pathd.DataFileSystem(root=basepath)

## Candidate logs with pandas

In [5]:
# Descriptor of the COMP / ASSIGN dataset of this workspace and run.
dd = pathd.DataDescriptor(
    company=company,
    data_quality=pathd.DataQuality.COMP,
    data_type=pathd.DataType.ASSIGN,
    study_id=workspace_id,
    run_id=input_run,
)

# Row filter applied while reading: selected rows only, DS_INDEX alias excluded.
selected_non_index = [[("selected", "=", True), ("alias", "!=", "DS_INDEX")]]
df_best_logs = pd.read_parquet(fs.getPath(dd), filters=selected_non_index)

In [6]:
# Attach the well hash, well name and well-log name to every best log.
dd = pathd.DataDescriptor(
    company=company,
    data_quality=pathd.DataQuality.FAM,
    data_type=pathd.DataType.DSET,
    study_id=workspace_id,
    run_id=input_run,
)

# Only the join key and the well descriptors are read from the FAM / DSET dataset.
well_columns = ["ds_ref_id", "WAI_WELL_H", "WAI_WELL_NAME", "welllog_Name"]
df_fam_dset = pd.read_parquet(fs.getPath(dd), columns=well_columns).drop_duplicates()

# Inner join on ds_ref_id, then keep the columns in a reader-friendly order.
df_best_logs = df_best_logs.merge(df_fam_dset, how="inner", on="ds_ref_id")[
    ["WAI_WELL_H", "WAI_WELL_NAME", "alias", "curvename", "welllog_Name", "ds_ref_id"]
]

df_best_logs.head()

Unnamed: 0,WAI_WELL_H,WAI_WELL_NAME,alias,curvename,welllog_Name,ds_ref_id
0,d6b11be7aa5eade440cf61cb5e723ab97f577a9689d5a96de544a51a7f20419e,4903526742,GR,GR,3526742E,14e625e722fda9d6f8c074e5a1d2839bda65fc8a1d63bf88b7a2fef59d2a2522
1,2199194ab8ab39b217d368c274a0ef0771a6a27fb5c159b5834ce94f30cbdc66,4903528243,DTC,DT,3528243A,1c057fe39d2c18a24ccaa33099005ef402d3ee301a5ea721741b8b5622487bd9
2,2199194ab8ab39b217d368c274a0ef0771a6a27fb5c159b5834ce94f30cbdc66,4903528243,GR,GR,3528243A,1c057fe39d2c18a24ccaa33099005ef402d3ee301a5ea721741b8b5622487bd9
3,d6b11be7aa5eade440cf61cb5e723ab97f577a9689d5a96de544a51a7f20419e,4903526742,SHALRES,AT10,3526742C,23a748ebb5783c670f996c8e5333e9339068efbf780fc3112849db33fcddec6f
4,d6b11be7aa5eade440cf61cb5e723ab97f577a9689d5a96de544a51a7f20419e,4903526742,MEDRES,AT60,3526742C,23a748ebb5783c670f996c8e5333e9339068efbf780fc3112849db33fcddec6f


## Wellbore log data with spark

In [7]:
# Best logs: read the COMP / ASSIGN dataset with Spark.
dd = pathd.DataDescriptor(
    company=company,
    data_quality=pathd.DataQuality.COMP,
    data_type=pathd.DataType.ASSIGN,
    study_id=workspace_id,
    run_id=input_run,
)

# Keep the selected rows only and leave out the DS_INDEX alias.
is_selected_curve = (F.col("selected") == True) & (F.col("alias") != "DS_INDEX")
comp_assign = spark.read.parquet(fs.getPath(dd)).filter(is_selected_curve)

                                                                                

In [8]:
# Load the FAMQC / UDATA rows and keep only those that belong to a best log.
dd = pathd.DataDescriptor(
    company=company,
    data_quality=pathd.DataQuality.FAMQC,
    data_type=pathd.DataType.UDATA,
    study_id=workspace_id,
    run_id=input_run,
)

# A best log is identified by (ds_ref_id, alias, curvename); the inner join on
# these keys discards every row that was not selected in comp_assign.
best_log_keys = ["ds_ref_id", "alias", "curvename"]
famqc_udata = (
    spark.read.parquet(fs.getPath(dd))
    .withColumnRenamed("WAI_WELL_H", "WAI_WELL_ID")
    .join(comp_assign.select(*best_log_keys), on=best_log_keys, how="inner")
)

In [9]:
# Preview five untruncated rows, leaving out the "descr" column.
famqc_preview = famqc_udata.drop(F.col("descr"))
famqc_preview.show(5, truncate=False)

+----------------------------------------------------------------+-------+---------+---+----+-----+-------------+--------+--------+----------------------------------------------------------------+
|ds_ref_id                                                       |alias  |curvename|rn |data|unit |WAI_WELL_NAME|DS_INDEX|log_name|WAI_WELL_ID                                                     |
+----------------------------------------------------------------+-------+---------+---+----+-----+-------------+--------+--------+----------------------------------------------------------------+
|4beb646250eaf00a12e003eda11435e2dc71e098370d889fc30e78dc88e4d524|SHALRES|AT10     |0  |null|ohm.m|4903527827   |1163.0  |3527827D|4aef2961d7aafbe99a853027885f9c6cca3e449471c72b15bf80d370f1838e31|
|4beb646250eaf00a12e003eda11435e2dc71e098370d889fc30e78dc88e4d524|SHALRES|AT10     |1  |null|ohm.m|4903527827   |1163.5  |3527827D|4aef2961d7aafbe99a853027885f9c6cca3e449471c72b15bf80d370f1838e31|
|4beb646250eaf0

In [10]:
# Collect the whole Spark DataFrame into a pandas DataFrame on the driver.
df_famqc_udata = famqc_udata.toPandas()

                                                                                

In [21]:
# First GR rows, restricted to the columns that identify the source log.
is_gr = df_famqc_udata["alias"] == "GR"
df_famqc_udata.loc[is_gr, ["ds_ref_id", "alias", "curvename", "log_name"]].head()

Unnamed: 0,ds_ref_id,alias,curvename,log_name
40083,4beb646250eaf00a12e003eda11435e2dc71e098370d889fc30e78dc88e4d524,GR,HGR,3527827D
40084,4beb646250eaf00a12e003eda11435e2dc71e098370d889fc30e78dc88e4d524,GR,HGR,3527827D
40085,4beb646250eaf00a12e003eda11435e2dc71e098370d889fc30e78dc88e4d524,GR,HGR,3527827D
40086,4beb646250eaf00a12e003eda11435e2dc71e098370d889fc30e78dc88e4d524,GR,HGR,3527827D
40087,4beb646250eaf00a12e003eda11435e2dc71e098370d889fc30e78dc88e4d524,GR,HGR,3527827D


## Composite and its best logs used to create it

This example only shows results for GR, DEEPRES and RHOB. Note that each column with the suffix "_orig" identifies the "ds_ref_id" of the log that was used to create the composite value at the respective "DS_INDEX".

In [11]:
# Read the COMPQCDATA delta table of this workspace, passing the
# QC_DATA_CREATE composite-operation tag to the reader.
compqc_data_df = read_from_delta(spark, basepath=basepath, study_id=workspace_id,
                                 table_type=TableType.COMPQCDATA,
                                 tag=CompositeOperationTags.QC_DATA_CREATE)

  series = series.astype(t, copy=False)


[2m2024-05-07 08:23:50[0m [[32m[1minfo     [0m] [1mDeltaLakeSparkReader : Requested version of delta table when multiple versions exist after applying filters: latest[0m
[2m2024-05-07 08:23:50[0m [[32m[1minfo     [0m] [1mDeltaLakeSparkReader : loading version 0 of delta table...[0m


  series = series.astype(t, copy=False)


In [12]:
# Composite values and the logs they come from, for one well.
well_hash = "d6b11be7aa5eade440cf61cb5e723ab97f577a9689d5a96de544a51a7f20419e"
compqc_data_df.filter(F.col("WAI_WELL_H") == well_hash).select(
    "WAI_WELL_H", "DS_INDEX",
    "GR", "GR_orig",
    "DEEPRES", "DEEPRES_orig",
    "RHOB", "RHOB_orig",
).show(truncate=False)

24/05/07 08:24:03 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+----------------------------------------------------------------+--------+------------------+--------------------------------------------------------------------+-------+------------+----+---------+
|WAI_WELL_H                                                      |DS_INDEX|GR                |GR_orig                                                             |DEEPRES|DEEPRES_orig|RHOB|RHOB_orig|
+----------------------------------------------------------------+--------+------------------+--------------------------------------------------------------------+-------+------------+----+---------+
|d6b11be7aa5eade440cf61cb5e723ab97f577a9689d5a96de544a51a7f20419e|6438.5  |102.59100341796876|14e625e722fda9d6f8c074e5a1d2839bda65fc8a1d63bf88b7a2fef59d2a2522%GR%|null   |nan         |null|nan      |
|d6b11be7aa5eade440cf61cb5e723ab97f577a9689d5a96de544a51a7f20419e|6439.0  |112.41329956054688|14e625e722fda9d6f8c074e5a1d2839bda65fc8a1d63bf88b7a2fef59d2a2522%GR%|null   |nan         |null|nan      |


                                                                                

In [13]:
# Collect the composite table into a pandas DataFrame on the driver.
df_compqc_data = compqc_data_df.toPandas()

                                                                                

In [14]:
# List every column of the composite table on a single line.
print(" ".join(map(str, df_compqc_data.columns)))

GR DTC SHALRES MEDRES DEEPRES SP CALI DENC NEUT RHOB POR ds_orig_id original GR_min GR_max DTC_min DTC_max SHALRES_min SHALRES_max MEDRES_min MEDRES_max DEEPRES_min DEEPRES_max SP_min SP_max CALI_min CALI_max DENC_min DENC_max NEUT_min NEUT_max RHOB_min RHOB_max POR_min POR_max CALI_orig DEEPRES_orig DENC_orig GR_orig MEDRES_orig NEUT_orig RHOB_orig SHALRES_orig SP_orig DTC_orig DS_INDEX WAI_WELL_H


In [24]:
# First GR rows, restricted to the columns that identify the source log.
is_gr = df_famqc_udata["alias"] == "GR"
df_famqc_udata.loc[is_gr, ["ds_ref_id", "alias", "curvename", "log_name"]].head()

Unnamed: 0,ds_ref_id,alias,curvename,log_name
40083,4beb646250eaf00a12e003eda11435e2dc71e098370d889fc30e78dc88e4d524,GR,HGR,3527827D
40084,4beb646250eaf00a12e003eda11435e2dc71e098370d889fc30e78dc88e4d524,GR,HGR,3527827D
40085,4beb646250eaf00a12e003eda11435e2dc71e098370d889fc30e78dc88e4d524,GR,HGR,3527827D
40086,4beb646250eaf00a12e003eda11435e2dc71e098370d889fc30e78dc88e4d524,GR,HGR,3527827D
40087,4beb646250eaf00a12e003eda11435e2dc71e098370d889fc30e78dc88e4d524,GR,HGR,3527827D


In [29]:
# Composite value and source-log reference for the three aliases of interest.
preview_columns = [
    "WAI_WELL_H", "DS_INDEX",
    "GR", "GR_orig",
    "DEEPRES", "DEEPRES_orig",
    "RHOB", "RHOB_orig",
]
df_compqc_data.loc[:, preview_columns].head()

Unnamed: 0,WAI_WELL_H,DS_INDEX,GR,GR_orig,DEEPRES,DEEPRES_orig,RHOB,RHOB_orig
0,2199194ab8ab39b217d368c274a0ef0771a6a27fb5c159b5834ce94f30cbdc66,1145.0,55.177399,959981c470b9f6ced701ef7fafdc3c14434777b55eacbb6a91388a9243b32bb4%HGR%,,,,
1,2199194ab8ab39b217d368c274a0ef0771a6a27fb5c159b5834ce94f30cbdc66,1145.5,60.431499,959981c470b9f6ced701ef7fafdc3c14434777b55eacbb6a91388a9243b32bb4%HGR%,,,,
2,2199194ab8ab39b217d368c274a0ef0771a6a27fb5c159b5834ce94f30cbdc66,1146.0,70.942802,959981c470b9f6ced701ef7fafdc3c14434777b55eacbb6a91388a9243b32bb4%HGR%,,,,
3,2199194ab8ab39b217d368c274a0ef0771a6a27fb5c159b5834ce94f30cbdc66,1146.5,42.9146,959981c470b9f6ced701ef7fafdc3c14434777b55eacbb6a91388a9243b32bb4%HGR%,,,,
4,2199194ab8ab39b217d368c274a0ef0771a6a27fb5c159b5834ce94f30cbdc66,1147.0,66.563202,959981c470b9f6ced701ef7fafdc3c14434777b55eacbb6a91388a9243b32bb4%HGR%,,,,


In [101]:
ALIAS = ["GR", "DEEPRES", "RHOB"]


def _best_log_lineage(compqc, famqc, alias, orig_column):
    """Return the composite values of one alias with the log each sample came from.

    Parameters
    ----------
    compqc : pandas.DataFrame
        Composite table; must contain the columns ``alias`` and ``orig_column``.
    famqc : pandas.DataFrame
        Best-log samples; must contain "ds_ref_id", "alias", "curvename" and
        "log_name".
    alias : str
        Alias to process, e.g. "GR".
    orig_column : str
        Name of the matching "<alias>_orig" column of ``compqc``.

    Returns
    -------
    pandas.DataFrame
        One row per row of ``compqc`` (same order, fresh 0..n-1 index) with the
        columns ``alias``, ``orig_column`` (reduced to the ds_ref_id),
        "<alias>_curvename" and "<alias>_log_name".
    """
    # One lookup row per ds_ref_id. Without this, a ds_ref_id that appears with
    # several curvename/log_name pairs would multiply rows in the left merge and
    # the per-alias frames could no longer be placed side by side row by row.
    lookup = famqc.loc[
        famqc["alias"] == alias, ["ds_ref_id", "curvename", "log_name"]
    ].drop_duplicates(subset="ds_ref_id")

    lineage = compqc[[alias, orig_column]].copy()
    # The "_orig" value starts with the ds_ref_id, followed by "%"-separated extras.
    lineage["ds_ref_id"] = lineage[orig_column].str.split("%").str[0]

    # validate= documents (and enforces) that the merge keeps exactly one output
    # row per composite row.
    lineage = lineage.merge(lookup, how="left", on="ds_ref_id", validate="many_to_one")

    return lineage.drop(columns=orig_column).rename(
        columns={
            "ds_ref_id": orig_column,
            "curvename": f"{alias}_curvename",
            "log_name": f"{alias}_log_name",
        }
    )


# "<alias>_orig" columns of the requested aliases, in the table's column order.
# The loop variable is deliberately not called `col`: that name is the
# pyspark.sql.functions.col import used by the Spark cells, and overwriting it
# would break those cells when they are re-run.
orig_columns = [
    column_name
    for column_name in df_compqc_data.columns
    if "_orig" in column_name and column_name.split("_", 1)[0] in ALIAS
]

if orig_columns:
    # Build all pieces first and concatenate once; every piece has a 0..n-1
    # index, so the column-wise concat lines the rows up positionally.
    lineage_frames = [df_compqc_data[["WAI_WELL_H", "DS_INDEX"]].reset_index(drop=True)]
    lineage_frames.extend(
        _best_log_lineage(
            df_compqc_data, df_famqc_udata, column_name.split("_", 1)[0], column_name
        )
        for column_name in orig_columns
    )
    df_final = pd.concat(lineage_frames, axis=1)
else:
    # No matching "_orig" column: keep the empty-frame result.
    df_final = pd.DataFrame()

In [102]:
# Display the assembled table (pandas shows only the first and last rows of a long frame).
df_final

Unnamed: 0,WAI_WELL_H,DS_INDEX,DEEPRES,DEEPRES_orig,DEEPRES_curvename,DEEPRES_log_name,GR,GR_orig,GR_curvename,GR_log_name,RHOB,RHOB_orig,RHOB_curvename,RHOB_log_name
0,2199194ab8ab39b217d368c274a0ef0771a6a27fb5c159b5834ce94f30cbdc66,1145.0,,,,,55.177399,959981c470b9f6ced701ef7fafdc3c14434777b55eacbb6a91388a9243b32bb4,HGR,3528243B,,,,
1,2199194ab8ab39b217d368c274a0ef0771a6a27fb5c159b5834ce94f30cbdc66,1145.5,,,,,60.431499,959981c470b9f6ced701ef7fafdc3c14434777b55eacbb6a91388a9243b32bb4,HGR,3528243B,,,,
2,2199194ab8ab39b217d368c274a0ef0771a6a27fb5c159b5834ce94f30cbdc66,1146.0,,,,,70.942802,959981c470b9f6ced701ef7fafdc3c14434777b55eacbb6a91388a9243b32bb4,HGR,3528243B,,,,
3,2199194ab8ab39b217d368c274a0ef0771a6a27fb5c159b5834ce94f30cbdc66,1146.5,,,,,42.914600,959981c470b9f6ced701ef7fafdc3c14434777b55eacbb6a91388a9243b32bb4,HGR,3528243B,,,,
4,2199194ab8ab39b217d368c274a0ef0771a6a27fb5c159b5834ce94f30cbdc66,1147.0,,,,,66.563202,959981c470b9f6ced701ef7fafdc3c14434777b55eacbb6a91388a9243b32bb4,HGR,3528243B,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
120863,7ac5a8127b65ff1bd39f90f9ffe6de9991b419c87c534aec77b54696ab2462dd,14063.5,,,,,,,,,,,,
120864,7ac5a8127b65ff1bd39f90f9ffe6de9991b419c87c534aec77b54696ab2462dd,14064.0,,,,,,,,,,,,
120865,7ac5a8127b65ff1bd39f90f9ffe6de9991b419c87c534aec77b54696ab2462dd,14064.5,,,,,,,,,,,,
120866,7ac5a8127b65ff1bd39f90f9ffe6de9991b419c87c534aec77b54696ab2462dd,14065.0,,,,,,,,,,,,


In [103]:
# Persist the result as CSV in the working directory; the DataFrame index is
# written as the first, unnamed column.
# NOTE(review): the file name is spelled "detaild" - confirm that nothing
# depends on it before renaming.
df_final.to_csv("detaild_composite_and_best_logs.csv")