In [0]:
%sql
-- Rebuild the identity-resolution test fixture.
-- IF EXISTS lets this cell run on a fresh workspace where the table was never created
-- (a plain DROP TABLE raises an error in that case).
DROP TABLE IF EXISTS test_events;
-- NOTE(review): the table is created with an explicit LOCATION, so dropping it presumably
-- leaves the Delta files in place and re-running this cell may append duplicate rows.
-- Confirm, and consider CREATE OR REPLACE TABLE if the fixture must be idempotent.
CREATE TABLE test_events (
    event_time TIMESTAMP,
    device_id STRING,
    user_id STRING,
    merged_id STRING  -- not populated by the INSERT below
) USING DELTA
LOCATION 's3://foxpanel-test-data/test/default/test_events/';

-- Expected alias groups for the rows inserted below:
-- User1 aliases: d_1, d_2, u_1, u_2
-- User2 aliases: d_3, u_3
-- User3 aliases: d_4
INSERT INTO test_events (event_time, device_id, user_id) VALUES
('2023-11-01', 'd_1', null),
('2023-11-02', 'd_1', 'u_1'),
('2023-11-03', 'd_1', 'u_1'),
('2023-11-04', 'd_2', null),
('2023-11-06', 'd_2', 'u_1'),
('2023-11-07', 'd_2', null),
('2023-11-08', 'd_2', 'u_2'),
('2023-11-09', 'd_3', null),
('2023-11-10', 'd_3', 'u_3'),
('2023-11-09', 'd_3', null),
('2023-11-10', 'd_4', null);

-- Show the fixture so the rows can be checked by eye.
select * from test_events;

event_time,device_id,user_id,merged_id
2023-11-01T00:00:00Z,d_1,,
2023-11-02T00:00:00Z,d_1,u_1,
2023-11-03T00:00:00Z,d_1,u_1,
2023-11-04T00:00:00Z,d_2,,
2023-11-06T00:00:00Z,d_2,u_1,
2023-11-07T00:00:00Z,d_2,,
2023-11-08T00:00:00Z,d_2,u_2,
2023-11-09T00:00:00Z,d_3,,
2023-11-10T00:00:00Z,d_3,u_3,
2023-11-09T00:00:00Z,d_3,,


In [0]:
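# Installing GraphFrames (required by the next cell)
#
# For Databricks Runtime 14.2 (Scala 2.12), install the following JAR from Maven:
# graphframes:graphframes:0.6.0-spark2.3-s_2.12
#
# NOTE(review): the coordinate above names Spark 2.3, while DBR 14.2 is a Spark 3.x
# runtime; confirm this is the intended artifact version.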

In [0]:
from pyspark.sql import SparkSession
from graphframes import *

# Location where Spark writes checkpoint data. GraphFrames' connectedComponents(),
# called later in this cell, needs a checkpoint directory to be set beforehand.
CHECKPOINT_DIR = 's3://foxpanel-test-data/checkpoints/'

# Attach to the already-running Spark session, or create one if none exists.
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setCheckpointDir(CHECKPOINT_DIR)


# Vertices of the identity graph: one row per distinct device_id and one row per
# distinct user_id, each with the timestamp of the first event it appeared in.
# The second branch must filter on the user_id column: events without a logged-in
# user have a null user_id and would otherwise be grouped into a NULL vertex id.
vertices = spark.sql(
    """
    select          
        device_id as id,
        min(event_time) as first_event_time -- selecting first event time for the device_id
        from test_events 
        where device_id is not null
        group by 1
    union all
    select         
        user_id as id,
        min(event_time) as first_event_time -- selecting first event time for the user_id
        from test_events 
        where user_id is not null
        group by 1
    """
)

# Persist the vertex set as the `vertices` table, replacing any previous contents.
vertices.write.option("path", "s3://foxpanel-test-data/test/default/vertices/").saveAsTable("vertices", mode="overwrite")


# Edges of the identity graph. Every event links its device_id to its user_id;
# events with no user_id yield a self-loop on the device (coalesce falls back to
# device_id). The final union adds the reversed pairs so each link exists in both
# directions, and also removes duplicate rows.
edges_query = """
    with directed as (
    select 
        distinct 
        device_id as src, -- device_id is always present
        coalesce(user_id, device_id) as dst -- if edge pointing then pointing itself
    from test_events
    )
    select src, dst from directed
    union 
    select dst, src from directed
    """

edges = spark.sql(edges_query)

# Assemble the graph (vertices first, then edges) and label every vertex with the
# id of the connected component it belongs to; ids that share a component are
# treated as aliases of the same user.
g = GraphFrame(vertices, edges)
res = g.connectedComponents()

# Print a sample of the result for a quick visual check.
res.show()

# Persist the alias -> component mapping as the `user_aliases` table.
aliases_writer = (
    res.write
    .option("mergeSchema", "true")
    .option("path", "s3://foxpanel-test-data/test/default/user_aliases/")
)
aliases_writer.saveAsTable("user_aliases", mode="overwrite")

+---+-------------------+------------+
| id|   first_event_time|   component|
+---+-------------------+------------+
|d_4|2023-11-10 00:00:00| 77309411328|
|d_3|2023-11-09 00:00:00|266287972352|
|d_1|2023-11-01 00:00:00|  8589934592|
|d_2|2023-11-04 00:00:00|  8589934592|
|u_2|2023-11-08 00:00:00|  8589934592|
|u_3|2023-11-10 00:00:00|266287972352|
|u_1|2023-11-02 00:00:00|  8589934592|
+---+-------------------+------------+



In [0]:
%sql
-- Choose one canonical id per connected component: the alias with the earliest
-- first_event_time. `id` is a secondary sort key so the choice is deterministic
-- when several aliases share the same first_event_time (e.g. a device whose very
-- first event already carries a user_id); without it the winner among tied rows
-- is arbitrary and can change between runs.
select
  id as alias,
  component,
  first(id) over (partition by component order by first_event_time, id) as merged_id
from user_aliases


alias,component,merged_id
d_1,8589934592,d_1
u_1,8589934592,d_1
d_2,8589934592,d_1
u_2,8589934592,d_1
d_4,77309411328,d_4
d_3,266287972352,d_3
u_3,266287972352,d_3
