In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import types as t
from IPython.display import display, clear_output
from datetime import datetime
import math

In [3]:
from os import environ
environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 pyspark-shell'

In [4]:
spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()
spark.version

'3.1.1'

# Import rozkładu przystanków z pliku

In [None]:
# df_bus = spark.read.format('parquet').load('df_bus.parquet')
# df_bus.count()

In [5]:
df_tram_tmp = spark.read.format('parquet').load('./Schedule/df_tram.parquet')
df_tram_tmp.count()

209517

In [None]:
# df_all = spark.read.format('parquet').load('df_all.parquet')
# df_all.count()

In [15]:
df_tram_tmp.printSchema()

root
 |-- line: long (nullable = true)
 |-- busStopName: string (nullable = true)
 |-- sLat: double (nullable = true)
 |-- sLon: double (nullable = true)
 |-- sTime: string (nullable = true)
 |-- vehicleID: string (nullable = true)
 |-- busStopID: string (nullable = true)



In [6]:
df_tram_tmp.show(10)

+----+-----------+---------+---------+--------+---------+---------+
|line|busStopName|     sLat|     sLon|   sTime|vehicleID|busStopID|
+----+-----------+---------+---------+--------+---------+---------+
|  25|Poborzańska|52.289921|21.029526|04:54:00|     25/4|  1084/02|
|  25|Poborzańska|52.289921|21.029526|05:14:00|     25/5|  1084/02|
|  25|Poborzańska|52.289921|21.029526|05:24:00|     25/8|  1084/02|
|  25|Poborzańska|52.289921|21.029526|05:35:00|   25/014|  1084/02|
|  25|Poborzańska|52.289921|21.029526|05:54:00|    25/11|  1084/02|
|  25|Poborzańska|52.289921|21.029526|06:04:00|    25/12|  1084/02|
|  25|Poborzańska|52.289921|21.029526|06:14:00|     25/2|  1084/02|
|  25|Poborzańska|52.289921|21.029526|06:34:00|   25/013|  1084/02|
|  25|Poborzańska|52.289921|21.029526|06:42:00|     25/3|  1084/02|
|  25|Poborzańska|52.289921|21.029526|06:55:00|     25/4|  1084/02|
+----+-----------+---------+---------+--------+---------+---------+
only showing top 10 rows



## Dodatkowe obrobienie df_tram

In [8]:
test = '14:54:00'
def convDateTime(x):
    strDT =  f"{datetime.strftime(datetime.date(datetime.now()),'%Y-%m-%d')} {x}" 
    
    tmpMonth = int(strDT[5:7])
    tmpDay = int(strDT[8:10])
    tmpHour = int(strDT[11:13])
        
    if tmpHour != 24:
        return datetime.strptime(strDT,'%Y-%m-%d %H:%M:%S')
    else:
        newHour = '00'
        newDay = str(tmpDay + 1)
        if newDay != '32':
            resStr = strDT[:8] + newDay + strDT[10:11] + newHour + strDT[13:]
        else:
            newDay = '01'
            newMonth = str(tmpMonth + 1)
            resStr = strDT[:5] + newMonth + strDT[7:8] + newDay + strDT[10:11] + newHour + strDT[13:]
            
        return datetime.strptime(resStr,'%Y-%m-%d %H:%M:%S')
    
    
convDateTime(test)

datetime.datetime(2021, 6, 1, 14, 54)

In [9]:
def secsFrom1970(x):
    epoch = datetime.utcfromtimestamp(0)
    return (x - epoch).total_seconds()

In [10]:
udfSecsFrom1970 = f.udf(secsFrom1970, t.FloatType())

In [3]:
import math
def measureDist(lat1, lon1, lat2, lon2) :
    R = 6378.137  # Radius of earth in KM
    dLat = lat2 * math.pi / 180 - lat1 * math.pi / 180
    dLon = lon2 * math.pi / 180 - lon1 * math.pi / 180
    a = math.sin(dLat/2) * math.sin(dLat/2) + \
        math.cos(lat1 * math.pi / 180) * math.cos(lat2 * math.pi / 180) * \
        math.sin(dLon/2) * math.sin(dLon/2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    d = R * c
    return d * 1000  # meters

In [8]:
measureDist(52.28995, 21.02957, 52.28990, 21.02957)

5.565974539011866

In [13]:
udfMeasureDist = f.udf(measureDist, t.FloatType())

In [16]:
df_tram_tmp.printSchema()

root
 |-- line: long (nullable = true)
 |-- busStopName: string (nullable = true)
 |-- sLat: double (nullable = true)
 |-- sLon: double (nullable = true)
 |-- sTime: string (nullable = true)
 |-- vehicleID: string (nullable = true)
 |-- busStopID: string (nullable = true)



In [19]:
rdd1 = df_tram_tmp.rdd.map(list)
rdd2 = rdd1.map(lambda x: [x[0], x[1], x[2], x[3], convDateTime(x[4]), x[5], x[6]])
rdd3 = rdd2.map(lambda x: [x[0], x[1], x[2], x[3], x[4], x[5], x[6], secsFrom1970(x[4])])
# rdd2 = rdd1.map(lambda x: [x[0], x[1], x[2], f"{datetime.strftime(datetime.date(datetime.now()),'%Y-%m-%d')} {x[3]}", x[4], x[5]])
# rdd3 = rdd2.map(lambda x: [x[0], x[1], x[2], datetime.strptime(x[3],'%Y-%m-%d %H:%M:%S'), x[4], x[5]])
# rdd4 = rdd3.map(lambda x: [x[0], x[1], x[2], x[3], x[4], x[5], datetime.utcfromtimestamp(x[3])])
# df_tram = rdd4.toDF(['busStopName', 'sLat', 'sLon', 'sTime', 'vehicleID', 'busStopID', 'sEpoch'])
df_tram = rdd3.toDF(['line','busStopName', 'sLat', 'sLon', 'sTime', 'sVehicleID', 'busStopID', 'sTimer'])
df_tram.printSchema()

root
 |-- line: long (nullable = true)
 |-- busStopName: string (nullable = true)
 |-- sLat: double (nullable = true)
 |-- sLon: double (nullable = true)
 |-- sTime: timestamp (nullable = true)
 |-- sVehicleID: string (nullable = true)
 |-- busStopID: string (nullable = true)
 |-- sTimer: double (nullable = true)



In [20]:
df_tram.show(5)

+----+-----------+---------+---------+-------------------+----------+---------+------------+
|line|busStopName|     sLat|     sLon|              sTime|sVehicleID|busStopID|      sTimer|
+----+-----------+---------+---------+-------------------+----------+---------+------------+
|  25|Poborzańska|52.289921|21.029526|2021-06-01 04:54:00|      25/4|  1084/02|1.62252324E9|
|  25|Poborzańska|52.289921|21.029526|2021-06-01 05:14:00|      25/5|  1084/02|1.62252444E9|
|  25|Poborzańska|52.289921|21.029526|2021-06-01 05:24:00|      25/8|  1084/02|1.62252504E9|
|  25|Poborzańska|52.289921|21.029526|2021-06-01 05:35:00|    25/014|  1084/02| 1.6225257E9|
|  25|Poborzańska|52.289921|21.029526|2021-06-01 05:54:00|     25/11|  1084/02|1.62252684E9|
+----+-----------+---------+---------+-------------------+----------+---------+------------+
only showing top 5 rows



In [21]:
df_tram.count()

209517

# Test kafki

## Uruchomienie pracy równoległej

In [None]:
import os
import ipyparallel as ipp

rc = ipp.Client(profile='default')
ar = rc[:].apply_async(os.getpid)
pid_map = ar.get_dict()

In [None]:
rc.ids

## Subskrybcja topicu tram z Kafki

In [22]:
# bootstrap_servers = 'kafka-1:19091,kafka-2:29091,kafka-3:39091'
# bootstrap_servers = 'localhost:19092,localhost:29092,localhost:39092'
bootstrap_servers = 'localhost:19092'
topic = 'tram'

In [23]:
raw_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("subscribe", topic) \
    .load()

In [24]:
raw_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [25]:
# {
#     "Lines": "213",
#     "Lon": 21.1291008,
#     "VehicleNumber": "1000",
#     "Time": "2021-05-21 20:17:34",
#     "Lat": 52.2146371,
#     "Brigade": "2"
# }

schema = t.StructType() \
    .add("Lines", t.StringType()) \
    .add("Lon", t.FloatType()) \
    .add("VehicleNumber", t.StringType()) \
    .add("Time", t.TimestampType()) \
    .add("Lat", t.FloatType()) \
    .add("Brigade", t.StringType())

In [26]:
stream_tram = raw_df.select( \
    raw_df.key.cast('string'),
    f.from_json(raw_df.value.cast('string'), schema))

In [27]:
stream_tram = stream_tram.withColumnRenamed('from_json(CAST(value AS STRING))', 'json')

In [28]:
stream_tram = stream_tram \
    .withColumnRenamed('key', 'vVehicleID') \
    .withColumn('vLon', stream_tram.json.Lon) \
    .withColumn('vLat', stream_tram.json.Lat) \
    .withColumn('vTime', stream_tram.json.Time)

In [29]:
stream_tram = stream_tram.drop(stream_tram.json)

In [30]:
stream_tram = stream_tram.withColumn('vTimer', udfSecsFrom1970(f.col('vTime')))

In [31]:
df_tram.printSchema()

root
 |-- line: long (nullable = true)
 |-- busStopName: string (nullable = true)
 |-- sLat: double (nullable = true)
 |-- sLon: double (nullable = true)
 |-- sTime: timestamp (nullable = true)
 |-- sVehicleID: string (nullable = true)
 |-- busStopID: string (nullable = true)
 |-- sTimer: double (nullable = true)



In [32]:
stream_tram.printSchema()

root
 |-- vVehicleID: string (nullable = true)
 |-- vLon: float (nullable = true)
 |-- vLat: float (nullable = true)
 |-- vTime: timestamp (nullable = true)
 |-- vTimer: float (nullable = true)



Utworzenie query do sprawdzania opóźnień na przystankach

In [52]:
accDiff = 0.00005
cond1 = [df_tram.sVehicleID == stream_tram.vVehicleID, \
        df_tram.sLat - stream_tram.vLat < accDiff, \
        df_tram.sLat - stream_tram.vLat > -accDiff, \
        df_tram.sLon - stream_tram.vLon < accDiff, \
        df_tram.sLon - stream_tram.vLon > -accDiff ]

In [56]:
query1 = stream_tram.join(df_tram, cond1)
query1 = query1.withColumn('del', f.abs(query1.sTimer - query1.vTimer)).groupBy(f.col('vVehicleID'), f.col('busStopName'), f.col('busStopID'), f.col('line')).agg(f.min('del').alias('delay'))

Utworzenie query do sprawdzenia odległości pojazdu od przystanku

In [39]:
cond2 = [df_tram.sVehicleID == stream_tram.vVehicleID, \
        df_tram.sTime == stream_tram.vTime ]

In [40]:
# measureDist(lat1, lon1, lat2, lon2)
query2 = stream_tram.join(df_tram, cond2).withColumn('distanceInMeters', udfMeasureDist(f.col('sLat'), f.col('sLon'), f.col('vLat'), f.col('vLon')))

In [57]:
datetime(2021, 5, 26, 4, 54) == datetime(2021, 5, 26, 4, 54)

True

In [36]:
q = stream_tram \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("myQuery") \
    .start()

q.awaitTermination(30)
q.stop()

In [58]:
q1 = query1 \
    .writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName("myQuery") \
    .start()

q1.awaitTermination(60)
q1.stop()

In [45]:
q2 = query2 \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("myQuery") \
    .start()

q2.awaitTermination(120)
q2.stop()

StreamingQueryException: Writing job aborted.
=== Streaming Query ===
Identifier: myQuery [id = f2ace432-60a5-4af2-9c7d-dcd4ddb550e9, runId = fc04ec78-b6b3-47d4-8a45-54156d9af735]
Current Committed Offsets: {KafkaV2[Subscribe[tram]]: {"tram":{"0":2277103}}}
Current Available Offsets: {KafkaV2[Subscribe[tram]]: {"tram":{"0":2277400}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@41bd8fc8
+- Project [vVehicleID#192, vLon#195, vLat#199, vTime#204, vTimer#215, busStopName#109, sLat#110, sLon#111, sTime#112, sVehicleID#113, busStopID#114, sTimer#115, measureDist(sLat#110, sLon#111, vLat#199, vLon#195) AS distanceInMeters#26450]
   +- Join Inner, ((sVehicleID#113 = vVehicleID#192) AND (sTime#112 = vTime#204))
      :- Project [vVehicleID#192, vLon#195, vLat#199, vTime#204, secsFrom1970(vTime#204) AS vTimer#215]
      :  +- Project [vVehicleID#192, vLon#195, vLat#199, vTime#204]
      :     +- Project [vVehicleID#192, json#189, vLon#195, vLat#199, json#189.Time AS vTime#204]
      :        +- Project [vVehicleID#192, json#189, vLon#195, json#189.Lat AS vLat#199]
      :           +- Project [vVehicleID#192, json#189, json#189.Lon AS vLon#195]
      :              +- Project [key#186 AS vVehicleID#192, json#189]
      :                 +- Project [key#186, from_json(CAST(value AS STRING))#185 AS json#189]
      :                    +- Project [cast(key#171 as string) AS key#186, from_json(StructField(Lines,StringType,true), StructField(Lon,FloatType,true), StructField(VehicleNumber,StringType,true), StructField(Time,TimestampType,true), StructField(Lat,FloatType,true), StructField(Brigade,StringType,true), cast(value#172 as string), Some(Europe/Warsaw)) AS from_json(CAST(value AS STRING))#185]
      :                       +- StreamingDataSourceV2Relation [key#171, value#172, topic#173, partition#174, offset#175L, timestamp#176, timestampType#177], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@aa25b06, KafkaV2[Subscribe[tram]]
      +- LogicalRDD [busStopName#109, sLat#110, sLon#111, sTime#112, sVehicleID#113, busStopID#114, sTimer#115], false


In [59]:
display(spark.sql('SELECT * FROM myQuery').show())

+----------+-----------+---------+----+-----+
|vVehicleID|busStopName|busStopID|line|delay|
+----------+-----------+---------+----+-----+
+----------+-----------+---------+----+-----+



None

## Chyba zbędne

In [None]:


while True:
    clear_output(wait=True)
    display(q1.status)
    display(spark.sql('SELECT * FROM myQuery').show())
    sleep(5)

In [None]:
__.display_outputs()

In [None]:
raw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show()

In [None]:
q1 = raw.writeStream.start()

In [None]:
query = raw.writeStream.outputMode("append").format("console").start()
query.awaitTermination(60)
query.stop()

In [None]:
words = raw.select(f.explode(f.split(raw.value, " ")).alias("word"))
wordCounts = words.groupBy("word").count()

In [None]:
query = wordCounts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination(60)
query.stop()