In [1]:
import findspark
import pyspark
findspark.init()
findspark.find()
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master('spark://172.27.80.1:7077').appName('outlier').getOrCreate()

In [3]:
dataF = spark.createDataFrame([('1', 2400, 32, '9', 3, 'PHILIPS', '1XZFG')
                            , ('2', 2400, 23, '13', 3, 'Apple', '2XZFG')
                            , ('3', 2400, 22, '2.5', 5, 'LG', '3XZFG')
                            , ('4', 1, 23, '16.5', 3, None, '4XZFG')
                            , ('8', 3500, 33, '23', 5, 'LG', '8XZFG')
                            , ('6', 2400, 23, '9', 3, 'SONY', '6XZFG')
                            , ('7', 3500, 34, '23', None, 'PHILIPS', '7XZFG')
                            , ('8', 2500, 33, '23', 5, 'LG', '8XZFG')
                            , ('9', 2400, 22, '2.5', 0, 'Apple', '9XZFG')
                            , ('18', 2400, 22, '2.5', 0, 'Apple', '9XZFG')
                            , ('10', None, 23, '6', 4, None, None)
                            , ('15', 2750, 27, '8', 1, 'SONY', 'DCEZX15')
                            , ('11', 2, 20, '6', 3, 'SONY', '11XZFG')
                            , ('12', 3500, 32, '6', 3, 'LG', '12XZFG')
                            , ('17', 3500, 39, '6', 3, 'LG', '12XZFG')
                            , ('13', 2400, 24, '16.5', 5, 'Apple', '13XZFG')
                            , ('14', 3500, 34, '6', 6, 'DELL', '14XZFG')
                            , ('15', 2400, 22, '6', 4, 'SONY', '15XZFG')],
                              ['Id', 'Salary', 'age', 'experience', 'Coef', 'TV ', 'CODE_ID'])

이상치 처리 기준은 관계자들과 회의를 통해 정하는게 제일 이상적 -> 모두가 공감 가능하게

In [11]:
#col selection
features = ['Salary','age','Coef']

In [6]:
#4분위값 사용
quantiles = [0.25, 0.75]

In [9]:
from py.path import local
from pyspark.sql.types import *
from pyspark.sql import Row as row
from pyspark.sql import DataFrameStatFunctions as statFunc

In [7]:
#극단치 경계
cutoffpoints = []

In [31]:
#극단치 경계 구하는 for문 작성
for feature in features:  #roof 3번 돌려라
    qts = statFunc(dataF).approxQuantile(feature, quantiles, 0.05)
    IQR = qts[1] - qts[0]
    cutoffpoints.append((feature, [qts[0] - 1.5 * IQR, qts[1] + 1.5 * IQR]))
cutoffpoints = dict(cutoffpoints)

In [32]:
#극단치 경계 확인
cutoffpoints

{'Salary': [750.0, 5150.0], 'age': [5.5, 49.5], 'Coef': [0.0, 8.0]}

In [35]:
#이상치 확인
aberrant_value = dataF.select(*['Id'] + [
    (
        (dataF[f] < cutoffpoints[f][0]) |
        (dataF[f] > cutoffpoints[f][1])
    ).alias(f + "_b") for f in features
])

In [36]:
aberrant_value.show()

+---+--------+-----+------+
| Id|Salary_b|age_b|Coef_b|
+---+--------+-----+------+
|  1|   false|false| false|
|  2|   false|false| false|
|  3|   false|false| false|
|  4|    true|false| false|
|  8|   false|false| false|
|  6|   false|false| false|
|  7|   false|false|  null|
|  8|   false|false| false|
|  9|   false|false| false|
| 18|   false|false| false|
| 10|    null|false| false|
| 15|   false|false| false|
| 11|    true|false| false|
| 12|   false|false| false|
| 17|   false|false| false|
| 13|   false|false| false|
| 14|   false|false| false|
| 15|   false|false| false|
+---+--------+-----+------+



In [39]:
#합치기
withaberrant_value = dataF.join(aberrant_value, on='Id')

In [41]:
#outlier 확인
(
    withaberrant_value.filter('Salary_b').select('Id','Salary','Coef').show()
)

+---+------+----+
| Id|Salary|Coef|
+---+------+----+
| 11|     2|   3|
|  4|     1|   3|
+---+------+----+



In [42]:
#outlier 제외
df_no_aberrant_value = (
    withaberrant_value.filter('!Salary_b').select(dataF.columns)
)

In [43]:
#outlier 제외 출력
df_no_aberrant_value.show()

+---+------+---+----------+----+-------+-------+
| Id|Salary|age|experience|Coef|    TV |CODE_ID|
+---+------+---+----------+----+-------+-------+
|  7|  3500| 34|        23|null|PHILIPS|  7XZFG|
| 15|  2750| 27|         8|   1|   SONY|DCEZX15|
| 15|  2750| 27|         8|   1|   SONY|DCEZX15|
| 15|  2400| 22|         6|   4|   SONY| 15XZFG|
| 15|  2400| 22|         6|   4|   SONY| 15XZFG|
|  3|  2400| 22|       2.5|   5|     LG|  3XZFG|
|  8|  3500| 33|        23|   5|     LG|  8XZFG|
|  8|  3500| 33|        23|   5|     LG|  8XZFG|
|  8|  2500| 33|        23|   5|     LG|  8XZFG|
|  8|  2500| 33|        23|   5|     LG|  8XZFG|
| 18|  2400| 22|       2.5|   0|  Apple|  9XZFG|
| 17|  3500| 39|         6|   3|     LG| 12XZFG|
|  6|  2400| 23|         9|   3|   SONY|  6XZFG|
|  9|  2400| 22|       2.5|   0|  Apple|  9XZFG|
|  1|  2400| 32|         9|   3|PHILIPS|  1XZFG|
| 12|  3500| 32|         6|   3|     LG| 12XZFG|
| 13|  2400| 24|      16.5|   5|  Apple| 13XZFG|
| 14|  3500| 34|    

In [47]:
#통계 분석
descriptive_stats = df_no_aberrant_value.describe(features)
descriptive_stats.show()

#빅데이터에서 median을 안 보여주는 이유 = median을 위해서는 sort해야하는데 이는 셔플링이 제일 많이 일어나는 일

+-------+------------------+------------------+------------------+
|summary|            Salary|               age|              Coef|
+-------+------------------+------------------+------------------+
|  count|                19|                19|                18|
|   mean|2794.7368421052633|28.263157894736842| 3.388888888888889|
| stddev| 503.5691326372469| 5.635621875013321|1.8515141304249878|
|    min|              2400|                22|                 0|
|    max|              3500|                39|                 6|
+-------+------------------+------------------+------------------+



In [46]:
#상관계수 구하기
corr = df_no_aberrant_value.corr('Salary','age')
print('correlation: ', corr)

correlation:  0.763983716024124
