In [0]:
import os
from pyspark.sql import SparkSession

In [0]:
event_hub_connection_string = "XXX"
encrypted_conection_string = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(event_hub_connection_string)
#https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark.md#event-hubs-configuration

In [0]:
#load data to bronze table
checkopoints_path="dbfs:/user/checkpoints/"
checkopoint_location=checkopoints_path + "ingest"

ingest = spark.readStream.format("eventhubs") \
    .option("eventhubs.connectionString", encrypted_conection_string) \
    .option("eventhubs.consumerGroup", "databricks1") \
    .option("checkpointLocation", checkopoint_location) \
    .load()


from pyspark.sql.functions import col
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

json_schema = StructType([
    StructField("_id", IntegerType()),
    StructField("Nr_Boczny", IntegerType()),
    StructField("Nr_Rej", StringType()),
    StructField("Brygada", StringType()),
    StructField("Nazwa_Linii", StringType()),
    StructField("Ostatnia_Pozycja_Szerokosc", DoubleType()),
    StructField("Ostatnia_Pozycja_Dlugosc", DoubleType()),
    StructField("Data_Aktualizacji", TimestampType())
])
#deal with json
bronze=(ingest.select(col("body").cast("string"))
        .withColumn("json_data", from_json("body", json_schema))
        .select(col("json_data.*"))
        )

#save to table
checkopoints_path="dbfs:/user/checkpoints/"
checkopoint_location=checkopoints_path + "bronze"
bronze = (bronze.withColumnRenamed("Ostatnia_Pozycja_Szerokosc", "X").withColumnRenamed("Ostatnia_Pozycja_Dlugosc", "Y"))
bronze.writeStream.format("delta").outputMode("append").option("checkpointLocation", checkopoint_location).toTable("bronze")

<pyspark.sql.streaming.query.StreamingQuery at 0x7fb831cdbbb0>

In [0]:
from pyspark.sql.functions import max
from pyspark.sql.functions import sqrt, pow, unix_timestamp, datediff



schema = StructType([
    StructField("_id", IntegerType()),
    StructField("Nr_Boczny", IntegerType()),
    StructField("Nr_Rej", StringType()),
    StructField("Brygada", StringType()),
    StructField("Nazwa_Linii", StringType()),
    StructField("X", DoubleType()),
    StructField("Y", DoubleType()),
    StructField("Last_X", DoubleType()),
    StructField("Last_Y", DoubleType()),
    StructField("Data_Aktualizacji", TimestampType()),
    StructField("Last_Data_Aktualizacji", TimestampType()),
])
speed_df = spark.createDataFrame([], schema=schema)

speed_df.write.format("delta").mode("append").saveAsTable("speed_table")

In [0]:
checkpoints_path="dbfs:/user/checkpoints/"
checkpoint_location=checkopoints_path + "update_stream1"



temp=(spark.readStream
              .table("bronze")
              .withWatermark("Data_Aktualizacji", "5 minutes")
              .groupBy("_id")
              .agg(max("Data_Aktualizacji").alias("max_Data_Aktualizacji"))
              .writeStream
              .format("delta")
              .outputMode("complete")
              .option("checkpointLocation", checkpoint_location)
              .toTable("newest_Data_Aktualizacji")
              )


In [0]:




newest_complete=(spark.readStream
              .table("newest_Data_Aktualizacji"))


temp2=(spark.readStream
              .table("bronze"))
temp2.printSchema()

joined_df=(temp2.alias("b1")
            .withWatermark("Data_Aktualizacji", "5 minutes")
            .join(newest_complete.alias("b2"),
                        (col("b1._id") == col("b2._id"))
                        & 
                        (col("b2.max_Data_Aktualizacji") == col("b1.Data_Aktualizacji"))
                        ,"right"
                    )
            .distinct()
            .select("b1._id", "b1.Nr_Boczny", "b1.Nr_Rej","b1.Brygada","b1.Nazwa_Linii","b1.X","b1.Y","b1.Data_Aktualizacji")  
        )

root
 |-- _id: integer (nullable = true)
 |-- Nr_Boczny: integer (nullable = true)
 |-- Nr_Rej: string (nullable = true)
 |-- Brygada: string (nullable = true)
 |-- Nazwa_Linii: string (nullable = true)
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)
 |-- Data_Aktualizacji: timestamp (nullable = true)



In [0]:
sql_statement = ''' MERGE INTO speed_table AS target
    USING updates AS source
    ON target._id = source._id
WHEN MATCHED  AND source.Data_Aktualizacji > target.Data_Aktualizacji THEN
UPDATE SET
    target.Last_X = target.X,
    target.Last_Y = target.Y,
    target.X = source.X,
    target.Y = source.Y,
    target.Last_Data_Aktualizacji = target.Data_Aktualizacji,
    target.Data_Aktualizacji = source.Data_aktualizacji 
WHEN NOT MATCHED THEN
INSERT (_id, Nr_Boczny, Nr_Rej, Brygada, Nazwa_Linii, X, Y, Last_X, Last_Y, Data_Aktualizacji,Last_Data_Aktualizacji)
VALUES (source._id, source.Nr_Boczny, source.Nr_Rej, source.Brygada, source.Nazwa_Linii, source.X, source.Y, null, null, source.Data_Aktualizacji,null)'''
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql(sql_statement)

In [0]:
#helper for gps data
from pyspark.sql import functions as F
class LatLongCalc():
    def cal_lat_log_dist(self, df, lat1, long1, lat2, long2):
        # Ref - https://en.wikipedia.org/wiki/Great-circle_distance#Formulae
        # We are using haversine formaula to derive this Distance between two Co-ordinates
        # Parameters:
        # Base DF with Four columns where it has LAT and LONG
        # Corresponding column name in Dataframe -> Cororidnate1 LAT1 LONG1
        # Corresponding column name in Dataframe -> Cororidnate2 LAT2 LONG2

        df = df.withColumn('distance_in_kms' , \
            F.round((F.acos((F.sin(F.radians(F.col(lat1))) * F.sin(F.radians(F.col(lat2)))) + \
                   ((F.cos(F.radians(F.col(lat1))) * F.cos(F.radians(F.col(lat2)))) * \
                    (F.cos(F.radians(long1) - F.radians(long2))))
                       ) * F.lit(6371.0)), 4))
        return df

In [0]:
joined_df.isStreaming

True

In [0]:
checkpoints_path="dbfs:/user/checkpoints/"
checkpoint_location=checkopoints_path + "update_stream2"

upsert_result=(joined_df.writeStream
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .foreachBatch(upsertToDelta)
        .format("console")
        .start()
)

In [0]:


checkpoints_path="dbfs:/user/checkpoints/"
checkpoint_location=checkopoints_path + "update_stream3"
speed_result = (spark.readStream
                .table("speed_table")
                .withColumn("diff",unix_timestamp(col("Data_Aktualizacji"))- unix_timestamp(col("Last_Data_Aktualizacji")))
)


calc=LatLongCalc()
speed_result2=calc.cal_lat_log_dist(speed_result,"Y","X","Last_Y","Last_X")

speed_result2=speed_result2.withColumn("speed_kp_h",col("distance_in_kms")/(col("diff")/3600))

display(speed_result2.select("*").where("speed_kp_h is not null"))

_id,Nr_Boczny,Nr_Rej,Brygada,Nazwa_Linii,X,Y,Last_X,Last_Y,Data_Aktualizacji,Last_Data_Aktualizacji,diff,distance_in_kms,speed_kp_h


In [0]:





speed_result = spark.readStream.table("speed_table")
speed_result = (speed_result.
                withColumn("diff",unix_timestamp(speed_result.Data_Aktualizacji)- unix_timestamp(speed_result.Last_Data_Aktualizacji))
)


calc=LatLongCalc()
speed_result2=calc.cal_lat_log_dist(speed_result,"Y","X","Last_Y","Last_X")

speed_result2=speed_result2.withColumn("speed_kp_h",col("distance_in_kms")/(col("diff")/3600))

display(speed_result2.select("*").orderBy("_id"))

In [0]:
calc_speeds()

_id,Nr_Boczny,Nr_Rej,Brygada,Nazwa_Linii,X,Y,Last_X,Last_Y,Data_Aktualizacji,Last_Data_Aktualizacji,diff,distance_in_kms,speed_kp_h
1,0,,,,51.1240844726562,17.0413150787354,,,2024-02-07T10:07:20.577Z,,,,
2,1900,,,,51.1486206054688,17.0234527587891,,,2024-01-16T14:43:16.34Z,,,,
3,2206,,01113,11,51.1284484863281,17.0546855926514,51.1078910827637,17.030029296875,2024-02-21T12:30:54.343Z,2024-02-21T12:15:42.293Z,912.0,3.5062,13.840263157894736
4,2208,,02205,22,51.1348075866699,16.9535942077637,51.1128540039062,17.0144004821777,2024-02-21T12:30:54.31Z,2024-02-21T12:15:39.48Z,915.0,7.1531,28.143344262295084
5,2212,,,L,51.1086387634277,17.0340042114258,51.081600189209,17.0090179443359,2024-02-21T12:30:52.537Z,2024-02-21T12:15:47.56Z,905.0,3.998,15.90364640883978
6,2218,,00517,20,51.0960845947266,17.0235252380371,51.0944709777832,16.9795761108398,2024-02-21T12:30:54.383Z,2024-02-21T12:15:44.56Z,910.0,4.8899,19.34465934065934
7,2228,,00938,1,51.1044807434082,17.0857219696045,51.1011390686035,17.1092662811279,2024-02-21T12:30:54.397Z,2024-02-21T12:15:42.257Z,912.0,2.642,10.428947368421053
8,2237,,,,51.1241340637207,17.0409469604492,51.1242332458496,17.0408916473389,2024-02-21T12:30:46.33Z,2024-02-21T12:15:12.447Z,934.0,0.0122,0.0470235546038544
9,2238,,,,51.1238594055176,17.0406036376953,,,2023-04-30T02:21:09.067Z,,,,
10,2239,,00714,15,51.101978302002,17.0266513824463,51.1182556152344,17.0351886749268,2024-02-21T12:30:44.423Z,2024-02-21T12:15:44.433Z,900.0,1.9739,7.8956


In [0]:
%sql
drop table speed_Table;
drop table bronze;

In [0]:
dbutils.fs.ls("dbfs:/user/checkpoints/")

[FileInfo(path='dbfs:/user/checkpoints/bronze/', name='bronze/', size=0, modificationTime=1708344169000),
 FileInfo(path='dbfs:/user/checkpoints/update_stream1/', name='update_stream1/', size=0, modificationTime=1708597050000),
 FileInfo(path='dbfs:/user/checkpoints/update_stream2/', name='update_stream2/', size=0, modificationTime=1708597974000)]

In [0]:
dbutils.fs.rm("dbfs:/user/checkpoints/", True)

True