Let's assume we have two streams of data that are sent independently, for example to different Kafka topics, or to different Data Sources via the Events API:
GPS position:
{
"timestamp": "2022-10-27T11:43:02",
"vehicle_id": "8d1e1533-6071-4b10-9cda-b8429c1c7a67",
"latitude": 40.4169866,
"longitude": -3.7034816
}
{
"timestamp": "2022-10-27T11:44:03",
"vehicle_id": "8d1e1533-6071-4b10-9cda-b8429c1c7a67",
"latitude": 40.4169867,
"longitude": -3.7034818
}
and Vehicle data:
{
"timestamp": "2022-10-27T11:43:02",
"vehicle_id": "8d1e1533-6071-4b10-9cda-b8429c1c7a67",
"speed": 91,
"fuel_level_percentage": 85
}
{
"timestamp": "2022-10-27T11:44:03",
"vehicle_id": "8d1e1533-6071-4b10-9cda-b8429c1c7a67",
"speed": 89,
"fuel_level_percentage": 84
}
Sometimes joining them at query time, that is, in the pipe whose output is an API Endpoint, is perfectly fine. We recommend starting there and moving to storing the joined data in a different Data Source only when necessary.
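As a rough sketch, a query-time join in an endpoint pipe could look like this (the node name is illustrative):
NODE endpoint
SQL >
    SELECT g.timestamp, g.vehicle_id, latitude, longitude, speed, fuel_level_percentage
    FROM gps_data g
    JOIN vehicle_data v
    ON g.vehicle_id = v.vehicle_id
    AND g.timestamp = v.timestamp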
But sometimes, due to performance needs, we want them joined by timestamp and vehicle_id into another Data Source:
timestamp | vehicle_id | latitude | longitude | speed | fuel_level_percentage |
---|---|---|---|---|---|
2022-10-27T11:43:02 | 8d1e1533-6071-4b10-9cda-b8429c1c7a67 | 40.4169866 | -3.7034816 | 91 | 85 |
2022-10-27T11:44:03 | 8d1e1533-6071-4b10-9cda-b8429c1c7a67 | 40.4169867 | -3.7034818 | 89 | 84 |
So, our first thought would be to create a Materialized View that joins both streams:
NODE mat_node
SQL >
    SELECT g.timestamp, g.vehicle_id, latitude, longitude, speed, fuel_level_percentage
    FROM gps_data g
    JOIN
    (
        SELECT timestamp, vehicle_id, speed, fuel_level_percentage
        FROM vehicle_data
        WHERE (timestamp, vehicle_id) IN (SELECT timestamp, vehicle_id FROM gps_data)
    ) v
    ON g.vehicle_id = v.vehicle_id
    AND g.timestamp = v.timestamp

TYPE materialized
DATASOURCE mv_joined_data
ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "vehicle_id, timestamp"
But note that we don't have control over when data arrives in Tinybird and when it is ingested. You may have already spotted the issue with this MV, but let's simulate it to see what happens:
cd dataproject0_the_issue
tb auth
tb push
. ../clean_and_ingest_rows.sh
tb sql "select * from vehicle_data"
#----------------------------------------------------------------------------------------------
#| timestamp | vehicle_id | speed | fuel_level_percentage |
#----------------------------------------------------------------------------------------------
#| 2022-10-27 11:44:03 | 8d1e1533-6071-4b10-9cda-b8429c1c7a67 | 89 | 84 |
#| 2022-10-27 11:43:02 | 8d1e1533-6071-4b10-9cda-b8429c1c7a67 | 91 | 85 |
#----------------------------------------------------------------------------------------------
tb sql "select * from gps_data"
#--------------------------------------------------------------------------------------
#| timestamp | vehicle_id | latitude | longitude |
#--------------------------------------------------------------------------------------
#| 2022-10-27 11:43:02 | 8d1e1533-6071-4b10-9cda-b8429c1c7a67 | 40.41699 | -3.7034817 |
#| 2022-10-27 11:44:03 | 8d1e1533-6071-4b10-9cda-b8429c1c7a67 | 40.41699 | -3.7034817 |
#--------------------------------------------------------------------------------------
tb sql "select * from mv_joined_data"
#----------------------------------------------------------------------------------------------------------------------
#| timestamp | vehicle_id | latitude | longitude | speed | fuel_level_percentage |
#----------------------------------------------------------------------------------------------------------------------
#| 2022-10-27 11:43:02 | 8d1e1533-6071-4b10-9cda-b8429c1c7a67 | 40.41699 | -3.7034817 | 91 | 85 |
#----------------------------------------------------------------------------------------------------------------------
What happened here? Why are we missing one row in the mv_joined_data Data Source? From the docs:
Materialized Views generated using JOIN clauses are tricky. The resulting Data Source will be only automatically updated if and when a new operation is performed over the Data Source in the FROM.
In our case, the MV is only triggered by inserts into gps_data, and at that moment the join can only see the rows already present in vehicle_data: any GPS row whose matching vehicle row arrives later is simply lost. To overcome this issue there are several alternatives, each with its tradeoffs.
The easiest way would be to add another pipe that does the JOIN the other way:
NODE mat_node
SQL >
    SELECT v.timestamp, v.vehicle_id, latitude, longitude, speed, fuel_level_percentage
    FROM vehicle_data v
    JOIN
    (
        SELECT timestamp, vehicle_id, latitude, longitude
        FROM gps_data
        WHERE (timestamp, vehicle_id) IN (SELECT timestamp, vehicle_id FROM vehicle_data)
    ) g
    ON g.vehicle_id = v.vehicle_id
    AND g.timestamp = v.timestamp

TYPE materialized
DATASOURCE mv_joined_data_from_2_pipes
ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "vehicle_id, timestamp"
Testing it:
tb workspace clear --yes
cd ../dataproject1_two_MVs_join
cp ../dataproject0_the_issue/.tinyb ./
tb push
. ../clean_and_ingest_rows.sh
tb sql "select * from vehicle_data"
tb sql "select * from gps_data"
tb sql "select * from mv_joined_data_from_2_pipes"
Now we do have the expected 2 rows:
----------------------------------------------------------------------------------------------------------------------
| timestamp | vehicle_id | latitude | longitude | speed | fuel_level_percentage |
----------------------------------------------------------------------------------------------------------------------
| 2022-10-27 11:44:03 | 8d1e1533-6071-4b10-9cda-b8429c1c7a67 | 40.41699 | -3.703482 | 89 | 84 |
| 2022-10-27 11:43:02 | 8d1e1533-6071-4b10-9cda-b8429c1c7a67 | 40.41699 | -3.7034817 | 91 | 85 |
----------------------------------------------------------------------------------------------------------------------
However, with high rates of ingest, there may be race conditions that lead to missing or duplicated rows, which may be solved by using a ReplacingMergeTree as the target Data Source.
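A minimal sketch of that target Data Source, assuming the column types suggested by the sample payloads:
SCHEMA >
    `timestamp` DateTime,
    `vehicle_id` String,
    `latitude` Float32,
    `longitude` Float32,
    `speed` UInt16,
    `fuel_level_percentage` UInt8

ENGINE "ReplacingMergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "vehicle_id, timestamp"
Keep in mind that ReplacingMergeTree only deduplicates rows sharing the same sorting key in the background, at merge time, so queries that need fully deduplicated results still have to use FINAL or a GROUP BY.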
Also, depending on your scale and how well queries and sorting keys are defined, the JOIN approach can lead to memory errors.
So, if you face these errors or if you need stronger accuracy guarantees, consider these other two options:
Going for an AggregatingMergeTree may seem a bit strange, but what we really want here is its ability to materialize both streams independently into the same Data Source; the background merge process and the deduplication at query time then take care of joining them.
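As a sketch of the idea, here is what the pipe materializing gps_data could look like (a symmetric pipe would read from vehicle_data; the column types and the placeholder-state trick are assumptions): each stream writes its own columns as argMaxState states and fills the other stream's columns with placeholder states versioned at epoch, so the real values always win when the states are merged.
NODE mat_gps
SQL >
    SELECT
        timestamp,
        vehicle_id,
        argMaxState(toFloat32(latitude), timestamp) AS latitude,
        argMaxState(toFloat32(longitude), timestamp) AS longitude,
        -- placeholder states for the columns this stream lacks:
        -- their epoch version always loses against real vehicle data
        argMaxState(toUInt16(0), toDateTime(0)) AS speed,
        argMaxState(toUInt8(0), toDateTime(0)) AS fuel_level_percentage
    FROM gps_data
    GROUP BY timestamp, vehicle_id

TYPE materialized
DATASOURCE mv_combined_data_amt
ENGINE "AggregatingMergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "vehicle_id, timestamp"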
tb workspace clear --yes
cd ../dataproject2_two_MVs_AggregatingMT
cp ../dataproject0_the_issue/.tinyb ./
tb push
. ../clean_and_ingest_rows.sh
tb sql "select * from vehicle_data"
tb sql "select * from gps_data"
tb sql "
select
timestamp,
vehicle_id,
argMaxMerge(latitude) latitude,
argMaxMerge(longitude)
longitude, argMaxMerge(speed) speed,
argMaxMerge(fuel_level_percentage) fuel_level_percentage
from mv_combined_data_amt
group by timestamp, vehicle_id"
Copy Pipes can help us overcome some of the limitations of MVs for this use case, but some assumptions are needed.
- We need to define a time window that we consider safe for our use case; otherwise we would scan the entire Data Source on every copy job, which would make the solution prohibitive. In the example in this pipe we take 10 minutes, so if some messages take longer they may be missing from the joined target Data Source. See the sketch below.
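A sketch of what copy_join.pipe could look like with that 10-minute window (the exact schedule and filter are assumptions):
NODE inner_join
SQL >
    SELECT g.timestamp, g.vehicle_id, latitude, longitude, speed, fuel_level_percentage
    FROM gps_data g
    INNER JOIN vehicle_data v
    ON g.vehicle_id = v.vehicle_id
    AND g.timestamp = v.timestamp
    WHERE g.timestamp > now() - INTERVAL 10 MINUTE

TYPE COPY
TARGET_DATASOURCE ds_joined_data
COPY_SCHEDULE "*/10 * * * *"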
tb workspace clear --yes
cd ../dataproject3_using_copy_pipes
cp ../dataproject0_the_issue/.tinyb ./
tb push
. ../clean_and_ingest_rows.sh
tb sql "select * from vehicle_data"
tb sql "select * from gps_data"
tb pipe copy run copy_join --yes
# You can check the copy status with `tb job details` and then query the data source once status is done.
tb sql "select * from ds_joined_data"
#----------------------------------------------------------------------------------------------------------------------
#| timestamp | vehicle_id | latitude | longitude | speed | fuel_level_percentage |
#----------------------------------------------------------------------------------------------------------------------
#| 2022-10-27 11:43:02 | 8d1e1533-6071-4b10-9cda-b8429c1c7a67 | 40.41699 | -3.7034817 | 91 | 85 |
#| 2022-10-27 11:44:03 | 8d1e1533-6071-4b10-9cda-b8429c1c7a67 | 40.41699 | -3.703482 | 89 | 84 |
#----------------------------------------------------------------------------------------------------------------------
If freshness is a hard requirement for your API Endpoint (and, in Tinybird, it usually is), this approach can be combined with joining at query time. This is still much more performant than joining everything at query time, since you only have to join the data that was not processed in the latest copy batch. Also, with this kappa (batch join + real-time join) approach, we can relax the frequency of the scheduled copy operations.
The kappa pipe is equivalent* to copy_join.pipe, except that at the end we retrieve the data from ds_joined_data too.
--(... same as copy_join)

NODE endpoint
DESCRIPTION >
    Union the freshly joined rows with the data already processed into ds_joined_data

SQL >
    SELECT * FROM ds_joined_data
    UNION ALL
    SELECT * FROM inner_join
*Although applying some filters first, if that matches your use case, is highly recommended.
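For instance, a hypothetical variant that takes from the real-time join only the rows newer than anything already copied (assuming copies always progress by event timestamp):
NODE endpoint
SQL >
    SELECT * FROM ds_joined_data
    UNION ALL
    SELECT *
    FROM inner_join
    WHERE timestamp > (SELECT max(timestamp) FROM ds_joined_data)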