## Show tables with Scala and Spark

In [1]:
spark.catalog.listTables.show(false)

+--------------------------------+--------+-----------+---------+-----------+
|name                            |database|description|tableType|isTemporary|
+--------------------------------+--------+-----------+---------+-----------+
|btl_departures_arrivals_airports|default |null       |EXTERNAL |false      |
|btl_distances                   |default |null       |EXTERNAL |false      |
|int_airports                    |default |null       |EXTERNAL |false      |
|int_departures                  |default |null       |EXTERNAL |false      |
+--------------------------------+--------+-----------+---------+-----------+


In [2]:
spark.sql("select * from default.btl_distances where estarrivalairport = 'EDDR'").show

+-------------------+-----------------+-------------------+------------------+-----------------+-----------------+----------------+-----------------+-----------------+---------------------+
|estdepartureairport|estarrivalairport|           arr_name|  arr_latitude_deg|arr_longitude_deg|         dep_name|dep_latitude_deg|dep_longitude_deg|         distance|could_be_done_by_rail|
+-------------------+-----------------+-------------------+------------------+-----------------+-----------------+----------------+-----------------+-----------------+---------------------+
|               LSZB|             EDDR|Saarbrücken Airport|49.214599609400004|    7.10950994492|Bern-Belp Airport|       46.913419|         7.499747|257.5165792489347|                 true|
+-------------------+-----------------+-------------------+------------------+-----------------+-----------------+----------------+-----------------+-----------------+---------------------+
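The notebook does not explain how the `distance` column is computed. As a cross-check, the haversine (great-circle) formula with an assumed mean Earth radius of 6371 km, applied to the LSZB and EDDR coordinates in the row above, gives roughly 257.5 km. That is close to the value shown. The sketch below is plain Scala. `DistanceSketch` and `haversineKm` are hypothetical names, and it is an assumption that the pipeline uses exactly this formula.

```scala
import scala.math.{asin, cos, pow, sin, sqrt, toRadians}

object DistanceSketch {
  // Assumed mean Earth radius in kilometres (spherical Earth model)
  val EarthRadiusKm: Double = 6371.0

  /** Great-circle distance in km between two points given in decimal degrees (haversine formula). */
  def haversineKm(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
    val dLat = toRadians(lat2 - lat1)
    val dLon = toRadians(lon2 - lon1)
    val a = pow(sin(dLat / 2), 2) +
      cos(toRadians(lat1)) * cos(toRadians(lat2)) * pow(sin(dLon / 2), 2)
    2 * EarthRadiusKm * asin(sqrt(a))
  }

  def main(args: Array[String]): Unit = {
    // Bern-Belp (LSZB) -> Saarbrücken (EDDR), coordinates copied from the row above
    val km = haversineKm(46.913419, 7.499747, 49.214599609400004, 7.10950994492)
    println(f"LSZB -> EDDR: $km%.1f km")
  }
}
```

A spherical Earth is a simplification. Ellipsoidal formulas give slightly different values, but the difference is small at this scale.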


In [3]:
spark.table("default.int_departures").where($"estarrivalairport"==="LIMC").show

+-----------------------------+--------+-------------------------------+-----------------+------------------------------+-----------------------------+-------------------+--------------------------------+-------------------------------+----------+------+----------+--------+--------------------+
|arrivalAirportCandidatesCount|callsign|departureAirportCandidatesCount|estArrivalAirport|estArrivalAirportHorizDistance|estArrivalAirportVertDistance|estDepartureAirport|estDepartureAirportHorizDistance|estDepartureAirportVertDistance| firstSeen|icao24|  lastSeen|      dt|      dl_ts_captured|
+-----------------------------+--------+-------------------------------+-----------------+------------------------------+-----------------------------+-------------------+--------------------------------+-------------------------------+----------+------+----------+--------+--------------------+
|                            4|SSR07BM |                              0|             LIMC|                      

## Select data by using DataObjects configured in SmartDataLake

In [5]:
// import smartdatalake
import io.smartdatalake.config.SdlConfigObject.stringToDataObjectId
import io.smartdatalake.config.ConfigToolbox
import io.smartdatalake.workflow.dataobject._
import io.smartdatalake.workflow.ActionPipelineContext
import io.smartdatalake.workflow.action.SDLExecutionId
import io.smartdatalake.app.SmartDataLakeBuilderConfig
import io.smartdatalake.workflow.ExecutionPhase
implicit val ss = spark // make Spark session available implicitly

In [6]:
// read config from mounted directory
val (registry, globalConfig) = ConfigToolbox.loadAndParseConfig(Seq("/mnt/config"), Some(this.getClass.getClassLoader))
// Create the context used by SDL objects
implicit val context = ConfigToolbox.getDefaultActionPipelineContext(spark, registry)
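The following cells call methods such as `getSparkDataFrame()` and `dropTable` without passing a context. Presumably, Scala's implicit resolution supplies the `implicit val context` (and `ss`) declared above. The mechanism can be shown without SDL. `PipelineContext` and `TableObject` below are hypothetical stand-ins, not SDL types.

```scala
object ImplicitContextSketch {
  // Hypothetical stand-in for a pipeline context (not SDL's ActionPipelineContext)
  final case class PipelineContext(appName: String)

  // Hypothetical stand-in for a DataObject whose methods expect the context implicitly
  final case class TableObject(id: String) {
    def describe()(implicit context: PipelineContext): String =
      s"$id read within ${context.appName}"
  }

  def main(args: Array[String]): Unit = {
    // Declaring the context as implicit once makes it available to every later call in scope
    implicit val context: PipelineContext = PipelineContext("notebook")
    println(TableObject("int-airports").describe()) // the compiler passes `context` here
  }
}
```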

In [7]:
// get the DataObjects by their IDs from the config registry
val dataIntAirports = registry.get[DeltaLakeTableDataObject]("int-airports")
val dataIntDepartures = registry.get[DeltaLakeTableDataObject]("int-departures")

## Historization of airport data

In [9]:
// drop the int-airports table so that the history is rebuilt from scratch
dataIntAirports.dropTable

Start the action historize-airports.

In [12]:
dataIntAirports.getSparkDataFrame().printSchema

root
 |-- ident: string (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude_deg: string (nullable = true)
 |-- longitude_deg: string (nullable = true)
 |-- dl_ts_captured: timestamp (nullable = true)
 |-- dl_ts_delimited: timestamp (nullable = true)


In [13]:
dataIntAirports.getSparkDataFrame().orderBy($"ident",$"dl_ts_captured").show

+-----+--------------------+------------------+-------------------+--------------------+-------------------+
|ident|                name|      latitude_deg|      longitude_deg|      dl_ts_captured|    dl_ts_delimited|
+-----+--------------------+------------------+-------------------+--------------------+-------------------+
|  00A|   Total RF Heliport|         40.070985|         -74.933689|2023-10-02 12:10:...|9999-12-31 00:00:00|
| 00AA|Aero B Ranch Airport|         38.704022|        -101.473911|2023-10-02 12:10:...|9999-12-31 00:00:00|
| 00AK|        Lowell Field|         59.947733|        -151.692524|2023-10-02 12:10:...|9999-12-31 00:00:00|
| 00AL|        Epps Airpark| 34.86479949951172| -86.77030181884766|2023-10-02 12:10:...|9999-12-31 00:00:00|
| 00AN|Katmai Lodge Airport|         59.093287|        -156.456699|2023-10-02 12:10:...|9999-12-31 00:00:00|
| 00AR|Newport Hospital ...|           35.6087|         -91.254898|2023-10-02 12:10:...|9999-12-31 00:00:00|
| 00AS|      Fulton

In [14]:
dataIntAirports.dropTable

Delete all files in data/stg-airport, then copy the historical result.csv from the folder data-fallback-download/stg-airport into the folder data/stg-airport.
Now start the action historize-airports. Afterwards, start the actions download-airports and historize-airports to download fresh data and build up the airport history.

In [16]:
dataIntAirports.getSparkDataFrame()
  .groupBy($"ident").count
  .orderBy($"count".desc)
  .show

+-------+-----+
|  ident|count|
+-------+-----+
|   0OH7|    2|
|   MTPX|    2|
|   1CL8|    2|
|   2LA2|    2|
|   36FA|    2|
|   8LL0|    2|
|AF-0006|    2|
|AU-0083|    2|
|CA-0259|    2|
|   ESSX|    2|
|   FBGD|    2|
|   GE25|    2|
|HU-0043|    2|
|   K1O4|    2|
|   KAAT|    2|
|   KCKA|    2|
|KZ-0016|    2|
|   LA65|    2|
|   LETC|    2|
|   LOGP|    2|
+-------+-----+


In [17]:
dataIntAirports.getSparkDataFrame()
  .where($"ident"==="CDV3")
  .show(false)

+-----+-------------------------------------------------+-------------+--------------+--------------------------+--------------------------+
|ident|name                                             |latitude_deg |longitude_deg |dl_ts_captured            |dl_ts_delimited           |
+-----+-------------------------------------------------+-------------+--------------+--------------------------+--------------------------+
|CDV3 |Charlottetown (Queen Elizabeth Hospital) Heliport|46.2554925916|-63.0988866091|2023-10-02 12:14:02.575663|2023-10-02 12:14:28.086627|
|CDV3 |Charlottetown (Queen Elizabeth Hospital) Heliport|46.255493    |-63.098887    |2023-10-02 12:14:28.087627|9999-12-31 00:00:00       |
+-----+-------------------------------------------------+-------------+--------------+--------------------------+--------------------------+
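The two CDV3 rows illustrate the historization pattern. The newer snapshot stores the coordinates with a different precision, so the previous version was closed. Its `dl_ts_delimited` is 1 ms before the new version's `dl_ts_captured`, and the new version stays open until 9999-12-31.

Below is a minimal pure-Scala sketch of that merge step. It assumes full snapshots as input and treats keys that disappear from a snapshot as deleted. `HistRow` and `historize` are hypothetical names. This is not SDL's implementation, which works on Spark DataFrames.

```scala
import java.time.LocalDateTime

object HistorizationSketch {
  // Open-ended validity marker, matching the dl_ts_delimited value shown above
  val OpenEnd: LocalDateTime = LocalDateTime.of(9999, 12, 31, 0, 0)

  // Simplified history row: business key, attribute values and validity interval
  final case class HistRow(ident: String, attrs: Map[String, String],
                           captured: LocalDateTime, delimited: LocalDateTime) {
    def isCurrent: Boolean = delimited == OpenEnd
  }

  /** Merge a full snapshot (ident -> attributes) taken at `ts` into the existing history. */
  def historize(history: Seq[HistRow], snapshot: Map[String, Map[String, String]],
                ts: LocalDateTime): Seq[HistRow] = {
    val closeTs = ts.minusNanos(1000000L) // old version ends 1 ms before the new one starts
    val current = history.filter(_.isCurrent).map(r => r.ident -> r).toMap
    // Close current versions whose attributes changed or whose key vanished from the snapshot
    val updated = history.map { r =>
      val changedOrDeleted = snapshot.get(r.ident).forall(_ != r.attrs)
      if (r.isCurrent && changedOrDeleted) r.copy(delimited = closeTs) else r
    }
    // Open a new version for keys that are new or whose attributes changed
    val opened = snapshot.toSeq.collect {
      case (id, attrs) if current.get(id).forall(_.attrs != attrs) => HistRow(id, attrs, ts, OpenEnd)
    }
    updated ++ opened
  }
}
```

Under this scheme, filtering on the open-ended `dl_ts_delimited` returns the latest state per ident. An ident only accumulates a second row when its attributes change between snapshots.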


## Deduplication of flight data

In [19]:
val dataIntDepartures = registry.get[DeltaLakeTableDataObject]("int-departures")
dataIntDepartures.dropTable

Start the action deduplicate-departures.

In [21]:
dataIntDepartures.getSparkDataFrame().printSchema

root
 |-- arrivalAirportCandidatesCount: long (nullable = true)
 |-- callsign: string (nullable = true)
 |-- departureAirportCandidatesCount: long (nullable = true)
 |-- estArrivalAirport: string (nullable = true)
 |-- estArrivalAirportHorizDistance: long (nullable = true)
 |-- estArrivalAirportVertDistance: long (nullable = true)
 |-- estDepartureAirport: string (nullable = true)
 |-- estDepartureAirportHorizDistance: long (nullable = true)
 |-- estDepartureAirportVertDistance: long (nullable = true)
 |-- firstSeen: long (nullable = true)
 |-- icao24: string (nullable = true)
 |-- lastSeen: long (nullable = true)
 |-- dt: string (nullable = true)
 |-- dl_ts_captured: timestamp (nullable = true)
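In this schema, `firstSeen` and `lastSeen` are stored as plain longs, and `dt` is a string such as `20210829` (yyyyMMdd). The schema does not say so, but assuming the longs are Unix epoch seconds, they can be turned into timestamps and durations with `java.time`. `DepartureTimesSketch` is a hypothetical helper.

```scala
import java.time.{Duration, Instant, LocalDate}
import java.time.format.DateTimeFormatter

object DepartureTimesSketch {
  // Assumption: firstSeen and lastSeen hold Unix epoch seconds
  def toInstant(epochSeconds: Long): Instant = Instant.ofEpochSecond(epochSeconds)

  // Time between the first and the last sighting of the aircraft
  def observedDuration(firstSeen: Long, lastSeen: Long): Duration =
    Duration.between(toInstant(firstSeen), toInstant(lastSeen))

  // dt values such as "20210829" follow the basic ISO date format (yyyyMMdd)
  def parseDt(dt: String): LocalDate = LocalDate.parse(dt, DateTimeFormatter.BASIC_ISO_DATE)
}
```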


In [22]:
dataIntDepartures.getSparkDataFrame()
  .groupBy($"icao24", $"estdepartureairport", $"dt")
  .count
  .orderBy($"count".desc)
  .show

+------+-------------------+--------+-----+
|icao24|estdepartureairport|      dt|count|
+------+-------------------+--------+-----+
|4b43ab|               LSZB|20210829|    3|
|4b4b8d|               LSZB|20210829|    3|
|4b1b13|               LSZB|20210829|    2|
|4b4445|               LSZB|20210829|    2|
|4b4442|               LSZB|20210829|    1|
|4b43ab|               LSZB|20210830|    1|
|346603|               LSZB|20210829|    1|
|4b4449|               LSZB|20210829|    1|
|4b0f70|               LSZB|20210830|    1|
|49d283|               LSZB|20210829|    1|
|4b1a01|               LSZB|20210829|    1|
|4b4445|               LSZB|20210830|    1|
|4b51fa|               LSZB|20210829|    1|
|aa0da1|               LSZB|20210829|    1|
|4d02d7|               LSZB|20210829|    1|
|4b1b12|               LSZB|20210829|    1|
|494108|               LSZB|20210829|    1|
|4b3049|               LSZB|20210829|    1|
|4b1f2f|               LSZB|20210830|    1|
|44046b|               LSZB|2021
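
The counts show several rows sharing the same `icao24`, `estdepartureairport` and `dt`, so that combination alone is not unique in the table. A deduplication step keeps one row per key. The sketch below assumes the most recently captured row wins and uses a hypothetical key that adds `firstSeen`. `Departure` and `deduplicate` are illustrative names, not the configuration of the deduplicate-departures action.

```scala
import java.time.Instant

object DeduplicationSketch {
  // Simplified departure record; only a few of the int_departures columns are modelled
  final case class Departure(icao24: String, estDepartureAirport: String, firstSeen: Long,
                             callsign: String, capturedAt: Instant)

  // Explicit ordering on capture time, so the latest capture compares greatest
  private val byCaptureTime: Ordering[Instant] =
    Ordering.fromLessThan[Instant]((a, b) => a.isBefore(b))

  /** Keep only the most recently captured record for each key. */
  def deduplicate[K](rows: Seq[Departure])(key: Departure => K): Seq[Departure] =
    rows.groupBy(key).values.map(_.maxBy(_.capturedAt)(byCaptureTime)).toSeq
}
```

The key choice matters more than the merge rule. A key that is too coarse would collapse genuinely different flights into one row, while a key that is too fine would leave duplicates in place.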