# Chapter 2: Introduction to Data Analysis with Scala and Spark

이 주피터 파일과 같은 디렉토리에 linkage_csv 디렉토리만 있으면 됩니다.
>**Windows**   
 curl -L -o donation.zip https://bit.ly/1Aoywaq  
 7z x donation.zip -o./linkage  
 7z x linkage/block_\*.zip -o./linkage_csv


In [1]:
from pprint import pprint

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Ch02").getOrCreate()
sc = spark.sparkContext

### Bringing Data from the Cluster to the Client

In [3]:
# Directory that holds only the extracted block_*.csv files
rawblocks = sc.textFile("linkage_csv")
rawblocks

linkage_csv MapPartitionsRDD[1] at textFile at <unknown>:0

In [4]:
rawblocks.first()

'"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"'

In [5]:
head = rawblocks.take(10) # returns a Python list, not an RDD
pprint(head)
pprint(len(head))

['"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"',
 '37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE',
 '39086,47614,1,?,1,?,1,1,1,1,1,TRUE',
 '70031,70237,1,?,1,?,1,1,1,1,1,TRUE',
 '84795,97439,1,?,1,?,1,1,1,1,1,TRUE',
 '36950,42116,1,?,1,1,1,1,1,1,1,TRUE',
 '42413,48491,1,?,1,?,1,1,1,1,1,TRUE',
 '25965,64753,1,?,1,?,1,1,1,1,1,TRUE',
 '49451,90407,1,?,1,?,1,1,1,1,0,TRUE',
 '39932,40902,1,?,1,?,1,1,1,1,1,TRUE']
10


In [6]:
head = sc.parallelize(head) # convert python list to RDD
head.collect()

['"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"',
 '37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE',
 '39086,47614,1,?,1,?,1,1,1,1,1,TRUE',
 '70031,70237,1,?,1,?,1,1,1,1,1,TRUE',
 '84795,97439,1,?,1,?,1,1,1,1,1,TRUE',
 '36950,42116,1,?,1,1,1,1,1,1,1,TRUE',
 '42413,48491,1,?,1,?,1,1,1,1,1,TRUE',
 '25965,64753,1,?,1,?,1,1,1,1,1,TRUE',
 '49451,90407,1,?,1,?,1,1,1,1,0,TRUE',
 '39932,40902,1,?,1,?,1,1,1,1,1,TRUE']

In [7]:
def isHeader(line):
    """Return True when ``line`` contains the substring ``"id_1"``.

    The CSV header row is the only row that names the ``id_1`` column, so a
    substring match is used to tell header lines from data lines.

    Args:
        line: One raw text line read from the CSV files.

    Returns:
        bool: True if ``"id_1"`` occurs anywhere in ``line``, else False.
    """
    return "id_1" in line

In [8]:
head.filter(isHeader).collect()

['"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"']

In [9]:
# head.filterNot(isHeader).length # AttributeError: 'RDD' object has no attribute 'filterNot'
head.filter(lambda x: not isHeader(x)).collect()

['37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE',
 '39086,47614,1,?,1,?,1,1,1,1,1,TRUE',
 '70031,70237,1,?,1,?,1,1,1,1,1,TRUE',
 '84795,97439,1,?,1,?,1,1,1,1,1,TRUE',
 '36950,42116,1,?,1,1,1,1,1,1,1,TRUE',
 '42413,48491,1,?,1,?,1,1,1,1,1,TRUE',
 '25965,64753,1,?,1,?,1,1,1,1,1,TRUE',
 '49451,90407,1,?,1,?,1,1,1,1,0,TRUE',
 '39932,40902,1,?,1,?,1,1,1,1,1,TRUE']

In [10]:
head.filter(lambda x: not isHeader(x)).count()
# A PySpark RDD has no length property; count() is used instead

9

### Shipping Code from the Client to the Cluster

In [11]:
noheader = rawblocks.filter(lambda x: not isHeader(x))
noheader.first()

'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE'

### From RDDs to Data Frames

In [12]:
parsed = spark.read.option("header", "true") \
                    .option("nullValue", "?") \
                    .option("inferSchema", "true") \
                    .csv('linkage_csv')

In [13]:
parsed.printSchema()

root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: double (nullable = true)
 |-- cmp_fname_c2: double (nullable = true)
 |-- cmp_lname_c1: double (nullable = true)
 |-- cmp_lname_c2: double (nullable = true)
 |-- cmp_sex: integer (nullable = true)
 |-- cmp_bd: integer (nullable = true)
 |-- cmp_bm: integer (nullable = true)
 |-- cmp_by: integer (nullable = true)
 |-- cmp_plz: integer (nullable = true)
 |-- is_match: boolean (nullable = true)



### Analyzing Data with the DataFrame API

In [14]:
parsed.count()

5749132

In [15]:
parsed.cache()

DataFrame[id_1: int, id_2: int, cmp_fname_c1: double, cmp_fname_c2: double, cmp_lname_c1: double, cmp_lname_c2: double, cmp_sex: int, cmp_bd: int, cmp_bm: int, cmp_by: int, cmp_plz: int, is_match: boolean]

In [16]:
parsed.rdd.map(lambda x: x.is_match).countByValue()

defaultdict(int, {True: 20931, False: 5728201})

In [17]:
parsed.groupBy("is_match") \
    .count() \
    .orderBy("count", ascending=False) \
    .show()

+--------+-------+
|is_match|  count|
+--------+-------+
|   false|5728201|
|    true|  20931|
+--------+-------+



In [18]:
# Repeating the same column as a dict key keeps only the last entry,
# so the first call shows a single aggregate (stddev) instead of two.
parsed.agg({"cmp_sex": "avg", "cmp_sex":"stddev"}).show()
parsed.agg({"cmp_bd": "avg", "cmp_sex":"stddev"}).show()

+-------------------+
|    stddev(cmp_sex)|
+-------------------+
|0.20730111116897781|
+-------------------+

+-------------------+-------------------+
|    stddev(cmp_sex)|        avg(cmp_bd)|
+-------------------+-------------------+
|0.20730111116897781|0.22446526708507172|
+-------------------+-------------------+



In [19]:
parsed.createOrReplaceTempView("linkage")

spark.sql("""
    SELECT is_match, COUNT(*) cnt
    FROM linkage
    GROUP BY is_match
    ORDER BY cnt DESC
""").show()

+--------+-------+
|is_match|    cnt|
+--------+-------+
|   false|5728201|
|    true|  20931|
+--------+-------+



### Fast Summary Statistics for DataFrames

In [20]:
summary = parsed.describe()
summary.show()

+-------+------------------+------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+
|summary|              id_1|              id_2|       cmp_fname_c1|      cmp_fname_c2|      cmp_lname_c1|       cmp_lname_c2|            cmp_sex|             cmp_bd|             cmp_bm|            cmp_by|            cmp_plz|
+-------+------------------+------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+
|  count|           5749132|           5749132|            5748125|            103698|           5749132|               2464|            5749132|            5748337|            5748337|           5748337|            5736289|
|   mean| 33324.48559643438| 66587.43558331935| 0.7129024704437266|0.9000176718903189|0.315627819308

In [21]:
summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()

+-------+-------------------+------------------+
|summary|       cmp_fname_c1|      cmp_fname_c2|
+-------+-------------------+------------------+
|  count|            5748125|            103698|
|   mean| 0.7129024704437266|0.9000176718903189|
| stddev|0.38875835961628014|0.2713176105782334|
|    min|                0.0|               0.0|
|    max|                1.0|               1.0|
+-------+-------------------+------------------+



In [22]:
# Rows flagged as matches, selected with a SQL-style string predicate
matches = parsed.where("is_match = true")
matchSummary =  matches.describe()

# Rows flagged as non-matches, selected with a column expression
misses = parsed.filter(parsed.is_match == False) # Scala's $"is_match" column syntax is written parsed.is_match here
missSummary = misses.describe()

In [23]:
matchSummary.show()

+-------+-----------------+-----------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+
|summary|             id_1|             id_2|       cmp_fname_c1|       cmp_fname_c2|        cmp_lname_c1|       cmp_lname_c2|            cmp_sex|             cmp_bd|              cmp_bm|             cmp_by|            cmp_plz|
+-------+-----------------+-----------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+
|  count|            20931|            20931|              20922|               1333|               20931|                475|              20931|              20925|               20925|              20925|              20902|
|   mean|34575.72117911232|51259.95939037791| 0.9973163859635038| 0.9898900320318176|  0

In [24]:
# NOTE(review): this repeats the previous cell verbatim; missSummary.show() may have been intended — confirm
matchSummary.show()

+-------+-----------------+-----------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+
|summary|             id_1|             id_2|       cmp_fname_c1|       cmp_fname_c2|        cmp_lname_c1|       cmp_lname_c2|            cmp_sex|             cmp_bd|              cmp_bm|             cmp_by|            cmp_plz|
+-------+-----------------+-----------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+
|  count|            20931|            20931|              20922|               1333|               20931|                475|              20931|              20925|               20925|              20925|              20902|
|   mean|34575.72117911232|51259.95939037791| 0.9973163859635038| 0.9898900320318176|  0

### Pivoting and Reshaping DataFrames

In [25]:
summary.printSchema()

root
 |-- summary: string (nullable = true)
 |-- id_1: string (nullable = true)
 |-- id_2: string (nullable = true)
 |-- cmp_fname_c1: string (nullable = true)
 |-- cmp_fname_c2: string (nullable = true)
 |-- cmp_lname_c1: string (nullable = true)
 |-- cmp_lname_c2: string (nullable = true)
 |-- cmp_sex: string (nullable = true)
 |-- cmp_bd: string (nullable = true)
 |-- cmp_bm: string (nullable = true)
 |-- cmp_by: string (nullable = true)
 |-- cmp_plz: string (nullable = true)



In [26]:
type(summary)
# pyspark.sql.dataframe.DataFrame
# has no flatMap method; go through DataFrame.rdd instead

pyspark.sql.dataframe.DataFrame

In [27]:
schema = summary.schema
# Python has no separate double type, so float() performs the numeric conversion.
# Each summary row becomes one (row[0], column name, value) tuple per column after the first.
longForm = summary.rdd.flatMap(lambda row: [(row[0], schema[i].name, float(row[i])) for i in range(1,len(row))])

longForm.collect()

[('count', 'id_1', 5749132.0),
 ('count', 'id_2', 5749132.0),
 ('count', 'cmp_fname_c1', 5748125.0),
 ('count', 'cmp_fname_c2', 103698.0),
 ('count', 'cmp_lname_c1', 5749132.0),
 ('count', 'cmp_lname_c2', 2464.0),
 ('count', 'cmp_sex', 5749132.0),
 ('count', 'cmp_bd', 5748337.0),
 ('count', 'cmp_bm', 5748337.0),
 ('count', 'cmp_by', 5748337.0),
 ('count', 'cmp_plz', 5736289.0),
 ('mean', 'id_1', 33324.48559643438),
 ('mean', 'id_2', 66587.43558331935),
 ('mean', 'cmp_fname_c1', 0.7129024704437266),
 ('mean', 'cmp_fname_c2', 0.9000176718903189),
 ('mean', 'cmp_lname_c1', 0.3156278193080383),
 ('mean', 'cmp_lname_c2', 0.3184128315317443),
 ('mean', 'cmp_sex', 0.955001381078048),
 ('mean', 'cmp_bd', 0.22446526708507172),
 ('mean', 'cmp_bm', 0.48885529849763504),
 ('mean', 'cmp_by', 0.2227485966810923),
 ('mean', 'cmp_plz', 0.00552866147434343),
 ('stddev', 'id_1', 23659.859374488064),
 ('stddev', 'id_2', 23620.487613269695),
 ('stddev', 'cmp_fname_c1', 0.38875835961628014),
 ('stddev', 'c

In [28]:
cols = ["metric", "field", "value"]
longDF = longForm.toDF(cols) # the argument form differs here: pass the list itself; unpacking with *cols raises an error
longDF.show()

+------+------------+-------------------+
|metric|       field|              value|
+------+------------+-------------------+
| count|        id_1|          5749132.0|
| count|        id_2|          5749132.0|
| count|cmp_fname_c1|          5748125.0|
| count|cmp_fname_c2|           103698.0|
| count|cmp_lname_c1|          5749132.0|
| count|cmp_lname_c2|             2464.0|
| count|     cmp_sex|          5749132.0|
| count|      cmp_bd|          5748337.0|
| count|      cmp_bm|          5748337.0|
| count|      cmp_by|          5748337.0|
| count|     cmp_plz|          5736289.0|
|  mean|        id_1|  33324.48559643438|
|  mean|        id_2|  66587.43558331935|
|  mean|cmp_fname_c1| 0.7129024704437266|
|  mean|cmp_fname_c2| 0.9000176718903189|
|  mean|cmp_lname_c1| 0.3156278193080383|
|  mean|cmp_lname_c2| 0.3184128315317443|
|  mean|     cmp_sex|  0.955001381078048|
|  mean|      cmp_bd|0.22446526708507172|
|  mean|      cmp_bm|0.48885529849763504|
+------+------------+-------------

In [29]:
from pyspark.sql.functions import first

wideDF = longDF.groupBy("field") \
    .pivot("metric", ["count", "mean", "stddev", "min", "max"]) \
    .agg(first("value"))

wideDF.select("field", "count", "mean").show()

+------------+---------+-------------------+
|       field|    count|               mean|
+------------+---------+-------------------+
|        id_2|5749132.0|  66587.43558331935|
|     cmp_plz|5736289.0|0.00552866147434343|
|cmp_lname_c1|5749132.0| 0.3156278193080383|
|cmp_lname_c2|   2464.0| 0.3184128315317443|
|     cmp_sex|5749132.0|  0.955001381078048|
|      cmp_bm|5748337.0|0.48885529849763504|
|cmp_fname_c2| 103698.0| 0.9000176718903189|
|cmp_fname_c1|5748125.0| 0.7129024704437266|
|        id_1|5749132.0|  33324.48559643438|
|      cmp_bd|5748337.0|0.22446526708507172|
|      cmp_by|5748337.0| 0.2227485966810923|
+------------+---------+-------------------+



In [30]:
def pivotSummary(desc): # desc: DataFrame
    """Reshape a describe()-style summary so each field is a row and each metric a column.

    Every input row is expanded into (metric, field, value) tuples, where the
    metric label is the row's first entry and each remaining entry is
    converted with ``float``. The tuples are then grouped by field and pivoted
    on the metric label.

    Args:
        desc: DataFrame whose first column is the metric label and whose
            remaining columns hold float-convertible values.

    Returns:
        DataFrame with a ``field`` column plus ``count``, ``mean``,
        ``stddev``, ``min`` and ``max`` columns.
    """
    field_names = [field.name for field in desc.schema.fields]

    def explode_row(row):
        # row[0] is the metric label; every later position is one field's value.
        metric = row[0]
        return [(metric, field_names[pos], float(row[pos])) for pos in range(1, len(row))]

    long_form = desc.rdd.flatMap(explode_row).toDF(["metric", "field", "value"])

    by_field = long_form.groupBy("field")
    pivoted = by_field.pivot("metric", ["count", "mean", "stddev", "min", "max"])
    return pivoted.agg(first("value"))

In [31]:
matchSummaryT = pivotSummary(matchSummary)
missSummaryT = pivotSummary(missSummary)

matchSummaryT.show()

+------------+-------+------------------+--------------------+---+-------+
|       field|  count|              mean|              stddev|min|    max|
+------------+-------+------------------+--------------------+---+-------+
|        id_2|20931.0| 51259.95939037791|   24345.73345377519|6.0|99996.0|
|     cmp_plz|20902.0|0.9584250310975027| 0.19962063345931919|0.0|    1.0|
|cmp_lname_c1|20931.0|0.9970152595958817|0.043118807533945126|0.0|    1.0|
|cmp_lname_c2|  475.0| 0.969370167843852| 0.15345280740388917|0.0|    1.0|
|     cmp_sex|20931.0| 0.987291577086618| 0.11201570591216435|0.0|    1.0|
|      cmp_bm|20925.0|0.9979450418160095|0.045286127452170664|0.0|    1.0|
|cmp_fname_c2| 1333.0|0.9898900320318176| 0.08251973727615237|0.0|    1.0|
|cmp_fname_c1|20922.0|0.9973163859635038| 0.03650667584833679|0.0|    1.0|
|        id_1|20931.0| 34575.72117911232|   21950.31285196913|5.0|99946.0|
|      cmp_bd|20925.0|0.9970848267622461| 0.05391487659807981|0.0|    1.0|
|      cmp_by|20925.0|0.9

### Joining DataFrames and Selecting Features

In [32]:
matchSummaryT.createOrReplaceTempView("match_desc")
missSummaryT.createOrReplaceTempView("miss_desc")
spark.sql("""
SELECT a.field, a.count + b.count total, a.mean - b.mean delta
FROM match_desc a INNER JOIN miss_desc b ON a.field = b.field
WHERE a.field NOT IN ("id_1", "id_2")
ORDER BY delta DESC, total DESC
""").show()

+------------+---------+--------------------+
|       field|    total|               delta|
+------------+---------+--------------------+
|     cmp_plz|5736289.0|  0.9563812499852176|
|cmp_lname_c2|   2464.0|  0.8064147192926266|
|      cmp_by|5748337.0|  0.7762059675300512|
|      cmp_bd|5748337.0|   0.775442311783404|
|cmp_lname_c1|5749132.0|  0.6838772482594513|
|      cmp_bm|5748337.0|  0.5109496938298685|
|cmp_fname_c1|5748125.0|  0.2854529057459947|
|cmp_fname_c2| 103698.0| 0.09104268062280174|
|     cmp_sex|5749132.0|0.032408185250332844|
+------------+---------+--------------------+



### Preparing Models for Production Environments

In [33]:
# Python does not need a case class, so the Scala line below becomes a plain alias
# matchData = parsed.as[MatchData]
matchData = parsed

In [34]:
# PySpark has no Option[T] type, so an equivalent of Option.getOrElse is written by hand
def getOrElse(val, el):
    """Return ``val`` converted to ``float``, or ``el`` when the conversion fails.

    Args:
        val: Value to convert. ``None`` and non-numeric strings are treated
            as missing.
        el: Fallback returned unchanged when ``val`` cannot be converted.

    Returns:
        ``float(val)`` on success, otherwise ``el``.
    """
    try:
        return float(val)
    except (TypeError, ValueError, OverflowError):
        # TypeError: val is None or not float-convertible.
        # ValueError: val is a string that does not parse as a number.
        # OverflowError: val is an int too large for a float.
        # Other exceptions (e.g. KeyboardInterrupt) propagate instead of
        # being silently swallowed.
        return el

In [35]:
def scoreMatchData(md): # md: MatchData
    """Score one record by summing five of its comparison fields.

    Each field is passed through ``getOrElse`` with a fallback of 0.0, so a
    value that cannot be converted to float contributes nothing to the score.

    Args:
        md: Record exposing ``cmp_lname_c1``, ``cmp_plz``, ``cmp_by``,
            ``cmp_bd`` and ``cmp_bm`` attributes.

    Returns:
        Sum of the five field values after conversion.
    """
    lname_c1 = getOrElse(md.cmp_lname_c1, 0.0)
    plz = getOrElse(md.cmp_plz, 0.0)
    by = getOrElse(md.cmp_by, 0.0)
    bd = getOrElse(md.cmp_bd, 0.0)
    bm = getOrElse(md.cmp_bm, 0.0)
    # Added left to right in the same order as the fields are read above.
    return lname_c1 + plz + by + bd + bm

In [36]:
scored = matchData.rdd.map(lambda md: (scoreMatchData(md), md.is_match)) \
                        .toDF(["score", "is_match"])

### Model Evaluation

In [37]:
def crossTabs(scored, t): # scored: DataFrame, t: Double
    """Cross-tabulate whether ``score`` reaches threshold ``t`` against ``is_match``.

    Args:
        scored: DataFrame with ``score`` and ``is_match`` columns.
        t: Threshold; a row is "above" when ``score >= t``.

    Returns:
        DataFrame with one row per ``above`` value and ``true`` / ``false``
        columns holding row counts for each ``is_match`` value.
    """
    # SQL expression producing the boolean "above" column.
    above_expr = "score >= " + str(t) + " as above"
    flagged = scored.selectExpr(above_expr, "is_match")
    by_above = flagged.groupBy("above")
    return by_above.pivot("is_match", ["true", "false"]).count()

crossTabs(scored, 4.0).show()

+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20871|    637|
|false|   60|5727564|
+-----+-----+-------+



In [38]:
crossTabs(scored, 2.0).show()

+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20931| 596414|
|false| null|5131787|
+-----+-----+-------+

