-
Notifications
You must be signed in to change notification settings - Fork 0
/
script2.py
executable file
·95 lines (58 loc) · 2.52 KB
/
script2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# coding: utf-8
#this script carries out a cross validation on the model of Decision Tree Classifier
#with advanced Feature Extraction & Transformation: Stemming & Cleaning
# and Feature Selection using among other TF-IDF
#import packages
from pyspark import SparkContext
import loadFiles as lf
import numpy as np
import nltk
import loadFilesPartial as lfp
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import IDF
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.sql import SQLContext
from pyspark.ml.feature import Tokenizer
#create Sparkcontext
sc = SparkContext(appName="Simple App")
data,Y=lf.loadLabeled("./data/train")
labeledData = zip(data,[y.item() for y in Y])
# CHANGE NUMBER OF PARTITIONS ?
# labeledRdd = sc.parallelize(labeledData, 16)
labeledRdd = sc.parallelize(labeledData)
def cleanLower(doc):
return doc.replace("<br /><br />"," ").lower()
rdd = labeledRdd.map(lambda doc : (cleanLower(doc[0]),doc[1]))
print "Text is cleaned"
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(rdd, ['review', 'label'])
dfTrain, dfTest = df.randomSplit([0.8,0.2])
print "Random split is done"
tokenizer = Tokenizer(inputCol='review', outputCol='reviews_words')
hashing_tf = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='reviews_tf')
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="reviews_tfidf")
string_indexer = StringIndexer(inputCol='label', outputCol='target_indexed')
dt = DecisionTreeClassifier(featuresCol=idf.getOutputCol(), labelCol=string_indexer.getOutputCol(), maxDepth=10)
pipeline = Pipeline(stages=[tokenizer,
hashing_tf,
idf,
string_indexer,
dt])
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='target_indexed', metricName='precision')
grid=(ParamGridBuilder()
.baseOn([evaluator.metricName,'precision'])
.addGrid(dt.maxDepth, [10,20])
.build())
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid,evaluator=evaluator)
print "Grid is build"
print "CV Estimator is defined"
cv_model = cv.fit(dfTrain)
print "Model is fitted"
df_test_pred = cv_model.transform(dfTest)
print "Labels are predicted"
print evaluator.evaluate(df_test_pred)