In [1]:
# Show the SparkContext handle. `sc` is not created anywhere in this notebook;
# presumably it is injected by the notebook runtime -- TODO confirm.
sc

In [2]:
from pyspark.sql import functions as sf
from pyspark.sql.functions import col, avg
import datetime

In [3]:
# CSV options
# Schema inference is off, so every column is loaded as a string; numeric
# columns are cast explicitly where needed.
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","
quotechar = '"'
file_location = 'FileStore/aries_accidents_2.csv'
file_type = "csv"

# The applied options are for CSV files. For other file types, these will be ignored.
# Spark's CSV reader calls the quote-character option "quote"; an unknown key
# such as "quotechar" is silently ignored, so the setting never took effect.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .option("quote", quotechar)
    .load(file_location)
)

display(df.head(5))

_c0,Blocks,speedlim,weekday,light,timecollide,weather,frequency,x1,x2,y1,y2,Xcentr,Ycentr
0,474,40,TUESDAY,Dark (Not Lighted),6-8 PM,Clear,1,37.74999999999999,37.78999999999999,-87.31999999999985,-87.27999999999984,37.77,-87.29999999999986
1,964,40,MONDAY,Daylight,6-8 PM,Clear,1,37.94999999999999,37.98999999999999,-85.7199999999996,-85.6799999999996,37.96999999999999,-85.6999999999996
2,1677,30,THURSDAY,Dark (Not Lighted),6-8 PM,Cloudy,1,38.26999999999998,38.30999999999998,-85.99999999999964,-85.95999999999964,38.28999999999999,-85.97999999999965
3,1677,70,WEDNESDAY,Dark (Not Lighted),4-6 AM,Clear,1,38.26999999999998,38.30999999999998,-85.99999999999964,-85.95999999999964,38.28999999999999,-85.97999999999965
4,1677,70,SUNDAY,Daylight,6-8 PM,Clear,1,38.26999999999998,38.30999999999998,-85.99999999999964,-85.95999999999964,38.28999999999999,-85.97999999999965


In [4]:
#df.show()
# Number of rows loaded from the CSV.
df.count()

In [5]:
# Keep the six predictor columns plus the target; the row id (_c0) and the
# block-geometry columns (x1..y2, Xcentr, Ycentr) are not carried forward.
df1 = df.select(
    'Blocks',
    'speedlim',
    'weekday',
    'light',
    'timecollide',
    'weather',
    'frequency',
)

In [6]:
# Expose the column subset to SQL cells under the view name "alldata".
temp_table_name = "alldata"

df1.createOrReplaceTempView(temp_table_name)

In [7]:
%sql
-- Row count and mean frequency over the still string-typed view
-- (the result header shows frequency being cast to DOUBLE for the average).
Select count(1), avg(frequency) from alldata

count(1),avg(CAST(frequency AS DOUBLE))
388098,1.7579709248694917


In [8]:
from pyspark.sql.types import IntegerType
# The CSV was read with schema inference disabled, so frequency is a string;
# cast it to integer so it can be used as the numeric regression target.
df2 = df1.withColumn("frequency", df1["frequency"].cast(IntegerType()))
# The casts below are disabled: speedlim and Blocks stay strings.
#df2 = df2.withColumn("speedlim", df2["speedlim"].cast(IntegerType()))
#df2 = df2.withColumn("Blocks", df2["Blocks"].cast(IntegerType()))

In [9]:
#df2.sort(col('frequency').desc()).show()

In [10]:
#df2.count()

In [11]:
# Expose the frame with the integer-typed frequency to SQL cells as "aries".
# Note that temp_table_name is reused for a different view name here.
temp_table_name = "aries"

df2.createOrReplaceTempView(temp_table_name)

In [12]:
%sql

-- Same summary as on "alldata", now over the integer-typed frequency column.
Select avg(frequency), count(1) From aries

avg(frequency),count(1)
1.7579709248694917,388098


In [14]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.mllib.classification import LogisticRegressionModel,LogisticRegressionWithLBFGS, SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel

# LOAD PYSPARK LIBRARIES    
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, VectorIndexer
import numpy as np

In [15]:
def indexing(df, fit_df=None):
  """Index and one-hot encode the six categorical feature columns of ``df``.

  For each of ``weather``, ``light``, ``weekday``, ``timecollide``, ``Blocks``
  and ``speedlim`` a ``StringIndexer`` adds an ``...Index`` column and a
  ``OneHotEncoder`` (``dropLast=False``) adds the matching ``...Vec`` column.

  Args:
    df: DataFrame to encode; must contain the six source columns.
    fit_df: DataFrame the StringIndexers are fitted on. Defaults to ``df``
      itself, which reproduces the previous behaviour. When encoding data
      that will be scored by an already-trained model, pass the frame the
      model was trained from: StringIndexer derives the label -> index
      mapping from the frame it is fitted on, so re-fitting on a different
      frame can assign different indices to the same category.

  Returns:
    ``df`` with twelve columns appended in this order: weatherIndex,
    weatherVec, lightIndex, lightVec, weekdayIndex, weekdayVec, timeIndex,
    timeVec, BlocksIndex, BlocksVec, speedIndex, speedVec.

  Note:
    With StringIndexer's default ``handleInvalid`` setting, a value of ``df``
    that does not occur in ``fit_df`` raises when the result is evaluated.
  """
  # (source column, index column, one-hot column), in the order they are appended.
  column_specs = (
    ("weather", "weatherIndex", "weatherVec"),
    ("light", "lightIndex", "lightVec"),
    ("weekday", "weekdayIndex", "weekdayVec"),
    ("timecollide", "timeIndex", "timeVec"),
    ("Blocks", "BlocksIndex", "BlocksVec"),
    ("speedlim", "speedIndex", "speedVec"),
  )

  if fit_df is None:
    fit_df = df

  encoded = df
  for source_col, index_col, vec_col in column_specs:
    # Each indexer only reads its own source column, so fitting on the
    # un-encoded frame is equivalent to fitting on the partially encoded one.
    indexer_model = StringIndexer(inputCol=source_col, outputCol=index_col).fit(fit_df)
    indexed = indexer_model.transform(encoded)
    encoder = OneHotEncoder(dropLast=False, inputCol=index_col, outputCol=vec_col)
    encoded = encoder.transform(indexed)

  return encoded

In [16]:
# Append the index and one-hot columns for all six categorical features.
encodedFinal = indexing(df2)

In [17]:
# ROW -> LabeledPoint HELPERS FOR REGRESSION WITH ACCIDENT FREQUENCY AS TARGET VARIABLE

# CATEGORY INDICES AS FEATURES, FOR INPUT INTO TREE-BASED MODELS
def parseRowIndexingRegression(line):
    """Build a LabeledPoint whose features are the six category indices.

    Feature order: speedIndex, BlocksIndex, weekdayIndex, lightIndex,
    timeIndex, weatherIndex. The label is ``line.frequency`` as a float.
    """
    index_fields = ("speedIndex", "BlocksIndex", "weekdayIndex",
                    "lightIndex", "timeIndex", "weatherIndex")
    feature_values = np.array([getattr(line, field) for field in index_fields])
    target = float(line.frequency)
    return LabeledPoint(target, feature_values)

# ONE-HOT VECTORS AS FEATURES, FOR INPUT INTO LINEAR REGRESSION MODELS
def parseRowOneHotRegression(line):
    """Build a LabeledPoint whose features are the concatenated one-hot vectors.

    Concatenation order: speedVec, BlocksVec, weekdayVec, lightVec, timeVec,
    weatherVec. The label is ``line.frequency``.
    """
    vec_fields = ("speedVec", "BlocksVec", "weekdayVec",
                  "lightVec", "timeVec", "weatherVec")
    dense_parts = [getattr(line, field).toArray() for field in vec_fields]
    feature_values = np.concatenate(dense_parts, axis=0)
    return LabeledPoint(line.frequency, feature_values)

In [19]:
# LOAD PYSPARK LIBRARIES
from pyspark.sql.functions import rand

# SPLIT FRACTIONS AND SEED (sampling is disabled; the full data set is used)
#samplingFraction = 0.25;
seed = 1234
trainingFraction = 0.75
testingFraction = (1-trainingFraction)
#encodedFinalSampled = encodedFinal.sample(False, samplingFraction, seed=seed)

# ADD A SEEDED RANDOM COLUMN (KEPT FOR BUILDING CROSS-VALIDATION FOLDS LATER),
# THEN SPLIT THE ENCODED FRAME INTO TRAIN/TEST
dfTmpRand = encodedFinal.select("*", rand(0).alias("rand"))
trainData, testData = dfTmpRand.randomSplit(
    [trainingFraction, testingFraction],
    seed=seed,
)

In [20]:
# Number of rows that landed in the training split.
trainData.count()

In [21]:
# FOR REGRESSION TRAINING AND TESTING
# Each split is mapped to an RDD of LabeledPoint twice: once with category
# indices as features (indexed*) and once with one-hot vectors (oneHot*).
indexedTRAINreg = trainData.rdd.map(parseRowIndexingRegression)
indexedTESTreg = testData.rdd.map(parseRowIndexingRegression)
oneHotTRAINreg = trainData.rdd.map(parseRowOneHotRegression)
oneHotTESTreg = testData.rdd.map(parseRowOneHotRegression)

In [22]:
# FEATURE SCALING
# NOTE: the scaling itself is disabled below; the only live statements in this
# cell are the start-time capture and the imports.

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler, StandardScalerModel
from pyspark.mllib.util import MLUtils

# SCALE VARIABLES FOR REGULARIZED LINEAR SGD ALGORITHM
#label = oneHotTRAINreg.map(lambda x: x.label)
#features = oneHotTRAINreg.map(lambda x: x.features)
#scaler = StandardScaler(withMean=False, withStd=True).fit(features)
#dataTMP = label.zip(scaler.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
#oneHotTRAINregScaled = dataTMP.map(lambda x: LabeledPoint(x[0], x[1]))

# NOTE(review): if re-enabled, the block below fits a second scaler on the
# test features instead of reusing the one fitted on the training features.
#label = oneHotTESTreg.map(lambda x: x.label)
#features = oneHotTESTreg.map(lambda x: x.features)
#scaler = StandardScaler(withMean=False, withStd=True).fit(features)
#dataTMP = label.zip(scaler.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
#oneHotTESTregScaled = dataTMP.map(lambda x: LabeledPoint(x[0], x[1]))

# PRINT ELAPSED TIME
#timeend = datetime.datetime.now()
#timedelta = round((timeend-timestart).total_seconds(), 2) 
#print("Time taken to execute above cell: " + str(timedelta) + " seconds"); 

In [26]:
from pyspark.sql.functions import lit
import numpy as np
import pandas as pd


In [27]:
#testPD.to_csv('/dbfs/FileStore/all_prediction_linear.csv')

In [28]:
# Inspect the first ten index-encoded training points.
indexedTRAINreg.take(10)

In [29]:
# PREDICT ACCIDENT FREQUENCY USING GRADIENT BOOSTING TREES

# RECORD START TIME
timestart= datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.util import MLUtils
# LOAD LIBRARIES
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.evaluation import RegressionMetrics
from scipy import stats

## TRAIN MODEL
# Feature position -> number of categories, in the feature order produced by
# parseRowIndexingRegression: 0 speed limit, 1 block, 2 weekday, 3 light,
# 4 time slot, 5 weather. maxBins must be at least the largest arity (blocks).
categoricalFeaturesInfo={ 0:5, 1:9900, 2:7, 3:4, 4:12, 5:8}
gbtModel = GradientBoostedTrees.trainRegressor(indexedTRAINreg, categoricalFeaturesInfo=categoricalFeaturesInfo, 
                                                numIterations=100, maxBins=9900, maxDepth = 6, learningRate=0.01)

## EVALUATE A TEST DATA-SET
predictions1 = gbtModel.predict(indexedTESTreg.map(lambda x: x.features))
# Despite its name this RDD holds (label, prediction) pairs, because the
# labels are zipped first. The order is kept because later cells read the
# prediction from position 1.
predictionAndLabels1 = indexedTESTreg.map(lambda lp: lp.label).zip(predictions1)

# TEST METRICS
# RegressionMetrics expects (prediction, observation). RMSE is symmetric, but
# R-squared is not, so the pairs are swapped before being handed over.
testMetrics1 = RegressionMetrics(predictionAndLabels1.map(lambda pair: (pair[1], pair[0])))
print("RMSE = %s" % testMetrics1.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics1.r2)

# SAVE MODEL IN BLOB
#datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
#btregressionfilename = "GradientBoostingTreeRegression_" + datestamp;
#dirfilename = modelDir + btregressionfilename;
#gbtModel.save(sc, dirfilename)

# CONVERT RESULTS TO DF AND REGISTER TEMP TABLE
# createOrReplaceTempView replaces the deprecated registerTempTable and matches
# how every other view in this notebook is registered.
test_predictions = sqlContext.createDataFrame(predictionAndLabels1)
test_predictions.createOrReplaceTempView("tmp_results")

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print("Time taken to execute above cell: " + str(timedelta) + " seconds"); 

In [30]:
## EVALUATE A TRAIN DATA-SET
predictions1_train = gbtModel.predict(indexedTRAINreg.map(lambda x: x.features))
# Pairs are (label, prediction): the labels are zipped first.
predictionAndLabels1_train = indexedTRAINreg.map(lambda lp: lp.label).zip(predictions1_train)

# TRAIN METRICS
# RegressionMetrics expects (prediction, observation); swap the pairs so that
# R-squared is measured against the variance of the observed labels.
testMetrics1_train = RegressionMetrics(
    predictionAndLabels1_train.map(lambda pair: (pair[1], pair[0]))
)
print("RMSE = %s" % testMetrics1_train.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics1_train.r2)

In [31]:
# Inspect the first index-encoded test point.
indexedTESTreg.first()

In [32]:
## EVALUATE A TEST DATA-SET
predictions1 = gbtModel.predict(indexedTESTreg.map(lambda x: x.features))
# Pairs are (label, prediction): the labels are zipped first. The order is
# kept because later cells read the prediction from position 1.
predictionAndLabels1 = indexedTESTreg.map(lambda lp: lp.label).zip(predictions1)

# TEST METRICS
# RegressionMetrics expects (prediction, observation); swap the pairs so that
# R-squared is measured against the variance of the observed labels.
testMetrics1 = RegressionMetrics(predictionAndLabels1.map(lambda pair: (pair[1], pair[0])))
print("RMSE = %s" % testMetrics1.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics1.r2)

In [33]:
# Number of (label, prediction) pairs produced for the test split.
predictionAndLabels1.count()

In [34]:
# predictionAndLabels1 holds (label, prediction) tuples, so element 1 is the
# model prediction. Collect the predictions to the driver as a Python list.
predicted_labelRDD = predictionAndLabels1.map(lambda x: x[1])
col_pred3 = predicted_labelRDD.collect()

In [35]:
# Raw predictor columns of the test split (the encoded columns and the target are left out).
testData_1 = testData.select(
    'Blocks',
    'speedlim',
    'weekday',
    'light',
    'timecollide',
    'weather',
)


In [36]:
import folium

In [37]:
# NOTE(review): `testPD` is not assigned anywhere in the visible notebook, so
# this cell fails on a fresh kernel. The displayed columns suggest it held
# per-row predicted frequencies -- TODO restore the cell that builds it.
testPD.head()

Unnamed: 0,Blocks,speedlim,weekday,light,timecollide,weather,frequency
0,1002,30,FRIDAY,Daylight,2-4 PM,Clear,8.234894
1,1002,30,SATURDAY,Daylight,4-6 PM,Clear,5.415418
2,1002,30,TUESDAY,Dark (Not Lighted),10 PM - 12 AM,Clear,1.722055
3,1010,30,FRIDAY,Dark (Not Lighted),8-10 PM,Rain,0.984762
4,1010,30,FRIDAY,Daylight,6-8 PM,Clear,2.255643


In [38]:
# Register the test predictors as "mysubsetTable" ...
temp_table_name_1 = "mysubsetTable"

testData_1.createOrReplaceTempView(temp_table_name_1)

# ... and the raw CSV frame (which carries the Xcentr/Ycentr block centroids)
# as "blockstable". The raw frame has one row per accident record, so a block
# id can appear on many rows.
temp_table_name = "blockstable"

df.createOrReplaceTempView(temp_table_name)

In [39]:
# Attach each block's centroid to the test rows. blockstable is the raw
# accident table, in which a block id repeats on many rows; joining against it
# directly repeats every test row once per matching accident record. Reduce it
# to the distinct (Blocks, Xcentr, Ycentr) combinations first.
# NOTE(review): assumes each block has a single centroid -- confirm on the data.
# A join does not guarantee any particular output row order.
results = spark.sql("""
    SELECT M.*, B.Xcentr, B.Ycentr
    FROM mysubsetTable M
    JOIN (SELECT DISTINCT Blocks, Xcentr, Ycentr FROM blockstable) B
      ON M.Blocks = B.Blocks
""")

In [40]:
# Number of rows produced by the centroid join.
results.count()

In [41]:
# Pull the joined rows to the driver as a list of Row objects.
preds = results
preds_list = preds.collect()

In [42]:
import pandas as pd
# The collected rows live in `preds_list`; the previous `pred_list` was an
# undefined name and raised NameError. Passing the Spark column names keeps
# the pandas columns labelled instead of numbered.
pred_PD = pd.DataFrame(preds_list, columns=preds.columns)
# NOTE(review): this assigns predictions by position. It is only meaningful if
# the joined rows have the same count and order as the test rows the
# predictions were computed from -- verify before relying on it.
pred_PD['frequency'] = pd.Series(col_pred3)

In [50]:
## Build the full scenario grid: every combination of block, speed limit,
## weekday, light condition, time slot and weather, with frequency set to 0.
## (The THURSDAY / 6-8 PM subset is filtered out of this grid in a later cell.)
import itertools
from pyspark.sql import Row

# Block ids 1..720
blk_lst = list(range(1, 721))

a = [
    blk_lst,
    [30,40,50,60,70],
    ['MONDAY','TUESDAY','WEDNESDAY','THURSDAY','FRIDAY','SATURDAY','SUNDAY'],
    ['Dark (Not Lighted)','Dark (Lighted)','Daylight','Dawn/Dusk'],
    ['12-2 AM','2-4 AM','4-6 AM','6-8 AM','8-10 AM','10 AM - 12 PM',
     '12-2 PM','2-4 PM','4-6 PM','6-8 PM','8-10 PM','10 PM - 12 AM'],
    ['Fog/Smoke/Smog','Cloudy','Clear','Blowing Sand/Soil/Snow',
     'Severe Cross Wind','Snow','Rain','Sleet/Hail/Freezing Rain'],
    [0],
]

# Cartesian product of all value lists, materialised on the driver.
b = list(itertools.product(*a))

# Distribute the tuples and give each position its column name.
rdd1 = sc.parallelize(b)
row_rdd = rdd1.map(
    lambda p: Row(
        Blocks=p[0],
        speedlim=p[1],
        weekday=p[2],
        light=p[3],
        timecollide=p[4],
        weather=p[5],
        frequency=p[6],
    )
)
testDF = sqlContext.createDataFrame(row_rdd)

In [52]:
# Put the grid columns into the same order as the training frame.
testDF1 = testDF.select(
    'Blocks',
    'speedlim',
    'weekday',
    'light',
    'timecollide',
    'weather',
    'frequency',
)
#testDF1.show()

In [53]:
from pyspark.sql.types import StringType
# The grid was built from Python ints. Cast Blocks and speedlim to strings
# (the type they have in the CSV-loaded frame, which was read with schema
# inference disabled) and frequency to integer, mirroring the training frame.
testDF1 = testDF1.withColumn("Blocks", testDF1["Blocks"].cast(StringType()))
testDF1 = testDF1.withColumn("speedlim", testDF1["speedlim"].cast(StringType()))
testDF1 = testDF1.withColumn("frequency", testDF1["frequency"].cast(IntegerType()))

In [54]:
# NOTE(review): indexing() fits fresh StringIndexers on the frame it is given,
# so the category -> index mapping here is derived from the synthetic grid,
# not from the data gbtModel was trained on. The indices fed to the model
# below may therefore not mean the same categories -- verify.
encodedTest = indexing(testDF1)

In [55]:
# Register the encoded scenario grid as the SQL view "testData". This is only
# a view name; it is unrelated to the Python variable `testData` (the test split).
temp_table_name = "testData"

encodedTest.createOrReplaceTempView(temp_table_name)

In [56]:
## Scenario subset: THURSDAY, 6-8 PM, clear weather (all blocks, speed limits
## and light conditions), followed by its row count.
result1 = spark.sql("Select * from testData where weekday = 'THURSDAY' and timecollide = '6-8 PM' and weather = 'Clear'  ")
result1.count()

In [57]:
# Alias for the filtered Thursday scenario rows (no copy is made).
encodedThursday = result1

In [59]:
#encodedThursday = indexing(testDF2)

In [60]:
# prepare data for thursday prediction
# Only the index-feature form is built; the one-hot variant is disabled.
#oneHotTHURSDAYreg = encodedThursday.rdd.map(parseRowOneHotRegression)
indexedTHURSDAYreg = encodedThursday.rdd.map(parseRowIndexingRegression)

In [62]:
# Inspect the first Thursday scenario point.
indexedTHURSDAYreg.first()

In [63]:

# SCORE THE THURSDAY SCENARIOS WITH THE GRADIENT BOOSTED TREES MODEL
# (the linear-model and per-row variants below are disabled)
#predictionAndLabels1 = linearModel.predict(oneHotTHURSDAYreg.map(lambda x: x.features))

#predictionAndLabels1 = indexedTHURSDAYreg.map(lambda lp: (float(gbtModel.predict(lp.features)), lp.label))

# Note: this rebinds predictions1, which previously held the test-split predictions.
predictions1 = gbtModel.predict(indexedTHURSDAYreg.map(lambda x: x.features))

In [64]:
# Inspect the first predicted frequency for the Thursday scenarios.
predictions1.first()

In [65]:
## DOES NOT WORK, RAISE ERROR 'DIMENSION MISMATCH'
# NOTE(review): the header above was left by the author; which statement raised
# the error is not evident from this cell. The only live statement collects
# the Thursday predictions to the driver.

#predicted_labelRDD1 = predictionAndLabels1.map(lambda x: x[0])
col_pred = predictions1.collect()



#testPD1 = testDF1.toPandas()
#testPD1['frequency'] = pd.Series(np.asarray(col_pred1))

#testPD1.head()

In [66]:
# Largest predicted frequency among the Thursday scenarios.
max(col_pred)

In [67]:
import pandas as pd
# Raw predictor columns of the Thursday scenarios, brought to pandas.
testData_5 = encodedThursday.select(encodedThursday['Blocks'],encodedThursday['speedlim'],encodedThursday['weekday'],encodedThursday['light'],encodedThursday['timecollide']
                            ,encodedThursday['weather'])

testPD1 = testData_5.toPandas()
# Predictions are attached by position.
# NOTE(review): assumes toPandas() and the earlier collect() return rows in
# the same order -- confirm.
testPD1['frequency'] = pd.Series(np.asarray(col_pred))

In [68]:
#testPD1['frequency'] = pd.Series(np.asarray(col_pred))

In [69]:
# Display the Thursday scenarios together with their predicted frequency.
testPD1

In [70]:
# Predictions for block 375 only. Blocks is compared as a string because the
# column was cast to StringType before encoding.
testPD1[(testPD1.Blocks == '375')]

In [71]:
#testPD1.to_csv('/dbfs/FileStore/predictions.csv')

In [72]:
#dbutils.fs.rm("FileStore/aries_accidents_1.csv")