<div style="font-size:18pt; padding-top:20px; text-align:center"><b>Operations on <span style="font-weight:bold; color:green">Spark RDD</span> using Python</b></div><hr>
<div style="text-align:right;">Sergei Yu. Papulin <span style="font-style: italic;font-weight: bold;">(papulin_bmstu@mail.ru)</span></div>

<a name="0"></a>
<div><span style="font-size:14pt; font-weight:bold">Content</span>
    <ol>
        <li><a href="#1">Distributed dataset</a>
            <ol style = "list-style-type:lower-alpha">
                <li><a href="#1a">Transformations</a></li>
                <li><a href="#1b">Actions</a></li>
            </ol>
        </li>
        <li><a href="#2">Distributed dataset of (K, V) pairs</a>
            <ol style = "list-style-type:lower-alpha">
                <li><a href="#2a">Transformations</a></li>
                <li><a href="#2b">Actions</a></li>
            </ol>
        </li>
        <li><a href="#3">Examples</a></li>
        <li><a href="#4">References</a></li>
    </ol>
</div>

In [1]:
import pyspark

<p>Run Spark Context</p>

In [2]:
# import pyspark

conf = pyspark.SparkConf() \
        .setAppName("basicOperationsRDDApp") \
        .setMaster("spark://spark-master:7077")

sc = pyspark.SparkContext.getOrCreate(conf = conf)

<a name="1"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">1. Distributed dataset</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">To Content</a></div>
    </div>
</div>

<a name="1a"></a>
<div style="display:table; width:100%">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-style:italic; font-weight:bold; font-size:12pt">
            a. Transformations
        </div>
        <div style="display:table-cell; border:1px solid lightgrey; width:20%">
            <div style="display:table-cell; width:10%; text-align:center; background-color:whitesmoke;">
                <a href="#1">Back</a>
            </div>
            <div style="display:table-cell; width:10%; text-align:center;">
                <a href="#1b">Next</a>
            </div>
        </div>
    </div>
</div>

<p>Create an <b><i>RDD</i></b> from an initial list of numbers and print it using the <b><i>collect</i></b> action</p>

In [3]:
# Initial list
data = [1, 2, 3, 4, 5]

In [4]:
# Create RDD - distributed data
data_rdd = sc.parallelize(data)
data_rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262

In [5]:
# Collect all RDD data on the Spark driver
data_rdd.collect()

[1, 2, 3, 4, 5]

<p>Create an <b><i>RDD</i></b> from a text file and print its first records using the <b><i>take</i></b> action</p>

In [6]:
# Path to a file in HDFS
file_path = "hdfs://namenode:9000/samples_100.json"

In [7]:
# Create RDD
data_rdd = sc.textFile(file_path)
data_rdd

hdfs://namenode:9000/samples_100.json MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:0

In [8]:
# Take 2 records from an RDD to the Spark driver
data_rdd.take(2)

['{"reviewerID": "AO94DHGC771SJ", "asin": "0528881469", "reviewerName": "amazdnu", "helpful": [0, 0], "reviewText": "We got this GPS for my husband who is an (OTR) over the road trucker.  Very Impressed with the shipping time, it arrived a few days earlier than expected...  within a week of use however it started freezing up... could of just been a glitch in that unit.  Worked great when it worked!  Will work great for the normal person as well but does have the \\"trucker\\" option. (the big truck routes - tells you when a scale is coming up ect...)  Love the bigger screen, the ease of use, the ease of putting addresses into memory.  Nothing really bad to say about the unit with the exception of it freezing which is probably one in a million and that\'s just my luck.  I contacted the seller and within minutes of my email I received a email back with instructions for an exchange! VERY impressed all the way around!", "overall": 5.0, "summary": "Gotta have GPS!", "unixReviewTime": 137013

<p><b><i>map</i></b></p>

In [9]:
data = [1, 2, 3, 4, 5]
data_rdd = sc.parallelize(data)

In [10]:
# Increment a value by 1
data_map_rdd = data_rdd.map(lambda x: x + 1)

In [11]:
# Collect data on the Spark driver
data_map_rdd.collect()

[2, 3, 4, 5, 6]

<p><b><i>flatMap</i></b></p>

In [12]:
# Create RDD
data_rdd = sc.textFile(file_path)

In [13]:
# Take 2 records from an RDD to the Spark driver
data_rdd.take(2)

['{"reviewerID": "AO94DHGC771SJ", "asin": "0528881469", "reviewerName": "amazdnu", "helpful": [0, 0], "reviewText": "We got this GPS for my husband who is an (OTR) over the road trucker.  Very Impressed with the shipping time, it arrived a few days earlier than expected...  within a week of use however it started freezing up... could of just been a glitch in that unit.  Worked great when it worked!  Will work great for the normal person as well but does have the \\"trucker\\" option. (the big truck routes - tells you when a scale is coming up ect...)  Love the bigger screen, the ease of use, the ease of putting addresses into memory.  Nothing really bad to say about the unit with the exception of it freezing which is probably one in a million and that\'s just my luck.  I contacted the seller and within minutes of my email I received a email back with instructions for an exchange! VERY impressed all the way around!", "overall": 5.0, "summary": "Gotta have GPS!", "unixReviewTime": 137013

In [14]:
data_map_rdd = data_rdd.map(lambda x: x.split(" "))
data_map_rdd.take(2)

[['{"reviewerID":',
  '"AO94DHGC771SJ",',
  '"asin":',
  '"0528881469",',
  '"reviewerName":',
  '"amazdnu",',
  '"helpful":',
  '[0,',
  '0],',
  '"reviewText":',
  '"We',
  'got',
  'this',
  'GPS',
  'for',
  'my',
  'husband',
  'who',
  'is',
  'an',
  '(OTR)',
  'over',
  'the',
  'road',
  'trucker.',
  '',
  'Very',
  'Impressed',
  'with',
  'the',
  'shipping',
  'time,',
  'it',
  'arrived',
  'a',
  'few',
  'days',
  'earlier',
  'than',
  'expected...',
  '',
  'within',
  'a',
  'week',
  'of',
  'use',
  'however',
  'it',
  'started',
  'freezing',
  'up...',
  'could',
  'of',
  'just',
  'been',
  'a',
  'glitch',
  'in',
  'that',
  'unit.',
  '',
  'Worked',
  'great',
  'when',
  'it',
  'worked!',
  '',
  'Will',
  'work',
  'great',
  'for',
  'the',
  'normal',
  'person',
  'as',
  'well',
  'but',
  'does',
  'have',
  'the',
  '\\"trucker\\"',
  'option.',
  '(the',
  'big',
  'truck',
  'routes',
  '-',
  'tells',
  'you',
  'when',
  'a',
  'scale',
  'is'

In [15]:
data_flatmap_rdd = data_rdd.flatMap(lambda x: x.split(" "))
data_flatmap_rdd.take(2)

['{"reviewerID":', '"AO94DHGC771SJ",']
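<p>The difference between the two outputs above can be reproduced without Spark. The following is a minimal plain-Python sketch, not Spark code: <i>lines</i> is a hypothetical stand-in for two text records. With <b><i>map</i></b> every input record yields exactly one output element (here a list), while <b><i>flatMap</i></b> concatenates the per-record results into one flat sequence.</p>

```python
from itertools import chain

# Hypothetical stand-in for two text records of an RDD
lines = ["a b c", "d e"]

# map: one output element per input element (a list of lists here)
mapped = [line.split(" ") for line in lines]

# flatMap: the per-element results are concatenated into one flat sequence
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)       # [['a', 'b', 'c'], ['d', 'e']]
print(flat_mapped)  # ['a', 'b', 'c', 'd', 'e']
```

<p>This is why <i>take(2)</i> returns two lists after <i>map</i> but two individual tokens after <i>flatMap</i>.</p>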

<p><b>filter</b></p>

In [16]:
data = [1, 2, 3, 4, 5, 6, 7]
data_rdd = sc.parallelize(data)
data_filter_rdd = data_rdd.filter(lambda x: x % 2 == 0)

data_filter_rdd.collect()

[2, 4, 6]

<p><b>sortBy</b></p>

In [17]:
data = ["f", "a", "h", "b", "c"]

data_rdd = sc.parallelize(data)

data_sortby_rdd = data_rdd.sortBy(lambda x: x, ascending=False, numPartitions=3)
data_sortby_rdd.collect()

['h', 'f', 'c', 'b', 'a']

<p><b>sample</b></p>

In [18]:
# Sample without replacement
data = [1, 2, 3, 4, 5, 6, 7]
data_rdd = sc.parallelize(data)
data_sample_rdd = data_rdd.sample(withReplacement=False, fraction=0.8)  # optionally pass seed=<int> for a reproducible sample

data_sample_rdd.collect()

[1, 2, 4, 5, 6, 7]

In [19]:
# Sample with replacement
data_sample_repl_rdd = data_rdd.sample(withReplacement=True, fraction=0.8)  # optionally pass seed=<int> for a reproducible sample

data_sample_repl_rdd.collect()

[2, 3, 6, 7]
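<p>Note that <i>fraction</i> is an expected share, not an exact size: the two samples above contain 6 and 4 elements, not 0.8 &times; 7. A rough plain-Python sketch of sampling without replacement is shown below. It assumes each element is kept independently with probability <i>fraction</i>; the helper name <i>bernoulli_sample_sketch</i> is hypothetical, and the random numbers will not match Spark's generator.</p>

```python
import random


def bernoulli_sample_sketch(items, fraction, seed):
    """Keep each element independently with probability `fraction` (illustration only)."""
    rng = random.Random(seed)
    return [x for x in items if rng.random() < fraction]


data = [1, 2, 3, 4, 5, 6, 7]

# The size of the sample varies from seed to seed
for seed in range(3):
    print(seed, bernoulli_sample_sketch(data, 0.8, seed))
```

<p>Fixing the seed makes the sketch deterministic, which mirrors the purpose of the <i>seed</i> argument of <i>sample</i>.</p>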

<p><b>union</b></p>

In [20]:
data_1 = [1, 2, 3, 4]
data_2 = [3, 4, 5, 6]

data1_rdd = sc.parallelize(data_1)
data2_rdd = sc.parallelize(data_2)

data_union_rdd = data1_rdd.union(data2_rdd)

data_union_rdd.collect()

[1, 2, 3, 4, 3, 4, 5, 6]

<p><b>intersection</b></p>

In [21]:
data_1 = [1, 2, 3, 4]
data_2 = [3, 4, 5, 6]

data1_rdd = sc.parallelize(data_1)
data2_rdd = sc.parallelize(data_2)

data_intersection_rdd = data1_rdd.intersection(data2_rdd)

data_intersection_rdd.collect()

[3, 4]

<p><b>distinct</b></p>

In [22]:
data = [1, 2, 2, 4, 4, 6, 7]
data_rdd = sc.parallelize(data)
data_distinct_rdd = data_rdd.distinct()

data_distinct_rdd.collect()

[4, 1, 2, 6, 7]

<b>mapPartitions</b>

In [23]:
data = [1, 2, 3, 4, 5, 6, 7, 8, 9]

def funct(partition):
    part = list()
    for record in partition:
        part.append(record)
    return [part]

data_rdd = sc.parallelize(data, 3)
data_mappart_rdd = data_rdd.mapPartitions(funct)
data_mappart_rdd.collect()

[[1, 2, 3], [4, 5, 6], [7, 8, 9]]

<b>mapPartitionsWithIndex</b>

In [24]:
data = [1, 2, 3, 4, 5, 6, 7, 8, 9]

def funct(part_id, partition):
    for record in partition:
        yield part_id, record

data_rdd = sc.parallelize(data, 3)
data_mappart_rdd = data_rdd.mapPartitionsWithIndex(funct)
data_mappart_rdd.collect()

[(0, 1), (0, 2), (0, 3), (1, 4), (1, 5), (1, 6), (2, 7), (2, 8), (2, 9)]

<b>cartesian</b>

In [25]:
data1 = [1, 2, 3, 4]
data2 = ["a", "b", "c", "d"]

data1_rdd = sc.parallelize(data1, 2)
data2_rdd = sc.parallelize(data2, 2)

data_cartesian_rdd = data1_rdd.cartesian(data2_rdd)

data_cartesian_rdd.collect()

[(1, 'a'),
 (1, 'b'),
 (2, 'a'),
 (2, 'b'),
 (1, 'c'),
 (1, 'd'),
 (2, 'c'),
 (2, 'd'),
 (3, 'a'),
 (3, 'b'),
 (4, 'a'),
 (4, 'b'),
 (3, 'c'),
 (3, 'd'),
 (4, 'c'),
 (4, 'd')]

<b>glom</b>

In [26]:
data = [1, 2, 3, 4, 5, 6, 7, 8, 9]

data_rdd = sc.parallelize(data, 4)
data_rdd.glom().collect()

[[1, 2], [3, 4], [5, 6], [7, 8, 9]]
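<p>The partition contents printed above are consistent with cutting the list at positions <i>i * n // k</i>. The plain-Python sketch below reproduces that layout. It is an observation about the outputs in this notebook, not a statement about how Spark is implemented, and <i>slice_sketch</i> is a hypothetical helper.</p>

```python
def slice_sketch(items, num_slices):
    """Cut `items` into `num_slices` contiguous chunks at positions i * n // num_slices."""
    n = len(items)
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


data = [1, 2, 3, 4, 5, 6, 7, 8, 9]

print(slice_sketch(data, 4))  # [[1, 2], [3, 4], [5, 6], [7, 8, 9]]
print(slice_sketch(data, 3))  # [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
```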

<b>coalesce</b>

In [27]:
data_coarse_rdd = data_rdd.coalesce(2)
data_coarse_rdd.glom().collect()

[[1, 2, 3, 4], [5, 6, 7, 8, 9]]

<b>repartition</b>

In [28]:
data = [1, 2, 3, 4, 5, 6, 7, 8, 9]

data_rdd = sc.parallelize(data, 2)
data_rdd.glom().collect()

[[1, 2, 3, 4], [5, 6, 7, 8, 9]]

In [29]:
data_repart_incr_rdd = data_rdd.repartition(4)
data_repart_incr_rdd.glom().collect()

[[], [5, 6, 7, 8, 9], [], [1, 2, 3, 4]]

<a name="1b"></a>
<div style="display:table; width:100%">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-style:italic; font-weight:bold; font-size:12pt">
            b. Actions
        </div>
        <div style="display:table-cell; border:1px solid lightgrey; width:20%">
            <div style="display:table-cell; width:10%; text-align:center; background-color:whitesmoke;">
                <a href="#1a">Back</a>
            </div>
            <div style="display:table-cell; width:10%; text-align:center;">
                <a href="#2">Next</a>
            </div>
        </div>
    </div>
</div>

<b>reduce</b>

In [30]:
data = [1, 2, 3, 4, 5, 6, 7, 8, 9]

data_rdd = sc.parallelize(data, 4)

data_reduce = data_rdd.reduce(lambda x, y: x + y)
data_reduce

45

In [31]:
def summ(x, y):
    return x + y

data_reduce = data_rdd.reduce(summ)
data_reduce

45

<b>fold</b>

In [32]:
data = [1, 2, 3, 4, 5, 6, 7, 8, 9]

data_rdd = sc.parallelize(data, 4)

data_fold = data_rdd.fold(0, lambda x, y: x + y)
data_fold

45

In [33]:
data_fold_10 = data_rdd.fold(10, lambda x, y: x + y)
data_fold_10

95
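<p>The result 95 follows from how the zero value is used: it seeds the accumulation in every partition and is used once more when the partial results are merged, so with 4 partitions it is added 5 times (45 + 10 &times; 5). The plain-Python sketch below emulates this; <i>fold_sketch</i> is a hypothetical helper, and the partition layout is taken from the <i>glom</i> output for the same data earlier in this notebook.</p>

```python
from functools import reduce


def fold_sketch(partitions, zero, op):
    """Emulate fold: `zero` seeds each partition and the final merge."""
    partials = [reduce(op, part, zero) for part in partitions]
    return reduce(op, partials, zero)


def add(x, y):
    return x + y


# 4-partition layout of [1..9], as printed by glom() earlier
partitions = [[1, 2], [3, 4], [5, 6], [7, 8, 9]]

print(fold_sketch(partitions, 0, add))   # 45
print(fold_sketch(partitions, 10, add))  # 95
```

<p>For this reason the zero value should normally be the neutral element of the operation (0 for addition), otherwise the result depends on the number of partitions.</p>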

<b>count</b>

In [34]:
data = [1, 2, 3, 4, 5, 6, 7, 8, 9]

data_rdd = sc.parallelize(data, 4)

data_count = data_rdd.count()
data_count

9

<p><b>countByValue</b></p>

In [35]:
pers_purchases = ["car", "hotel", "smartphone", "laptop", "car", "laptop", "laptop"]
pers_purchases_rdd = sc.parallelize(pers_purchases, 2)

count_value = pers_purchases_rdd.countByValue()
count_value

defaultdict(int, {'car': 2, 'hotel': 1, 'smartphone': 1, 'laptop': 3})

<b>first</b>

In [36]:
data = [1, 2, 3, 4, 5, 6, 7, 8, 9]

data_rdd = sc.parallelize(data, 4)

data_first = data_rdd.first()
data_first

1

<b>take</b>

In [37]:
data = [1, 2, 3, 4, 5, 6, 7, 8, 9]

data_rdd = sc.parallelize(data, 4)

data_take = data_rdd.take(5)
data_take

[1, 2, 3, 4, 5]

<b>takeSample</b>

In [38]:
data = [1, 2, 3, 4, 5, 6, 7, 8, 9]

data_rdd = sc.parallelize(data, 4)

data_take_sample = data_rdd.takeSample(withReplacement=False, num=5)  # optionally pass seed=<int> for a reproducible sample
data_take_sample

[8, 4, 3, 6, 9]

<b>takeOrdered</b>

In [39]:
data = [5, 2, 6, 4, 1, 3, 7, 9, 8]

data_rdd = sc.parallelize(data, 4)
data_take_desc_ordered = data_rdd.takeOrdered(num=4, key=lambda x: -x)
data_take_desc_ordered

[9, 8, 7, 6]

In [40]:
data_take_asc_ordered = data_rdd.takeOrdered(num=4, key=lambda x: x)
data_take_asc_ordered

[1, 2, 3, 4]
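<p>A top-<i>k</i> query does not need a full sort. The sketch below illustrates one way to think about <b><i>takeOrdered</i></b>: take the <i>k</i> smallest elements (by <i>key</i>) in each partition, then take the <i>k</i> smallest among those candidates. It is plain Python with the standard <i>heapq</i> module; <i>take_ordered_sketch</i> and the partition layout are hypothetical.</p>

```python
import heapq


def take_ordered_sketch(partitions, num, key=None):
    """Top-`num` per partition first, then top-`num` over the merged candidates."""
    candidates = []
    for part in partitions:
        candidates.extend(heapq.nsmallest(num, part, key=key))
    return heapq.nsmallest(num, candidates, key=key)


# Hypothetical 4-partition layout of [5, 2, 6, 4, 1, 3, 7, 9, 8]
partitions = [[5, 2], [6, 4], [1, 3], [7, 9, 8]]

print(take_ordered_sketch(partitions, 4, key=lambda x: -x))  # [9, 8, 7, 6]
print(take_ordered_sketch(partitions, 4))                    # [1, 2, 3, 4]
```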

<b>aggregate</b>

In [41]:
data = [1, 2, 3, 4]
data_rdd = sc.parallelize(data, 2)

data_agg = data_rdd.aggregate((0, 0),
                              (lambda x, value: (x[0] + value, x[1] + 1)),
                              (lambda x, y: (x[0] + y[0], x[1] + y[1])))
data_agg

(10, 4)

In [42]:
data_agg = data_rdd.aggregate((2, 0),
                              (lambda x, value: (x[0] + value, x[1] + 1)),
                              (lambda x, y: (x[0] + y[0], x[1] + y[1])))
data_agg

(16, 4)
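<p>As with <i>fold</i>, the zero value of <b><i>aggregate</i></b> is used in every partition and once more in the final merge, which explains why (2, 0) with 2 partitions gives a sum of 10 + 2 &times; 3 = 16. A plain-Python sketch under that assumption follows; <i>aggregate_sketch</i> is a hypothetical helper and the two-partition layout is assumed.</p>

```python
from functools import reduce


def aggregate_sketch(partitions, zero, seq_op, comb_op):
    """Emulate aggregate: seq_op within partitions, comb_op across them."""
    partials = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, partials, zero)


def seq_op(acc, value):
    # acc is a (sum, count) pair; value is one element of the dataset
    return (acc[0] + value, acc[1] + 1)


def comb_op(a, b):
    # Merge two (sum, count) pairs
    return (a[0] + b[0], a[1] + b[1])


partitions = [[1, 2], [3, 4]]  # assumed layout of [1, 2, 3, 4] in 2 partitions

total, count = aggregate_sketch(partitions, (0, 0), seq_op, comb_op)
print((total, count), total / count)                          # (10, 4) 2.5
print(aggregate_sketch(partitions, (2, 0), seq_op, comb_op))  # (16, 4)
```

<p>With the neutral zero value (0, 0), the (sum, count) pair gives the mean in a single pass over the data.</p>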

<b>saveAsTextFile</b>

In [43]:
# Output path (relative; resolved against the default file system, HDFS here)
output_file_path = "data/spark_rdd/samples_100_split.json"

In [44]:
data_file_rdd = sc.textFile(file_path, 2)

data_map_rdd = data_file_rdd.flatMap(lambda x: x.split())

data_map_rdd.saveAsTextFile(output_file_path)

In [45]:
data_file_output_rdd = sc.textFile(output_file_path, 2)
data_file_output_rdd.take(2)

[]

<a name="2"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">2. Distributed dataset of (K, V) pairs</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">To Content</a></div>
    </div>
</div>

<a name="2a"></a>
<div style="display:table; width:100%">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-style:italic; font-weight:bold; font-size:12pt">
            a. Transformations
        </div>
        <div style="display:table-cell; border:1px solid lightgrey; width:20%">
            <div style="display:table-cell; width:10%; text-align:center; background-color:whitesmoke;">
                <a href="#2">Back</a>
            </div>
            <div style="display:table-cell; width:10%; text-align:center;">
                <a href="#2b">Next</a>
            </div>
        </div>
    </div>
</div>

<b>groupByKey</b>

In [46]:
purchases = [(1, "car"), (1, "hotel"), (1, "smartphone"), 
             (1, "laptop"), (2, "TV"), (2, "car"), 
             (3, "laptop"), (3, "laptop"), (3, "hotel")]

purchases_rdd = sc.parallelize(purchases, 2)

groupByKey_rdd = purchases_rdd.groupByKey()
groupByKey_rdd.collect()

[(2, <pyspark.resultiterable.ResultIterable at 0x7f9cd505a4d0>),
 (1, <pyspark.resultiterable.ResultIterable at 0x7f9cd505a790>),
 (3, <pyspark.resultiterable.ResultIterable at 0x7f9cd505a2d0>)]

In [47]:
[(k, list(v)) for k, v in groupByKey_rdd.collect()]

[(2, ['TV', 'car']),
 (1, ['car', 'hotel', 'smartphone', 'laptop']),
 (3, ['laptop', 'laptop', 'hotel'])]

<b>reduceByKey</b>

In [48]:
purchases = [(1, "car"), (1, "hotel"), (1, "smartphone"), 
             (1, "laptop"), (2, "TV"), (2, "car"), 
             (3, "laptop"), (3, "laptop"), (3, "hotel")]

purchases_rdd = sc.parallelize(purchases, 2)

reduce_key_rdd = purchases_rdd.reduceByKey(lambda x, y: x + " " + y)

reduce_key_rdd.collect()

[(2, 'TV car'), (1, 'car hotel smartphone laptop'), (3, 'laptop laptop hotel')]

<b>foldByKey</b>

In [49]:
purchases = [(1, "car"), (1, "hotel"), (1, "smartphone"), 
             (1, "laptop"), (2, "TV"), (2, "car"), 
             (3, "laptop"), (3, "laptop"), (3, "hotel")]

purchases_rdd = sc.parallelize(purchases, 2)

reduce_key_rdd = purchases_rdd.foldByKey("x", lambda x, y: x + " " + y)

reduce_key_rdd.collect()

[(2, 'x TV car'),
 (1, 'x car hotel smartphone laptop'),
 (3, 'x laptop laptop hotel')]

<b>distinct</b>

In [50]:
persons = [(1, "Ivanov"), (2, "Petrov"), (3, "Jamson"), (4, "Black"), (4, "Black")]
persons_rdd = sc.parallelize(persons, 2)

map_rdd = persons_rdd.distinct()
map_rdd.collect()

[(4, 'Black'), (1, 'Ivanov'), (2, 'Petrov'), (3, 'Jamson')]

<b>keys</b>

In [51]:
data = [("f", 2), ("a", 3), ("h", 5), ("b", 6), ("c", 1)]

data_rdd = sc.parallelize(data)

data_keys_rdd = data_rdd.keys()
data_keys_rdd.collect()

['f', 'a', 'h', 'b', 'c']

<b>values</b>

In [52]:
data = [("f", 2), ("a", 3), ("h", 5), ("b", 6), ("c", 1)]

data_rdd = sc.parallelize(data)

data_values_rdd = data_rdd.values()
data_values_rdd.collect()

[2, 3, 5, 6, 1]

<b>mapValues</b>

In [53]:
data = [("f", 2), ("a", 3), ("h", 5), ("b", 6), ("c", 1)]

data_rdd = sc.parallelize(data)
data_mapValue_rdd = data_rdd.mapValues(lambda x: x + 10)
data_mapValue_rdd.collect()

[('f', 12), ('a', 13), ('h', 15), ('b', 16), ('c', 11)]

<b>flatMapValues</b>

In [54]:
data = [("f", [2, 1]), ("a", [3,1]), ("h", [3,4,5]), ("b", [6]), ("c", [1])]

data_rdd = sc.parallelize(data)
data_mapValue_rdd = data_rdd.flatMapValues(lambda x: x)
data_mapValue_rdd.collect()

[('f', 2),
 ('f', 1),
 ('a', 3),
 ('a', 1),
 ('h', 3),
 ('h', 4),
 ('h', 5),
 ('b', 6),
 ('c', 1)]

<b>join</b>

In [55]:
persons = [(1, "Ivanov"), (2, "Petrov"), (3, "Jamson"), (4, "Black")]
purchases = [(1, "car"), (1, "hotel"), (1, "smartphone"), (1, "laptop"), (2, "TV"), 
             (2, "car"), (3, "laptop"), (3, "laptop"), (3, "hotel"), (5, "TV")]

persons_rdd = sc.parallelize(persons, 2)
purchases_rdd = sc.parallelize(purchases, 4)

join_rdd = persons_rdd.join(purchases_rdd, numPartitions=2)
join_rdd.collect()

[(2, ('Petrov', 'TV')),
 (2, ('Petrov', 'car')),
 (1, ('Ivanov', 'car')),
 (1, ('Ivanov', 'hotel')),
 (1, ('Ivanov', 'smartphone')),
 (1, ('Ivanov', 'laptop')),
 (3, ('Jamson', 'laptop')),
 (3, ('Jamson', 'laptop')),
 (3, ('Jamson', 'hotel'))]

In [56]:
join_left_rdd = persons_rdd.leftOuterJoin(purchases_rdd, numPartitions=2)
join_left_rdd.collect()

[(2, ('Petrov', 'TV')),
 (2, ('Petrov', 'car')),
 (4, ('Black', None)),
 (1, ('Ivanov', 'car')),
 (1, ('Ivanov', 'hotel')),
 (1, ('Ivanov', 'smartphone')),
 (1, ('Ivanov', 'laptop')),
 (3, ('Jamson', 'laptop')),
 (3, ('Jamson', 'laptop')),
 (3, ('Jamson', 'hotel'))]

In [57]:
join_right_rdd = persons_rdd.rightOuterJoin(purchases_rdd, numPartitions=2)
join_right_rdd.collect()

[(2, ('Petrov', 'TV')),
 (2, ('Petrov', 'car')),
 (1, ('Ivanov', 'car')),
 (1, ('Ivanov', 'hotel')),
 (1, ('Ivanov', 'smartphone')),
 (1, ('Ivanov', 'laptop')),
 (3, ('Jamson', 'laptop')),
 (3, ('Jamson', 'laptop')),
 (3, ('Jamson', 'hotel')),
 (5, (None, 'TV'))]

<b>cogroup</b>

In [58]:
persons = [(1, "Ivanov"), (2, "Petrov"), (3, "Jamson"), (4, "Black")]
purchases = [(1, "car"), (1, "hotel"), (1, "smartphone"), (1, "laptop"), (2, "TV"), 
             (2, "car"), (3, "laptop"), (3, "laptop"), (3, "hotel"), (5, "TV")]

cogroup_rdd = persons_rdd.cogroup(purchases_rdd, numPartitions=2)

cogroup_rdd.collect()

[(2,
  (<pyspark.resultiterable.ResultIterable at 0x7f9cd4ff56d0>,
   <pyspark.resultiterable.ResultIterable at 0x7f9cd4fb6910>)),
 (4,
  (<pyspark.resultiterable.ResultIterable at 0x7f9cd4ff5390>,
   <pyspark.resultiterable.ResultIterable at 0x7f9cd5060f90>)),
 (1,
  (<pyspark.resultiterable.ResultIterable at 0x7f9cd4ff5090>,
   <pyspark.resultiterable.ResultIterable at 0x7f9cd50607d0>)),
 (3,
  (<pyspark.resultiterable.ResultIterable at 0x7f9cd5053090>,
   <pyspark.resultiterable.ResultIterable at 0x7f9cd50536d0>)),
 (5,
  (<pyspark.resultiterable.ResultIterable at 0x7f9cd4fedf10>,
   <pyspark.resultiterable.ResultIterable at 0x7f9cd4feddd0>))]

In [59]:
[(k, [list(el) for el in v]) for k, v in cogroup_rdd.collect()]

[(2, [['Petrov'], ['TV', 'car']]),
 (4, [['Black'], []]),
 (1, [['Ivanov'], ['car', 'hotel', 'smartphone', 'laptop']]),
 (3, [['Jamson'], ['laptop', 'laptop', 'hotel']]),
 (5, [[], ['TV']])]
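<p>The grouped output above also shows how the join variants relate to <b><i>cogroup</i></b>: for each key, an inner join pairs every left value with every right value, and a left outer join substitutes <i>None</i> when the right side is empty. The sketch below derives both from the printed result in plain Python; it illustrates the relationship only and is not how the joins were computed above.</p>

```python
# Result of the cogroup cell above, copied as plain lists
cogrouped = [
    (2, [['Petrov'], ['TV', 'car']]),
    (4, [['Black'], []]),
    (1, [['Ivanov'], ['car', 'hotel', 'smartphone', 'laptop']]),
    (3, [['Jamson'], ['laptop', 'laptop', 'hotel']]),
    (5, [[], ['TV']]),
]

# Inner join: cross product of left and right values per key
inner = [(k, (l, r))
         for k, (left, right) in cogrouped
         for l in left
         for r in right]

# Left outer join: keep left values even when the right side is empty
left_outer = [(k, (l, r))
              for k, (left, right) in cogrouped
              for l in left
              for r in (right or [None])]

print(len(inner), len(left_outer))  # 9 10
print((4, ('Black', None)) in left_outer)  # True
```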

<b>partitionBy</b>

In [60]:
purchases_price = [("car", 1), ("hotel", 2), ("smartphone", 2), ("laptop", 3), ("TV", 4), 
                   ("car", 2), ("laptop", 1), ("laptop", 3), ("hotel", 1)]
purchases_price_rdd = sc.parallelize(purchases_price, 2)

purchases_price_rdd.glom().collect()

[[('car', 1), ('hotel', 2), ('smartphone', 2), ('laptop', 3)],
 [('TV', 4), ('car', 2), ('laptop', 1), ('laptop', 3), ('hotel', 1)]]

In [61]:
part_rdd = purchases_price_rdd.partitionBy(2)
part_rdd.glom().collect()

[[('smartphone', 2), ('laptop', 3), ('TV', 4), ('laptop', 1), ('laptop', 3)],
 [('car', 1), ('hotel', 2), ('car', 2), ('hotel', 1)]]
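<p>After <b><i>partitionBy</i></b>, all records with the same key sit in the same partition. Conceptually, a record goes to partition <i>hash(key) % numPartitions</i>. The plain-Python sketch below shows the idea with a stand-in hash (<i>zlib.crc32</i>), chosen here only because it is stable between runs; it is not the hash function PySpark uses, so the resulting buckets will differ from the output above. The helper names are hypothetical.</p>

```python
import zlib


def stable_hash(key):
    # Stand-in hash for illustration only (not PySpark's hash function)
    return zlib.crc32(key.encode("utf-8"))


def partition_by_sketch(pairs, num_partitions, partition_func):
    """Route each (key, value) pair to bucket partition_func(key) % num_partitions."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        buckets[partition_func(key) % num_partitions].append((key, value))
    return buckets


purchases_price = [("car", 1), ("hotel", 2), ("smartphone", 2), ("laptop", 3), ("TV", 4),
                   ("car", 2), ("laptop", 1), ("laptop", 3), ("hotel", 1)]

buckets = partition_by_sketch(purchases_price, 2, stable_hash)
for i, bucket in enumerate(buckets):
    print(i, bucket)
```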

<b>aggregateByKey</b>

In [62]:
pers_purchases = [("car", 1), ("hotel", 2), ("smartphone", 2), 
                  ("laptop", 3), ("TV", 4), ("car", 2), 
                  ("laptop", 1), ("laptop", 3), ("hotel", 1)]

pers_purchases_rdd = sc.parallelize(pers_purchases, 4)

agg_key_rdd = pers_purchases_rdd.aggregateByKey((0, 0), 
                                                (lambda x, value: (x[0] + value, x[1] + 1)), 
                                                (lambda x, y: (x[0] + y[0], x[1] + y[1])))
agg_key_rdd.collect()

[('smartphone', (2, 1)),
 ('laptop', (7, 3)),
 ('car', (3, 2)),
 ('hotel', (3, 2)),
 ('TV', (4, 1))]
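<p>The (sum, count) pairs produced above are typically an intermediate step towards a per-key average. The sketch below finishes the computation in plain Python on the collected result; on the RDD itself the same step could be expressed with <i>mapValues</i>, as noted in the comment.</p>

```python
# Collected output of the aggregateByKey cell above: key -> (sum, count)
agg = [('smartphone', (2, 1)),
       ('laptop', (7, 3)),
       ('car', (3, 2)),
       ('hotel', (3, 2)),
       ('TV', (4, 1))]

# Distributed equivalent: agg_key_rdd.mapValues(lambda x: x[0] / x[1])
averages = {key: total / count for key, (total, count) in agg}

print(averages['car'])  # 1.5
print(averages['TV'])   # 4.0
```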

<b>combineByKey</b>

In [63]:
purchases_price = [("car", 1.0), ("hotel", 2.0), ("smartphone", 2.0), 
                   ("laptop", 3.0), ("TV", 4.0), ("car", 2.0), 
                   ("laptop", 1.0), ("laptop", 3.0), ("hotel", 1.0)]

purchases_price_rdd = sc.parallelize(purchases_price, 4).persist()

purchases_price_rdd.glom().collect()

[[('car', 1.0), ('hotel', 2.0)],
 [('smartphone', 2.0), ('laptop', 3.0)],
 [('TV', 4.0), ('car', 2.0)],
 [('laptop', 1.0), ('laptop', 3.0), ('hotel', 1.0)]]

In [64]:
combine_key_rdd = purchases_price_rdd.combineByKey((lambda value: (value, 1)), 
                                                  (lambda x, value: (x[0] + value, x[1] + 1)), 
                                                  (lambda x, y: (x[0] + y[0], x[1] + y[1])))
combine_key_rdd.collect()

[('smartphone', (2.0, 1)),
 ('laptop', (7.0, 3)),
 ('car', (3.0, 2)),
 ('hotel', (3.0, 2)),
 ('TV', (4.0, 1))]

In [65]:
combine_key_rdd = purchases_price_rdd.combineByKey((lambda value: (value, 2)), 
                                                  (lambda x, value: (x[0] + value, x[1] + 1)), 
                                                  (lambda x, y: (x[0] + y[0], x[1] + y[1])))
combine_key_rdd.collect()

[('smartphone', (2.0, 2)),
 ('laptop', (7.0, 5)),
 ('car', (3.0, 4)),
 ('hotel', (3.0, 4)),
 ('TV', (4.0, 2))]
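<p>The counts in the second result (for example 5 for <i>laptop</i> and 4 for <i>car</i>) follow from the first function being called once per key <i>in each partition</i> where the key occurs. The plain-Python sketch below emulates this on the partition layout printed by <i>glom</i> above; <i>combine_by_key_sketch</i> is a hypothetical helper, not Spark code.</p>

```python
def combine_by_key_sketch(partitions, create, merge_value, merge_combiners):
    """Emulate combineByKey: combine within partitions, then merge across them."""
    partials = []
    for part in partitions:
        acc = {}
        for key, value in part:
            # First occurrence of a key in this partition creates a combiner
            acc[key] = merge_value(acc[key], value) if key in acc else create(value)
        partials.append(acc)

    result = {}
    for acc in partials:
        for key, comb in acc.items():
            result[key] = merge_combiners(result[key], comb) if key in result else comb
    return result


# Partition layout as printed by glom() above
partitions = [[('car', 1.0), ('hotel', 2.0)],
              [('smartphone', 2.0), ('laptop', 3.0)],
              [('TV', 4.0), ('car', 2.0)],
              [('laptop', 1.0), ('laptop', 3.0), ('hotel', 1.0)]]


def merge_value(acc, value):
    return (acc[0] + value, acc[1] + 1)


def merge_combiners(a, b):
    return (a[0] + b[0], a[1] + b[1])


with_one = combine_by_key_sketch(partitions, lambda v: (v, 1), merge_value, merge_combiners)
with_two = combine_by_key_sketch(partitions, lambda v: (v, 2), merge_value, merge_combiners)

print(with_one['laptop'])  # (7.0, 3)
print(with_two['laptop'])  # (7.0, 5)
```

<p><i>laptop</i> occurs in two partitions, so starting the count at 2 adds one extra unit per partition: 3 + 2 = 5.</p>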

<b>sortByKey</b>

In [66]:
data = [("f", 2), ("a", 3), ("h", 5), ("b", 6), ("c", 1)]

data_rdd = sc.parallelize(data)

data_sortbykey_rdd = data_rdd.sortByKey(ascending=True, numPartitions=3)
data_sortbykey_rdd.collect()

[('a', 3), ('b', 6), ('c', 1), ('f', 2), ('h', 5)]

<b>sortBy</b>

In [67]:
data = [("f", 2), ("a", 3), ("h", 5), ("b", 6), ("c", 1)]

data_rdd = sc.parallelize(data)

data_sortby_rdd = data_rdd.sortBy(lambda x: x[0], ascending=True, numPartitions=3)
data_sortby_rdd.collect()

[('a', 3), ('b', 6), ('c', 1), ('f', 2), ('h', 5)]

In [68]:
data_sortby_rdd = data_rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=3)
data_sortby_rdd.collect()

[('c', 1), ('f', 2), ('a', 3), ('h', 5), ('b', 6)]

<a name="2b"></a>
<div style="display:table; width:100%">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-style:italic; font-weight:bold; font-size:12pt">
            b. Actions
        </div>
        <div style="display:table-cell; border:1px solid lightgrey; width:20%">
            <div style="display:table-cell; width:10%; text-align:center; background-color:whitesmoke;">
                <a href="#2a">Back</a>
            </div>
            <div style="display:table-cell; width:10%; text-align:center;">
                <a href="#3">Next</a>
            </div>
        </div>
    </div>
</div>

<b>countByKey</b>

In [69]:
purchases = [(1, "car"), (1, "hotel"), (1, "smartphone"), 
             (1, "laptop"), (2, "TV"), (2, "car"), 
             (3, "laptop"), (3, "laptop"), (3, "hotel")]

purchases_rdd = sc.parallelize(purchases, 2)

count_key = purchases_rdd.countByKey()

count_key

defaultdict(int, {1: 4, 2: 2, 3: 3})

<b>takeOrdered</b>

In [70]:
data = [("f", 2), ("a", 3), ("h", 5), ("b", 6), ("c", 1)]

data_rdd = sc.parallelize(data)
take_ordered = data_rdd.takeOrdered(5, key = lambda x: -x[1])
take_ordered

[('b', 6), ('h', 5), ('a', 3), ('f', 2), ('c', 1)]

<a name="3"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">3. Examples</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">To Content</a></div>
    </div>
</div>

<p><b>WordCount</b> example</p>

In [71]:
data_map_pair_rdd = data_map_rdd.map(lambda x: (x, 1))
data_map_pair_rdd.take(5)

[('{"reviewerID":', 1),
 ('"AO94DHGC771SJ",', 1),
 ('"asin":', 1),
 ('"0528881469",', 1),
 ('"reviewerName":', 1)]

In [72]:
data_map_pair_reduce_rdd = data_map_pair_rdd.reduceByKey(lambda x1, x2: x1+x2)
data_map_pair_reduce_rdd.take(10)

[('{"reviewerID":', 100),
 ('"0528881469",', 5),
 ('"reviewerName":', 99),
 ('[0,', 79),
 ('0],', 72),
 ('"reviewText":', 100),
 ('got', 8),
 ('this', 95),
 ('husband', 2),
 ('is', 132)]

In [73]:
data_file_rdd = sc.textFile(file_path, 2)

wcount_rdd = data_file_rdd \
    .flatMap(lambda x: x.split()) \
    .map(lambda x: (x, 1)) \
    .aggregateByKey(0,
                    (lambda x, value: x + value), 
                    (lambda x, y: x + y))

wcount_rdd.take(5)

[('{"reviewerID":', 100),
 ('"0528881469",', 5),
 ('"reviewerName":', 99),
 ('[0,', 79),
 ('0],', 72)]

In [74]:
data_file_rdd = sc.textFile(file_path, 2)

wcount_rdd = data_file_rdd \
    .flatMap(lambda x: x.split()) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda x1, x2: x1 + x2)
    
wcount_rdd.take(10)

[('{"reviewerID":', 100),
 ('"0528881469",', 5),
 ('"reviewerName":', 99),
 ('[0,', 79),
 ('0],', 72),
 ('"reviewText":', 100),
 ('got', 8),
 ('this', 95),
 ('husband', 2),
 ('is', 132)]
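<p>Splitting raw lines on whitespace counts JSON syntax such as <i>{"reviewerID":</i> as words. If the goal is to count words of the review text only, each line could be parsed first. The sketch below is one possible approach, assuming every line is a JSON object with a <i>reviewText</i> field (as in the records shown earlier); <i>review_words</i> and <i>sample_line</i> are hypothetical names introduced for illustration.</p>

```python
import json
import re
from collections import Counter


def review_words(line):
    """Return lowercase word tokens from the reviewText field of one JSON line."""
    record = json.loads(line)
    return re.findall(r"[a-z0-9']+", record.get("reviewText", "").lower())


# Hypothetical record with the same field name as the dataset
sample_line = '{"reviewerID": "X1", "reviewText": "Worked great when it worked!"}'

words = review_words(sample_line)
print(words)           # ['worked', 'great', 'when', 'it', 'worked']
print(Counter(words))  # local equivalent of counting by value

# Possible use with Spark (not executed here):
# sc.textFile(file_path).flatMap(review_words).map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
```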

In [75]:
data_file_rdd = sc.textFile(file_path, 2)

wcount_rdd = data_file_rdd \
    .flatMap(lambda x: x.split())

wcount_rdd.countByValue()

defaultdict(int,
            {'{"reviewerID":': 100,
             '"AO94DHGC771SJ",': 1,
             '"asin":': 100,
             '"0528881469",': 5,
             '"reviewerName":': 99,
             '"amazdnu",': 1,
             '"helpful":': 100,
             '[0,': 79,
             '0],': 72,
             '"reviewText":': 100,
             '"We': 4,
             'got': 8,
             'this': 95,
             'GPS': 7,
             'for': 126,
             'my': 84,
             'husband': 2,
             'who': 6,
             'is': 132,
             'an': 24,
             '(OTR)': 1,
             'over': 8,
             'the': 476,
             'road': 5,
             'trucker.': 1,
             'Very': 9,
             'Impressed': 1,
             'with': 81,
             'shipping': 2,
             'time,': 1,
             'it': 187,
             'arrived': 1,
             'a': 270,
             'few': 4,
             'days': 3,
             'earlier': 1,
             'than': 20,

<a name="4"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">4. References</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">To Content</a></div>
    </div>
</div>

<a href="http://spark.apache.org/docs/latest/api/python/pyspark.html#module-pyspark">pyspark package</a><br>
<a href="http://spark.apache.org/docs/latest/programming-guide.html">Spark Programming Guide</a>