theme | size | transition | marp |
---|---|---|---|
mp-theme | 58140 | slide | true |
- Senior Software Engineer @ SiriusXM
- Blog about Scala
- OSS from time to time
And this is my fs2 story with trams
Thus I asked myself
- Find data source of vehicle positions
- Fetch and parse
- Fetch some more
- Calculate diffs
- Produce statistics
https://mpk.wroc.pl/strefa-pasazera/zaplanuj-podroz/mapa-pozycji-pojazdow
curl -s 'https://mpk.wroc.pl/bus_position' \
-H 'accept: application/json, text/javascript, */*; q=0.01' \
-H 'content-type: application/x-www-form-urlencoded; charset=UTF-8' \
--data-raw 'busList%5Bbus%5D%5B%5D=110&busList%5Btram%5D%5B%5D=31&busList%5Btram%5D%5B%5D=33' \
--compressed | jq
[
{
"name": "31",
"type": "tram",
"y": 17.051546,
"x": 51.076923,
"k": 22471783
},
{
"name": "31",
"type": "tram",
"y": 17.049835,
"x": 51.081802,
"k": 22472415
},
/* ... */
]
- name - line name like 31, 33, 110
- type - one of tram, bus
- x - latitude (values around 51)
- y - longitude (values around 17)
- k - 🤨 looks like a vehicle id
- Find data source of vehicle positions ✅
- Fetch and parse
- Fetch some more
- Calculate diffs
- Produce statistics
Shape of our request
- HTTP POST
- List of vehicles like
busList[bus][]=110&busList[tram][]=31&busList[tram][]=33
content-type: application/x-www-form-urlencoded
- Expect JSON output
val apiUri = uri"https://mpk.wroc.pl/bus_position"
def payload(buses: List[String], trams: List[String]) =
(trams.map(v => s"busList[tram][]=$v") ++
buses.map(v => s"busList[bus][]=$v")).mkString("&")
// ☝️ builds this thing: busList[bus][]=110&busList[tram][]=31&busList[tram][]=33
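To see what the form body actually looks like, here is a small script-style sketch using only the standard library. `payload` repeats the function above; `encodedPayload` is a hypothetical variant (not part of the talk) that percent-encodes the keys the way the curl call does.

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

// Same shape as the payload function from the slide
def payload(buses: List[String], trams: List[String]): String =
  (trams.map(v => s"busList[tram][]=$v") ++
    buses.map(v => s"busList[bus][]=$v")).mkString("&")

// Hypothetical variant: percent-encode keys and values, like the curl example
def encodedPayload(buses: List[String], trams: List[String]): String = {
  def enc(s: String): String = URLEncoder.encode(s, StandardCharsets.UTF_8)
  val tramKey = enc("busList[tram][]")
  val busKey  = enc("busList[bus][]")
  (trams.map(v => s"$tramKey=${enc(v)}") ++
    buses.map(v => s"$busKey=${enc(v)}")).mkString("&")
}

println(payload(List("110"), List("31", "33")))
// busList[tram][]=31&busList[tram][]=33&busList[bus][]=110
println(encodedPayload(List("110"), List("31", "33")))
// busList%5Btram%5D%5B%5D=31&busList%5Btram%5D%5B%5D=33&busList%5Bbus%5D%5B%5D=110
```

Note that trams come first in the generated string; the order of form fields should not matter to the server, but that is an assumption about the API.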
Data model
case class Record(
name: String,
x: Double,
y: Double,
k: Int // 🤨
) derives Codec.AsObject // This will generate JSON Encoder and Decoder
def request(
backend: SttpBackend[IO, Any], buses: List[String], trams: List[String]
): IO[List[Record]] = // Note the return type
basicRequest
.post(apiUri)
.body(payload(buses, trams)) // Something like busList[bus][]=110&busList[tram][]=31&busList[tram][]=33
.contentType(MediaType.ApplicationXWwwFormUrlencoded)
.response(asJson[List[Record]])
.send(backend)
.map(_.body) // We are only interested in the result
.rethrow // Fail `IO` on all errors, we are being simple here
- Our request returns an IO, so we need a way to execute it
- It requires SttpBackend, so we need to create one
The easiest way to execute an IO is to create a Main class that handles it for us
object Main extends IOApp.Simple {
def run: IO[Unit] = ??? // our logic goes here
}
Let's create a backend, execute the request and print the result
object Main extends IOApp.Simple {
val buses = List("110")
val trams = List("31", "33")
def run =
HttpClientFs2Backend
.resource[IO]()
.use(backend => request(backend, buses, trams))
.flatMap(IO.println)
}
Execution result
$ scala-cli sttp-client.scala
List(
Record(31,51.141502,16.95872,22475890), Record(31,51.110912,17.02159,22475017), Record(31,51.07934,17.050734,22475050),
Record(31,51.12252,17.011976,22475871), Record(31,51.097458,17.03275,22475942), Record(110,51.096992,17.037682,22312466),
Record(33,51.112633,16.99349,22476133), Record(33,51.107376,17.035055,22476039), Record(33,51.11388,17.1032,22476064),
Record(33,51.10771,17.040272,22476110)
)
but that k: Int // 🤨
Let's hide the API call behind an interface
trait Vehicles[F[_]] {
def list(): F[Seq[Vehicle]]
}
- Notice the Vehicle type - it needs better fields than x, y, k
case class Vehicle(
lineName: Vehicle.LineName,
measuredAt: Instant,
position: Position,
id: Vehicle.Id // no more `k` 🎉
)
case class Position(latitude: Double, longitude: Double)
object Vehicle {
case class Id(value: String) extends AnyVal
case class LineName(value: String) extends AnyVal
}
Our existing code
val apiUri = uri"https://mpk.wroc.pl/bus_position"
def payload(buses: List[String], trams: List[String]) =
(trams.map(v => s"busList[tram][]=$v") ++ buses.map(v => s"busList[bus][]=$v")).mkString("&")
case class Record(
name: String,
x: Double,
y: Double,
k: Int
) derives Codec.AsObject // This will generate JSON Encoder and Decoder
def request(backend: SttpBackend[IO, Any], buses: List[String], trams: List[String]): IO[List[Record]] =
basicRequest
.post(apiUri)
.body(payload(buses, trams)) // Something like busList[bus][]=110&busList[tram][]=31&busList[tram][]=33
.contentType(MediaType.ApplicationXWwwFormUrlencoded)
.response(asJson[List[Record]])
.send(backend)
.map(_.body) // We are only interested in the result
.rethrow // Fail `IO` on all errors, we are being simple here
How hard can it be?
trait Vehicles[F[_]] {
def list(): F[Seq[Vehicle]]
// 👆 each time we call this, we receive latest data from API
}
object Vehicles {
def mpkWrocInstance(
backend: SttpBackend[IO, Any],
buses: List[String],
trams: List[String]
): Vehicles[IO] = ??? // make your guess
}
Just wrap the request method
trait Vehicles[F[_]] {
def list(): F[Seq[Vehicle]]
// 👆 each time we call this, we receive latest data from API
}
object Vehicles {
def mpkWrocInstance(
backend: SttpBackend[IO, Any],
buses: List[String],
trams: List[String]
): Vehicles[IO] = { () =>
(request(backend, buses, trams), IO.realTimeInstant).mapN {
(responses, now) =>
responses.map { record =>
Vehicle(
lineName = Vehicle.LineName(record.name),
measuredAt = now,
position = Position(record.x, record.y),
id = Vehicle.Id(record.k.toString)
)
}
}
}
}
trait Vehicles[F[_]] {
def list(): F[Seq[Vehicle]]
}
Now we have an abstract way to fetch meaningful data
case class Vehicle(
lineName: Vehicle.LineName,
measuredAt: Instant,
position: Position,
id: Vehicle.Id
)
In the next step we'll be looking into the distance covered between two measurements
case class Vehicle(
lineName: Vehicle.LineName,
measuredAt: Instant,
position: Position,
id: Vehicle.Id
){
// calculate distance in meters
def distance(other: Vehicle): Double = ???
}
case class Vehicle(
lineName: Vehicle.LineName,
measuredAt: Instant,
position: Position,
id: Vehicle.Id
){
// It's a shameless copy-paste from StackOverflow 😅
def distance(other: Vehicle): Double = {
val earthRadius = 6371000 // Earth's radius in meters
val lat1 = toRadians(position.latitude)
val lon1 = toRadians(position.longitude)
val lat2 = toRadians(other.position.latitude)
val lon2 = toRadians(other.position.longitude)
val dlon = lon2 - lon1
val dlat = lat2 - lat1
val a = pow(sin(dlat / 2), 2) + cos(lat1) * cos(lat2) * pow(sin(dlon / 2), 2)
val c = 2 * atan2(sqrt(a), sqrt(1 - a))
val distance = earthRadius * c
distance
}
}
The point is, we can now calculate the distance covered by a vehicle
val measurement1: Vehicle = ???
val measurement2: Vehicle = ???
measurement1.distance(measurement2) // like this
It calculates Geographical Distance but Euclidean Distance would be good enough
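To get a feel for how close the two approaches are at city scale, here is a minimal sketch comparing the haversine formula with a flat (equirectangular) approximation. The `flat` function and the simplified `Position`-only signatures are assumptions for illustration; the two coordinates are taken from the sample API response above.

```scala
import scala.math.*

case class Position(latitude: Double, longitude: Double)

val earthRadius = 6371000.0 // Earth's radius in meters

// Great-circle distance, same formula as in Vehicle.distance
def haversine(a: Position, b: Position): Double = {
  val dlat = toRadians(b.latitude - a.latitude)
  val dlon = toRadians(b.longitude - a.longitude)
  val h = pow(sin(dlat / 2), 2) +
    cos(toRadians(a.latitude)) * cos(toRadians(b.latitude)) * pow(sin(dlon / 2), 2)
  2 * earthRadius * atan2(sqrt(h), sqrt(1 - h))
}

// Hypothetical flat approximation: treat the area as a plane,
// scaling longitude by the cosine of the mean latitude
def flat(a: Position, b: Position): Double = {
  val meanLat = toRadians((a.latitude + b.latitude) / 2)
  val x = toRadians(b.longitude - a.longitude) * cos(meanLat)
  val y = toRadians(b.latitude - a.latitude)
  earthRadius * sqrt(x * x + y * y)
}

val p1 = Position(51.076923, 17.051546)
val p2 = Position(51.081802, 17.049835)

println(haversine(p1, p2)) // roughly 555 meters
println(flat(p1, p2))      // nearly the same value at this scale
```

Over a few hundred meters the two results differ by far less than the noise in the position data, which is why the simpler formula would do here.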
- Find data source of vehicle positions ✅
- Fetch and parse ✅
- Fetch some more 🛠️
- Calculate diffs 🛠️
- Produce statistics
Think of a sequence of data elements that
- Computes values on demand; is lazy
- Can be infinite or finite
- Can be asynchronous - supports non-blocking operations
- Has a powerful API - offers rich combinators
Short intro
- Create an infinite stream of natural numbers: 1, 2, 3, 4, 5, ...
- Keep only the odd ones: 1, 3, 5, 7, ...
- Slide through by 3 elements: (1, 3, 5), (3, 5, 7), (5, 7, 9), ...
- Add each group: 9, 15, 21, ...
- Take the first 10
Here's how you do this
Stream
.iterate(1)(_ + 1) // Create an infinite stream of natural numbers
.filter(_ % 2 != 0) // Filter for odd numbers: 1, 3, 5, 7, 9, ...
.sliding(3) // Slides over each 3 elements: (1, 3, 5), (3, 5, 7), (5, 7, 9), ...
.map { chunk =>
chunk(0) + chunk(1) + chunk(2) // Add them together: 9, 15, 21, ...
}
.take(10) // Fetch the first ten results
Brought to you with https://zainab-ali.github.io/aquascape
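If you want to try the same pipeline without adding fs2 to the build, the standard library's `LazyList` offers a rough analogy. This is only a sketch of the idea: `LazyList` is lazy and possibly infinite like the stream above, but it has no effects or asynchrony.

```scala
// Standard-library analogy of the fs2 pipeline above (not fs2 itself)
val sums: List[Int] =
  LazyList
    .iterate(1)(_ + 1)   // infinite natural numbers: 1, 2, 3, ...
    .filter(_ % 2 != 0)  // odd numbers only: 1, 3, 5, ...
    .sliding(3)          // windows of 3: (1, 3, 5), (3, 5, 7), ...
    .map(_.sum)          // add each window: 9, 15, 21, ...
    .take(10)            // first ten results
    .toList

println(sums) // List(9, 15, 21, 27, 33, 39, 45, 51, 57, 63)
```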
- Every N seconds, call the API for new data
- Compare the current result with the previous one
- Find how each vehicle moved, calculate the diff
- After a fixed number of updates, show some statistics
We want to build a stream that:
- Is infinite
- Acts on a given time interval, like every N seconds
- Lists vehicles using Vehicles[IO].list()
- Joins results by vehicle id
- Calculates the distance using Vehicle.distance and the elapsed time
- Builds a map of (Vehicle.LineName, Vehicle.Id) -> (Distance, AVG Speed)
Infinite stream that lists vehicles every N seconds
val interval = 7.seconds
def stats(vehicles: Vehicles[IO]): IO[Map[(LineName, Id), VehicleStats]] =
fs2.Stream
.fixedRateStartImmediately[IO](interval)
.evalMap(_ => vehicles.list())
// TBC
Easy right?
- Is infinite ✅
- Acts on a given time interval, like every N seconds ✅
- When the time comes, it lists vehicles using Vehicles[IO].list() ✅
- Joins previous and current result by vehicle id
- Calculates the distance using Vehicle.distance and the elapsed time
- Builds a map of (Vehicle.LineName, Vehicle.Id) -> VehicleStats
Slide over data, take current and previous measurement and calculate the diff
def stats(vehicles: Vehicles[IO]): IO[Map[(LineName, Id), VehicleStats]] =
fs2.Stream
.fixedRateStartImmediately[IO](interval)
.evalMap(_ => vehicles.list())
.sliding(2)
.map(chunk => calculateDiff(chunk(0), chunk(1))) // That needs explaining
// TBC
Given two measurements, we want to produce a diff
(Vehicle, Vehicle) => VehiclePositionDiff
case class VehiclePositionDiff(
line: Vehicle.LineName,
id: Vehicle.Id,
secondsDuration: Double,
metersDistance: Double
)
object VehiclePositionDiff {
  def between(v1: Vehicle, v2: Vehicle): VehiclePositionDiff = {
    // elapsed time between the two measurements, in seconds
    val duration = secondDuration(v1.measuredAt, v2.measuredAt)
    val distance = v1.distance(v2)
    VehiclePositionDiff(v1.lineName, v1.id, duration, distance)
  }
}
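The `secondDuration` helper is not shown on the slides. One plausible way to write it, assuming `measuredAt` is a `java.time.Instant` as in the `Vehicle` model, is sketched below; the actual implementation in the talk's code may differ.

```scala
import java.time.{Duration, Instant}

// Hypothetical helper: elapsed time between two instants, in fractional seconds
def secondDuration(from: Instant, to: Instant): Double =
  Duration.between(from, to).toMillis / 1000.0

val t1 = Instant.parse("2024-03-20T09:48:00Z")
val t2 = Instant.parse("2024-03-20T09:48:09Z")

println(secondDuration(t1, t2)) // 9.0
```

Going through milliseconds keeps sub-second precision, which matters when the polling interval is only a few seconds.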
Now that we have (Vehicle, Vehicle) => VehiclePositionDiff
Let's do this for multiple measurements
(Seq[Vehicle], Seq[Vehicle]) => Seq[VehiclePositionDiff]
Important part
def calculateDiff(snapshot1: Seq[Vehicle], snapshot2: Seq[Vehicle]): Seq[VehiclePositionDiff] =
snapshot1
.join(snapshot2)
.map((v1, v2) => VehiclePositionDiff.between(v1, v2))
Boring stuff, not optimal
extension (snapshot: Seq[Vehicle]) {
def join(snapshot2: Seq[Vehicle]): Seq[(Vehicle, Vehicle)] =
snapshot.flatMap{ v1 => snapshot2.collect { case v2 if v2.id == v1.id => (v1, v2)} }
}
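The nested scan above compares every vehicle with every other one. If that ever became a bottleneck, one option would be to index the second snapshot by id first. The sketch below uses simplified stand-in types (`Id`, `Vehicle` with two fields) and a hypothetical name `joinById`; unlike the original, it keeps only one match per id when the second snapshot contains duplicates.

```scala
// Simplified stand-ins for the talk's model, just enough to show the join
case class Id(value: String)
case class Vehicle(id: Id, lineName: String)

// Hypothetical alternative to `join`: build a lookup table once, then probe it
def joinById(previous: Seq[Vehicle], current: Seq[Vehicle]): Seq[(Vehicle, Vehicle)] = {
  // on duplicate ids the last vehicle wins
  val currentById: Map[Id, Vehicle] = current.map(v => v.id -> v).toMap
  previous.flatMap(v1 => currentById.get(v1.id).map(v2 => (v1, v2)))
}

val before = Seq(Vehicle(Id("1"), "31"), Vehicle(Id("2"), "33"))
val after  = Seq(Vehicle(Id("2"), "33"), Vehicle(Id("3"), "110"))

println(joinById(before, after))
// List((Vehicle(Id(2),33),Vehicle(Id(2),33)))
```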
Where are we again?
def stats(vehicles: Vehicles[IO]): IO[Map[(LineName, Id), VehicleStats]] =
fs2.Stream
.fixedRateStartImmediately[IO](interval)
.evalMap(_ => vehicles.list())
.sliding(2)
.map(chunk => calculateDiff(chunk(0), chunk(1))) // basically calculate derivative
// TBC
Our stream lists vehicles every interval and calculates a list of diffs
- Is infinite ✅
- Acts on a given time interval, like every N seconds ✅
- When the time comes, it lists vehicles using Vehicles[IO].list() ✅
- Joins previous and current result by vehicle id ✅
- Calculates the distance using Vehicle.distance and the elapsed time ✅
- Builds a map of (Vehicle.LineName, Vehicle.Id) -> VehicleStats
Let's calculate the stats
def stats(vehicles: Vehicles[IO]): IO[Map[(LineName, Id), VehicleStats]] =
fs2.Stream
.fixedRateStartImmediately[IO](interval)
.evalMap(_ => vehicles.list())
.sliding(2)
.map(chunk => calculateDiff(chunk(0), chunk(1)))
.take(numberOfSamples)
.fold(Map.empty)(summarize) // 👈 `summarize` needs explaining
.compile
.lastOrError
The summarize function describes the incremental step of building the summary
def summarize(
  previousSummary: Map[(Vehicle.LineName, Vehicle.Id), VehicleStats],
  nextDiff: Seq[VehiclePositionDiff]
): Map[(LineName, Id), VehicleStats] = {
  val currentSummary =
    nextDiff.groupMapReduce(d => (d.line, d.id))( // group by line and id
      diff => VehicleStats(diff.metersDistance, diff.secondsDuration) // map to VehicleStats
    )((a, b) => a) // in case there were multiple results, take the first
  Monoid.combine(previousSummary, currentSummary) // Magic 🌠
}
object Main extends IOApp.Simple {
def run =
HttpClientFs2Backend
.resource[IO]()
.use(backend => program(backend) *> IO.println("Program finished"))
private val trams = List("8", "16", "18", "20", "21", "22")
private val buses = List("124", "145", "149")
def program(backend: SttpBackend[IO, Any]) = for {
vehicles <- IO.pure(Vehicles.mpkWrocInstance(backend, buses, trams)) // a for-comprehension has to start with a generator
stats <- StatsCalculator.stats(vehicles)
aggregate = StatsCalculator.aggregateLines(stats)
fastest = aggregate.maxBy((line, stats) => stats.avgSpeedKMH)
slowest = aggregate.minBy((line, stats) => stats.avgSpeedKMH)
avg = aggregate.values.map(_.avgSpeedKMH).sum / aggregate.size // arithmetic mean over all lines
_ <- IO.println(s"Fastest: $fastest")
_ <- IO.println(s"Slowest: $slowest")
_ <- IO.println(s"Average: $avg")
} yield ()
}
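The program relies on `avgSpeedKMH`, which is not shown on the slides. A minimal sketch of how `VehicleStats` might expose it, assuming the two fields used elsewhere in the talk (`metersDistance`, `secondsDuration`), could look like this; the zero-duration guard is an added assumption.

```scala
// Assumed shape of VehicleStats, based on the fields used in the talk
case class VehicleStats(metersDistance: Double, secondsDuration: Double) {
  // meters per second times 3.6 gives kilometers per hour
  def avgSpeedKMH: Double =
    if (secondsDuration <= 0) 0.0 // no elapsed time, no meaningful speed
    else metersDistance / secondsDuration * 3.6
}

val oneKilometerInSixMinutes = VehicleStats(1000.0, 360.0)
println(oneKilometerInSixMinutes.avgSpeedKMH) // about 10 km/h
```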
Captured at 20.03.2024 09:48
Parameters
val interval = 9.seconds
val numberOfSamples = 72
- Buses 🚌
  - Fastest: 20.7 km/h - line 149
  - Slowest: 14.4 km/h - line 124
- Trams 🚊
  - Fastest: 13.7 km/h - line 21
  - Slowest: 11.4 km/h - line 16
- Average: 15.8 km/h
- Data is not easy to find
- Separation of concerns is important
- Streams can be a nice way to model business logic
- Aquascape is fun!
Keep in touch! 🤝
- Blog: blog.michal.pawlik.dev
- LinkedIn: Michał Pawlik
- GitHub: majk-p
- Mastodon: @majkp@hostux.social
Thanks to how monoids compose, you can combine two maps for free
//> using toolkit typelevel:latest
import cats.Monoid
val a = Map[String, Int]("foo" -> 1, "bar" -> 5)
val b = Map[String, Int]("bar" -> 3, "baz" -> 0)
println(
Monoid.combine(a, b)
)
The result is
$ scala-cli monoid.sc
Compiling project (Scala 3.3.0, JVM)
Compiled project (Scala 3.3.0, JVM)
Map(bar -> 8, baz -> 0, foo -> 1)
So when we provide a Monoid for VehicleStats like this
given Monoid[VehicleStats] with {
override def combine(x: VehicleStats, y: VehicleStats): VehicleStats =
VehicleStats(
x.metersDistance + y.metersDistance,
x.secondsDuration + y.secondsDuration
)
override def empty: VehicleStats = VehicleStats(0,0)
}
This works for free
Monoid.combine(previousSummary, currentSummary) // combine the previous stats with the current ones
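To demystify the magic a little, here is roughly what combining two summaries amounts to, written by hand with the standard library only. This is a sketch of the behaviour, not how cats implements it; `addStats` and `combineSummaries` are hypothetical names, and `VehicleStats` is assumed to have the two fields used above.

```scala
case class VehicleStats(metersDistance: Double, secondsDuration: Double)

// Same logic as the Monoid's combine: add distances and durations
def addStats(x: VehicleStats, y: VehicleStats): VehicleStats =
  VehicleStats(x.metersDistance + y.metersDistance, x.secondsDuration + y.secondsDuration)

// Hand-written equivalent of combining two maps key by key:
// keys present in both maps get their values added, other keys are kept as-is
def combineSummaries[K](a: Map[K, VehicleStats], b: Map[K, VehicleStats]): Map[K, VehicleStats] =
  b.foldLeft(a) { case (acc, (key, stats)) =>
    acc.updatedWith(key) {
      case Some(existing) => Some(addStats(existing, stats))
      case None           => Some(stats)
    }
  }

val previous = Map("31" -> VehicleStats(100.0, 10.0))
val current  = Map("31" -> VehicleStats(50.0, 5.0), "33" -> VehicleStats(7.0, 1.0))

println(combineSummaries(previous, current))
```

With the `Monoid[VehicleStats]` instance in scope, cats derives this map-level behaviour automatically, which is why `summarize` needs no merging code of its own.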