In [1]:
import pyspark

# Spark configuration: set the driver's bind address to the loopback interface.
myConf = pyspark.SparkConf()
myConf.set("spark.driver.bindAddress", "127.0.0.1")

# Build (or reuse) a local-mode session named "myApp" with the configuration above.
spark = (
    pyspark.sql.SparkSession.builder
    .master("local")
    .appName("myApp")
    .config(conf=myConf)
    .getOrCreate()
)

### 문제 1-1의 답
#### 성적데이터로 DataFrame 생성

In [2]:
# Raw score records: one comma-separated "name, subject, mark" string per entry.
marks = [
    "김하나, English, 100",
    "김하나, Math, 80",
    "임하나, English, 70",
    "임하나, Math, 100",
    "김갑돌, English, 82.3",
    "김갑돌, Math, 98.5",
]

In [3]:
def _splitMarkRecord(record):
    """Split one comma-separated score record into its fields.

    Every field is stripped of surrounding whitespace, so a record written as
    "name, subject, mark" yields "subject" rather than " subject". Without the
    strip, equality comparisons on the resulting columns would silently miss.

    Args:
        record: A string of comma-separated fields.

    Returns:
        A list of field strings with leading/trailing whitespace removed.
    """
    return [field.strip() for field in record.split(',')]


# Distribute the raw records and parse each one into a list of clean fields.
marksRdd = spark.sparkContext.parallelize(marks).map(_splitMarkRecord)

In [4]:
# Turn the parsed records into a DataFrame with named columns.
marksDf = spark.createDataFrame(
    marksRdd,
    schema=["name", "subject", "mark"],
)

In [5]:
# Print the column names and types of the DataFrame.
marksDf.printSchema()

root
 |-- name: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- mark: string (nullable = true)



In [6]:
# Display the DataFrame's rows as a table.
marksDf.show()

+------+--------+-----+
|  name| subject| mark|
+------+--------+-----+
|김하나| English|  100|
|김하나|    Math|   80|
|임하나| English|   70|
|임하나|    Math|  100|
|김갑돌| English| 82.3|
|김갑돌|    Math| 98.5|
+------+--------+-----+



### 문제 1-2의 답
#### zscore 컬럼 생성

In [7]:
from pyspark.sql.types import FloatType

# Add "markF": the string "mark" column cast to FloatType so it can be used in arithmetic.
marksDf = marksDf.withColumn(
    "markF",
    marksDf.mark.cast(FloatType()),
)

In [8]:
# A z-score needs the mean and the standard deviation of the marks.
# Using the F functions directly inside the z-score formula raises an error,
# so both statistics are computed separately first and reused in the formula.

from pyspark.sql import functions as F

markStats = marksDf.agg(
    F.mean("markF").alias("mean"),
    F.stddev("markF").alias("std"),
).collect()

In [9]:
# Inspect the collected aggregate rows.
markStats

[Row(mean=88.46666717529297, std=12.786190172956093)]

In [10]:
# Pull both statistics out of the first collected row by field name.
meanMark, stdMark = markStats[0]["mean"], markStats[0]["std"]

In [11]:
# Display the mean mark.
meanMark
# Positional alternative to the named lookup: markStats[0][0]

88.46666717529297

In [12]:
# Display the standard deviation of the marks.
stdMark

12.786190172956093

In [13]:
# The UDF's result is declared as FloatType.
from pyspark.sql.types import FloatType


def _zscore(mark):
    """Standardize one mark against the precomputed mean and standard deviation.

    Args:
        mark: A numeric mark, or None when the source value is null.

    Returns:
        (mark - meanMark) / stdMark, or None for a null mark so that null rows
        stay null instead of raising a TypeError inside the UDF.
    """
    if mark is None:
        return None
    return (mark - meanMark) / stdMark


zscoreUdf = F.udf(_zscore, FloatType())

In [14]:
# Add "zscore": the z-score UDF applied to the numeric mark column.
marksDf = marksDf.withColumn(
    "zscore",
    zscoreUdf(marksDf.markF),
)

In [15]:
# Display the DataFrame's rows as a table.
marksDf.show()

+------+--------+-----+-----+-----------+
|  name| subject| mark|markF|     zscore|
+------+--------+-----+-----+-----------+
|김하나| English|  100|100.0|  0.9020148|
|김하나|    Math|   80| 80.0| -0.6621728|
|임하나| English|   70| 70.0| -1.4442666|
|임하나|    Math|  100|100.0|  0.9020148|
|김갑돌| English| 82.3| 82.3|-0.48229098|
|김갑돌|    Math| 98.5| 98.5| 0.78470075|
+------+--------+-----+-----+-----------+



### 문제 1-3의 답
#### cdf 컬럼 생성

In [16]:
# scipy.stats.norm.cdf() returns a NumPy value, so the result is converted to a
# built-in float before it is handed back to Spark.
from scipy.stats import norm
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, FloatType


def _standardNormalCdf(value):
    """Return the standard normal cumulative probability of one value.

    Args:
        value: A numeric value, or None when the source value is null.

    Returns:
        norm.cdf(value) as a built-in float, or None for a null input so that
        null rows stay null instead of raising inside the UDF.
    """
    if value is None:
        return None
    return float(norm.cdf(value))


# Declare the return type explicitly: F.udf defaults to StringType, which would
# make the resulting column a string instead of a number.
normCdf = F.udf(_standardNormalCdf, DoubleType())

In [17]:
# norm.cdf assumes mean=0 and standard deviation=1 by default, so passing the raw
# marks (markF) instead of z-scores gives a cumulative probability of 1.0 everywhere.
(
    marksDf
    .withColumn("cdf", normCdf(marksDf.markF))
    .show()
)

+------+--------+-----+-----+-----------+---+
|  name| subject| mark|markF|     zscore|cdf|
+------+--------+-----+-----+-----------+---+
|김하나| English|  100|100.0|  0.9020148|1.0|
|김하나|    Math|   80| 80.0| -0.6621728|1.0|
|임하나| English|   70| 70.0| -1.4442666|1.0|
|임하나|    Math|  100|100.0|  0.9020148|1.0|
|김갑돌| English| 82.3| 82.3|-0.48229098|1.0|
|김갑돌|    Math| 98.5| 98.5| 0.78470075|1.0|
+------+--------+-----+-----+-----------+---+



In [18]:
# Compute the cdf from the z-scores instead of the raw marks.
(
    marksDf
    .withColumn("cdf", normCdf(marksDf.zscore))
    .show()
)

+------+--------+-----+-----+-----------+-------------------+
|  name| subject| mark|markF|     zscore|                cdf|
+------+--------+-----+-----+-----------+-------------------+
|김하나| English|  100|100.0|  0.9020148| 0.8164754981807292|
|김하나|    Math|   80| 80.0| -0.6621728| 0.2539302463290559|
|임하나| English|   70| 70.0| -1.4442666| 0.0743320011235712|
|임하나|    Math|  100|100.0|  0.9020148| 0.8164754981807292|
|김갑돌| English| 82.3| 82.3|-0.48229098|0.31479962882028223|
|김갑돌|    Math| 98.5| 98.5| 0.78470075| 0.7836854740814176|
+------+--------+-----+-----+-----------+-------------------+

