### This project parses 3 TB nested Json file into csv using pyspark along with sparksql for further analysis

# Read Json, parse

## Import dependencies

In [1]:
import pyspark
from pyspark.sql import SQLContext
from scipy.interpolate import interp1d # import dependency

In [2]:
sc
sqlContext = SQLContext(sc)

## Read JSON File

In [61]:
bus_file='test.jsons'
bus = sqlContext.read.json(bus_file)
bus.registerTempTable("bus")

## Show Schema

In [None]:
bus.printSchema()

## load and apply SQL Query

In [63]:
with open("spark_extract.sql") as fr:
     query = fr.read()
output = sqlContext.sql(query)

## Flatten the list

### Method A

In [None]:
import itertools
def extract(parts):
    for p in parts:
        for o in itertools.izip(p.Line,p.Latitude,p.Longitude,p.RecordedAtTime,p.vehicleID,p.Trip,p.TripDate):
            yield o

### Method B

In [64]:
def parse_list(p):
    if p.ROUTE_ID!=None:
        return zip(p.ROUTE_ID,p.latitude,p.longitude,p.recorded_time\
                   ,p.vehicle_id,p.TRIP_ID,p.tripdate,p.SHAPE_ID\
                   ,p.STOP_ID,p.distance_stop,p.distance_shape,p.status,p.destination)
    else:
        return []

## Tranfer time to Unix time for interpolatation

In [80]:
import time
import dateutil.parser
def unix_time(x):
    dt = dateutil.parser.parse(x)
    return time.mktime(dt.timetuple())

## Interpolate function

## Method A

In [116]:
def findIncreasingList(parts):
    prev = 0
    for record in parts:
        if record[-1]<prev:
            return
        prev = record[-1]
        yield record

In [238]:
from scipy.interpolate import interp1d # import dependency
def predict(x):
    pre_x = [p[-1] for p in x if p[-1]!=None]
    if len(pre_x) >= 2:
        pre_y = [unix_time(p[1]) for p in x if p[-1]!=None]
        f = interp1d(pre_x, pre_y)
    else:
        return []
    return findIncreasingList([(p[0],p[2],p[3],f(p[-1]+p[-2]))\
                               for p in x if p[-1]!=None and (p[-1]+p[-2]) <= pre_x[-1]])

## method b

In [None]:
from scipy.interpolate import interp1d
def predict_map(x):
    train_y = [unix_time(p[3]) for p in x if p[-3]!=None ]
    if len(train_y) >= 2:
        train_x = [p[-3] for p in x if p[-3]!=None]
        f = interp1d(train_x, train_y)
        distance = [(p[-3]+p[-4]) for p in x \
                    if p[-3]!=None and (p[-3]+p[-4]) <= train_x[-1]]
        stoptimes = f(distance)
        stops = [p[-5] for p in x if p[-3]!=None]
    else:
        return[]
    return map(lambda a,b: (a,b), stops,stoptimes)
    #return [(p[-4],f(p[-2]+p[-3])) for p in x if (p[-2]!=None and p[-3]!=None) and (p[-2]+p[-3]) <= pre_x[-1]] 

## Groupby Date and Line & Apply Interpolation

## Simple Extraction

In [None]:
output.flatMap(parse_list)\
      .take(2)

## Group By TRIP_ID and Date

In [105]:
output.flatMap(parse_list)\
      .map(lambda x:((x[5],x[6]),x)).groupByKey()\
      .map(lambda x: x[1])\
      .take(10)

[<pyspark.resultiterable.ResultIterable at 0x112c96d10>,
 <pyspark.resultiterable.ResultIterable at 0x112c96c90>,
 <pyspark.resultiterable.ResultIterable at 0x112c96150>,
 <pyspark.resultiterable.ResultIterable at 0x112c96a10>,
 <pyspark.resultiterable.ResultIterable at 0x112c96f90>,
 <pyspark.resultiterable.ResultIterable at 0x112c96e10>,
 <pyspark.resultiterable.ResultIterable at 0x112c95a10>,
 <pyspark.resultiterable.ResultIterable at 0x112c959d0>,
 <pyspark.resultiterable.ResultIterable at 0x112c95c90>,
 <pyspark.resultiterable.ResultIterable at 0x112c95050>]

## Groupbykey and Apply Interpolation

In [113]:
output.flatMap(parse_list)\
      .map(lambda x:((x[5],x[6]),x)).groupByKey() \
      .map(lambda x: x[1]).take(10)

[<pyspark.resultiterable.ResultIterable at 0x112ceff50>,
 <pyspark.resultiterable.ResultIterable at 0x112d291d0>,
 <pyspark.resultiterable.ResultIterable at 0x112d292d0>,
 <pyspark.resultiterable.ResultIterable at 0x112d29390>,
 <pyspark.resultiterable.ResultIterable at 0x112d29510>,
 <pyspark.resultiterable.ResultIterable at 0x112d29690>,
 <pyspark.resultiterable.ResultIterable at 0x112d29810>,
 <pyspark.resultiterable.ResultIterable at 0x112d29990>,
 <pyspark.resultiterable.ResultIterable at 0x112d29b10>,
 <pyspark.resultiterable.ResultIterable at 0x112d29c10>]

In [148]:
a = max(sorted(output.flatMap(parse_list)\
      .map(lambda x:((x[5],x[6]),x)).groupByKey().mapValues(lambda x: len(x)).map(lambda x: x[1]).collect()))

14

In [237]:
output.flatMap(parse_list)\
      .map(lambda x:((x[5],x[6]),(x[0],x[3],x[5],x[8],x[-4],x[-3])))\
      .groupByKey()\
      .flatMap(lambda x: predict(x[1]))\
      .take(10)

[(u'MTA NYCT_Q46',
  u'MTA NYCT_QV_W5-Weekday-110900_Q46_6',
  u'MTA_502327',
  array(1451520016.346892)),
 (u'MTA NYCT_Q46',
  u'MTA NYCT_QV_W5-Weekday-110900_Q46_6',
  u'MTA_502331',
  array(1451520119.0)),
 (u'MTA NYCT_Q20A',
  u'MTA NYCT_CS_W5-Weekday-113900_MISC_769',
  u'MTA_505023',
  array(1451520122.033537)),
 (u'MTA NYCT_Q20A',
  u'MTA NYCT_CS_W5-Weekday-113900_MISC_769',
  u'MTA_505024',
  array(1451520122.0558827)),
 (u'MTA NYCT_Q20A',
  u'MTA NYCT_CS_W5-Weekday-113900_MISC_769',
  u'MTA_505026',
  array(1451520338.3900702)),
 (u'MTA NYCT_BX8',
  u'MTA NYCT_WF_W5-Weekday-110800_BX8_13',
  u'MTA_100947',
  array(1451520014.054865)),
 (u'MTA NYCT_BX8',
  u'MTA NYCT_WF_W5-Weekday-110800_BX8_13',
  u'MTA_103181',
  array(1451520125.8062081)),
 (u'MTA NYCT_BX8',
  u'MTA NYCT_WF_W5-Weekday-110800_BX8_13',
  u'MTA_103183',
  array(1451520377.7902856)),
 (u'MTA NYCT_BX10',
  u'MTA NYCT_KB_W5-Weekday-111000_BX10_32',
  u'MTA_103960',
  array(1451519992.9190516)),
 (u'MTA NYCT_BX10',

## Remove the prefix, timezones and save as CSV

In [None]:
output.flatMap(parse_list)\
      .map(lambda x:((x[5],x[6]),(x[0],)).groupByKey() \
      .flatMap(lambda x: predict(x[1]))\
      .map(lambda x: ",".join(map(str, x)))\
      .map(lambda x: x.replace('MTA NYCT_', '').replace('MTABC_','').replace('MTA_','').replace('-05:00',''))\
      .saveAsTextFile('stoptimes')

# Read From CSV and SQL Manupilation

## Import dependencies

In [3]:
from pyspark.sql.types import *

## Reset Schemas and Indexing

In [None]:
customSchema = StructType([StructField("ROUTE_ID", StringType(), True),\
                           StructField("latitude", DoubleType(), True),\
                           StructField("longitude", DoubleType(), True),\
                           StructField("recorded_time", StringType(), True),\
                           StructField("vehicle_id", StringType(), True),\
                           StructField("TRIP_ID", StringType(), True),\
                           StructField("tripdate", DateType(), True),\
                           StructField("SHAPE_ID", StringType(), True),\
                           StructField("STOP_ID", StringType(), True),\
                           StructField("distance_stop", StringType(), True),\
                           StructField("distance_shape", StringType(), True),\
                           StructField("status", StringType(), True),\
                           StructField("destination", StringType(), True)])             

In [221]:
stop_times_schema = StructType([StructField("trip_id", StringType(), True),\
                           StructField("arrival_time", StringType(), True),\
                           StructField("departure_time", StringType(), True),\
                           StructField("stop_id", StringType(), True),\
                           StructField("stop_sequence", IntegerType(), True),\
                           StructField("pickup_type", IntegerType(), True),
                           StructField("drop_off_type", IntegerType(), True)])

In [222]:
real_stoptimes_schema = StructType([StructField("ROUTE_ID", StringType(), True),\
                           StructField("TRIP_ID", StringType(), True),\
                           StructField("STOP_ID", StringType(), True),\
                           StructField("time",IntegerType(), True)])

## Use CSV=>DF tool to read saved csv

In [223]:
real_stoptimes = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('stops.csv', schema = real_stoptimes_schema)

In [224]:
stoptimes = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('stop_times.txt',schema = stop_times_schema)

In [225]:
real_stoptimes.show()

+--------+--------------------+-------+----------+
|ROUTE_ID|             TRIP_ID|STOP_ID|      time|
+--------+--------------------+-------+----------+
|     Q46|QV_W5-Weekday-110...| 502327|1451520016|
|     Q46|QV_W5-Weekday-110...| 502331|1451520119|
|    Q20A|CS_W5-Weekday-113...| 505023|1451520122|
|    Q20A|CS_W5-Weekday-113...| 505024|1451520122|
|    Q20A|CS_W5-Weekday-113...| 505026|1451520338|
|     BX8|WF_W5-Weekday-110...| 100947|1451520014|
|     BX8|WF_W5-Weekday-110...| 103181|1451520126|
|     BX8|WF_W5-Weekday-110...| 103183|1451520378|
|    BX10|KB_W5-Weekday-111...| 103960|1451519993|
|    BX10|KB_W5-Weekday-111...| 103966|1451520202|
|    BX10|KB_W5-Weekday-111...| 103962|1451520342|
|    BX10|KB_W5-Weekday-111...| 103954|1451520347|
|     B54|FP_W5-Weekday-112...| 306929|1451520018|
|     B54|FP_W5-Weekday-112...| 306927|1451520130|
|     B54|FP_W5-Weekday-112...| 304391|1451520300|
|     B54|FP_W5-Weekday-112...| 304392|1451520333|
|      B6|UP_W5-Weekday-109...|

In [226]:
stoptimes.show()

+--------------------+------------+--------------+-------+-------------+-----------+-------------+
|             trip_id|arrival_time|departure_time|stop_id|stop_sequence|pickup_type|drop_off_type|
+--------------------+------------+--------------+-------+-------------+-----------+-------------+
|EN_U5-Weekday-008...|    01:20:00|      01:20:00| 901471|            1|          0|            0|
|EN_U5-Weekday-008...|    01:21:28|      01:21:28| 301386|            2|          0|            0|
|EN_U5-Weekday-008...|    01:22:00|      01:22:00| 301387|            3|          0|            0|
|EN_U5-Weekday-008...|    01:22:55|      01:22:55| 301388|            4|          0|            0|
|EN_U5-Weekday-008...|    01:23:13|      01:23:13| 301389|            5|          0|            0|
|EN_U5-Weekday-008...|    01:23:53|      01:23:53| 301390|            6|          0|            0|
|EN_U5-Weekday-008...|    01:24:50|      01:24:50| 307120|            7|          0|            0|
|EN_U5-Wee

In [None]:
stoptimes.take(1)

In [None]:
record = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('all.csv', schema = customSchema)

In [None]:
record.registerTempTable('record')

In [212]:
from pyspark.sql.functions import split

In [213]:
new_time = real_stoptimes.withColumn('realtime',split(pyspark.sql.functions.from_unixtime(real_stoptimes.time), ' ')[1])

In [214]:
new_time.show(2)

+--------+--------------------+-------+----------+--------+
|ROUTE_ID|             TRIP_ID|STOP_ID|      time|realtime|
+--------+--------------------+-------+----------+--------+
|     Q46|QV_W5-Weekday-110...| 502327|1451520016|19:00:16|
|     Q46|QV_W5-Weekday-110...| 502331|1451520119|19:01:59|
+--------+--------------------+-------+----------+--------+
only showing top 2 rows



In [215]:
new_time.registerTempTable('new_time')

In [216]:
stoptimes.registerTempTable('stoptimes')

In [217]:
from pyspark.sql.functions import udf
def get_sec(s):
    l = s.split(':')
    return int(l[0]) * 3600 + int(l[1]) * 60 + int(l[2])
sqlContext.registerFunction("getsec", lambda x: get_sec(x), IntegerType())

In [219]:
join = sqlContext.sql('SELECT ROUTE_ID,TRIP_ID,STOP_ID,time,(getsec(realtime)-getsec(arrival_time)) as delay\
                       FROM new_time\
                       INNNER JOIN stoptimes\
                       ON (TRIP_ID = trip_id AND STOP_ID = stop_id)')

In [220]:
join.show(2)

+--------+--------------------+-------+----------+-----+
|ROUTE_ID|             TRIP_ID|STOP_ID|      time|delay|
+--------+--------------------+-------+----------+-----+
|     B41|FB_W5-Weekday-109...| 303241|1451520199|  754|
|     B49|FB_W5-Weekday-110...| 303985|1451520386| 1046|
+--------+--------------------+-------+----------+-----+
only showing top 2 rows



In [175]:
join.registerTempTable('new_join')

In [191]:
new = sqlContext.sql('SELECT ROUTE_ID, COUNT(IF((delay BETWEEN -60 AND 300),1,null))/COUNT(delay) as ontime_ratio\
                      FROM new_join\
                      GROUP BY ROUTE_ID').show()

+--------+-------------------+
|ROUTE_ID|       ontime_ratio|
+--------+-------------------+
|     Q24| 0.1111111111111111|
|     B31| 0.7272727272727273|
|     B32| 0.6666666666666666|
|     B35|           0.265625|
|     B36| 0.3333333333333333|
|     B37|               0.75|
|     B38| 0.4727272727272727|
|     B39|                1.0|
|      B1|0.27586206896551724|
|      B2|                0.5|
|      B3|0.11764705882352941|
|      B4|0.23529411764705882|
|      B6|0.30578512396694213|
|      B7| 0.5714285714285714|
|      B8|               0.26|
|      B9|               0.35|
|     B41|0.18181818181818182|
|     B42|                1.0|
|     B43| 0.6216216216216216|
|     B44|0.45652173913043476|
+--------+-------------------+
only showing top 20 rows



## Time Tansfer to UnixTimeStamp

## Calculate the trips of each line of everyday to test the data intergrety

In [None]:
gaps = sqlContext.sql('SELECT Route_Id, tripdate, count(recorded_time) AS trips\
                       FROM record\
                       GROUP BY Route_Id, tripdate\
                       ORDER BY tripdate DESC') #apply sql Query

In [None]:
gaps.show(3)

## Next Step _ Merge Data from Stoptimes to interpolated times

In [None]:
combine = sqlContext.sql(
    'SELECT record.TripRef, record.Stop_ID, record.RecordedAtTime \
    FROM record \
    JOIN stop_times \
    on (record.Stop_ID = stop_times.stop_id AND record.TripRef = stop_times.trip_id)')

In [None]:
#output_df = output1.toDF(['Line','Lat','Lon','Recordtime','ID','Trip','TripDate'])

In [None]:
#output_df.show()