In [3]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [11]:
import pixiedust
pixiedust.enableSparkJobProgressMonitor()

Spark Job Progress Monitor already enabled


In [12]:
data=spark.read.csv('Food_Inspections.csv', header=True, inferSchema=True)

In [13]:
data.show(5)

+-------------+--------------------+--------------------+---------+--------------------+---------------+--------------------+-------+-----+-----+---------------+--------------------+-------+--------------------+------------------+------------------+--------------------+
|Inspection ID|            DBA Name|            AKA Name|License #|       Facility Type|           Risk|             Address|   City|State|  Zip|Inspection Date|     Inspection Type|Results|          Violations|          Latitude|         Longitude|            Location|
+-------------+--------------------+--------------------+---------+--------------------+---------------+--------------------+-------+-----+-----+---------------+--------------------+-------+--------------------+------------------+------------------+--------------------+
|      2385809|BRITISH INTERNATI...|                null|  2718080|Children's Servic...|  Risk 1 (High)|   821 W EASTMAN ST |CHICAGO|   IL|60622|     08/06/2020|             License|   Pa

In [16]:
data.printSchema()

root
 |-- Inspection ID: integer (nullable = true)
 |-- DBA Name: string (nullable = true)
 |-- AKA Name: string (nullable = true)
 |-- License #: integer (nullable = true)
 |-- Facility Type: string (nullable = true)
 |-- Risk: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zip: integer (nullable = true)
 |-- Inspection Date: string (nullable = true)
 |-- Inspection Type: string (nullable = true)
 |-- Results: string (nullable = true)
 |-- Violations: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)



In [17]:
#Converting Latitude and Longitude to string type
#Converting inspection date into datetime object
from pyspark.sql.types import StringType
data= data.withColumn("Longitude", data["Longitude"].cast(StringType()))
data= data.withColumn("Latitude", data["Latitude"].cast(StringType()))

In [18]:
#Changing Date column to datetime format and printing the dataframe schema
import pyspark.sql.functions as f
data=(data.withColumn('Inspection Date', f.to_date('Inspection Date','MM/d/yyyy')
                     )
     )
data.printSchema()

root
 |-- Inspection ID: integer (nullable = true)
 |-- DBA Name: string (nullable = true)
 |-- AKA Name: string (nullable = true)
 |-- License #: integer (nullable = true)
 |-- Facility Type: string (nullable = true)
 |-- Risk: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zip: integer (nullable = true)
 |-- Inspection Date: date (nullable = true)
 |-- Inspection Type: string (nullable = true)
 |-- Results: string (nullable = true)
 |-- Violations: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)



#### Exploratory Analysis: SQL Transformations

In [19]:
data.createOrReplaceTempView('data_view')


In [20]:
#Count of number of inspected facilities per result category
(spark.sql('''
SELECT COUNT(Results) AS Count, Results FROM data_view GROUP BY Results ORDER BY Count DESC
''').show()
)

+------+--------------------+
| Count|             Results|
+------+--------------------+
|110014|                Pass|
| 40438|                Fail|
| 31344|  Pass w/ Conditions|
| 18181|     Out of Business|
|  6924|            No Entry|
|  2133|           Not Ready|
|    72|Business Not Located|
+------+--------------------+



In [21]:
#Count of number of restaurants per result category
(spark.sql('''
SELECT COUNT(*) AS Number_of_Facilities, Risk FROM data_view GROUP BY Risk ORDER BY Number_of_Facilities DESC
''').show()
)

+--------------------+---------------+
|Number_of_Facilities|           Risk|
+--------------------+---------------+
|              149470|  Risk 1 (High)|
|               40797|Risk 2 (Medium)|
|               18736|   Risk 3 (Low)|
|                  72|           null|
|                  31|            All|
+--------------------+---------------+



In [22]:
# Top 10 Violation offences in Chicago
(spark.sql('''
SELECT COUNT(*) AS Number_of_Facilities, Violations FROM data_view GROUP BY Violations ORDER BY Number_of_Facilities DESC LIMIT 15
''').show()
)

+--------------------+--------------------+
|Number_of_Facilities|          Violations|
+--------------------+--------------------+
|               55569|                null|
|                  11|32. FOOD AND NON-...|
|                  10|45. FOOD HANDLER ...|
|                  10|30. FOOD IN ORIGI...|
|                   7|45. FOOD HANDLER ...|
|                   7|2. FACILITIES TO ...|
|                   7|3. MANAGEMENT, FO...|
|                   7|32. FOOD AND NON-...|
|                   6|3. MANAGEMENT, FO...|
|                   6|40. REFRIGERATION...|
|                   6|38. VENTILATION: ...|
|                   5|45. FOOD HANDLER ...|
|                   5|2. FACILITIES TO ...|
|                   5|41. PREMISES MAIN...|
|                   5|2. FACILITIES TO ...|
+--------------------+--------------------+



In [23]:
(spark.sql('''
SELECT Risk, Results, COUNT(*) AS Number_of_Facilities FROM data_view
WHERE Results IN ('Pass', 'Pass w/ Conditions', 'Fail') AND Risk NOT IN ('null','All')
GROUP BY Risk, Results ORDER BY Risk, Number_of_Facilities DESC 
''').show()
)

+---------------+------------------+--------------------+
|           Risk|           Results|Number_of_Facilities|
+---------------+------------------+--------------------+
|  Risk 1 (High)|              Pass|               81112|
|  Risk 1 (High)|              Fail|               28471|
|  Risk 1 (High)|Pass w/ Conditions|               23932|
|Risk 2 (Medium)|              Pass|               21170|
|Risk 2 (Medium)|              Fail|                7875|
|Risk 2 (Medium)|Pass w/ Conditions|                6013|
|   Risk 3 (Low)|              Pass|                7720|
|   Risk 3 (Low)|              Fail|                4065|
|   Risk 3 (Low)|Pass w/ Conditions|                1399|
+---------------+------------------+--------------------+



In [24]:
#Count of Facilities Inspected excluding the null values
(spark.sql('''
SELECT `Facility Type`, COUNT(*) AS Number_of_Facilities FROM data_view
WHERE `Facility Type` NOT IN ("null")
GROUP BY `Facility Type` ORDER BY Number_of_Facilities DESC 
''').show()
)

+--------------------+--------------------+
|       Facility Type|Number_of_Facilities|
+--------------------+--------------------+
|          Restaurant|              139169|
|       Grocery Store|               27334|
|              School|               12828|
|Children's Servic...|                3447|
|              Bakery|                3054|
|Daycare (2 - 6 Ye...|                2768|
|Daycare Above and...|                2546|
|      Long Term Care|                1424|
|            Catering|                1257|
|              Liquor|                 920|
|Mobile Food Dispe...|                 879|
|  Daycare Combo 1586|                 748|
|Mobile Food Preparer|                 668|
|        Golden Diner|                 597|
|            Hospital|                 582|
|           Wholesale|                 543|
|              TAVERN|                 304|
|Daycare (Under 2 ...|                 265|
|       Special Event|                 225|
|Shared Kitchen Us...|          

### Feature Engineering & Model Fitting

In [25]:
#Filtering data: filtering out those food joints whose results were neither pass, pass with conditions or fail
dat=data.filter('Results IN ("Fail","Pass","Pass w/ Conditions")').select('DBA Name','Risk','Violations','Results')
dat.show(5)
 

+--------------------+---------------+--------------------+-------+
|            DBA Name|           Risk|          Violations|Results|
+--------------------+---------------+--------------------+-------+
|BRITISH INTERNATI...|  Risk 1 (High)|                null|   Pass|
|LITTLE CAESAR PIZ...|Risk 2 (Medium)|                null|   Pass|
|   7-ELEVEN #38484 A|Risk 2 (Medium)|                null|   Pass|
|          STAR GYROS|  Risk 1 (High)|                null|   Pass|
|           CLAUDIO'S|  Risk 1 (High)|55. PHYSICAL FACI...|   Pass|
+--------------------+---------------+--------------------+-------+
only showing top 5 rows



In [68]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import UserDefinedFunction
"""Converting the existing dataframe(df) into a new dataframe where each inspection is represented as a label-violations pair. 
In this case, a label of 0.0 represents a failure, a label of 1.0 represents a success, 
and a label of -1.0 represents some results besides those two"""
def labelForResults(s):
    if s == 'Fail':
        return 0.0
    elif s == 'Pass w/ Conditions' or s == 'Pass':
        return 1.0
    else:
        return -1.0
label = UserDefinedFunction(labelForResults, DoubleType())
labeledData = data.select(label(data.Results).alias('label'), data.Violations).where('label >= 0')

In [69]:
#Let's view one row of the labeled data
labeledData.take(7)

[Row(label=1.0, Violations=None),
 Row(label=1.0, Violations=None),
 Row(label=1.0, Violations=None),
 Row(label=1.0, Violations=None),
 Row(label=1.0, Violations='55. PHYSICAL FACILITIES INSTALLED, MAINTAINED & CLEAN - Comments: MOP SINK TOO CLOSE TO DRY UTENSILS RACK, INSTRUCTED TO INSTALL A BARRIER.'),
 Row(label=1.0, Violations="1. PERSON IN CHARGE PRESENT, DEMONSTRATES KNOWLEDGE, AND PERFORMS DUTIES - Comments: THE PIC DOESN'T HAVE A FOOD MANAGERS CERTIFICATE.MUST PROVIDE AND MAINTAIN.(PRIORITY FOUNDATION) | 2. CITY OF CHICAGO FOOD SERVICE SANITATION CERTIFICATE - Comments: OBSERVED NO CERTIFIED FOOD MANAGER ON DUTY WHILE TCS FOODS ARE BEING PREPARED,HANDLED AND SERVED SUCH AS (CHICKEN,GRAVY,SPAGHETTI SAUCE,CABBAGE,ETC). FOOD MANAGER ARRIVED ON SITE AT 11:05 A.M.MUST BE ON SITE AT ALL TIMES.(COS)(PRIORITY FOUNDATION 7-38-012) | 3. MANAGEMENT, FOOD EMPLOYEE AND CONDITIONAL EMPLOYEE; KNOWLEDGE, RESPONSIBILITIES AND REPORTING - Comments: OBSERVED NO SIGNED EMPLOYEES HEALTH POLICIES.M

In [35]:
#Splitting into Train and Test Data 
training_df,test_df=labeledData.randomSplit([0.75,0.25])
print(training_df.count()) 
print(test_df.count())

136346
45450


##### The final task is to convert the labeled data into a format that can be analyzed by logistic regression. The input to a logistic regression algorithm needs be a set of label-feature vector pairs, where the "feature vector" is a vector of numbers representing the input point. 

##### So, we need to convert the "violations" column, which is semi-structured and contains many comments in text format, to an array of real numbers that a machine could understand.

##### One standard machine learning approach for processing natural language is to assign each distinct word an "index", and then pass a vector to the machine learning algorithm such that each index's value contains the relative frequency of that word in the text string

##### MLlib provides an easy way to perform this operation. First, we tokenize each violations string to get the individual words in each string. Then, use a HashingTF to convert each set of tokens into a feature vector that can then be passed to the Machine Leraning algorithm to construct a model. We conduct all of these steps in sequence using a "pipeline".

In [71]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

tokenizer = Tokenizer(inputCol="Violations", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(labeledData)

Py4JJavaError: An error occurred while calling o490.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 43.0 failed 1 times, most recent failure: Lost task 5.0 in stage 43.0 (TID 1748, usbmbgspri55795, executor driver): org.apache.spark.SparkException: Failed to execute user defined function(Tokenizer$$Lambda$3622/0x0000000801c95040: (string) => array<string>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1371)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.NullPointerException
	at org.apache.spark.ml.feature.Tokenizer.$anonfun$createTransformFunc$1(Tokenizer.scala:40)
	... 30 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2188)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1157)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1151)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1220)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1196)
	at org.apache.spark.ml.classification.LogisticRegression.$anonfun$train$1(LogisticRegression.scala:504)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:492)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:487)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:277)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:150)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function(Tokenizer$$Lambda$3622/0x0000000801c95040: (string) => array<string>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1371)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	... 1 more
Caused by: java.lang.NullPointerException
	at org.apache.spark.ml.feature.Tokenizer.$anonfun$createTransformFunc$1(Tokenizer.scala:40)
	... 30 more


In [58]:
count=(spark.sql('''
SELECT results, COUNT(results) AS cnt FROM data_view GROUP BY results ORDER BY cnt DESC
''').show()
)

+--------------------+------+
|             results|   cnt|
+--------------------+------+
|                Pass|110014|
|                Fail| 40438|
|  Pass w/ Conditions| 31344|
|     Out of Business| 18181|
|            No Entry|  6924|
|           Not Ready|  2133|
|Business Not Located|    72|
+--------------------+------+



AttributeError: 'NoneType' object has no attribute 'dtype'

In [70]:

%matplotlib inline
import matplotlib.pyplot as plt


labels =data['results']
sizes = 
colors = ['turquoise', 'seagreen', 'mediumslateblue', 'palegreen', 'coral']
plt.pie(sizes, labels=labels, autopct='%1.1f%%', colors=colors)
plt.axis('equal')

SyntaxError: invalid syntax (<ipython-input-70-e9253203e9e7>, line 6)

In [40]:

from pyspark.ml.feature import Bucketizer, StringIndexer, VectorAssembler, IndexToString
risk_idx = StringIndexer(inputCol = "Risk", outputCol="Risk_idx",handleInvalid="keep")
label_idx = StringIndexer(inputCol="Results", outputCol="Label",handleInvalid="keep")
#Create labels list to decode predictions 
resultLabels= label_idx.fit(dat).labels
riskLabels=risk_idx.fit(dat).labels
riskLabels

['Risk 1 (High)', 'Risk 2 (Medium)', 'Risk 3 (Low)', 'All']

In [41]:
#create a  a single vector combining all input features
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler(inputCols=['Label'],outputCol="features") 

In [42]:
train_df,test_df=dat.randomSplit([0.7,0.3])
print(train_df.count()) 
print(test_df.count())


119401
51369


In [44]:
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
dt= DecisionTreeClassifier(labelCol="Risk_idx", featuresCol="features")
lc = IndexToString(inputCol="prediction",outputCol="predictedLabel",labels=resultLabels)
dt_pipeline = Pipeline(stages=[label_idx,risk_idx,va,dt,lc])
dtModel =dt_pipeline.fit(train_df) 
resultDF = dtModel.transform(test_df)
#Look for observations where prediction did not match
resultDF.filter("Label != prediction").select( "DBA Name","Label","prediction","Results","predictedLabel").show() 
# Calculation of the accuracy of our classifier
predictionAndLabels = resultDF.select("prediction", "label") 
evaluator = MulticlassClassificationEvaluator().setMetricName("accuracy")
print ("Accuracy:",(evaluator.evaluate(predictionAndLabels))*100,"%")


+--------------------+-----+----------+------------------+--------------+
|            DBA Name|Label|prediction|           Results|predictedLabel|
+--------------------+-----+----------+------------------+--------------+
|       #1 WOK N ROLL|  1.0|       0.0|              Fail|          Pass|
|1,200 SQ.FT. - YE...|  1.0|       0.0|              Fail|          Pass|
|10 PIN  BOWLING L...|  1.0|       0.0|              Fail|          Pass|
|    11 DEGREES NORTH|  2.0|       0.0|Pass w/ Conditions|          Pass|
|           14 PARISH|  2.0|       0.0|Pass w/ Conditions|          Pass|
|1492 CUBAN FUSION...|  2.0|       0.0|Pass w/ Conditions|          Pass|
|        1800 LIQUORS|  1.0|       0.0|              Fail|          Pass|
|24/7 EXPRESS FOOD...|  1.0|       0.0|              Fail|          Pass|
|          25 DEGREES|  2.0|       0.0|Pass w/ Conditions|          Pass|
|            3 ABEJAS|  1.0|       0.0|              Fail|          Pass|
|3 JJJ'S BETTER TA...|  1.0|       0.0

In [19]:
#Random Forest Classifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator
rf = RandomForestClassifier(labelCol="Risk_idx", featuresCol="features",numTrees=50) # Train 50 decision trees on data
rf_pipeline = Pipeline(stages=[risk_idx, label_idx,va,dt,lc]) 
rfResultDF = rf_pipeline.fit(train_df).transform(test_df) 
rfResultDF.filter("Label != prediction").select("DBA Name","Label","Results","prediction","predictedLabel").show()
predictionAndLabels = rfResultDF.select("prediction", "label") 
evaluator = MulticlassClassificationEvaluator().setMetricName("accuracy") 
print ("Accuracy:",(evaluator.evaluate(predictionAndLabels))*100,"%")


+--------------------+-----+------------------+----------+--------------+
|            DBA Name|Label|           Results|prediction|predictedLabel|
+--------------------+-----+------------------+----------+--------------+
|1,200 SQ.FT. - YE...|  1.0|              Fail|       0.0|          Pass|
|10 PIN  BOWLING L...|  1.0|              Fail|       0.0|          Pass|
|1000 LIQUORS / BI...|  2.0|Pass w/ Conditions|       0.0|          Pass|
|      11 DINING, LLC|  2.0|Pass w/ Conditions|       0.0|          Pass|
|        1800 LIQUORS|  1.0|              Fail|       0.0|          Pass|
|24 HRS GYROS & SU...|  1.0|              Fail|       0.0|          Pass|
|          25 DEGREES|  2.0|Pass w/ Conditions|       0.0|          Pass|
|3 JJJ'S BETTER TA...|  1.0|              Fail|       0.0|          Pass|
|          4 BANDERAS|  1.0|              Fail|       0.0|          Pass|
|          4 BANDERAS|  2.0|Pass w/ Conditions|       0.0|          Pass|
|5 STARS MINI MART...|  1.0|          