# 데이터 변환

* Last updated 20201025SUN1103 20191128_20181009_20170421_20161125

## S.1 학습내용

### S.1.1 목표

* 정량데이터, 텍스트데이터에 대하여 ETL을 할 수 있다.
* RDD를 변환할 수 있다.
* DataFrame 구조적 데이터를 변환할 수 있다.

### S.1.2 목차

* S.2 Jupyter Notebook에서 SparkSession 생성하기
* S.3 데이터 변환
* S.4 RDD 변환
    * S.4.1 vectors
        * Dense Vectors 밀집벡터
        * Sparse Vectors
    * S.4.2 Matrix
    * S.4.3 Distributed Matrix
    * S.4.4 labeled point
    * 문제 S-1: RDD를 LabeledPoint로 변환하기
    * S.4.5 svm의 입력파일 형식
    * S.4.6 TF
    * S.4.7 TF-IDF
    * S.4.8 StandardScaler

* S.5 DataFrame 변환 
    * S.5.1 Labeled Point
        * 레이블이 있는 Python List에서 DataFrame 생성
        * LabeledPoint에서 DataFrame 생성
        * mllib.linalg.Vectors를 사용하여 DataFrame을 생성
        * RDD에서 DataFrame 생성
        * sparse에서 dense vector로 변환
    * S.5.2 텍스트 변환
        * Bag of Words 모델
        * 텍스트 변환 단계
        * Python을 사용한 단어 빈도 계산
    * S.5.3 Spark의 transformer, estimator
    * S.5.4 Tokenizer
    * S.5.5 RegTokenizer
    * S.5.6 Stopwords
    * S.5.7 CountVectorizer
    * S.5.8 TF-IDF
    * S.5.9 Word2Vec
    * S.5.10 NGram
    * S.5.11 StringIndexer
    * S.5.12 연속데이터의 변환
    * 5.5.13 VectorAssembler
    * S.5.14 Pipeline
    * 문제 S-2: 연설문을 기계학습하기 위해 변환
    * 문제 S-3: 트윗 정서 분석

* --- Regression 예시

### S.1.3 문제 

* 문제 S-1: 훈련데이터 만들기
* 문제 S-2: Kolmogorov-Smirnov 검증
* 문제 S-3: 평균, 표준편차와 같은 기본 통계 값을 구한다.
* 문제 S-4: 연설문을 기계학습하기 위해 변환

## S.2 Jupyter Notebook에서 SparkSession 생성하기


In [1]:
import sys
import os

home=os.path.expanduser("~")
os.environ["PYSPARK_PYTHON"]="C:\\Users\\zaqxs\\Anaconda2\\envs\\py35\\python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"]="C:\\Users\\zaqxs\\Anaconda2\\envs\\py35\\python.exe"
os.environ["PYLIB"]=os.path.join(os.environ["SPARK_HOME"],'python','lib')
sys.path.insert(0,os.path.join(os.environ["PYLIB"],'py4j-0.10.9-src.zip'))
sys.path.insert(0,os.path.join(os.environ["PYLIB"],'pyspark.zip'))

In [2]:
import pyspark
myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession\
    .builder\
    .master("local")\
    .appName("myApp")\
    .config(conf=myConf)\
    .getOrCreate()

In [3]:
print (spark.version)

3.1.2


## S.3 데이터 변환

Spark를 단순히 데이터를 읽고, groupBy, 평균, 합계를 계산하는 용도로 사용하기에는 충분하지 않다.
데이터로부터 좀 더 의미있는 신호를 읽기 위해서는 예측, 분류, 군집화, 추천 등 기계학습을 빼놓을 수 없다.
이러한 기계학습은 무작정 데이터를 우겨 넣고, 의미있는 결과가 나오기를 기다려서는 작동되지 않는다.
입력될 데이터를 정련하고, 가공하고, 일정한 형식을 가지도록 구성하는 것이 필요하다. 

**ETL** (Extract, Transform, Load)은 소스에서 필요한 데이터를 추출, 변환하고, 다른 타켓으로 로딩하는 것으로 말한다.
* **추출**은 원천에서 데이터를 가져오는 것으로, spark에서는 예를 들어 csv, NoSQL 등에서 데이터를 읽어서 RDD, DataFrame을 생성하는 작업을 말한다.
* **변환**은 분석 가능한 형식으로 변환하는 것으로, 결측 값이나 이상값을 제거하거나 map() 함수를 사용하거나 데이터타입 변환을 하는 등의 작업을 한다.
* **로딩**은 변환한 데이터를 저장하여 놓는 것으로 Spark에서는 RDD, DataFrame의 형식으로 만들어 놓는다.
지도학습 Supervised Learning을 하려면, DataFrame은 label, features 컬럼을, 
**RDD는 label과 features를 가지고 있는 Labeled Point로 구성**해야 한다.

In [5]:
!pip install IPlantUML

Collecting IPlantUML
  Downloading IPlantUML-0.1.1.tar.gz (5.5 kB)
Collecting plantweb
  Downloading plantweb-1.2.1-py3-none-any.whl (20 kB)
Building wheels for collected packages: IPlantUML
  Building wheel for IPlantUML (setup.py): started
  Building wheel for IPlantUML (setup.py): finished with status 'done'
  Created wheel for IPlantUML: filename=IPlantUML-0.1.1-py2.py3-none-any.whl size=4895 sha256=e107c905ddfe06b0b9670b1f785d698eb86db3beeb0704da47bd433c3dfd2141
  Stored in directory: c:\users\zaqxs\appdata\local\pip\cache\wheels\cf\64\08\5bac65794ab011a60f7ef62413d3c430cf715345028f4b3914
Successfully built IPlantUML
Installing collected packages: plantweb, IPlantUML
Successfully installed IPlantUML-0.1.1 plantweb-1.2.1


You should consider upgrading via the 'C:\Users\zaqxs\Anaconda2\envs\py35\python.exe -m pip install --upgrade pip' command.


In [6]:
import iplantuml

In [8]:
%%plantuml --jar
@startuml
database source
rectangle transform
source -right-> transform: extract
database target
transform -right-> target: load
@enduml

CalledProcessError: Command '['java', '-splash:no', '-jar', 'C:\\usr\\local\\bin\\plantuml.jar', '-tsvg', '1cf0ef29-ad3c-4925-970b-32156bd8bc11.uml']' returned non-zero exit status 1.

Spark 초기에는 RDD를 사용하여 데이터를 변환하고, mllib 라이브러리를 사용하여 기계학습을 하였다.
그 후 DataFrame이 소개되고 나서, ml 라이브러리가 사용되고 있다.
Spark 2.0부터 mllib는 유지보수로 지원된다고 한다. 오류가 있으면 수정되지만, 새로운 기능이 추가되지는 않고 있다.
반면에 ml 라이브러리는 새로운 기능이 추가되고 있다.

따라서 Spark에서는 **```RDD mllib```** , **```DataFrame ml```** 패키지 별로 데이터 타입이나 모델이 제공되므로, 식별하여 사용한다.
```ml``` 패키지를 사용할 경우에는 자신의 ```pyspark.ml.linalg.Vector``` 등을 사용해야 한다. ```mllib```도 마찬가지이다.

패키지 | 설명 | 데이터 타입 예
-------|-------|-----
```mllib``` | RDD API를 제공 | ```pyspark.mllib.linalg.Vector``` 또는 ```pyspark.mllib.linalg.Matrix```
```ml``` | DataFrame API를 제공 | ```pyspark.ml.linalg.Vector``` 또는 ```pyspark.ml.linalg.Matrix```


## S.4 RDD 변환

기계학습을 하기 위해서는 데이터를 일정 형식으로 만들어 주어야 한다.
**```Vector```**, **```Labeled Point```**, **```Matrix```**를 배워보자.

구분 | 설명
----------|----------
```Vector``` | ```numpy vector```와 같은 기능을 한다. **dense**와 **sparse** vector로 구분한다.
```Labeled Point``` | 분류를 의미하는 클래스 또는 **label**과 속성 **features** 이 묶인 구조로서, 지도학습 supervised learning을 할 경우 사용된다.
```Matrix``` | ```numpy matrix```와 같은 특징을 가진다.

### S.4.1 vectors

행렬 **Vector**는 **dense**와 **sparse**로 구분할 수 있다.

* dense vector는 빈 값이 별로 없이 **모든** 행열이 값을 가지고 있다.
* sparse vector는 빈 값이 많아서, 값이 있는 경우 그 값이 있는 **인덱스**로 표현해 배열을 축약하게 된다.

#### Dense Vectors 밀집벡터

벡터는 일련의 수로 구성이 되고, 행벡터 또는 열벡터가 될 수 있다. 채워지는 값이 대부분 0이면 희소벡터 Sparse Vectors로 만들어 질 수 있다.

numpy 라이브러리를 가져오자.

In [9]:
import numpy as np

numpy array를 사용하면 만들어지는 것이 dense vector이다.

In [10]:
dv = np.array([1.0, 2.1, 3])

Spark에서는 Vectors 명령어로 벡터를 만들 수 있다.
단 **RDD는 mllib** 모듈을 사용해서 vectors를 만들고, 반면에 **```DataFrame```**은 **ml**를 사용한다는 점에 유의하자.

In [11]:
from pyspark.mllib.linalg import Vectors

dv1mllib=Vectors.dense([1.0, 2.1, 3])

만든 벡터를 출력해보자. 그 타입도 확인하면 DenseVector이다.

In [12]:
print ("Dense vector: {}\nType: {}".format(dv1mllib, type(dv1mllib)))

Dense vector: [1.0,2.1,3.0]
Type: <class 'pyspark.mllib.linalg.DenseVector'>


이번에는 ```pyspark.ml``` 모듈을 이용하여 Vectors를 생성해보자. 

In [13]:
from pyspark.ml.linalg import Vectors

dv1ml=Vectors.dense([1.0, 2.1, 3])

In [14]:
print ("ml의 dense vector: {}".format(dv1ml))

ml의 dense vector: [1.0,2.1,3.0]


dense vectors는 numpy array와 같은 특징을 가진다.
인덱스로 값을 읽을 수 있다. 또한 반복문에서 사용할 수 있다.

In [15]:
for e in dv1mllib:
    print (e, end = " ")

1.0 2.1 3.0 

보통 벡터와 같이 **product**, **dot**, **norm**과 같은 벡터 연산을 할 수도 있다.
결과 값은 numpy와 동일하다.

In [16]:
dv1mllib.dot(dv1mllib)

14.41

In [17]:
np.dot(dv,dv)

14.41

더하기, 빼기, 곱하기, 나누기 연산도 가능하다. 곱하기 연산을 해보자. dot와 달리 항목별로 실행한다.

In [16]:
dv1mllib*dv1mllib

DenseVector([1.0, 4.41, 9.0])

#### Sparse Vectors 희소행렬

행렬에는 0 값이 많이 존재하기 때문에, 0값이 아닌 **NZ Nonzero**만 저장하면 훨씬 효율적이다.
**sparse**는 실제 **값이 없는 요소, '0'을 제거**하여 만든 vector이다.
Spark에서 type field (1 바이트 길이)를 통해 식별한다 (0: sparse, 1: dense)

예를 들어, 다음은 1차원 dense vector이다.
```python
[160, 69, 0, 0, 24]
```

sparse vectors는 값 중에 0이 포함된 경우 이를 생략하고, 값이 있는 요소 Nonzero만 남기게 된다.
3은 컬럼 갯수, 0, 1, 4는 값이 있는 컬럼, [160.0, 69.0, 24.0]는 실제 값을 의미한다.

In [18]:
sv1 = Vectors.sparse(5,[0,1,4],[160.0,69.0,24.0])

어느 모듈, mllib 또는 ml인지 확인해보자.

In [19]:
type(sv1)

pyspark.ml.linalg.SparseVector

```toArray()``` 함수를 사용하면 sparse에서 dense로 벡터를 변환할 수 있다.

In [20]:
sv1.toArray()

array([160.,  69.,   0.,   0.,  24.])

### S.4.2 Matrix

로컬 Matrix 역시 밀집 dense, 희소 sparse형식을 지원한다.

#### Dense Matrix

In [21]:
from pyspark.mllib.linalg import Matrices

Matrices.dense(3, 2, [1,2,3,4,5,6])

DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], False)

#### Sparse Vectors

```python
[1 0 2]
[0 0 3]
[4 5 6]
```

위와 같은 2차원 dense vectors를 sparse vectors의 배열 방식으로 표현해보자.
우선 다음과 같이
행, 열, 값 vector를 만든다.

```python
행 | 0 | 0 | 1 | 2 | 2 | 2
열 | 0 | 2 | 2 | 0 | 1 | 2
값 | 1 | 2 | 3 | 4 | 5 | 6
```

행을 보면 0번째에 '1','2' 1번째에 '3', 2번째에 '4','5','6'이므로 **0,0,1,2,2,2**
열을 보면 0번째에 '1', 2번째 '2','3', 0번째 '4', 1번째 '5', 2번째 '6'이므로 **0,2,2,0,1,2**

**행, 열, 데이터를 한 쌍**으로 읽으면 된다.
즉 행 0, 열 0의 위치에 1, 행 0, 열 2의 위치에 2. 이런 식으로 6개의 데이터가 있다.


In [22]:
import numpy as np

row = np.array([0, 0, 1, 2, 2, 2])
col = np.array([0, 2, 2, 0, 1, 2])
data = np.array([1, 2, 3, 4, 5, 6])

scipy의 sparse vectors로 표현해보자.

In [23]:
import scipy.sparse as sps

mtx = sps.csc_matrix((data, (row, col)), shape=(3, 3))

matrix를 dense vectors로 출력해보자.

In [24]:
print (mtx.todense())

[[1 0 2]
 [0 0 3]
 [4 5 6]]


#### Sparse Vectors의 CSR (Compressed Sparse Row) 또는 Yale Format

다음 5개의 값으로 표현된다.
* 첫째 행 개수 (```int```)
* 둘째 열 개수 (```int```)
* 세째 ```int[]```은 **열 포인터** (IA):
    * IA[0]=**0** 시작 값은 0으로, IA[i]=IA[i-1] + (i-1)열의 **NNZ** (NZ 개수, Num of NonZeroes)
* 네째 ```int[]```은 **행 인덱스** (JA): 각 NZ의 행 인덱스
* 마지막은 소수 (```double```)로 실제 값 리스트

2차원 dense vectors를 만들어 보자.
행의 갯수, 열의 갯수, 실제 값을 넣어주면 생성된다.
* 6은 행 갯수
* 4는 열 갯수
* 다음 수는 행렬을 해체하여 연속적인 수로 나열

In [25]:
from pyspark.mllib.linalg import Matrices

dm = Matrices.dense(6,4,[1, 2, 0, 0, 0, 0, 0, 3, 0, 4, 0, 0, 0, 0, 5, 6, 7, 0, 0, 0, 0, 0, 0, 8])

위를 2차원 배열로 변환해자.

In [26]:
dm.toArray()

array([[1., 0., 0., 0.],
       [2., 3., 0., 0.],
       [0., 0., 5., 0.],
       [0., 4., 6., 0.],
       [0., 0., 7., 0.],
       [0., 0., 0., 8.]])

다음을 희소행렬로 변환해보자.

```python
[ 1.,  0.,  0.,  0.]
[ 2.,  3.,  0.,  0.]
[ 0.,  0.,  5.,  0.]
[ 0.,  4.,  6.,  0.]
[ 0.,  0.,  7.,  0.]
[ 0.,  0.,  0.,  8.]
```

* 6은 행 갯수
* 4는 열 갯수
* 다음은 열포인터 [0, 2, 4, 7, 8]
    * **열**로 세어서 **요소의 개수** 2, 2, 3, 1을 가지고 구성을 한다. 즉 열로 요소가 2, 2, 3, 1개 이다.
    * **0**:IA[0]=0으로 시작, **2**:IA[1]=IA[0]+2, **4**:IA[2]=IA[1]+2 **7**:IA[3]=IA[2]+3, **8**:IA[4]=IA[3]+1
* 다음은 행인덱스 JA [0, 1, 1, 3, 2, 3, 4, 5]
    * **행**으로 세어서 (단 컬럼순을 지켜서) 0 1 1 3 2 3 4 5 (0 1 1 2 3 3 4 5가 아니라)
    * 1:**0**행, 2:**1**행, 3:**1**행, 4:**3**행, 5:**2**행, 6:**3**행, 7:**4**행, 8:**5**행
* 마지막은 소수로 실제 값 리스트 [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]

In [27]:
dm.toSparse()

SparseMatrix(6, 4, [0, 2, 4, 7, 8], [0, 1, 1, 3, 2, 3, 4, 5], [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0], False)

'# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
[ 1.,  4.], [ 2., 5.], [ 3.,  6.]

* 아래는 행3, 열2개 행열로 [9., 0.], [0., 8.], [0., 6.] 값이다.
* 0,1,3: **0**:IA[0]=0으로 시작, **1**:IA[1]=IA[0]+1 (1열 개수: 9뿐이 없으므르 1), **3**:IA[2]=IA[1]+2 (2열 개수: 6,8이 있으므로 2)
* 0,2,1: 9:**0**행, 6:**2**행, 8:**1**행

In [28]:
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])

In [29]:
d=sm.toDense()

In [30]:
print (d)

DenseMatrix([[9., 0.],
             [0., 8.],
             [0., 6.]])


### S.4.2 분산 Matrix

배열은 n차원을 가질 수 있고, 2차원인 경우에는 매트릭스라고 지칭한다.
매트릭스 역시 로컬과 분산으로 구분할 수 있다.
앞서 로컬 매트릭스는 라이브러리 명에서 눈치를 챌 수 있는데, ```pyspark.mllib.linalg.Matrix, Matrices```를 사용한다. 반면에 분산 매트릭스는 당연히 여러 노드에 분산해서 사용할 수 있고, ```pyspark.mllib.linalg.distributed```에 존재하는 Row Matrix, Indexed Row Matrix, Coordinate Matrix, Block Matrix를 사용하면 된다.

#### Row Matrix

RowMatrix는 ```pyspark.mllib.linalg.distributed```에서 제공되는 분산벡터로서, RDD vectors로부터 생성된다.
우선 리스트에서 RDD를 생성하고 이를 RowMatrix에 넘겨주자.

In [31]:
p = [[1.0,2.0,3.0],[1.1,2.1,3.1],[1.2,2.2,3.3]]

In [32]:
my=spark.sparkContext.parallelize(p)

In [33]:
my.collect()

[[1.0, 2.0, 3.0], [1.1, 2.1, 3.1], [1.2, 2.2, 3.3]]

RDD Vectors를 넘겨주어야 한다.

In [34]:
from pyspark.mllib.linalg.distributed import RowMatrix

rm=RowMatrix(my)

In [35]:
print (type(rm))

<class 'pyspark.mllib.linalg.distributed.RowMatrix'>


In [36]:
rm.rows.collect()

[DenseVector([1.0, 2.0, 3.0]),
 DenseVector([1.1, 2.1, 3.1]),
 DenseVector([1.2, 2.2, 3.3])]

In [37]:
rm.numRows()

3

In [38]:
rm.numCols()

3

#### Indexed Row Matrix

앞서 Row Matrix과 유사하지만, 파티션으로 나누어, 그러나 순서를 지켜서 저장이 된다.
따라서 시계열 데이터와 같이 순서가 있는 데이터를 저장하기에 적합하다.

순서인덱스와 벡터로 구성한다.

In [39]:
from pyspark.mllib.linalg.distributed import IndexedRow

irRdd = spark.sparkContext.parallelize([
    IndexedRow(1, [3, 1, 2]),
    IndexedRow(2, [1, 3, 2]),
    IndexedRow(3, [5, 4, 3]),
    IndexedRow(4, [6, 7, 4]),
    IndexedRow(5, [8, 9, 2]),
])

In [40]:
from pyspark.mllib.linalg.distributed import IndexedRowMatrix

irm = IndexedRowMatrix(irRdd)

In [41]:
print(irm.numRows())
print(irm.numCols())

6
3


### S.4.4 Labeled Point

Labeled point는 로컬벡터로 레이블을 가지고 있는 밀집 또는 희소 행렬을 말한다.
레이블이 있으므로, supervised learning에 요구되는 형식이다.
레이블은 double형식으로 저장되어야 한다. 분류에 사용되려면 예를 들어 긍정, 부정인 경우 정수 1, 0으로 하지 않고 double 형식으로 저장되어야 한다.

#### label, features로 구성

**분류** 및 **회귀분석**에 사용되는 데이터 타잎이다.
**'label'**과 **'features'**로 구성된다.

구분 | 지도학습을 하기 위한 label과 features의 구성
-----|-----
label | supervised learning에서 '구분 값'으로 사용한다. 데이터타입은 'DoubleType'으로 설정되어야 한다.
features | **sparse**, **dense** 모두 사용할 수 있다.


label 1.0, features [1.0, 2.0, 3.0]으로 LabeledPoint를 만들어 보자.

In [42]:
from pyspark.mllib.regression import LabeledPoint

LabeledPoint(1.0, [1.0, 2.0, 3.0])

LabeledPoint(1.0, [1.0,2.0,3.0])

sparse vectors로 features를 구성해보자.

In [44]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors

LabeledPoint(1992, Vectors.sparse(10, {0: 3.0, 1:5.5, 2: 10.0}))

LabeledPoint(1992.0, (10,[0,1,2],[3.0,5.5,10.0]))

서로 다른 패키지의 데이터타잎 **```mllib LabeledPoint```**와 **```ml Vectors```**를 혼용하면, 형변환 오류가 발생한다.
이러한 오류는 패키지를 혼용하지 않으면 된다.

```python
Cannot convert type <class 'pyspark.ml.linalg.DenseVector'> into Vector
```

**```dv1mllib```**은 앞서 **```mllib```**로부터 생성된 dense vector이다.

In [45]:
from pyspark.mllib.regression import LabeledPoint

LabeledPoint(1.0, dv1mllib)

LabeledPoint(1.0, [1.0,2.1,3.0])

**```dv1ml```**은 앞서 **```ml```**로부터 생성된 dense vector이다.
```mllib```에서 사용하려면, **```Vectors.fromML()```**를 사용해 ```ml```의 Vectors를 읽어서 ```mllib```로 변환하여 혼용을 막는다.

In [46]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors

LabeledPoint(1.0, Vectors.fromML(dv1ml))

LabeledPoint(1.0, [1.0,2.1,3.0])

## 문제 S-1: RDD 데이터를 LabeledPoint로 변환하기

### 문제

머신러닝은 사람이 경험을 통해 배우는 것과 비슷하게 **과거 데이터로부터 학습**을 한다.
학습이란 어렵게 생각할 필요 없이, 과거 데이터에서 수학적이나 알고리즘을 활용하여 어떤 패턴을 찾아내는 것이다.
spark에서 제공한 **데이터 파일 ```data/mllib/sample_svm_data.txt```을 읽어서 훈련데이터**를 만들어 보자.

데이터를 읽어 보면, 맨 처음 값은 label에 해당하고, 다음은 일련의 수로 구성된다. 이로부터 **RDD**를 생성하고, ```label```, ```features```를 구성하여 ```Labeled Point```로 만든다.

```python
1 0 2.52078447201548 0 0 0 2.004684436494304 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0
...
```

### Python으로 파일 읽기

Spark를 다운로드하고 압축을 풀어 설치한 경우, 환경변수 ```SPARK_HOME```의 경로를 설정해 주고 해당 파일을 읽도록 한다. 파일을 읽을 때는, 가급적 이와 같이 설정경로를 사용해 입력오류를 줄이도록 하는 것이 좋다.
다운로드 받지 않았다면, 아파치 github을 방문해서, https://github.com/apache/spark/ 아래 data/mllib 폴더로 가보면 해당 파일을 찾을 수 있다.

입출력은 ```try except``` 구문으로 오류에 대비할 수 있다.

In [49]:
_fsvm=os.path.join(os.environ["SPARK_HOME"],'data','mllib','sample_svm_data.txt')

In [50]:
import os

try:
    _f=open(_fsvm,'r')
    _lines=_f.readlines()
    _f.close()
except:
    print("An exception occurred")

파일로부터 데이터를 **```readlines()```** 함수로 모두 읽어 온다.
첫 행을 읽으면 label, features로 구성되어 있다.

In [51]:
_lines[0]

'1 0 2.52078447201548 0 0 0 2.004684436494304 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0\n'

### Spark에서 RDD 생성

Spark는 파일을 Python을 통하지 않고, 직접 읽을 수 있다.
원본 데이터 ```sample_svm_data.txt```는 공백으로 구분되어 있다.
읽을 대상이 파일이므로, RDD를 사용한다. 각 행을 공백으로 분리하여 읽는다.

In [52]:
_rdd=spark.sparkContext.textFile(_fsvm)\
    .map(lambda line: [float(x) for x in line.split()])

In [61]:
_rdd.collect()

[[1.0,
  0.0,
  2.52078447201548,
  0.0,
  0.0,
  0.0,
  2.004684436494304,
  2.000347299268466,
  0.0,
  2.228387042742021,
  2.228387042742023,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0],
 [0.0,
  2.857738033247042,
  0.0,
  0.0,
  2.619965104088255,
  0.0,
  2.004684436494304,
  2.000347299268466,
  0.0,
  2.228387042742021,
  2.228387042742023,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0],
 [0.0,
  2.857738033247042,
  0.0,
  2.061393766919624,
  0.0,
  0.0,
  2.004684436494304,
  0.0,
  0.0,
  2.228387042742021,
  2.228387042742023,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0],
 [1.0,
  0.0,
  0.0,
  2.061393766919624,
  2.619965104088255,
  0.0,
  2.004684436494304,
  2.000347299268466,
  0.0,
  0.0,
  0.0,
  0.0,
  2.055002875864414,
  0.0,
  0.0,
  0.0,
  0.0],
 [1.0,
  2.857738033247042,
  0.0,
  2.061393766919624,
  2.619965104088255,
  0.0,
  2.004684436494304,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  2.055002875864414,
  0.0,
  0.0,
  0.0,
  0.0],
 [0.0,
  2.857738033247042,
  0.0,
  

각 행으로 분리되므로 2차원 리스트가 생성이 된다. 첫째 행을 읽으려면 인덱스를 사용해야 한다.

In [53]:
_rdd.take(2)[0]

[1.0,
 0.0,
 2.52078447201548,
 0.0,
 0.0,
 0.0,
 2.004684436494304,
 2.000347299268466,
 0.0,
 2.228387042742021,
 2.228387042742023,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0]

### LabeledPoint 생성

위 데이터에서 보듯이 첫 열은 **label**로, 그 나머지는 **features**로 생성한다.

In [54]:
from pyspark.mllib.regression import LabeledPoint

_trainRdd0=_rdd.map(lambda line:LabeledPoint(line[0], line[1:]))

In [55]:
_trainRdd0.take(1)

[LabeledPoint(1.0, [0.0,2.52078447201548,0.0,0.0,0.0,2.004684436494304,2.000347299268466,0.0,2.228387042742021,2.228387042742023,0.0,0.0,0.0,0.0,0.0,0.0])]

공백을 분리하고, 분리된 데이터를 labeled point로 구성하는 기능을 합쳐서 실행해 본다.

In [56]:
_trainRdd=spark.sparkContext.textFile(_fsvm)\
    .map(lambda line: [float(x) for x in line.split()])\
    .map(lambda p:LabeledPoint(p[0], p[1:]))

In [57]:
_trainRdd.take(1)

[LabeledPoint(1.0, [0.0,2.52078447201548,0.0,0.0,0.0,2.004684436494304,2.000347299268466,0.0,2.228387042742021,2.228387042742023,0.0,0.0,0.0,0.0,0.0,0.0])]

### 정리하면

데이터를 변환하는 과정을 함수로 만들었다.
```createLP(line)```는 행 데이터를 받아서 LabeledPoint로 생성하고 있다.

In [63]:
def createLP(line):
    p = [float(x) for x in line.split()]
    return LabeledPoint(p[0], p[1:])

_rdd=spark.sparkContext.textFile(_fsvm)
trainRdd = _rdd.map(createLP)

trainRdd.take(1)

[LabeledPoint(1.0, [0.0,2.52078447201548,0.0,0.0,0.0,2.004684436494304,2.000347299268466,0.0,2.228387042742021,2.228387042742023,0.0,0.0,0.0,0.0,0.0,0.0])]

### S.4.5 svm의 입력파일 형식

LIBSVM은 기계학습 모델인 svm을 위한 입력데이터 형식이다.
0은 label, 나머지는 index:value 쌍으로 구성한다.

```python
[label] [index1]:[value1] [index2]:[value2] ...
[label] [index1]:[value1] [index2]:[value2] ...
```

* 예
```python
0 128:51 129:159 130:253 131:159 132:50 155:48 156:238 157:252 158:252 159:252 160:237 182:54 183:227 184:253 185:252 186:239 187:233 ...
```


#### DataFrame 읽기

파일의 경로를 설정한다.

In [65]:
fsvm=os.path.join(os.environ["SPARK_HOME"],'data','mllib','sample_libsvm_data.txt')

LIBSVM 데이터를 DataFrame으로 읽고, label 컬럼과 features 컬럼을 생성한다.
이 경우, DataFrameReader의 포맷 format("libsvm")과 options을 설정할 수 있다.

In [66]:
dfsvm = spark.read.format("libsvm").load(fsvm)

In [71]:
type(dfsvm)

pyspark.sql.dataframe.DataFrame

스키마를 출력하면, 의도한 바와 같이 label, features 컬럼과 그 타입이 double, vector로 설정되어 있다.

In [67]:
dfsvm.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



DataFrame에서는 label, features 컬럼이 별도로 생성되고, features는 sparse vectors 형식으로 구성된다.

In [68]:
dfsvm.take(1)

[Row(label=0.0, features=SparseVector(692, {127: 51.0, 128: 159.0, 129: 253.0, 130: 159.0, 131: 50.0, 154: 48.0, 155: 238.0, 156: 252.0, 157: 252.0, 158: 252.0, 159: 237.0, 181: 54.0, 182: 227.0, 183: 253.0, 184: 252.0, 185: 239.0, 186: 233.0, 187: 252.0, 188: 57.0, 189: 6.0, 207: 10.0, 208: 60.0, 209: 224.0, 210: 252.0, 211: 253.0, 212: 252.0, 213: 202.0, 214: 84.0, 215: 252.0, 216: 253.0, 217: 122.0, 235: 163.0, 236: 252.0, 237: 252.0, 238: 252.0, 239: 253.0, 240: 252.0, 241: 252.0, 242: 96.0, 243: 189.0, 244: 253.0, 245: 167.0, 262: 51.0, 263: 238.0, 264: 253.0, 265: 253.0, 266: 190.0, 267: 114.0, 268: 253.0, 269: 228.0, 270: 47.0, 271: 79.0, 272: 255.0, 273: 168.0, 289: 48.0, 290: 238.0, 291: 252.0, 292: 252.0, 293: 179.0, 294: 12.0, 295: 75.0, 296: 121.0, 297: 21.0, 300: 253.0, 301: 243.0, 302: 50.0, 316: 38.0, 317: 165.0, 318: 253.0, 319: 233.0, 320: 208.0, 321: 84.0, 328: 253.0, 329: 252.0, 330: 165.0, 343: 7.0, 344: 178.0, 345: 252.0, 346: 240.0, 347: 71.0, 348: 19.0, 349: 28.0

### MLUtils를 사용하여 RDD 읽기

또는 ```MLUtils.loadLibSVMFile```로 읽을 수 있다.

In [72]:
from pyspark.mllib.util import MLUtils

data = MLUtils.loadLibSVMFile(spark.sparkContext, fsvm)

In [73]:
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)

In [74]:
label.take(5)

[0.0, 1.0, 1.0, 1.0, 1.0]

In [75]:
features.take(1)

[SparseVector(692, {127: 51.0, 128: 159.0, 129: 253.0, 130: 159.0, 131: 50.0, 154: 48.0, 155: 238.0, 156: 252.0, 157: 252.0, 158: 252.0, 159: 237.0, 181: 54.0, 182: 227.0, 183: 253.0, 184: 252.0, 185: 239.0, 186: 233.0, 187: 252.0, 188: 57.0, 189: 6.0, 207: 10.0, 208: 60.0, 209: 224.0, 210: 252.0, 211: 253.0, 212: 252.0, 213: 202.0, 214: 84.0, 215: 252.0, 216: 253.0, 217: 122.0, 235: 163.0, 236: 252.0, 237: 252.0, 238: 252.0, 239: 253.0, 240: 252.0, 241: 252.0, 242: 96.0, 243: 189.0, 244: 253.0, 245: 167.0, 262: 51.0, 263: 238.0, 264: 253.0, 265: 253.0, 266: 190.0, 267: 114.0, 268: 253.0, 269: 228.0, 270: 47.0, 271: 79.0, 272: 255.0, 273: 168.0, 289: 48.0, 290: 238.0, 291: 252.0, 292: 252.0, 293: 179.0, 294: 12.0, 295: 75.0, 296: 121.0, 297: 21.0, 300: 253.0, 301: 243.0, 302: 50.0, 316: 38.0, 317: 165.0, 318: 253.0, 319: 233.0, 320: 208.0, 321: 84.0, 328: 253.0, 329: 252.0, 330: 165.0, 343: 7.0, 344: 178.0, 345: 252.0, 346: 240.0, 347: 71.0, 348: 19.0, 349: 28.0, 356: 253.0, 357: 252.0

### S.4.6 TF

지금까지는 정량데이터를 다루었지만, **텍스트**를 변환해보자.
TF (Term Frequency)
단어빈도를 계산하기 위해 HashingTF를 사용할 수 있다.
단어ID로 Hash 알고리즘에 따라 무작위 번호를 생성하고, 단어빈도를 생성한다.
Hash를 사용하지 않고 계산한 단어빈도는 당연히 동일하다는 것을 알 수 있다.

In [76]:
wikiRdd3 = spark.sparkContext\
    .textFile(os.path.join("data","ds_spark_wiki.txt"))\
    .map(lambda line: line.split())

RDD는 mllib 라이브러리를 사용한다. 여기의 HashingTF를 사용한다.
transform() 함수를 사용하여 RDD를 단어빈도 구조로 변환한다.
단 fit()은 하지 않아도 된다는 점에 주의하자.

In [77]:
from pyspark.mllib.feature import HashingTF

hashingTF = HashingTF()
tf = hashingTF.transform(wikiRdd3)
tf.collect()

[SparseVector(1048576, {1026674: 1.0}),
 SparseVector(1048576, {148618: 1.0, 183975: 1.0, 216207: 1.0, 261052: 1.0, 617454: 1.0, 696349: 1.0, 721336: 1.0, 816618: 1.0, 897662: 1.0}),
 SparseVector(1048576, {60386: 1.0, 177421: 1.0, 568609: 1.0, 569458: 1.0, 847171: 1.0, 850510: 1.0, 1040679: 1.0}),
 SparseVector(1048576, {261052: 4.0, 816618: 4.0}),
 SparseVector(1048576, {60386: 4.0, 594754: 4.0}),
 SparseVector(1048576, {21980: 1.0, 70882: 1.0, 274690: 1.0, 357784: 1.0, 549790: 1.0, 597434: 1.0, 804583: 1.0, 829803: 1.0, 935701: 1.0}),
 SparseVector(1048576, {154253: 1.0, 261052: 1.0, 438276: 1.0, 460085: 1.0, 585459: 1.0, 664288: 1.0, 816618: 1.0, 935701: 2.0, 948143: 1.0, 1017889: 1.0}),
 SparseVector(1048576, {270017: 1.0, 472985: 1.0, 511771: 1.0, 718483: 1.0, 820917: 1.0}),
 SparseVector(1048576, {34116: 1.0, 87407: 1.0, 276491: 1.0, 348943: 1.0, 482882: 1.0, 549350: 1.0, 721336: 1.0, 816618: 1.0, 1025622: 1.0}),
 SparseVector(1048576, {1769: 1.0, 151357: 1.0, 500659: 1.0, 54776

### S.4.7 TF-IDF

IDF는 전체에서 몇 개의 문서에 씌였는지를 반대로 계산한 값이다.
뒤 DataFrame를 사용하여 TF-IDF를 계산하면서 자세히 설명하기로 한다.

In [80]:
from pyspark.mllib.feature import HashingTF, IDF

idf = IDF().fit(tf)
tfidf = idf.transform(tf)

In [81]:
tfidf.collect()

[SparseVector(1048576, {1026674: 1.7047}),
 SparseVector(1048576, {148618: 1.7047, 183975: 1.7047, 216207: 1.7047, 261052: 1.0116, 617454: 1.7047, 696349: 1.7047, 721336: 1.2993, 816618: 0.7885, 897662: 1.7047}),
 SparseVector(1048576, {60386: 1.2993, 177421: 1.7047, 568609: 1.7047, 569458: 1.7047, 847171: 1.7047, 850510: 1.7047, 1040679: 1.7047}),
 SparseVector(1048576, {261052: 4.0464, 816618: 3.1538}),
 SparseVector(1048576, {60386: 5.1971, 594754: 6.819}),
 SparseVector(1048576, {21980: 1.7047, 70882: 1.7047, 274690: 1.7047, 357784: 1.7047, 549790: 1.7047, 597434: 1.7047, 804583: 1.7047, 829803: 1.7047, 935701: 1.2993}),
 SparseVector(1048576, {154253: 1.7047, 261052: 1.0116, 438276: 1.7047, 460085: 1.7047, 585459: 1.7047, 664288: 1.7047, 816618: 0.7885, 935701: 2.5986, 948143: 1.7047, 1017889: 1.7047}),
 SparseVector(1048576, {270017: 1.7047, 472985: 1.7047, 511771: 1.7047, 718483: 1.7047, 820917: 1.7047}),
 SparseVector(1048576, {34116: 1.7047, 87407: 1.7047, 276491: 1.7047, 3489

### S.4.8 StandardScaler

데이터를 표준화하려면 1) 평균과 표준편차를 계산하고, 2) 측정값에서 평균을 빼고, 표준편차로 나누어 주면 된다.
즉 zscore를 계산하는 것과 같다.

$$ z = \frac {\bar{x_n} - \mu} {\sigma / \sqrt{n}} $$


#### 데이터 생성

In [82]:
tRdd = spark.sparkContext\
    .textFile(os.path.join('data', 'ds_spark_heightweight.txt'))

#### 정규화 할 값만 추출

탭을 분리한다.

In [83]:
tRdd.map(lambda x: x.split('\t')).take(1)

[['1', '65.78', '112.99']]

형변환을 해준다.

In [84]:
tRdd.map(lambda x: x.split('\t')).map(lambda x: [str(x[0]), float(x[1]), float(x[2])]).take(1)

[['1', 65.78, 112.99]]

In [85]:
tRdd.map(lambda x: x.split('\t'))\
    .map(lambda x: [str(x[0]), float(x[1]), float(x[2])])\
    .take(1)

[['1', 65.78, 112.99]]

2개의 값만 추출하여 dense vectors에 별도로 저장한다.

In [86]:
from pyspark.mllib.linalg import Vectors

_tRdd =tRdd\
    .map(lambda x: x.split('\t'))\
    .map(lambda x: [str(x[0]), float(x[1]), float(x[2])])\
    .map(lambda x: Vectors.dense([x[1], x[2]]))

리스트로 저장해도 계산에 문제가 없다.

In [87]:
from pyspark.mllib.linalg import Vectors

_tRdd =tRdd\
    .map(lambda x: x.split('\t'))\
    .map(lambda x: [str(x[0]), float(x[1]), float(x[2])])\
    .map(lambda x: [x[1], x[2]])

#### 표준화

In [88]:
from pyspark.mllib.feature import StandardScaler
scaler1 = StandardScaler().fit(_tRdd)
scaler2 = StandardScaler(withMean=True, withStd=True).fit(_tRdd)

In [90]:
scaler1.transform(_tRdd).take(5)

[DenseVector([36.064, 8.7665]),
 DenseVector([39.2109, 10.5897]),
 DenseVector([38.0487, 11.873]),
 DenseVector([37.4017, 11.0436]),
 DenseVector([37.166, 11.1957])]

앞서 계산한 zscore와 비교해보자.

In [89]:
scaler2.transform(_tRdd).take(5)

[DenseVector([-1.2458, -1.2299]),
 DenseVector([1.9011, 0.5934]),
 DenseVector([0.7388, 1.8767]),
 DenseVector([0.0919, 1.0473]),
 DenseVector([-0.1439, 1.1993])]

## S.5 DataFrame 변환

DataFrame으로 만들어진 데이터를 변환해보자.
이러한 작업이 필요한 이유는 **기계학습에 넘겨줄 입력데이터를 형식에 맞추어야** 하기 때문이다.
데이터는 형식에 맞게 변환되고, 군집화, 회귀분석, 분류, 추천 모델 등에 입력으로 사용된다
물론 데이터는 '일련의 수' 또는 '텍스트'로 구성된다.
이런 데이터로부터 특징을 추출하여 **feature vectors**를 구성한다.
지도학습을 하는 경우에는 **class 또는 label** 값이 필요하다.


### S.5.1 Labeled Point를 label, features 컬럼으로 분해

RDD LabeledPoint는 label과 vectors로 구성되어 있다.
따라서 LabeledPoint를 DataFrame으로 읽어오면, 2개의 컬럼으로 생성된다.
이를 label, features 컬럼으로 맞추어 주도록 하자.

#### 레이블이 있는 Python List에서 DataFrame 생성

label과 features를 가지고 가지고 있는 데이터를 생성해보자.

In [91]:
p = [[1, [1.0, 2.0, 3.0]], [1, [1.1, 2.1, 3.1]], [0, [1.2, 2.2, 3.3]]]

첫 데이터를 분해해서 출력하면, label과 features를 하나씩 묶어 가지고 있다.

In [92]:
print ("label: {}\nfeatures: {}".format(p[0][0], p[0][1]))

label: 1
features: [1.0, 2.0, 3.0]


위 데이터를 읽어서 DataFrame을 생성하면, 두 개의 컬럼으로 구분해서 생성된다.
그러나 컬럼이 자동 명명되기 때문에 ```_1, _2```가 쓰여서 만족스럽지 못하다.

In [93]:
trainDf=spark.createDataFrame(p)

In [94]:
trainDf.collect()

[Row(_1=1, _2=[1.0, 2.0, 3.0]),
 Row(_1=1, _2=[1.1, 2.1, 3.1]),
 Row(_1=0, _2=[1.2, 2.2, 3.3])]

#### LabeledPoint에서 DataFrame 생성

Python List를 LabeledPoint로 만들어 보자.
**LabeledPoint는 RDD에서 사용하는 구조로서 mllib 라이브러리를 사용**해서 만들고 있다.
**DataFrame은 LabeledPoint를 컬럼으로 가지고 있지 않는다**.

In [61]:
from pyspark.mllib.regression import LabeledPoint
p = [LabeledPoint(1, [1.0,2.0,3.0]),
     LabeledPoint(1, [1.1,2.1,3.1]),
     LabeledPoint(0, [1.2,2.2,3.3])]

그리고 DataFrame을 생성해보자.

In [62]:
trainDf=spark.createDataFrame(p)

그러면 LabeledPoint는 분해되어, **label과 features를 별도 컬럼**으로 생성된다. 이런 명칭의 컬럼은 기계학습에 필요하다. **features 데이터를 모델링하여 label에 따라 분류**하게 된다.

In [63]:
trainDf.collect()

[Row(features=DenseVector([1.0, 2.0, 3.0]), label=1.0),
 Row(features=DenseVector([1.1, 2.1, 3.1]), label=1.0),
 Row(features=DenseVector([1.2, 2.2, 3.3]), label=0.0)]

#### mllib.linalg.Vectors를 사용하여 DataFrame을 생성

앞서 mllib와 ml 모듈을 섞어서 사용하지 않아야 한다고 했다.
mllib vectors를 사용해도 DataFrame을 생성하는데 문제가 없다.
컬럼명을 ```["label", "features"]```으로 하지 않으면, 자동명명되니 주의하자.

In [95]:
from pyspark.mllib.linalg import Vectors

trainDf = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, 1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, 0.5]))], ["label", "features"])

In [96]:
trainDf.collect()

[Row(label=1.0, features=DenseVector([0.0, 1.1, 0.1])),
 Row(label=0.0, features=DenseVector([2.0, 1.0, 1.0])),
 Row(label=0.0, features=DenseVector([2.0, 1.3, 1.0])),
 Row(label=1.0, features=DenseVector([0.0, 1.2, 0.5]))]

#### RDD에서 DataFrame 생성

rdd에서 DataFrame을 생성하면 labe, features이 당연히 생성이 되지 않는다.

In [97]:
#from pyspark.mllib.linalg import SparseVector # mllib ok
from pyspark.ml.linalg import SparseVector # ml ok

_rdd = spark.sparkContext.parallelize([
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})),
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))])

In [98]:
_df=_rdd.toDF()

In [99]:
_df.printSchema()

root
 |-- _1: double (nullable = true)
 |-- _2: vector (nullable = true)



In [100]:
_df=_df.withColumnRenamed('_1', 'label').withColumnRenamed('_2', 'features')

In [101]:
_df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0| (4,[1,3],[1.0,5.5])|
|  1.0|(4,[0,2],[-1.0,0.5])|
+-----+--------------------+



### S.5.2 단어 빈도

정량 데이터는 합계, 평균, 표준편차 등 의미있는 통계량을 계산하거나, 이런 통계량이 집단 간에 차이가 있는지 분석한다.
반면에 텍스트는 정량데이터와 같이 이러한 통계량의 계산이 가능하지 못하게 된다.
텍스트에 어떤 단어가 얼마나 쓰였는지, 또한 같이 쓰이게 된 단어는 무엇인지 등 단어의 빈도에 따라 **정량화하여 과학적인 분석**을 하게 된다.

#### Bag of Words 모델

자연어처리 NLP에서 사용하는 모델로, 텍스트를 단어의 집합, **'bag of words'**으로 구성된다고 보며, 단어의 **순서**는 의미를 가지지 않는다.
이런 영화리뷰가 있다고 하자.
> "...그 영화는 매우 강렬했다. 그냥 좋았다. 영화관에서 보는 동안 긴장을 늦출 수 없었다. 갑돌이가 분장한 악당의 케릭터가 만들어지는 과정은 흥미롭지 않을 수가 없었다. 무비의 이야기 전개는 빠르고, 무엇이 진실이고 거짓인지 판단할 수 없었다. 누가 이런 영화를 좋아 하지 않을 수가 있겠는가 이모티콘..."

이를 단어로 분리하고, 정량화 해보자.

#### 텍스트 변환 단계

텍스트를 변환하는 단계를 보자. 순서는 변경될 수 있다.

* 단계 1: 단어로 분할 Tokenization
    * 그, 영화는, 매우, 강렬했다, 그냥, 좋았다, 영화관에서, 보는, 동안, 긴장을, 늦출, 수, 없었다, 갑돌이가, 분장한, 악당의, 케릭터가, 만들어지는, 과정은, 흥미롭지, 않을, 수가, 없었다, 무비의, 이야기, 전개는, 빠르고, 무엇이, 진실이고, 거짓인지, 판단할, 수, 없었다, 누가, 이런, 영화를, 좋아, 하지, 않을, 수가, 있겠는가, 이모티콘

* 단계 2: 정리
    - 불필요, 오타 등

* 단계 3: 불용어 stopwords 제거
    - 그, 수, 수가, 수, 이런, 하지, 수가 등

* 단계 4: 어간 추출 stemming
    영화는, 영화의는 다른 단어지만 조사를 제거하면 동일한 단어
    - 좋았다, 좋아 단어들은 어근을 판별하면 동일한 단어이다.
    - 영화, 무비의 단어는 이음동의

단계 5: 계량화
- word vector로 만든다.
- 있다-없다, 단어빈도, TF-IDF 사용할 수 있다.<br>dense, sparse 모두 가능하다.
```[1,1,1,1,1,0,0],[0,1,0,1,1,1,1]```

```python
[0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 1 0 1 1 0 0 0]
[0 0 1 0 0 0 0 0 0 0 0 1 1 1 0 0 0 0 0 0 0 0 1 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 1 1]
[1 0 0 1 0 0 1 1 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 1 0 1 1 0 0 0 1 0 0 0 1 1 1 0 1 0 0 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 1 1]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 1 0 0 0 0 0 0 0 0 1 1 1]
```

#### Python을 사용한 단어 빈도 계산

In [102]:
# Let it be lyrics
doc=[
    "When I find myself in times of trouble",
    "Mother Mary comes to me",
    "Speaking words of wisdom, let it be",
    "And in my hour of darkness",
    "She is standing right in front of me",
    "Speaking words of wisdom, let it be",
    "Let it be",
    "Let it be",
    "Let it be",
    "Let it be",
    "Whisper words of wisdom, let it be"
]

문서, 문장, 단어의 계층을 먼저 이해해야 한다.
문서는 문장으로 구성되어 있고, 문장은 단어로 구성되어 있다.
따라서 첫째 반복문은 문서의 각 문장에 대해, 단어로 분리하고 있다.
그 다음 반복문은 각 단어에 대해 빈도를 계산한다.
각 단어가 키가 되는데, **키가 존재하면 빈도를 증가하고, 존재하지 않으면 새로운 키를 생성**한다.

In [103]:
d={}
for sentence in doc:
    words=sentence.split()
    for word in words:
        if word in d:
            d[word]+=1
        else:
            d[word]=1

앞서 단어 빈도는 dictionary d에 저장하였다.
dictionary는 키, 빈도의 쌍으로 저장되어 있어서 ```iteritems()```으로 읽어낼 수 있다.

In [104]:
# for k,v in d.iteritems():  # python2
for k,v in d.items():
    print ("{}\t{}".format(k,v))

When	1
I	1
find	1
myself	1
in	3
times	1
of	6
trouble	1
Mother	1
Mary	1
comes	1
to	1
me	2
Speaking	2
words	3
wisdom,	3
let	3
it	7
be	7
And	1
my	1
hour	1
darkness	1
She	1
is	1
standing	1
right	1
front	1
Let	4
Whisper	1


### S.5.3 Spark의 transformer, estimator

**RDD**를 만들고 나서도 데이터를 변환하기 위해 map-reduce와 같은 함수 또는  **transform()**, **fit()**을 사용하는 것과 같이,
**DataFrame**도 역시 **Transformer**, **Estimator**를 사용할 수 있다.
이러한 Spark ml 라이브러리는 Python의 scikit-learn에서 영향을 받아 기계학습 API transformer, estimator, evaluator가 유사하다.
* Estimator는 DataFrame에 적용되는 알고리즘을 말하는 것으로, Transformer를 생성해낸다. **```Estimator.fit()```**
* Transformer는 DataFrame을 적용해서 다른 DataFrame으로 생성한다. **```Transformer.transform()```**
* Evaluator는 ```pyspark.ml.evaluation```의 'BinaryClassificationEvaluator', 'RegressionEvaluator',  'MulticlassClassificationEvaluator', 'MultilabelClassificationEvaluator', 'ClusteringEvaluator', 'RankingEvaluator' 등이 있다.

먼저 텍스트를 2차원 배열로 만들자.

In [105]:
doc2d=[
    ["When I find myself in times of trouble"],
    ["Mother Mary comes to me"],
    ["Speaking words of wisdom, let it be"],
    ["And in my hour of darkness"],
    ["She is standing right in front of me"],
    ["Speaking words of wisdom, let it be"],
    [u"우리 Let it be"],
    [u"나 Let it be"],
    [u"너 Let it be"],
    ["Let it be"],
    ["Whisper words of wisdom, let it be"]
]

그리고 DataFrame을 생성한다. schema는 만들어 주지 않고 컬럼명을 sent로 한다.

In [106]:
myDf=spark.createDataFrame(doc2d, ['sent'])

```truncate=True```는 줄여서, ```False```는 출력을 줄이지 않고 출력한다.

In [107]:
myDf.show(truncate=True)

+--------------------+
|                sent|
+--------------------+
|When I find mysel...|
|Mother Mary comes...|
|Speaking words of...|
|And in my hour of...|
|She is standing r...|
|Speaking words of...|
|      우리 Let it be|
|        나 Let it be|
|        너 Let it be|
|           Let it be|
|Whisper words of ...|
+--------------------+



### S.5.4 Tokenizer

먼저 용어를 정리해 보자.
* corpus는 어떤 주제에 대해 쓰여지거나, 어떤 사람이 작성한 전체 '말뭉치'를 말한다. 여러 문장으로 구성된 텍스트 집합을 말한다.
* document는 문장으로 구성된 문서를 말하지만, 한 문장으로만 구성될 수도, 여러 문장으로 만들어질 수도 있다. 예를 들어, "why she had to go" 같은 한 문장도 document라고 하고, "why she had to go.. I don't know" 역시 마찬가지이다.
* vocabularay는 중복없는 단어 집합을 말하며, 예를 들면, "why","she","had","to","go","where","have" 등은 단어이다.

Tokenizer는 document를 단어로 분리한다.
분리하는 기준은 whitespace로 공백, TAB, CR, New Line 등이 해당된다.
* 입력 컬럼은 "sent"로,
* 출력 컬럼은 "words"로 한다.

In [108]:
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="sent", outputCol="words")

```transform()```은 앞서 만든 ```tokenizer```모델에 DataFrame을 변환하여 다른 DataFrame을 생성한다.
그 결과는 문자열 배열로 구성된다.

In [109]:
tokDf = tokenizer.transform(myDf)

In [110]:
tokDf.show(3)

+--------------------+--------------------+
|                sent|               words|
+--------------------+--------------------+
|When I find mysel...|[when, i, find, m...|
|Mother Mary comes...|[mother, mary, co...|
|Speaking words of...|[speaking, words,...|
+--------------------+--------------------+
only showing top 3 rows



```for```문으로 출력해보자. ```Row()``` 객체로 출력된다.

In [111]:
for r in tokDf.select("sent", "words").take(3):
    print (r)

Row(sent='When I find myself in times of trouble', words=['when', 'i', 'find', 'myself', 'in', 'times', 'of', 'trouble'])
Row(sent='Mother Mary comes to me', words=['mother', 'mary', 'comes', 'to', 'me'])
Row(sent='Speaking words of wisdom, let it be', words=['speaking', 'words', 'of', 'wisdom,', 'let', 'it', 'be'])


### S.5.5 RegTokenizer

Tokenizer는 white space로 분리하지만, RegexTokenizer는 단어를 분리하기 위해 **정규표현식**을 적용할 수 있다.
정규표현식을 사용하여 분리하거나 특정 패턴을 추출할 수 있다.
공백으로 분리할 경우 간단히 정규표현식 ```\s``` 패턴을 적용할 수 있다.
한글에는 ```\w``` 패턴이 적용되지 않는다.
* ```\s```는 공백문자
* ```\w```는 숫자 및 대소문자 ```[A-Za-z0-9_]```
* 별표 ```*```는 0 또는 그 이상, 더하기 ```+```는 1 또는 그 이상을 의미한다.

In [112]:
from pyspark.ml.feature import RegexTokenizer

re = RegexTokenizer(inputCol="sent", outputCol="wordsReg", pattern="\\s+")

In [113]:
reDf=re.transform(myDf)
reDf.show()

+--------------------+--------------------+
|                sent|            wordsReg|
+--------------------+--------------------+
|When I find mysel...|[when, i, find, m...|
|Mother Mary comes...|[mother, mary, co...|
|Speaking words of...|[speaking, words,...|
|And in my hour of...|[and, in, my, hou...|
|She is standing r...|[she, is, standin...|
|Speaking words of...|[speaking, words,...|
|      우리 Let it be| [우리, let, it, be]|
|        나 Let it be|   [나, let, it, be]|
|        너 Let it be|   [너, let, it, be]|
|           Let it be|       [let, it, be]|
|Whisper words of ...|[whisper, words, ...|
+--------------------+--------------------+



### S.5.6 Stopwords

텍스트를 분리하고 나면, 별 의미가 없거나 쓸모가 없는 단어들이 존재한다.
예를 들어 이, 그, 저와 같은 **한 단어** 또는 있다 등과 같은 **일부 동사**, 그래서, 그러나 등과 같은 **접속사** 등이 후보가 될 수 있다.
이런 불필요한 단어들을 불용어 Stopwords라고 하며, 입력데이터에서 제거하도록 한다.
영어의 경우 불용어가 식별되어 제공되고 있다
http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words


In [114]:
from pyspark.ml.feature import StopWordsRemover
stop = StopWordsRemover(inputCol="wordsReg", outputCol="nostops")

현재 stop words를 가져온다.

In [115]:
stopwords=list()
_stopwords=stop.getStopWords()
for e in _stopwords:
    stopwords.append(e)

자신의 불용어를 추가해서, 재설정한다.

In [116]:
_mystopwords=[u"나",u"너", u"우리"]
for e in _mystopwords:
    stopwords.append(e)
stop.setStopWords(stopwords)

StopWordsRemover_cc8b2159df97

자신이 추가한 불용어가 있는지 출력해보자.

In [117]:
for e in stop.getStopWords():
    print (e, end="/")

i/me/my/myself/we/our/ours/ourselves/you/your/yours/yourself/yourselves/he/him/his/himself/she/her/hers/herself/it/its/itself/they/them/their/theirs/themselves/what/which/who/whom/this/that/these/those/am/is/are/was/were/be/been/being/have/has/had/having/do/does/did/doing/a/an/the/and/but/if/or/because/as/until/while/of/at/by/for/with/about/against/between/into/through/during/before/after/above/below/to/from/up/down/in/out/on/off/over/under/again/further/then/once/here/there/when/where/why/how/all/any/both/each/few/more/most/other/some/such/no/nor/not/only/own/same/so/than/too/very/s/t/can/will/just/don/should/now/i'll/you'll/he'll/she'll/we'll/they'll/i'd/you'd/he'd/she'd/we'd/they'd/i'm/you're/he's/she's/it's/we're/they're/i've/we've/you've/they've/isn't/aren't/wasn't/weren't/haven't/hasn't/hadn't/don't/doesn't/didn't/won't/wouldn't/shan't/shouldn't/mustn't/can't/couldn't/cannot/could/here's/how's/let's/ought/that's/there's/what's/when's/where's/who's/why's/would/나/너/우리/

transformer로 불용어를 제거해보자.
한글의 stop words '너','우리'가 제거되었다.

In [118]:
stopDf=stop.transform(reDf)
stopDf.show()

+--------------------+--------------------+--------------------+
|                sent|            wordsReg|             nostops|
+--------------------+--------------------+--------------------+
|When I find mysel...|[when, i, find, m...|[find, times, tro...|
|Mother Mary comes...|[mother, mary, co...|[mother, mary, co...|
|Speaking words of...|[speaking, words,...|[speaking, words,...|
|And in my hour of...|[and, in, my, hou...|    [hour, darkness]|
|She is standing r...|[she, is, standin...|[standing, right,...|
|Speaking words of...|[speaking, words,...|[speaking, words,...|
|      우리 Let it be| [우리, let, it, be]|               [let]|
|        나 Let it be|   [나, let, it, be]|               [let]|
|        너 Let it be|   [너, let, it, be]|               [let]|
|           Let it be|       [let, it, be]|               [let]|
|Whisper words of ...|[whisper, words, ...|[whisper, words, ...|
+--------------------+--------------------+--------------------+



### S.5.7 CountVectorizer

```CountVectorizer```는 단어의 빈도 수를 계산한다.

3번째 문장 "Speaking words of wisdom, let it be"의 word vector를 구성해 본다.
id 값은 모든 문장에서 단어를 추출하고 나서야 부여된다.

단어 (3행 "Speaking words of wisdom, let it be") | id | 빈도 | 
-----|-----|-----
Speaking | 7 | 1
words | 13 | 1
of | stopword | 0
wisdom | 12 | 1
let | 3 | 1
it | stopword | 0
be | stopword | 0

위 **word vector**를 표로 나타내면 아래와 같다.
행은 문장, 열은 id이다.
**3행은 doc2**이다. 해당하는 **단어 id의 빈도**를 적었다. 다른 행과 열은 이해를 돕기 위해 비워 놓았다.

```doc``` \ 단어 id  | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |13 |...
------|---|---|---|---|---|---|---|---|---|---|---|---|---|---
```doc 0``` |   |   |   |   |   |   |   |   |   |   |   |   |   |...
```doc 1``` |   |   |   |   |   |   |   |   |   |   |   |   |   |...
```doc 2``` |   |   | 1 |   |   |   | 1 |   |   |   |   | 1 | 1 |...
...   |   |   |   |   |   |   |   |   |   |   |   |   |   |...

#### sklearn CountVectorizer

sklearn 라이브러리로 단어빈도를 세어보자.
입력으로 단어의 집합을 넣어야 하지만, 위 문서는 2차원 리스트이다.
먼저 2차원 리스트를 1차원으로 변경하자.

In [119]:
from functools import reduce
doc = reduce(lambda x,y: x+y, doc2d)

In [120]:
from sklearn.feature_extraction.text import CountVectorizer

vectorizer = CountVectorizer(stop_words='english')

In [121]:
print (vectorizer.fit_transform(doc))

  (0, 10)	1
  (0, 9)	1
  (1, 0)	1
  (1, 4)	1
  (1, 5)	1
  (2, 3)	1
  (2, 12)	1
  (2, 13)	1
  (2, 7)	1
  (3, 1)	1
  (3, 2)	1
  (4, 6)	1
  (4, 8)	1
  (5, 3)	1
  (5, 12)	1
  (5, 13)	1
  (5, 7)	1
  (6, 14)	1
  (6, 3)	1
  (7, 3)	1
  (8, 3)	1
  (9, 3)	1
  (10, 11)	1
  (10, 3)	1
  (10, 12)	1
  (10, 13)	1


In [122]:
vectorizer.vocabulary_

{'times': 9,
 'trouble': 10,
 'mother': 5,
 'mary': 4,
 'comes': 0,
 'speaking': 7,
 'words': 13,
 'wisdom': 12,
 'let': 3,
 'hour': 2,
 'darkness': 1,
 'standing': 8,
 'right': 6,
 '우리': 14,
 'whisper': 11}

In [123]:
vectorizer.fit_transform(doc).todense()

matrix([[0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0],
        [1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0],
        [0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0],
        [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0]], dtype=int64)

#### Spark CountVectorizer

CountVectorizer는 단어를 분리하고 나서, 빈도를 계산할 수 있다. 결과는 단어빈도 word vector (sparse), 즉 단어별 단어빈도 TF이다.
자주 사용된 단어가 아니어서 제거해야할 단어, 즉 문서에 사용된 빈도 document frequency를 minDF, maxDF를 통해 설정할 수 있다.
소수점을 사용하면, 비율, 즉 '사용된 문서 수/전체 문서 수'를 의미한다.

* minDf는 너무 **적게 발생하는 경우 무시**, 예를 들어 0.5는 전체 문서의 **50%보다 적게** 발생하는 단어는 무시, 1.0은 기본 값이고, **100%보다 적게** 발생하는 경우 무시하게 된다. 즉, minDf=1.0은 문서 1개 이하에서 나타난 단어는 무시하라는 의미이다. 즉 어떤 단어도 무시하지 말라는 의미이다.
* maxDf는 너무 **많이 발생하는 경우 무시**, 예를 들어 0.5는 전체 문서의 **50%보다 많이** 발생하는 경우 무시, 1.0은 **100%보다 많이** 발생하는 경우 무시 (즉, 어떤 단어도 무시하지 말라는 의미). min_df와 마찬가지로 1.0이 기본 값이다.

In [124]:
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="nostops", outputCol="cv", vocabSize=30, minDF=1.0)

```CountVectorizerModel```은 ```fit()```하고 나면 얻어진다. 다음에 사용하는 ```HashingTF```는 ```fit()```하지 않는다는 점에서 차이가 있다.

In [125]:
cvModel = cv.fit(stopDf)

In [126]:
print (type(cv),type(cvModel))

<class 'pyspark.ml.feature.CountVectorizer'> <class 'pyspark.ml.feature.CountVectorizerModel'>


In [127]:
cvDf = cvModel.transform(stopDf)

In [128]:
cvDf.show(3)

+--------------------+--------------------+--------------------+--------------------+
|                sent|            wordsReg|             nostops|                  cv|
+--------------------+--------------------+--------------------+--------------------+
|When I find mysel...|[when, i, find, m...|[find, times, tro...|(16,[5,6,8],[1.0,...|
|Mother Mary comes...|[mother, mary, co...|[mother, mary, co...|(16,[10,13,14],[1...|
|Speaking words of...|[speaking, words,...|[speaking, words,...|(16,[0,1,2,3],[1....|
+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows



DataFrame 전체를 출력하면 보기 불편하므로, 이 가운데 일부 컬럼만을 선택하여 출력할 수 있다.
```(16,[5,6,8],[1.0,1.0,1.0])```
16은 전체 단어의 개수, 그리고 다음 5,6,8은 값이 있는 컬럼 번호, 1.0,1.0,1.0은 그 값을 말한다.

In [129]:
cvDf.select('sent','nostops','cv').show()

+--------------------+--------------------+--------------------+
|                sent|             nostops|                  cv|
+--------------------+--------------------+--------------------+
|When I find mysel...|[find, times, tro...|(16,[5,6,8],[1.0,...|
|Mother Mary comes...|[mother, mary, co...|(16,[10,13,14],[1...|
|Speaking words of...|[speaking, words,...|(16,[0,1,2,3],[1....|
|And in my hour of...|    [hour, darkness]|(16,[7,9],[1.0,1.0])|
|She is standing r...|[standing, right,...|(16,[4,12,15],[1....|
|Speaking words of...|[speaking, words,...|(16,[0,1,2,3],[1....|
|      우리 Let it be|               [let]|      (16,[0],[1.0])|
|        나 Let it be|               [let]|      (16,[0],[1.0])|
|        너 Let it be|               [let]|      (16,[0],[1.0])|
|           Let it be|               [let]|      (16,[0],[1.0])|
|Whisper words of ...|[whisper, words, ...|(16,[0,1,2,11],[1...|
+--------------------+--------------------+--------------------+



```CountVectorizer```에서 사용된 단어 목록을 출력할 수 있다. 아래 단어의 수를 세어보면 위 sparse vector의 컬럼 개수와 동일하다.

In [130]:
cvModel.vocabulary

['let',
 'wisdom,',
 'words',
 'speaking',
 'right',
 'trouble',
 'find',
 'hour',
 'times',
 'darkness',
 'mother',
 'whisper',
 'front',
 'mary',
 'comes',
 'standing']

### S.5.8 TF-IDF

```TfidfTransformer```는 **TF-IDF(Term Frequency-Inverse Document Frequency)**를 계산한다.
이를 위해서는 우선 Tokenizer를 사용하여 문장을 단어로 분리해 놓아야 한다.

HashingTF를 사용하여 'word vector'를 계산한다.
HashingTF은 hash함수에 따라 단어의 고유번호를 생성하며, 단어 수가 많아지면서 고유번호가 충돌할 수 있는 가능성이 있지만 이를 최소화게 된다.
그리고 IDF를 계산하고, TF-IDF를 계산한다.

#### S.5.8.1 TF-IDF 계산

'Let it be'가사 세 번째 줄 **'wisdom' 단어**의 TF-IDF를 계산해보자.
**TF**는 단어빈도수, 즉 문서에 단어가 나타난 빈도수를 의미한다.
단어빈도는 경우에 따라서는 문제가 될 수 있다. 예를 들어, 'a', 'the', 'of'와 같은 단어는 빈도는 높지만 별로 유용하지 못하다.
이 경우 IDF는 유용하다. **IDF**는 자주 나타나는 단어에 대한 가중치를 줄이고, 드물게 나타나는 단어에 가중치를 높이는 방식으로 계산된다.

t는 단어, 문서는 d, D는 corpus,

항목 | 설명 | 예제
-----|-----|-----
tf(d,f) | 단어 t가 문서 d에서 나타나는 단어의 빈도 수, term frequency | $f_{t,d}$ / (number of words in d) = 1/4 = 0.25<br>(3번째 문서에 stopwords를 제외하면 4개의 단어, wisdom은 1회 나타난다.)
df | document frequency 단어가 나타난 문서 수 | 3 (wisdom이 포함된 문서는 3)
N | number of documents 전체 문서의 수 | 11 (전체의 문서는 11개)
idf | inverse document frequency 단어가 나타난 문서의 비율을 거꾸로 | ln(N+1 / df+1) + 1 = log(12/4) + 1 = 1.09861 + 1<br>0으로 나뉘는 것을 방지하기 위해 **smoothing**, 즉 1을 더한다. 

프로그래밍에서는 메모리를 적게 사용하도록 설계되어 있다. ```1/4```와 같이 정수 타잎으로 연산하면, 정수를 사용하여 연산하고 그 결과도 정수를 출력하게 된다. 이를 변환하기 위해 ```1.```의 경우에서와 같이 소수를 사용하자.

In [131]:
import math

tf=1./4
df=3.
N=11.
idf=math.log((N+1)/(df+1))+1
print ("idf: {}".format(idf))

idf: 2.09861228866811


#### S.5.8.2 sklearn을 사용한 TF-IDF

우선 'sklearn'의 TF-IDF를 계산해보자.
**```CountVectorizer```**는 텍스트를 단어의 빈도로 변환해주어, 문서 x 단어 표를 출력할 수 있다.
**```CountVectorizer()```**의 인자로
analyzer ("word", "character ngram" 등 선택),
tokenizer (단어의 tokenizer를 지정),
stop_words (불용어 처리 기준),
max_features (최대 속성 개수) 등을 지정할 수 있다.
그 다음으로, TF-IDF를 계산할 수 있다. 이 때 (문서id, 단어id) 별로 결과가 출력된다.

```TfidfVectorizer```를 사용해서 계산하면 그 결과를 아래와 같이 볼 수 있다.
```python
(2,12) 2.09861228867
```

결과에서
**'2'**는 3번째 문서번호, **'12'**는 'wisdom' 단어번호
TF-IDF는 ```2.09861228867```이다.

max_df, min_df는 기본 값이 1.0으로 굳이 설정하지 않아도 되는데, 어떤 단어도 무시하지 말라는 의미이다.

In [132]:
from sklearn.feature_extraction.text import TfidfVectorizer

vectorizer = TfidfVectorizer(max_df=1.0, stop_words='english',norm = None)

In [133]:
print (vectorizer.fit_transform(doc))

  (0, 9)	2.791759469228055
  (0, 10)	2.791759469228055
  (1, 5)	2.791759469228055
  (1, 4)	2.791759469228055
  (1, 0)	2.791759469228055
  (2, 7)	2.386294361119891
  (2, 13)	2.09861228866811
  (2, 12)	2.09861228866811
  (2, 3)	1.4054651081081644
  (3, 2)	2.791759469228055
  (3, 1)	2.791759469228055
  (4, 8)	2.791759469228055
  (4, 6)	2.791759469228055
  (5, 7)	2.386294361119891
  (5, 13)	2.09861228866811
  (5, 12)	2.09861228866811
  (5, 3)	1.4054651081081644
  (6, 3)	1.4054651081081644
  (6, 14)	2.791759469228055
  (7, 3)	1.4054651081081644
  (8, 3)	1.4054651081081644
  (9, 3)	1.4054651081081644
  (10, 13)	2.09861228866811
  (10, 12)	2.09861228866811
  (10, 3)	1.4054651081081644
  (10, 11)	2.791759469228055


In [134]:
vectorizer.vocabulary_

{'times': 9,
 'trouble': 10,
 'mother': 5,
 'mary': 4,
 'comes': 0,
 'speaking': 7,
 'words': 13,
 'wisdom': 12,
 'let': 3,
 'hour': 2,
 'darkness': 1,
 'standing': 8,
 'right': 6,
 '우리': 14,
 'whisper': 11}

In [135]:
vectorizer.idf_

array([2.79175947, 2.79175947, 2.79175947, 1.40546511, 2.79175947,
       2.79175947, 2.79175947, 2.38629436, 2.79175947, 2.79175947,
       2.79175947, 2.79175947, 2.09861229, 2.09861229, 2.79175947])

#### S.5.8.3 Spark를 사용한 TF-IDF

##### TF
HashingTF는 단어집합을 워드벡터 word vector로 변환하는데, 해시함수를 사용해서 단어에 해당하는 일련번호를 결정한다.
HashingTF에서의 **```numFeatures```는 $2^n$**으로 결정하게 된다.
단어 갯수가 900이면, $2^{10}=1024$이므로 1024로 설정하면 된다.
기본은 $2^{18}=262,144$이다.
너무 적게 설정되면 인덱스가 부족하거나 적절하게 매핑될 수 있으니 주의해야 한다.


예를 들어, 
문서 ```[speaking, words, wisdom,, let]```의 경우 ```(32,[4,24,27],[1.0,1.0,2.0])```가 출력된다.
아래의 결과와 비교하면 단어 하나가 유실된 것을 알 수 있다.

In [136]:
from pyspark.ml.feature import HashingTF, IDF

# hashTF = HashingTF(inputCol="nostops", outputCol="hash", numFeatures=32) #  mapping indices insufficient
hashTF = HashingTF(inputCol="nostops", outputCol="hash")

```HashingTF```는 ```fit()```하지 않고 ```transform()``` 한다.

In [137]:
hashDf = hashTF.transform(stopDf)

hashTF.transform()의 결과는 **벡터의 튜플**이다. 예를 들어, ```(262144,[64317,91878,152481],[1.0,1.0,1.0])```
262144는 해시 개수 (앞서 CountVectorizer의 경우에서와 같이 전체 단어의 개수가 아니다), 그리고 다음 [64317,91878,152481]은 값이 있는 **해시 컬럼** 번호, 1.0,1.0,1.0은 그 값을 말한다.

In [138]:
hashDf.select("nostops", "hash").show(truncate=False)

+-------------------------------+--------------------------------------------------------+
|nostops                        |hash                                                    |
+-------------------------------+--------------------------------------------------------+
|[find, times, trouble]         |(262144,[64317,91878,152481],[1.0,1.0,1.0])             |
|[mother, mary, comes]          |(262144,[24657,63767,245426],[1.0,1.0,1.0])             |
|[speaking, words, wisdom,, let]|(262144,[27556,151864,173339,175131],[1.0,1.0,1.0,1.0]) |
|[hour, darkness]               |(262144,[74517,98431],[1.0,1.0])                        |
|[standing, right, front]       |(262144,[84798,218360,229166],[1.0,1.0,1.0])            |
|[speaking, words, wisdom,, let]|(262144,[27556,151864,173339,175131],[1.0,1.0,1.0,1.0]) |
|[let]                          |(262144,[173339],[1.0])                                 |
|[let]                          |(262144,[173339],[1.0])                                 |

##### TF-IDF

앞서 hashTF의 결과는 벡터튜플이고, 이를 IDF에 입력으로 넣어준다.

In [3]:
idf = IDF(inputCol="hash", outputCol="idf")

NameError: name 'IDF' is not defined

In [4]:
idfModel = idf.fit(hashDf)

NameError: name 'idf' is not defined

In [5]:
idfDf = idfModel.transform(hashDf)

NameError: name 'idfModel' is not defined

In [6]:
for e in idfDf.select("nostops","hash").take(10):
    print(e)

NameError: name 'idfDf' is not defined

### S.5.9 Word2Vec

Word2Vec은 2013년 구글 Tomas Mikolov가 고안한 방법으로, 단어를 벡터로 변환하는 방법을 말한다.
**Bag of Words 모델은 단어 순서와 문맥을 무시**한다. Word2Vec는 이런 BoW 모델의 단점을 극복하기 위해서,
말뭉치로부터 **단어들 서로의 맥락 또는 연관성 Word Embedding을 신경망으로 학습**하여 Word2Vec을 계산한다.
단어가 벡터로 변환되면, 벡터연산이 가능해지고 서로 간의 거리를 측정하여, 가까울수록 비슷한 단어라고 해석하게 된다.

Word2Vec을 계산하고 나면, 벡터('king') - 벡터('man') + 벡터('woman') = 벡터('queen) 이런 연산이 가능해진다. 즉 king 단어벡터에서 man 단어백터를 빼고, woman 단어백터를 더하면, queen 단어백터를 구할 수 있다는 의미이다.

```vectorSize```는 단어벡터를 몇 개로 구성할 것인지, ```minCount```는 최소 단어빈도를 설정할 수 있다.


In [7]:
from pyspark.ml.feature import Word2Vec

word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="words", outputCol="w2v")
model = word2Vec.fit(tokDf)
w2vDf = model.transform(tokDf)

NameError: name 'tokDf' is not defined

In [43]:
for e in w2vDf.select("w2v").take(3):
    print(e)

Row(w2v=DenseVector([-0.0305, -0.0208, -0.0298]))
Row(w2v=DenseVector([0.0712, 0.0931, 0.0137]))
Row(w2v=DenseVector([0.0201, 0.0342, -0.0526]))


In [44]:
model.getVectors().show(truncate=False)

+--------+-----------------------------------------------------------------+
|word    |vector                                                           |
+--------+-----------------------------------------------------------------+
|trouble |[0.09912902861833572,-0.030523067340254784,-0.06505794078111649] |
|mother  |[0.048511162400245667,0.1060371845960617,-0.138960599899292]     |
|find    |[-0.13015522062778473,-0.16613608598709106,0.11310941725969315]  |
|standing|[0.023323552682995796,0.06111243739724159,0.12170795351266861]   |
|wisdom, |[-0.08077400922775269,0.04744517803192139,-0.11689654737710953]  |
|in      |[0.1382375806570053,-0.012973016127943993,-0.1357135772705078]   |
|myself  |[-0.1496686488389969,-0.058997854590415955,-0.05918445438146591] |
|is      |[-0.07973738759756088,-0.09185151010751724,-0.06437903642654419] |
|darkness|[-0.06594431400299072,0.050063349306583405,0.1444423347711563]   |
|우리    |[-0.10292311012744904,-0.12944838404655457,-0.12971743941307068] |
|

In [45]:
model.findSynonyms("times", 3).show()

+--------+------------------+
|    word|        similarity|
+--------+------------------+
|darkness|0.9830100536346436|
| whisper|0.9286471009254456|
|      너|0.9184039235115051|
+--------+------------------+



### S.5.10 NGram

텍스트를 대상으로 하면, n-gram은 연속된 n개의 토큰으로 구성된 순열을 말한다.
unigram은 한 단어로, bigram은 두 단어로 구성한다.

In [None]:
from pyspark.ml.feature import NGram

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

In [None]:
ngramDf = ngram.transform(tokDf)

In [None]:
ngramDf.show()

In [50]:
for e in ngramDf.select("words","ngrams").take(3):
    print (e)

Row(words=['when', 'i', 'find', 'myself', 'in', 'times', 'of', 'trouble'], ngrams=['when i', 'i find', 'find myself', 'myself in', 'in times', 'times of', 'of trouble'])
Row(words=['mother', 'mary', 'comes', 'to', 'me'], ngrams=['mother mary', 'mary comes', 'comes to', 'to me'])
Row(words=['speaking', 'words', 'of', 'wisdom,', 'let', 'it', 'be'], ngrams=['speaking words', 'words of', 'of wisdom,', 'wisdom, let', 'let it', 'it be'])


### S.5.11 StringIndexer

문자열 컬럼을 인덱스 컬럼으로 변환한다. **빈도가 제일 높은 순서**로 ```0.0```부터 인덱스 값이 주어진다. 인덱스는 double 형을 가지게 된다.
**없는 레이블에 대해서는 예외**가 발생할 수 있으므로 (default), ```setHandleInvalid("skip")``` 함수로 'skip', 'keep', 'error' 등으로 설정할 수 있다.

구분 | 설명 | 예
-----|-----|-----
nominal | 명목 또는 구분 값 cateogry  | 사자, 호랑이, 사람
ordinal | 명목값과 다른 점은 순서가 있다. | 키 low, med, high
interval | 일정한 간격이 있다. | 150-165, 165-180, 180-195

현재 텍스트에 대해서는 적당한 명목변수가 없으므로, 문장 전체에 대해 인덱스로 변환해보자.

In [8]:
from pyspark.ml.feature import StringIndexer

labelIndexer = StringIndexer(inputCol="sent", outputCol="sentLabel")

In [9]:
model=labelIndexer.fit(myDf)

NameError: name 'myDf' is not defined

In [59]:
siDf=model.transform(myDf)

In [60]:
siDf.show()

+--------------------+---------+
|                sent|sentLabel|
+--------------------+---------+
|When I find mysel...|      5.0|
|Mother Mary comes...|      3.0|
|Speaking words of...|      0.0|
|And in my hour of...|      1.0|
|She is standing r...|      4.0|
|Speaking words of...|      0.0|
|      우리 Let it be|      9.0|
|        나 Let it be|      7.0|
|        너 Let it be|      8.0|
|           Let it be|      2.0|
|Whisper words of ...|      6.0|
+--------------------+---------+



### S.5.12 연속데이터의 변환

몸무게(inches), 키(pounds) 데이터를 분석해보자.
이 데이터는 정량, 연속 데이터이다. 
출처는 https://people.sc.fsu.edu/~jburkardt/data/csv/hw_200.csv

```python
1	65.78	112.99
2	71.52	136.49
3	69.40	153.03
4	68.22	142.34
5	67.79	144.30
6	68.70	123.30
7	69.80	141.49
8	70.01	136.46
9	67.90	112.37
10	66.78	120.67
11	66.49	127.45
12	67.62	114.14
13	68.30	125.61
14	67.12	122.46
15	68.28	116.09
16	71.09	140.00
17	66.46	129.50
18	68.65	142.97
19	71.23	137.90
20	67.13	124.04
21	67.83	141.28
22	68.88	143.54
23	63.48	97.90
24	68.42	129.50
25	67.63	141.85
26	67.21	129.72
27	70.84	142.42
28	67.49	131.55
29	66.53	108.33
30	65.44	113.89
31	69.52	103.30
32	65.81	120.75
33	67.82	125.79
34	70.60	136.22
35	71.80	140.10
36	69.21	128.75
37	66.80	141.80
38	67.66	121.23
39	67.81	131.35
40	64.05	106.71
41	68.57	124.36
42	65.18	124.86
43	69.66	139.67
44	67.97	137.37
45	65.98	106.45
46	68.67	128.76
47	66.88	145.68
48	67.70	116.82
49	69.82	143.62
50	69.09	134.93
```

In [10]:
from pyspark.sql.types import *

rdd=spark.sparkContext\
    .textFile(os.path.join('data','ds_spark_heightweight.txt'))

In [11]:
myRdd=rdd.map(lambda line:[float(x) for x in line.split('\t')])

In [12]:
myDf=spark.createDataFrame(myRdd,["id","weight","height"])

In [13]:
myDf.printSchema()

root
 |-- id: double (nullable = true)
 |-- weight: double (nullable = true)
 |-- height: double (nullable = true)



In [14]:
from pyspark.ml.feature import Binarizer

binarizer = Binarizer(threshold=68.0, inputCol="weight", outputCol="weight2")

In [15]:
binDf = binarizer.transform(myDf)

In [16]:
binDf.show(10)

+----+------+------+-------+
|  id|weight|height|weight2|
+----+------+------+-------+
| 1.0| 65.78|112.99|    0.0|
| 2.0| 71.52|136.49|    1.0|
| 3.0|  69.4|153.03|    1.0|
| 4.0| 68.22|142.34|    1.0|
| 5.0| 67.79| 144.3|    0.0|
| 6.0|  68.7| 123.3|    1.0|
| 7.0|  69.8|141.49|    1.0|
| 8.0| 70.01|136.46|    1.0|
| 9.0|  67.9|112.37|    0.0|
|10.0| 66.78|120.67|    0.0|
+----+------+------+-------+
only showing top 10 rows



In [17]:
from pyspark.ml.feature import QuantileDiscretizer

discretizer = QuantileDiscretizer(numBuckets=3, inputCol="height", outputCol="height3")

In [18]:
qdDf = discretizer.fit(binDf).transform(binDf)

In [19]:
qdDf.show(10)

+----+------+------+-------+-------+
|  id|weight|height|weight2|height3|
+----+------+------+-------+-------+
| 1.0| 65.78|112.99|    0.0|    0.0|
| 2.0| 71.52|136.49|    1.0|    1.0|
| 3.0|  69.4|153.03|    1.0|    2.0|
| 4.0| 68.22|142.34|    1.0|    2.0|
| 5.0| 67.79| 144.3|    0.0|    2.0|
| 6.0|  68.7| 123.3|    1.0|    0.0|
| 7.0|  69.8|141.49|    1.0|    2.0|
| 8.0| 70.01|136.46|    1.0|    1.0|
| 9.0|  67.9|112.37|    0.0|    0.0|
|10.0| 66.78|120.67|    0.0|    0.0|
+----+------+------+-------+-------+
only showing top 10 rows



### S.5.13 VectorAssembler

열을 묶어서 Vector열로 만든다. features 컬럼을 생성할 경우에 사용한다.
단 문자열 string은 묶을 수 없다.

In [20]:
#from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

va = VectorAssembler(inputCols=["weight2","height3"],outputCol="features")

In [21]:
vaDf = va.transform(qdDf)

In [22]:
vaDf.printSchema()

root
 |-- id: double (nullable = true)
 |-- weight: double (nullable = true)
 |-- height: double (nullable = true)
 |-- weight2: double (nullable = true)
 |-- height3: double (nullable = true)
 |-- features: vector (nullable = true)



In [23]:
vaDf.show(5)

+---+------+------+-------+-------+---------+
| id|weight|height|weight2|height3| features|
+---+------+------+-------+-------+---------+
|1.0| 65.78|112.99|    0.0|    0.0|(2,[],[])|
|2.0| 71.52|136.49|    1.0|    1.0|[1.0,1.0]|
|3.0|  69.4|153.03|    1.0|    2.0|[1.0,2.0]|
|4.0| 68.22|142.34|    1.0|    2.0|[1.0,2.0]|
|5.0| 67.79| 144.3|    0.0|    2.0|[0.0,2.0]|
+---+------+------+-------+-------+---------+
only showing top 5 rows



### S.5.14 Pipeline

**Pipeline**은 여러 Estimator를 묶은 Estimator를 반환한다. Pipeline은 여러 작업을 묶어, 순서대로 단계적으로 Estimator를 적용하기 위해 사용한다.


In [24]:
df = spark.createDataFrame([
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
        (4, "my dog has flea problems. help please.",0.0)
    ], ["id", "text", "label"])

In [25]:
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.classification import LogisticRegression

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)

In [26]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

In [27]:
model = pipeline.fit(df)

In [28]:
myDf = model.transform(df)

In [None]:
myDf.select('label', 'features').show()

## 문제 S-4: 연설문을 기계학습하기 위해 변환

2019.10.21일 '제74주년 경찰의 날 기념식 축사' 전문을 변환하세요.
전문은 http://www.korea.kr/archive/speechView.do?newsId=132031636 에서 읽을 수 있고, 해당 사이트에서 텍스트만 파일로 저장해서 사용한다.

* 1) DataFrame 생성
* 2) 단어로 분리해서, 출력
* 3) 정리 strip, replace
* 3) 불용어 구성, 출력 - 축사 전문에서 한글자로 된 단어를 찾아내 스스로 구성
* 4) 불용어 제거하고, 출력
* 5) TF-IDF를 계산하고, 출력
* 6) TF-IDF 컬럼을 features로 구성, 출력

### 1) DataFrame 생성

In [85]:
import os
#spark.createDataFrame(os.path.join("data", "20191021_policeAddress.txt"))
#df=spark.read.text(os.path.join("data", "20191021_policeAddress.txt"))

from pyspark.sql.types import StructType, StructField, StringType
police=spark.read\
    .options(header="true", delimiter=" ", inferSchema="true")\
    .schema(
        StructType([
            StructField("sent",StringType()),
            ])
    )\
    .text(os.path.join("data", "20191021_policeAddress.txt"))

문장이 잘리지 않고 온전하게 출력되려면 'False'로 설정한다.

In [86]:
police.show(3)

+---------------------------------+
|                             sent|
+---------------------------------+
| 존경하는 국민 여러분, 경찰관 ...|
|                                 |
|국민의 안전을 위해 밤낮없이 애...|
+---------------------------------+
only showing top 3 rows



### 2) 단어로 분리

In [62]:
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="sent", outputCol="tokens")
tokDf = tokenizer.transform(police)

In [63]:
tokDf.show(3)

+---------------------------------+------------------------------+
|                             sent|                        tokens|
+---------------------------------+------------------------------+
| 존경하는 국민 여러분, 경찰관 ...| [존경하는, 국민, 여러분,, ...|
|                                 |                            []|
|국민의 안전을 위해 밤낮없이 애...|[국민의, 안전을, 위해, 밤낮...|
+---------------------------------+------------------------------+
only showing top 3 rows



In [64]:
for r in tokDf.select("sent").take(3):
    print (r[0])

존경하는 국민 여러분, 경찰관 여러분, 일흔네 돌 ‘경찰의 날’입니다.
 
국민의 안전을 위해 밤낮없이 애쓰시는 전국의 15만 경찰관 여러분께 먼저 감사를 드립니다. 전몰·순직 경찰관들의 고귀한 희생에 경의를 표합니다. 유가족 여러분께 위로의 마음을 전합니다.


### 3) 정리

텍스트는 정리가 필요하다. 예를 들어, '여러분,'은 컴마로 인해 '여러분'과 다른 단어로 인식된다.
'날’입니다.'도 '날입니다.'와 다른 단어로 취급된다.

#### 컴마, 따옴표, 마침표 등 제거

'여러분,', '‘경찰의', '날’입니다.' 에서 발견할 수 있듯이, 컴마, 따옴표, 마침표 등을 제거해야 한다.

In [87]:
'여러분,'.rstrip(',')

'여러분'

In [66]:
'‘경찰의'.lstrip('‘')

'경찰의'

In [67]:
'날’입니다.'.rstrip('.')

'날’입니다'

In [68]:
'날’입니다.'.replace("’","")

'날입니다.'

In [129]:
wordList=['존경하는', '국민', '여러분,', '경찰관', '여러분,', '일흔네', '‘경찰의', '날’입니다.']

In [130]:
cleaned=list()
for w in wordList:
    cleaned.append(w.lstrip('‘').rstrip("’").rstrip(',').rstrip('.').replace("’","").replace("”",""))
cleaned

['존경하는', '국민', '여러분', '경찰관', '여러분', '일흔네', '경찰의', '날입니다']

#### 숫자 제거

숫자가 하나 이상 있는 경우 정규식 패턴 ```\d+```를 적용한다.
숫자로 시작하는 문자열인 경우 ```regex.match(w)```가 아니면 컴마등을 정리한다.

In [131]:
import re

regex = re.compile('\d+')
cleaned=list()

wordList=["1", "123", "15만", "2015년에", "15.1%", "74.5점", "8,572명을", "Seoul1", "Seoul"]
for w in wordList:
    if not regex.match(w):
        cleaned.append(w)
print(cleaned)

['Seoul1', 'Seoul']


#### udf함수: 컴마, 따옴표, 마침표, 숫자 제거

In [132]:

import re

def trim(wordList):
    regex = re.compile('\d+')
    cleaned=list()
    for w in wordList:
        if not regex.match(w):
            cleaned.append(w.lstrip('‘').rstrip("’").rstrip(',').rstrip('.').replace("’","").replace("”",""))
    return cleaned

위 함수가 올바르게 실행되는지 해보자.

In [133]:
myList=["1", "123", "15만", "2015년에", "15.1%", "74.5점", "8,572명을", "Seoul1", "Seoul"]
trim(myList)

['Seoul1', 'Seoul']

이번에는 udf에 넣어서 컬럼을 생성해보자.
udf함수 결과는 **문자열 배열 ArrayType(StringType)**로 맞추어 준다.

In [134]:
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, StringType

trimUdf=f.udf(trim, ArrayType(StringType()))

In [135]:
wordsDf = tokDf.withColumn('words', trimUdf(f.col('tokens')))

아래에서 보듯이, 15만과 같이 숫자가 포함된 문자열이 제거되었다는 것을 확인하자.

In [136]:
wordsDf.show(3, False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|sent                                                                                                                                                                              |tokens                                                                                                                                                                                                    |words                                  

### 3) 불용어 

In [140]:
from pyspark.ml.feature import StopWordsRemover

#stop = StopWordsRemover(inputCol="tokens", outputCol="nostops")
stop = StopWordsRemover(inputCol="words", outputCol="nostops")
stop.setStopWords([u"돌", u"너", u"우리", u'있습니다', u'더', u'합니다', u'그', u'드립니다', u'것입니다'])

StopWordsRemover_8319c919508a

_mystopwords=[u"나", u"너", u"우리"]
stopwords=list()
for e in _mystopwords:
    stopwords.append(e)
stop.setStopWords(stopwords)

In [141]:
#stopDf=stop.transform(tokDf)
stopDf=stop.transform(wordsDf)
stopDf.show(3)

+---------------------------------+------------------------------+------------------------------+------------------------------+
|                             sent|                        tokens|                         words|                       nostops|
+---------------------------------+------------------------------+------------------------------+------------------------------+
| 존경하는 국민 여러분, 경찰관 ...| [존경하는, 국민, 여러분,, ...|[존경하는, 국민, 여러분, 경...|[존경하는, 국민, 여러분, 경...|
|                                 |                            []|                            []|                            []|
|국민의 안전을 위해 밤낮없이 애...|[국민의, 안전을, 위해, 밤낮...|[국민의, 안전을, 위해, 밤낮...|[국민의, 안전을, 위해, 밤낮...|
+---------------------------------+------------------------------+------------------------------+------------------------------+
only showing top 3 rows



In [142]:
for r in stopDf.select("nostops").take(3):
    for e in r:
        print (e)

['존경하는', '국민', '여러분', '경찰관', '여러분', '일흔네', '경찰의', '날입니다']
[]
['국민의', '안전을', '위해', '밤낮없이', '애쓰시는', '전국의', '경찰관', '여러분께', '먼저', '감사를', '전몰·순직', '경찰관들의', '고귀한', '희생에', '경의를', '표합니다', '유가족', '여러분께', '위로의', '마음을', '전합니다']


### 전체 단어빈도

불용어를 제거한 단어들을 가진 컬럼을 RDD로 변환해서 출력해보자.
리스트의 Row로 구성되어 있다.

In [143]:
stopDf.select("nostops").rdd.take(3)

[Row(nostops=['존경하는', '국민', '여러분', '경찰관', '여러분', '일흔네', '경찰의', '날입니다']),
 Row(nostops=[]),
 Row(nostops=['국민의', '안전을', '위해', '밤낮없이', '애쓰시는', '전국의', '경찰관', '여러분께', '먼저', '감사를', '전몰·순직', '경찰관들의', '고귀한', '희생에', '경의를', '표합니다', '유가족', '여러분께', '위로의', '마음을', '전합니다'])]

flatMap()을 하면, Row가 제거된다. 그래도 2차원 리스트로 구성된다.

In [144]:
stopDf.select("nostops").rdd.flatMap(lambda x:x).take(2)

[['존경하는', '국민', '여러분', '경찰관', '여러분', '일흔네', '경찰의', '날입니다'], []]

flatMap()을 한차례 더하면, 2차원 리스트마저 해제되어 1차원 리스트가 된다.

In [145]:
stopDf.select("nostops").rdd.flatMap(lambda x:x).flatMap(lambda x:x)\
    .take(3)

['존경하는', '국민', '여러분']

앞서 RDD에서 해보았던대로 단어빈도를 계산하고, 빈도순으로 정렬하여 출력해보자.

In [146]:
stopDf.select("nostops")\
    .rdd\
    .flatMap(lambda x:x).flatMap(lambda x:x)\
    .map(lambda x:(x,1))\
    .reduceByKey(lambda x,y:x+y)\
    .map(lambda x:(x[1],x[0]))\
    .sortByKey(False)\
    .take(20)

[(8, '경찰은'),
 (7, '국민의'),
 (6, '여러분'),
 (6, '경찰의'),
 (5, '경찰관'),
 (4, '경찰'),
 (4, '우리의'),
 (3, '여러분께'),
 (3, '역대'),
 (3, '가장'),
 (3, '함께'),
 (2, '안전을'),
 (2, '위해'),
 (2, '먼저'),
 (2, '감사를'),
 (2, '받으신'),
 (2, '비롯한'),
 (2, '또한'),
 (2, '외국'),
 (2, '경찰을')]

경찰은, 경찰의, 경찰 이런 단어들은 이음동의로 처리하자.

In [147]:
stopDf.select("nostops")\
    .rdd\
    .flatMap(lambda x:x).flatMap(lambda x:x)\
    .map(lambda x: x.replace("경찰은","경찰"))\
    .map(lambda x: x.replace("경찰의","경찰"))\
    .map(lambda x:(x,1))\
    .reduceByKey(lambda x,y:x+y)\
    .map(lambda x:(x[1],x[0]))\
    .sortByKey(False)\
    .take(20)

[(18, '경찰'),
 (7, '국민의'),
 (6, '여러분'),
 (5, '경찰관'),
 (4, '우리의'),
 (3, '여러분께'),
 (3, '역대'),
 (3, '가장'),
 (3, '함께'),
 (2, '안전을'),
 (2, '위해'),
 (2, '먼저'),
 (2, '감사를'),
 (2, '받으신'),
 (2, '비롯한'),
 (2, '또한'),
 (2, '외국'),
 (2, '경찰을'),
 (2, '경찰헌장은'),
 (2, '겨레를')]

### 4) TF-IDF 계산, features로 구성

In [148]:
from pyspark.ml.feature import HashingTF, IDF

# hashTF = HashingTF(inputCol="nostops", outputCol="hash", numFeatures=32) #  mapping indices insufficient
hashTF = HashingTF(inputCol="nostops", outputCol="hash")

hashDf = hashTF.transform(stopDf)

In [149]:
idf = IDF(inputCol="hash", outputCol="idf")

idfModel = idf.fit(hashDf)
idfDf = idfModel.transform(hashDf)

In [150]:
idfDf.select("nostops", "hash").show(5, truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|nostops                                                                                                                                                                                                                                                   |hash                                                                                                                                                                                                                      

5) Word2Vec

In [192]:
from pyspark.ml.feature import Word2Vec

#word2Vec = Word2Vec(vectorSize=20, minCount=1, inputCol="nostops", outputCol="w2v")
#model = word2Vec.fit(stopDf)
#w2vDf = model.transform(stopDf)
word2Vec = Word2Vec(vectorSize=20, minCount=1, inputCol="tokens", outputCol="w2v")
model = word2Vec.fit(tokDf)
w2vDf = model.transform(tokDf)

In [193]:
model.getVectors().show()

+-----------+--------------------+
|       word|              vector|
+-----------+--------------------+
|   공정해야|[0.02441495656967...|
| 자치경찰제|[0.01395535655319...|
|   공권력을|[0.02412784658372...|
|흘렸습니다.|[0.01344944164156...|
|     만들고|[0.02127943187952...|
|     헌신에|[-0.0116500146687...|
|    나창헌,|[0.01650489121675...|
|       닿아|[-0.0210012532770...|
|         돌|[-0.0164661873131...|
|           |[-0.0221833325922...|
|     민갑룡|[0.02174899168312...|
|     수상자|[0.02335253916680...|
|       기간|[0.01740676164627...|
|     시대적|[-0.0141646964475...|
|     일흔네|[-0.0246702507138...|
|  했습니다.|[-0.0157327018678...|
|       범죄|[0.00503238197416...|
|  됐습니다.|[0.00533150695264...|
|       검경|[-0.0115960473194...|
|   공권력이|[-0.0018140476895...|
+-----------+--------------------+
only showing top 20 rows



In [203]:
model.findSynonyms("검경", 5).show()

+----------+------------------+
|      word|        similarity|
+----------+------------------+
|      한때|0.6153565645217896|
|근무환경을|0.5948322415351868|
|    조정과|0.5635372996330261|
|    경찰이|0.5051454901695251|
|    시대적|0.4923901855945587|
+----------+------------------+



In [195]:
model.findSynonyms("치안이", 5).show()

+--------+------------------+
|    word|        similarity|
+--------+------------------+
|   15.1%|0.6066877245903015|
|모범으로|0.5669057369232178|
|  검찰과|0.5377734899520874|
|드리기도|0.5311453342437744|
|  없지는|0.5089554786682129|
+--------+------------------+



In [196]:
model.findSynonyms("공권력이", 5).show()

+----------------------+------------------+
|                  word|        similarity|
+----------------------+------------------+
|                축하와|0.5985018014907837|
|서울국제경찰청장회의가|0.5500941872596741|
|         좋아졌습니다.|0.5490857362747192|
|                감사의|0.5258331298828125|
|                응원할|0.5257837772369385|
+----------------------+------------------+



## 문제: 트윗 정서 분석

트윗의 정서를 분석해보자.
트윗은 API를 사용하여 수집할 수 있으나, 정서를 태깅해야 한다.
스탠포드 대학생들이 수집해 놓은 트윗이 있으니 이를 이용해 보자. http://help.sentiment140.com/for-students/
이 데이터의 항목은:
* 트윗 정서 (0=부정, 2=중립, 4=긍정)
* 트윗 ID
* 트윗 일자
* 조회 (없으면 NO_QUERY)
* 사용자
* 트윗 텍스트

1) 전체 데이터 갯수, 각 '부정' '긍정' '중립' 별 데이터 갯수
2) 트윗 정서 컬럼을 'label'로 변경
5) stopwords 제거하고, 출력
6) pipeline으로 tf-idf 계산하고 'features' 컬럼으로 변경

https://github.com/tthustla/twitter_sentiment_analysis_part2/blob/master/Capstone_part3-Copy1.ipynb
https://towardsdatascience.com/another-twitter-sentiment-analysis-with-python-part-2-333514854913