## Spark ML using logistic Regression  
The problem here is to predict the grade (Passing or Failing) of City of Chicago restaurant inspections based on the notes made by the inspector.  

This uses logistic regression.  Suppose you have a set of feature vectors $x_i \in R^n$ for $i$ in in $[0,m]$.  Associated with each feature vector we have a binary result $y_i$.  We are interested in the probability $P(y =1 | x)$ which we write as the function $p(x)$. However because $p(x)$ is between 0 and 1 it is not expressable as a linear function of x so we can't use regular linear regression, so we look at the odds expression $p(x) / (1-p(x))$ and make the guess that its log is linear. In other words

$$ ln( \frac{p(x)}{1-p(x)}) = b_0 + b \cdot x$$

where the offset $b_0$ and the vector $b = [b_1, b_2, ... b_n]$ define a hyperplane for linear regression.  solving this for $p(x)$ we get

$$p(x) = \frac {1}{1+e^{-(b_0 + b \cdot x)}}  $$

And we say $y=1$  if $p(x)>0$  otherwise it is zero.   Unfortunately finding the best $b_0$ and $b$ is not as easy as straight linear regression, but simple Newton like iterations will converge to good solutions. 


We note that the logistic function $\sigma (t)$ is defined as follows:

$$\sigma (t)= \frac {e^t}{e^{t}+1} =\frac {1}{1+e^{-t}}$$

It is used frequently in machine learning to map a real number into a probabilty range $[0,1]$ .


 

In [None]:
s

In [1]:

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark import SparkContext

spark = SparkContext.getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1626860397864_0010,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
from pyspark import SQLContext

sqlCtx = SQLContext(spark)


def csvParse(s):
    import csv
    from StringIO import StringIO
    sio = StringIO(s)
    value = csv.reader(sio).next()
    sio.close()
    return value

## The version in this notebook uses a slightly different input file from the one in the Azure HDInsight demo.   
This notebook will run on spark on your laptop.

In [5]:
inspections = spark.textFile('wasb:///HdiSamples/HdiSamples/FoodInspectionData/Food_Inspections1.csv')\
                .map(csvParse)



In [6]:
inspections.count()

13101

In [7]:
inspections.take(10)

[['413707', 'LUNA PARK INC', 'LUNA PARK  DAY CARE', '2049789', "Children's Services Facility", 'Risk 1 (High)', '3250 W FOSTER AVE ', 'CHICAGO', 'IL', '60625', '09/21/2010', 'License-Task Force', 'Fail', '24. DISH WASHING FACILITIES: PROPERLY DESIGNED, CONSTRUCTED, MAINTAINED, INSTALLED, LOCATED AND OPERATED - Comments: All dishwashing machines must be of a type that complies with all requirements of the plumbing section of the Municipal Code of Chicago and Rules and Regulation of the Board of Health. OBSEVERD THE 3 COMPARTMENT SINK BACKING UP INTO THE 1ST AND 2ND COMPARTMENT WITH CLEAR WATER AND SLOWLY DRAINING OUT. INST NEED HAVE IT REPAIR. CITATION ISSUED, SERIOUS VIOLATION 7-38-030 H000062369-10 COURT DATE 10-28-10 TIME 1 P.M. ROOM 107 400 W. SURPERIOR. | 36. LIGHTING: REQUIRED MINIMUM FOOT-CANDLES OF LIGHT PROVIDED, FIXTURES SHIELDED - Comments: Shielding to protect against broken glass falling into food shall be provided for all artificial lighting sources in preparation, service

In [8]:
schema = StructType([StructField("id", IntegerType(), False), 
                     StructField("name", StringType(), False), 
                     StructField("results", StringType(), False), 
                     StructField("violations", StringType(), True)])

df = sqlCtx.createDataFrame(inspections.map(lambda l: (int(l[0]), l[2], l[3], l[4])) , schema)
df.registerTempTable('CountResults')


In [9]:
df.show(5)

+------+--------------------+-------+--------------------+
|    id|                name|results|          violations|
+------+--------------------+-------+--------------------+
|413707| LUNA PARK  DAY CARE|2049789|Children's Servic...|
|391234|       CAFE SELMARIE|1069067|          Restaurant|
|413751|MANCHU WOK (T3-H/...|1909522|          Restaurant|
|413708|BENCHMARK HOSPITA...|2049411|          Restaurant|
|413722|           JJ BURGER|2055016|          Restaurant|
+------+--------------------+-------+--------------------+
only showing top 5 rows

In [10]:
print("passing = %d"%df[df.results == 'Pass'].count())
print("failing = %d"%df[df.results == 'Fail'].count())

passing = 0
failing = 0

In [11]:
df.count()

13101

In [12]:
df.select('results').distinct().show()

+-------+
|results|
+-------+
|2014004|
|1801390|
|1770896|
|1478647|
|  20158|
|1514155|
|   7252|
|2032033|
|1908805|
|1574657|
|1800376|
|  22121|
|  20428|
|1597586|
|  13192|
|1979986|
|2037268|
|2224948|
|  28117|
|  35640|
+-------+
only showing top 20 rows

In [13]:
#%%sql -o count_results_df
count_results_df = sqlCtx.sql("SELECT results, COUNT(results) AS cnt FROM \
       CountResults GROUP BY results").toPandas()
labels = count_results_df["results"]
sizes = count_results_df['cnt']
colors = ['turquoise', 'seagreen', 'mediumslateblue', 'palegreen', 'coral']



In [15]:
count_results_df["results"]

0       1979986
1         22121
2       2037268
3       2224948
4         28117
5         35640
6         20158
7         29539
8       1647924
9       1042702
10      2032143
11      2037497
12        45141
13      1675566
14      2043013
15        55321
16      2042268
17      2021286
18      2032033
19        10436
20      1926867
21        47940
22      1801390
23        64038
24      2014004
25      1770896
26      1478647
27      1514155
28         7252
29      1908805
         ...   
8676    1769316
8677    1620340
8678    1242839
8679    2017994
8680       8507
8681    2013807
8682    1981131
8683    1473964
8684    2016763
8685    1422648
8686    1193152
8687    1122732
8688    1908389
8689      31100
8690    2224959
8691       6474
8692      34229
8693    2054839
8694      37566
8695    1519691
8696    1927197
8697    1869952
8698    2003825
8699      21665
8700    1897994
8701    1968597
8702    2134640
8703    1196998
8704    1880119
8705    1981894
Name: results, Length: 8

In [16]:
def labelForResults(s):
    if s == 'Fail':
        return 0.0
    elif s == 'Pass w/ Conditions' or s == 'Pass':
        return 1.0
    else:
        return -1.0
label = UserDefinedFunction(labelForResults, DoubleType())
labeledData = df.select(label(df.results).alias('label'), df.violations).where('label >= 0')

In [17]:
labeledData.take(1)

[]

In [18]:
tokenizer = Tokenizer(inputCol="violations", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
#hashingTF = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(labeledData)
model

An error occurred while calling o226.fit.
: java.lang.UnsupportedOperationException: empty.min
	at scala.collection.TraversableOnce$class.min(TraversableOnce.scala:222)
	at scala.collection.mutable.ArrayOps$ofDouble.min(ArrayOps.scala:270)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1.apply(LogisticRegression.scala:523)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1.apply(LogisticRegression.scala:494)
	at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:185)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:185)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:494)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:489)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:279)
	at org.apache.spark.ml.Predictor.fit(P

In [None]:
hashingTF.getNumFeatures()

In [None]:
testData = spark.textFile('Food_Inspections2.csv')\
             .map(csvParse) \
             .map(lambda l: (int(l[0]), l[2], l[3], l[4]))
testDf = sqlCtx.createDataFrame(testData, schema).where("results = 'Fail' OR results = 'Pass' OR results = 'Pass w/ Conditions'")
predictionsDf = model.transform(testDf)
predictionsDf.registerTempTable('Predictions')
predictionsDf.columns

In [None]:
predictionsDf.take(1)

In [None]:
numSuccesses = predictionsDf.where("""(prediction = 0 AND results = 'Fail') OR 
                                      (prediction = 1 AND (results = 'Pass' OR 
                                                           results = 'Pass w/ Conditions'))""").count()
numInspections = predictionsDf.count()

print "There were", numInspections, "inspections and there were", numSuccesses, "successful predictions"
print "This is a", str((float(numSuccesses) / float(numInspections)) * 100) + "%", "success rate"


In [None]:
from pyspark.sql.types import *
from IPython.core.magic import register_line_cell_magic

In [None]:
# Configuration parameters
max_show_lines = 50         # Limit on the number of lines to show with %sql_show and %sql_display
detailed_explain = True    

In [None]:
@register_line_cell_magic
def sql(line, cell=None):
    "Return a Spark DataFrame for lazy evaluation of the sql. Use: %sql or %%sql"
    val = cell if cell is not None else line 
    print val
    return sqlContext.sql(val)

@register_line_cell_magic
def sql_show(line, cell=None):
    "Execute sql and show the first max_show_lines lines. Use: %sql_show or %%sql_show"
    val = cell if cell is not None else line 
    return sqlContext.sql(val).show(max_show_lines) 

@register_line_cell_magic
def sql_display(line, cell=None):
    """Execute sql and convert results to Pandas DataFrame for pretty display or further processing.
    Use: %sql_display or %%sql_display"""
    val = cell if cell is not None else line 
    return sqlContext.sql(val).limit(max_show_lines).toPandas() 


In [None]:
#%%sql -q -o true_positive
#SELECT count(*) AS cnt FROM Predictions WHERE prediction = 0 AND results = 'Fail'
true_negative = spark.sql("SELECT count(*) AS cnt FROM Predictions WHERE \
        (prediction = 0 AND results = 'Fail')").toPandas()

In [None]:
#%%sql -q -o false_positive
false_negative = spark.sql("SELECT count(*) AS cnt FROM Predictions \
WHERE prediction = 0 AND (results = 'Pass' OR results = 'Pass w/ Conditions')").toPandas()

In [None]:
#%%sql -q -o true_negative
false_positive = spark.sql("SELECT count(*) AS cnt FROM Predictions WHERE \
       prediction = 1 AND results = 'Fail' ").toPandas()

In [None]:
#%%sql -q -o false_negative
true_positive = spark.sql("SELECT count(*) AS cnt FROM Predictions WHERE \
      prediction = 1 AND (results = 'Pass' OR results = 'Pass w/ Conditions')").toPandas()

In [None]:
false_negative['cnt']

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

labels = ['True positive', 'False positive', 'True negative', 'False negative']
sizes = [true_positive['cnt'], false_positive['cnt'], false_negative['cnt'], true_negative['cnt']]
colors = ['turquoise', 'seagreen', 'mediumslateblue', 'palegreen', 'coral']
plt.pie(sizes, labels=labels, autopct='%10.1f%%', colors=colors)
plt.axis('equal')

Precision and recall are then defined as:

$$Precision=\frac {tp}{tp+fp}$$


$$ Recall = \frac {tp}{tp+fn} $$


Precision is the probability that a (randomly selected) positive prediction is correct.

Recall is the probability that a (randomly selected) resturant with a passing grade is predicted to be passing.


In [None]:
print('so precision = %f'% \
      (float(true_positive['cnt'])/(float(true_positive['cnt'])+float(false_positive['cnt']))))

In [None]:
print('and recall = %f'% \
      (float(true_positive['cnt'])/(float(true_positive['cnt'])+float(false_negative['cnt']))))

If we do this another way, we can ask how accurate are we in finding the failing resturants? This is a bit harder because there are far fewer of them.   In this case we are interested in true-negatives, so 

Precision is the probability that a (randomly selected) negative prediction is correct.

Recall is the probability that a (randomly selected) resturant with a failing grade is predicted to be failing.


$$Precision=\frac {tn}{tn+fn}$$


$$ Recall = \frac {tn}{tn+fp} $$


In [None]:
print('so the precision of failure prediction = %f'% \
      (float(true_negative['cnt'])/(float(true_negative['cnt'])+float(false_negative['cnt']))))

In [None]:
print('and recall is = %f'% \
      (float(true_negative['cnt'])/(float(true_negative['cnt'])+float(false_positive['cnt']))))