In [1]:
%load_ext autoreload
# reload modules automatically before each cell
%autoreload 2

import requests
import os
os.environ["REQUESTS_CA_BUNDLE"] = "/etc/ssl/certs/ca-certificates.crt" # if not already set by the OS; doesn't hurt

# fixes a bug in SPARQLwrapper
import ssl
ssl._create_default_https_context = ssl._create_unverified_context

# importing pandas to make printed dataframes prettier
import pandas as pd, hashlib
pd.set_option('display.max_colwidth', None)
pd.set_option('display.colheader_justify', 'left')
pd.set_option('display.width', 200)

In [2]:
from pmd_demo_tools import mesh_tools, sparql_tools
from pmd_demo_tools.query_collection import S355queries
S355queries = S355queries()

In [23]:
# read in all servers on the mesh
partners_full = mesh_tools.mesh_namespace_grouped_by_company()

# attach hosted services
_ = mesh_tools.attach_services_in_place(partners_full)

# attach web tokens
import json
with open('../secrets/tokens.json') as f:
    tokens = json.load(f, object_hook=mesh_tools.namespace_object_hook())

partners_full.Leibniz_Institut_fuer_Werkstofforientierte_Technologien_IWT.iwt.services.ontodocker.token = tokens.Leibniz_Institut_fuer_Werkstofforientierte_Technologien_IWT.ontodocker.token
partners_full.Fraunhofer_IWM.iwm.services.ontodocker.token = tokens.Fraunhofer_IWM.ontodocker.token
partners_full.KIT.kit_3.services.ontodocker_proxy.token = tokens.KIT.ontodocker_proxy.token
partners_full.MPISusMat.mpi_susmat.services.ontodocker.token = tokens.MPISusMat.ontodocker.token



In [24]:
selection = ["Leibniz_Institut_fuer_Werkstofforientierte_Technologien_IWT", "Fraunhofer_IWM", "KIT"]
partners = mesh_tools.select_toplevel(partners_full, selection, deepcopy=True)

In [25]:
datasets = ["pmdco2_tto_example_parallel", "pmdco2_tto_example_perpendicular","pmdco2_tto_example_diagonal"]

In [26]:
query = S355queries.orientation()
orientation_results = sparql_tools.federated_query(partners=partners, datasets=datasets, query=query.query, columns=query.headers, print_to_screen=False)
orientation_results_rns = mesh_tools.RecursiveNamespace(**orientation_results)
type(query)

pmd_demo_tools.sparql_tools.SparqlQuery

In [27]:
orientation_uri_term = S355queries.orientation().headers[0] # uri
orientation_term = S355queries.orientation().headers[1] # cut orientation

In [28]:
query = S355queries.csv_url()
csv_url_results = sparql_tools.federated_query(partners=partners, datasets=datasets, query=query.query, columns=query.headers, print_to_screen=False)
csv_url_results_rns = mesh_tools.RecursiveNamespace(**csv_url_results)

In [29]:
query = S355queries.csv_columns()
csv_columns_results = sparql_tools.federated_query(partners=partners, datasets=datasets, query=query.query, columns=query.headers, print_to_screen=False)
csv_columns_results_rns = mesh_tools.RecursiveNamespace(**csv_columns_results)

In [30]:
csv_uri_term = S355queries.csv_columns().headers[0] # uri
csv_url_term = S355queries.csv_columns().headers[1] # url
csv_id_term = S355queries.csv_columns().headers[2] # name
csv_column_number_term = S355queries.csv_columns().headers[3] # column number
csv_quantity_term = S355queries.csv_columns().headers[4] # quantity
csv_unit_term = S355queries.csv_columns().headers[5] # unit

In [31]:
query = S355queries.primary_data()
primary_data_results = sparql_tools.federated_query(partners=partners, datasets=datasets, query=query.query, columns=query.headers, print_to_screen=False)
primary_data_results_rns = mesh_tools.RecursiveNamespace(**primary_data_results)

In [32]:
primary_uri_term = S355queries.primary_data().headers[0] # uri
primary_quantity_term = S355queries.primary_data().headers[1] # quantity
primary_value_term = S355queries.primary_data().headers[2] # value
primary_unit_term = S355queries.primary_data().headers[3] # unit
cross_section_term = "crossSectionArea_S0"

In [33]:
pmd_prefix = "https://w3id.org/pmd/co/"
displacement_term = "CrossheadSeparation"
displacement_expression = pmd_prefix+displacement_term
force_term = "Force"
force_expression = pmd_prefix+force_term

percentage_extension_term = "PercentageExtension"
percentage_extension_expression = pmd_prefix+percentage_extension_term 

unit_prefix = "http://qudt.org/vocab/unit/"
displacement_unit_expected = "http://qudt.org/vocab/unit/MilliM"
percentage_extension_unit_expected = "http://qudt.org/vocab/unit/PERCENT"
force_unit_expected = "http://qudt.org/vocab/unit/KiloN"
cross_section_unit_expected = "http://qudt.org/vocab/unit/MilliM2"

In [34]:
import numpy as np

def calc_stress(force: np.array, cross_section_area: float) -> np.array:
    return force/cross_section_area

In [35]:
from scipy.optimize import curve_fit
from dataclasses import dataclass
import numpy as np

@dataclass
class ElasticModulusCalculation:
    elastic_modulus: float
    variance_elastic_modulus: float
    stress_offset: float
    variance_stress_offset: float
    elastic_strain_limit: float
    elastic_limit_index: int

def lin_func(x,a,b):
        return a*x+b

def elastic_modulus_calculation(stress: np.array,
                         strain: np.array,
                         elastic_strain_limit: float = 0.00025,
                         elastic_modulus_guess: float = 200.0,
                         stress_offset_guess: float = -2.0
                        ) -> ElasticModulusCalculation:
 
    strain = np.asarray(strain)
    stress = np.asarray(stress)

    elastic_limit_index = 0
    # avoid IndexError if all strain <= elastic_strain_limit
    while (
        elastic_limit_index < len(strain) - 1
        and strain[elastic_limit_index] <= elastic_strain_limit
    ):
        elastic_limit_index += 1

    if elastic_limit_index < 1:
        raise ValueError(
            f"Not enough points in elastic region (strain <= {elastic_strain_limit})."
        )

    popt, pcov = curve_fit(
        lambda x, *p: p[0] * x + p[1],
        strain[:elastic_limit_index + 1],
        stress[:elastic_limit_index + 1],
        p0=np.asarray([elastic_modulus_guess, stress_offset_guess]),
    )

    elastic_modulus = popt[0]
    stress_offset = popt[1]
    variance_elastic_modulus = pcov[0, 0]
    variance_stress_offset = pcov[1, 1]
    elastic_strain_limit_measured = strain[elastic_limit_index]

    return ElasticModulusCalculation(
        elastic_modulus,
        variance_elastic_modulus,
        stress_offset,
        variance_stress_offset,
        elastic_strain_limit_measured,
        elastic_limit_index,
    )

In [36]:
@dataclass
class OffsetYieldCalculation:
    offset_strain: float
    yield_stress: float
    yield_strain: float

def offset_yield_calculation(stress: np.array,
                             strain: np.array,
                             elastic_modulus: float | None = None,
                             offset_strain: float = 0.002
                            ) -> OffsetYieldCalculation:
    if elastic_modulus is None:
        E = elastic_modulus_calculation(stress=stress, strain=strain)
        elastic_modulus = E.elastic_modulus
        
    offset_elastic_line = elastic_modulus * (strain - offset_strain)
    # difference between curves
    delta = stress - offset_elastic_line

    # find indices where sign changes (curve intersection)
    sign = np.sign(delta)
    # treat exact zeros as positive to avoid double crossings
    sign[sign == 0] = 1.0
    crossings = np.where(np.diff(sign) != 0)[0]

    if len(crossings) == 0:
        # fallback: closest point if we never actually cross
        arg = np.argmin(np.abs(delta))
        yield_strain = strain[arg]
        yield_stress = stress[arg]
    else:
        i = crossings[0]  # first crossing
        # linear interpolation in [i, i+1] on delta = 0
        x0, x1 = strain[i], strain[i + 1]
        d0, d1 = delta[i], delta[i + 1]

        # avoid division by zero if two delta’s are equal (very degenerate)
        if d1 == d0:
            yield_strain = x0
            yield_stress = stress[i]
        else:
            # strain at intersection (delta == 0)
            yield_strain = x0 - d0 * (x1 - x0) / (d1 - d0)

            # linearly interpolate stress at that strain
            s0, s1 = stress[i], stress[i + 1]
            yield_stress = s0 + (s1 - s0) * (yield_strain - x0) / (x1 - x0)

    return OffsetYieldCalculation(offset_strain, yield_stress, yield_strain)

In [37]:
@dataclass
class ConvertedStrain:
    strain: np.array
    raw_strain: np.array
    baseline_offset: float
    raw_strain_is_percent: bool | None

def convert_strain(raw_strain: np.array,
                   raw_strain_is_percent: bool | None = True
                  ) -> ConvertedStrain:
    """
    Convertes strain to comparable and normalized data:
    - unit normalization: convert from percent to true (enineering) strain
    - baseline correction: all strains start at 0
    """
    if raw_strain_is_percent:
        strain = raw_strain / 100.
    else:
        strain = raw_strain
    
    baseline_offset = strain[0]
    strain = strain - baseline_offset

    return ConvertedStrain(
        strain = strain,
        raw_strain = raw_strain,
        baseline_offset = baseline_offset,
        raw_strain_is_percent = raw_strain_is_percent,
    )
        

In [38]:
from scipy.signal import savgol_filter

@dataclass
class SmoothenedStress:
    stress: np.array
    raw_stress: np.array
    window_length: int
    polyorder: int

def smoothen_stress(raw_stress: np.array,
                    window_length: int = 61,
                    polyorder: int = 3) -> SmoothenedStress:
    
    # ensure window_length is odd and not larger than the data
    if window_length < 3:
        return raw_stress  # too small to smooth meaningfully
    if window_length % 2 == 0:
        window_length += 1
    if window_length > raw_stress.size:
        # largest odd window that still fits
        window_length = raw_stress.size if raw_stress.size % 2 == 1 else raw_stress.size - 1
    if window_length <= polyorder:
        # reduce polyorder if needed
        polyorder = max(1, window_length - 2)

    stress = savgol_filter(raw_stress, window_length=window_length, polyorder=polyorder, mode="interp")
    return SmoothenedStress(
        stress = stress,
        raw_stress=raw_stress,
        window_length = window_length,
        polyorder = polyorder,
    )

In [39]:
url_results_list = [csv_url_results_rns.Leibniz_Institut_fuer_Werkstofforientierte_Technologien_IWT_iwt.ontodocker.pmdco2_tto_example_parallel.result,
                    csv_url_results_rns.KIT_kit_3.ontodocker_proxy.pmdco2_tto_example_perpendicular.result,
                    csv_url_results_rns.Fraunhofer_IWM_iwm.ontodocker.pmdco2_tto_example_diagonal.result
                   ]

columns_results_list = [csv_columns_results_rns.Leibniz_Institut_fuer_Werkstofforientierte_Technologien_IWT_iwt.ontodocker.pmdco2_tto_example_parallel.result,
                        csv_columns_results_rns.KIT_kit_3.ontodocker_proxy.pmdco2_tto_example_perpendicular.result,
                        csv_columns_results_rns.Fraunhofer_IWM_iwm.ontodocker.pmdco2_tto_example_diagonal.result
                       ]
rolling_direction_results_list =[orientation_results_rns.Leibniz_Institut_fuer_Werkstofforientierte_Technologien_IWT_iwt.ontodocker.pmdco2_tto_example_parallel.result,
                                 orientation_results_rns.KIT_kit_3.ontodocker_proxy.pmdco2_tto_example_perpendicular.result,
                                 orientation_results_rns.Fraunhofer_IWM_iwm.ontodocker.pmdco2_tto_example_diagonal.result
                                ]

primary_data_results_list = [primary_data_results_rns.Leibniz_Institut_fuer_Werkstofforientierte_Technologien_IWT_iwt.ontodocker.pmdco2_tto_example_parallel.result,
                             primary_data_results_rns.KIT_kit_3.ontodocker_proxy.pmdco2_tto_example_perpendicular.result,
                             primary_data_results_rns.Fraunhofer_IWM_iwm.ontodocker.pmdco2_tto_example_diagonal.result
                            ]

AttributeError: 'RecursiveNamespace' object has no attribute 'pmdco2_tto_example_parallel'

In [22]:
import matplotlib.pyplot as plt
%matplotlib inline

import warnings

In [None]:
fig_fd, ax_fd = plt.subplots(2,2, figsize=(12,12))
linestyles = ("solid", "dotted", "dashed")
markers = ("o", "x", "^")
for url_results, columns_results, roll_dir_results, primary_data_results, linestyle, marker in zip(url_results_list, columns_results_list, rolling_direction_results_list, primary_data_results_list, linestyles, markers):
    for url, uri in zip(url_results[S355queries.csv_url().headers[1]], url_results[S355queries.csv_url().headers[0]]):

        # check strain unit consistency
        percentage_extension_unit = columns_results.query(f"{csv_uri_term} == @uri and {csv_quantity_term} == @percentage_extension_expression")[csv_unit_term].values[0]
        if not percentage_extension_unit == percentage_extension_unit_expected:
            warnings.warn("Inconsistent 'percentage extension' units detected!", category=UserWarning)
            # conversion ...

        # assign strain column number
        percentage_extension_column_number = int(columns_results.query(f"{csv_uri_term} == @uri and {csv_quantity_term} == @percentage_extension_expression")[csv_column_number_term].values[0])-1

        # check force unit consistency
        force_unit = columns_results.query(f"{csv_uri_term} == @uri and {csv_quantity_term} == @force_expression")[csv_unit_term].values[0]
        if not force_unit == force_unit_expected:
            warnings.warn("Inconsistent force units detected!", category=UserWarning)
            # conversion ...

        # assign force column number
        force_column_number = int(columns_results.query(f"{csv_uri_term} == @uri and {csv_quantity_term} == @force_expression")[csv_column_number_term].values[0])-1

        # check cross section unit
        cross_section_unit = primary_data_results.query(f"{primary_uri_term} == @uri and `{primary_quantity_term}`.str.contains(@cross_section_term, na=False)", engine="python")[primary_unit_term].values[0]
        if not cross_section_unit == cross_section_unit_expected:
            warnings.warn("Inconsistent cross section units detected!", category=UserWarning)
            # conversion ...

        # assign cross section
        cross_section = float(primary_data_results.query(f"{primary_uri_term} == @uri and `{primary_quantity_term}`.str.contains(@cross_section_term, na=False)", engine="python")[primary_value_term].values[0])
        
        #roll_dir = roll_dir_results.loc[roll_dir_mask, S355queries.orientation().headers[1]].iloc[0]
        roll_dir = roll_dir_results.query(f"{orientation_uri_term} == @uri")[orientation_term].values[0]
        
        # read csv from ressource location
        csv = pd.read_csv(url, delimiter=";", decimal=",", on_bad_lines='skip', skiprows=[1])

        raw_strain = csv.iloc[:, strain_column_number]
        converted_strain = convert_strain(raw_strain=raw_strain, raw_strain_is_percent=True)
        strain = converted_strain.strain
        
        force = csv.iloc[:, force_column_number]
        raw_stress = force/cross_section

        smoothened_stress = smoothen_stress(raw_stress)
        stress = smoothened_stress.stress
        
        E = elastic_modulus_calculation(stress=stress, strain=strain) # is already stadard unit: kN/mm^2 = GPa
        e_mod = E.elastic_modulus
        Rp = offset_yield_calculation(stress=stress, strain=strain, elastic_modulus=E.elastic_modulus, offset_strain=0.002)
        yield_stress_02 = Rp.yield_stress*1e+03 # convert kN/mm^2 to MPa

        identifier = uri.replace("https://w3id.org/pmd/ao/tte/pmdao-tto-tt-","")
        identifier = identifier.replace("_process", "")
        ax_fd[0][0].plot(strain, stress, linestyle=linestyle, label=f"cut {roll_dir} ({identifier})\nE={e_mod:.1f}GPa; Rp0.2={yield_stress_02:.1f}MPa")
        ax_fd[0][1].plot(strain, stress, linestyle=linestyle, label=f"cut {roll_dir}")

        if uri == "https://w3id.org/pmd/ao/tte/pmdao-tto-tt-S355-3_process":
            ax_fd[1][0].plot(strain, raw_stress, linestyle='solid', label="raw data")
            ax_fd[1][0].plot(strain, stress, linestyle='solid', label="Savitzky–Golay filter applied")
            ax_fd[1][0].plot(strain, E.elastic_modulus*strain, color="k", dashes=(6.,2.), label="elastic extrapolation")
            ax_fd[1][0].plot(strain, E.elastic_modulus*(strain-0.002), color="g", dashes=(6.,2.), label="offset yield elasticity")
            ax_fd[1][0].plot(Rp.yield_strain, Rp.yield_stress, linestyle="None", marker="o", color="r", label="0.2% offset yield point")
            axins = ax_fd[1][0].inset_axes(bounds=[0.45, 0.37, 0.55, 0.47],xlim=(0.003, 0.011), ylim=(0.37, 0.38))
            axins.plot(strain, raw_stress, linestyle='solid', linewidth=3.5)
            axins.plot(strain, stress, linestyle='solid')
            ax_fd[1][0].indicate_inset_zoom(axins, edgecolor="black")

        ax_fd[1][1].plot([e_mod], [yield_stress_02], marker=marker, markersize=10.,linestyle="None", label=f"{identifier}")

ax_fd[0][0].set_xlabel("strain")
ax_fd[0][0].set_ylabel("stress"+" / GPa")

ax_fd[0][1].set_xlabel(ax_fd[0][0].get_xlabel())

ax_fd[1][0].set_xlabel(ax_fd[0][0].get_xlabel())
ax_fd[1][0].set_ylabel(ax_fd[0][0].get_ylabel())
#ax_fd[1][0].xticks(list(ax_fd[1][0].xticks()[0]) + 0.002)

ax_fd[1][1].set_xlabel("elastic modulus / GPa")
ax_fd[1][1].set_ylabel(r"offset yield strength $R_{p0.2}$ / MPa")

#dummy plots to adjust legends
ax_fd[0][0].plot(0,0,ls="None", c="None", label=" ")
ax_fd[1][1].plot(180,390,ls="None", c="None", label=" ")
#ax_fd[0][0].plot(0,0,ls="None", c="None", label=" ")

# put timestamp of data retrieval into figure legend
timestamp = pd.Timestamp.now(tz="Europe/Berlin")
ax_fd[0][0].plot(0,0,ls="None", c="None", label=r"$\bf{{data\ retrieved:}}$"+ f"\n{timestamp}")

ax_fd[0][1].set_xlim(left=.00075, right=.0421)
ax_fd[0][1].set_ylim(bottom=.365, top=.462)

#print(ax_fd[0][1].get_xlim())
ax_fd[1][0].set_xlim(-0.0005, right=0.02)
ax_fd[1][0].set_ylim(bottom=0., top=0.42)
#axins = ax_fd[1][0].inset_axes(bounds=[0.45, 0.37, 0.47, 0.47],xlim=(0.005, 0.015), ylim=(0.25, 0.4))

ax_fd[0][0].legend(loc="upper left", bbox_to_anchor=(-0.13, 1.4), ncol=3, fontsize=10.)
ax_fd[1][0].legend(loc="lower right", ncol=1, fontsize=12.)
ax_fd[1][1].legend(loc="upper left", bbox_to_anchor=(0.,1.07), ncol=3, columnspacing=0.2, handletextpad=0.01, fontsize=10., framealpha=0.95)

fig_fd.show()

In [None]:

fig_fd.savefig(fname="S355_stress-strain-overview.png", bbox_inches="tight")