In [1]:
from pyspark.sql import SQLContext
# Wrap the SparkContext `sc` in a SQLContext so DataFrames can be read below.
# `sc` is not defined in this notebook; presumably the PySpark kernel/shell provides it.
sqlContext = SQLContext(sparkContext=sc)

In [68]:
# Load the `lpp_static` and `lpp_live` tables from the "dice" keyspace via the
# Spark Cassandra data source and mark each resulting DataFrame as cached.
lpp_static = (
    sqlContext.read
    .format("org.apache.spark.sql.cassandra")
    .options(table="lpp_static", keyspace="dice")
    .load()
    .cache()
)
lpp_live = (
    sqlContext.read
    .format("org.apache.spark.sql.cassandra")
    .options(table="lpp_live", keyspace="dice")
    .load()
    .cache()
)
# Echo the live frame so the cell displays its schema.
lpp_live

DataFrame[station_int_id: int, route_int_id: int, arrival_time: timestamp, vehicle_int_id: int]

In [71]:
from pyspark.sql import functions as F
# Kept so that any other cell relying on these names keeps working.
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType, IntegerType

# Derive `hour`, `minute` and `date` columns from `arrival_time` on both frames.
#
# These were previously Python lambdas wrapped in udf(). Spark's built-in column
# functions are used instead because:
#   * a null `arrival_time` made the lambdas raise AttributeError (None has no
#     `.hour` / `.minute` / `.date()`), whereas the built-ins yield null;
#   * built-ins are evaluated inside Spark rather than in Python worker processes.
# NOTE(review): the built-ins use Spark's time zone while the old lambdas used the
# Python worker's local zone; presumably these are the same on this cluster — confirm.
#
# The three names are still bound to callables that take a column and return a
# Column, so existing call sites of the form `hourTokens(df.arrival_time)` still work.
hourTokens = F.hour
minuteTokens = F.minute
dateTokens = F.to_date

arrival_time = F.col('arrival_time')

lpp_static = (
    lpp_static
    .withColumn('hour', hourTokens(arrival_time))
    .withColumn('minute', minuteTokens(arrival_time))
    .withColumn('date', dateTokens(arrival_time))
)
lpp_live = (
    lpp_live
    .withColumn('hour', hourTokens(arrival_time))
    .withColumn('minute', minuteTokens(arrival_time))
    .withColumn('date', dateTokens(arrival_time))
)

In [72]:
# Peek at a single row to confirm the derived hour / minute / date columns are present.
lpp_live.head()

Row(station_int_id=2322, route_int_id=1581, arrival_time=datetime.datetime(2017, 4, 12, 7, 56, 34), vehicle_int_id=410, hour=7, minute=56, date=datetime.date(2017, 4, 12))

In [73]:
# Number of rows in `lpp_static` per (station, route, date) group.
static = (
    lpp_static
    .groupBy(['station_int_id', 'route_int_id', 'date'])
    .count()
)
static.show()

+--------------+------------+----------+-----+
|station_int_id|route_int_id|      date|count|
+--------------+------------+----------+-----+
|          1920|         946|2017-04-08|    3|
|          2096|        1033|2017-04-12|    2|
|          3798|        1643|2017-04-07|    4|
|          2056|         911|2017-04-11|   11|
|          1963|         717|2017-04-12|   80|
|          1932|         726|2017-04-08|   19|
|          1946|         730|2017-04-13|   65|
|          1946|        1580|2017-04-09|    9|
|          1929|        1542|2017-04-13|   32|
|          2339|        1037|2017-04-07|    2|
|          1934|        1642|2017-04-10|    4|
|          2005|         364|2017-04-11|   35|
|          2006|         737|2017-04-14|   79|
|          1984|        1615|2017-04-09|   14|
|          1985|        1039|2017-04-10|    5|
|          1985|        1056|2017-04-11|    2|
|          1941|         727|2017-04-14|   40|
|          1941|        1196|2017-04-14|    2|
|          19

In [74]:
# Number of rows in `lpp_live` per (station, route, date) group.
live = (
    lpp_live
    .groupBy(['station_int_id', 'route_int_id', 'date'])
    .count()
)
live.show()

+--------------+------------+----------+-----+
|station_int_id|route_int_id|      date|count|
+--------------+------------+----------+-----+
|          1963|         717|2017-04-12|    7|
|          1946|         730|2017-04-13|   29|
|          1946|        1580|2017-04-09|    7|
|          1929|        1542|2017-04-13|    2|
|          1927|        1186|2017-04-11|    4|
|          2005|         364|2017-04-11|    4|
|          2006|         737|2017-04-14|   34|
|          1985|        1039|2017-04-10|    2|
|          1941|         727|2017-04-14|   13|
|          1939|        1193|2017-04-08|   37|
|          1940|         346|2017-04-10|   38|
|          1944|         737|2017-04-14|   21|
|          1944|         791|2017-04-12|    1|
|          1944|        1563|2017-04-09|    9|
|          1976|        1563|2017-04-10|   29|
|          1976|        1582|2017-04-07|   10|
|          1976|        1587|2017-04-12|   38|
|          2083|         799|2017-04-07|    7|
|          19

In [76]:
# Inner-join the per-group counts of the static and live frames, then keep
# groups whose count exceeds 50.
# NOTE(review): `count` is one of the join keys, so only groups whose static and
# live counts are exactly equal survive the join. If the goal is to compare the
# two counts side by side, join on the three id/date keys only and rename the
# count columns first — confirm the intent.
comb = (
    static
    .join(
        live,
        on=['station_int_id', 'route_int_id', 'date', 'count'],
        how='inner',
    )
    .filter('count > 50')
)
comb.show()

+--------------+------------+----------+-----+
|station_int_id|route_int_id|      date|count|
+--------------+------------+----------+-----+
|          1985|         717|2017-04-12|   79|
+--------------+------------+----------+-----+

