## 2. ML Section

In [2]:
from pyspark.sql.functions import regexp_replace, to_timestamp, date_format, col, year, desc, asc, hour, when, sum
# Read every training CSV shard matched by the glob into one DataFrame.
# The first row of each file is the header and column types are inferred.
F = "/FileStore/tables/Final P./train_*"
df = spark.read.option("header", True).option("inferSchema", True).csv(F)
# Parse the Dates column with an explicit month/day/year hour:minute pattern.
df = df.withColumn("Dates", to_timestamp(col("Dates"), "MM/dd/yyyy HH:mm"))
# Preview the first three rows.
display(df.limit(3))

Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,X,Y
2015-05-13T23:53:00.000+0000,WARRANTS,WARRANT ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",OAK ST / LAGUNA ST,-122.425891675136,37.7745985956747
2015-05-13T23:53:00.000+0000,OTHER OFFENSES,TRAFFIC VIOLATION ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",OAK ST / LAGUNA ST,-122.425891675136,37.7745985956747
2015-05-13T23:33:00.000+0000,OTHER OFFENSES,TRAFFIC VIOLATION ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",VANNESS AV / GREENWICH ST,-122.42436302145,37.8004143219856


#### `` Categorizing String Data: ``

In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# Index every column except the timestamp and the two numeric coordinates.
# Each StringIndexer is fitted on df and writes its result to "<column>_index".
skip_cols = set(['Dates','X','Y'])
indexers = []
for column in list(set(df.columns) - skip_cols):
    indexers.append(StringIndexer(inputCol=column, outputCol=column+"_index").fit(df))

# Chain the fitted indexer models and apply all of them to df.
pipeline = Pipeline(stages=indexers)
xdf = pipeline.fit(df).transform(df)

# Keep the index columns under the original column names, plus Dates, X and Y.
# The backslash continues the string literal onto the next line, so the
# whitespace between "as" and "PdDistrict" is part of the SQL expression.
xdf = xdf.selectExpr('Dates','Category_index as Category','Descript_index as Descript','DayOfWeek_index as DayOfWeek','PdDistrict_index as\
                      PdDistrict','Resolution_index as Resolution','Address_index as Address','X','Y')
xdf.show(5)

#### `` Creating Feature vector and Normalizing values: ``

In [6]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, MaxAbsScaler

# Every column except the timestamp and the label is a feature.
# Filtering xdf.columns (a list) keeps the schema order, so the layout of the
# feature vector is identical on every run. Iterating a set difference is not
# reproducible because Python randomizes string hashes per process.
# NOTE(review): this feature set includes Descript and Resolution, which the
# test file does not contain (the Testing section drops them). Accuracies
# measured with them may not reflect what is achievable at prediction time.
inputCols = [c for c in xdf.columns if c not in ('Dates', 'Category')]
assembler = VectorAssembler().setInputCols(inputCols).setOutputCol("features")
transformVector = assembler.transform(xdf)

# Rescale each feature with a MinMaxScaler fitted on the assembled vectors.
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(transformVector)
scaledData = scalerModel.transform(transformVector)
# Expose the scaled vector as "features" and the indexed Category as "label",
# the column names the classifiers below use by default.
scaledData = scaledData.drop('features').withColumnRenamed('scaledFeatures','features').withColumnRenamed('Category','label')
# Fixed seed so the 70/30 split, and therefore every accuracy reported below,
# can be reproduced when the notebook is re-run.
train, test = scaledData.randomSplit([0.7, 0.3], seed=42)
scaledData.limit(10).show()

####        `#`  `` Classifiers: ``

In [8]:
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


####        `#1`  `` logistic regression ``

In [10]:
from pyspark.ml.classification import LogisticRegression

# Accuracy evaluator comparing the "prediction" column against "label".
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)

# Logistic regression with default parameters: fit on train, predict on test.
lr = LogisticRegression()
lrModel = lr.fit(train)
predictions = lrModel.transform(test)

# Peek at a few predictions, then compute the held-out accuracy.
predictions.select(["prediction", "label", "features"]).show(5)
accuracy = evaluator.evaluate(predictions)

accuracy

####        `#2`  `` Decision tree ``

In [12]:
from pyspark.ml.classification import DecisionTreeClassifier

# Accuracy evaluator comparing the "prediction" column against "label".
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)

# Decision tree with default parameters: fit on train, predict on test.
dt = DecisionTreeClassifier()
dtModel = dt.fit(train)
predictions = dtModel.transform(test)

# Peek at a few predictions, then compute the held-out accuracy.
predictions.select(["prediction", "label", "features"]).show(5)

accuracy = evaluator.evaluate(predictions)
accuracy

####        `#3`  `` Random forest ``

In [14]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest with default hyper-parameters. The seed is pinned because the
# forest samples rows and features at random; without it the model (and the
# accuracy below) can change from one run to the next.
rf = RandomForestClassifier(seed=42)

rfModel = rf.fit(train)
predictions = rfModel.transform(test)
# Peek at a few predictions.
predictions.select("prediction", "label", "features").show(5)

# Held-out accuracy: the "prediction" column compared against "label".
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
accuracy

####        `#4`  `` Naive Bayes ``

In [16]:
from pyspark.ml.classification import NaiveBayes

# Accuracy evaluator comparing the "prediction" column against "label".
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)

# Naive Bayes with default parameters: fit on train, predict on test.
nb = NaiveBayes()
nbModel = nb.fit(train)
predictions = nbModel.transform(test)

# Peek at a few predictions, then compute the held-out accuracy.
predictions.select(["prediction", "label", "features"]).show(5)

accuracy = evaluator.evaluate(predictions)
accuracy

####        `#5`  `` One-vs-Rest (LogisticRegression) ``

In [18]:
from pyspark.ml.classification import LogisticRegression, OneVsRest

# Accuracy evaluator comparing the "prediction" column against "label".
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)

# Base learner for the reduction: logistic regression limited to 10
# iterations, with a 1e-6 convergence tolerance and an intercept term.
lr = LogisticRegression(
    fitIntercept=True,
    tol=1E-6,
    maxIter=10,
)

# One-vs-Rest wraps the base learner to handle the multiclass label.
ovr = OneVsRest(classifier=lr)

ovrModel = ovr.fit(train)
predictions = ovrModel.transform(test)

# Peek at a few predictions, then compute the held-out accuracy.
predictions.select(["prediction", "label", "features"]).show(5)

accuracy = evaluator.evaluate(predictions)
accuracy

#### `` Parameter Tuning (Best 3 Classifiers): ``
##### `` #1 logistic regression: ``

In [20]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier

In [21]:

lr = LogisticRegression()

# 3 x 3 x 3 = 27 candidate settings for the elastic-net mix, the
# regularization strength and the convergence tolerance.
paramGrid = ParamGridBuilder()\
    .addGrid(lr.elasticNetParam,[0.0, 0.5, 1.0])\
    .addGrid(lr.regParam,[0.01, 0.5, 2.0])\
    .addGrid(lr.tol, [1e-04, 1e-06, 1e-08])\
    .build()

# One evaluator for both model selection and the final score. A bare
# MulticlassClassificationEvaluator() scores with its default metric (f1), so
# the cross-validator would pick the model by a different metric than the
# accuracy reported at the end of this cell.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# The estimator is wrapped in a one-stage Pipeline; the best fitted stage is
# read back from stages[0] below.
pipeline = Pipeline(stages=[lr])
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          seed=42)  # fixed seed so the fold assignment is reproducible

# Note: lrModel now holds the fitted CrossValidatorModel, not a plain LR model.
lrModel = crossval.fit(train)
predictions = lrModel.transform(test)

accuracy = evaluator.evaluate(predictions)

# Pull the winning logistic-regression stage and its parameter map.
bestPipeline = lrModel.bestModel
bestLRModel = bestPipeline.stages[0]
bestParams = bestLRModel.extractParamMap()

accuracy

In [22]:
# Display the parameter map that the preceding tuning cell stored in bestParams.
bestParams

###### LR Best Parameters are: [``elasticNetParam:0.0``, ``regParam:0.01``, ``tol:1e-06``]

##### `` #2 Decision tree: ``

In [25]:
dt = DecisionTreeClassifier()

# Search only the parameters that shape the tree: 3 x 3 x 3 = 27 candidates.
# maxMemoryInMB and checkpointInterval are left at their defaults: per the
# Spark documentation they control the memory used for histogram aggregation
# and how often the node-ID cache is checkpointed, not the fitted tree.
# Including them in the grid multiplied the number of fits by 9 for no gain.
paramGrid = ParamGridBuilder()\
    .addGrid(dt.maxBins,[32, 64,128])\
    .addGrid(dt.maxDepth,[5, 10,15])\
    .addGrid(dt.minInstancesPerNode,[2, 4,10])\
    .build()

# One evaluator for both model selection and the final score. A bare
# MulticlassClassificationEvaluator() scores with its default metric (f1), so
# the cross-validator would pick the model by a different metric than the
# accuracy reported at the end of this cell.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# The estimator is wrapped in a one-stage Pipeline; the best fitted stage is
# read back from stages[0] below.
pipeline = Pipeline(stages=[dt])
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          seed=42)  # fixed seed so the fold assignment is reproducible

# Note: dtModel now holds the fitted CrossValidatorModel, not a plain tree model.
dtModel = crossval.fit(train)
predictions = dtModel.transform(test)

accuracy = evaluator.evaluate(predictions)

# Pull the winning decision-tree stage and its parameter map.
bestPipeline = dtModel.bestModel
bestDTModel = bestPipeline.stages[0]
bestParams = bestDTModel.extractParamMap()

accuracy

In [26]:
# Display the parameter map that the preceding tuning cell stored in bestParams.
bestParams

###### DT Best Parameters are: [``maxMemoryInMB:256``, ``checkpointInterval:10``, ``maxBins:32``, ``maxDepth:5``, ``minInstancesPerNode:1``]

##### `` #3 Random Forest: ``

In [29]:
from pyspark.ml.classification import RandomForestClassifier

# Pin the forest's seed: its row/feature sampling is random, so without a seed
# two runs of the same grid can rank the candidates differently.
rf = RandomForestClassifier(seed=42)

# 3 x 3 x 3 = 27 candidate settings for forest size, tree depth and bin count.
paramGrid = ParamGridBuilder()\
    .addGrid(rf.numTrees,[10, 25, 50])\
    .addGrid(rf.maxDepth,[4, 8, 10])\
    .addGrid(rf.maxBins,[8, 16, 32]) \
    .build()

# One evaluator for both model selection and the final score. A bare
# MulticlassClassificationEvaluator() scores with its default metric (f1), so
# the cross-validator would pick the model by a different metric than the
# accuracy reported at the end of this cell.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# The estimator is wrapped in a one-stage Pipeline; the best fitted stage is
# read back from stages[0] below.
pipeline = Pipeline(stages=[rf])
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          seed=42)  # fixed seed so the fold assignment is reproducible

# Note: rfModel now holds the fitted CrossValidatorModel, not a plain forest model.
rfModel = crossval.fit(train)
predictions = rfModel.transform(test)

accuracy = evaluator.evaluate(predictions)

# Pull the winning random-forest stage and its parameter map.
bestPipeline = rfModel.bestModel
bestRFModel = bestPipeline.stages[0]
bestParams = bestRFModel.extractParamMap()

accuracy

In [30]:
# Display the parameter map that the preceding tuning cell stored in bestParams.
bestParams

###### RF Best Parameters are: [``numTrees:50``, ``maxDepth:10``, ``maxBins:32``]

## 3. Testing Section

###### `` Selecting Columns that match the (Test Data) and Transforming it: ``

In [34]:
# Select the indexed training columns; Descript and Resolution stay in the
# frame but are excluded from the features because the test file lacks them.
xxdf = xdf.selectExpr('Dates','Category','Descript','DayOfWeek','PdDistrict','Resolution','Address','X','Y')
# Feature columns in schema order: DayOfWeek, PdDistrict, Address, X, Y.
# Filtering the column list (instead of iterating a set difference) gives a
# fixed, reproducible layout for the feature vector. The position of each
# feature must be the same for the training and test vectors, and set
# iteration order does not guarantee that.
inputCols = [c for c in xxdf.columns if c not in ('Dates', 'Category', 'Descript', 'Resolution')]
# Feature vector
assembler = VectorAssembler().setInputCols(inputCols).setOutputCol("features")
transformVector = assembler.transform(xxdf)
# Normalizing: MinMaxScaler fitted on the full training data.
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(transformVector)
scaledData = scalerModel.transform(transformVector)
# Expose the scaled vector as "features" and the indexed Category as "label".
# This overwrites the earlier 70/30 `train` split with the full training set.
train = scaledData.drop('features').withColumnRenamed('scaledFeatures','features').withColumnRenamed('Category','label')
train.limit(5).show()

###### `` Test Data and its Transformations: ``

In [36]:
# Read every test CSV shard matched by the glob into one DataFrame.
# The first row of each file is the header and column types are inferred.
Ft = "/FileStore/tables/Final P./test_*"
dft = spark.read.option("header", True).option("inferSchema", True).csv(Ft)
# Parse the Dates column with the same pattern used for the training data.
dft = dft.withColumn("Dates", to_timestamp(col("Dates"), "MM/dd/yyyy HH:mm"))
# Preview the first three rows.
display(dft.limit(3))

Id,Dates,DayOfWeek,PdDistrict,Address,X,Y
0,2015-05-10T23:59:00.000+0000,Sunday,BAYVIEW,2000 Block of THOMAS AV,-122.39958770418998,37.7350510103906
1,2015-05-10T23:51:00.000+0000,Sunday,BAYVIEW,3RD ST / REVERE AV,-122.391522893042,37.7324323864471
2,2015-05-10T23:50:00.000+0000,Sunday,NORTHERN,2000 Block of GOUGH ST,-122.426001954961,37.7922124386284


In [37]:
# The test rows must be encoded and scaled with the SAME mappings that
# produced the training features. Fitting new StringIndexers / a new
# MinMaxScaler on the test data assigns different indices and ranges to the
# same raw values, so the trained model would see inconsistent inputs.
#
# Only the three categorical columns shared with the training data are
# indexed. Id is just a row identifier, and indexing it builds a label table
# with one entry per row for no benefit.
categorical_cols = ['DayOfWeek', 'PdDistrict', 'Address']

# Fit the indexers on the raw TRAINING frame (df). handleInvalid="keep" sends
# values that never occurred in training to one extra index instead of
# raising an error during transform.
# NOTE(review): this assumes a StringIndexer refitted on df reproduces the
# label order used to build xdf. Confirm tie-breaking for equally frequent
# values on the cluster's Spark version.
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid="keep").fit(df) for column in categorical_cols]

pipeline = Pipeline(stages=indexers)
xdft = pipeline.fit(df).transform(dft)

xdft = xdft.selectExpr('Id','Dates','DayOfWeek_index as DayOfWeek','PdDistrict_index as PdDistrict','Address_index as Address','X','Y')

# Reuse the assembler and the fitted scaler from the training-preparation
# cell so that feature order and scaling ranges match the training vectors.
transformVector = assembler.transform(xdft)
scaledData = scalerModel.transform(transformVector)
# This overwrites the earlier 30% hold-out `test` split with the real test set.
test = scaledData.drop('features').withColumnRenamed('scaledFeatures','features')
test.limit(5).show()

###### `` Testing : ``

In [39]:
# Final classifier: a decision tree with explicitly chosen hyper-parameters.
dt = DecisionTreeClassifier(
    maxDepth=15,
    maxBins=64,
    minInstancesPerNode=2,
    maxMemoryInMB=256,
    checkpointInterval=5,
)

# Train on the prepared training set, then score the prepared test set.
Model = dt.fit(train)
predictions = Model.transform(test)


###### `` Return String labels : ``

In [41]:
from pyspark.ml.feature import IndexToString
# Get the list of string labels in index order.
# A fitted StringIndexerModel already exposes its labels ordered by index
# (labels[i] is the string mapped to index i), so there is no need to
# transform the whole frame, group it, sort it and collect it through pandas.
xindexers = StringIndexer(inputCol='Category', outputCol='label').fit(df)
Zee = list(xindexers.labels)
# Map each numeric prediction back to its Category string. The result is
# written to a new "label" column of the predictions frame.
converter = IndexToString(inputCol="prediction", outputCol="label", labels=Zee)
converted = converter.transform(predictions)
converted.select('Id',"label", "probability", "features").show(5)

In [42]:
# Render the predictions frame, including the string "label" column added above.
display(converted)