In [3]:
import findspark

In [4]:
findspark.init("C:\\spark")

In [5]:
from pyspark import SparkContext

In [6]:
sc = SparkContext()

In [6]:
liste1 = [("Fiz-I", 73),
         ("Mat-I", 63),
         ("Kimya", 66),
         ("Mat-I", 44),
         ("Mat-I", 78)]

In [7]:
RDD_1 = sc.parallelize(liste1)

In [8]:
RDD_1.reduceByKey(lambda x, y: x +y).collect()

[('Kimya', 66), ('Fiz-I', 73), ('Mat-I', 185)]

In [9]:
RDD_1.groupByKey().collect()

[('Kimya', <pyspark.resultiterable.ResultIterable at 0x7adecf39e8>),
 ('Fiz-I', <pyspark.resultiterable.ResultIterable at 0x7adecf3c50>),
 ('Mat-I', <pyspark.resultiterable.ResultIterable at 0x7adecf3908>)]

In [10]:
for i in RDD_1.groupByKey().collect():
    print (i[0], list(i[1]))

Kimya [66]
Fiz-I [73]
Mat-I [63, 44, 78]


In [11]:
group_RDD = RDD_1.groupByKey()
[(i[0], list(i[1])) for i in group_RDD.collect()]

[('Kimya', [66]), ('Fiz-I', [73]), ('Mat-I', [63, 44, 78])]

In [12]:
RDD_1.groupByKey().map(lambda t: (t[0], sum(t[1]))).collect()

[('Kimya', 66), ('Fiz-I', 73), ('Mat-I', 185)]

In [13]:
liste2 = [("Mat-ı", 68),
         ("Fzk-ı", 45),
         ("Kimya", 94),
         ("Tarih", 79)]

In [14]:
RDD_2 = sc.parallelize(liste1 + liste2)
RDD_2.collect()


[('Fiz-I', 73),
 ('Mat-I', 63),
 ('Kimya', 66),
 ('Mat-I', 44),
 ('Mat-I', 78),
 ('Mat-ı', 68),
 ('Fzk-ı', 45),
 ('Kimya', 94),
 ('Tarih', 79)]

In [15]:
toplam = RDD_2.combineByKey((lambda x: (x,1)), 
                (lambda x, y: (x[0] + y, x[1] + y[0])),
                (lambda x, y: (x[0] + y[0], x[1] + y[1])))

In [16]:
toplam.collect()

[('Kimya', (160, 2)),
 ('Fiz-I', (73, 1)),
 ('Mat-ı', (68, 1)),
 ('Fzk-ı', (45, 1)),
 ('Tarih', (79, 1)),
 ('Mat-I', (185, 3))]

In [17]:
toplam.map(lambda xy: (xy[0], xy[1][0]/xy[1][1])).collect()

[('Kimya', 80.0),
 ('Fiz-I', 73.0),
 ('Mat-ı', 68.0),
 ('Fzk-ı', 45.0),
 ('Tarih', 79.0),
 ('Mat-I', 61.666666666666664)]

In [18]:
RDD_2.mapValues(lambda x: x+1).collect()

[('Fiz-I', 74),
 ('Mat-I', 64),
 ('Kimya', 67),
 ('Mat-I', 45),
 ('Mat-I', 79),
 ('Mat-ı', 69),
 ('Fzk-ı', 46),
 ('Kimya', 95),
 ('Tarih', 80)]

In [19]:
RDD_2.map(lambda x: (x[0], x[1] +1)).collect()

[('Fiz-I', 74),
 ('Mat-I', 64),
 ('Kimya', 67),
 ('Mat-I', 45),
 ('Mat-I', 79),
 ('Mat-ı', 69),
 ('Fzk-ı', 46),
 ('Kimya', 95),
 ('Tarih', 80)]

In [20]:
RDD_3 = sc.parallelize([("fruit", "apple,banane,orange"),
                       ("animal", "panda, zebra"),
                       ("stuff", "lamp, table, book")])

In [21]:
RDD_3.flatMapValues(lambda x: x.split(",")).collect()

[('fruit', 'apple'),
 ('fruit', 'banane'),
 ('fruit', 'orange'),
 ('animal', 'panda'),
 ('animal', ' zebra'),
 ('stuff', 'lamp'),
 ('stuff', ' table'),
 ('stuff', ' book')]

In [22]:
RDD_3.keys().collect()

['fruit', 'animal', 'stuff']

In [23]:
RDD_3.values().collect()

['apple,banane,orange', 'panda, zebra', 'lamp, table, book']

In [24]:
RDD_3.sortByKey().collect()

[('animal', 'panda, zebra'),
 ('fruit', 'apple,banane,orange'),
 ('stuff', 'lamp, table, book')]

In [25]:
RDD_4 = sc.parallelize([("animal", "panda, zebra"),
                       ("language", "python, java  , scala")])
RDD_4.collect()

[('animal', 'panda, zebra'), ('language', 'python, java  , scala')]

In [26]:
RDD_3.subtract(RDD_4).collect()

[('fruit', 'apple,banane,orange'), ('stuff', 'lamp, table, book')]

In [27]:
RDD_3.join(RDD_4).collect()

[('animal', ('panda, zebra', 'panda, zebra'))]

In [28]:
RDD_3.rightOuterJoin(RDD_4).collect()

[('language', (None, 'python, java  , scala')),
 ('animal', ('panda, zebra', 'panda, zebra'))]

In [29]:
RDD_3.leftOuterJoin(RDD_4).collect()

[('stuff', ('lamp, table, book', None)),
 ('fruit', ('apple,banane,orange', None)),
 ('animal', ('panda, zebra', 'panda, zebra'))]

In [30]:
print("RDD_3: ", RDD_3.collect())
print("RDD_4: ", RDD_4.collect())

RDD_3:  [('fruit', 'apple,banane,orange'), ('animal', 'panda, zebra'), ('stuff', 'lamp, table, book')]
RDD_4:  [('animal', 'panda, zebra'), ('language', 'python, java  , scala')]


In [31]:
RDD_3.cogroup(RDD_4).collect()

[('stuff',
  (<pyspark.resultiterable.ResultIterable at 0x7adef061d0>,
   <pyspark.resultiterable.ResultIterable at 0x7adef06358>)),
 ('fruit',
  (<pyspark.resultiterable.ResultIterable at 0x7adef06470>,
   <pyspark.resultiterable.ResultIterable at 0x7adef06710>)),
 ('language',
  (<pyspark.resultiterable.ResultIterable at 0x7adef06208>,
   <pyspark.resultiterable.ResultIterable at 0x7adef06048>)),
 ('animal',
  (<pyspark.resultiterable.ResultIterable at 0x7adef067f0>,
   <pyspark.resultiterable.ResultIterable at 0x7adef069b0>))]

In [32]:
t = tuple()
for i in RDD_3.cogroup(RDD_4).collect():
    l = list()
    for a in range(len(list(i[1]))):
        l.append(list(i[1][a]))
    t += (i[0], l)
print (t)

('stuff', [['lamp, table, book'], []], 'fruit', [['apple,banane,orange'], []], 'language', [[], ['python, java  , scala']], 'animal', [['panda, zebra'], ['panda, zebra']])


In [33]:
numbers = sc.parallelize([1,2,3,4,5])

In [57]:
sumof = lambda k, v: (k[0] + [v], k[1] + v)
agg = lambda k, v: (k[0] + v[0] , k[1] + v[1])
numbers.aggregate(([],0),
                 sumof,
                 agg)

([1, 2, 3, 4, 5], 15)

In [35]:
RDD_5 = sc.parallelize([("kolay_dil", "python"),
                       ("zor_dil", "C"),
                       ("kolay_dil", "javascript"),
                       ("kolay_dil", "html"),
                        ("zor_dil", "C++"),
                       ("zor_dil", "C")])

In [36]:
RDD_5.countByKey()

defaultdict(int, {'kolay_dil': 3, 'zor_dil': 3})

In [37]:
RDD_5.countByValue()

defaultdict(int,
            {('kolay_dil', 'html'): 1,
             ('kolay_dil', 'javascript'): 1,
             ('kolay_dil', 'python'): 1,
             ('zor_dil', 'C'): 2,
             ('zor_dil', 'C++'): 1})

In [38]:
animal = sc.parallelize(["panda",
                        "penguen",
                        "mouse",
                         "bird",
                         "lion",
                         "cheetah"
                        ])

In [39]:
animal.groupBy(lambda x: x[0]).collect()

[('l', <pyspark.resultiterable.ResultIterable at 0x7ade6f4908>),
 ('m', <pyspark.resultiterable.ResultIterable at 0x7ade6f4940>),
 ('p', <pyspark.resultiterable.ResultIterable at 0x7ade6f4278>),
 ('b', <pyspark.resultiterable.ResultIterable at 0x7ade6f49b0>),
 ('c', <pyspark.resultiterable.ResultIterable at 0x7ade6f4b70>)]

In [40]:
print([(k, list(v)) for (k, v) in animal.groupBy(lambda x:  x[0]).collect()])

[('l', ['lion']), ('m', ['mouse']), ('p', ['panda', 'penguen']), ('b', ['bird']), ('c', ['cheetah'])]


In [41]:
RDD_5.collect()

[('kolay_dil', 'python'),
 ('zor_dil', 'C'),
 ('kolay_dil', 'javascript'),
 ('kolay_dil', 'html'),
 ('zor_dil', 'C++'),
 ('zor_dil', 'C')]

In [42]:
RDD_5.lookup("kolay_dil")

['python', 'javascript', 'html']

In [43]:
numbers.collect()

[1, 2, 3, 4, 5]

In [44]:
squared = numbers.map(lambda x: x**2)
print(squared.collect())

[1, 4, 9, 16, 25]


In [45]:
print(numbers.zip(squared).collect())

[(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]


In [46]:
data = [("a", 3), ("b", 4), ("a", 1)]
sc.parallelize(data).reduceByKey(lambda xy: xy[0] + xy[1]).getNumPartitions()

8

In [47]:
sc.parallelize(data).reduceByKey(lambda xy: xy[0] +xy[1], 10).getNumPartitions()

10

In [7]:
zoo = sc.parallelize([("panda", 0),
                     ("zebra", 3),
                     ("snake", 1),
                     ("panda", 5),
                     ("zebra", 4),
                     ("snake", 2),
                     ("zebra", 1)])

In [8]:
zoo.groupByKey().collect()

[('panda', <pyspark.resultiterable.ResultIterable at 0xf4ec1d05f8>),
 ('zebra', <pyspark.resultiterable.ResultIterable at 0xf4ec1d0588>),
 ('snake', <pyspark.resultiterable.ResultIterable at 0xf4ec1d0668>)]

In [9]:
for i in zoo.groupByKey().collect():
    print((i[0], list(i[1])))

('panda', [0, 5])
('zebra', [3, 4, 1])
('snake', [1, 2])


In [10]:
zoo.mapValues(lambda x: (x,  1)).collect()

[('panda', (0, 1)),
 ('zebra', (3, 1)),
 ('snake', (1, 1)),
 ('panda', (5, 1)),
 ('zebra', (4, 1)),
 ('snake', (2, 1)),
 ('zebra', (1, 1))]

GroupByKey Kullanmadan nasıl yaparız ???

In [11]:
zoo.mapValues(lambda x: (x,  1)).reduceByKey(lambda x, y: [x[0], y[0]]).collect()

[('panda', [0, 5]), ('zebra', [3, 1]), ('snake', [1, 2])]

In [12]:
print(list((j[0],  list(j[1])) for j in zoo.groupByKey().collect()))

[('panda', [0, 5]), ('zebra', [3, 4, 1]), ('snake', [1, 2])]
