In [1]:
# jupyter notebook 환경에서 spark를 사용하기 위한 모듈
import findspark
findspark.init('C:/spark/spark-3.0.1-bin-hadoop2.7')

In [2]:
# pyspark import
import pyspark
from pyspark import SparkConf,SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import SparkSession

In [3]:
# pyspark 연결하기
# 꼬이면 kernel restart
conf= pyspark.SparkConf().setAppName('appName').setMaster('local')
sc= pyspark.SparkContext(conf= conf)
spark= SparkSession(sc)

In [6]:
df= spark.createDataFrame(
[
    (1, 144.5, 5.9, 33, 'M'),
    (2, 167.2, 5.4, 45, 'M'),
    (3, 124.1, 5.2, 23, 'F'),
    (4, 144.5, 5.9, 33, 'M'),
    (5, 133.2, 5.7, 54, 'F'),
    (3, 124.1, 5.2, 23, 'F'),
    (5, 129.2, 5.3, 42, 'M')
],
['id', 'weight', 'height', 'age', 'gender'])

In [5]:
print(df.count())
print(df.distinct().count())

7
6


# 중복된 값 처리하기

In [6]:
# 중복된 행을 찾으나 id의 경우 제외되지 않았다. 이는 서브셋 파라미터를 통해 명시된 칼럼들을 사용해서 중복된 행을 찾기 때문이다.
df = df.dropDuplicates()

In [7]:
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  5| 129.2|   5.3| 42|     M|
|  1| 144.5|   5.9| 33|     M|
|  4| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
+---+------+------+---+------+



In [8]:
df.select([c for c in df.columns if c != 'id']).distinct().count()

5

In [9]:
df = df.dropDuplicates(
    subset=[c for c in df.columns if c != 'id'])

In [10]:
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



In [7]:
import pyspark.sql.functions as fn

df.agg(
    fn.count('id').alias('count'), # 열 이름을 count라는 이름으로 바꾸기
    fn.countDistinct('id').alias('distinct') # 열 이름을 distinct라는 이름으로 바꾸기
).show()

+-----+--------+
|count|distinct|
+-----+--------+
|    7|       5|
+-----+--------+



In [12]:
# id 중복을 피해 새로운 id를 할당
df.withColumn('new_id', fn.monotonically_increasing_id()).show()

+---+------+------+---+------+-------------+
| id|weight|height|age|gender|       new_id|
+---+------+------+---+------+-------------+
|  5| 133.2|   5.7| 54|     F|  25769803776|
|  1| 144.5|   5.9| 33|     M| 171798691840|
|  2| 167.2|   5.4| 45|     M| 592705486848|
|  3| 124.1|   5.2| 23|     F|1236950581248|
|  5| 129.2|   5.3| 42|     M|1365799600128|
+---+------+------+---+------+-------------+



# 결측치 처리하기

In [4]:
df_miss = spark.createDataFrame([
    (1, 143.5, 5.6, 28, 'M', 100000),
    (2, 167.2, 5.4, 45, 'M', None),
    (3, None, 5.2, None, None, None),
    (4, 144.5, 5.9, 33, 'M', None),
    (5, 133.2, 5.7, 54, 'F', None),
    (6, 124.1, 5.2, None, 'F', None),
    (7, 129.2, 5.3, 42, 'M', 76000),
],
['id', 'weight', 'height', 'age', 'gender' ,'income'])

In [8]:
# 각 행의 미관찰 값 개수를 알아보기
df_miss.rdd.map(
    lambda row: (row['id'], sum([c == None for c in row]))
).collect()

[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]

In [9]:
df_miss.where('id == 3').show()

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  3|  null|   5.2|null|  null|  null|
+---+------+------+----+------+------+



In [10]:
df_miss.agg(*[
    (1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing') for c in df_miss.columns
]).show()
# count함수에 있는 *를 칼럼 이름 부분에 추가하면 모든 행의 개수를 셀 수 있다.
# 반면 리스트 앞에 *가 오면, agg()함수는 그 리스트의 각 엘리먼트를 함수에 전달할 파라미터로 취급한다.

+----------+------------------+--------------+------------------+------------------+------------------+
|id_missing|    weight_missing|height_missing|       age_missing|    gender_missing|    income_missing|
+----------+------------------+--------------+------------------+------------------+------------------+
|       0.0|0.1428571428571429|           0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
+----------+------------------+--------------+------------------+------------------+------------------+



In [11]:
# 미관찰 Feature 제거하기
df_miss_no_income = df_miss.select(
    [c for c in df_miss.columns if c != 'income'])

In [12]:
df_miss_no_income.dropna(thresh=3).show()

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+



In [14]:
means = df_miss_no_income.agg(
    *[fn.mean(c).alias(c)
     for c in df_miss_no_income.columns if c != 'gender']
).toPandas().to_dict('records')[0]

means['gender'] = 'missing'

df_miss_no_income.fillna(means).show()

+---+------------------+------+---+-------+
| id|            weight|height|age| gender|
+---+------------------+------+---+-------+
|  1|             143.5|   5.6| 28|      M|
|  2|             167.2|   5.4| 45|      M|
|  3|140.28333333333333|   5.2| 40|missing|
|  4|             144.5|   5.9| 33|      M|
|  5|             133.2|   5.7| 54|      F|
|  6|             124.1|   5.2| 40|      F|
|  7|             129.2|   5.3| 42|      M|
+---+------------------+------+---+-------+



In [15]:
# dropna()를 사용하여 결측치 제거하기
df_miss_no_income.dropna(thresh= 3).show()

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+



In [16]:
means = df_miss_no_income.agg(
    *[fn.mean(c).alias(c)
     for c in df_miss_no_income.columns if c != 'gender']
).toPandas().to_dict('records')[0]

means['gender'] = 'missing'

df_miss_no_income.fillna(means).show()

+---+------------------+------+---+-------+
| id|            weight|height|age| gender|
+---+------------------+------+---+-------+
|  1|             143.5|   5.6| 28|      M|
|  2|             167.2|   5.4| 45|      M|
|  3|140.28333333333333|   5.2| 40|missing|
|  4|             144.5|   5.9| 33|      M|
|  5|             133.2|   5.7| 54|      F|
|  6|             124.1|   5.2| 40|      F|
|  7|             129.2|   5.3| 42|      M|
+---+------------------+------+---+-------+



# 아웃라이어 처리

In [17]:
df_outliers = spark.createDataFrame([
    (1, 143.5, 5.3, 28),
    (2, 154.2, 5.5, 45),
    (3, 342.3, 5.1, 99),
    (4, 144.5, 5.5, 33),
    (5, 133.2, 5.4, 54),
    (6, 124.1, 5.1, 21),
    (7, 129.2, 5.3, 42),
], ['id', 'weight', 'height', 'age'])

In [21]:
cols = ['weight', 'height', 'age']
bounds = {}
for col in cols:
    quantiles = df_outliers.approxQuantile(
    col, [0.25, 0.75], 0.05
    )
    IQR = quantiles[1] - quantiles[0]
    bounds[col] = [
        quantiles[0] - 1.5 * IQR,
        quantiles[1] + 1.5 * IQR
    ]

In [22]:
outliers = df_outliers.select(
    *['id'] + 
    [((df_outliers[c] < bounds[c][0]) |
     (df_outliers[c] > bounds[c][1])).alias(c + '_o') for c in cols]
)
outliers.show()

+---+--------+--------+-----+
| id|weight_o|height_o|age_o|
+---+--------+--------+-----+
|  1|   false|   false|false|
|  2|   false|   false|false|
|  3|    true|   false| true|
|  4|   false|   false|false|
|  5|   false|   false|false|
|  6|   false|   false|false|
|  7|   false|   false|false|
+---+--------+--------+-----+



In [23]:
df_outliers = df_outliers.join(outliers, on = 'id')
df_outliers.filter('weight_o').select('id', 'weight').show()
df_outliers.filter('age_o').select('id', 'age').show()

+---+------+
| id|weight|
+---+------+
|  3| 342.3|
+---+------+

+---+---+
| id|age|
+---+---+
|  3| 99|
+---+---+



In [4]:
import pyspark.sql.types as typ

In [5]:
fraud = sc.textFile('./dataset/ccFraud.csv')
header = fraud.first()
fraud = fraud.filter(lambda row: row != header)\
    .map(lambda row: [int(elem) for elem in row.split(',')])

In [6]:
fields = [
    *[typ.StructField(h[1:-1], typ.IntegerType(), True) for h in header.split(',')]
]
schema = typ.StructType(fields)

In [7]:
fraud_df = spark.createDataFrame(fraud, schema)

In [8]:
fraud_df.show()

+------+------+-----+----------+-------+--------+------------+----------+---------+
|custID|gender|state|cardholder|balance|numTrans|numIntlTrans|creditLine|fraudRisk|
+------+------+-----+----------+-------+--------+------------+----------+---------+
|     1|     1|   35|         1|   3000|       4|          14|         2|        0|
|     2|     2|    2|         1|      0|       9|           0|        18|        0|
|     3|     2|    2|         1|      0|      27|           9|        16|        0|
|     4|     1|   15|         1|      0|      12|           0|         5|        0|
|     5|     1|   46|         1|      0|      11|          16|         7|        0|
|     6|     2|   44|         2|   5546|      21|           0|        13|        0|
|     7|     1|    3|         1|   2000|      41|           0|         1|        0|
|     8|     1|   10|         1|   6016|      20|           3|         6|        0|
|     9|     2|   32|         1|   2428|       4|          10|        22|   

In [9]:
fraud_df.printSchema()

root
 |-- custID: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- state: integer (nullable = true)
 |-- cardholder: integer (nullable = true)
 |-- balance: integer (nullable = true)
 |-- numTrans: integer (nullable = true)
 |-- numIntlTrans: integer (nullable = true)
 |-- creditLine: integer (nullable = true)
 |-- fraudRisk: integer (nullable = true)



In [11]:
fraud_df.groupby('gender').count().show()

+------+-------+
|gender|  count|
+------+-------+
|     1|6178231|
|     2|3821769|
+------+-------+



In [None]:
numerical = ['balance', 'numTrans', 'numIntlTrans']
desc = fraud_df.describe(numerical)
desc.show()

In [12]:
fraud_df.agg({'balance': 'skewness'}).show()

+------------------+
| skewness(balance)|
+------------------+
|1.1818315552993839|
+------------------+



In [13]:
fraud_df.corr('balance', 'numTrans')

0.0004452314017265386

In [None]:
n_numerical = len(numerical)
corr= []
for i in range(0, n_numerical):
    temp = [None] * i
    for j in range(i, n_numerical):
        temp.append(fraud_df.corr(numerical[i], numerical[j]))
    corr.append(temp)

In [10]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.style.use('ggplot')
from bokeh.io import output_notebook

output_notebook()