In [None]:
##  Quiz
* TrainValidationSplit를 이용하여 영화리뷰 긍/부정 예측  
  Estimator pipeline을 테스트(trainRatio = 0.8)
* ParamGridBuilder를 사용하여 Word2Vec의 파라미터 vectorSize를 5,10,20,40으로 바꾸어 정확도를 측정하여 출력
* 정확도는 BinaryClassificationEvaluator를 사용할 것

In [1]:
import findspark
findspark.init()

# create spark session
from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("my app1").master("local").getOrCreate()
from pyspark import SparkConf
from pyspark.sql import SparkSession
conf = SparkConf().setAppName("quiz_9") \
                  .setMaster("local[*]") \
                  .set("spark.driver.memory", "12g") \
                  .set("spark.executor.memory", "12g")
spark = SparkSession.builder \
                    .config(conf=conf) \
                    .getOrCreate()
sc = spark.sparkContext

from datetime import datetime   # system time 
import time                     # 수행시간
start_time = time.time()

In [6]:
# df = spark.read.format('csv').option('header','true').option('escape','"').load('imdb-review-sentiment.csv')
# root
#  |-- text: string (nullable = true)
#  |-- label: string (nullable = true)

df = spark.read.csv("imdb-review-sentiment.csv", inferSchema=True, header=True, escape='"')
# root
#  |-- text: string (nullable = true)
#  |-- label: integer (nullable = true)
print(df.show(5))
print(df.count())
df.printSchema()

+--------------------+-----+
|                text|label|
+--------------------+-----+
|I grew up (b. 196...|    0|
|When I put this m...|    0|
|Why do people who...|    0|
|Even though I hav...|    0|
|Im a die hard Dad...|    1|
+--------------------+-----+
only showing top 5 rows

None
40000
root
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



In [7]:
df=df.na.drop('any')
df.count()

40000

# Read stopword list

In [8]:
stopwords = ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', "you're", "you've", "you'll", "you'd", 'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', "she's", 'her', 'hers', 'herself', 'it', "it's", 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves', 'what', 'which', 'who', 'whom', 'this', 'that', "that'll", 'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into', 'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once', 'here', 'there', 'when', 'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 'don', "don't", 'should', "should've", 'now', 'd', 'll', 'm', 'o', 're', 've', 'y', 'ain', 'aren', "aren't", 'couldn', "couldn't", 'didn', "didn't", 'doesn', "doesn't", 'hadn', "hadn't", 'hasn', "hasn't", 'haven', "haven't", 'isn', "isn't", 'ma', 'mightn', "mightn't", 'mustn', "mustn't", 'needn', "needn't", 'shan', "shan't", 'shouldn', "shouldn't", 'wasn', "wasn't", 'weren', "weren't", 'won', "won't", 'wouldn', "wouldn't"]

# With Pipeline

In [9]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

class RemoveStopWordsAndSpecialCharacters(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, stopwords=None):
        super(RemoveStopWordsAndSpecialCharacters, self).__init__()
        self.stopwords = Param(self, "stopwords", "")
        self._setDefault(stopwords=set())
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def setStopwords(self, value):
        self._paramMap[self.stopwords] = value
        return self

    def getStopwords(self):
        return self.getOrDefault(self.stopwords)

    def _transform(self, dataset):
        stopwords = self.getStopwords()

        def f(s):
            return [ ''.join(e for e in token if e.isalnum()) for token in s if token not in stopwords ]

        t = ArrayType(StringType())
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF, Word2Vec, VectorAssembler
from pyspark.ml.classification import LinearSVC

tokenizer = Tokenizer(inputCol="text", outputCol="words")
cleaning = RemoveStopWordsAndSpecialCharacters(inputCol="words", outputCol="clean_words",stopwords=stopwords)
hashingTF = HashingTF(inputCol="clean_words", outputCol="tf")

preprocessing = Pipeline(stages=[
    tokenizer,
    cleaning,
    hashingTF,
])

preprocessed_df = preprocessing.fit(df).transform(df)
train_set, test_set = preprocessed_df.randomSplit([0.8, 0.2], seed=7)

In [11]:
w2v = Word2Vec(inputCol="clean_words", outputCol="w2v", minCount=1, maxIter=2)
asm = VectorAssembler(inputCols=[hashingTF.getOutputCol(), w2v.getOutputCol()], outputCol="features")
svm = LinearSVC(labelCol="label")

estimator = Pipeline(stages=[
    w2v,
    asm,
    svm
])

# mypipeline = Pipeline(stages=[tokenizer, cleaning, hashingTF, w2v, asm, svm])

In [12]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [13]:
evaluator = BinaryClassificationEvaluator()
paramgrid = ParamGridBuilder().addGrid(w2v.vectorSize, [5,10, 20, 40]).build()
# tvs = TrainValidationSplit(estimator = mypipeline, estimatorParamMaps=paramgrid, evaluator = evaluator, trainRatio=0.8, seed=46, collectSubModels=True))
# tvsmodel=tvs.fit(df)
# evaluator.evaluate(tvsModel.transform(df))
tvs = TrainValidationSplit(estimator = estimator, estimatorParamMaps=paramgrid, evaluator = evaluator, trainRatio=0.8, seed=46, collectSubModels=True)

In [14]:
tvsmodel=tvs.fit(train_set)

In [15]:
acc = []

for i, subModel in enumerate(tvsmodel.subModels):
    print(i)
    test_pred = subModel.transform(test_set)
    N = test_pred.count()
    
    acc.append(
        test_pred.filter(test_pred["prediction"] == test_pred["label"]).count() / N
    )

0
1
2
3


In [16]:
for _acc, param in zip(acc, [5, 10, 20,40]):
    print(f"Accuracy when vector size is {param}: {_acc}")

Accuracy when vector size is 5: 0.8888750311798453
Accuracy when vector size is 10: 0.8893739087054128
Accuracy when vector size is 20: 0.8861312047892242
Accuracy when vector size is 40: 0.8846345722125218


In [17]:
spend_time=(time.time() - start_time)
print(f'수행시간(20core, memory 24G) : {int(spend_time//60)}분 {spend_time%60:.2f}초')

수행시간(20core, memory 24G) : 197분 44.19초


In [18]:
### 아래코드는 공부용

In [19]:
import pyspark.sql.functions as F

df1 = spark.createDataFrame([
    (1, "A", "X1"),
    (2, None, "X2"),
    (3, "B", None),
    (1, "", "X3"),
    (2, "", "X2"),
    (3, "C", "X2"),
    (1, None, None),
    (1, "", ""),
    (2, "", ""),
    (1, "X3", "X8"),
], ["ID", "TYPE", "CODE"])
print(df1.show())
print(df1.rdd.map(lambda row: 1 if sum([c == None for c in row]) > 0 else 0  ).reduce(lambda x, y : x + y))
print(df1.rdd.map(lambda row: 1 if sum([c == '' for c in row]) > 0 else 0  ).reduce(lambda x, y : x + y))
print(df1.na.drop('any').show())
print(df1.na.drop('all').show())
print(df1.filter( df1.TYPE != '' ).show())
print(df1.filter(  (df1.TYPE != '') &  (df1.CODE != '') ).show())
print(df1.na.drop('any').show())
cond = (df1.TYPE != '') & (df1.CODE != '')
df1.filter( cond  ).show()

+---+----+----+
| ID|TYPE|CODE|
+---+----+----+
|  1|   A|  X1|
|  2|null|  X2|
|  3|   B|null|
|  1|    |  X3|
|  2|    |  X2|
|  3|   C|  X2|
|  1|null|null|
|  1|    |    |
|  2|    |    |
|  1|  X3|  X8|
+---+----+----+

None
3
4
+---+----+----+
| ID|TYPE|CODE|
+---+----+----+
|  1|   A|  X1|
|  1|    |  X3|
|  2|    |  X2|
|  3|   C|  X2|
|  1|    |    |
|  2|    |    |
|  1|  X3|  X8|
+---+----+----+

None
+---+----+----+
| ID|TYPE|CODE|
+---+----+----+
|  1|   A|  X1|
|  2|null|  X2|
|  3|   B|null|
|  1|    |  X3|
|  2|    |  X2|
|  3|   C|  X2|
|  1|null|null|
|  1|    |    |
|  2|    |    |
|  1|  X3|  X8|
+---+----+----+

None
+---+----+----+
| ID|TYPE|CODE|
+---+----+----+
|  1|   A|  X1|
|  3|   B|null|
|  3|   C|  X2|
|  1|  X3|  X8|
+---+----+----+

None
+---+----+----+
| ID|TYPE|CODE|
+---+----+----+
|  1|   A|  X1|
|  3|   C|  X2|
|  1|  X3|  X8|
+---+----+----+

None
+---+----+----+
| ID|TYPE|CODE|
+---+----+----+
|  1|   A|  X1|
|  1|    |  X3|
|  2|    |  X2|
|  3| 

In [20]:
### NULL, Empty String Blank가 포함된 ROW는 모두 제거

import pyspark.sql.functions as fn
# columns = set(df.columns) - set(['ID'])
# cond = map(lambda x: (col(x).isNotNull()), df.columns)
# [Column<(ID IS NOT NULL)>, Column<(TYPE IS NOT NULL)>, Column<(CODE IS NOT NULL)>]

# cond = map(lambda x: (col(x)) != "", columns)
# [Column<(NOT (CODE = ))>, Column<(NOT (TYPE = ))>]

# cond = map(lambda x: (col(x).isNotNull()) & (col(x) != ""), columns)
# cond
# cond = reduce((lambda x, y: x & y), cond)
# Column<(((CODE IS NOT NULL) AND (NOT (CODE = ))) AND ((TYPE IS NOT NULL) AND (NOT (TYPE = ))))>
# df.rdd.filter(cond).take(5)

for c in df1.columns:
    df1 = df1.withColumn(c, fn.when(  fn.col(c) == ''     , None).otherwise(fn.col(c))  )
print(df1.show())
df1.na.drop('any').show()
# df.filter( df.rdd.map(lambda x: (col(x).isNotNull()) & (col(x) != ""))    ).show()

### NULL, Empty String, Blank가 포함된 ROW만 선택

# cond = map(lambda x: (col(x).isNull()) | (col(x) == ""), df.columns)
# cond = reduce((lambda x, y: x | y), cond)
# df.filter(cond).show()

+---+----+----+
| ID|TYPE|CODE|
+---+----+----+
|  1|   A|  X1|
|  2|null|  X2|
|  3|   B|null|
|  1|null|  X3|
|  2|null|  X2|
|  3|   C|  X2|
|  1|null|null|
|  1|null|null|
|  2|null|null|
|  1|  X3|  X8|
+---+----+----+

None
+---+----+----+
| ID|TYPE|CODE|
+---+----+----+
|  1|   A|  X1|
|  3|   C|  X2|
|  1|  X3|  X8|
+---+----+----+



In [21]:
import pyspark.sql.functions as fn
for c in df.columns:
    df2 = df.withColumn(c, fn.when(  (fn.col(c) == '') , None).otherwise(fn.col(c))  )
print(df2.show(5))
print(df2.count())
print(df2.na.drop('any').show(5))
print(df2.count())

+--------------------+-----+
|                text|label|
+--------------------+-----+
|I grew up (b. 196...|    0|
|When I put this m...|    0|
|Why do people who...|    0|
|Even though I hav...|    0|
|Im a die hard Dad...|    1|
+--------------------+-----+
only showing top 5 rows

None
40000
+--------------------+-----+
|                text|label|
+--------------------+-----+
|I grew up (b. 196...|    0|
|When I put this m...|    0|
|Why do people who...|    0|
|Even though I hav...|    0|
|Im a die hard Dad...|    1|
+--------------------+-----+
only showing top 5 rows

None
40000
