In [25]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from glob import glob
from lmfit import Model
from statsmodels.formula.api import ols
from statsmodels.sandbox.regression.predstd import wls_prediction_std

import re

In [26]:
def values_between(x, y, minimum=float, maximum=float):
    '''
    Finds values betwwen minimum and maximum
       
    
    Parameters:
    -------
    x: np.array 
        X-values
    y: np.array
        Y.values
    minimum: float
        Minimum value
    maximum: float
        Maximum value
        
    Returns:
    -------
    x: np.array 
        X-values in the respective X-range
    y: np.array
        Y.values 
    
    '''
    y=y[(x<maximum)]
    x=x[(x<maximum)]
    y=y[x>minimum]
    x=x[x>minimum]
    return x, y

In [27]:
def gauss_func(x, max_intensity, two_theta, fwhm):
    '''
    Represents a Gauss function
       
    
    Parameters:
    -------
    x: np.array 
        X-values
    max_intensity: np.array
        Y.values
    two_theta:
        Two theta angle
    fwhm:
        Full with at half maximum
        
    Returns:
    -------
    Gauss Function
    '''
    return max_intensity * np.exp(-np.log(2) * ((x - two_theta) / (fwhm / 2)) ** 2)

In [28]:
def lorentz_func(x, max_intensity, two_theta, fwhm):
    '''
    Represents a Lorentz function
       
    
    Parameters:
    -------
    x: np.array 
        X-values
    max_intensity: np.array
        Y.values
    two_theta:
        Two theta angle
    fwhm:
        Full with at half maximum
        
    Returns:
    -------
    Lorentz Function
    '''
    return max_intensity / (1 + ((x - two_theta) / (fwhm / 2)) ** 2)

In [29]:
def psv_func(x, max_intensity, two_theta, fwhm, nu):
    '''
    Represents a Pseudo-Voigt function
       
    
    Parameters:
    -------
    x: np.array 
        X-values
    max_intensity: np.array
        Y.values
    two_theta:
        Two theta angle
    fwhm:
        Full with at half maximum
    nu: float between 0 & 1
        nu = 1 Pure Gauss
        nu = 0 Pure Lorentz
        
    Returns:
    -------
    PSV Function
    '''
    return nu * gauss_func(x, max_intensity, two_theta, fwhm) + (1 - nu) * lorentz_func(x, max_intensity, two_theta,fwhm)

In [30]:
def double_psv_func(x, max_intensity_1, max_intensity_2, two_theta_1, two_theta_2, fwhm_1, fwhm_2, nu_1, nu_2, delta):
    '''
    Represents a Double PSV function
       
    
    Parameters:
    -------
    x: np.array 
        X-values
    max_intensity: np.array
        Y.values
    two_theta:
        Two theta angle
    fwhm:
        Full with at half maximum
    nu: float between 0 & 1
        nu = 1 Pure Gauss
        nu = 0 Pure Lorentz
        
    Returns:
    -------
    Double PSV Function
    '''
    return psv_func(x, max_intensity=max_intensity_1, two_theta=two_theta_1, fwhm=fwhm_1, nu=nu_1) + \
           psv_func(x, max_intensity=max_intensity_2, two_theta=two_theta_2, fwhm=fwhm_2, nu=nu_2)

In [31]:
class PSVModel(Model):
    '''
    Implementation of the Pseudo Voigt Model
    Super of lmfit.Model --> https://lmfit.github.io/lmfit-py/model.html#lmfit.model.Model
    '''
    # overload lmfit.Model constructor with specific psv-function constructor
    def __init__(self):
        super(PSVModel, self).__init__(psv_func, missing='drop')
        self.name = 'pseudo-Voigt'

    # implement the guessing method for start parameters
    def guess(self, data=None, **kws):
        # set x to range of indices, if not provided
        assert 'x' in kws
        x = kws['x']

        # set sensible values for parameters
        max_intensity = max(data)
        two_theta = x[data.idxmax()]
        try:
            fwhm = x[data > max_intensity / 2].ptp()  # ptp = max - min
        except:
            fwhm = 0.1

        # set physically sensible boundaries for the parameters
        self.set_param_hint('nu', min=0, max=1)
        self.set_param_hint('max_intensity', min=0)
        self.set_param_hint('two_theta', min=min(x), max=max(x))
        self.set_param_hint('fwhm', min=0)

        # construct and return lmfit.Parameters object
        pars = self.make_params(max_intensity=max_intensity, two_theta=two_theta, fwhm=fwhm, nu=0.5)
        return pars

In [32]:
class  Time_Object(object):
    '''
    Creates a time Object with hours, minutes, seconds and milliseconds
    '''
    def __init__(self, hour=0, minute=0, second=0, millisecond=0):
        self.day=0
        self.hour = hour
        self.minute = minute
        self.second = second
        self.millisecond = millisecond       
        
    def add_millisecond(self, milli_second):
        '''
        Adds a millisecond
        '''
        self.millisecond += milli_second
        self.millisecond, diff = self.check(self.millisecond)
        if diff != 0:
            self.add_second(diff)
            
    def add_second(self, second):
        '''
        Adds a second
        '''
        self.second += second
        self.second, diff = self.check(self.second)
        if diff != 0:
            self.add_minute(diff)
    
    def add_minute(self, minute):
        '''
        Adds a minute
        '''
        self.minute += minute
        self.minute, diff = self.check(self.minute)
        if diff != 0:
            self.add_hour(diff)
        
    def add_hour(self, hour):
        '''
        Adds an hour
        '''
        self.hour += hour
        self.hour, diff = self.check(self.hour, 24)
        self.day += diff
        
    def add_day(self, day):
        self.day += day
        
    def check(self, period, number_per_period=60):
        '''
        Checks parameters if they are in boundaries
        '''
        diff = int(period/number_per_period)
        period -= diff * number_per_period
        return period, diff
    
            
    def show_time(self):
        '''
        Prints the time in a nice format
        '''
        print(f"Day:{self.day},Hour:{self.hour}, Minute:{self.minute}, Second:{self.second}, Millisecond:{self.millisecond}")  
        
    def greater_than(self, other):
        '''
        Compares times
        True if self > other.
        '''
        if self.day > other.day:
            return True
        elif self.hour > other.hour:
            return True
        elif self.minute > other.minute:
            return True
        elif self.second > other.second:
            return True
        elif self.millisecond > other.millisecond:
            return True
        else:
            return False
        
    def smaller_than(self, other):
        '''
        Compares times
        True if self < other.
        '''
        if self.day < other.day:
            return True
        elif self.hour < other.hour:
            return True
        elif self.minute < other.minute:
            return True
        elif self.second < other.second:
            return True
        elif self.millisecond < other.millisecond:
            return True
        else:
            return False

In [33]:
def fit(x, y, background_lower, peak, background_upper):
    '''
    Fits peaks  
    
    Parameters:
    -------
    x: np.array of floats
        e.g. theta2theta data
    y: : np.array of floats
        e.g. intensity
    background_lower: np.array of floats
        indices of background before peak
    peak: 
        indices of peak data
    background_upper:
        indices of background after peak
        
    Returns:
    -------

    '''
    # find backgroud and fit a line
    x_bg = np.concatenate((x[background_lower], x[background_upper]))
    y_bg = np.concatenate((y[background_lower], y[background_upper]))
    m, c = np.polyfit(x_bg, y_bg, 1) # fits a line according to background
    # find peak data
    xs = x[peak]
    ys = y[peak]
    # correct ys data
    y_corr = ys - (m * xs + c)
    
    # create model
    model = PSVModel()
    
    pars = model.guess(y_corr, x=xs) #produce start values x given as keyword argument, elsewise just range of indixes
    fit = model.fit(y_corr, pars, x=xs) # fit model
    #extract data from model
    two_theta = (fit.params['two_theta'].value, fit.params['two_theta'].stderr)
    max_intensity = (fit.params['max_intensity'].value, fit.params['max_intensity'].stderr)
    nu = (fit.params['nu'].value, fit.params['nu'].stderr)
    fwhm = (fit.params['fwhm'].value, fit.params['fwhm'].stderr)
    return (two_theta, max_intensity, nu, fwhm, m, c)

In [34]:
def fit_peak(tth, intensity, background_lower, peak_lower, peak_upper, background_upper, plot=True):
    '''
    Fits peaks, corrects for Background and Plots respective Data
    
    Parameters:
    -------
    tth: np.array of floats
        Theta2Theta values
    intensity: np.array of floats
        Intensity at Theta2Theta values
    background_lower: float
        Theta2Theta value of the bottom limit of the background substraction
    peak_lower: float
        Theta2Theta start value of the peak
    peak_upper: float
        Theta2Theta end value of the peak
    background_upper: float
        Theta2Theta value of the upper limit of the background substraction
        
    Returns:
    -------
    two_theta: float 
        two_theta value of the peak position of the fitted peak
    max_intensity: float
        maximal intensity of the fitted peak
    nu: float
       Shapefactor of pseudo voigt profile
    fwhm: float
        full width at half maximum of the fitted peak
    integral_int: float
        integral_intensity of the fitted peak
    integral_width: float
        integral_width of the fitted peak
    '''
    # find indices of lower and upper background between the limits
    ### Breaking changes in np.argwhere with pandas series --> np. arrays needed
    background_lower = np.intersect1d(np.argwhere(np.array(tth) > background_lower),np.argwhere(np.array(tth) < peak_lower))
    background_upper = np.intersect1d(np.argwhere(np.array(tth) > peak_upper),np.argwhere(np.array(tth) < background_upper))
    # find indices between peak_lower and peak upper
    peak = np.intersect1d(np.argwhere(np.array(tth) > peak_lower),np.argwhere(np.array(tth) < peak_upper))
    # fit data
    two_theta, max_intensity, nu, fwhm, m, c =  fit(tth, intensity, background_lower, peak, background_upper)
    print(two_theta)
    if plot:
        plt.plot(tth[background_lower], intensity[background_lower], 'ro')
        plt.plot(tth[background_upper], intensity[background_upper], 'ro')
        plt.plot(tth[peak], intensity[peak], 'bo')
        x_fit = tth[peak]
        y_fit =  psv_func(x_fit, max_intensity[0], two_theta[0], fwhm[0], nu[0])
        y_fit_linear = y_fit + m*x_fit + c
        plt.plot(x_fit, y_fit, 'g--')
        plt.plot(x_fit, y_fit_linear, 'g-')
        plt.xlabel("Theta2Theata")
        plt.ylabel("Intensity")
        plt.show()
    return two_theta, max_intensity, nu, fwhm

In [35]:
def read_dat_file(path, file):
    '''
    Reads Bessy II dat data of KMCII.
    
    Parameters:
    -------
    path: String
        Path to file
    file: String
        Filename including suffix .dat
    Returns:
    -------
    table.TTh: pandas series 
        Theta two Theta values
    table.Int: pandas series
        Intensity values
    '''
    table = pd.read_table(f"{path}\\{file}", skiprows=15,  delimiter=" 	", names=["TTh", "Int", "N_pix", "Err"], engine='python')
    return table.TTh, table.Int

In [36]:
def extract_scan_data(path, file, pattern):
    '''
    Extracts data from the SPEC file provided by KMCII.
    
    
     Parameters:
    -------
    path: String
        Path to file
    file: String
        Filename including suffix .dat
    pattern: String
        Regex to match
        
    Returns:
    -------
    psi_values: np.array
        Calculated Psi Values
    time: Time_Object
        Timestamp
    
    '''
    with open (f"{path}\\{file}", "r") as file: # opens the file
        for i, line in enumerate(file):
            if line.find(pattern) != -1 and i >0: # finds the line in the file
                timestamp=file.readline(i+1) # Timestamp is in the next line
                break
                
    timestamp=np.asarray(re.findall(r'\d+', timestamp)).astype(int)[1:4] #extracts the time from timestamp
    time=Time_Object(timestamp[0], timestamp[1], timestamp[2]) # creates Time_Object
    numbers=np.asarray(re.findall(r'\d+', line)).astype(int) #extracts the numbers for PSI Calculation
    step_size=(numbers[2]-numbers[1])/numbers[3] # calculates the stepsize of the scan
    psi_values=-np.asarray(np.arange(numbers[1], numbers[2]+step_size, step_size))+90
    return psi_values, time

In [37]:
def lattice_spacing(tth, wavelength, hkl):
    '''
    Calculates the lattice spacing following the formular:     
    lattice_spacing = 1/2*wavelength/sin(tth/2)*np.sqrt(np.sum(hkl**2)
    
      Parameters:
    -------
    tth: np.array
        Theta two Theta angles
    wavelength: Float
        Used wavelength of the experiment
    hkl: np.array
        [h,k,l] of the plane
        
    Returns:
    -------
    lattice_spacing: float
        Calculated lattice spacing
    '''
    return  1/2*wavelength/sin(tth/2)*np.sqrt(np.sum(hkl**2))

In [38]:
def calculate_stress(peak_positions, psi_values, xray_constant, wave_length, hkl):
    '''
    Calculates the stress using the sin2PSI method
    OLS using https://www.statsmodels.org/dev/examples/notebooks/generated/ols.html
        
    Parameters:
    -------
    peak_positions: np.array
        Position of the peaks
    psi_values: np.array
        Corresponding PSI values
    xray_constant: float
        X-Ray Constant
    wave_length: float
        Used Wavelength
    hkl: np.array
        [h,k,l] of the plane
        
        
    Returns:
    -------
    _ : dict
        Dict including values for:
            stress: np.array
                calculated stress
            stress_min: np.array
                minimal stress calculated from confidence interval
            stress_max: np.array
                maximal stress calculated from confidence interval
            k: float
                sloap of the fitted line y=kx+d
            d: float
                intercept of the fitted line y=kx+d
            lattice_spacings: float
                calculated lattice spacings
            psi_values: np.array
                corresponding PSI Values     
    '''
    psi_values=np.radians(psi_values)
    peak_positions=np.radians(peak_positions)
    sin_psi=np.sin(psi_values)**2
    
    d=wave_length/(2*np.sin(peak_positions/2))
    lattice_spacings =d*np.sqrt(np.sum(hkl**2))
    
    data = pd.DataFrame({'x': list(sin_psi), 'y': list(lattice_spacings)}) #breaking change with pandas

    model = ols("y ~ x", data).fit()
    k, d = model.params[1], model.params[0]

    #print(dir(model))
    ## check for 
    prstd, iv_l, iv_u = wls_prediction_std(model)
    
    
    conf_int = model.conf_int(alpha=0.1)
    plt.plot(sin_psi, lattice_spacings, 'o')
    plt.plot(sin_psi, k*sin_psi+d)
    plt.plot(sin_psi, conf_int[0][1]*sin_psi+conf_int[0][0], '--', 'r')
    plt.plot(sin_psi, conf_int[1][1]*sin_psi+conf_int[1][0], '--', 'r')
    
    plt.plot(sin_psi, iv_l, '-.', 'r')
    plt.plot(sin_psi, iv_u, '-.', 'r')
    print(prstd)
    plt.xlabel("sin2Psi")
    plt.ylabel("lattice spacing")
    
    plt.xlim((np.min(sin_psi), np.max(sin_psi)))
    plt.ylim((np.min(lattice_spacings), np.max(lattice_spacings)))
    plt.show()
    stress=k/(xray_constant*10**(-5)*lattice_spacings[0])
    stress_min = conf_int[0][1]/(xray_constant*10**(-5)*lattice_spacings[0])
    stress_max = conf_int[1][1]/(xray_constant*10**(-5)*lattice_spacings[0])
    
    return {"stress":stress,"stress_min":stress_min, "stress_max":stress_max, "k":k, "d":d , "lattice_spacing":lattice_spacings, "psi_values":psi_values, 'sin2_psi':sin_psi}

In [39]:
def check_time(stress_data):
    '''
    Checks if time stamp changes from 24:00 to 00:00 and adds a day to still be bigger than other timestamp
    
    Parameters:
    -------
    stress_data: dict()
                Dictionary must include key "time" inside single experimental datapoint
                stress_data["experimental datapoint"]["key"]: Time_Object
                    Time_Object that needs to be checked
        
        
    Returns:
    -------
     stress_data: dict()
                Dictionary must include key "time" inside single experimental datapoint
                stress_data["experimental datapoint"]["key"]: Time_Object
                    Time_Object that was checked
    '''
    next_day=False
    keys=list(stress_data.keys()) # keys are the numbers of the stress measurements
    
    for i, key in enumerate(keys):
        if next_day:
            stress_data[key]["time"].add_day(1)
        if i > 0:
            print(stress_data[keys[i-1]]["time"].show_time())
            if stress_data[keys[i-1]]["time"].hour == 24:
                if stress_data[key]["time"].smaller_than(stress_data[keys[i-1]]["time"]):
                    print("First Timestamp")
                    stress_data[key]["time"].show_time()
                    print("Second Timestamp")
                    stress_data[keys[i-1]]["time"].show_time()
                    next_day=True
                
    return stress_data

In [40]:
def evaluate_samples(path, dat_files, spec_file, pattern, xray_constant, wave_length, hkl, peak_parameters):
    '''
    Evaluates a single stress measurement and straining experiments from KMC-II Beamline of BESSY II. Correlates .dat and spec file.
        
    Parameters:
    -------
        path: String
            Path to file
        dat_files: String
            Name of the .dat file including .dat suffix
        spec_file: String
           Name of the spec file
        pattern:
            Pattern where to find infomation in the spec_file
        xray_constant: Float
            X-Ray Constant
        wave_length: Float
            Wave Length of Experiment
        hkl: np.array
            [h,k,l] of the plane
        peak_parameters: list/np.array
            List of theta angles - Positions to fit
            The Peak is automatically found between Start_Peak and End_Peak
            Noise is calculated between Start_Noise - Start_Peak and End_Peak-End_Noise
            [Start_Noise, Start_Peak, End_Peak, End_Noise]
            
    Returns:
    -------
        single_experiment: Dictonary
                Dictonary of Single streee measurement with different calculated an measured values as keys. 
    '''
    peak_positions=[]
    all_fwhm=[]
    for file in dat_files: # read and work all .dat files
        tth, intens = read_dat_file(path, file)
        print(file) # prints file_name to console
        two_theta, max_intensity, nu, fwhm=fit_peak(tth, intens, peak_parameters[0], peak_parameters[1], peak_parameters[2], peak_parameters[3])
        peak_positions.append(two_theta[0])
        all_fwhm.append(fwhm[0])
        
        
    peak_positions=np.asarray(peak_positions) 
    psi_values, time=extract_scan_data(path, spec_file, pattern)
    
    single_stress_measurement = calculate_stress(peak_positions, psi_values, xray_constant, wave_length, hkl) # evaluate single experiment for stress
    single_stress_measurement["time"] = time #add time_column to dict
    single_stress_measurement["fwhm"] = all_fwhm
    return single_stress_measurement

In [41]:
def read_anton(path, file):
    '''
    Reads .xls Files extracted from Anton Paar Setup    
    
    Parameters:
    -------
        path: String
            Path to file
        file: String
            Filename including the suffix .xls
       
            
    Returns:
    -------
        anton: dataframe
            Data extracted from xls. file, time allready checked for experiments running over midnight 
    '''
    anton=pd.read_excel(f"{path}\\{file}", skiprows=7, names=["timestamp", "duration", "position", "displacement", "force","rel_force", "elongation", "stress"])   
    next_day=False # for experiments changing from 24:00 to 00:00
    
    time=[]
    for i, element in enumerate(anton.timestamp):
        timestamp=list(re.findall(r'\d+', str(element)))
        if len(timestamp)<7:
            timestamp.append("0")
        time.append(Time_Object(int(timestamp[3]), int(timestamp[4]), int(timestamp[5]), int(float(f"0.{timestamp[6]}")*60)))
        if next_day:
            time[i].add_day(1)
        if i > 0: # wouldn't make sense for first element
            if time[i-1] == 24:
                if time[i].smaller_than(time[i-1]): # checks if day is over
                    next_day=True
    time=np.asarray(time)
    anton["time"]=time
    return anton

In [42]:
def correlate_anton(stress_result, path_anton, file_anton):
    '''
    Correlates data of evaluated in-situ straining of KMC II at BESSY II to data collected by Anton Paar straining device.
    Adds the corresponding strains to the evaluated values.
    
    Parameters:
    -------
        stress_reslut: Dictionary
            Dictionary of the evaluated Experiment
        path_anton: String
            Path to Anton-Paar xls File
        file_anton: String
            Name of Anton-Paar file including .xls suffix
       
            
    Returns:
    -------
        Correlation Plot.
        
        anton: dataframe
            Anton Paar data
        stress_result: Dicitionary
            Evaluated experiments now also including respective elongation 
            can be called usings:
                stress_result["stress_measurement"]["elongation"]
    '''
    keys = stress_result.keys()
    anton = read_anton(path_anton, file_anton)   
    fig, ax = plt.subplots()
    
    j=0
    for key in keys:
        for i,element in enumerate(anton.time):
            if i < j:
                pass
            if stress_result[key]['time'].greater_than(element):
                pass
            else:    
                ax.plot(anton.elongation[i], stress_result[key]["stress"], 'o', c="r")
                plt.vlines(anton.elongation[i],stress_result[key]["stress_min"],stress_result[key]["stress_max"], color='black')
                stress_result[key]["elongation"]=anton.elongation[i] # adding values to the stress_result dict
                i=j
                break
    ax2 = ax.twinx()
    ax2.plot(anton.elongation, anton.force)
    ax2.set_ylabel("Force - Anton Paar")
    ax.set_ylabel("Stress - sin2psi")
    ax.set_xlabel("Elongation - Anton Paar")
    return anton, stress_result

In [43]:
def run_sample(path, spec_file, scan_start, scan_end, xray_constant, wavelength, peak, peak_params):
    '''
    Starts complete evaluation of in-situ straining experiment performed at KMC II at BESSY II.
    
    Parameters:
    -------
        path: String
            Path to folder where dat and spec files are present
        spec_file: String
            Name of the Spec_file, no suffix needed
        scan_start: int
            First .dat file Number belonging to the experiment
        scan_end: int
            Last .dat file Number belonging to the experiment
        xray_constant: float
            X-Ray Constant
        wavelength: float
            Used wavelength during the experiment
        peak: list
            [h,k,l] of the peak
        peak_params: list/np.array
            List of theta angles - Positions to fit
            The Peak is automatically found between Start_Peak and End_Peak
            Noise is calculated between Start_Noise - Start_Peak and End_Peak-End_Noise
            [Start_Noise, Start_Peak, End_Peak, End_Noise] 
       
            
    Returns:
    -------
        stress_result: Dictionary
            
    '''
    os.chdir(path)
    stress_result=dict()
    for element in range(scan_start, scan_end+1):
        if element < 10:
            files = np.asarray(glob(f'*000{element}.*.dat'))
        else:
            files = np.asarray(glob(f'*00{element}.*.dat'))
        stress_result[element] = evaluate_samples(path, files, spec_file, f"#S {element}  ascan  chi", xray_constant, wavelength, np.asarray(peak), peak_params)
    
    
    keys=stress_result.keys()
    for key in keys:
        plt.plot(stress_result[key]["sin2_psi"], stress_result[key]["lattice_spacing"], "-o")
    plt.xlabel("Sin2Psi")
    plt.ylabel("lattice spacing")
    plt.show()
    
    #stress_result=check_time(stress_result)
    return stress_result

In [44]:
def save_data(path, file_name, stress_result):
    '''
    Saves stress and elongation of anton paar
    '''
    keys = list(stress_result.keys())
    stress = []
    elongation = []
    stress_max = []
    stress_min =[]
    for key in keys:
        stress.append(stress_result[key]["stress"])
        elongation.append(stress_result[key]["elongation"])
        stress_max.append(stress_result[key]["stress_max"])
        stress_min.append(stress_result[key]["stress_min"])
    end_result=pd.DataFrame({'stress' : stress,'elongation' : elongation, "stress_max": stress_max, "stress_min": stress_min})
    end_result.to_excel(f"{path}//{file_name}_results.xlsx") 