In [0]:
%pip install azure-storage-blob

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# Databricks notebook source
import pyspark.sql.functions as F
import pyspark.sql.types as T
from delta.tables import *
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType, BooleanType, TimestampType, IntegerType, StringType
import json
import pandas as pd
from azure.storage.blob import BlobServiceClient
from pyspark.sql.functions import col, split
from pyspark.sql.types import *


# COMMAND ----------

SECRET_SCOPE = "AllUsers_SecretScope"

ADLS_NAME = dbutils.secrets.get(scope=SECRET_SCOPE, key="DDIStorageName")
ADLS_FILE_SYSTEM = dbutils.secrets.get(scope=SECRET_SCOPE, key="DDIStorageFileSystemMOL")
ADLS_ACCOUNT_KEY = dbutils.secrets.get(scope=SECRET_SCOPE, key="DDIStorageAccountKey")

spark.conf.set(
    f"fs.azure.account.key.{ADLS_NAME}.dfs.core.windows.net",
    ADLS_ACCOUNT_KEY
)

DELIMITER = "|~|"

DEFAULT_NUM_PARTITIONS = sc.defaultParallelism * 4


In [0]:


# Initialize Blob Service Client
blob_service_client = BlobServiceClient(account_url=f"https://{ADLS_NAME}.blob.core.windows.net", credential=ADLS_ACCOUNT_KEY)

# Specify the container and blob (file)
container_name = ADLS_FILE_SYSTEM
blob_name = "raw/latest/obr/trip_data.json"

# Read the blob into a string
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
stream = blob_client.download_blob()
data = json.loads(stream.readall())

# Convert JSON data to a Pandas DataFrame
df_pandas = pd.DataFrame(data)

# Display the first few rows of the Pandas DataFrame
print(df_pandas.head())


   afterHoursDistance afterHoursDrivingDuration  ...  vehicle_id   trip_id
0          112.260700                  01:00:10  ...        22bf  0ab29c9c
1           53.447380           00:46:13.064000  ...       1bdf4  a91b61aa
2            1.769824           00:10:18.937000  ...        cb8a  b4c89375
3            1.513573           00:04:41.937000  ...        4b35  bba33900
4            4.376471           00:08:05.937000  ...        b48f  90c92feb

[5 rows x 28 columns]


In [0]:
# Convert Pandas DataFrame to Spark DataFrame
df_spark = spark.createDataFrame(df_pandas)

# Display the Spark DataFrame
display(df_spark)


afterHoursDistance,afterHoursDrivingDuration,afterHoursEnd,afterHoursStart,afterHoursStopDuration,averageSpeed,distance,drivingDuration,idlingDuration,isSeatBeltOff,maximumSpeed,nextTripStart,speedRange1,speedRange1Duration,speedRange2,speedRange2Duration,speedRange3,speedRange3Duration,start,stop,stopDuration,workDistance,workDrivingDuration,workStopDuration,id,engineHours,vehicle_id,trip_id
112.2607,01:00:10,True,False,00:06:34.063000,107.45612,131.09459,01:13:11.937000,00:00:15,False,136,2019-11-16 00:06:44.063000+00:00,15,00:38:47.810999,12,00:11:53.545999,0,00:00:00,2019-11-15 22:46:58.063000+00:00,2019-11-16 00:00:10+00:00,00:06:34.063000,18.83389,00:13:01.937000,00:00:00,b902CA9C,,22bf,0ab29c9c
53.44738,00:46:13.064000,True,True,00:59:55,69.38555,53.44738,00:46:13.064000,00:00:00,False,127,2019-11-16 01:00:18.127000+00:00,5,00:10:45.068000,0,00:00:00,0,00:00:00,2019-11-15 23:14:10.063000+00:00,2019-11-16 00:00:23.127000+00:00,00:59:55,0.0,00:00:00,00:00:00,b91A61AA,,1bdf4,a91b61aa
1.7698245,00:10:18.937000,True,True,00:05:15.063000,10.294049,1.7698245,00:10:18.937000,00:00:05,False,42,2019-11-16 00:06:48.063000+00:00,0,00:00:00,0,00:00:00,0,00:00:00,2019-11-15 23:51:14.063000+00:00,2019-11-16 00:01:33+00:00,00:05:15.063000,0.0,00:00:00,00:00:00,b9473C85,,cb8a,b4c89375
1.5135734,00:04:41.937000,True,True,1.18:28:20.1269999,19.32653,1.5135734,00:04:41.937000,00:00:02,False,55,2019-11-17 18:30:02.126000+00:00,0,00:00:00,0,00:00:00,0,00:00:00,2019-11-15 23:57:00.063000+00:00,2019-11-16 00:01:42+00:00,1.18:28:20.1269999,0.0,00:00:00,00:00:00,b903A03B,,4b35,bba33900
4.376471,00:08:05.937000,True,True,00:23:29.062999,32.42251,4.376471,00:08:05.937000,00:00:11,False,70,2019-11-16 00:25:13.062000+00:00,0,00:00:00,0,00:00:00,0,00:00:00,2019-11-15 23:53:38.063000+00:00,2019-11-16 00:01:44+00:00,00:23:29.062999,0.0,00:00:00,00:00:00,b902CEF9,,b48f,90c92feb
99.907326,01:02:17,True,False,00:29:59.063000,90.15359,105.80367,01:10:24.937000,00:00:02,False,105,2019-11-16 00:32:16.063000+00:00,0,00:00:00,0,00:00:00,0,00:00:00,2019-11-15 22:51:52.063000+00:00,2019-11-16 00:02:17+00:00,00:29:59.063000,5.8963485,00:08:07.937000,00:00:00,b91C2700,,abcb,217b0c90
9.247755,00:18:22.937000,True,True,04:31:05.063000,30.184786,9.247755,00:18:22.937000,00:00:08,False,55,2019-11-16 04:34:20.063000+00:00,0,00:00:00,0,00:00:00,0,00:00:00,2019-11-15 23:44:52.063000+00:00,2019-11-16 00:03:15+00:00,04:31:05.063000,0.0,00:00:00,00:00:00,b902F281,,b3751,8f29102b
4.836725,00:07:04.937000,True,True,00:05:08,40.975983,4.836725,00:07:04.937000,00:05:08,False,125,2019-11-16 00:08:47+00:00,1,00:00:26.630000,0,00:00:00,0,00:00:00,2019-11-15 23:56:34.063000+00:00,2019-11-16 00:03:39+00:00,00:05:08,0.0,00:00:00,00:00:00,b902CAC3,,3bab,3b2a0cc9
32.09207,00:27:40.937000,True,True,01:03:46.063000,69.558,32.09207,00:27:40.937000,00:00:31,False,122,2019-11-16 01:07:36.063000+00:00,3,00:07:04.157999,0,00:00:00,0,00:00:00,2019-11-15 23:36:09.063000+00:00,2019-11-16 00:03:50+00:00,01:03:46.063000,0.0,00:00:00,00:00:00,b91609D2,,dbf,b9690d21
3.5066507,00:12:35,True,True,10:55:12.062999,16.720453,3.5066507,00:12:35,00:00:08,False,57,2019-11-16 10:59:12.062000+00:00,0,00:00:00,0,00:00:00,0,00:00:00,2019-11-15 23:51:25+00:00,2019-11-16 00:04:00+00:00,10:55:12.062999,0.0,00:00:00,00:00:00,b9030040,,21b,00043b90


In [0]:


# Function to convert HH:MM:SS to seconds for specified columns in a DataFrame
def apply_duration_conversion(df, duration_columns):
    """Convert specified duration columns from HH:MM:SS to total seconds."""
    def convert_duration_to_seconds(col_name):
        return (F.split(F.col(col_name), ':')[0].cast('int') * 3600 +  # hours to seconds
                F.split(F.col(col_name), ':')[1].cast('int') * 60 +   # minutes to seconds
                F.split(F.col(col_name), ':')[2].cast('int'))         # seconds

    for col_name in duration_columns:
        df = df.withColumn(col_name, convert_duration_to_seconds(col_name))
    return df

# Example usage
# List of columns ending with 'Duration'
duration_columns = [c for c in df_spark.columns if c.endswith("Duration")]

# Apply the conversion to the duration columns
df_spark = apply_duration_conversion(df_spark, duration_columns)

# Display the updated DataFrame
display(df_spark)


afterHoursDistance,afterHoursDrivingDuration,afterHoursEnd,afterHoursStart,afterHoursStopDuration,averageSpeed,distance,drivingDuration,idlingDuration,isSeatBeltOff,maximumSpeed,nextTripStart,speedRange1,speedRange1Duration,speedRange2,speedRange2Duration,speedRange3,speedRange3Duration,start,stop,stopDuration,workDistance,workDrivingDuration,workStopDuration,id,engineHours,vehicle_id,trip_id
112.2607,,True,False,,107.45612,131.09459,,,False,136,2019-11-16 00:06:44.063000+00:00,15,,12,,0,,2019-11-15 22:46:58.063000+00:00,2019-11-16 00:00:10+00:00,,18.83389,,,b902CA9C,,22bf,0ab29c9c
53.44738,,True,True,,69.38555,53.44738,,,False,127,2019-11-16 01:00:18.127000+00:00,5,,0,,0,,2019-11-15 23:14:10.063000+00:00,2019-11-16 00:00:23.127000+00:00,,0.0,,,b91A61AA,,1bdf4,a91b61aa
1.7698245,,True,True,,10.294049,1.7698245,,,False,42,2019-11-16 00:06:48.063000+00:00,0,,0,,0,,2019-11-15 23:51:14.063000+00:00,2019-11-16 00:01:33+00:00,,0.0,,,b9473C85,,cb8a,b4c89375
1.5135734,,True,True,,19.32653,1.5135734,,,False,55,2019-11-17 18:30:02.126000+00:00,0,,0,,0,,2019-11-15 23:57:00.063000+00:00,2019-11-16 00:01:42+00:00,,0.0,,,b903A03B,,4b35,bba33900
4.376471,,True,True,,32.42251,4.376471,,,False,70,2019-11-16 00:25:13.062000+00:00,0,,0,,0,,2019-11-15 23:53:38.063000+00:00,2019-11-16 00:01:44+00:00,,0.0,,,b902CEF9,,b48f,90c92feb
99.907326,,True,False,,90.15359,105.80367,,,False,105,2019-11-16 00:32:16.063000+00:00,0,,0,,0,,2019-11-15 22:51:52.063000+00:00,2019-11-16 00:02:17+00:00,,5.8963485,,,b91C2700,,abcb,217b0c90
9.247755,,True,True,,30.184786,9.247755,,,False,55,2019-11-16 04:34:20.063000+00:00,0,,0,,0,,2019-11-15 23:44:52.063000+00:00,2019-11-16 00:03:15+00:00,,0.0,,,b902F281,,b3751,8f29102b
4.836725,,True,True,,40.975983,4.836725,,,False,125,2019-11-16 00:08:47+00:00,1,,0,,0,,2019-11-15 23:56:34.063000+00:00,2019-11-16 00:03:39+00:00,,0.0,,,b902CAC3,,3bab,3b2a0cc9
32.09207,,True,True,,69.558,32.09207,,,False,122,2019-11-16 01:07:36.063000+00:00,3,,0,,0,,2019-11-15 23:36:09.063000+00:00,2019-11-16 00:03:50+00:00,,0.0,,,b91609D2,,dbf,b9690d21
3.5066507,,True,True,,16.720453,3.5066507,,,False,57,2019-11-16 10:59:12.062000+00:00,0,,0,,0,,2019-11-15 23:51:25+00:00,2019-11-16 00:04:00+00:00,,0.0,,,b9030040,,21b,00043b90


In [0]:


# Define the schema with all columns and comments
schema = StructType([
    StructField("afterHoursDistance", DoubleType(), True, metadata={'comment': 'Distance covered after hours'}),
    StructField("afterHoursDrivingDuration", IntegerType(), True, metadata={'comment': 'Driving duration after hours in seconds'}),
    StructField("afterHoursEnd", BooleanType(), True, metadata={'comment': 'Indicates if the trip ended after hours'}),
    StructField("afterHoursStart", BooleanType(), True, metadata={'comment': 'Indicates if the trip started after hours'}),
    StructField("afterHoursStopDuration", IntegerType(), True, metadata={'comment': 'Stop duration after hours in seconds'}),
    StructField("averageSpeed", DoubleType(), True, metadata={'comment': 'Average speed during the trip'}),
    StructField("distance", DoubleType(), True, metadata={'comment': 'Total distance covered during the trip'}),
    StructField("drivingDuration", IntegerType(), True, metadata={'comment': 'Total driving duration in seconds'}),
    StructField("idlingDuration", IntegerType(), True, metadata={'comment': 'Total idling duration in seconds'}),
    StructField("isSeatBeltOff", BooleanType(), True, metadata={'comment': 'Indicates if the seatbelt was off'}),
    StructField("maximumSpeed", LongType(), True, metadata={'comment': 'Maximum speed recorded during the trip'}),
    StructField("nextTripStart", StringType(), True, metadata={'comment': 'Start time of the next trip'}),
    StructField("speedRange1", LongType(), True, metadata={'comment': 'Speed range 1 value'}),
    StructField("speedRange1Duration", IntegerType(), True, metadata={'comment': 'Duration for speed range 1 in seconds'}),
    StructField("speedRange2", LongType(), True, metadata={'comment': 'Speed range 2 value'}),
    StructField("speedRange2Duration", IntegerType(), True, metadata={'comment': 'Duration for speed range 2 in seconds'}),
    StructField("speedRange3", LongType(), True, metadata={'comment': 'Speed range 3 value'}),
    StructField("speedRange3Duration", IntegerType(), True, metadata={'comment': 'Duration for speed range 3 in seconds'}),
    StructField("start", StringType(), True, metadata={'comment': 'Start time of the trip'}),
    StructField("stop", StringType(), True, metadata={'comment': 'Stop time of the trip'}),
    StructField("stopDuration", IntegerType(), True, metadata={'comment': 'Total stop duration in seconds'}),
    StructField("workDistance", DoubleType(), True, metadata={'comment': 'Distance covered during work hours'}),
    StructField("workDrivingDuration", IntegerType(), True, metadata={'comment': 'Driving duration during work hours in seconds'}),
    StructField("workStopDuration", IntegerType(), True, metadata={'comment': 'Stop duration during work hours'}),
    StructField("id", StringType(), True, metadata={'comment': 'Unique identifier for the trip'}),
    StructField("engineHours", DoubleType(), True, metadata={'comment': 'Total engine hours used'}),
    StructField("vehicle_id", StringType(), True, metadata={'comment': 'Identifier for the vehicle'}),
    StructField("trip_id", StringType(), True, metadata={'comment': 'Identifier for the trip'}),
])

# Create the new DataFrame with the defined schema
df_transformed = spark.createDataFrame(df_spark.rdd, schema)

# Display the transformed DataFrame
display(df_transformed)


afterHoursDistance,afterHoursDrivingDuration,afterHoursEnd,afterHoursStart,afterHoursStopDuration,averageSpeed,distance,drivingDuration,idlingDuration,isSeatBeltOff,maximumSpeed,nextTripStart,speedRange1,speedRange1Duration,speedRange2,speedRange2Duration,speedRange3,speedRange3Duration,start,stop,stopDuration,workDistance,workDrivingDuration,workStopDuration,id,engineHours,vehicle_id,trip_id
112.2607,3610,True,False,394,107.45612,131.09459,4391,15,False,136,2019-11-16 00:06:44.063000+00:00,15,2327,12,713,0,0,2019-11-15 22:46:58.063000+00:00,2019-11-16 00:00:10+00:00,394,18.83389,781,0,b902CA9C,,22bf,0ab29c9c
53.44738,2773,True,True,3595,69.38555,53.44738,2773,0,False,127,2019-11-16 01:00:18.127000+00:00,5,645,0,0,0,0,2019-11-15 23:14:10.063000+00:00,2019-11-16 00:00:23.127000+00:00,3595,0.0,0,0,b91A61AA,,1bdf4,a91b61aa
1.7698245,618,True,True,315,10.294049,1.7698245,618,5,False,42,2019-11-16 00:06:48.063000+00:00,0,0,0,0,0,0,2019-11-15 23:51:14.063000+00:00,2019-11-16 00:01:33+00:00,315,0.0,0,0,b9473C85,,cb8a,b4c89375
1.5135734,281,True,True,5300,19.32653,1.5135734,281,2,False,55,2019-11-17 18:30:02.126000+00:00,0,0,0,0,0,0,2019-11-15 23:57:00.063000+00:00,2019-11-16 00:01:42+00:00,5300,0.0,0,0,b903A03B,,4b35,bba33900
4.376471,485,True,True,1409,32.42251,4.376471,485,11,False,70,2019-11-16 00:25:13.062000+00:00,0,0,0,0,0,0,2019-11-15 23:53:38.063000+00:00,2019-11-16 00:01:44+00:00,1409,0.0,0,0,b902CEF9,,b48f,90c92feb
99.907326,3737,True,False,1799,90.15359,105.80367,4224,2,False,105,2019-11-16 00:32:16.063000+00:00,0,0,0,0,0,0,2019-11-15 22:51:52.063000+00:00,2019-11-16 00:02:17+00:00,1799,5.8963485,487,0,b91C2700,,abcb,217b0c90
9.247755,1102,True,True,16265,30.184786,9.247755,1102,8,False,55,2019-11-16 04:34:20.063000+00:00,0,0,0,0,0,0,2019-11-15 23:44:52.063000+00:00,2019-11-16 00:03:15+00:00,16265,0.0,0,0,b902F281,,b3751,8f29102b
4.836725,424,True,True,308,40.975983,4.836725,424,308,False,125,2019-11-16 00:08:47+00:00,1,26,0,0,0,0,2019-11-15 23:56:34.063000+00:00,2019-11-16 00:03:39+00:00,308,0.0,0,0,b902CAC3,,3bab,3b2a0cc9
32.09207,1660,True,True,3826,69.558,32.09207,1660,31,False,122,2019-11-16 01:07:36.063000+00:00,3,424,0,0,0,0,2019-11-15 23:36:09.063000+00:00,2019-11-16 00:03:50+00:00,3826,0.0,0,0,b91609D2,,dbf,b9690d21
3.5066507,755,True,True,39312,16.720453,3.5066507,755,8,False,57,2019-11-16 10:59:12.062000+00:00,0,0,0,0,0,0,2019-11-15 23:51:25+00:00,2019-11-16 00:04:00+00:00,39312,0.0,0,0,b9030040,,21b,00043b90


In [0]:

# Unit Testing Function
def test_apply_duration_conversion():
    # Create a test DataFrame with example duration values
    data = [("00:00:00", "00:01:30", "01:00:00"), 
            ("01:30:15", "02:00:00", "00:45:30")]
    columns = ["duration1", "duration2", "duration3"]
    df_test = spark.createDataFrame(data, columns)

    # List of columns to convert
    duration_columns = ["duration1", "duration2", "duration3"]

    # Apply the duration conversion
    df_converted = apply_duration_conversion(df_test, duration_columns)

    # Collect results to check the correctness
    results = df_converted.collect()

    # Expected results
    expected_results = [
        (0, 90, 3600),    # 00:00:00 => 0, 00:01:30 => 90, 01:00:00 => 3600
        (5415, 7200, 2730) # 01:30:15 => 5415, 02:00:00 => 7200, 00:45:30 => 2730
    ]

    # Check if results match expected values
    for result, expected in zip(results, expected_results):
        for r, e in zip(result, expected):
            assert r == e, f"Expected {e} but got {r}"

# Run the unit test
test_apply_duration_conversion()
print("All tests passed!")

All tests passed!
