In [150]:
import re
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import to_timestamp, unix_timestamp

In [3]:
# Build (or reuse) the Spark session for this notebook: local master,
# app name "Opal tap", and 5 shuffle partitions.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Opal tap")
    .config("spark.sql.shuffle.partitions", "5")
    .getOrCreate()
)

In [199]:
file_path = "./data/time_loc_20161226-20170101.csv"

In [34]:
# Load the raw export with Spark's CSV reader, taking column names from the
# header row and letting Spark infer the column types. The path comes from
# `file_path` so this cell and the later text-based parsing cells always
# read the same file.
raw_csv_file = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(file_path)
)

In [35]:
raw_csv_file.count()

169554

In [36]:
raw_csv_file.printSchema()

root
 |-- mode: string (nullable = true)
 |-- date: integer (nullable = true)
 |-- tap: string (nullable = true)
 |-- time: string (nullable = true)
 |-- loc: string (nullable = true)
 |-- count: integer (nullable = true)



In [202]:
raw_csv_file.show(10)

+----+--------+---+-----+----+-----+
|mode|    date|tap| time| loc|count|
+----+--------+---+-----+----+-----+
| bus|20161226|off|00:00|2000|   21|
| bus|20161226|off|00:00|2010|   20|
| bus|20161226|off|00:00|2021|   18|
| bus|20161226|off|00:00|2022|   50|
| bus|20161226|off|00:00|2031|   22|
| bus|20161226|off|00:00|2033|   22|
| bus|20161226|off|00:00|2050|   23|
| bus|20161226|off|00:00|2155|   21|
| bus|20161226|off|00:15|2000|   37|
| bus|20161226|off|00:15|2026|   31|
+----+--------+---+-----+----+-----+
only showing top 10 rows



In [20]:
raw_csv_file.select("mode").distinct().show()

+---------+
|     mode|
+---------+
|    ferry|
|lightrail|
|    train|
|      bus|
+---------+



## Data is messy

### missing the "tap" field

In [201]:
raw_csv_file.filter("mode = 'ferry' AND date = '20170101'").show(10)

+-----+--------+-----+--------------------+---+-----+
| mode|    date|  tap|                time|loc|count|
+-----+--------+-----+--------------------+---+-----+
|ferry|20170101|00:00|Circular Quay, No...| 61| null|
|ferry|20170101|00:00|         Manly Wharf|148| null|
|ferry|20170101|00:15|Circular Quay, No...|281| null|
|ferry|20170101|00:15|  Balmain East Wharf| 42| null|
|ferry|20170101|00:15|         Manly Wharf| 62| null|
|ferry|20170101|00:30|Circular Quay, No...|365| null|
|ferry|20170101|00:30|Circular Quay, No...| 77| null|
|ferry|20170101|00:30|  Balmain East Wharf| 44| null|
|ferry|20170101|00:30|    Kirribilli Wharf| 20| null|
|ferry|20170101|00:30|         Manly Wharf| 40| null|
+-----+--------+-----+--------------------+---+-----+
only showing top 10 rows



### time is incorrect

In [217]:
# Show rows whose `time` value does not contain an HH:MM pattern.
# The pattern is a raw string so the backslashes are passed through literally;
# in a plain string "\d" is an invalid Python escape sequence and triggers a
# warning on recent interpreters.
raw_csv_file.filter(~raw_csv_file["time"].rlike(r"(\d\d:\d\d)")).show(10)

+----+--------+---+----+---+-----+
|mode|    date|tap|time|loc|count|
+----+--------+---+----+---+-----+
| bus|20161226|off|  -1| -1|10594|
| bus|20161226| on|  -1| -1|  246|
| bus|20161227|off|  -1| -1|11203|
| bus|20161227| on|  -1| -1|  105|
| bus|20161228|off|  -1| -1|15858|
| bus|20161228| on|  -1| -1|  149|
| bus|20161229|off|  -1| -1|16212|
| bus|20161229| on|  -1| -1|  165|
| bus|20161230|off|  -1| -1|15957|
| bus|20161230| on|  -1| -1|  195|
+----+--------+---+----+---+-----+
only showing top 10 rows



### location is unknown

In [219]:
raw_csv_file.filter("loc = '-1'").show(10)
raw_csv_file.filter("loc = 'UNKNOWN'").show(10)

+----+--------+---+-----+---+-----+
|mode|    date|tap| time|loc|count|
+----+--------+---+-----+---+-----+
| bus|20161226|off|07:30| -1|   28|
| bus|20161226|off|07:45| -1|   57|
| bus|20161226|off|08:15| -1|   36|
| bus|20161226|off|08:30| -1|   41|
| bus|20161226|off|08:45| -1|   36|
| bus|20161226|off|09:00| -1|   53|
| bus|20161226|off|09:30| -1|   77|
| bus|20161226|off|09:45| -1|   65|
| bus|20161226|off|   -1| -1|10594|
| bus|20161226|off|10:00| -1|   62|
+----+--------+---+-----+---+-----+
only showing top 10 rows

+-----+--------+---+----+-------+-----+
| mode|    date|tap|time|    loc|count|
+-----+--------+---+----+-------+-----+
|ferry|20161226|off|  -1|UNKNOWN| 1479|
|ferry|20161226| on|  -1|UNKNOWN| 2070|
|ferry|20161227|off|  -1|UNKNOWN| 1667|
|ferry|20161227| on|  -1|UNKNOWN| 2131|
|ferry|20161228|off|  -1|UNKNOWN| 1722|
|ferry|20161228| on|  -1|UNKNOWN| 2216|
|ferry|20161229|off|  -1|UNKNOWN| 1497|
|ferry|20161229| on|  -1|UNKNOWN| 1996|
|ferry|20161230|off|  -1|UNKNO

In [89]:
# Schema of the cleaned records produced by parse_record:
# mode, timestamp_str ("<date> <time>"), tap, loc, count.
# Every field is nullable.
opal_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("mode", StringType()),
        ("timestamp_str", StringType()),
        ("tap", StringType()),
        ("loc", StringType()),
        ("count", IntegerType()),
    )
])

In [273]:
def validate_record(line):
    """
    Decide whether a raw CSV line should be kept for parsing.

    A line is rejected when it contains the text "-1" anywhere, or when
    its lower-cased form contains "unknown" or "mode" (the latter drops
    the header row). Everything else is accepted.

    Examples of accepted lines:
    'lightrail,20170101,00:15,Convention Centre Light Rail,48'
    'ferry,20170101,00:00,"Circular Quay, No. 3 Wharf",61'
    'ferry,20161231,on,18:15,Watsons Bay Wharf,18'
    'ferry,20161231,on,18:15,"Circular Quay, No. 3 Wharf",68'

    Returns True for accepted lines and False for rejected ones.
    """
    # Placeholder marker used in the time/loc columns of aggregate rows.
    if "-1" in line:
        return False

    lowered = line.lower()
    rejected_words = ("unknown", "mode")
    return not any(word in lowered for word in rejected_words)

In [274]:
def parse_record(line):
    """
    Parse one raw CSV line into a cleaned record.

    Steps:
    1. Match the line against the known layouts, aligned with the header
       [mode,date,tap,time,loc,count]. Some records are missing the
       tap on/off field; for those an empty string is used as the tap.
       The location may or may not be wrapped in double quotes.
    2. Combine date and time into a single "<date> <time>" string.
    3. Drop the separate date and time fields.

    Returns [mode, timestamp_str, tap, loc, count] with count as an int.
    If no layout matches, the line is printed and None is returned.
    """
    # Tried in order; the first match wins. Quoted-location layouts come
    # before their unquoted counterparts so the quotes are stripped.
    layouts = (
        r'(\w+),(\d{8}),(\w+),(\d\d:\d\d),"(.+)",(\d+)',
        r'(\w+),(\d{8}),(\w+),(\d\d:\d\d),(.+),(\d+)',
        r'(\w+),(\d{8}),(\d\d:\d\d),"(.+)",(\d+)',
        r'(\w+),(\d{8}),(\d\d:\d\d),(.+),(\d+)',
    )
    for layout in layouts:
        matched = re.match(layout, line)
        if matched:
            fields = list(matched.groups())
            break
    else:
        # No layout matched: surface the offending line and give up.
        print(line)
        return

    # Layouts without the tap field yield five groups; pad the tap slot.
    if len(fields) == 5:
        fields.insert(2, "")

    mode, date, tap, time, loc, count = fields
    return [mode, date + ' ' + time, tap, loc, int(count)]

In [267]:
# Dry run over the raw file: parse_record prints every accepted line it
# cannot parse, so any output from this cell points at an unhandled layout.
with open(file_path, "r") as f:
    for line in filter(validate_record, f):
        parse_record(line)

In [275]:
# Build the cleaned DataFrame from the raw text file:
#   1. read each line as text and take the first field of every Row,
#   2. keep only the lines accepted by validate_record,
#   3. parse them into [mode, timestamp_str, tap, loc, count],
#   4. drop the None results parse_record returns for lines it cannot parse,
#      so they never reach toDF.
raw_file = (
    spark.read.text(file_path)
    .rdd.map(lambda r: r[0])
    .filter(validate_record)
    .map(parse_record)
    .filter(lambda rec: rec is not None)
)
opal_df = raw_file.toDF(schema=opal_schema)
opal_df.count()

169052

In [269]:
opal_df.show(10)

+----+--------------+---+----+-----+
|mode| timestamp_str|tap| loc|count|
+----+--------------+---+----+-----+
| bus|20161226 00:00|off|2000|   21|
| bus|20161226 00:00|off|2010|   20|
| bus|20161226 00:00|off|2021|   18|
| bus|20161226 00:00|off|2022|   50|
| bus|20161226 00:00|off|2031|   22|
| bus|20161226 00:00|off|2033|   22|
| bus|20161226 00:00|off|2050|   23|
| bus|20161226 00:00|off|2155|   21|
| bus|20161226 00:15|off|2000|   37|
| bus|20161226 00:15|off|2026|   31|
+----+--------------+---+----+-----+
only showing top 10 rows



In [240]:
# Add a `timestamp` column by parsing timestamp_str (e.g. "20161226 00:00")
# with the "yyyyMMdd HH:mm" pattern.
df2 = opal_df.withColumn("timestamp", to_timestamp("timestamp_str", "yyyyMMdd HH:mm"))

In [243]:
df2.filter("timestamp_str = '20161226 00:00' and mode = 'train'").show(10)

+-----+--------------+---+--------------------+-----+-------------------+
| mode| timestamp_str|tap|                 loc|count|          timestamp|
+-----+--------------+---+--------------------+-----+-------------------+
|train|20161226 00:00|off|    Ashfield Station|   75|2016-12-26 00:00:00|
|train|20161226 00:00|off|   Bankstown Station|   23|2016-12-26 00:00:00|
|train|20161226 00:00|off|Bondi Junction St...|   46|2016-12-26 00:00:00|
|train|20161226 00:00|off|     Burwood Station|   20|2016-12-26 00:00:00|
|train|20161226 00:00|off|     Campsie Station|   63|2016-12-26 00:00:00|
|train|20161226 00:00|off|     Central Station|  102|2016-12-26 00:00:00|
|train|20161226 00:00|off|    Eastwood Station|   23|2016-12-26 00:00:00|
|train|20161226 00:00|off|      Epping Station|   39|2016-12-26 00:00:00|
|train|20161226 00:00|off|   Fairfield Station|   34|2016-12-26 00:00:00|
|train|20161226 00:00|off|     Hornsby Station|   19|2016-12-26 00:00:00|
+-----+--------------+---+------------

In [249]:
df2.createOrReplaceTempView("opal")

In [278]:
# Hourly totals for train records from the `opal` view.
# dt_hourly is rebuilt from the timestamp's date and its zero-padded hour with
# the minutes fixed at ":00", then cast back to a timestamp.
# GROUP BY / ORDER BY use select-list positions: grouped by mode, dt_hourly
# and loc; ordered by loc, then dt_hourly.
sql_stmt = """
    SELECT 
        mode, 
        CAST(concat(DATE(timestamp), ' ', lpad(HOUR(timestamp), 2, '0'), ':00') AS timestamp) AS dt_hourly,
        loc, sum(count) AS total
    FROM opal
    WHERE mode = 'train' -- AND loc = 'Central Station'
    GROUP BY 1, 2, 3
    ORDER BY 3, 2
"""

# Run the aggregation and preview the first 24 rows.
df3 = spark.sql(sql_stmt)
df3.show(24)

+-----+-------------------+-------------------+-----+
| mode|          dt_hourly|                loc|total|
+-----+-------------------+-------------------+-----+
|train|2016-12-28 17:00:00|Albion Park Station|   20|
|train|2017-01-01 02:00:00|Albion Park Station|   19|
|train|2016-12-26 05:00:00|    Allawah Station|   24|
|train|2016-12-26 06:00:00|    Allawah Station|   49|
|train|2016-12-26 07:00:00|    Allawah Station|   40|
|train|2016-12-26 08:00:00|    Allawah Station|   97|
|train|2016-12-26 09:00:00|    Allawah Station|  153|
|train|2016-12-26 10:00:00|    Allawah Station|  153|
|train|2016-12-26 11:00:00|    Allawah Station|  145|
|train|2016-12-26 12:00:00|    Allawah Station|  121|
|train|2016-12-26 13:00:00|    Allawah Station|  157|
|train|2016-12-26 14:00:00|    Allawah Station|  108|
|train|2016-12-26 15:00:00|    Allawah Station|  111|
|train|2016-12-26 16:00:00|    Allawah Station|  126|
|train|2016-12-26 17:00:00|    Allawah Station|  123|
|train|2016-12-26 18:00:00| 

In [279]:
df3.count()

19550

In [271]:
# Dump the parsed RDD records as text.
# NOTE(review): this targets the same path ("/tmp/opal.csv") that the
# DataFrame CSV write further down uses with mode("overwrite") — confirm both
# outputs are wanted. The RDD holds Python lists, so the text written here is
# presumably their string form rather than CSV, and saveAsTextFile is expected
# to fail when the path already exists — TODO verify.
raw_file.saveAsTextFile("/tmp/opal.csv")

In [276]:
# Optional sanity check (disabled): number of distinct timestamp strings.
# opal_df.select("timestamp_str").distinct().count()
# Write the cleaned DataFrame as CSV to /tmp/opal.csv using "overwrite" mode.
opal_df.write.format("csv").mode("overwrite").save("/tmp/opal.csv")