In [None]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark import SparkContext, SparkConf
import datetime
from functools import reduce
import functools 
import pytz
import argparse
import boto3
import sagemaker_pyspark
import botocore.session

# Resolve AWS credentials through botocore's default session.
session = botocore.session.get_session()
credentials = session.get_credentials()

# Expose the jars shipped with sagemaker_pyspark on the driver classpath.
_sagemaker_classpath = ":".join(sagemaker_pyspark.classpath_jars())
conf = SparkConf().set("spark.driver.extraClassPath", _sagemaker_classpath)

# Configure the session builder one setting at a time, then start (or reuse)
# the session.
_builder = SparkSession.builder.config(conf=conf)
# Pass the resolved key pair to Spark under the fs.s3a.* configuration keys.
_builder = _builder.config('fs.s3a.access.key', credentials.access_key)
_builder = _builder.config('fs.s3a.secret.key', credentials.secret_key)
_builder = _builder.appName('HRI')
# Memory settings for executor, driver and off-heap storage.
_builder = _builder.config("spark.executor.memory", "70g")
_builder = _builder.config("spark.driver.memory", "50g")
_builder = _builder.config("spark.memory.offHeap.enabled", True)
_builder = _builder.config("spark.memory.offHeap.size", "16g")
spark = _builder.getOrCreate()

# Load the CSV export; the first line is the header and column types are inferred.
df_input = spark.read.csv(
    "contour-export-22-02-2022.csv",
    inferSchema=True,
    header=True,
)

In [None]:
def prozent_taktzeit(df):
    """Compute the ``prozent_taktzeit`` ratio for every aggregation group.

    Per group the value is::

        (count(time_diff < 60) - count(time_diff == 0)) / count(time_diff > 1)

    Args:
        df: DataFrame with a ``time_diff`` column and the grouping columns
            listed in ``group_cols``.

    Returns:
        DataFrame with one row per group: the grouping columns plus
        ``prozent_taktzeit``.
    """
    group_cols = [
        'dashboard_name', 'granularity', 'date', 'coalesce_process_step_code',
        'time_start', 'time_end', 'part_id', 'part_type',
    ]
    time_diff = F.col('time_diff')

    def count_where(condition):
        # when() without otherwise() yields NULL for non-matching rows and
        # count() skips NULLs, so this counts the rows matching `condition`.
        return F.count(F.when(condition, True))

    # NOTE(review): the numerator excludes time_diff == 0 while the
    # denominator keeps only time_diff > 1 -- confirm both cut-offs are intended.
    numerator = count_where(time_diff < 60) - count_where(time_diff == 0)
    denominator = count_where(time_diff > 1)

    return df.groupBy(*group_cols).agg(
        (numerator / denominator).alias("prozent_taktzeit")
    )

def median_taktzeit(df):
    """Compute the approximate median of the non-zero ``time_diff`` per group.

    Args:
        df: DataFrame with a ``time_diff`` column and the grouping columns
            listed in ``group_cols``.

    Returns:
        DataFrame with one row per group: the grouping columns plus
        ``median_taktzeit``.
    """
    group_cols = [
        'dashboard_name', 'granularity', 'date', 'coalesce_process_step_code',
        'time_start', 'time_end', 'part_id', 'part_type',
    ]

    # Rows with time_diff == 0 get NULL in the helper column, which keeps
    # them out of the percentile computation.
    non_zero = df.withColumn(
        "median", F.when(F.col('time_diff') != 0, F.col('time_diff'))
    )
    approx_median = F.expr("percentile_approx(median, 0.5)")

    # The aggregated frame only contains the grouping columns and the alias,
    # so the helper column "median" does not need to be dropped afterwards.
    return non_zero.groupBy(*group_cols).agg(
        approx_median.alias('median_taktzeit')
    )

def half_hour_cycle_time(df):
    """Tag every row with the half-hour slot its ``time`` value falls into.

    Adds these columns:

    * ``date``        -- ``UTC`` reduced to its date, stored as a string.
    * ``granularity`` -- the constant ``'30min'``.
    * ``time_start``  -- ``HH:00:00`` for minutes 0-29, otherwise ``HH:30:00``.
    * ``time_end``    -- ``time_start`` plus 30 minutes, as ``HH:mm:ss``.

    Args:
        df: DataFrame with a ``UTC`` column and a ``time`` column.

    Returns:
        The DataFrame with the columns above added and all helper columns
        removed.
    """
    df = df.withColumn("date_convert", F.to_date("UTC", "yyyy-MM-dd"))
    df = df.withColumn("date", F.to_timestamp("date_convert").cast('string')).drop('date_convert')
    df = df.withColumn('hour', F.hour(df.time))
    df = df.withColumn('minute', F.minute(df.time))
    df = df.orderBy(F.col('UTC').desc())
    df = df.withColumn('granularity', F.lit('30min'))

    # Start of the half-hour slot.
    # NOTE(review): `time` is parsed with the pattern 'HH:mm' -- confirm this
    # matches the actual format of the column.
    parsed_time = F.unix_timestamp('time', 'HH:mm').cast('timestamp')
    in_first_half = (F.col('minute') >= 0) & (F.col('minute') < 30)
    # No explicit branch is needed for minute >= 30: otherwise() covers it.
    df = df.withColumn(
        'time_start',
        F.when(in_first_half, F.date_format(parsed_time, 'HH:00:00'))
         .otherwise(F.date_format(parsed_time, 'HH:30:00')),
    )

    # End of the half-hour slot: 30 minutes after its start.
    df = df.withColumn('time_30_min', F.col('time_start') + F.expr('INTERVAL 30 MINUTES'))
    df = df.withColumn("time_end", F.date_format("time_30_min", 'HH:mm:ss'))
    return df.drop("time_30_min", "hour", "minute")

def daily_cycle_time(df):
    """Assign every row to a daily window whose boundary is 04:55:00.

    Rows whose ``time`` lies in ``[00:00:00, 04:55:00)`` are booked on the
    previous calendar day; all other rows keep the date taken from ``UTC``.

    Adds these columns:

    * ``date``        -- the (possibly shifted) date, stored as a string.
    * ``granularity`` -- the constant ``'daily_4_55hmm'``.
    * ``time_start`` / ``time_end`` -- both the constant ``'04:55:00'``.

    Args:
        df: DataFrame with ``UTC``, ``time`` and ``measurement_datetime``
            columns.

    Returns:
        The DataFrame with the columns above added and all helper columns
        removed.
    """
    df = df.withColumn("date_new", F.to_date("UTC", "yyyy-MM-dd"))
    df = df.sort(df.measurement_datetime.desc())
    df = df.withColumn("day_sub", F.date_add(F.date_format('date_new', 'yyyy-MM-dd'), -1))

    # Early-morning rows belong to the previous day's window.  No explicit
    # branch is needed for the rest of the day: otherwise() covers it.
    before_boundary = (F.col("time") >= "00:00:00") & (F.col("time") < "04:55:00")
    df = df.withColumn(
        'date_convert',
        F.when(before_boundary, F.col("day_sub")).otherwise(F.col("date_new")),
    )
    df = df.withColumn("date", F.to_timestamp("date_convert").cast('string')).drop('date_convert')

    # Granularity label plus the fixed start/end time of the daily window.
    df = df.withColumn('granularity', F.lit('daily_4_55hmm'))
    df = df.withColumn('time_start_to_timestamp', F.to_timestamp(F.lit('04:55:00'), 'HH:mm:ss'))
    df = df.withColumn("time_start", F.date_format("time_start_to_timestamp", 'HH:mm:ss'))
    df = df.withColumn('time_end_to_timestamp', F.to_timestamp(F.lit('04:55:00'), 'HH:mm:ss'))
    df = df.withColumn("time_end", F.date_format("time_end_to_timestamp", 'HH:mm:ss'))
    return df.drop("date_new", "day_sub", "time_start_to_timestamp", "time_end_to_timestamp")

def part_cycle_time(df):
    """Tag rows with the ``'AFO'`` granularity and a ten-row look-back window.

    Adds these columns:

    * ``date``        -- ``UTC`` converted to a date.
    * ``granularity`` -- the constant ``'AFO'``.
    * ``time_start``  -- time of day (``HH:mm:ss``) of the ``UTC`` value ten
      rows further down the process step's newest-first ordering; NULL when
      fewer than ten such rows exist.
    * ``time_end``    -- time of day (``HH:mm:ss``) of the row's own ``UTC``.

    Args:
        df: DataFrame with ``UTC`` and ``coalesce_process_step_code`` columns.

    Returns:
        The DataFrame with the columns above added.
    """
    # NOTE(review): `date` stays a date here, while the half-hour and daily
    # variants store it as a string -- confirm this is intended before union.
    df = df.withColumn("date", F.to_date("UTC", "yyyy-MM-dd"))
    df = df.withColumn('granularity', F.lit('AFO'))

    # Rows are ordered newest first within each process step, so lead(..., 10)
    # reads the UTC value of the row ten positions older than the current one.
    w_part = Window.partitionBy('coalesce_process_step_code').orderBy(F.col('UTC').desc())
    df = df.withColumn('time_start', F.date_format(F.lead("UTC", 10).over(w_part), 'HH:mm:ss'))
    df = df.withColumn('time_end', F.date_format(F.col("UTC"), 'HH:mm:ss'))
    return df

def unionAll(dfs):
    """Stack all frames in ``dfs`` into one, using the first frame's column order.

    Each later frame is reduced to the columns of the running result (in that
    order) before being appended with ``union``.

    Args:
        dfs: Non-empty iterable of DataFrames.

    Returns:
        The union of all frames.
    """
    def _append(stacked, following):
        # Align the next frame to the accumulated column order first.
        aligned = following.select(stacked.columns)
        return stacked.union(aligned)

    return reduce(_append, dfs)

def join_col(df1, df2, condition, how='inner'):
    """Join two frames and keep only ``df1``'s copy of columns both sides share.

    Args:
        df1: Left DataFrame; its version of every shared column is kept.
        df2: Right DataFrame; its version of every shared column is dropped.
        condition: Join condition passed on to ``DataFrame.join``.
        how: Join type passed on to ``DataFrame.join``.

    Returns:
        The joined DataFrame without ``df2``'s duplicates of shared columns.
    """
    joined = df1.join(df2, condition, how=how)
    for name in df1.columns:
        if name not in df2.columns:
            continue
        # Drop by column reference so only the right-hand copy is removed.
        joined = joined.drop(df2[name])
    return joined
