# SENG 550 Final Project 

Luka Petrovic UCID: <br>
Logan Boras UCID: <br> 
Kenny Jeon UCID: 30068677

### 1. Set up environment and Spark ###

In [1]:
# Environment setup: install Java 8 and the Python helpers used below, then point
# JAVA_HOME / SPARK_HOME at the installations that findspark/pyspark will use.
# NOTE(review): package installs are unpinned; `apt-get install` usually needs root.
!python3 -m pip install --upgrade pip
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# uncomment the following if you do not have spark installed in your project
# NOTE(review): wget/tar below download and extract into the current directory,
# while SPARK_HOME further down points at /training/... — adjust one to match the other.
#!wget --no-check-certificate https://dlcdn.apache.org/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz
#!tar -xvf spark-3.3.3-bin-hadoop3.tgz
!pip install findspark
!pip install numpy
import os
# Hard-coded install locations for this environment — TODO confirm they exist on your machine.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/training/spark-3.3.3-bin-hadoop3"

[0m

In [2]:
import findspark
findspark.init()  # locate the Spark install from SPARK_HOME and make pyspark importable
import pyspark

# getOrCreate reuses an already-running context instead of raising
# "Cannot run multiple SparkContexts at once", so this cell can be re-executed safely.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName("Seng550Project"))

### 2. Create directory for input data to live ###

Note that `deerfoot.csv` (or whichever input file you are using) must be placed in the local `data` folder; the next cell then copies that folder to HDFS under `/inputs`.

In [3]:
!mkdir data
!hadoop fs -mkdir -p /inputs
!hadoop fs -put data /inputs

mkdir: cannot create directory ‘data’: File exists
put: `/inputs/data/deerfoot5Lines.csv': File exists
put: `/inputs/data/deerfoot.csv': File exists


### 3. Grab the specific data we want from the data set ###

In [4]:
# HDFS location of the uploaded CSV.
file = '/inputs/data/deerfoot.csv'

# Each RDD element is one raw CSV line; request at least 8 partitions.
deerfootRDD = sc.textFile(file, minPartitions=8)
print(deerfootRDD.take(1))

['21/09/2013,Saturday,34,34,34,34,35,34,35,36,38,36,36,35,35,35,35,35,36,34,34,0,0,0,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2,0,2']


#### 3a. Extract only the morning and afternoon rush-hour readings; 8am and 4pm will suffice ####

In [5]:
def getRushHours(RDD):
    """Extract ``(day name, AM reading, PM reading)`` from one raw CSV line.

    Despite the parameter name, ``RDD`` is a single text line of the input file,
    not an RDD. The three fields come from comma-separated columns 1, 6 and 13
    and are returned as strings, unconverted.

    NOTE(review): the notebook text calls these the 8am and 4pm readings, but if
    each column is one hour they are 7 hours apart — confirm the column layout.
    """
    fields = RDD.split(',')
    dayName, amTime, pmTime = fields[1], fields[6], fields[13]
    return dayName, amTime, pmTime

# Turn every raw line into a (day name, AM reading, PM reading) tuple of strings.
rushHoursRDD = deerfootRDD.map(getRushHours)
print(rushHoursRDD.take(1))

[('Saturday', '35', '35')]


#### 3b. Rush hour generally doesn't happen on weekends, so we need to filter out Saturdays and Sundays ####

In [6]:
# This function was taken from the Spark Basics Tutorial

def removeWeekends(deerfootRDDRecord):
    """Filter predicate that keeps weekday records.

    Returns False when the record's first field (the day name) is exactly
    "Saturday" or "Sunday", and True for anything else.
    """
    weekendDays = ("Saturday", "Sunday")
    return deerfootRDDRecord[0] not in weekendDays

# Keep only Monday-Friday records. The result is cached because later cells run
# several actions on it (count, foreach, map); without caching, each action would
# re-read the input file from HDFS and re-parse every line.
weekdayRushHoursRDD = rushHoursRDD.filter(removeWeekends).cache()
print(weekdayRushHoursRDD.take(5))

[('Monday', '45', '40'), ('Tuesday', '52', '40'), ('Wednesday', '39', '40'), ('Thursday', '49', '56'), ('Friday', '36', '43')]


### 4. Compute Statistics and Averages

For our analysis, we want each weekday's average of its AM and PM commute times, as well as the overall average across all weekday AM and PM readings. For a day to be considered "bad", the mean of its AM and PM commute times must be higher than the overall average commute time.

#### 4a. Get day counts

In [7]:
# Number of weekday records; each one contributes one AM and one PM reading.
totalCount = weekdayRushHoursRDD.count()
print(totalCount)

144


#### 4b. Get overall rush hour average

In [8]:
# Accumulator holding the sum of every AM and PM reading over all weekday records.
totalRushHourSum = sc.accumulator(0)


def addRushHourTimes(item):
    """Add one record's AM (``item[1]``) and PM (``item[2]``) readings to ``totalRushHourSum``.

    Deliberately not named ``sum``: a notebook-level ``def sum`` would shadow
    Python's built-in ``sum`` in every later cell.
    """
    global totalRushHourSum
    totalRushHourSum += int(item[1])
    totalRushHourSum += int(item[2])


weekdayRushHoursRDD.foreach(addRushHourTimes)

# Each record contributes two readings (AM and PM), hence the factor of 2.
if totalCount == 0:
    raise ValueError("No weekday records found; cannot compute the rush-hour average.")
totalRushHourAverage = totalRushHourSum.value / (totalCount * 2)
print(totalRushHourAverage)


41.795138888888886


#### 4c. Combine the AM and PM commute times into a single daily average

In [9]:
def _dailyAverage(record):
    """Collapse a (day, AM, PM) record into (day, mean of AM and PM as a float)."""
    day, am, pm = record[0], int(record[1]), int(record[2])
    return (day, (am + pm) / 2)


weekdayAverageRushHoursRDD = weekdayRushHoursRDD.map(_dailyAverage)
print(weekdayAverageRushHoursRDD.take(5))
                                                    

[('Monday', 42.5), ('Tuesday', 46.0), ('Wednesday', 39.5), ('Thursday', 52.5), ('Friday', 39.5)]


### 5 - Prepare data for model

In this section we label each day as good or bad, encode the weekday as a number, and convert the records into MLlib `LabeledPoint`s for training.

#### 5a - Add labels to data

We want to add a "good" or "bad" label to each record. To do this, we compare each day's average commute time with the overall average. If it's higher, it's a bad day; if it's lower (or exactly equal), we consider it a good day. We use the integer 1 for good and 0 for bad.

In [10]:
def addLabels(item):
    """Prefix a ``(day, daily average)`` pair with a good/bad label.

    Returns ``(label, day, average)``. The label is 0 ("bad") when the daily
    average is strictly greater than the global ``totalRushHourAverage``, and
    1 ("good") otherwise, so a day exactly at the average counts as good.
    """
    isBadDay = item[1] > totalRushHourAverage
    return (0 if isBadDay else 1, item[0], item[1])


# Records become (label, day name, daily average); label 1 = good day, 0 = bad day.
labeledRDD = weekdayAverageRushHoursRDD.map(addLabels)
print(labeledRDD.take(2))

[(0, 'Monday', 42.5), (0, 'Tuesday', 46.0)]


#### 5b - Convert days of the week to integers
We want to use the weekday as a feature, but MLlib features must be numeric, so the day name cannot be used directly. This step converts the weekday into a value from 1 to 5, with 1 being Monday and 5 being Friday.

In [11]:
def stringToInt(item):
    """Replace the day name in a ``(label, day, average)`` record with a weekday number.

    "Monday".."Friday" map to 1..5. Any other value (unexpected or bad data)
    falls back to 1, i.e. is treated as Monday.
    """
    weekdayNames = ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday')
    weekday = item[1]
    # tuple.index is zero-based, so shift by one to get 1..5.
    weekdayInt = weekdayNames.index(weekday) + 1 if weekday in weekdayNames else 1
    return (item[0], weekdayInt, item[2])
        
# Records become (label, weekday number 1-5, daily average).
labeledConvertedRDD = labeledRDD.map(stringToInt)
print(labeledConvertedRDD.take(2))

[(0, 1, 42.5), (0, 2, 46.0)]


#### 5c - Convert data to LabeledPoints

In [12]:
from pyspark.mllib.regression import LabeledPoint

def parsePoint(item):
    """Build an MLlib ``LabeledPoint`` from a record.

    ``item[0]`` is used as the label and every remaining field as the feature
    vector; for the records produced by ``stringToInt`` that is
    (weekday number, daily average).
    """
    label, features = item[0], item[1:]
    return LabeledPoint(label, features)

# RDD of LabeledPoint(label, [weekday number, daily average]) ready for MLlib training.
finalRDD = labeledConvertedRDD.map(parsePoint)
print(finalRDD.take(1))

[LabeledPoint(0.0, [1.0,42.5])]


### 6. Set up logistic regression model for prediction

Now that our inputs are prepared, we can build the model.

#### 6a - Make and train a model

In [None]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel

# 80/20 train/test split; the fixed seed keeps the split reproducible across runs.
trainingData, testingData = finalRDD.randomSplit([0.8, 0.2], seed=21)

# LBFGS makes many passes over the training data, and the test split is used by
# both the count below and the evaluation step. Cache both so Spark does not
# recompute the whole lineage (HDFS read + parsing) for every pass.
trainingData.cache()
testingData.cache()

print(trainingData.count())
print(testingData.count())

model = LogisticRegressionWithLBFGS.train(trainingData)

### 7. Evaluate Model

Now that the model is built and trained, we evaluate it on the test data, using the area under the precision-recall curve and the area under the ROC curve.

In [21]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Evaluate on `testingData`, the hold-out split created by randomSplit in section 6a.
#
# BinaryClassificationMetrics expects (score, label) pairs. With its default
# threshold, model.predict returns hard 0/1 labels, which reduces the PR and ROC
# curves to a single operating point. Clearing the threshold makes predict return
# the positive-class probability instead; the original threshold is restored in
# `finally` so later cells see the model unchanged.
originalThreshold = model.threshold
model.clearThreshold()
try:
    predictions = testingData.map(lambda x: (float(model.predict(x.features)), x.label))

    metrics = BinaryClassificationMetrics(predictions)

    # Read both metrics before the threshold is restored (they are presumably
    # evaluated on property access).
    areaUnderPR = metrics.areaUnderPR
    areaUnderROC = metrics.areaUnderROC
finally:
    if originalThreshold is not None:
        model.setThreshold(originalThreshold)

print("Area under precision-recall curve = %s" % areaUnderPR)

print("Area under ROC curve = %s" % areaUnderROC)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/training/spark-3.3.3-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/training/spark-3.3.3-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/training/spark-3.3.3-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
