
# **Data Preprocessing**

To run Spark in Colab, we first need to install its dependencies in the Colab environment: Apache Spark 3.0.1 built for Hadoop 2.7, Java 8, and findspark, which locates the Spark installation on the system. Everything can be installed from inside the Colab notebook. If you are new to Spark, note that some users have reported Python compatibility issues with Spark 2.4.0, so that version is best avoided.
Follow these steps to install the dependencies:

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Older Spark releases are served from the Apache archive rather than downloads.apache.org
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

Now that Spark and Java are installed in Colab, it is time to set the environment variables that let you run PySpark in your Colab environment. Set the locations of Java and Spark by running the following code:

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

Run a local spark session to test your installation:

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
from pyspark import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
# Wildcard import: later cells call col, when, date_format and from_utc_timestamp directly.
# It also shadows Python built-ins such as sum, min, max and round with Spark functions.
from pyspark.sql.functions import *
from pyspark.sql.functions import col, udf, from_unixtime, unix_timestamp
import pyspark.sql.functions as F
import pandas as pd
import datetime as dt
import os
from datetime import datetime, timedelta

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType,IntegerType,FloatType

Congrats! Your Colab is ready to run PySpark. Let's upload the data and start coding.

In [None]:
from google.colab import files
files.upload()

Saving Data_cleaning6.csv to Data_cleaning6.csv


{'Data_cleaning6.csv': b",code,c-value,segmentClosed,score,speed,average,reference,travelTimeMinutes,time,FID\r\n0,1485581516,99,,30,63,64,64,0.41,2019-03-01 00:00:07 CST,34208\r\n1,1485768829,98,,30,65,65,65,0.382,2019-03-01 00:00:07 CST,48672\r\nLet's see if spark will take care of this one\r\n5,\r\n2,1485517139,97,,30,67,66,66,0.177,2019-03-01 00:00:07 CST,29294\r\n3,1485706642,100,,30,65,65,65,0.675,1899-03-01 00:00:07 CST1899-03-01 00:00:07 CST1899-03-01 00:00:07 CST,44017\r\n4,1485876925,,,10,61,61,61,0.56,2019-03-01 00:00:07 CST,55858\r\n5,1485876676,99,,30,65,64,64,0.495,2019-03-01 00:00:07 CST,55835\r\n6,1485613605,,,10,60,60,60,0.237,1899-03-01 00:00:07 CST,36855\r\n7,1485621940,79,,30,71,65,65,0.3,2019-03-01 00:00:07 CST,37583\r\n8,1485876574,98,,30,67,66,66,0.631,2019-03-01 00:00:07 CST,55826\r\n"}

Check that the dataset was uploaded correctly with the following command:

In [None]:
!ls

inrix_reduced.csv  spark-3.0.1-bin-hadoop2.7
sample_data	   spark-3.0.1-bin-hadoop2.7.tgz


In [None]:
df1 = spark.read.csv('Data_cleaning6.csv',inferSchema=True, header =True)



Let us inspect the DataFrame with `.show()` to confirm that the data loaded properly. Notice that in `df1` the stray line "Let's see if spark will take care of this one" was pushed into the first column, and the fragment "5," became a row of nulls. This is not what we want, so let us remove these rows in the next step.




In [None]:
#display the dataframe
df1.show()

+--------------------+----------+-------+-------------+-----+-----+-------+---------+-----------------+--------------------+-----+
|                 _c0|      code|c-value|segmentClosed|score|speed|average|reference|travelTimeMinutes|                time|  FID|
+--------------------+----------+-------+-------------+-----+-----+-------+---------+-----------------+--------------------+-----+
|                   0|1485581516|     99|         null|   30|   63|     64|       64|             0.41|2019-03-01 00:00:...|34208|
|                   1|1485768829|     98|         null|   30|   65|     65|       65|            0.382|2019-03-01 00:00:...|48672|
|Let's see if spar...|      null|   null|         null| null| null|   null|     null|             null|                null| null|
|                   5|      null|   null|         null| null| null|   null|     null|             null|                null| null|
|                   2|1485517139|     97|         null|   30|   67|     66|       6

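The `DROPMALFORMED` read mode tells Spark's CSV reader to discard records that do not fit the expected layout, such as lines with the wrong number of fields. Run the code below to see how it removes the stray lines of text: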

In [None]:
# DROPMALFORMED discards records that do not fit the parsed schema (e.g. the stray text line)
df2 = spark.read.format('csv').option('mode', 'DROPMALFORMED').option('header', 'true').load('Data_cleaning6.csv', inferSchema=True)

In [None]:
#display contents of dataframe df2
df2.show()

+---+----------+-------+-------------+-----+-----+-------+---------+-----------------+--------------------+-----+
|_c0|      code|c-value|segmentClosed|score|speed|average|reference|travelTimeMinutes|                time|  FID|
+---+----------+-------+-------------+-----+-----+-------+---------+-----------------+--------------------+-----+
|  0|1485581516|     99|         null|   30|   63|     64|       64|             0.41|2019-03-01 00:00:...|34208|
|  1|1485768829|     98|         null|   30|   65|     65|       65|            0.382|2019-03-01 00:00:...|48672|
|  2|1485517139|     97|         null|   30|   67|     66|       66|            0.177|2019-03-01 00:00:...|29294|
|  3|1485706642|    100|         null|   30|   65|     65|       65|            0.675|1899-03-01 00:00:...|44017|
|  4|1485876925|   null|         null|   10|   61|     61|       61|             0.56|2019-03-01 00:00:...|55858|
|  5|1485876676|     99|         null|   30|   65|     64|       64|            0.495|20
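To make the idea concrete, here is a minimal plain-Python sketch of what `DROPMALFORMED` does in this case, assuming the only problem is a wrong number of fields. `drop_malformed` is a hypothetical helper, not part of Spark; Spark can also treat values that fail type conversion as malformed, which this sketch ignores.

```python
import csv
import io


def drop_malformed(text):
    """Parse CSV text and keep only rows with as many fields as the header.

    Rough stand-in for Spark's DROPMALFORMED mode: it checks field counts only
    and does not try to convert values to types.
    """
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    rows = [row for row in reader if len(row) == len(header)]
    return header, rows


# A trimmed copy of the uploaded file: two stray lines between valid records
raw = (
    ",code,c-value\r\n"
    "0,1485581516,99\r\n"
    "Let's see if spark will take care of this one\r\n"
    "5,\r\n"
    "2,1485517139,97\r\n"
)
header, rows = drop_malformed(raw)
print(header)                 # ['', 'code', 'c-value']
print([r[0] for r in rows])   # ['0', '2']
```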

Now let's look at the datatypes of all the columns

In [None]:
df2.dtypes

[('_c0', 'string'),
 ('code', 'int'),
 ('c-value', 'int'),
 ('segmentClosed', 'string'),
 ('score', 'int'),
 ('speed', 'int'),
 ('average', 'int'),
 ('reference', 'int'),
 ('travelTimeMinutes', 'double'),
 ('time', 'string'),
 ('FID', 'int')]

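We see that `time` is a string. Converting it to the `timestamp` type makes it much easier to manipulate. For that we use the function `from_utc_timestamp`, which casts the string to a timestamp and then renders it in the given time zone ('CST' here). Since every string already ends in "CST", Spark parses it as a Central-time instant, so converting back to 'CST' restores the original wall-clock time (assuming the Spark session time zone is UTC, which is typical on Colab). Values that cannot be parsed become null.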

In [None]:
from pyspark.sql.functions import from_utc_timestamp

# Parse the string into a timestamp expressed in Central time; unparseable values become null
df3 = df2.withColumn('datetime', from_utc_timestamp(df2.time, 'CST'))

In [None]:
df3=df3.withColumn('date',F.date_format('datetime','yyyy-MM-dd'))
df3.show()

+---+----------+-------+-------------+-----+-----+-------+---------+-----------------+--------------------+-----+-------------------+----------+
|_c0|      code|c-value|segmentClosed|score|speed|average|reference|travelTimeMinutes|                time|  FID|           datetime|      date|
+---+----------+-------+-------------+-----+-----+-------+---------+-----------------+--------------------+-----+-------------------+----------+
|  0|1485581516|     99|         null|   30|   63|     64|       64|             0.41|2019-03-01 00:00:...|34208|2019-03-01 00:00:07|2019-03-01|
|  1|1485768829|     98|         null|   30|   65|     65|       65|            0.382|2019-03-01 00:00:...|48672|2019-03-01 00:00:07|2019-03-01|
|  2|1485517139|     97|         null|   30|   67|     66|       66|            0.177|2019-03-01 00:00:...|29294|2019-03-01 00:00:07|2019-03-01|
|  3|1485706642|    100|         null|   30|   65|     65|       65|            0.675|1899-03-01 00:00:...|44017|               nu
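To see why the garbled row ends up with a null `datetime`, here is a plain-Python sketch (hypothetical helper `parse_cst`, standard library only) that parses the `'YYYY-MM-DD HH:MM:SS CST'` layout strictly and returns `None` when a value does not fit. It only mirrors the outcome; Spark's own timestamp parser accepts more formats than this.

```python
from datetime import datetime


def parse_cst(value):
    """Strictly parse 'YYYY-MM-DD HH:MM:SS CST'; return None if the value does not fit."""
    try:
        return datetime.strptime(value, "%Y-%m-%d %H:%M:%S CST")
    except (TypeError, ValueError):
        # Trailing garbage or a wrong layout raises ValueError; None raises TypeError
        return None


samples = [
    "2019-03-01 00:00:07 CST",
    "1899-03-01 00:00:07 CST1899-03-01 00:00:07 CST1899-03-01 00:00:07 CST",
    "1899-03-01 00:00:07 CST",
]
for s in samples:
    print(parse_cst(s))
# 2019-03-01 00:00:07
# None
# 1899-03-01 00:00:07
```

Note that the 1899 value parses without complaint: it is a valid timestamp, just not a plausible one, which is why it needs a separate filter.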

We spot another problem with the timestamp: one of the values has an invalid year, 1899, and the garbled value in row 3 could not be parsed at all, so its `datetime` is null. We could simply drop those rows, but it is best to first check all the distinct values in the column:

In [None]:
df3.select('datetime').distinct().show()

+-------------------+
|           datetime|
+-------------------+
|               null|
|1899-03-01 00:00:07|
|2019-03-01 00:00:07|
+-------------------+



We know that the data under consideration is from 2019, so we keep only the records whose year is 2019. In the process, the null values are removed as well.

In [None]:
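# Keep only 2019 records; rows with a null datetime are dropped too, because the comparison evaluates to null
df4 = df3.filter(F.year(df3.datetime) == 2019)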

In [None]:
df4.show()

+---+----------+-------+-------------+-----+-----+-------+---------+-----------------+--------------------+-----+-------------------+----------+
|_c0|      code|c-value|segmentClosed|score|speed|average|reference|travelTimeMinutes|                time|  FID|           datetime|      date|
+---+----------+-------+-------------+-----+-----+-------+---------+-----------------+--------------------+-----+-------------------+----------+
|  0|1485581516|     99|         null|   30|   63|     64|       64|             0.41|2019-03-01 00:00:...|34208|2019-03-01 00:00:07|2019-03-01|
|  1|1485768829|     98|         null|   30|   65|     65|       65|            0.382|2019-03-01 00:00:...|48672|2019-03-01 00:00:07|2019-03-01|
|  2|1485517139|     97|         null|   30|   67|     66|       66|            0.177|2019-03-01 00:00:...|29294|2019-03-01 00:00:07|2019-03-01|
|  4|1485876925|   null|         null|   10|   61|     61|       61|             0.56|2019-03-01 00:00:...|55858|2019-03-01 00:00:
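The year filter is easy to mimic in plain Python. In this sketch `keep_year` is a hypothetical helper, and `None` plays the role of a null timestamp: like a null comparison inside a Spark filter, it never passes the test.

```python
from datetime import datetime


def keep_year(timestamps, year):
    """Keep only timestamps from `year`; None (a null) never passes the test."""
    return [ts for ts in timestamps if ts is not None and ts.year == year]


values = [datetime(2019, 3, 1, 0, 0, 7), None, datetime(1899, 3, 1, 0, 0, 7)]
print(keep_year(values, 2019))  # [datetime.datetime(2019, 3, 1, 0, 0, 7)]
```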

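The DataFrame `df4` no longer contains any non-conforming dates. We can quickly check how many records were removed. Keep in mind that `df2.count()` most likely still includes the two malformed lines: since Spark 2.4 the CSV reader only parses the columns a query actually needs, and `count()` needs none, so `DROPMALFORMED` has nothing to drop. Only two of the nine well-formed rows (the null and the 1899 dates) are actually removed.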

In [None]:
print("Initial number of records:", df2.count())
print("Number of records after data cleaning is:", df4.count())

Initial number of records: 11
Number of records after data cleaning is: 7


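Some records still contain nulls, and it is not practical to drop them, so let's fill them with zeros. Note that `fillna(0)` only fills numeric columns; the string column `segmentClosed` keeps its nulls, as the output shows.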

In [None]:
df4=df4.fillna(0)

df4.show()

+---+----------+-------+-------------+-----+-----+-------+---------+-----------------+--------------------+-----+-------------------+----------+
|_c0|      code|c-value|segmentClosed|score|speed|average|reference|travelTimeMinutes|                time|  FID|           datetime|      date|
+---+----------+-------+-------------+-----+-----+-------+---------+-----------------+--------------------+-----+-------------------+----------+
|  0|1485581516|     99|         null|   30|   63|     64|       64|             0.41|2019-03-01 00:00:...|34208|2019-03-01 00:00:07|2019-03-01|
|  1|1485768829|     98|         null|   30|   65|     65|       65|            0.382|2019-03-01 00:00:...|48672|2019-03-01 00:00:07|2019-03-01|
|  2|1485517139|     97|         null|   30|   67|     66|       66|            0.177|2019-03-01 00:00:...|29294|2019-03-01 00:00:07|2019-03-01|
|  4|1485876925|      0|         null|   10|   61|     61|       61|             0.56|2019-03-01 00:00:...|55858|2019-03-01 00:00:
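The behaviour of `fillna(0)` can be sketched in plain Python: a numeric fill value is applied only to numeric columns. `fill_numeric_nulls` and the small schema below are hypothetical, modelled on record 4 of `df4`.

```python
def fill_numeric_nulls(rows, schema, value=0):
    """Replace None with `value`, but only in columns whose declared type is numeric.

    Mimics DataFrame.fillna(0): a numeric fill value leaves string columns alone.
    """
    numeric = {name for name, typ in schema.items() if typ in (int, float)}
    return [
        {k: (value if v is None and k in numeric else v) for k, v in row.items()}
        for row in rows
    ]


# Hypothetical schema and row modelled on record 4 of df4
schema = {"c-value": int, "segmentClosed": str, "speed": int}
rows = [{"c-value": None, "segmentClosed": None, "speed": 61}]
print(fill_numeric_nulls(rows, schema))
# [{'c-value': 0, 'segmentClosed': None, 'speed': 61}]
```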

Next, let's look at the case where the data comes without a header.

In [None]:
from google.colab import files
files.upload()

Saving Data_cleaning5.csv to Data_cleaning5.csv


{'Data_cleaning5.csv': b'0,1485581516,99,,30,63,64,64,0.41,2019-03-01 00:00:07 CST,34208\r\n1,1485768829,98,,30,65,65,65,0.382,2019-03-01 00:00:07 CST,48672\r\n2,1485517139,97,,30,67,66,66,0.177,2019-03-01 00:00:07 CST,29294\r\n3,1485706642,100,,30,65,65,65,0.675,1899-03-01 00:00:07 CST1899-03-01,44017\r\n4,1485876925,,,10,61,61,61,0.56,2019-03-01 00:00:07 CST,55858\r\n5,1485876676,99,,30,65,64,64,0.495,2019-03-01 00:00:07 CST,55835\r\n6,1485613605,,,10,60,60,60,0.237,1899-03-01 00:00:07 CST,36855\r\n7,1485621940,79,,30,71,65,65,0.3,2019-03-01 00:00:07 CST,37583\r\n8,1485876574,98,,30,67,66,66,0.631,2019-03-01 00:00:07 CST,55826\r\n'}

Without a header, Spark has no column names to use, so let's first define the schema ourselves:

In [None]:
# Build the schema column by column, grouping columns that share a type
cols = []
for col_name in ['index', 'code', 'c-value']:
    cols.append(StructField(col_name, IntegerType(), True))
cols.append(StructField('segmentClosed', StringType(), True))
for col_name in ['score', 'speed', 'average', 'reference']:
    cols.append(StructField(col_name, IntegerType(), True))
cols.append(StructField('travelTimeMinutes', FloatType(), True))
# 'time' stays a string here; 'FID' is also read as a string
for col_name in ['time', 'FID']:
    cols.append(StructField(col_name, StringType(), True))
schema = StructType(cols)
print(schema)

StructType(List(StructField(index,IntegerType,true),StructField(code,IntegerType,true),StructField(c-value,IntegerType,true),StructField(segmentClosed,StringType,true),StructField(score,IntegerType,true),StructField(speed,IntegerType,true),StructField(average,IntegerType,true),StructField(reference,IntegerType,true),StructField(travelTimeMinutes,FloatType,true),StructField(time,StringType,true),StructField(FID,StringType,true)))


Now, let's test the schema and see if the data is being read into the dataframe properly

In [None]:
df=spark.read.format('csv').option('header','false').schema(schema).load('Data_cleaning5.csv')
df.show(5)

+-----+----------+-------+-------------+-----+-----+-------+---------+-----------------+--------------------+-----+
|index|      code|c-value|segmentClosed|score|speed|average|reference|travelTimeMinutes|                time|  FID|
+-----+----------+-------+-------------+-----+-----+-------+---------+-----------------+--------------------+-----+
|    0|1485581516|     99|         null|   30|   63|     64|       64|             0.41|2019-03-01 00:00:...|34208|
|    1|1485768829|     98|         null|   30|   65|     65|       65|            0.382|2019-03-01 00:00:...|48672|
|    2|1485517139|     97|         null|   30|   67|     66|       66|            0.177|2019-03-01 00:00:...|29294|
|    3|1485706642|    100|         null|   30|   65|     65|       65|            0.675|1899-03-01 00:00:...|44017|
|    4|1485876925|   null|         null|   10|   61|     61|       61|             0.56|2019-03-01 00:00:...|55858|
+-----+----------+-------+-------------+-----+-----+-------+---------+--
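Conceptually, reading headerless data with a schema pairs each field position with a name and a type. The plain-Python sketch below illustrates that idea; `SCHEMA` and `read_with_schema` are hypothetical names, and Spark's real CSV parsing handles many more cases (quoting options, malformed values, custom null markers).

```python
import csv
import io

# Hypothetical (name, converter) pairs mirroring the StructType defined above
SCHEMA = [
    ("index", int), ("code", int), ("c-value", int), ("segmentClosed", str),
    ("score", int), ("speed", int), ("average", int), ("reference", int),
    ("travelTimeMinutes", float), ("time", str), ("FID", str),
]


def read_with_schema(text, schema):
    """Parse headerless CSV text, naming and converting each field by its position.

    Empty fields become None, much as Spark turns them into nulls.
    """
    records = []
    for row in csv.reader(io.StringIO(text)):
        records.append({
            name: (convert(field) if field != "" else None)
            for (name, convert), field in zip(schema, row)
        })
    return records


line = "4,1485876925,,,10,61,61,61,0.56,2019-03-01 00:00:07 CST,55858\r\n"
record = read_with_schema(line, SCHEMA)[0]
print(record["c-value"], record["speed"], record["FID"])  # None 61 55858
```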

In [None]:
df.select('time').distinct().show()

+--------------------+
|                time|
+--------------------+
|1899-03-01 00:00:...|
|1899-03-01 00:00:...|
|2019-03-01 00:00:...|
+--------------------+



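There you go! By defining the schema beforehand, we read the headerless file into a DataFrame with the column names and types we chose. Note that the `time` column still contains the invalid 1899 values, so the cleaning steps from the previous section still apply.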

# **Data Post Processing**

Let's write some quick code to detect congestion.

In [None]:
from google.colab import files
files.upload()

Saving inrixdata.csv to inrixdata (1).csv


{'inrixdata.csv': b'Code,C-Value,SegmentClosed,Score,Speed,Average,Reference,Travel,CentralTime,Time\r\n1450847770,,,10,57,57,57,0.491,2018-01-09 02:08:43 CST,2018-01-09T06:04:36Z\r\n1450847744,,,10,57,57,57,0.561,2018-01-09 03:01:43 CST,2018-01-09T06:04:36Z\r\n1450846576,,,10,57,57,57,0.502,2018-01-09 00:14:43 CST,2018-01-09T06:04:36Z\r\n1450847831,,,10,57,57,57,0.561,2018-01-09 00:01:43 CST,2018-01-09T06:04:36Z\r\n1450846563,,,10,57,57,57,0.502,2018-01-09 00:01:43 CST,2018-01-09T06:04:36Z\r\n1450847755,,,10,57,57,57,0.491,2018-01-09 00:01:43 CST,2018-01-09T06:04:36Z\r\n1450846657,,,10,56,56,56,0.999,2018-01-09 00:01:43 CST,2018-1-09T06:04:36Z\r\n1450846632,,,10,56,56,56,0.61,2018-01-09 00:01:43 CST,2018-1-09T06:04:36Z\r\n1450846643,,,10,56,56,56,0.999,2018-01-09 00:01:43 CST,2018-01-09T06:04:36Z\r\n1450846619,,,10,56,56,56,0.61,2018-01-09 00:01:43 CST,01-2018-09T06:04:36Z\r\n1450846714,,,10,56,56,56,0.267,2018-01-09 00:01:43 CST,2018-01-09T06:04:36Z\r\n1450846687,,,10,57,57,57,0.797,

In [None]:
#Load the inrix dataset into a dataframe
dataset = spark.read.csv('inrixdata.csv',inferSchema=True, header =True)

In [None]:
dataset.show(5)

+----------+-------+-------------+-----+-----+-------+---------+------+--------------------+--------------------+
|      Code|C-Value|SegmentClosed|Score|Speed|Average|Reference|Travel|         CentralTime|                Time|
+----------+-------+-------------+-----+-----+-------+---------+------+--------------------+--------------------+
|1450847770|   null|         null|   10|   57|     57|       57| 0.491|2018-01-09 02:08:...|2018-01-09T06:04:36Z|
|1450847744|   null|         null|   10|   57|     57|       57| 0.561|2018-01-09 03:01:...|2018-01-09T06:04:36Z|
|1450846576|   null|         null|   10|   57|     57|       57| 0.502|2018-01-09 00:14:...|2018-01-09T06:04:36Z|
|1450847831|   null|         null|   10|   57|     57|       57| 0.561|2018-01-09 00:01:...|2018-01-09T06:04:36Z|
|1450846563|   null|         null|   10|   57|     57|       57| 0.502|2018-01-09 00:01:...|2018-01-09T06:04:36Z|
+----------+-------+-------------+-----+-----+-------+---------+------+-----------------

In [None]:
# Rename several columns to shorter, lower-case names with withColumnRenamed
renames = {'Code': 'code', 'C-Value': 'cvalue', 'Score': 'conf', 'Speed': 'speed', 'Average': 'average',
           'Reference': 'ref_speed', 'Travel': 'travel', 'CentralTime': 'ctime', 'Time': 'gmt'}
df_new = dataset
for old, new in renames.items():
    df_new = df_new.withColumnRenamed(old, new)
df_new.show()

+----------+------+-------------+----+-----+-------+---------+------+--------------------+--------------------+
|      code|cvalue|SegmentClosed|conf|speed|average|ref_speed|travel|               ctime|                 gmt|
+----------+------+-------------+----+-----+-------+---------+------+--------------------+--------------------+
|1450847770|  null|         null|  10|   57|     57|       57| 0.491|2018-01-09 02:08:...|2018-01-09T06:04:36Z|
|1450847744|  null|         null|  10|   57|     57|       57| 0.561|2018-01-09 03:01:...|2018-01-09T06:04:36Z|
|1450846576|  null|         null|  10|   57|     57|       57| 0.502|2018-01-09 00:14:...|2018-01-09T06:04:36Z|
|1450847831|  null|         null|  10|   57|     57|       57| 0.561|2018-01-09 00:01:...|2018-01-09T06:04:36Z|
|1450846563|  null|         null|  10|   57|     57|       57| 0.502|2018-01-09 00:01:...|2018-01-09T06:04:36Z|
|1450847755|  null|         null|  10|   57|     57|       57| 0.491|2018-01-09 00:01:...|2018-01-09T06:

In [None]:
# Remove copies left over from earlier uploads so the new upload is saved as segments.csv
!rm -r segments.csv
!rm -r segments\ \(1\).csv
!rm -r segments\ \(2\).csv

rm: cannot remove 'segments.csv': No such file or directory
rm: cannot remove 'segments (1).csv': No such file or directory
rm: cannot remove 'segments (2).csv': No such file or directory


In [None]:
# Upload the segments file, which will be joined with the INRIX dataset
from google.colab import files
files.upload()

Saving segments.csv to segments (1).csv


{'segments.csv': b'Direction,Miles,Order,RoadName,,AADT,WorkZone2,XDSegID,PreviousXD,NextXDSegI,FRC,RoadNumber,LinearID,Country,State,County,District,Lanes,SlipRoad,SpecialRoa,RoadList,StartLat,StartLong,EndLat,EndLong,Bearing,XDGroup,ShapeSRID,Direction,Path,RouteID,MM,FED_FUNCTIONAL_CLASS,CITY_NUMBER,OWNER_CODE,RoadwayType\r\nNE,0.357524556,1,I 35,4,67200,1am,1485480134,1485480190,1485480148,0,35, ,United States of America,Iowa,Polk,Ankeny,3,0, ,I 35,41.70796,-93.57614,41.71313,-93.57626,N,508126,4326,1,,S001910035N,90.95395217,1,187,1,Municipal Interstate\r\nNE,0.454346616,2,I 35,4,67200,1am,1485480148,1485480134,1485480162,0,35, ,United States of America,Iowa,Polk,Ankeny,3,0, ,I 35,41.71313,-93.57626,41.71971,-93.5764,N,508126,4326,1,,S001910035N,91.31067136,1,187,1,Municipal Interstate\r\nNE,0.482339388,3,I 35,4,67200,1am,1485480162,1485480148,1485728143,0,35, ,United States of America,Iowa,Polk,Ankeny,3,0, ,I 35,41.71971,-93.5764,41.72669,-93.57655,N,508126,4326,1,,S001910035N,91

In [None]:
sgmt_all= spark.read.csv('segments.csv',inferSchema=True, header =True)

In [None]:
# Count the distinct rows (segments) in the segments table
sgmt_all.distinct().count()


1308

In [None]:
sgmt_all.show()

+---------+----------+-----+--------+----------+-----+--------+----------+-----+
|Direction|     Miles|Order|RoadName|RoadNumber| AADT|Workzone|      code|class|
+---------+----------+-----+--------+----------+-----+--------+----------+-----+
|       NE|0.35752454|    1|    I 35|        35|67200|     1am|1485480134|    1|
|       NE|0.45434663|    2|    I 35|        35|67200|     1am|1485480148|    1|
|       NE|0.48233938|    3|    I 35|        35|67200|     1am|1485480162|    1|
|       NE|0.68312305|    4|    I 35|        35|67200|     1am|1485728143|    1|
|       NE|0.42010283|    5|    I 35|        35|48100|     1am|1485528739|    1|
|       NE|0.47384524|    6|    I 35|        35|51900|     1am|1485528725|    1|
|       NE|0.41008633|    7|    I 35|        35|51900|     1am| 154010750|    1|
|       NE| 0.9373633|    8|    I 35|        35|47800|     1am| 154636562|    1|
|       NE|  0.650265|    9|    I 35|        35|45400|     1am|1485528683|    1|
|       NE| 0.6175932|   10|

In [None]:
# 'Direction' appears twice in the header; Spark de-duplicates it by appending the column index ('Direction0')
sgmt_all = sgmt_all.select('Direction0', 'Miles', 'Order', 'RoadName', 'RoadNumber', 'AADT', 'WorkZone2', 'XDSegID', 'FED_FUNCTIONAL_CLASS')
sgmt_all = sgmt_all.withColumnRenamed('Direction0', 'Direction').withColumnRenamed('WorkZone2', 'Workzone').withColumnRenamed('XDSegID', 'code').withColumnRenamed('FED_FUNCTIONAL_CLASS', 'class')
sgmt_all = sgmt_all.withColumn('Miles', sgmt_all['Miles'].cast(FloatType()))  # segment length as a float
sgmt_all.show()

+---------+----------+-----+--------+----------+-----+--------+----------+-----+
|Direction|     Miles|Order|RoadName|RoadNumber| AADT|Workzone|      code|class|
+---------+----------+-----+--------+----------+-----+--------+----------+-----+
|       NE|0.35752454|    1|    I 35|        35|67200|     1am|1485480134|    1|
|       NE|0.45434663|    2|    I 35|        35|67200|     1am|1485480148|    1|
|       NE|0.48233938|    3|    I 35|        35|67200|     1am|1485480162|    1|
|       NE|0.68312305|    4|    I 35|        35|67200|     1am|1485728143|    1|
|       NE|0.42010283|    5|    I 35|        35|48100|     1am|1485528739|    1|
|       NE|0.47384524|    6|    I 35|        35|51900|     1am|1485528725|    1|
|       NE|0.41008633|    7|    I 35|        35|51900|     1am| 154010750|    1|
|       NE| 0.9373633|    8|    I 35|        35|47800|     1am| 154636562|    1|
|       NE|  0.650265|    9|    I 35|        35|45400|     1am|1485528683|    1|
|       NE| 0.6175932|   10|

In [None]:
sgmt=sgmt_all.filter("Miles>0")
sgmt.columns

['Direction',
 'Miles',
 'Order',
 'RoadName',
 'RoadNumber',
 'AADT',
 'Workzone',
 'code',
 'class']

Let us keep only the rows where the speed is non-negative (`speed >= 0`) and the confidence score (`conf`) equals 30:

In [None]:

dataset2=df_new.filter("speed >=0 AND conf ==30")
dataset2.show() 

+----------+------+-------------+----+-----+-------+---------+------+--------------------+--------------------+
|      code|cvalue|SegmentClosed|conf|speed|average|ref_speed|travel|               ctime|                 gmt|
+----------+------+-------------+----+-----+-------+---------+------+--------------------+--------------------+
|1450859604|     0|         null|  30|   15|     16|       16| 1.319|2018-01-09 00:01:...|2018-01-09T06:04:36Z|
|1450889939|     0|         null|  30|   15|     16|       16| 0.144|2018-01-09 00:01:...|2018-01-09T06:04:36Z|
|1450846910|     0|         null|  30|   15|     16|       16| 0.547|2018-01-09 00:01:...|2018-01-09T06:04:36Z|
|1450846973|    85|         null|  30|   55|     54|       54| 0.618|2018-01-09 00:01:...|2018-01-09T06:04:36Z|
|1450490349|   100|         null|  30|   53|     54|       54| 0.413|2018-01-09 00:01:...|2018-01-09T06:04:36Z|
|1450490320|   100|         null|  30|   55|     52|       52| 1.066|2018-01-09 00:01:...|2018-01-09T06:

In [None]:
sgmt.show()

+---------+----------+-----+--------+----------+-----+--------+----------+-----+
|Direction|     Miles|Order|RoadName|RoadNumber| AADT|Workzone|      code|class|
+---------+----------+-----+--------+----------+-----+--------+----------+-----+
|       NE|0.35752454|    1|    I 35|        35|67200|     1am|1485480134|    1|
|       NE|0.45434663|    2|    I 35|        35|67200|     1am|1485480148|    1|
|       NE|0.48233938|    3|    I 35|        35|67200|     1am|1485480162|    1|
|       NE|0.68312305|    4|    I 35|        35|67200|     1am|1485728143|    1|
|       NE|0.42010283|    5|    I 35|        35|48100|     1am|1485528739|    1|
|       NE|0.47384524|    6|    I 35|        35|51900|     1am|1485528725|    1|
|       NE|0.41008633|    7|    I 35|        35|51900|     1am| 154010750|    1|
|       NE| 0.9373633|    8|    I 35|        35|47800|     1am| 154636562|    1|
|       NE|  0.650265|    9|    I 35|        35|45400|     1am|1485528683|    1|
|       NE| 0.6175932|   10|

In the next step we join the INRIX dataset with the segments dataset on `code`. The segments table is much smaller than the speed data, so we use a broadcast join: Spark ships a copy of the small table to every executor, which avoids shuffling the large one.

In [None]:
joining_all=dataset2.join(F.broadcast(sgmt),['code'])

joining_all.show()

+----------+------+-------------+----+-----+-------+---------+------+--------------------+--------------------+---------+----------+-----+----------+----------+-----+--------+-----+
|      code|cvalue|SegmentClosed|conf|speed|average|ref_speed|travel|               ctime|                 gmt|Direction|     Miles|Order|  RoadName|RoadNumber| AADT|Workzone|class|
+----------+------+-------------+----+-----+-------+---------+------+--------------------+--------------------+---------+----------+-----+----------+----------+-----+--------+-----+
|1450447673|    27|         null|  30|   72|     66|       66| 0.451|2018-01-09 00:01:...|2018-01-09T06:04:36Z|       SW| 0.5950934|    1|I 35;IA 27|        35|19100|      2l|    1|
|1450447686|    27|         null|  30|   72|     66|       66| 0.451|2018-01-09 00:01:...|2018-01-09T06:04:36Z|       SW| 0.5417425|    2|I 35;IA 27|        35|19100|      2l|    1|
|1450447699|    81|         null|  30|   69|     66|       66| 0.471|2018-01-09 00:01:...|
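The core of a broadcast join is a hash join: build a lookup table from the small side once, then probe it with every row of the large side. The plain-Python sketch below shows that idea on lists of dicts; `broadcast_hash_join` and the sample rows are hypothetical, and Spark additionally distributes the lookup table across executors.

```python
def broadcast_hash_join(big_rows, small_rows, key):
    """Inner-join two lists of dicts on `key`.

    A lookup table is built from the small side (the part Spark would broadcast
    to every executor) and probed once per row of the big side, so the big side
    never has to be reorganised.
    """
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)
    joined = []
    for row in big_rows:
        for match in lookup.get(row[key], []):
            joined.append({**row, **match})  # the key column appears only once
    return joined


speeds = [{"code": 1450447673, "speed": 72}, {"code": 999, "speed": 50}]
segments = [{"code": 1450447673, "Miles": 0.595}, {"code": 1450447686, "Miles": 0.542}]
print(broadcast_hash_join(speeds, segments, "code"))
# [{'code': 1450447673, 'speed': 72, 'Miles': 0.595}]
```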

In the next few cells we manipulate the timestamp in several ways. Not all of these steps are required; they are here to show some of the possible ways to work with timestamps. The column method `substr` takes two parameters: the 1-based start position and the length of the substring.

In [None]:
df4=joining_all.withColumn('date',joining_all['ctime'].substr(1,10)).withColumn('time',joining_all['ctime'].substr(12,8))

In [None]:
df4.select('time').show(5)

+--------+
|    time|
+--------+
|00:01:43|
|00:01:43|
|00:01:43|
|00:01:43|
|00:01:43|
+--------+
only showing top 5 rows
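Because `substr` counts from 1, `substr(12, 8)` on a value such as `'2018-01-09 00:01:43 CST'` picks characters 12 to 19. The plain-Python sketch below, using a hypothetical `spark_substr` helper, reproduces the same positions with ordinary 0-based slicing.

```python
def spark_substr(s, pos, length):
    """Python equivalent of Column.substr(pos, length) for a positive, 1-based pos."""
    start = pos - 1
    return s[start:start + length]


ctime = "2018-01-09 00:01:43 CST"
date = spark_substr(ctime, 1, 10)    # '2018-01-09'
clock = spark_substr(ctime, 12, 8)   # '00:01:43'
hour = spark_substr(clock, 1, 2)     # '00'
minute = spark_substr(clock, 4, 2)   # '01'
```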



The hour and minute columns can be obtained as shown below

In [None]:
# Characters 1-2 of 'HH:mm:ss' are the hour, characters 4-5 the minute (substr is 1-based)
df6 = df4.withColumn('hour', df4['time'].substr(1, 2))
df6 = df6.withColumn('minute', df6['time'].substr(4, 2))



Next we rebuild the `time` column by concatenating the date, hour and minute columns:

In [None]:
df7 = df6.withColumn('time',F.concat(F.col('date'),F.lit(" "),F.col('hour'),F.lit(':'),F.col('minute')))

We can convert the time column from string into a timestamp

In [None]:
df7=df7.withColumn('timenew',F.to_timestamp(F.col('time'),'yyyy-MM-dd HH:mm'))

In [None]:
df7.select('time').distinct().show()

+----------------+
|            time|
+----------------+
|2018-01-09 00:02|
|2018-01-09 00:03|
|2018-01-09 00:01|
+----------------+



Next we round the time up to the next 5-minute boundary:

In [None]:
# Round each timestamp up to the next multiple of 5 minutes:
# new_minute = ceil(minute / 5) * 5, then add the difference (converted to seconds) to the Unix time
df8 = (df7.withColumn("minute", F.minute("timenew"))
       .withColumn("new_minute", F.ceil(F.col("minute") / 5) * 5)
       .withColumn("minute_add", F.col("new_minute") - F.col("minute")))
# from_unixtime returns a 'yyyy-MM-dd HH:mm:ss' string
df8 = df8.withColumn("timestamp", F.from_unixtime(F.unix_timestamp("timenew") + F.col("minute_add") * 60))


In [None]:
df8.select('timestamp').distinct().show()

+-------------------+
|          timestamp|
+-------------------+
|2018-01-09 00:05:00|
+-------------------+
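The rounding rule can be checked in plain Python with the same arithmetic. In this sketch (hypothetical helper `round_up_5min`), times already on a 5-minute boundary stay put, and adding a `timedelta` takes care of rolling over into the next hour or day, just as adding seconds to the Unix time does in the Spark cell.

```python
import math
from datetime import datetime, timedelta


def round_up_5min(ts):
    """Move a minute-resolution timestamp up to the next 5-minute boundary.

    Same arithmetic as the Spark cell: new_minute = ceil(minute / 5) * 5, then add
    the difference; adding a timedelta handles the roll-over past minute 55.
    """
    new_minute = math.ceil(ts.minute / 5) * 5
    return ts + timedelta(minutes=new_minute - ts.minute)


print(round_up_5min(datetime(2018, 1, 9, 0, 1)))    # 2018-01-09 00:05:00
print(round_up_5min(datetime(2018, 1, 9, 0, 5)))    # 2018-01-09 00:05:00
print(round_up_5min(datetime(2018, 1, 9, 23, 58)))  # 2018-01-10 00:00:00
```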



Next, we group the data by `code`, `class`, `date` and `timestamp`, and compute the average speed, the number of readings, and the average c-value for each group:

In [None]:
# Average speed and c-value, plus the number of readings, per segment and 5-minute interval
speed_5min = df8.groupBy('code', 'class', 'date', 'timestamp').agg(
    F.avg('speed').alias('speed'),
    F.count('speed').alias('counts'),
    F.avg('cvalue').alias('cvalue'))
# Reorder the columns and reformat the date as yyyy/MM/dd
speed_5min = speed_5min.select('code', 'class', 'timestamp', 'speed', 'counts', 'cvalue',
                               F.date_format('date', 'yyyy/MM/dd').alias('date'))
# Add a day-of-week column (F.dayofweek: 1 = Sunday, ..., 7 = Saturday)
speed_5min = speed_5min.withColumn('Day', F.dayofweek('timestamp'))
speed_5min.show()

+----------+-----+-------------------+-----------------+------+-----------------+----------+---+
|      code|class|          timestamp|            speed|counts|           cvalue|      date|Day|
+----------+-----+-------------------+-----------------+------+-----------------+----------+---+
|1485495058|    1|2018-01-09 00:05:00|             71.0|     2|             85.5|2018/01/09|  3|
|1485595158|    1|2018-01-09 00:05:00|             65.0|     3|91.66666666666667|2018/01/09|  3|
|1485595172|    1|2018-01-09 00:05:00|67.66666666666667|     3|90.66666666666667|2018/01/09|  3|
|1485686937|    1|2018-01-09 00:05:00|             67.0|     3|74.33333333333333|2018/01/09|  3|
|1485715813|    1|2018-01-09 00:05:00|             66.0|     1|             99.0|2018/01/09|  3|
|1485711582|    1|2018-01-09 00:05:00|             67.0|     3|            100.0|2018/01/09|  3|
|1485691197|    1|2018-01-09 00:05:00|             66.0|     3|             99.0|2018/01/09|  3|
|1485521446|    1|2018-01-09 0
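The aggregation can be sketched in plain Python with a dictionary of groups. This simplified version (hypothetical `aggregate_5min`, grouping only by code and interval rather than by all four Spark keys) ignores `None` values, as `F.avg` and `F.count` ignore nulls; the readings below are made up for illustration.

```python
from collections import defaultdict


def mean(values):
    return sum(values) / len(values) if values else None


def aggregate_5min(readings):
    """Group (code, interval, speed, cvalue) tuples by (code, interval).

    Like F.avg and F.count in the Spark cell, None values are ignored.
    """
    groups = defaultdict(list)
    for code, interval, speed, cvalue in readings:
        groups[(code, interval)].append((speed, cvalue))
    result = {}
    for key, values in groups.items():
        speeds = [s for s, _ in values if s is not None]
        cvalues = [c for _, c in values if c is not None]
        result[key] = {"speed": mean(speeds), "counts": len(speeds), "cvalue": mean(cvalues)}
    return result


# Hypothetical readings: two for one segment, one (with a null c-value) for another
readings = [
    (1485495058, "2018-01-09 00:05:00", 70, 85),
    (1485495058, "2018-01-09 00:05:00", 72, 86),
    (1485715813, "2018-01-09 00:05:00", 66, None),
]
summary = aggregate_5min(readings)
print(summary[(1485495058, "2018-01-09 00:05:00")])
# {'speed': 71.0, 'counts': 2, 'cvalue': 85.5}
```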

The select function helps select specific columns we are interested in

In [None]:
# Keep only the identifying columns and the day of the week
dates = speed_5min.select('code', 'class', 'date', 'Day')


We assign 1 to the new `congested` column when the speed is below 45 and 0 otherwise, using a conditional column expression (`when` ... `otherwise`):

In [None]:
dfa = speed_5min.withColumn("congested",when(col("speed")<45,1).otherwise(0))


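Next we group the data by date and sum the `congested` flags. Each row of `speed_5min` covers one segment over one 5-minute interval, so the result counts congested segment-intervals rather than literal minutes; multiply by 5 if you need congested segment-minutes.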

In [None]:
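# Use F.sum explicitly; a bare sum() only works here because the wildcard import shadows Python's built-in
df_congested = dfa.groupBy('date').agg(F.sum('congested').alias('count_congestedmin'))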

In [None]:
df_congested.distinct().show()

+----------+------------------+
|      date|count_congestedmin|
+----------+------------------+
|2018/01/09|                22|
+----------+------------------+
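The congestion count boils down to a threshold test per row followed by a per-date sum. The plain-Python sketch below (hypothetical `congested_per_date`, made-up rows) mirrors `when(col('speed') < 45, 1).otherwise(0)`: a missing speed fails the comparison and therefore counts as not congested, and dates with no congestion still appear with a count of 0.

```python
from collections import Counter

SPEED_THRESHOLD = 45  # speeds below this value are flagged as congested, as in the notebook


def congested_per_date(rows):
    """rows: iterable of (date, speed) pairs, one per segment and 5-minute interval.

    Returns {date: number of congested rows}; a missing speed counts as not congested.
    """
    counts = Counter()
    for date, speed in rows:
        counts[date] += 1 if speed is not None and speed < SPEED_THRESHOLD else 0
    return dict(counts)


rows = [("2018/01/09", 30.0), ("2018/01/09", 50.0), ("2018/01/10", 60.0), ("2018/01/10", None)]
print(congested_per_date(rows))  # {'2018/01/09': 1, '2018/01/10': 0}
```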



In [None]:
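# coalesce(1) produces a single part file; Spark still writes a *directory* named congested.csv containing it
df_congested.coalesce(1).write.format('csv').option('header', 'true').mode('overwrite').save('congested.csv')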
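Because `save('congested.csv')` creates a directory, tools that expect a single file need the path of the part file inside it. A small standard-library helper (hypothetical name `single_part_file`) can locate it, assuming Spark's usual `part-*.csv` naming for CSV output; it raises an error if the directory does not hold exactly one such file.

```python
import glob
import os


def single_part_file(output_dir):
    """Return the path of the single part-*.csv file Spark wrote into `output_dir`."""
    parts = sorted(glob.glob(os.path.join(output_dir, "part-*.csv")))
    if len(parts) != 1:
        raise ValueError(f"expected exactly one part file in {output_dir}, found {len(parts)}")
    return parts[0]


# Usage in the notebook, after the write above:
# print(open(single_part_file('congested.csv')).read())
```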