In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
conf = SparkConf().setMaster("local[4]").setAppName("etl").set("spark.scheduler.mode", "FAIR").set("spark.jars", "resources/jars/sqlite-jdbc-3.32.3.2.jar")
conf = conf.set("spark.driver.memory", "2g")

spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()


In [8]:
db_url = "jdbc:sqlite:resources/source_db/fsdata.db"

source_tables = {"checkins", "ratings", "socialgraph", "users",  "venues"}

source_df = {}
for table in source_tables:
    source_df[table] = spark.read.format('jdbc').options(url=db_url, dbtable=table ,driver='org.sqlite.JDBC').load()
    source_df[table].createOrReplaceTempView(table)

source_df

{'socialgraph': DataFrame[first_user_id: int, second_user_id: int],
 'checkins': DataFrame[id: int, user_id: int, venue_id: int, latitude: double, longitude: double, created_at: string],
 'users': DataFrame[id: int, latitude: double, longitude: double],
 'venues': DataFrame[id: int, latitude: double, longitude: double],
 'ratings': DataFrame[user_id: int, venue_id: int, rating: int]}

In [54]:
checkins_augment = spark.sql("""select c.id, c.user_id, venue_id, v.latitude, v.longitude, c.latitude c_latitude , c.longitude c_longitude, 
sqrt( pow(v.latitude - c.latitude, 2) + pow(v.longitude - c.longitude, 2)) dst,
timestamp(created_at) as created_at, 
count(1) over(partition by user_id) total_chk, 
row_number() over(partition by user_id order by timestamp(created_at)) checkin_seq, 
count(1) over(partition by user_id, venue_id) total_venue_chk
from checkins c left join venues v on v.id = c.venue_id """)
checkins_augment.createOrReplaceTempView("checkins_agg")

In [76]:
socical_graph_rollup = spark.sql("select first_user_id, collect_set(second_user_id) friends from socialgraph group by 1")
socical_graph_rollup.createOrReplaceTempView("socical_graph_rollup")
checkins_rollup = spark.sql("""select c.user_id, collect_list(struct(c.id, c.venue_id, c.latitude, c.longitude, c.c_latitude, c.c_longitude, c.created_at, r.rating, c.total_chk, c.checkin_seq, c.total_venue_chk, c.dst)) chks, 
count(1) total_chk,
count(distinct c.venue_id) distinct_chk,
approx_percentile(dst, 0.50) median_dst, 
approx_percentile(r.rating, 0.50) median_rating,
approx_percentile(r.rating, 0.95) p95_rating
from  checkins_agg c left join (select user_id, venue_id, avg(rating) rating from ratings group by 1,2) r on r.user_id = c.user_id and r.venue_id = c.venue_id group by 1 """)
checkins_rollup.createOrReplaceTempView("checkins_rollup")
profiles = spark.sql("""select u.id, u.latitude u_latitude, u.longitude u_longitude, c.*, g.friends
from users u 
inner join checkins_rollup c on u.id = c.user_id
left join socical_graph_rollup g on u.id = g.first_user_id
order by total_chk desc
""")

Note that a user can leave rating without visiting the place

* We can not link them to the checkin anyway

In [86]:
profiles.createOrReplaceTempView("profiles")

In [79]:
profiles.write.format("orc").mode("overwrite").save("data/profiles")

In [82]:
checkin_asl = spark.sql("select c.user_id, c.venue_id, max(c.total_venue_chk)  total_venue_chk, avg(r.rating) rating from checkins_agg c left join (select user_id, venue_id, avg(rating) rating from ratings group by 1,2)  r on r.user_id = c.user_id and r.venue_id = c.venue_id group by 1,2")
checkin_asl.write.format("orc").mode("overwrite").save("data/checkins")

In [100]:
profiles.show(10, 100)

profiles.take(1)

+-------+----------+------------+-------+----------------------------------------------------------------------------------------------------+---------+------------+-------------------+------------------+------------------+----------------------------------------------------------------------------------------------------+
|     id|u_latitude| u_longitude|user_id|                                                                                                chks|total_chk|distinct_chk|         median_dst|     median_rating|        p95_rating|                                                                                             friends|
+-------+----------+------------+-------+----------------------------------------------------------------------------------------------------+---------+------------+-------------------+------------------+------------------+----------------------------------------------------------------------------------------------------+
|1348362|47.6062095|-122.

[Row(id=1348362, u_latitude=47.6062095, u_longitude=-122.3320708, user_id=1348362, chks=[Row(id=225787, venue_id=39731, latitude=47.6239018, longitude=-122.3209124, c_latitude=0.0, c_longitude=0.0, created_at=datetime.datetime(2011, 12, 14, 15, 41, 29), rating=2.0, total_chk=57, checkin_seq=12, total_venue_chk=2, dst=131.26477681779113), Row(id=406394, venue_id=39731, latitude=47.6239018, longitude=-122.3209124, c_latitude=0.0, c_longitude=0.0, created_at=datetime.datetime(2011, 12, 27, 5, 17, 45), rating=2.0, total_chk=57, checkin_seq=22, total_venue_chk=2, dst=131.26477681779113), Row(id=25078, venue_id=169552, latitude=47.612879, longitude=-122.318902, c_latitude=0.0, c_longitude=0.0, created_at=datetime.datetime(2011, 12, 9, 11, 20, 48), rating=2.0, total_chk=57, checkin_seq=1, total_venue_chk=2, dst=131.2589045861432), Row(id=641158, venue_id=169552, latitude=47.612879, longitude=-122.318902, c_latitude=0.0, c_longitude=0.0, created_at=datetime.datetime(2012, 1, 31, 4, 47, 52), ra

In [99]:
checkin_asl.show(10, 100)

+-------+--------+---------------+------------------+
|user_id|venue_id|total_venue_chk|            rating|
+-------+--------+---------------+------------------+
|     57|    1179|              1|               2.5|
|    111|    2297|              2|               2.0|
|    551|     208|              1|               2.0|
|   1419|   29488|              1|               2.0|
|   1420|    5222|              1|               2.0|
|   1762|  164449|              1|               3.5|
|   2286|    5443|              1|3.6666666666666665|
|   2760|  102410|              1|               2.0|
|   3176|   12750|              1|               2.0|
|   3534|  628045|              1|               2.0|
+-------+--------+---------------+------------------+
only showing top 10 rows



In [95]:
profiles_complete = spark.sql("""
select p.*, g.friends_profile from profiles p left join (
select first_user_id user_id, collect_list(struct(p.user_id, p.chks)) friends_profile
from socialgraph g inner join profiles p  on g.second_user_id = p.id group by 1 ) g 
on g.user_id = p.user_id
""")

In [97]:
profiles_complete.write.format("orc").mode("overwrite").save("data/profiles_complete")

In [101]:
profiles_complete.take(1)

[Row(id=148, u_latitude=32.8140177, u_longitude=-96.9488945, user_id=148, chks=[Row(id=432205, venue_id=9310, latitude=32.897605780066, longitude=-97.0405197143555, c_latitude=32.8140177, c_longitude=-96.9488945, created_at=datetime.datetime(2012, 1, 23, 22, 40, 1), rating=2.0, total_chk=2, checkin_seq=1, total_venue_chk=1, dst=0.12402478395390161), Row(id=489937, venue_id=7489, latitude=42.2077950049541, longitude=-83.3562970161438, c_latitude=32.8140177, c_longitude=-96.9488945, created_at=datetime.datetime(2012, 1, 26, 10, 45, 41), rating=2.0, total_chk=2, checkin_seq=2, total_venue_chk=1, dst=16.522764853776877)], total_chk=2, distinct_chk=2, median_dst=0.12402478395390161, median_rating=2.0, p95_rating=2.0, friends=[88382, 88379, 52, 88386, 88383, 88380, 56, 88387, 88384, 88381, 88385, 7, 88378], friends_profile=[Row(user_id=88382, chks=[Row(id=322364, venue_id=310280, latitude=37.4860284355555, longitude=-122.228608131409, c_latitude=37.3541079, c_longitude=-121.9552356, created_