In [7]:
import hsfs
import datetime
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import *
from pyspark.sql.functions import unix_timestamp, from_unixtime, col
import sys

# Open a Hopsworks feature-store session. The connection stays open for the
# rest of the notebook; call `connection.close()` when finished with it.
connection = hsfs.connection()
fs = connection.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

In [8]:
# Feature group 1: rows of (id, ts, f1) read with an explicit schema
# (id and ts as integers, f1 as a string), then ordered by (id, ts).
fg1_schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("ts", IntegerType(), True),
        StructField("f1", StringType(), True),
    ]
)

fg1 = (
    spark.read
    .csv("hdfs:///Projects/demo_fs_meb10179/Resources/1-out.csv", header=True, schema=fg1_schema)
    .orderBy("id", "ts")
)
fg1.show()

+---+----------+---+
| id|        ts| f1|
+---+----------+---+
|  1|1611505151|f10|
|  1|1611505155|f11|
|  1|1611505156|f12|
|  2|1611505152|f20|
|  2|1611505157|f21|
|  2|1611505161|f22|
|  3|1611505153|f30|
|  3|1611505158|f31|
|  4|1611505159|f41|
|  4|1611505160|f42|
+---+----------+---+

In [9]:
# Feature group 2: rows of (id, ts, f2) read with an explicit schema
# (id and ts as integers, f2 as a string), then ordered by (id, ts).
fg2_schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("ts", IntegerType(), True),
        StructField("f2", StringType(), True),
    ]
)

fg2 = (
    spark.read
    .csv("hdfs:///Projects/demo_fs_meb10179/Resources/2-out.csv", header=True, schema=fg2_schema)
    .orderBy("id", "ts")
)
fg2.show()

+---+----------+----+
| id|        ts|  f2|
+---+----------+----+
|  1|1611505152|f211|
|  1|1611505154|f210|
|  1|1611505155|f213|
|  1|1611505169|f214|
|  2|1611505154|f220|
|  2|1611505159|f221|
|  2|1611505162|f212|
|  3|1611505163|f230|
|  4|1611505165|f240|
+---+----------+----+

In [10]:
# Build the lookup used by the as-of match: for every id in fg2, the set of
# timestamps at which fg2 has a row. It is collected to the driver as a dict.
maprdd = (
    fg2.rdd
    .map(lambda row: (row.id, row.ts))
    .groupByKey()
    .mapValues(set)
)
result_dict = dict(maprdd.collect())
result_dict

{1: {1611505152, 1611505169, 1611505154, 1611505155}, 2: {1611505154, 1611505162, 1611505159}, 3: {1611505163}, 4: {1611505165}}

In [11]:
# Broadcast the id -> {fg2 timestamps} dict (result_dict) so that
# take_closest can read it inside RDD transformations via map_broadcast.value.
# NOTE(review): `sc` is not defined in this notebook; presumably it is the
# SparkContext injected by the kernel — confirm.
map_broadcast = sc.broadcast(result_dict)

In [32]:
def take_closest(id, ts):
    """Return the latest fg2 timestamp for ``id`` that is at or before ``ts``.

    This is the as-of lookup that aligns an fg1 row with an fg2 row. Among the
    fg2 timestamps recorded for ``id`` in the broadcast ``map_broadcast``, it
    picks the largest one that does not exceed ``ts``.

    Args:
        id: Entity id from fg1. The name is kept for compatibility even
            though it shadows the builtin.
        ts: fg1 timestamp to match against.

    Returns:
        The matching fg2 timestamp, or ``None`` in any of these cases:
        ``id`` or ``ts`` is null, ``id`` has no rows in fg2, or every fg2
        timestamp for ``id`` is later than ``ts``.
    """
    if id is None or ts is None:
        return None
    # .get instead of [] so ids that exist only in fg1 do not raise KeyError;
    # null fg2 timestamps are skipped instead of failing the comparison.
    candidates = [
        t for t in map_broadcast.value.get(id, ()) if t is not None and t <= ts
    ]
    # The closest timestamp not after ts is simply the largest such timestamp.
    return max(candidates) if candidates else None

columns = ["id2","join_ts","ts2"]
# Explicit schema: createDataFrame cannot infer column types from an empty
# RDD, which is what we get when no fg1 row has an earlier-or-equal fg2 match.
join2_schema = StructType([StructField(name, IntegerType(), True) for name in columns])

my_rdd = fg1.rdd.map(lambda row: (row.id, row.ts, take_closest(row.id, row.ts)))
# Keep only fg1 rows that have an fg2 observation at or before their timestamp.
filtered_rdd = my_rdd.filter(lambda rec: rec[2] is not None)
join2 = spark.createDataFrame(filtered_rdd, join2_schema).sort(col("id2"), col("join_ts"))

join2.show()

+---+----------+----------+
|id2|   join_ts|       ts2|
+---+----------+----------+
|  1|1611505155|1611505155|
|  1|1611505156|1611505155|
|  2|1611505157|1611505154|
|  2|1611505161|1611505159|
+---+----------+----------+

In [36]:
# Bring f1 back onto each matched fg1 row. After the join, id2/join_ts only
# repeat fg1's id/ts, so they are dropped; ts2 (the matched fg2 time) is kept.
match_on_fg1_key = (fg1.id == join2.id2) & (fg1.ts == join2.join_ts)
joined = fg1.join(join2, match_on_fg1_key, how="inner")
drop_cols = ['id2', 'join_ts']
inter = joined.drop(*drop_cols)
inter.show(15)

+---+----------+---+----------+
| id|        ts| f1|       ts2|
+---+----------+---+----------+
|  1|1611505155|f11|1611505155|
|  1|1611505156|f12|1611505155|
|  2|1611505157|f21|1611505154|
|  2|1611505161|f22|1611505159|
+---+----------+---+----------+

In [44]:
# Attach f2 by joining each matched row to the fg2 record whose (id, ts)
# equals (id, ts2). Both sides have `id` and `ts` columns, so they are
# aliased and select() names every output column through its alias.
inter, fg2 = inter.alias('inter'), fg2.alias('fg2')
as_of_condition = (inter.id == fg2.id) & (inter.ts2 == fg2.ts)
final = (
    inter
    .join(fg2, as_of_condition, how="inner")
    .select('inter.id', 'inter.ts', 'inter.f1', 'fg2.f2')
)
final.show(15)

+---+----------+---+----+
| id|        ts| f1|  f2|
+---+----------+---+----+
|  1|1611505155|f11|f213|
|  1|1611505156|f12|f213|
|  2|1611505157|f21|f220|
|  2|1611505161|f22|f221|
+---+----------+---+----+