In [1]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

import findspark
findspark.init()

import pyspark

# SparkContext
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)

# SparkSession
from pyspark.sql import SparkSession

#spark = SparkSession.builder.getOrCreate() 
spark = SparkSession.builder.appName("My App").getOrCreate()
spark

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [697 B]
Get:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:9 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [66.2 kB]
Hit:10 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:12 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:13 http://ppa.launchpad.net/cran/

In [3]:
# google driveに配置しているファイルにアクセス
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Chapter 3 - Learning PySpark
## Resilient Distributed Datasets

### Creating RDDs

PySparkでRDDを作るには2つの方法があります。リストを並列化する方法

In [4]:
data = sc.parallelize(
    [('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12), 
     ('Amber', 9)])

またはリポジトリ（ファイルやデータベース）からの読み込み

In [5]:
data_from_file = sc.\
    textFile(
        '/content/drive/MyDrive/Colab Notebooks/data/VS14MORT.txt.gz', # ファイルパス指定
        4)

上記のコードを実行するには、データが保存されているパスを変更する必要があることに注意してください。データセットは、 http://tomdrabas.com/data/VS14MORT.txt.gz からダウンロードできます。

#### Schema

RDDはスキーマレスな構造をとる



In [6]:
data_heterogenous = sc.parallelize([('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain','visited', 4504]]).collect()
data_heterogenous

[('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain', 'visited', 4504]]

.collect()でドライバへデータを戻すアクションを実行  
あとはpythonでのやり方でオブジェクト内のデータにアクセスできる

In [7]:
data_heterogenous[1]['Porsche']

100000

#### Reading from files

テキストファイルから読み取ると、ファイルの各行がRDDの要素となります。

In [8]:
data_from_file.take(1)

['                   1                                          2101  M1087 432311  4M4                2014U7CN                                    I64 238 070   24 0111I64                                                                                                                                                                           01 I64                                                                                                  01  11                                 100 601']

#### User defined functions

ラムダ式を使用する代わりに、より長いメソッドを作成してデータを変換することができます。  
先ほど読めなかった行をパース（構文解析）して利用できるものに変換

In [9]:
def extractInformation(row):
    import re
    import numpy as np

    selected_indices = [
         2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,
         19,21,22,23,24,25,27,28,29,30,32,33,34,
         36,37,38,39,40,41,42,43,44,45,46,47,48,
         49,50,51,52,53,54,55,56,58,60,61,62,63,
         64,65,66,67,68,69,70,71,72,73,74,75,76,
         77,78,79,81,82,83,84,85,87,89
    ]

    '''
        Input record schema
        schema: n-m (o) -- xxx
            n - position from
            m - position to
            o - number of characters
            xxx - description
        1. 1-19 (19) -- reserved positions
        2. 20 (1) -- resident status
        3. 21-60 (40) -- reserved positions
        4. 61-62 (2) -- education code (1989 revision)
        5. 63 (1) -- education code (2003 revision)
        6. 64 (1) -- education reporting flag
        7. 65-66 (2) -- month of death
        8. 67-68 (2) -- reserved positions
        9. 69 (1) -- sex
        10. 70 (1) -- age: 1-years, 2-months, 4-days, 5-hours, 6-minutes, 9-not stated
        11. 71-73 (3) -- number of units (years, months etc)
        12. 74 (1) -- age substitution flag (if the age reported in positions 70-74 is calculated using dates of birth and death)
        13. 75-76 (2) -- age recoded into 52 categories
        14. 77-78 (2) -- age recoded into 27 categories
        15. 79-80 (2) -- age recoded into 12 categories
        16. 81-82 (2) -- infant age recoded into 22 categories
        17. 83 (1) -- place of death
        18. 84 (1) -- marital status
        19. 85 (1) -- day of the week of death
        20. 86-101 (16) -- reserved positions
        21. 102-105 (4) -- current year
        22. 106 (1) -- injury at work
        23. 107 (1) -- manner of death
        24. 108 (1) -- manner of disposition
        25. 109 (1) -- autopsy
        26. 110-143 (34) -- reserved positions
        27. 144 (1) -- activity code
        28. 145 (1) -- place of injury
        29. 146-149 (4) -- ICD code
        30. 150-152 (3) -- 358 cause recode
        31. 153 (1) -- reserved position
        32. 154-156 (3) -- 113 cause recode
        33. 157-159 (3) -- 130 infant cause recode
        34. 160-161 (2) -- 39 cause recode
        35. 162 (1) -- reserved position
        36. 163-164 (2) -- number of entity-axis conditions
        37-56. 165-304 (140) -- list of up to 20 conditions
        57. 305-340 (36) -- reserved positions
        58. 341-342 (2) -- number of record axis conditions
        59. 343 (1) -- reserved position
        60-79. 344-443 (100) -- record axis conditions
        80. 444 (1) -- reserve position
        81. 445-446 (2) -- race
        82. 447 (1) -- bridged race flag
        83. 448 (1) -- race imputation flag
        84. 449 (1) -- race recode (3 categories)
        85. 450 (1) -- race recode (5 categories)
        86. 461-483 (33) -- reserved positions
        87. 484-486 (3) -- Hispanic origin
        88. 487 (1) -- reserved
        89. 488 (1) -- Hispanic origin/race recode
     '''

    record_split = re\
        .compile(
            r'([\s]{19})([0-9]{1})([\s]{40})([0-9\s]{2})([0-9\s]{1})([0-9]{1})([0-9]{2})' + 
            r'([\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\s]{1})([0-9]{2})([0-9]{2})' + 
            r'([0-9]{2})([0-9\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\s]{16})([0-9]{4})' +
            r'([YNU]{1})([0-9\s]{1})([BCOU]{1})([YNU]{1})([\s]{34})([0-9\s]{1})([0-9\s]{1})' +
            r'([A-Z0-9\s]{4})([0-9]{3})([\s]{1})([0-9\s]{3})([0-9\s]{3})([0-9\s]{2})([\s]{1})' + 
            r'([0-9\s]{2})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([\s]{36})([A-Z0-9\s]{2})([\s]{1})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([\s]{1})([0-9\s]{2})([0-9\s]{1})' + 
            r'([0-9\s]{1})([0-9\s]{1})([0-9\s]{1})([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})')
    try:
        rs = np.array(record_split.split(row))[selected_indices]
    except:
        rs = np.array(['-99'] * len(selected_indices))
    return rs
#     return record_split.split(row)

ここでは、`lambda`を使う代わりに、`extractInformation(...)`メソッドを使って、データセットの分割と変換を行います。

In [10]:
data_from_file_conv = data_from_file.map(extractInformation)
data_from_file_conv.map(lambda row: row).take(1)

[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40')]

In [11]:
data_from_file_conv.take(3)

[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40'),
 array(['1', '  ', '2', '1', '01', 'M', '1', '058', ' ', '37', '17', '08',
        '  ', '4', 'D', '3', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I250',
        '214', '062', '   ', '21', '03', '11I250 ', '61I272 ', '62E669 ',
        '       ', '       ', '       ', '       ', '       ', '     

### Transformations

#### .map(...)

このメソッドは、RDDの各要素に適用されます。`data_from_file_conv`データセットの場合、これは各行の変換と考えることができます。

In [12]:
data_2014 = data_from_file_conv.map(lambda row: int(row[16])) # 16列目のデータを行毎に適用
data_2014.take(10)

[2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, -99]

より多くの列を組み合わせることができます。

In [13]:
data_2014_2 = data_from_file_conv.map(lambda row: (row[16], int(row[16])))
data_2014_2.take(10)

[('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('-99', -99)]

#### .filter(...)

`.filter(...)`メソッドは、指定された条件に合うデータセットの要素を選択することができます。

In [14]:
data_filtered = data_from_file_conv.filter(lambda row: row[5] == 'F' and row[21] == '0')
data_filtered.count()

6

#### .flatMap(...)

`.flatMap(...)`メソッドは、`.map(...)`と同様に動作しますが、リストではなくフラット化された結果を返します。

In [15]:
data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))
data_2014_flat.take(10)

['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]

#### .distinct()

このメソッドは、指定された列の異なる値のリストを返します。

In [16]:
distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct().collect()
distinct_gender

['M', 'F', '-99']

#### .sample(...)

`.sample()`メソッドは、データセットから無作為に抽出したサンプルを返します。

In [17]:
fraction = 0.1
data_sample = data_from_file_conv.sample(False, fraction, 666)

data_sample.take(1)

[array(['1', '  ', '5', '1', '01', 'F', '1', '082', ' ', '42', '22', '10',
        '  ', '4', 'W', '5', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I251',
        '215', '063', '   ', '21', '02', '11I350 ', '21I251 ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '02',
        'I251 ', 'I350 ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '28', ' ',
        ' ', '2', '4', '100', '8'], dtype='<U40')]

本当に全体の10％の記録が取れたのか確認してみましょう。

In [18]:
print('Original dataset: {0}, sample: {1}'.format(data_from_file_conv.count(), data_sample.count()))

Original dataset: 2631171, sample: 263247


#### .leftOuterJoin(...)

left outer join は、SQLと同じように、2つのRDDを両方のデータセットにある値に基づいて結合し、左のRDDからのレコードを返し、2つのRDDが一致するところに右のRDDからのレコードを追加します。

In [19]:
rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])

rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3.take(2)

[('b', (4, '6')), ('c', (10, None))]

もし代わりに.`join(...)`メソッドを使っていたら、`'a'`と`'b'`の2つの値がこれらの2つのRDDの間で交差しているので、`'a'`と`'b'`の値だけを得ることができたでしょう。

In [20]:
rdd4 = rdd1.join(rdd2)
rdd4.collect()

[('b', (4, '6')), ('a', (1, 4)), ('a', (1, 1))]

もうひとつの便利なメソッドは、両方のRDDで等しいレコードを返す.intersection(...)です

In [21]:
rdd5 = rdd1.intersection(rdd2)
rdd5.collect()

[('a', 1)]

#### .repartition(...)

データセットの再分割は、データセットを分割するパーティションの数を変えます。

In [22]:
rdd1 = rdd1.repartition(4)

len(rdd1.glom().collect())

4

### Actions

#### .take(...)

このメソッドは、1つのデータパーティションから `n` 個の先頭行を返します。

In [23]:
data_first = data_from_file_conv.take(1)
data_first

[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40')]

ある程度ランダムなデータが必要な場合は、代わりに`.takeSample(...)` を使うことができます。

In [24]:
data_take_sampled = data_from_file_conv.takeSample(False, 1, 667)
data_take_sampled

[array(['2', '17', ' ', '0', '08', 'M', '1', '069', ' ', '39', '19', '09',
        '  ', '1', 'M', '7', '2014', 'U', '7', 'U', 'N', ' ', ' ', 'I251',
        '215', '063', '   ', '21', '06', '11I500 ', '21I251 ', '61I499 ',
        '62I10  ', '63N189 ', '64K761 ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '05',
        'I251 ', 'I120 ', 'I499 ', 'I500 ', 'K761 ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40')]

#### .reduce(...)

データを処理するもう一つのアクションである.reduce(...)メソッドは、指定されたメソッドを使ってRDDの要素にreduceの処理(RDD要素の合計値を求めるなど)をかける。

In [25]:
rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)

15

データの分割方法によっては間違った結果が得られることがあります。

In [26]:
data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 1)

現在の結果を後続の結果で割るように`.reduce()`を適用すると、10という値が期待できます。

In [27]:
works = data_reduce.reduce(lambda x, y: x / y)
works

10.0

しかし、仮にデータを3つのパーティションに分割した場合、その結果は間違ったものになります。

In [28]:
data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 3)
data_reduce.reduce(lambda x, y: x / y)

0.004

`.reduceByKey(...)`メソッドは、`.reduce(...)`メソッドと同じように動作しますが、キーごとにreduceを適用する。

In [29]:
data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4)
data_key.reduceByKey(lambda x, y: x + y).collect()

[('b', 4), ('c', 2), ('a', 12), ('d', 5)]

#### .count()

`.count()`メソッドは、RDDの要素数をカウントします。

In [30]:
data_reduce.count()

6

下記の方法と同じ効果がありますが、データをドライバーに移す必要はありません。

In [31]:
len(data_reduce.collect()) # WRONG -- DON'T DO THIS!

6

データセットが*key-value*の形をしている場合、`.countByKey()`メソッドを使って、異なるキーの数を得ることができます。

In [32]:
data_key.countByKey().items()

dict_items([('a', 2), ('b', 2), ('c', 1), ('d', 2)])

#### .saveAsTextFile(...)

その名の通り、.saveAsTextFile()はRDDをテキストファイルに保存します：各パーティションを別々のファイルに。

In [33]:
'''
data_key.saveAsTextFile('/Users/drabast/Documents/PySpark_Data/data_key.txt')
'''

"\ndata_key.saveAsTextFile('/Users/drabast/Documents/PySpark_Data/data_key.txt')\n"

それを読み返すには、先ほどと同じように、すべての行を文字列として扱うようにパースする必要があります。

In [34]:
'''def parseInput(row):
    import re
    
    pattern = re.compile(r'\(\'([a-z])\', ([0-9])\)')
    row_split = pattern.split(row)
    
    return (row_split[1], int(row_split[2]))
    
data_key_reread = sc \
    .textFile('/Users/drabast/Documents/PySpark_Data/data_key.txt') \
    .map(parseInput)
data_key_reread.collect()'''

"def parseInput(row):\n    import re\n    \n    pattern = re.compile(r'\\('([a-z])', ([0-9])\\)')\n    row_split = pattern.split(row)\n    \n    return (row_split[1], int(row_split[2]))\n    \ndata_key_reread = sc     .textFile('/Users/drabast/Documents/PySpark_Data/data_key.txt')     .map(parseInput)\ndata_key_reread.collect()"

**.foreach(...)**

RDDの各要素に同じ関数を反復的に適用する方法。

In [35]:
def f(x): 
    print(x)

data_key.foreach(f)