-
Notifications
You must be signed in to change notification settings - Fork 0
/
script2_generate_txt.py
executable file
·121 lines (70 loc) · 3.12 KB
/
script2_generate_txt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# coding: utf-8
#this script generates the file classifications_script2.txt using a Decision Tree Classifier model
#with Feature Extraction & Transformation: Cleaning and Tokenization (no stemming is applied in this script)
#and Feature Selection using, among others, TF-IDF
#import packages
from pyspark import SparkContext
import loadFiles as lf
import numpy as np
import nltk
import loadFilesPartial as lfp
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import IDF
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.sql import SQLContext
from pyspark.ml.feature import Tokenizer
# Start the Spark context that the rest of the script works through.
sc = SparkContext(appName="Simple App")

# Load the labeled training set from disk.
data, Y = lf.loadLabeled("./data/train")

# Pair each review with its label; .item() is called on every label
# (presumably to unwrap numpy scalars into plain Python values - confirm
# against loadFiles.loadLabeled).
labeledData = zip(data, map(lambda y: y.item(), Y))

# TODO: consider an explicit partition count,
# e.g. sc.parallelize(labeledData, 16)
labeledRdd = sc.parallelize(labeledData)
def cleanLower(doc):
    """Return *doc* lower-cased, with each "<br /><br />" turned into a space.

    Only the doubled break marker is substituted; a lone "<br />" is kept
    (lower-cased like the rest of the text).
    """
    # Splitting on the marker and re-joining with a space is equivalent to
    # replacing every non-overlapping occurrence of the marker.
    pieces = doc.split("<br /><br />")
    return " ".join(pieces).lower()
# Normalize every training review while keeping its label attached.
rdd = labeledRdd.map(lambda pair: (cleanLower(pair[0]), pair[1]))
print("Text is cleaned")

sqlContext = SQLContext(sc)

#**************************************************************
#*******Generic part, to be adapted in each script*************
dfTrain = sqlContext.createDataFrame(rdd, ['review', 'label'])
#**************************************************************

# Feature stages: tokenize the review, hash the tokens into term
# frequencies, then re-weight those frequencies with IDF.
tokenizer = Tokenizer(inputCol='review', outputCol='reviews_words')
hashing_tf = HashingTF(
    inputCol=tokenizer.getOutputCol(),
    outputCol='reviews_tf'
)
idf = IDF(
    inputCol=hashing_tf.getOutputCol(),
    outputCol="reviews_tfidf"
)

# The classifier is trained on the indexed label column, not on 'label'.
string_indexer = StringIndexer(inputCol='label', outputCol='target_indexed')
dt = DecisionTreeClassifier(
    featuresCol=idf.getOutputCol(),
    labelCol=string_indexer.getOutputCol(),
    maxDepth=10
)

# Chain every stage, in order, into a single estimator.
pipeline = Pipeline(stages=[tokenizer, hashing_tf, idf, string_indexer, dt])

#**************************************************************
#**************Generic code section****************************
#*************to be copied after the pipeline******************
#**************************************************************
model = pipeline.fit(dfTrain)
print("The model is fitted")
# Load the unlabeled test set: the review texts and their names.
test, names = lf.loadUknown('./data/test')

# Keep each text paired with its name, then distribute the pairs.
text_name = zip(test, names)
UnlabeledRdd = sc.parallelize(text_name)
def cleanLower2(doc):
    """Clean a test review: swap "<br /><br />" for a space, then lower-case.

    Performs the same transformation as cleanLower.
    """
    without_breaks = doc.replace("<br /><br />", " ")
    return without_breaks.lower()
# Apply the same cleaning to the test reviews, keeping each review's name.
# (Unlabeledrdd, with a lower-case "rdd", is the cleaned form of UnlabeledRdd.)
Unlabeledrdd = UnlabeledRdd.map(lambda pair: (cleanLower2(pair[0]), pair[1]))
print("Test Text is cleaned")

dfTest = sqlContext.createDataFrame(Unlabeledrdd, ['review', 'name'])

# Make predictions.
predictions = model.transform(dfTest)
# collect() brings every prediction row back to the driver.
list_predictions = predictions.collect()
print("Predictions are made")

# NOTE(review): the classifier is trained on the StringIndexer output column
# ('target_indexed'), so x.prediction is an index in that space rather than
# the raw 'label' value - confirm the consumer of this file expects that.
#
# Write one "<name>\t<prediction>" line per test review. The with-block
# guarantees the file is closed even if a write raises; open() replaces the
# Python-2-only file() builtin.
with open('./classifications_script2.txt', 'w') as output:
    for x in list_predictions:
        output.write('{}\t{}\n'.format(x.name, x.prediction))
print("File classifications_script2.txt is written")