In [0]:
connectionString = "<connection-string>"
eventHubName = "<event-hub>"

ehConf = {
  'eventhubs.connectionString' : sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
  'eventhubs.eventHubName': eventHubName
}


In [0]:
tripRequestsRaw = spark.readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load() 

tripRequestsRaw.display()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

json_schema = StructType([
    StructField("user_id", IntegerType()),
    StructField("xstart", IntegerType()),
    StructField("ystart", IntegerType()),
    StructField("xend", IntegerType()),
    StructField("yend", IntegerType()),
    StructField("ts", TimestampType()),
    StructField("source", StringType())
])

In [0]:
tripRequests = tripRequestsRaw.withColumn("body", col("body").cast("string"))\
    .withColumn("body",from_json(col("body"), json_schema))\
    .select("body.user_id","body.xstart", "body.ystart", "body.xend", "body.yend", "body.ts", "body.source")

tripRequests.display()





In [0]:
dbutils.fs.unmount("/mnt/demandNavigator/")
configs = {"fs.azure.account.auth.type": "OAuth",
          "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
          "fs.azure.account.oauth2.client.id": "a53ac462-aec8-490a-96d7-d36e1155d572",
          "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="dnkey",key="dbsecret"),
          "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/7f77ffeb-3a63-436d-85f5-46b045efa8d7/oauth2/token"}

dbutils.fs.mount(
  source = "abfss://landingarea@dnadlg2.dfs.core.windows.net/",
  mount_point = "/mnt/demandNavigator/",
  extra_configs = configs)

dbutils.fs.ls("/mnt/demandNavigator/")

/mnt/demandNavigator/ has been unmounted.
Out[60]: [FileInfo(path='dbfs:/mnt/demandNavigator/area_requests/', name='area_requests/', size=0, modificationTime=1711962893000),
 FileInfo(path='dbfs:/mnt/demandNavigator/geographic data.csv', name='geographic data.csv', size=253, modificationTime=1711970442000)]

In [0]:
areasInfo = spark.read.option("header","true").option("inferSchema","true").csv("/mnt/demandNavigator/geographic data.csv")

areasInfo.printSchema()

areasInfo.show()

root
 |-- RegionID: integer (nullable = true)
 |-- RegionName: string (nullable = true)
 |-- startx: integer (nullable = true)
 |-- endx: integer (nullable = true)
 |-- starty: integer (nullable = true)
 |-- endy: integer (nullable = true)

+--------+-------------------+------+----+------+----+
|RegionID|         RegionName|startx|endx|starty|endy|
+--------+-------------------+------+----+------+----+
|       1|   Crescenta Valley|     1|   2|     1|   2|
|       2|           Downtown|     1|   2|     3|   4|
|       3|           Eastside|     1|   2|     5|   6|
|       4|        Harbor Area|     3|   4|     1|   2|
|       5|  Greater Hollywood|     3|   4|     3|   4|
|       6|       Northeast LA|     3|   4|     5|   6|
|       7|       Northwest LA|     5|   6|     1|   2|
|       8|San Fernando Valley|     5|   6|     3|   4|
|       9|           South LA|     5|   6|     5|   6|
+--------+-------------------+------+----+------+----+



In [0]:
areaRequests = tripRequests.join(areasInfo,(tripRequests.xstart<=areasInfo.startx)&(tripRequests.xend>=areasInfo.startx)&(tripRequests.ystart<=areasInfo.starty)&(tripRequests.yend>=areasInfo.starty)).select(tripRequests.user_id,areasInfo.RegionID,areasInfo.RegionName,tripRequests.ts)
areaRequests.display()


In [0]:
areaRequests.printSchema()

areaRequestsAgg = areaRequests.withWatermark("ts", "60 minutes").groupBy(window(col("ts"),"10 minutes"),areaRequests.RegionID,areaRequests.RegionName).agg(count(areaRequests.user_id).alias("totalRequests"))

json_schema = StructType([
    StructField("start", TimestampType()),
    StructField("end", TimestampType())
])

areaRequestsAgg = areaRequestsAgg.withColumn("ts_start",col("window.start"))
areaRequestsAgg.printSchema()
areaRequestsAgg.display()
#areaRequestsAgg = areaRequestsAgg.count()

In [0]:
from pyspark.sql.streaming import *

#spark.sql("drop table streaming.area_requests.totalrequests")

areaRequestsAgg.writeStream\
    .option("checkpointLocation", "/mnt/demandNavigator/area_requests/")\
    .outputMode("complete")\
    .format("delta")\
    .toTable("streaming.area_requests.totalrequests")

Out[69]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f8da6d32940>

In [0]:
df = spark.sql("select max(ts_start) from streaming.area_requests.totalrequests")
max_ts = df.rdd.collect()[0]['max(ts_start)']
print(max_ts)

df2 = spark.sql("select ts_start,RegionName,totalRequests from streaming.area_requests.totalrequests where ts_start='"+str(max_ts)+"' order by totalRequests desc")
df2.show()

2024-04-01 16:50:00
+-------------------+-------------------+-------------+
|           ts_start|         RegionName|totalRequests|
+-------------------+-------------------+-------------+
|2024-04-01 16:50:00|  Greater Hollywood|            9|
|2024-04-01 16:50:00|San Fernando Valley|            8|
|2024-04-01 16:50:00|       Northwest LA|            7|
|2024-04-01 16:50:00|       Northeast LA|            5|
|2024-04-01 16:50:00|           South LA|            5|
|2024-04-01 16:50:00|        Harbor Area|            5|
|2024-04-01 16:50:00|   Crescenta Valley|            3|
|2024-04-01 16:50:00|           Downtown|            3|
|2024-04-01 16:50:00|           Eastside|            3|
+-------------------+-------------------+-------------+

