In [1]:
import findspark
findspark.init("/opt/spark")

In [2]:
from pyspark.sql import * 
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [3]:
spark = SparkSession.builder \
    .appName("finding_orders") \
    .master("local[2]") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog").getOrCreate()

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2ac662d2-617a-4e9a-a21c-84a70d561bff;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 1059ms :: artifacts dl 13ms
	:: modules in use:
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0

In [4]:
df = spark.read.option("header", True) \
    .option("inferSchema", True) \
    .csv("finding_order.csv")

                                                                                

In [5]:
df.show(50)

+--------+-------------+-----------+-----------+---------+
|ORDER_ID|SUBSCRIBER_ID|STATUS_DATE|STATUS_TIME|   STATUS|
+--------+-------------+-----------+-----------+---------+
|  100159|       200427|   20230223|      83209| ASSIGNED|
|  100159|       200427|   20230223|      74232| RETURNED|
|  100159|       200427|   20230222|      95056|  CREATED|
|  100410|       200366|   20230223|      91017| ASSIGNED|
|  100410|       200366|   20230223|      30301| RETURNED|
|  100410|       200366|   20230222|      93638|  CREATED|
|  100497|       200024|   20230222|     105418|     POOL|
|  100497|       200024|   20230222|     105418|  CREATED|
|  100539|       200012|   20230222|     112855|COMPLETED|
|  100539|       200012|   20230222|      95408|  CREATED|
|  100575|       200573|   20230223|      85951| ASSIGNED|
|  100575|       200573|   20230223|      41932| RETURNED|
|  100575|       200573|   20230222|     105441|  CREATED|
|  100259|       200192|   20230223|      83115| ASSIGNE

## First try

In [6]:
df.createOrReplaceTempView("table")

In [7]:
sqldf = spark.sql("""
SELECT 
    ORDER_ID,
    SUBSCRIBER_ID,
    STATUS,
    CONCAT(DATE_FORMAT(TO_DATE(CAST(STATUS_DATE AS STRING), 'yyyyMMdd'), 'yyyy-MM-dd'),
        ' ', FROM_UNIXTIME(STATUS_TIME, 'HH:mm:ss')) AS START_DATE
FROM table 
""")

sqldf.show(5)

+--------+-------------+--------+-------------------+
|ORDER_ID|SUBSCRIBER_ID|  STATUS|         START_DATE|
+--------+-------------+--------+-------------------+
|  100159|       200427|ASSIGNED|2023-02-23 23:06:49|
|  100159|       200427|RETURNED|2023-02-23 20:37:12|
|  100159|       200427| CREATED|2023-02-22 02:24:16|
|  100410|       200366|ASSIGNED|2023-02-23 01:16:57|
|  100410|       200366|RETURNED|2023-02-23 08:25:01|
+--------+-------------+--------+-------------------+
only showing top 5 rows



In [8]:
sqldf.createOrReplaceTempView("table1")

In [9]:
sqldf2 = spark.sql("""
WITH cap AS (
  SELECT
    ORDER_ID,
    SUBSCRIBER_ID,
    STATUS,
    CASE
        WHEN STATUS IN ('CREATED', 'POOL') THEN START_DATE
    END AS START_DATE,
    CASE
        WHEN STATUS IN ('COMPLETED', 'CANCELLED') THEN START_DATE
    END AS END_DATE
  FROM table1
)
SELECT 
    ORDER_ID,
    SUBSCRIBER_ID,
    MIN(STATUS) AS STATUS,
    MIN(START_DATE) AS START_DATE,
    MAX(END_DATE) AS END_DATE,
    CASE
        WHEN MAX(END_DATE) IS NOT NULL THEN
            CONCAT(
                TIMESTAMPDIFF(HOUR, MAX(START_DATE), MAX(END_DATE)), '.',
                LPAD(TIMESTAMPDIFF(MINUTE, MAX(START_DATE), MAX(END_DATE)) % 60, 2, '0')
                  )
        ELSE 0
    END AS DURATION
FROM cap
GROUP BY ORDER_ID, SUBSCRIBER_ID
""")

sqldf2.show(25)

[Stage 6:>                                                          (0 + 1) / 1]

+--------+-------------+---------+-------------------+-------------------+--------+
|ORDER_ID|SUBSCRIBER_ID|   STATUS|         START_DATE|           END_DATE|DURATION|
+--------+-------------+---------+-------------------+-------------------+--------+
|  100001|       200574| ASSIGNED|2023-02-22 02:28:26|               null|       0|
|  100002|       200121|  CREATED|2023-02-22 02:01:52|               null|       0|
|  100003|       200432| ASSIGNED|2023-02-22 02:06:41|               null|       0|
|  100004|       200234|COMPLETED|2023-02-22 02:02:38|2023-02-22 05:00:26|    2.57|
|  100005|       200546|  CREATED|2023-02-22 02:18:47|               null|       0|
|  100006|       200369| ASSIGNED|2023-02-22 05:16:53|               null|       0|
|  100007|       200486| ASSIGNED|2023-02-22 05:17:28|               null|       0|
|  100008|       200190| ASSIGNED|2023-02-22 02:12:11|               null|       0|
|  100009|       200058|COMPLETED|2023-02-22 02:22:31|2023-02-22 06:19:08|  

                                                                                