In [None]:
import pyspark
from pyspark import SparkContext as sc, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window, DataFrame
from pyspark.sql.types import IntegerType, StringType, LongType, FloatType, DoubleType, StructType, StructField, ArrayType
from pyspark.sql import SQLContext
from functools import reduce
from pyspark.ml.feature import StandardScaler, MinMaxScaler, VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import PCA
from pyspark.ml import Pipeline

import functools 
import pytz
import os
import glob
import sys
from operator import attrgetter

import argparse
import boto3
import sagemaker_pyspark
import botocore.session
import seaborn as sns
from sagemaker import get_execution_role
import json
import math
import random

# Look up AWS credentials through botocore's default session.
session = botocore.session.get_session()
credentials = session.get_credentials()

# Driver classpath: all jars shipped with sagemaker_pyspark, ':'-separated.
sagemaker_jars = ":".join(sagemaker_pyspark.classpath_jars())
conf = SparkConf().set("spark.driver.extraClassPath", sagemaker_jars)

# Build (or reuse) the 'HRI' Spark session. The S3A access/secret keys are
# taken from the botocore credentials above; the memory settings are fixed.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .config('fs.s3a.access.key', credentials.access_key)
    .config('fs.s3a.secret.key', credentials.secret_key)
    .appName('HRI')
    .config("spark.executor.memory", "70g")
    .config("spark.driver.memory", "50g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size", "16g")
    .getOrCreate()
)

In [None]:
# Interpolation

# Global ordnung bounds of df2, fetched with a single aggregation pass
# (one Spark job instead of one per bound).
ordnung_bounds = df2.agg(
    F.max('ordnung').alias('max_ordnung'),
    F.min('ordnung').alias('min_ordnung'),
).first()
max_ordnung = ordnung_bounds['max_ordnung']
min_ordnung = ordnung_bounds['min_ordnung']

# Spacing of count_wert equally spaced points spanning [min_ordnung, max_ordnung],
# rounded to two decimals.
# NOTE(review): divides by zero when count_wert == 1 and fails on None bounds
# when df2 has no non-null ordnung -- confirm both are excluded upstream.
step_size = round((max_ordnung - min_ordnung) / (count_wert - 1), 2)

def ordnung_range(t1, t2, step=0.01):
    """Return a list of equally spaced points between t1 and t2 with stepsize step."""
    return np.round(np.arange(t1, t2, step) ,4).tolist()

# define udf
ordnung_range_udf = F.udf(ordnung_range, ArrayType(FloatType()))

# Columns that identify one group of rows (one curve) in df2.
curve_keys = ['DMC', 'spindle', 'processstep', 'TimeStamp']

# Per-curve ordnung bounds from a single groupBy. This yields the same
# columns ('max(ordnung)', 'min(ordnung)') as aggregating max and min
# separately and joining the two results, without the extra shuffle join.
df_base = df2.groupBy(*curve_keys).agg(F.max('ordnung'), F.min('ordnung'))

# Kept as cheap, lazy projections in case later cells still read these names.
df_base_max = df_base.select(*curve_keys, 'max(ordnung)')
df_base_min = df_base.select(*curve_keys, 'min(ordnung)')

# generate ordnung-grid and explode: one row per grid point and curve
# NOTE(review): the grid is produced as 32-bit FloatType; if df2.ordnung is a
# double, the equality join below may not match grid points to measured
# points -- confirm the column type of df2.ordnung.
df_base = df_base.withColumn("ordnung", F.explode(ordnung_range_udf("min(ordnung)", "max(ordnung)")))

# Copy of ordnung that is non-null only on rows originating from df2, so the
# measured positions stay distinguishable after the outer join.
df2 = df2.withColumn('ordnung_copy', F.col('ordnung'))
df_all_ord = df_base.join(df2, ['DMC', 'spindle', 'processstep', 'ordnung', 'TimeStamp'], "outer")


# Forward-fill frame: per curve, ordered by ordnung, every row from the start
# of the partition up to and including the current row.
window_ff = (
    Window.partitionBy('DMC', 'spindle', 'processstep', 'TimeStamp')
    .orderBy('ordnung')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Backward-fill frame: per curve, ordered by ordnung, the current row and
# every row after it up to the end of the partition.
window_bf = (
    Window.partitionBy('DMC', 'spindle', 'processstep', 'TimeStamp')
    .orderBy('ordnung')
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)
)

# Forward fill: last non-null Value / ordnung_copy inside window_ff.
value_last = F.last(df_all_ord.Value, ignorenulls=True).over(window_ff)
ordnung_last = F.last(df_all_ord.ordnung_copy, ignorenulls=True).over(window_ff)

# Backward fill: first non-null Value / ordnung_copy inside window_bf.
# (readtime_next holds the next ordnung_copy, despite its name.)
value_next = F.first(df_all_ord.Value, ignorenulls=True).over(window_bf)
readtime_next = F.first(df_all_ord.ordnung_copy, ignorenulls=True).over(window_bf)

# Attach the four filled series as new columns next to the joined data.
df_filled = (
    df_all_ord
    .withColumn('Value_ff', value_last)
    .withColumn('ordnung_ff', ordnung_last)
    .withColumn('Value_bf', value_next)
    .withColumn('ordnung_bf', readtime_next)
)

# define interpolation function
def interpol(x, x_prev, x_next, y_prev, y_next, y):
    """Linearly interpolate the value at x between two neighbouring points.

    Args:
        x: Position at which a value is wanted.
        x_prev: Position of the previous known point.
        x_next: Position of the next known point.
        y_prev: Value at ``x_prev``.
        y_next: Value at ``x_next``.
        y: Value already present at ``x``; returned unchanged when
            ``x_prev == x_next``.

    Returns:
        ``y`` when both neighbours share the same position, otherwise the
        point on the straight line through ``(x_prev, y_prev)`` and
        ``(x_next, y_next)`` evaluated at ``x``. Returns ``None`` when any
        input needed for the line is ``None``.
    """
    if x_prev == x_next:
        return y

    # A missing neighbour (None) leaves the line undefined; return None
    # rather than raising a TypeError on the arithmetic below.
    if any(v is None for v in (x, x_prev, x_next, y_prev, y_next)):
        return None

    slope = (y_next - y_prev) / (x_next - x_prev)
    return y_prev + slope * (x - x_prev)
    
# Python UDF wrapper around interpol returning FloatType. The column
# expression below does not call it; it uses native Spark columns instead.
interpol_udf = F.udf(f=interpol, returnType=FloatType())

# add the interpolated column:
#   - where the previous and next measured ordnung coincide, keep Value
#   - otherwise evaluate the line through (ordnung_ff, Value_ff) and
#     (ordnung_bf, Value_bf) at the row's ordnung
df_filled = df_filled.withColumn(
    'value_interpol',
    F.when(F.col('ordnung_ff') == F.col('ordnung_bf'), F.col('Value')).otherwise(
        (F.col('Value_bf') - F.col('Value_ff'))
        / (F.col('ordnung_bf') - F.col('ordnung_ff'))
        * (F.col('ordnung') - F.col('ordnung_ff'))
        + F.col('Value_ff')
    ),
)
