In [1]:
# https://www.youtube.com/watch?v=e5ol7oyKV0A&list=PL9ooVrP1hQOEBF5zdCdoMs2l1wws6be2X&index=4

In [2]:
# importing libraries
import os
import re
from pyspark.sql import SparkSession

spark=SparkSession.builder.appName('dataFrame').getOrCreate()

sc = spark.sparkContext


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/08 23:33:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Reading a file

# distributed across multiple nodes
myRDD = sc.parallelize([("ross", 19), ("joey", 18), ("rachel", 16), ("phobe", 18), ("chandler", 17), ("monica", 20)], numSlices=4) # distribute across multiple nodes
# collect: transfer data from worker node to 
# take: return the first x arguments
myRDD.take(6)



                                                                                

[('ross', 19),
 ('joey', 18),
 ('rachel', 16),
 ('phobe', 18),
 ('chandler', 17),
 ('monica', 20)]

In [4]:
# Reading txt
textRDD = sc.textFile("data/sample.txt", minPartitions=4)
textRDD.take(1)

['Bitcoin (abbreviation: BTC; sign: ₿) is a decentralized digital currency that can be transferred on the peer-to-peer bitcoin network.[7] Bitcoin transactions are verified by network nodes through cryptography and recorded in a public distributed ledger called a blockchain. The cryptocurrency was invented in 2008 by an unknown person or group of people using the name Satoshi Nakamoto.[10] The currency began use in 2009,[11] when its implementation was released as open-source software.[6]:\u200ach. 1\u200a']

In [5]:
# Reading csv
csvRDD = sc.textFile("data/fifa_player.csv", minPartitions=4).map(lambda element: element.split("\t")) # split by tab delimiter
csvRDD.take(2)



[['sofifa_id,player_url,short_name,long_name,age,dob,height_cm,weight_kg,nationality,club,overall,potential,value_eur,wage_eur,player_positions,preferred_foot,international_reputation,weak_foot,skill_moves,work_rate,body_type,real_face,release_clause_eur,player_tags,team_position,team_jersey_number,loaned_from,joined,contract_valid_until,nation_position,nation_jersey_number,pace,shooting,passing,dribbling,defending,physic,gk_diving,gk_handling,gk_kicking,gk_reflexes,gk_speed,gk_positioning,player_traits,attacking_crossing,attacking_finishing,attacking_heading_accuracy,attacking_short_passing,attacking_volleys,skill_dribbling,skill_curve,skill_fk_accuracy,skill_long_passing,skill_ball_control,movement_acceleration,movement_sprint_speed,movement_agility,movement_reactions,movement_balance,power_shot_power,power_jumping,power_stamina,power_strength,power_long_shots,mentality_aggression,mentality_interceptions,mentality_positioning,mentality_vision,mentality_penalties,mentality_composure,d

In [6]:
print(csvRDD.getNumPartitions())

print(csvRDD.count())

4
17771


In [7]:
# User defined functions
def Func(lines):
    lines = lines.lower()
    lines = lines.split()
    return lines

split_rdd = textRDD.map(Func) # for each and every element on RDD
split_rdd.take(1)


[['bitcoin',
  '(abbreviation:',
  'btc;',
  'sign:',
  '₿)',
  'is',
  'a',
  'decentralized',
  'digital',
  'currency',
  'that',
  'can',
  'be',
  'transferred',
  'on',
  'the',
  'peer-to-peer',
  'bitcoin',
  'network.[7]',
  'bitcoin',
  'transactions',
  'are',
  'verified',
  'by',
  'network',
  'nodes',
  'through',
  'cryptography',
  'and',
  'recorded',
  'in',
  'a',
  'public',
  'distributed',
  'ledger',
  'called',
  'a',
  'blockchain.',
  'the',
  'cryptocurrency',
  'was',
  'invented',
  'in',
  '2008',
  'by',
  'an',
  'unknown',
  'person',
  'or',
  'group',
  'of',
  'people',
  'using',
  'the',
  'name',
  'satoshi',
  'nakamoto.[10]',
  'the',
  'currency',
  'began',
  'use',
  'in',
  '2009,[11]',
  'when',
  'its',
  'implementation',
  'was',
  'released',
  'as',
  'open-source',
  'software.[6]:',
  'ch.',
  '1']]

In [8]:
# flatMap: similar to map, but output is flatten
RDD = textRDD.flatMap(Func)
RDD.take(5)

['bitcoin', '(abbreviation:', 'btc;', 'sign:', '₿)']

In [9]:
stopwords = ["a", "all", "the", "some"]

RDD1 = RDD.filter(lambda x: x not in stopwords)
RDD1.take(6)

['bitcoin', '(abbreviation:', 'btc;', 'sign:', '₿)', 'is']

In [10]:
# Filtering
filteredRDD = RDD.filter(lambda x: x.startswith("c"))
filteredRDD.distinct().take(5)

['ch.', 'compound', 'coin.[13]', 'capitalized,', 'capacity.']

In [11]:
# Group by
rdd_mapped = RDD.map(lambda x: (x, 1))
rdd_grouped = rdd_mapped.groupByKey()
rdd_frequency = rdd_grouped.mapValues(sum).map(lambda x: (x[1], x[0])).sortByKey(False)
rdd_frequency.take(10)

[(15, 'the'),
 (12, 'bitcoin'),
 (7, 'in'),
 (7, 'a'),
 (6, 'of'),
 (5, 'by'),
 (5, 'and'),
 (5, 'to'),
 (4, 'has'),
 (4, 'use')]

In [12]:
# Distinct and count
RDD2 = RDD1.distinct()
RDD2.count()

150

In [13]:
RDD2.take(5)

['bitcoin', '(abbreviation:', 'sign:', 'transferred', 'peer-to-peer']

In [14]:
# Group by
RDD3 = RDD2.groupBy(lambda w: w[0:3])
print([(k, list(v)) for (k, v) in RDD3.take(5)])

[('sig', ['sign:']), ('pee', ['peer-to-peer']), ('are', ['are']), ('in', ['in']), ('was', ['was'])]


In [15]:
sampled_RDD = RDD1.sample(withReplacement=False, fraction=0.1)

In [16]:
sampled_RDD.count()

12

In [17]:
sampled_RDD.take(3)

['(abbreviation:', 'called', '2008']

In [18]:
# Joins
a = sc.parallelize([("a", 2), ("b", 3)]) 
b = sc.parallelize([("a", 9), ("b", 7), ("c", 10)]) 

c = a.join(b)
c.collect()

                                                                                

[('b', (3, 7)), ('a', (2, 9))]

In [19]:
# Right outer join
d = a.rightOuterJoin(b)
d.collect()

                                                                                

[('b', (3, 7)), ('a', (2, 9)), ('c', (None, 10))]

In [20]:
# Reduce
num_rdd = sc.parallelize(range(1,50000))
num_rdd.reduce(lambda x,y: x+y) # sum from 1 to 50000

1249975000

In [21]:
# Reduce by Key
data_keydata = sc.parallelize(c=[("a", 4), ("b", 3), ("c", 2), ("a", 4), ("b", 3), ("c", 2)], numSlices=4)
data_keydata.reduceByKey(lambda x, y: x+y).collect()

[('b', 6), ('c', 4), ('a', 8)]

In [23]:
# Save as text file
outputFilePath = os.path.join(os.getcwd(), "output.txt")

myRDD = sc.parallelize([("ross", 19), ("joey", 18), ("rachel", 16), ("phobe", 18), ("chandler", 17), ("monica", 20)], numSlices=2) # distribute across multiple nodes
myRDD.saveAsTextFile(outputFilePath)

In [24]:
# Read from text file
RDD_read = sc.textFile(outputFilePath).map(lambda x: eval(x))
RDD_read.collect()

[('phobe', 18),
 ('chandler', 17),
 ('monica', 20),
 ('ross', 19),
 ('joey', 18),
 ('rachel', 16)]

In [25]:
# Sort by Key
test = [("a", 4), ("b", 3), ("1", 3), ("d", 4), ("2", 3),]
sc.parallelize(test).sortByKey(True, 1).collect()

[('1', 3), ('2', 3), ('a', 4), ('b', 3), ('d', 4)]

In [26]:
# Sets: Union
union_rdd = spark.sparkContext.parallelize([1, 1, 2, 3])
union2 = spark.sparkContext.parallelize([1, 1, 2, 3])
union_rdd.union(union2).collect()

[1, 1, 2, 3, 1, 1, 2, 3]

In [27]:
mpwi = spark.sparkContext.parallelize([1,2,3,4], 2) # map partition with index

def f(splitIndex, iterator):
    yield sum(iterator)

mpwi.mapPartitionsWithIndex(f).collect()

[3, 7]

In [28]:
sc = spark.sparkContext

rdd_a = sc.parallelize([1,2,3,4])
rdd_b = sc.parallelize([4,5,6,7])

rdd_a.intersection(rdd_b).collect()

                                                                                

[4]

In [29]:
rdd_a.subtract(rdd_b).collect()

                                                                                

[1, 2, 3]

In [30]:
rdd_a.cartesian(rdd_b).collect()

[(1, 4),
 (1, 5),
 (1, 6),
 (1, 7),
 (2, 4),
 (2, 5),
 (2, 6),
 (2, 7),
 (3, 4),
 (3, 5),
 (3, 6),
 (3, 7),
 (4, 4),
 (4, 5),
 (4, 6),
 (4, 7)]

In [31]:
pageLinks = ["a", ["b", "c", "d"]], ["b", ["d", "c"]], ["c", ["b"]], ["d", ["a", "c"]]
pageRanks = [["a", 1], ["b", 1], ["c", 1], ["d", 1]]

In [32]:
def rankContribution(uris, rank):
    numberofUris = len(uris)
    rankContribution = float(rank) / numberofUris
    newRank = []
    for uri in uris:
        newRank.append((uri, rankContribution))
    return newRank

In [33]:
pageLinksRDD = sc.parallelize(pageLinks, 2)
pageLinksRDD.collect()

[['a', ['b', 'c', 'd']], ['b', ['d', 'c']], ['c', ['b']], ['d', ['a', 'c']]]

In [34]:
pageRanksRDD = sc.parallelize(pageRanks, 2)
pageRanksRDD.collect()

[['a', 1], ['b', 1], ['c', 1], ['d', 1]]

In [35]:
numIter = 20
s = 0.85

for i in range(numIter):
    linksRank = pageLinksRDD.join(pageRanksRDD)
    contributedRDD = linksRank.flatMap(lambda x: rankContribution(x[1][0], x[1][1]))
    sumRanks = contributedRDD.reduceByKey(lambda v1, v2: v1 + v2)
    pageRanksRDD = sumRanks.map(lambda x: (x[0], (1-s)+s*x[1]))

pageRanksRDD.collect()

                                                                                

[('a', 0.5217268024809147),
 ('b', 1.357243795127982),
 ('c', 1.2463781024360086),
 ('d', 0.8746512999550939)]