### Spark Streaming Databricks exercise

When creating a Databricks notebook, you choose its default language (Python for you, or Scala if we have Scala lovers). To run a single cell in a different language, put the magic command *%language* on the first line of that cell, e.g. *%sql* for an SQL cell.

First, make sure you are in the right repo (repository), the one into which you copied (forked?) the helper files (e.g. the *pid_schema* notebook).

You can open the *pid_schema* notebook in another tab and refer to it from time to time.

We are streaming data from *Pražská integrovaná doprava* (Prague Integrated Transport) via [www.golemio.cz](https://www.golemio.cz); the direct link to the vehicle positions endpoint is [here](https://api.golemio.cz/v2/pid/docs/openapi/#/%F0%9F%9B%A4%20RealTime%20Vehicle%20Positions/get_vehiclepositions). The site was recently updated; feel free to look through its documentation, although it won't be of much help.

We are streaming continuously (every 1 minute) into 5 Kafka topics named *trams, trains, buses, regbuses, boats*. The streams may occasionally be cut off; if that happens, tell the teachers and it will be fixed promptly.

Every record in a stream describes one vehicle: its location and its other properties.

Running the following cell loads the content of the *pid_schema* notebook, which contains the schema we need.

In [0]:
%run "./pid_schema"

Now we can, for example, call the function *get_pid_schema* to get the schema of the stream. It was prepared for this class, so you don't have to write it yourself.

In [0]:
get_pid_schema()

Out[2]: StructType([StructField('geometry', StructType([StructField('coordinates', ArrayType(StringType(), True), True), StructField('type', StringType(), True)]), True), StructField('properties', StructType([StructField('last_position', StructType([StructField('bearing', IntegerType(), True), StructField('delay', StructType([StructField('actual', IntegerType(), True), StructField('last_stop_arrival', StringType(), True), StructField('last_stop_departure', StringType(), True)]), True), StructField('is_canceled', BooleanType(), True), StructField('last_stop', StructType([StructField('arrival_time', StringType(), True), StructField('departure_time', StringType(), True), StructField('id', StringType(), True), StructField('sequence', IntegerType(), True)]), True), StructField('next_stop', StructType([StructField('arrival_time', StringType(), True), StructField('departure_time', StringType(), True), StructField('id', StringType(), True), StructField('sequence', IntegerType(), True)]), True)

Let's read the topics, starting with trains, then buses and trams. For clarity we read only one topic per stream; it is also possible to read all topics at once in several streams, or to combine them into a single stream. Read the in-code comments to understand each command.

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col

# SASL/SCRAM credentials for connecting to the broker
JAAS = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="fel.student" password="FelBigDataWinter2022bflmpsvz";'
# (for now just a plain-text password)
# Subscribe to one topic; its name is given in the last option ("subscribe")
# adding .option("startingOffsets", "earliest") reads from the beginning of the stream; try it, but memory probably won't be able to handle it
df_trains = spark.readStream \
  .format("kafka")\
  .option("kafka.bootstrap.servers", "b-2-public.bdffelkafka.3jtrac.c19.kafka.us-east-1.amazonaws.com:9196, b-1-public.bdffelkafka.3jtrac.c19.kafka.us-east-1.amazonaws.com:9196") \
  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")\
  .option("kafka.security.protocol", "SASL_SSL") \
  .option("kafka.sasl.jaas.config", JAAS) \
  .option("subscribe", "trains") \
  .load()

# get the schema for the stream from the function in the helper notebook
schema_pid = get_pid_schema()

# parse the binary Kafka value as JSON and turn its top-level fields into columns
select_base_trains = df_trains.select(from_json(col("value").cast("string"), schema_pid).alias("data")).select("data.*")
# start the stream and write it into the in-memory table "mem2"; be advised, you can run out of memory
# .outputMode("append") stores only the new rows arriving in the stream
# each query needs its own .option("checkpointLocation", "<path>") so that two streams reading the same topic don't overwrite each other's progress
# to save into a table instead, end the chain with .toTable("nameofthetable") instead of .format("memory")...start(); the table is stored in Data > hive_metastore > default > nameofthetable, which may prove useful for some of you
select_stream = select_base_trains.writeStream \
        .format("memory")\
        .queryName("mem2")\
        .outputMode("append")\
        .start()
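To see what `from_json(col("value").cast("string"), schema_pid)` followed by `select("data.*")` does to a single record, here is a plain-Python sketch of the same steps outside Spark. The message is a trimmed, hypothetical example shaped like the `get_pid_schema()` output (most fields omitted); unlike `from_json`, this sketch does not enforce the schema.

```python
import json

# Trimmed, hypothetical Kafka message value; real messages carry many more fields.
raw_value = (
    b'{"geometry": {"coordinates": ["14.33102", "50.0759"], "type": "Point"},'
    b' "properties": {"last_position": {"bearing": 77,'
    b' "next_stop": {"id": "U610Z1P", "arrival_time": "2022-11-28T15:29:00+01:00"}}},'
    b' "type": "Feature"}'
)


def decode_value(value: bytes) -> dict:
    """Mimic col("value").cast("string") followed by from_json(...) and select("data.*")."""
    text = value.decode("utf-8")  # cast("string")
    data = json.loads(text)       # from_json(...), without schema enforcement
    return dict(data)             # select("data.*"): each top-level key becomes a column


row = decode_value(raw_value)
print(sorted(row))                                            # ['geometry', 'properties', 'type']
print(row["properties"]["last_position"]["next_stop"]["id"])  # U610Z1P
```

The nested `properties` value stays a single struct column, which is why the later SQL cells reach into it with dot notation.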

In [0]:

# same connection settings as for trains, only the topic differs
df_buses = spark.readStream \
  .format("kafka")\
  .option("kafka.bootstrap.servers", "b-2-public.bdffelkafka.3jtrac.c19.kafka.us-east-1.amazonaws.com:9196, b-1-public.bdffelkafka.3jtrac.c19.kafka.us-east-1.amazonaws.com:9196") \
  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")\
  .option("kafka.security.protocol", "SASL_SSL") \
  .option("kafka.sasl.jaas.config", JAAS) \
  .option("subscribe", "buses") \
  .load()

# get the schema for the stream from the function in the helper notebook
schema_pid = get_pid_schema()

# parse the binary Kafka value as JSON and turn its top-level fields into columns
select_base_buses = df_buses.select(from_json(col("value").cast("string"), schema_pid).alias("data")).select("data.*")
# write the stream into the in-memory table "mem_buses" (see the trains cell for the other options)
select_stream = select_base_buses.writeStream \
        .format("memory")\
        .queryName("mem_buses")\
        .outputMode("append")\
        .start()

In [0]:
# same connection settings as for trains, only the topic differs
# adding .option("startingOffsets", "earliest") reads from the beginning of the stream; memory probably won't handle it
df_trams = spark.readStream \
  .format("kafka")\
  .option("kafka.bootstrap.servers", "b-2-public.bdffelkafka.3jtrac.c19.kafka.us-east-1.amazonaws.com:9196, b-1-public.bdffelkafka.3jtrac.c19.kafka.us-east-1.amazonaws.com:9196") \
  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")\
  .option("kafka.security.protocol", "SASL_SSL") \
  .option("kafka.sasl.jaas.config", JAAS) \
  .option("subscribe", "trams") \
  .load()

# get the schema for the stream from the function in the helper notebook
schema_pid = get_pid_schema()

# parse the binary Kafka value as JSON and turn its top-level fields into columns
select_base_trams = df_trams.select(from_json(col("value").cast("string"), schema_pid).alias("data")).select("data.*")
# write the stream into the in-memory table "mem" (see the trains cell for the other options)
select_stream = select_base_trams.writeStream \
        .format("memory")\
        .queryName("mem")\
        .outputMode("append")\
        .start()
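The three reader cells differ only in the topic name. A minimal sketch of a helper that builds the shared options once; `kafka_options` and `BOOTSTRAP_SERVERS` are hypothetical names introduced here, and the values are copied from the cells above.

```python
# Bootstrap servers copied from the reader cells above.
BOOTSTRAP_SERVERS = (
    "b-2-public.bdffelkafka.3jtrac.c19.kafka.us-east-1.amazonaws.com:9196,"
    "b-1-public.bdffelkafka.3jtrac.c19.kafka.us-east-1.amazonaws.com:9196"
)


def kafka_options(topic: str, jaas: str) -> dict:
    """Hypothetical helper: reader options shared by all PID topics."""
    return {
        "kafka.bootstrap.servers": BOOTSTRAP_SERVERS,
        "kafka.sasl.mechanism": "SCRAM-SHA-512",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.jaas.config": jaas,
        "subscribe": topic,  # the only option that changes between cells
    }


opts = kafka_options("trams", jaas="<JAAS string>")
print(opts["subscribe"])  # trams
```

In the notebook the dictionary could then be unpacked into the reader, e.g. `spark.readStream.format("kafka").options(**kafka_options("trams", JAAS)).load()`. `DataStreamReader.options` takes keyword arguments, and because the keys contain dots they have to be passed through `**` unpacking rather than written out as parameter names.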

Let's create a new table called *trams* on which we will try some transformations. Creating a table fails if it already exists, so it is better to drop it first (make sure you are not deleting someone else's work).

In [0]:
%sql drop table if exists trams

In [0]:
%sql
create table trams as select * from mem

num_affected_rows,num_inserted_rows


Let's look at some of the characteristics. We won't go into much detail, since most variable names are self-explanatory. Note, however, that some variables that look like integers may be stored as strings, for consistency across different vehicle types.

After running the next cell, you can click the arrows in the output to expand the nested structure.

In [0]:
%sql
select * from trams limit 1

geometry,properties,type
"List(List(14.33102, 50.0759), Point)","List(List(77, List(9, 9, null), null, List(2022-11-28T15:28:00+01:00, 2022-11-28T15:28:00+01:00, U364Z1P, 2), List(2022-11-28T15:29:00+01:00, 2022-11-28T15:29:00+01:00, U610Z1P, 3), 2022-11-28T15:28:09+01:00, 0.611, null, at_stop, true), List(List(DP PRAHA, DP PRAHA), List(null, null), List(L25, 25, 0, Lehovec, 25_6208_221123, null), 25, 7, 2022-11-28T15:27:00+01:00, 9325, List(tramvaj, tram, 2), true, null))",Feature


Some notation is the same in SQL as in Python; for example, nested fields can be accessed with dot notation. Let's use it to check that the first ten entries really are trams.

In [0]:
%sql
select properties.trip.vehicle_type.description_en from trams limit 10

description_en
tram
tram
tram
tram
tram
tram
tram
tram
tram
tram


## Task for you
Your task is to *somehow* print the number of trams on each line (tram number) and sort the output from the line with the most trams to the line with the fewest.
There are plenty of ways to do this.

Some points you may find helpful:
 * If unsure, take a quick peek at [https://www.w3schools.com/sql/sql_syntax.asp](https://www.w3schools.com/sql/sql_syntax.asp) for syntax and command help.
 * Look at the variables in *properties*. Some may be helpful, some not.
 * Think about what you want to do and start from elementary commands, e.g. print only the tram lines, or print the number of trams (maybe *count* could help)
   * %sql select count(some_variable) from trams
 * Each tram may have more than one entry in the streamed data, and you may need to deal with that, e.g. with *group by*, *where ... like*, or *count(distinct ...)*
   * %sql select some_variable, count(*) from trams group by some_variable
 * There are more ways and more variables you can use for filtering; some may be easier, and some may give wrong results if used
 * You can save the output of a select statement into a table with
   * %sql create table new_table as select properties.trip.vehicle_type.description_en from trams
 * You can use a select inside a select statement; think of the inner select as a table containing the selected columns
   * %sql select column_1, ... from (select column_x, ... from trams here_maybe_some_filtering?) here_some_more_filtering_num_2?
 * The output table may let you sort by clicking a column header, but try to order it with *order by (how?)*, optionally adding *ASC* or *DESC* at the end
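The hints above combine into the pattern *group by line*, *count(distinct vehicle)*, *order by count desc*. Here is a plain-Python sketch of that logic on made-up rows; the field names `line` and `vehicle_id` are hypothetical placeholders, so finding the matching columns under *properties* is still up to you.

```python
from collections import defaultdict

# Made-up rows: the same vehicle can appear several times in the stream.
rows = [
    {"line": "25", "vehicle_id": "9325"},
    {"line": "25", "vehicle_id": "9325"},  # duplicate entry of one tram
    {"line": "25", "vehicle_id": "9330"},
    {"line": "9", "vehicle_id": "8101"},
    {"line": "22", "vehicle_id": "9401"},
    {"line": "22", "vehicle_id": "9402"},
    {"line": "22", "vehicle_id": "9403"},
]


def trams_per_line(rows):
    """GROUP BY line, COUNT(DISTINCT vehicle_id), ORDER BY count DESC."""
    vehicles = defaultdict(set)
    for r in rows:
        vehicles[r["line"]].add(r["vehicle_id"])  # a set drops duplicate entries
    counts = [(line, len(ids)) for line, ids in vehicles.items()]
    return sorted(counts, key=lambda pair: pair[1], reverse=True)


print(trams_per_line(rows))  # [('22', 3), ('25', 2), ('9', 1)]
```

Counting plain rows instead of distinct vehicles would report 3 for line 25 here, which is the kind of wrong result the hints warn about.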

## Task for you, number 2

In this task we will try to find all stations where it is possible to change transport type within a desired time window, e.g. in less than 5 minutes, so we won't catch a cold standing outside. Furthermore, we are sick of travelling by tram and want to hop onto a bus. We don't care where the bus goes from that station, as long as it arrives within 5 minutes.
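The core of this task is a time comparison. A minimal sketch, assuming "arrives within 5 minutes" means the bus arrives at the same station no earlier than the tram and at most 5 minutes after it, and that times use the ISO format seen in the stream (e.g. `2022-11-28T15:28:00+01:00`). `bus_within_window` is a hypothetical helper name.

```python
from datetime import datetime, timedelta


def bus_within_window(tram_arrival: str, bus_arrival: str, minutes: int = 5) -> bool:
    """True if the bus arrives at most `minutes` after the tram, and not before it."""
    tram_t = datetime.fromisoformat(tram_arrival)  # handles the "+01:00" offset
    bus_t = datetime.fromisoformat(bus_arrival)
    gap = bus_t - tram_t
    return timedelta(0) <= gap <= timedelta(minutes=minutes)


print(bus_within_window("2022-11-28T15:28:00+01:00", "2022-11-28T15:31:00+01:00"))  # True
print(bus_within_window("2022-11-28T15:28:00+01:00", "2022-11-28T15:35:00+01:00"))  # False
```

In Spark SQL a similar check could presumably be written by converting both columns with `to_timestamp` and comparing the difference in seconds; treat that as an untested suggestion.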

If you look closely at the structure of the topics, you will find that there are no station names, only station IDs. Since we want the names of the stations that fit the description, we need to look them up.

First we need to load the file *stops.json*, which contains information about all stops, which we are in dire need of. The file comes from the *PID* database ([link](https://data.pid.cz/stops/json/stops.json)); some minor changes were made to it for easier loading and readability.

By default Spark expects one JSON object per line, but the JSON in this file spans multiple lines, so we need the *multiline* option. Store the file in DBFS so you can read it easily.

In [0]:
df=spark.read.option("multiline","true").json("dbfs:/FileStore/stops.json")

We can check the loaded dataframe.

In [0]:
df.show()

+-----------+-----------+----------+----------+-----+------------+--------------------+------------+-----------------+-------+------------+-----------------+-----+--------------------+-----------------+
|   avgJtskX|   avgJtskY|    avgLat|    avgLon|  cis|districtCode|            fullName|idosCategory|         idosName|isTrain|municipality|             name| node|               stops|       uniqueName|
+-----------+-----------+----------+----------+-----+------------+--------------------+------------+-----------------+-------+------------+-----------------+-----+--------------------+-----------------+
|-675931.563|-1077494.13|49.8581047|15.4081345|   33|          KH|              Adamov|      301003|           Adamov|   null|      Adamov|           Adamov| 7288|[{Adamov, [U7288Z...|           Adamov|
|  -743138.3|-1045162.44|50.0679169|14.4207993|58936|          AB|            Albertov|      301003|         Albertov|   null|       Praha|         Albertov|  876|[{Albertov, [U876...|    

We will register the DataFrame as an SQL view called *stops*, and from it save the values of fullName and the station IDs into the final table *stopID*.

In [0]:
df.createOrReplaceTempView("stops")

Drop the table stopID in case it already exists (with *if exists*, the command does not fail when the table is missing).

In [0]:
%sql drop table if exists stopID

Now we can extract the names and IDs of the stations.

In [0]:
%sql create table stopID as select fullName as name, stops[0].gtfsIds[0] as stopid from stops

num_affected_rows,num_inserted_rows


In [0]:
%sql select * from stopID limit 3

name,stopid
Adamov,U7288Z1
Albertov,U876Z1P
Ametystová,U1274Z1P


We can, for example, check the ID of the Florenc station.

In [0]:
%sql select * from stopID where name like "Florenc"

name,stopid
Florenc,U689Z1P
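Note that `stops[0].gtfsIds[0]` keeps only one ID per station (the first ID of the first platform), while vehicles report platform-level IDs such as `U685Z2P`, so some lookups may come back empty. The plain-Python sketch below contrasts that with a lookup keeping every ID. The records are trimmed (other fields omitted) and the second Florenc platform ID `U689Z2P` is invented for illustration.

```python
# Trimmed records shaped like stops.json; the second Florenc platform ID is invented.
stations = [
    {"fullName": "Adamov", "stops": [{"gtfsIds": ["U7288Z1"]}]},
    {"fullName": "Florenc", "stops": [{"gtfsIds": ["U689Z1P"]},
                                      {"gtfsIds": ["U689Z2P"]}]},
]


def first_id_only(stations):
    """Equivalent of: select fullName as name, stops[0].gtfsIds[0] as stopid."""
    return {s["stops"][0]["gtfsIds"][0]: s["fullName"] for s in stations}


def all_ids(stations):
    """Map every gtfsId of every platform to its station name."""
    return {gid: s["fullName"]
            for s in stations
            for stop in s["stops"]
            for gid in stop["gtfsIds"]}


print(first_id_only(stations).get("U689Z2P"))  # None
print(all_ids(stations).get("U689Z2P"))        # Florenc
```

A Spark SQL counterpart might be `select fullName as name, explode(flatten(stops.gtfsIds)) as stopid from stops`, assuming `stops` is an array of structs with an array field `gtfsIds`, as the indexing above suggests.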


We need to load the data from the bus/tram streams into tables.

In [0]:
%sql 
drop table if exists buses

In [0]:
%sql
create table buses as select * from mem_buses

num_affected_rows,num_inserted_rows


In [0]:
%sql 
select * from buses limit 3

geometry,properties,type
"List(List(14.36957, 50.0547), Point)","List(List(290, List(97, null, null), null, List(2022-11-23T09:13:00+01:00, 2022-11-23T09:13:00+01:00, U685Z2P, 11), List(2022-11-23T09:14:00+01:00, 2022-11-23T09:14:00+01:00, U846Z2P, 12), 2022-11-23T09:14:37+01:00, 5.815, null, on_track, true), List(List(DP PRAHA, DP PRAHA), List(null, null), List(L149, 149, 3, Dejvická, 149_754_221031, null), 149, 6, 2022-11-23T08:58:00+01:00, 3593, List(autobus, bus, 3), true, null))",Feature
"List(List(14.40682, 50.006), Point)","List(List(76, List(23, null, null), null, List(2022-11-23T09:14:00+01:00, 2022-11-23T09:14:00+01:00, U1158Z2P, 12), List(2022-11-23T09:15:00+01:00, 2022-11-23T09:15:00+01:00, U939Z2P, 13), 2022-11-23T09:14:53+01:00, 6.72, null, on_track, true), List(List(DP PRAHA, DP PRAHA), List(null, null), List(L165, 165, 3, Háje, 165_123_221121, null), 165, 8, 2022-11-23T08:59:00+01:00, 3723, List(autobus, bus, 3), true, null))",Feature
"List(List(14.4096, 50.03958), Point)","List(List(18, List(50, null, null), null, List(2022-11-23T09:13:00+01:00, 2022-11-23T09:13:00+01:00, U614Z3P, 3), List(2022-11-23T09:15:00+01:00, 2022-11-23T09:15:00+01:00, U110Z7P, 4), 2022-11-23T09:13:50+01:00, 3.496, null, on_track, true), List(List(DP PRAHA, DP PRAHA), List(null, null), List(L118, 118, 3, Depo Kačerov, 118_676_220901, null), 118, 9, 2022-11-23T09:07:00+01:00, 6916, List(autobus, bus, 3), true, null))",Feature


Next, create more tables for manipulating the data. Into these we will copy the arrival times, the stop ID, the line number of the bus/tram, the vehicle ID and the vehicle type, with one table for each vehicle type. This part is up to you.
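The projection described above can be sketched in plain Python on one trimmed record. The `next_stop.arrival_time` and `next_stop.id` paths come from the schema printed earlier and `trip.vehicle_type.description_en` from the query above; the paths for line number and vehicle ID (`trip.line`, `trip.vehicle_id`) are hypothetical placeholders, so look up the real names in the schema before reusing them.

```python
# Column name -> path inside the record; the "line" and "vehicle_id" paths are hypothetical.
COLUMNS = {
    "arrival_time": ("properties", "last_position", "next_stop", "arrival_time"),
    "stop_id": ("properties", "last_position", "next_stop", "id"),
    "line": ("properties", "trip", "line"),              # hypothetical path
    "vehicle_id": ("properties", "trip", "vehicle_id"),  # hypothetical path
    "vehicle_type": ("properties", "trip", "vehicle_type", "description_en"),
}


def get_path(record, path):
    """Follow nested keys like SQL dot notation; return None if any level is missing."""
    for key in path:
        if not isinstance(record, dict) or key not in record:
            return None
        record = record[key]
    return record


def project(record):
    """Flatten one nested record into a row with the columns listed in COLUMNS."""
    return {name: get_path(record, path) for name, path in COLUMNS.items()}


record = {
    "properties": {
        "last_position": {"next_stop": {"id": "U610Z1P",
                                        "arrival_time": "2022-11-28T15:29:00+01:00"}},
        "trip": {"line": "25", "vehicle_id": "9325",
                 "vehicle_type": {"description_en": "tram"}},
    }
}
print(project(record))
```

In SQL the same idea is a `create table ... as select` with aliased dot paths, e.g. `select properties.last_position.next_stop.arrival_time as arrival_time, properties.last_position.next_stop.id as stop_id, properties.trip.vehicle_type.description_en as vehicle_type, ... from trams`.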

## Task for you, number three
There are no more tasks at this point, but feel free to try more than what is written here. For example, you can extend the second task so that the tram and the bus also have the same next station; this is not hard, since there is one more variable in the data that may help you. All in all, feel free to try whatever you want: the more you try now, the less you will need to figure out for your homework.

## End text
Some final points that may prove useful in the future (or turn out to be totally wrong).

You can read more than one topic in one stream. This may help with your homework, but it should not be necessary.

Saving your output into a table rather than into memory may be much better.

Remember, if the stream is not working, it may not be your fault; check with your peers.