In [1]:
import os

# Local Spark installation and the extra JDBC driver jar to expose to Spark.
spark_home = "/opt/spark"
sqljdbc_jar = "/opt/spark/jars/sqljdbc4.jar"

# Environment has to be configured before pyspark is imported and the JVM is launched.
os.environ["SPARK_HOME"] = spark_home
os.environ["PYSPARK_PYTHON"] = "python3"
# NOTE(review): the next two presumably only matter to the `bin/pyspark` launcher,
# not to a session created from inside an already-running kernel - confirm before removing.
os.environ["PYSPARK_DRIVER_PYTHON"] = "jupyter"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"

from pyspark.sql import SparkSession

# The SPARK_CLASSPATH environment variable is deprecated; the extraClassPath
# settings are its documented replacement. Like the env var, they can only take
# effect if no Spark JVM/session exists yet in this kernel.
spark = (
    SparkSession.builder
    .appName("DateTimeLapse_Spark")
    .config("spark.driver.extraClassPath", sqljdbc_jar)
    .config("spark.executor.extraClassPath", sqljdbc_jar)
    .getOrCreate()
)

In [2]:
import pyspark
from pyspark.sql.types import *
from pyspark.sql.functions import *
import traceback

# from pyspark.sql import SQLContext
# sqlContext = pyspark.SQLContext(spark)

In [3]:
def DateTimeLapse_Spark(*, dataframe, end_col, end_format, start_col, start_format):
    """Calculate the time elapsed between two datetime/timestamp columns.

    For every row the value of ``start_col`` is subtracted from the value of
    ``end_col``. The difference is appended as four new columns, expressed in
    days, hours, minutes and seconds. The time-of-day part of both values is
    kept, so sub-day differences are reported (``diff_days`` is fractional
    for them).

    Note:
        Specify formats according to the Spark datetime pattern reference
        (https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html).
        With default (non-ANSI) settings, values that cannot be parsed are
        expected to yield null in all four output columns for that row.

    Args:
        dataframe (DataFrame): Input DataFrame containing both columns.
        end_col (str): Name of the column holding the later datetime/timestamp.
        end_format (str): Pattern used to parse ``end_col``. Presumably
            ignored when the column is already of date/timestamp type.
        start_col (str): Name of the column holding the earlier datetime/timestamp.
        start_format (str): Pattern used to parse ``start_col``.

    Returns:
        DataFrame: every column of ``dataframe`` followed by ``diff_days``
        (double), ``diff_hrs`` (double), ``diff_mins`` (double) and
        ``diff_secs`` (bigint). Existing columns with those names are replaced;
        no other input column is modified or dropped.

    Raises:
        Errors raised by Spark while building the plan (e.g. an unknown
        column name) propagate to the caller; they are not returned as a string.
    """
    end_col = str(end_col)
    end_format = str(end_format)
    start_col = str(start_col)
    start_format = str(start_format)

    # Parse each column straight to epoch seconds. Going through to_date()
    # would truncate the time of day, and staging helper columns could clobber
    # (and then drop) user columns that happen to share their names.
    end_secs = unix_timestamp(col(end_col), end_format)
    start_secs = unix_timestamp(col(start_col), start_format)
    diff_secs_col = end_secs - start_secs

    return (
        dataframe
        .withColumn("diff_days", diff_secs_col / (3600 * 24))
        .withColumn("diff_hrs", diff_secs_col / 3600)
        .withColumn("diff_mins", diff_secs_col / 60)
        .withColumn("diff_secs", diff_secs_col)
    )

In [6]:
# Load the core dataset as 'df', keeping only the employee name and the two date columns.
# The built-in CSV reader replaces the legacy "com.databricks.spark.csv" package alias.
df_path = "/home/hduser/SVM_Projects/Datasets/core_dataset.csv"
df = (
    spark.read
    .csv(df_path, header=True, inferSchema=True)
    .select('Employee Name', 'DOB', 'Date of Hire')
)
df.show(5)

+--------------------+----------+------------+
|       Employee Name|       DOB|Date of Hire|
+--------------------+----------+------------+
|          Brown, Mia|11/24/1985|  10/27/2008|
|LaRotonda, William  | 4/26/1984|    1/6/2014|
|    Steans, Tyrone  |  9/1/1986|   9/29/2014|
|     Howard, Estelle| 9/16/1985|   2/16/2015|
|         Singh, Nan | 5/19/1988|    5/1/2015|
+--------------------+----------+------------+
only showing top 5 rows



In [7]:
# Dates in the dataset are not zero-padded (e.g. "4/26/1984", "1/6/2014").
# Single-letter "M/d" accepts both 1- and 2-digit values, whereas "MM/dd" may be
# rejected for such input by Spark 3's datetime parser.
dff = DateTimeLapse_Spark(dataframe=df,
                          end_col="Date of Hire", end_format="M/d/yyyy",
                          start_col="DOB", start_format="M/d/yyyy")
dff.show(5)
dff.dtypes

+--------------------+----------+------------+---------+--------+----------+---------+
|       Employee Name|       DOB|Date of Hire|diff_days|diff_hrs| diff_mins|diff_secs|
+--------------------+----------+------------+---------+--------+----------+---------+
|          Brown, Mia|11/24/1985|  10/27/2008|   8373.0|200952.0|1.205712E7|723427200|
|LaRotonda, William  | 4/26/1984|    1/6/2014|  10847.0|260328.0|1.561968E7|937180800|
|    Steans, Tyrone  |  9/1/1986|   9/29/2014|  10255.0|246120.0| 1.47672E7|886032000|
|     Howard, Estelle| 9/16/1985|   2/16/2015|  10745.0|257880.0| 1.54728E7|928368000|
|         Singh, Nan | 5/19/1988|    5/1/2015|   9843.0|236232.0|1.417392E7|850435200|
+--------------------+----------+------------+---------+--------+----------+---------+
only showing top 5 rows



[('Employee Name', 'string'),
 ('DOB', 'string'),
 ('Date of Hire', 'string'),
 ('diff_days', 'double'),
 ('diff_hrs', 'double'),
 ('diff_mins', 'double'),
 ('diff_secs', 'bigint')]