The first step is to import the packages and set up the environment:

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext

In [3]:
sc = SparkContext()
sc

Next, we use `SparkContext.textFile` to load the XML file we want to analyze, "Posts.xml", as an RDD with one element per line of the file.

In [4]:
textFile = sc.textFile("s3://my4dbucket/math/Posts.xml")
textFile.take(5)

['<?xml version="1.0" encoding="utf-8"?>',
 '<posts>',
 '  <row Id="1" PostTypeId="1" AcceptedAnswerId="9" CreationDate="2010-07-20T19:09:27.200" Score="156" ViewCount="8964" Body="&lt;p&gt;Can someone explain to me how there can be different kinds of infinities?&lt;/p&gt;&#xA;&#xA;&lt;p&gt;I was reading &quot;&lt;a href=&quot;http://en.wikipedia.org/wiki/The_Man_Who_Loved_Only_Numbers&quot; rel=&quot;noreferrer&quot;&gt;The man who loved only numbers&lt;/a&gt;&quot; by &lt;a href=&quot;http://en.wikipedia.org/wiki/Paul_Hoffman_(science_writer)&quot; rel=&quot;noreferrer&quot;&gt;Paul Hoffman&lt;/a&gt; and came across the concept of countable and uncountable infinities, but they\'re only words to me.&lt;/p&gt;&#xA;&#xA;&lt;p&gt;Any help would be appreciated.&lt;/p&gt;&#xA;" OwnerUserId="10" LastEditorUserId="32803" LastEditorDisplayName="user126" LastEditDate="2018-03-01T19:53:22.017" LastActivityDate="2020-01-28T03:26:12.530" Title="What Does it Really Mean to Have Different Kinds of 

The first five lines show that the XML file is formatted much like an HTML document but uses custom tags to define objects and their data. Each post is stored as a single `<row>` element whose attributes hold its fields. A post row can carry the following 20 attributes:
* Id
* PostTypeId
* ParentID
* AcceptedAnswerId
* CreationDate
* Score
* ViewCount
* Body
* OwnerUserId
* LastEditorUserId
* LastEditorDisplayName="Jeff Atwood"
* LastEditDate
* LastActivityDate
* CommunityOwnedDate
* ClosedDate
* Title
* Tags
* AnswerCount
* CommentCount
* FavoriteCount
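Because each `<row>` line is a self-contained XML element, a single line can be parsed on its own. The sketch below uses Python's standard `xml.etree.ElementTree` (not part of the notebook) on an abbreviated row; the attribute values are shortened and the `Tags` value is made up for illustration. It shows that the HTML inside `Body` is stored entity-escaped (`&lt;p&gt;`, and `&#xA;` for newlines) and is decoded by a real XML parser.

```python
import xml.etree.ElementTree as ET

# Abbreviated row with a made-up Tags value; real rows carry more attributes.
line = ('<row Id="1" PostTypeId="1" Score="156" '
        'Body="&lt;p&gt;Can someone explain to me how there can be '
        'different kinds of infinities?&lt;/p&gt;&#xA;" '
        'Title="What Does it Really Mean to Have Different Kinds of Infinities?" '
        'Tags="&lt;set-theory&gt;" />')

# fromstring parses the single element; .attrib maps attribute names to
# their values, with entities (&lt;, &gt;, &#xA;) already decoded.
attrs = ET.fromstring(line).attrib
print(attrs["Id"], attrs["PostTypeId"])  # 1 1
print(repr(attrs["Body"]))
print(attrs["Tags"])  # <set-theory>
```

The notebook itself keeps working on the raw, still-escaped strings, which is why entities such as `&lt;p&gt;` appear in the extracted text later on.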

In total, the file has 2,983,556 lines, including the XML declaration and the `<posts>` wrapper lines.

In [5]:
textFile.count()

2983556

Next, we import the packages needed for Spark SQL and binary classification.

In [5]:
import re
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.mllib.evaluation import BinaryClassificationMetrics


Our binary model should separate questions related to 'calculus' from unrelated ones. Four attributes matter for this prediction: `Id`, `Title`, `Body`, and `Tags` (the latter is used only to derive the labels for training and testing). As a first step, we therefore drop the XML declaration, the `<posts>` wrapper lines, and every row that lacks any of these four attributes.

In [6]:
postsXml = textFile.map( lambda line: line.strip() ).\
    filter( lambda line: line != "<posts>" and line != "</posts>").\
    filter( lambda line: not line.startswith("<?xml version=") ).\
    filter( lambda line: line.find("Id=") >= 0 ).\
    filter( lambda line: line.find("Tags=") >= 0 ).\
    filter( lambda line: line.find("Body=") >= 0 ).\
    filter( lambda line: line.find("Title=") >= 0 )

In [7]:
print(textFile.count())
print(postsXml.count())

2983556
1257199


The filter removes more than half of the lines (1,257,199 of 2,983,556 remain), which shrinks the RDD considerably and lowers the computational cost of every later step.
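In the Stack Exchange dump, answers (`PostTypeId="2"`) carry no `Title` or `Tags` attribute, so the `Tags=`/`Title=` checks drop them together with the wrapper lines. The plain-Python predicate below is a sketch that mirrors the notebook's filter chain for one line; the sample lines are hypothetical and heavily shortened.

```python
def keep(line):
    """Plain-Python mirror of the notebook's filter chain for one raw line."""
    line = line.strip()
    # Drop the root element tags and the XML declaration.
    if line in ("<posts>", "</posts>") or line.startswith("<?xml version="):
        return False
    # Every attribute we need must appear somewhere on the line.
    return all(line.find(attr) >= 0 for attr in ("Id=", "Tags=", "Body=", "Title="))

# Hypothetical, shortened sample lines
lines = [
    '<?xml version="1.0" encoding="utf-8"?>',
    '<posts>',
    '  <row Id="1" PostTypeId="1" Body="..." Title="..." Tags="&lt;calculus&gt;" />',
    '  <row Id="2" PostTypeId="2" ParentId="1" Body="..." />',  # an answer: no Title/Tags
    '</posts>',
]
kept = [l.strip() for l in lines if keep(l)]
print(len(kept))  # 1
```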

Intuitively, the tag assigned to a question depends on both the question itself (`Title`) and its longer description (`Body`), so we concatenate these two attributes into a single field `Text`, using a **lambda** function with regular expressions to extract the values we need. We define Label = 1.0 when the `Tags` value contains the substring 'calculus' (this also matches compound tags whose names include it), otherwise Label = 0.0.

In [16]:
from pyspark.sql import Row
targetTag = "calculus"

def getAttr(s, name):
    # Value of attribute `name` in a <row .../> line, or "" if it is absent.
    # The leading space keeps ' Id=' from matching inside 'PostTypeId='.
    m = re.search(' ' + name + '="(.*?)"', s)
    return m.group(1) if m is not None else ""

# Label = 1.0 if the Tags string contains targetTag; Text = Title + " " + Body
postsRDD = postsXml.map(lambda s: Row(
                Id = getAttr(s, "Id"),
                Label = 1.0 if getAttr(s, "Tags").find(targetTag) >= 0 else 0.0,
                Text = getAttr(s, "Title") + " " + getAttr(s, "Body")))
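Because the label test is a substring search on the raw `Tags` string, any tag that merely contains "calculus" also counts as positive. The plain-Python function below is a sketch of the same extraction idea (regular expressions over the raw line) applied to single lines; the sample rows and the tag names in them are hypothetical.

```python
import re

TARGET_TAG = "calculus"

def attr(line, name):
    # Value of attribute `name` in a <row .../> line, or "" if it is missing.
    # The leading space stops ' Id=' from matching inside 'PostTypeId='.
    m = re.search(" " + name + '="(.*?)"', line)
    return m.group(1) if m else ""

def to_record(line):
    """Return (Id, Label, Text) for one row line, mirroring the labelling rule."""
    label = 1.0 if TARGET_TAG in attr(line, "Tags") else 0.0
    return attr(line, "Id"), label, attr(line, "Title") + " " + attr(line, "Body")

# Hypothetical rows; tag names are for illustration only.
rows = [
    '<row Id="7" PostTypeId="1" Body="b" Title="t" Tags="&lt;calculus&gt;" />',
    '<row Id="8" PostTypeId="1" Body="b" Title="t" Tags="&lt;multivariable-calculus&gt;" />',
    '<row Id="9" PostTypeId="1" Body="b" Title="t" Tags="&lt;set-theory&gt;" />',
]
for r in rows:
    print(to_record(r))
# ('7', 1.0, 't b')
# ('8', 1.0, 't b')
# ('9', 0.0, 't b')
```

If only the exact tag should count, one option would be to match the delimited form of the tag (for example `&lt;calculus&gt;` in this sample format) instead of the bare word.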


In [9]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [10]:
PostsLabeled = sqlContext.createDataFrame(postsRDD)

In [12]:
# Alternative way to create the DataFrame: define the schema explicitly first.
#dfSchema = StructType([
        #StructField("Id", StringType(), True),\
        #StructField("Label", FloatType(), True),\
        #StructField("Text", StringType(), True)
        #])
#PostsLabeled = sqlContext.createDataFrame(postsRDD, dfSchema)

`PostsLabeled` contains only the three columns we need: `Id`, `Label`, and `Text`.

In [11]:
PostsLabeled.take(5)

[Row(Id='1', Label=0.0, Text="What Does it Really Mean to Have Different Kinds of Infinities? &lt;p&gt;Can someone explain to me how there can be different kinds of infinities?&lt;/p&gt;&#xA;&#xA;&lt;p&gt;I was reading &quot;&lt;a href=&quot;http://en.wikipedia.org/wiki/The_Man_Who_Loved_Only_Numbers&quot; rel=&quot;noreferrer&quot;&gt;The man who loved only numbers&lt;/a&gt;&quot; by &lt;a href=&quot;http://en.wikipedia.org/wiki/Paul_Hoffman_(science_writer)&quot; rel=&quot;noreferrer&quot;&gt;Paul Hoffman&lt;/a&gt; and came across the concept of countable and uncountable infinities, but they're only words to me.&lt;/p&gt;&#xA;&#xA;&lt;p&gt;Any help would be appreciated.&lt;/p&gt;&#xA;"),
 Row(Id='3', Label=0.0, Text='List of interesting math podcasts? &lt;p&gt;&lt;a href=&quot;http://mathfactor.uark.edu/&quot;&gt;mathfactor&lt;/a&gt; is one I listen to.  Does anyone else have a recommendation?&lt;/p&gt;&#xA;'),
 Row(Id='5', Label=0.0, Text='How can you prove that the square root of t

Now we move on to building the model. We divide the dataset into `positive` (**Label = 1.0**) and `negative` (**Label = 0.0**) posts, sample about 10% of each class without replacement as training data, and use the remaining posts as the test set.

In [17]:
seed = 123
positive = PostsLabeled.filter(PostsLabeled.Label > 0.0)
negative = PostsLabeled.filter(PostsLabeled.Label < 1.0)

In [18]:
positiveTrain = positive.sample(False, 0.1, seed=seed)
negativeTrain = negative.sample(False, 0.1, seed=seed)
training = positiveTrain.unionAll(negativeTrain)
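`DataFrame.sample(False, 0.1, seed)` keeps each row independently with probability 0.1 (Bernoulli sampling), so the training set holds roughly, not exactly, 10% of each class, and sampling the two classes separately keeps the class ratio of the training set close to that of the full data. The plain-Python sketch below shows the same stratified idea on hypothetical labels; it uses Python's `random.Random` rather than Spark's sampler, so the selected rows differ from Spark's.

```python
import random

def stratified_bernoulli_split(rows, fraction, seed):
    """Split (id, label) rows into train/test: within each class, every row
    goes to the training set independently with probability `fraction`."""
    rng = random.Random(seed)
    train, test = [], []
    for label in sorted({y for _, y in rows}):
        for row in rows:
            if row[1] == label:
                (train if rng.random() < fraction else test).append(row)
    return train, test

# Hypothetical data: 1,000 positives and 9,000 negatives
rows = [(i, 1.0 if i < 1000 else 0.0) for i in range(10000)]
train, test = stratified_bernoulli_split(rows, 0.1, seed=123)
pos_share = sum(y for _, y in train) / len(train)
print(len(train), len(test), round(pos_share, 3))  # roughly 1000 / 9000 / 0.1
```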

In [19]:
training.show(10)

+----+-----+--------------------+
|  Id|Label|                Text|
+----+-----+--------------------+
| 571|  1.0|What is the optim...|
| 936|  1.0|Looking for funct...|
|1215|  1.0|Solution to $1-f(...|
|1730|  1.0|How do you define...|
|2048|  1.0|Solution(s) to 'p...|
|2872|  1.0|How do you take t...|
|2899|  1.0|Can this standard...|
|3319|  1.0|Why is the number...|
|3483|  1.0|What are some goo...|
|4317|  1.0|Generalizing $\su...|
+----+-----+--------------------+
only showing top 10 rows



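To build the test sets, we keep every post that was not sampled into the training set. We rename the training copy's "Label" column to "Flag", left-outer-join it onto the full class by `Id`, and keep only the rows where `Flag` is null, i.e. the posts with no match in the training set.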

In [20]:
negTrainTmp1 = negativeTrain.withColumnRenamed("Label", "Flag")
negativeTrainTmp = negTrainTmp1.select(negTrainTmp1.Id, negTrainTmp1.Flag)
negativeTest = negative.join( negativeTrainTmp, negative.Id == negativeTrainTmp.Id, "LeftOuter").\
                        filter("Flag is null").\
                        select(negative.Id, negative.Text, negative.Label)

In [21]:
posTrainTmp1 = positiveTrain.withColumnRenamed("Label", "Flag")
positiveTrainTmp = posTrainTmp1.select(posTrainTmp1.Id, posTrainTmp1.Flag)

positiveTest = positive.join( positiveTrainTmp, positive.Id == positiveTrainTmp.Id, "LeftOuter").\
                        filter("Flag is null").\
                        select(positive.Id, positive.Text, positive.Label)
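The join-then-filter pattern above is an anti-join: it keeps the left rows whose `Id` has no match on the right. The plain-Python sketch below emulates it on hypothetical rows; note that the check has to be "is null" (`is None` here) rather than a falsiness test, because negative rows carry `Flag = 0.0`. Spark's `join` also accepts the join type `"left_anti"`, so something like `negative.join(negativeTrainTmp, "Id", "left_anti")` should produce the same rows in one step, and joining on the column name sidesteps ambiguous self-join conditions.

```python
def left_outer_join_flag(left, right_flags):
    """Emulate a left outer join on Id: every left row is kept and gets the
    right-hand Flag, or None when its Id has no match."""
    flag_by_id = dict(right_flags)
    return [(row_id, text, label, flag_by_id.get(row_id))
            for row_id, text, label in left]

# Hypothetical (Id, Text, Label) rows and the training subset's (Id, Flag) pairs
negative = [("1", "a", 0.0), ("3", "b", 0.0), ("5", "c", 0.0)]
negative_train_tmp = [("3", 0.0)]

joined = left_outer_join_flag(negative, negative_train_tmp)
# Equivalent of filter("Flag is null") followed by dropping the Flag column
negative_test = [(i, t, y) for i, t, y, flag in joined if flag is None]
print(negative_test)  # [('1', 'a', 0.0), ('5', 'c', 0.0)]
```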

Finally, we combine the negative and positive test sets.

In [22]:
testing = negativeTest.unionAll(positiveTest)

Next, we split each `Text` into words with **Tokenizer**, which lowercases the text and splits it on whitespace. **HashingTF** then hashes each bag of words into a term-frequency vector with `numFeatures` dimensions. The two feature stages and a **LogisticRegression** classifier are chained into a single `Pipeline`, which we then train.

In [23]:
numFeatures = 20000
numEpochs = 120
regParam = 0.001

tokenizer = Tokenizer().setInputCol("Text").setOutputCol("Words")
hashingTF = HashingTF().setNumFeatures(numFeatures).\
                setInputCol(tokenizer.getOutputCol()).setOutputCol("Features")
lr = LogisticRegression().setMaxIter(numEpochs).setRegParam(regParam).\
                                    setFeaturesCol("Features").setLabelCol("Label").\
                                    setRawPredictionCol("Score").setPredictionCol("Prediction")
pipeline = Pipeline().setStages([tokenizer, hashingTF, lr])
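To see what the first two stages produce, here is a plain-Python approximation on a made-up sentence. Spark's `Tokenizer` lowercases and splits on whitespace (this sketch uses `str.split()`, which also collapses runs of spaces), and `HashingTF` maps every token to one of `numFeatures` buckets and counts occurrences. The hash here is `zlib.crc32`, a stand-in for the MurmurHash3 hash Spark uses, so the bucket indices will not match Spark's. The output also shows that HTML entities left in `Text` end up glued to words inside tokens.

```python
import zlib
from collections import Counter

NUM_FEATURES = 20000  # same bucket count as numFeatures in the notebook

def tokenize(text):
    # Approximates Spark's Tokenizer: lowercase, then split on whitespace
    return text.lower().split()

def hashing_tf(tokens, num_features=NUM_FEATURES):
    """Sparse term-frequency vector {bucket index: count} via the hashing trick."""
    return dict(Counter(zlib.crc32(t.encode("utf-8")) % num_features for t in tokens))

# Hypothetical post text in the same escaped form as the Text column
text = "How do you take the derivative of x^x? &lt;p&gt;I tried the chain rule&lt;/p&gt;"
tokens = tokenize(text)
vector = hashing_tf(tokens)
print(tokens)
print(sum(vector.values()) == len(tokens))  # True: every token lands in one bucket
```

Distinct words can collide in the same bucket; a larger `numFeatures` makes collisions rarer at the cost of a wider feature vector.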


In [25]:
model = pipeline.fit(training)
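For reference, with the default `elasticNetParam = 0` the `LogisticRegression` stage fits a purely L2-regularized model. Roughly, and ignoring Spark's default internal feature standardization and instance weights, it minimizes the average log-loss over the $n$ training posts plus an L2 penalty with $\lambda$ = `regParam` = 0.001, running at most `maxIter` = 120 optimizer iterations. Here $\mathbf{x}_i$ is the HashingTF vector of post $i$, $y_i \in \{0, 1\}$ its `Label`, and the intercept $b$ is not penalized. A post is then predicted as 1.0 when its probability exceeds the default `threshold` of 0.5:

```latex
\min_{\mathbf{w},\,b}\; -\frac{1}{n}\sum_{i=1}^{n}\Big[y_i\log p_i + (1-y_i)\log(1-p_i)\Big] + \frac{\lambda}{2}\lVert\mathbf{w}\rVert_2^2,
\qquad p_i = \sigma(\mathbf{w}^\top\mathbf{x}_i + b) = \frac{1}{1+e^{-(\mathbf{w}^\top\mathbf{x}_i + b)}}

\hat{y} = \begin{cases} 1.0 & \text{if } \sigma(\mathbf{w}^\top\mathbf{x} + b) > 0.5 \\ 0.0 & \text{otherwise} \end{cases}
```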

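We use the area under the ROC curve (AUC) as the evaluation metric. Note that the cell below passes the hard 0/1 `Prediction` to `BinaryClassificationMetrics` as the score, so the ROC curve has only one operating point and the reported value equals (TPR + TNR) / 2 at the default 0.5 threshold. Scoring with the predicted probability of the positive class (the second element of the `probability` column) would instead measure ranking quality over all thresholds.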

In [26]:
testingResult = model.transform(testing)
testingResultScores = testingResult.select("Prediction", "Label").rdd.map( lambda r: (float(r[0]), float(r[1])))
bc = BinaryClassificationMetrics(testingResultScores)
roc = bc.areaUnderROC
print("Area under the ROC:",  roc)

Area under the ROC: 0.6263879068653173
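The effect of scoring with hard predictions can be seen on a small hypothetical example. The plain-Python sketch below computes the AUC as the probability that a randomly chosen positive outscores a randomly chosen negative (ties count one half), which equals the trapezoidal area under the ROC curve. Applied to probabilities it measures ranking over all thresholds; applied to 0/1 predictions it collapses to (TPR + TNR) / 2.

```python
def auc(pairs):
    """AUC for (score, label) pairs: the probability that a random positive
    scores higher than a random negative, with ties counting 1/2."""
    pos = [s for s, y in pairs if y == 1.0]
    neg = [s for s, y in pairs if y == 0.0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

# Hypothetical toy data: (probability of label 1, true label)
scored = [(0.9, 1.0), (0.6, 1.0), (0.4, 1.0),
          (0.7, 0.0), (0.3, 0.0), (0.2, 0.0), (0.1, 0.0)]
# Hard predictions at a 0.5 threshold, like the "Prediction" column
hard = [(1.0 if s > 0.5 else 0.0, y) for s, y in scored]

print(auc(scored))  # 10/12, about 0.833
print(auc(hard))    # (TPR + TNR) / 2 = (2/3 + 3/4) / 2, about 0.708
```

This pairwise version is quadratic in the number of posts and is only meant for illustration; on the real test set the scoring should stay in `BinaryClassificationMetrics`.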


We can also check the model manually on two examples:

The first post is ***related*** to calculus, so its label should be 1.0. The model predicts 1.0, as expected.

In [27]:
testTitle = "How can you prove that a function has no closed form integral?"
testBody = """&lt;p&gt;I\'ve come across statements in the past along the lines of &quot;function &lt;span class=&quot;math-container&quot;&gt;$f(x)$&lt;/span&gt; has no closed form integral&quot;, which I assume means that there is no combination of the operations:&lt;/p&gt;&#xA;&#xA;&lt;ul&gt;&#xA;&lt;li&gt;addition/subtraction&lt;/li&gt;&#xA;&lt;li&gt;multiplication/division&lt;/li&gt;&#xA;&lt;li&gt;raising to powers and roots&lt;/li&gt;&#xA;&lt;li&gt;trigonometric functions&lt;/li&gt;&#xA;&lt;li&gt;exponential functions&lt;/li&gt;&#xA;&lt;li&gt;logarithmic functions&lt;/li&gt;&#xA;&lt;/ul&gt;&#xA;&#xA;&lt;p&gt;, which when differentiated gives the function &lt;span class=&quot;math-container&quot;&gt;$f(x)$&lt;/span&gt;. I\'ve heard this said about the function &lt;span class=&quot;math-container&quot;&gt;$f(x) = x^x$&lt;/span&gt;, for example.&lt;/p&gt;&#xA;&#xA;&lt;p&gt;What sort of techniques are used to prove statements like this? What is this branch of mathematics called?&lt;/p&gt;&#xA;&#xA;&lt;hr&gt;&#xA;&#xA;&lt;p&gt;Merged with &quot;&lt;a href=&quot;https://math.stackexchange.com/questions/2328/&amp;quot;&amp;gt;How to prove that some functions don\'t have a primitive&lt;/a&gt;&quot; by &lt;a href=&quot;https://math.stackexchange.com/users/918/ismael&amp;quot;&amp;gt;Ismael&amp;lt;/a&amp;gt;:  &lt;/p&gt;&#xA;&#xA;&lt;p&gt;Sometimes we are told that some functions like &lt;span class=&quot;math-container&quot;&gt;$\\dfrac{\\sin(x)}{x}$&lt;/span&gt; don\'t have an indefinite integral, or that it can\'t be expressed in term of other simple functions.&lt;/p&gt;&#xA;&#xA;&lt;p&gt;I wonder how we can prove that kind of assertion?&lt;/p&gt;&#xA;"""
testText = testTitle + " " + testBody  # same "Title Body" format as the training Text
testDF = sqlContext.createDataFrame([("155", testText, 1.0)], ["Id", "Text", "Label"])
result = model.transform(testDF)
prediction = result.collect()[0]["Prediction"]  # look up the column by name, not position
print("Prediction: ", prediction)

Prediction:  1.0


The second post is ***not related*** to calculus, so its label should be 0.0. The model predicts 0.0, as expected.

In [28]:
testTitle = "What Does it Really Mean to Have Different Kinds of Infinities?"
testBody = """&lt;p&gt;Can someone explain to me how there can be different kinds of infinities?&lt;/p&gt;&#xA;&#xA;&lt;p&gt;I was reading &quot;&lt;a href=&quot;http://en.wikipedia.org/wiki/The_Man_Who_Loved_Only_Numbers&amp;quot; rel=&quot;noreferrer&quot;&gt;The man who loved only numbers&lt;/a&gt;&quot; by &lt;a href=&quot;http://en.wikipedia.org/wiki/Paul_Hoffman_(science_writer)&amp;quot; rel=&quot;noreferrer&quot;&gt;Paul Hoffman&lt;/a&gt; and came across the concept of countable and uncountable infinities, but they\'re only words to me.&lt;/p&gt;&#xA;&#xA;&lt;p&gt;Any help would be appreciated.&lt;/p&gt;&#xA;"""
testText = testTitle + " " + testBody  # same "Title Body" format as the training Text
testDF = sqlContext.createDataFrame([("1", testText, 0.0)], ["Id", "Text", "Label"])
result = model.transform(testDF)
prediction = result.collect()[0]["Prediction"]  # look up the column by name, not position
print("Prediction: ", prediction)


Prediction:  0.0


In [29]:
sc.stop()