/
rdd_utils.py
121 lines (99 loc) · 4.05 KB
/
rdd_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from __future__ import absolute_import
from pyspark.mllib.regression import LabeledPoint
import numpy as np
from ..mllib.adapter import to_vector, from_vector
try:
from elephas.java import java_classes
from elephas.java.ndarray import ndarray
except Exception:
print("WARNING")
from six.moves import zip
def to_java_rdd(jsc, features, labels, batch_size):
    """Convert numpy features and labels into a JavaRDD of
    DL4J DataSet type.

    Samples are grouped into full batches of ``batch_size``; a trailing
    partial batch (fewer than ``batch_size`` samples) is silently dropped.

    :param jsc: JavaSparkContext from pyjnius
    :param features: numpy array with features
    :param labels: numpy array with labels
    :param batch_size: int, number of samples per DataSet batch
    :return: JavaRDD<DataSet>
    """
    data_sets = java_classes.ArrayList()
    num_batches = len(features) // batch_size
    for i in range(num_batches):
        start = i * batch_size
        end = start + batch_size
        # .copy() gives each batch its own contiguous buffer before it is
        # wrapped for the JVM side.
        xi = ndarray(features[start:end].copy())
        yi = ndarray(labels[start:end].copy())
        data_sets.add(java_classes.DataSet(xi.array, yi.array))
    return jsc.parallelize(data_sets)
def to_simple_rdd(sc, features, labels):
    """Turn two numpy arrays into an RDD of (feature, label) tuples.

    :param sc: Spark context
    :param features: numpy array with features
    :param labels: numpy array with labels
    :return: Spark RDD with feature-label pairs
    """
    return sc.parallelize(list(zip(features, labels)))
def to_labeled_point(sc, features, labels, categorical=False):
    """Turn numpy feature/label arrays into a LabeledPoint RDD
    for MLlib and ML integration.

    :param sc: Spark context
    :param features: numpy array with features
    :param labels: numpy array with labels
    :param categorical: boolean, whether labels are already one-hot encoded or not
    :return: LabeledPoint RDD with features and labels
    """
    def _make_point(x, y):
        # One-hot labels are collapsed back to a class index via argmax.
        label = np.argmax(y) if categorical else y
        return LabeledPoint(label, to_vector(x))

    points = [_make_point(x, y) for x, y in zip(features, labels)]
    return sc.parallelize(points)
def from_labeled_point(rdd, categorical=False, nb_classes=None):
    """Convert a LabeledPoint RDD back to a pair of numpy arrays

    :param rdd: LabeledPoint RDD
    :param categorical: boolean, if labels should be one-hot encoded when returned
    :param nb_classes: optional int, number of class labels; inferred from the
        data (max label + 1) when not given
    :return: pair of numpy arrays, features and labels
    """
    features = np.asarray(
        rdd.map(lambda lp: from_vector(lp.features)).collect())
    labels = np.asarray(rdd.map(lambda lp: lp.label).collect(), dtype='int32')
    if categorical:
        if not nb_classes:
            nb_classes = np.max(labels) + 1
        # Vectorized one-hot: row i of the identity matrix is the
        # encoding of class i (replaces the per-sample Python loop).
        labels = np.eye(nb_classes)[labels]
    return features, labels
def encode_label(label, nb_classes):
    """One-hot encoding of a single label

    :param label: class label (int or double without floating point digits)
    :param nb_classes: int, number of total classes
    :return: one-hot encoded vector
    """
    # Row `label` of the identity matrix is exactly the one-hot vector.
    return np.eye(nb_classes)[int(label)]
def lp_to_simple_rdd(lp_rdd, categorical=False, nb_classes=None):
    """Convert a LabeledPoint RDD into an RDD of feature-label pairs

    :param lp_rdd: LabeledPoint RDD of features and labels
    :param categorical: boolean, if labels should be one-hot encoded when returned
    :param nb_classes: int, number of total classes
    :return: Spark RDD with feature-label pairs
    """
    if not categorical:
        # Plain case: keep the scalar label as-is.
        return lp_rdd.map(lambda lp: (from_vector(lp.features), lp.label))
    if not nb_classes:
        # Infer the class count from the data: max label + 1.
        collected = np.asarray(
            lp_rdd.map(lambda lp: lp.label).collect(), dtype='int32')
        nb_classes = np.max(collected) + 1
    return lp_rdd.map(lambda lp: (from_vector(lp.features),
                                  encode_label(lp.label, nb_classes)))