In [1]:
import pandas as pd
import pyspark

In [2]:
# Reuse the active SparkContext if one exists. Calling the SparkContext(...)
# constructor a second time (e.g. re-running this cell) raises, because only
# one SparkContext may be active per process; getOrCreate makes the cell
# safe to re-run.
# NOTE: if a context already exists, its settings (including appName) are
# kept and this conf is ignored.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName("test"))

In [3]:
# Department table as an RDD of (dept_id, dept_name) pairs.
dept = sc.parallelize([
    (100, '성서'),
    (101, '사복'),
    (102, '영보'),
    (103, '컴소'),
    (104, '간호'),
])
# Student table as an RDD of (student_id, name, dept_id) triples.
# The last student deliberately has dept_id None (no department).
student = sc.parallelize([
    (1, '홍길동', 103),
    (2, '이순신', 103),
    (3, '김철수', 101),
    (4, '김다미', 104),
    (5, '이유리', None),
])

In [4]:
# keys() keeps the first element of each tuple -- here the student_id.
student.keys().collect()

[1, 2, 3, 4, 5]

In [5]:
# keys() keeps the first element of each tuple -- here the dept_id.
dept.keys().collect()

[100, 101, 102, 103, 104]

In [6]:
# Inner join: key each student by dept_id (field 2), then keep only students
# whose dept_id has a matching department.
(
    student
    .keyBy(lambda rec: rec[2])
    .join(dept)
    .collect()
)

[(101, ((3, '김철수', 101), '사복')),
 (103, ((1, '홍길동', 103), '컴소')),
 (103, ((2, '이순신', 103), '컴소')),
 (104, ((4, '김다미', 104), '간호'))]

In [7]:
# Left outer join: every student is kept; the dept side is None when the
# student's dept_id has no match.
(
    student
    .keyBy(lambda rec: rec[2])
    .leftOuterJoin(dept)
    .collect()
)

[(None, ((5, '이유리', None), None)),
 (101, ((3, '김철수', 101), '사복')),
 (103, ((1, '홍길동', 103), '컴소')),
 (103, ((2, '이순신', 103), '컴소')),
 (104, ((4, '김다미', 104), '간호'))]

In [8]:
# Right outer join: every department is kept; the student side is None for
# departments that have no students.
(
    student
    .keyBy(lambda rec: rec[2])
    .rightOuterJoin(dept)
    .collect()
)

[(100, (None, '성서')),
 (101, ((3, '김철수', 101), '사복')),
 (102, (None, '영보')),
 (103, ((1, '홍길동', 103), '컴소')),
 (103, ((2, '이순신', 103), '컴소')),
 (104, ((4, '김다미', 104), '간호'))]

In [9]:
# Students without a matching department (a "left anti join").
# Filter on the joined department value (kv[1][1]) rather than the key: this
# also reports students whose dept_id is set but missing from `dept`, not
# only students whose dept_id is None.
(
    student
    .keyBy(lambda rec: rec[2])
    .leftOuterJoin(dept)
    .filter(lambda kv: kv[1][1] is None)
    .collect()
)

[(None, ((5, '이유리', None), None))]

In [11]:
# Names of students without a matching department.
# Unmatched rows are those whose joined department value (kv[1][1]) is None;
# testing that, instead of the key, also covers dept_ids absent from `dept`.
# kv[1][0][1] is the name field of the student tuple.
(
    student
    .keyBy(lambda rec: rec[2])
    .leftOuterJoin(dept)
    .filter(lambda kv: kv[1][1] is None)
    .map(lambda kv: kv[1][0][1])
    .collect()
)

['이유리']

In [12]:
# Full outer join: keeps unmatched rows from both sides, padding the missing
# side with None.
(
    student
    .keyBy(lambda rec: rec[2])
    .fullOuterJoin(dept)
    .collect()
)

[(None, ((5, '이유리', None), None)),
 (100, (None, '성서')),
 (101, ((3, '김철수', 101), '사복')),
 (102, (None, '영보')),
 (103, ((1, '홍길동', 103), '컴소')),
 (103, ((2, '이순신', 103), '컴소')),
 (104, ((4, '김다미', 104), '간호'))]

In [14]:
# Rows of the full outer join that matched on only one side: either the
# student side (kv[1][0]) or the department side (kv[1][1]) is None.
# Checking the values rather than the key also catches students whose
# dept_id is set but absent from `dept`.
(
    student
    .keyBy(lambda rec: rec[2])
    .fullOuterJoin(dept)
    .filter(lambda kv: kv[1][0] is None or kv[1][1] is None)
    .collect()
)

[(None, ((5, '이유리', None), None)), (100, (None, '성서')), (102, (None, '영보'))]

In [15]:
# cogroup groups both RDDs by key, giving (key, (students, dept_names)).
# The two groups are lazy ResultIterable objects, so collect() shows only
# their reprs; materialize them (e.g. with list) to see the contents.
(
    student
    .keyBy(lambda rec: rec[2])
    .cogroup(dept)
    .collect()
)

[(None,
  (<pyspark.resultiterable.ResultIterable at 0x154a4238448>,
   <pyspark.resultiterable.ResultIterable at 0x154a4238648>)),
 (100,
  (<pyspark.resultiterable.ResultIterable at 0x154a4238248>,
   <pyspark.resultiterable.ResultIterable at 0x154a4238708>)),
 (101,
  (<pyspark.resultiterable.ResultIterable at 0x154a4238188>,
   <pyspark.resultiterable.ResultIterable at 0x154a41a0988>)),
 (102,
  (<pyspark.resultiterable.ResultIterable at 0x154a42387c8>,
   <pyspark.resultiterable.ResultIterable at 0x154a42385c8>)),
 (103,
  (<pyspark.resultiterable.ResultIterable at 0x154a41a0808>,
   <pyspark.resultiterable.ResultIterable at 0x154a41a0fc8>)),
 (104,
  (<pyspark.resultiterable.ResultIterable at 0x154a41a0f08>,
   <pyspark.resultiterable.ResultIterable at 0x154a41a0e48>))]

In [16]:
# Flatten each key's (students, dept_names) pair of iterables into one list:
# the student tuples first, then the department name(s).
(
    student
    .keyBy(lambda rec: rec[2])
    .cogroup(dept)
    .mapValues(lambda groups: [item for group in groups for item in group])
    .collect()
)

[(None, [(5, '이유리', None)]),
 (100, ['성서']),
 (101, [(3, '김철수', 101), '사복']),
 (102, ['영보']),
 (103, [(1, '홍길동', 103), (2, '이순신', 103), '컴소']),
 (104, [(4, '김다미', 104), '간호'])]

In [17]:
# Cartesian product: every (dept_id, student) pair combined with every
# department, regardless of key -- 5 x 5 = 25 rows here.
(
    student
    .keyBy(lambda rec: rec[2])
    .cartesian(dept)
    .collect()
)

[((103, (1, '홍길동', 103)), (100, '성서')),
 ((103, (1, '홍길동', 103)), (101, '사복')),
 ((103, (1, '홍길동', 103)), (102, '영보')),
 ((103, (1, '홍길동', 103)), (103, '컴소')),
 ((103, (1, '홍길동', 103)), (104, '간호')),
 ((103, (2, '이순신', 103)), (100, '성서')),
 ((103, (2, '이순신', 103)), (101, '사복')),
 ((103, (2, '이순신', 103)), (102, '영보')),
 ((103, (2, '이순신', 103)), (103, '컴소')),
 ((103, (2, '이순신', 103)), (104, '간호')),
 ((101, (3, '김철수', 101)), (100, '성서')),
 ((101, (3, '김철수', 101)), (101, '사복')),
 ((101, (3, '김철수', 101)), (102, '영보')),
 ((101, (3, '김철수', 101)), (103, '컴소')),
 ((101, (3, '김철수', 101)), (104, '간호')),
 ((104, (4, '김다미', 104)), (100, '성서')),
 ((104, (4, '김다미', 104)), (101, '사복')),
 ((104, (4, '김다미', 104)), (102, '영보')),
 ((104, (4, '김다미', 104)), (103, '컴소')),
 ((104, (4, '김다미', 104)), (104, '간호')),
 ((None, (5, '이유리', None)), (100, '성서')),
 ((None, (5, '이유리', None)), (101, '사복')),
 ((None, (5, '이유리', None)), (102, '영보')),
 ((None, (5, '이유리', None)), (103, '컴소')),
 ((None, (5, '이유리', None)), (104, '간호'))]

In [18]:
# Two overlapping integer RDDs for the set-operation demos: A = 1..5, B = 2..6.
A = sc.parallelize(list(range(1, 6)))
B = sc.parallelize(list(range(2, 7)))

In [19]:
# union concatenates the two RDDs; shared elements (2..5) appear twice.
A.union(B).collect()

[1, 2, 3, 4, 5, 2, 3, 4, 5, 6]

In [20]:
# Set-style union: drop the duplicates produced by union().
# NOTE: distinct() shuffles, so element order is not guaranteed in general.
A.union(B).distinct().collect()

[1, 2, 3, 4, 5, 6]

In [21]:
# Elements present in both A and B.
# NOTE: intersection() shuffles, so element order is not guaranteed in general.
A.intersection(B).collect()

[2, 3, 4, 5]

In [22]:
# Elements of A that are not in B.
A.subtract(B).collect()

[1]

In [23]:
# Elements of B that are not in A (subtract is not symmetric).
B.subtract(A).collect()

[6]

In [24]:
# Two pair RDDs that share key 'A' but with partly different values; used to
# contrast subtract (whole-tuple match) with subtractByKey (key-only match).
C = sc.parallelize([
    ('A', 100),
    ('B', 120),
    ('C', 100),
])
D = sc.parallelize([
    ('A', 100),
    ('A', 120),
    ('D', 100),
])

In [25]:
# subtract compares whole (key, value) tuples: only ('A', 100) is removed.
C.subtract(D).collect()

[('C', 100), ('B', 120)]

In [26]:
# ('A', 120) survives because no identical tuple exists in C.
D.subtract(C).collect()

[('A', 120), ('D', 100)]

In [27]:
# subtractByKey matches on the key only: every 'A' entry is removed,
# regardless of its value.
D.subtractByKey(C).collect()

[('D', 100)]

In [28]:
# Numeric RDD 1..5 for the summary-statistics demos.
number = sc.parallelize(list(range(1, 6)))

In [29]:
# Smallest element of the RDD.
number.min()

1

In [30]:
# Largest element of the RDD.
number.max()

5

In [31]:
# Arithmetic mean (returned as a float).
number.mean()

3.0

In [32]:
# Sum of all elements.
number.sum()

15

In [33]:
# Population standard deviation (divides by n): sqrt(2.0) for 1..5.
# Use sampleStdev() for the n-1 (sample) version.
number.stdev()

1.4142135623730951

In [34]:
# Population variance (divides by n): 10 / 5 = 2.0 for 1..5.
# Use sampleVariance() for the n-1 (sample) version.
number.variance()

2.0

In [35]:
# count, mean, stdev, max and min in one call; the stdev reported here is the
# same population value as number.stdev().
number.stats()

(count: 5, mean: 3.0, stdev: 1.4142135623730951, max: 5.0, min: 1.0)

In [36]:
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors

In [37]:
# Plain NumPy dense vector of four float64 values (one of them zero).
v1 = np.array((0.1, 0.0, 0.2, 0.3), dtype=float)

In [38]:
# Confirm v1 is a plain numpy.ndarray (not a Spark vector type).
type(v1)

numpy.ndarray

In [39]:
# MLlib DenseVector built from individual values (varargs form of dense()).
v2 = Vectors.dense(0.1, 0.2, 0.3)

In [40]:
# Display the DenseVector repr.
v2

DenseVector([0.1, 0.2, 0.3])

In [41]:
# The same four values as v1, but as a plain Python list.
v3 = [0.1, 0.0, 0.2, 0.3]

In [42]:
# Sparse vector of size 4 from a list of (index, value) pairs; unlisted
# indices (2 and 3) are implicitly zero.
v4 = Vectors.sparse(4, [(0, 0.1), (1, 0.2)])

In [43]:
# Display the SparseVector repr: size plus {index: value} of stored entries.
v4

SparseVector(4, {0: 0.1, 1: 0.2})

In [45]:
# Sparse vector of size 4 from two parallel lists: indices [0, 2] and
# their values [0.1, 0.2].
v5 = Vectors.sparse(4, [0, 2], [0.1, 0.2])

In [46]:
# Display the SparseVector repr.
v5

SparseVector(4, {0: 0.1, 2: 0.2})

In [47]:
# Densify v4 into a NumPy array, filling unstored positions with 0.
v4.toArray()

array([0.1, 0.2, 0. , 0. ])

In [48]:
# Densify v5 into a NumPy array, filling unstored positions with 0.
v5.toArray()

array([0.1, 0. , 0.2, 0. ])

In [53]:
# 4x2 sparse matrix in CSC (compressed sparse column) form, built from
# (data, row_indices, indptr). Column j's stored rows are
# row_indices[indptr[j]:indptr[j+1]] with values data[indptr[j]:indptr[j+1]]:
#   column 0 -> rows [0, 2] with values [0.1, 0.2]
#   column 1 -> row  [2]    with value  [0.3]
# With the previous indptr [0, 3] and shape (4, 1), all three values landed
# in column 0 and row 2 was stored twice (summed to 0.5 on densify). That did
# not match the 4x2 output displayed below.
v6 = sps.csc_matrix(
    (
        np.array([0.1, 0.2, 0.3]),  # data
        np.array([0, 2, 2]),        # row index of each stored value
        np.array([0, 2, 3]),        # indptr: column boundaries into data
    ),
    shape=(4, 2),
)

In [54]:
# print() lists each stored entry as "(row, col)  value"; the bare repr would
# only summarize the shape and number of stored elements.
print(v6)

(0, 0)	0.1
  (2, 0)	0.2
  (2, 1)	0.3


In [55]:
# Densify the sparse matrix into a NumPy 2-D array; unstored cells become 0.
print(v6.toarray())

[[0.1 0. ]
 [0.  0. ]
 [0.2 0.3]
 [0.  0. ]]
