In [1]:
!git clone https://github.com/saeedahmadian/sample_data.git

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20190917161826-0000
KERNEL_ID = 6de882d8-0326-43e4-a3cd-4968c72134f1
Cloning into 'sample_data'...
remote: Enumerating objects: 14, done.[K
remote: Counting objects: 100% (14/14), done.[K
remote: Compressing objects: 100% (12/12), done.[K
remote: Total 14 (delta 2), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (14/14), done.


In [9]:
!ls sample_data/text_proc

test.csv.zip  train.csv.zip


In [22]:
!unzip -d ~/ sample_data/text_proc/train.csv.zip

Archive:  sample_data/text_proc/train.csv.zip
replace /home/spark/shared/train.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: ^C


In [25]:
df_data = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .option('nanValue', ' ')\
  .option('nullValue', ' ')\
  .load('train.csv')

In [27]:
df_data.printSchema

<bound method DataFrame.printSchema of DataFrame[Dates: timestamp, Category: string, Descript: string, DayOfWeek: string, PdDistrict: string, Resolution: string, Address: string, X: double, Y: double]>

In [30]:
drop_list = ['Dates', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y']
data = df_data.select([column for column in df_data.columns if column not in drop_list])
data.show(10)

+--------------+--------------------+
|      Category|            Descript|
+--------------+--------------------+
|      WARRANTS|      WARRANT ARREST|
|OTHER OFFENSES|TRAFFIC VIOLATION...|
|OTHER OFFENSES|TRAFFIC VIOLATION...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
| VEHICLE THEFT|   STOLEN AUTOMOBILE|
| VEHICLE THEFT|   STOLEN AUTOMOBILE|
| LARCENY/THEFT|GRAND THEFT FROM ...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
+--------------+--------------------+
only showing top 10 rows



In [31]:
from pyspark.sql.functions import col
data.groupBy("Category") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

+--------------------+------+
|            Category| count|
+--------------------+------+
|       LARCENY/THEFT|174900|
|      OTHER OFFENSES|126182|
|        NON-CRIMINAL| 92304|
|             ASSAULT| 76876|
|       DRUG/NARCOTIC| 53971|
|       VEHICLE THEFT| 53781|
|           VANDALISM| 44725|
|            WARRANTS| 42214|
|            BURGLARY| 36755|
|      SUSPICIOUS OCC| 31414|
|      MISSING PERSON| 25989|
|             ROBBERY| 23000|
|               FRAUD| 16679|
|FORGERY/COUNTERFE...| 10609|
|     SECONDARY CODES|  9985|
|         WEAPON LAWS|  8555|
|        PROSTITUTION|  7484|
|            TRESPASS|  7326|
|     STOLEN PROPERTY|  4540|
|SEX OFFENSES FORC...|  4388|
+--------------------+------+
only showing top 20 rows



In [32]:
data.groupBy("Descript") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

+--------------------+-----+
|            Descript|count|
+--------------------+-----+
|GRAND THEFT FROM ...|60022|
|       LOST PROPERTY|31729|
|             BATTERY|27441|
|   STOLEN AUTOMOBILE|26897|
|DRIVERS LICENSE, ...|26839|
|      WARRANT ARREST|23754|
|SUSPICIOUS OCCURR...|21891|
|AIDED CASE, MENTA...|21497|
|PETTY THEFT FROM ...|19771|
|MALICIOUS MISCHIE...|17789|
|   TRAFFIC VIOLATION|16471|
|PETTY THEFT OF PR...|16196|
|MALICIOUS MISCHIE...|15957|
|THREATS AGAINST LIFE|14716|
|      FOUND PROPERTY|12146|
|ENROUTE TO OUTSID...|11470|
|GRAND THEFT OF PR...|11010|
|POSSESSION OF NAR...|10050|
|PETTY THEFT FROM ...|10029|
|PETTY THEFT SHOPL...| 9571|
+--------------------+-----+
only showing top 20 rows



In [36]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline as pl
from pyspark.ml.classification import LogisticRegression
# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="Descript", outputCol="words", pattern="\\W")
# stop words
add_stopwords = ["http","https","amp","rt","t","c","the"] 
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
indexer = StringIndexer(inputCol="Category", outputCol="label")
text_pipeline = pl(stages=[regexTokenizer,stopwordsRemover,countVectors,indexer])
new_data = text_pipeline.fit(data).transform(data)
new_data.show()

+--------------+--------------------+--------------------+--------------------+--------------------+-----+
|      Category|            Descript|               words|            filtered|            features|label|
+--------------+--------------------+--------------------+--------------------+--------------------+-----+
|      WARRANTS|      WARRANT ARREST|   [warrant, arrest]|   [warrant, arrest]|(809,[17,32],[1.0...|  7.0|
|OTHER OFFENSES|TRAFFIC VIOLATION...|[traffic, violati...|[traffic, violati...|(809,[11,17,35],[...|  1.0|
|OTHER OFFENSES|TRAFFIC VIOLATION...|[traffic, violati...|[traffic, violati...|(809,[11,17,35],[...|  1.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(809,[0,2,3,4,6],...|  0.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(809,[0,2,3,4,6],...|  0.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(809,[0,2,3,4,104...|  0.0|
| VEHICLE THEFT|   STOLEN AUTOMOBILE|

## https://spark.apache.org/docs/2.1.1/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegressionModel

In [65]:
train,test=new_data.randomSplit([0.8, 0.2],seed=100)
train.show()

+--------+--------+-------+--------+-----------------+-----+
|Category|Descript|  words|filtered|         features|label|
+--------+--------+-------+--------+-----------------+-----+
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|
|   ARSON|   ARSON|[arso

In [66]:
LR = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0).setFeaturesCol('features').setLabelCol('label').setPredictionCol('prediction')
model= LR.fit(train)

In [43]:
# model = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0,
#                           featuresCol='features',labelCol='label',predictionCol='prediction')
# # bias_predic= model.fit(new_data)

In [67]:
bias_predic= model.transform(train)
var_predic = model.transform(test) 

In [81]:
bias_predic.sample(.2,True).show(30)

+--------+--------+-------+--------+-----------------+-----+--------------------+--------------------+----------+
|Category|Descript|  words|filtered|         features|label|       rawPrediction|         probability|prediction|
+--------+--------+-------+--------+-----------------+-----+--------------------+--------------------+----------+
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|[2.93349836385523...|[0.11471990211876...|       1.0|
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|[2.93349836385523...|[0.11471990211876...|       1.0|
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|[2.93349836385523...|[0.11471990211876...|       1.0|
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|[2.93349836385523...|[0.11471990211876...|       1.0|
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|[2.93349836385523...|[0.11471990211876...|       1.0|
|   ARSON|   ARSON|[arson]| [arson]|(809,[200],[1.0])| 27.0|[2.93349836385523...|[0.1147

In [73]:
var_predic.sample(.1,False).show()

+--------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|Category|            Descript|               words|            filtered|            features|label|       rawPrediction|         probability|prediction|
+--------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|   ARSON|               ARSON|             [arson]|             [arson]|   (809,[200],[1.0])| 27.0|[2.93349836385523...|[0.11471990211876...|       1.0|
|   ARSON|               ARSON|             [arson]|             [arson]|   (809,[200],[1.0])| 27.0|[2.93349836385523...|[0.11471990211876...|       1.0|
|   ARSON|               ARSON|             [arson]|             [arson]|   (809,[200],[1.0])| 27.0|[2.93349836385523...|[0.11471990211876...|       1.0|
|   ARSON|               ARSON|             [arson]|             [arson]|   

In [68]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label",metricName="accuracy")



In [69]:
train_acc = evaluator.evaluate(bias_predic)

In [70]:
train_acc

0.9782833903560584

In [77]:
test_acc = evaluator.evaluate(var_predic)

In [78]:
test_acc

0.9785741028579208

In [50]:
!unzip -d ~/ sample_data/text_proc/test.csv.zip

Archive:  sample_data/text_proc/test.csv.zip
  inflating: /home/spark/shared/test.csv  


In [52]:
test = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .option('nanValue', ' ')\
  .option('nullValue', ' ')\
  .load('test.csv')
test.show()

+---+-------------------+---------+----------+--------------------+-------------------+------------------+
| Id|              Dates|DayOfWeek|PdDistrict|             Address|                  X|                 Y|
+---+-------------------+---------+----------+--------------------+-------------------+------------------+
|  0|2015-05-10 23:59:00|   Sunday|   BAYVIEW|2000 Block of THO...|-122.39958770418998|  37.7350510103906|
|  1|2015-05-10 23:51:00|   Sunday|   BAYVIEW|  3RD ST / REVERE AV|  -122.391522893042|  37.7324323864471|
|  2|2015-05-10 23:50:00|   Sunday|  NORTHERN|2000 Block of GOU...|  -122.426001954961|  37.7922124386284|
|  3|2015-05-10 23:45:00|   Sunday| INGLESIDE|4700 Block of MIS...|  -122.437393972517|  37.7214120621391|
|  4|2015-05-10 23:45:00|   Sunday| INGLESIDE|4700 Block of MIS...|  -122.437393972517|  37.7214120621391|
|  5|2015-05-10 23:40:00|   Sunday|   TARAVAL|BROAD ST / CAPITO...|-122.45902362242902|  37.7131719025215|
|  6|2015-05-10 23:30:00|   Sunday| I

In [85]:
from pyspark.ml.feature import HashingTF,IDF

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) 
pipeline = pl(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, indexer])
new_data =pipeline.fit(data).transform(data)

In [86]:
new_data.show()

+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|      Category|            Descript|               words|            filtered|         rawFeatures|            features|label|
+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|      WARRANTS|      WARRANT ARREST|   [warrant, arrest]|   [warrant, arrest]|(10000,[2279,3942...|(10000,[2279,3942...|  7.0|
|OTHER OFFENSES|TRAFFIC VIOLATION...|[traffic, violati...|[traffic, violati...|(10000,[604,3942,...|(10000,[604,3942,...|  1.0|
|OTHER OFFENSES|TRAFFIC VIOLATION...|[traffic, violati...|[traffic, violati...|(10000,[604,3942,...|(10000,[604,3942,...|  1.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(10000,[274,713,3...|(10000,[274,713,3...|  0.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(10000,[274,713,3...|(100

In [89]:
train,test=new_data.randomSplit([0.8,.2],seed=100)
trained_model=LR.fit(train)
prediction=trained_model.transform(test)

In [96]:
prediction.sample(.1,True,seed=3000).show(40)

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|Category|            Descript|               words|            filtered|         rawFeatures|            features|label|       rawPrediction|         probability|prediction|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|   ARSON|               ARSON|             [arson]|             [arson]|(10000,[5156],[1.0])|(10000,[5156],[6....| 27.0|[2.94812455355073...|[0.11604759793934...|       1.0|
|   ARSON|               ARSON|             [arson]|             [arson]|(10000,[5156],[1.0])|(10000,[5156],[6....| 27.0|[2.94812455355073...|[0.11604759793934...|       1.0|
|   ARSON|               ARSON|             [arson]|             [arson]|(10000,[5156],[1.0])|(10000,[5156],[6....| 27.0|[2.9

In [91]:
prediction_acc = evaluator.evaluate(prediction)

In [92]:
prediction_acc

0.9783698846166937

In [101]:
from pyspark.ml.classification import DecisionTreeClassifier,RandomForestClassifier, LinearSVC, NaiveBayes
# https://spark.apache.org/docs/2.0.0/api/python/pyspark.ml.html?highlight=random%20forest#pyspark.ml.classification.RandomForestClassifier

# rndforest=RandomForestClassifier(self, featuresCol="features", labelCol="label", predictionCol="prediction", probabilityCol="probability", rawPredictionCol="rawPrediction", maxDepth=5, maxBins=32, minInstancesPerNode=1,
#                                  minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini", numTrees=20, featureSubsetStrategy="auto", seed=None)

# https://spark.apache.org/docs/2.0.0/api/python/pyspark.ml.html?highlight=random%20forest#pyspark.ml.classification.DecisionTreeClassifier
# dct = DecisionTreeClassifier(self, featuresCol="features", labelCol="label", predictionCol="prediction", probabilityCol="probability", rawPredictionCol="rawPrediction", maxDepth=5, maxBins=32, 
#                              minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini", seed=None)

# NB = NaiveBayes(self, featuresCol="features", labelCol="label", predictionCol="prediction", probabilityCol="probability", rawPredictionCol="rawPrediction", smoothing=1.0, modelType="multinomial", thresholds=None)
rdfo= RandomForestClassifier(numTrees=5,maxDepth=2)
dct = DecisionTreeClassifier(maxDepth=2)
NB = NaiveBayes(smoothing=1.0)
SVC = LinearSVC(maxIter=10, regParam=0.1)



In [102]:
rdfo_model =rdfo.fit(train)

KeyboardInterrupt: 

In [None]:
rdfo_model =rdfo.fit(train)
dct_model =dct.fit(train)
NB_model =NB.fit(train)
SVC_model =SVC.fit(train)

models = [rdfo_model,dct_model,NB_model,SVC_model]
prediction= []

for model in models:
    tmp =model.transform(test)
    prediction.append(tmp)
    print("prediction results for model {}".format(str(m)))


## Hyper Parameter tuning in Spark

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(LR.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(LR.elasticNetParam, [0.0, 0.5, 1]) # Elastic Net Parameter (Ridge = 0)
#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=LR, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)

trained_model = cv.fit(train)
test_res = trained_model(test)
evaluator.evaluate(test_res)