## Install PySpark

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=c6fa444573ced68eae830531d67935e3acc4888c5b6692907bcb27d6652d9ccc
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


## Set up the environment for SparkSession and SparkContext

In [2]:
from pyspark import SparkContext

In [3]:
from pyspark.sql import SparkSession

# Build a SparkSession named "PySpark1", or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("PySpark1")
    .getOrCreate()
)

# The underlying SparkContext, used for all RDD work in this notebook.
sc = spark.sparkContext

In [4]:
# Version of Spark that the running SparkContext belongs to
sc.version

'3.5.0'

In [5]:
# Python version (major.minor) that PySpark is running with. This is NOT the
# Spark/PySpark version; that is sc.version in the cell above.
sc.pythonVer

'3.10'

In [6]:
# Master URL this SparkContext is connected to; 'local[*]' means local mode
# with as many worker threads as logical cores.
sc.master

'local[*]'

Currently, Spark is running in **local** mode (`local[*]` uses one worker thread per available core); it can also run on a **cluster**.

## Get familiar with PySpark

**RDD - Resilient Distributed Dataset**

In [7]:
# RDD 1: the integers 1-5, distributed from a driver-side list
rdd1 = sc.parallelize(list(range(1, 6)))

Note 1: To read a local text file, prefix its path with **"file:///"**.

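Note 2: Prefix the path string with **r** (a raw string) so that backslash sequences such as `\U` in `C:\Users` are not treated as escape sequences, which would raise a `unicodeescape` SyntaxError.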

In [8]:
import os

# RDD 2: the lines of a local CSV file. textFile is lazy, so the file is only
# read when an action runs on rdd2.
# The default URI is the original author's machine-specific Windows path. Set
# the INSECT_CSV_URI environment variable to point at your own copy instead of
# editing this cell. The "file:///" scheme selects the local filesystem, and
# the r"" raw string keeps "\U" in "C:\Users" from being parsed as an escape.
INSECT_CSV_URI = os.environ.get(
    "INSECT_CSV_URI",
    r"file:///C:\Users\admin.DESKTOP-GCE5IF7\Desktop\Project PT\B-Insect.csv",
)
rdd2 = sc.textFile(INSECT_CSV_URI)

In [9]:
# RDD 3: the integers 1 to 999. range()'s stop value (1000) is excluded, so
# despite its name number1000 holds 999 numbers (rdd3.count() == 999).
number1000 = list(range(1, 1000))
rdd3 = sc.parallelize(number1000)

### Lambda Functions

#### Map()

**1 - Squared List**

In [10]:
# Square every element using map() with an anonymous function
normal_list = [3, 6, 8, 10, 15, 17, 200]
squared_list = list(map(lambda n: n * n, normal_list))
squared_list

[9, 36, 64, 100, 225, 289, 40000]

**2 - Operated List**

In [11]:
# Apply (p + 4)^2 - 20 to each prime
prime_list = [2, 3, 5, 7, 11, 13, 17, 19]
operated_list = list(map(lambda p: (p + 4) * (p + 4) - 20, prime_list))
operated_list

[16, 29, 61, 101, 205, 269, 421, 509]

**3 - Sum of Digits in Random List**

In [12]:
import random

# Seed the module-level RNG so that "Restart & Run All" reproduces the same
# numbers here and in the later cells that call random.randint.
random.seed(42)

# Nine random three-digit integers (100-999 inclusive)
random_list = [random.randint(100, 999) for _ in range(9)]
random_list

[141, 138, 473, 211, 127, 318, 344, 234, 460]

In [13]:
# Digit sum of each number. Summing the characters of str(abs(x)) works for
# any integer, whereas fixed hundreds/tens/units arithmetic
# (x // 100 + (x // 10) % 10 + x % 10) is only correct for 0 <= x < 1000.
sum_digit_list = list(map(lambda x: sum(int(d) for d in str(abs(x))), random_list))
sum_digit_list

[6, 12, 14, 4, 10, 12, 11, 9, 10]

**4 - Remainder of each number divided by 17 (0 means divisible by 17)**

In [14]:
# Nine random integers between 1 and 100000 (inclusive)
random_list2 = [random.randint(1, 100000) for _ in range(1, 10)]
random_list2

[7533, 97187, 19535, 4059, 8897, 16405, 92810, 15938, 32849]

In [15]:
# Remainder of each number modulo 17; a 0 marks a number divisible by 17
div17_list = [*map(lambda value: value % 17, random_list2)]
div17_list

[2, 15, 2, 13, 6, 0, 7, 9, 5]

#### Filter()

**1 - Filter numbers divisible by 5 or 7**

In [16]:
# Thirteen random integers between 1 and 999 (inclusive)
random_list3 = [random.randint(1, 999) for _ in range(1, 14)]
random_list3

[361, 115, 145, 595, 612, 389, 458, 918, 265, 368, 897, 624, 41]

In [17]:
# Keep numbers divisible by 5 or by 7: a number passes unless it leaves a
# non-zero remainder for both divisors.
filter57_list = list(filter(lambda n: not (n % 5 and n % 7), random_list3))
filter57_list

[115, 145, 595, 265]

**2 - Filter numbers that are perfect squares**

In [18]:
# Fourteen random integers between 1 and 60 (inclusive)
random_list4 = [random.randint(1, 60) for _ in range(1, 15)]
random_list4

[10, 43, 9, 6, 18, 19, 56, 26, 58, 12, 32, 43, 8, 53]

In [19]:
import math

# Keep only perfect squares (assumes integer inputs, as random_list4 contains).
# math.isqrt is exact integer arithmetic. Comparing x ** 0.5 with its rounded
# value relies on floating-point sqrt, which can misclassify very large
# integers. Negative numbers are never perfect squares, and math.isqrt would
# raise on them, hence the x >= 0 guard.
squared_res_list = list(filter(lambda x: x >= 0 and math.isqrt(x) ** 2 == x, random_list4))
squared_res_list

[9]

### More on RDD transformation

#### Partitioning - **Check the number of partitions**

Count the partitions

In [20]:
# Number of partitions rdd3 was split into (parallelize() without numSlices
# falls back to the context's default parallelism)
num_par = rdd3.getNumPartitions()
num_par

2

In [21]:
# The integers 1-9999 as rdd4; no numSlices is given, so Spark chooses the
# partition count.
num_list = list(range(1, 10000))
rdd4 = sc.parallelize(num_list)
rdd4.getNumPartitions()

2

#### FlatMap()

**Map each element to zero or more output elements and flatten the results into one RDD**

In [22]:
# Three short sentences, split into words in the next cell
text_n = [
    'Hello this is',
    'Hoang Anh Tuan',
    'What is your name?',
]
rdd5 = sc.parallelize(text_n)

In [23]:
# Split every sentence into words with flatMap: each input string yields
# several output elements, which are flattened into a single RDD of words.
# The RDD is rebuilt from text_n rather than written as rdd5 = rdd5.flatMap(...),
# so re-running this cell does not re-apply the transformation to its own output.
rdd5 = sc.parallelize(text_n).flatMap(lambda x: x.split(" "))

#### Union - **To combine 2 RDDs**

In [24]:
# A second, small RDD. It mixes ints with a str, which RDDs allow.
rdd6 = sc.parallelize([1, 5, 7, 'Numbers'])

In [25]:
# union() appends rdd6's elements after rdd5's; duplicates are kept.
# Like every transformation it is lazy: nothing runs until an action.
combined_rdd = rdd5.union(rdd6)

#### collect() - **Return all elements to the driver (use only on small RDDs)**

In [26]:
# Action: return every element of rdd1 to the driver as a Python list
rdd1.collect()

[1, 2, 3, 4, 5]

In [27]:
# Words from rdd5 followed by the elements of rdd6
combined_rdd.collect()

['Hello',
 'this',
 'is',
 'Hoang',
 'Anh',
 'Tuan',
 'What',
 'is',
 'your',
 'name?',
 1,
 5,
 7,
 'Numbers']

In [28]:
# Print every element on its own line. collect() copies the whole RDD to the
# driver, so this pattern is only appropriate for small RDDs.
data = combined_rdd.collect()
for element in data:
    print(element)

Hello
this
is
Hoang
Anh
Tuan
What
is
your
name?
1
5
7
Numbers


#### take(N) - **Return the first N elements of the RDD**

In [29]:
# Only the first 6 elements are returned to the driver
combined_rdd.take(6)

['Hello', 'this', 'is', 'Hoang', 'Anh', 'Tuan']

In [30]:
# First 11 elements of rdd4
rdd4.take(11)

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

In [31]:
# Ten random integers between 999 and 10000 (inclusive); peek at the first seven
random_list9 = [random.randint(999, 10000) for _ in range(1, 11)]
rdd7 = sc.parallelize(random_list9)
rdd7.take(7)

[5884, 8718, 3660, 5005, 6806, 1522, 8372]

In [32]:
# collect() returns all elements, whereas take(7) in the previous cell
# returned only the first seven
rdd7.collect()

[5884, 8718, 3660, 5005, 6806, 1522, 8372, 9901, 1319, 3188]

#### first() - **Get the first element of the RDD**

In [33]:
# First element of rdd1
rdd1.first()

1

In [34]:
# First element of rdd3
rdd3.first()

1

In [35]:
# First element of rdd4
rdd4.first()

1

In [36]:
# First word produced by the flatMap
rdd5.first()

'Hello'

In [37]:
# First element of rdd6
rdd6.first()

1

In [38]:
# First element of the union, which comes from the rdd5 side
combined_rdd.first()

'Hello'

In [39]:
# First element of rdd7 (the first value of random_list9)
rdd7.first()

5884

#### count() - **Count the elements of the RDD**

In [40]:
# Number of elements in rdd1
rdd1.count()

5

In [41]:
# Number of elements in rdd3 (range(1, 1000) excludes 1000)
rdd3.count()

999

In [42]:
# Number of elements in rdd4
rdd4.count()

9999

In [43]:
# Number of words after the flatMap
rdd5.count()

10

In [44]:
# Number of elements in rdd6
rdd6.count()

4

In [45]:
# Size of the union: rdd5.count() + rdd6.count()
combined_rdd.count()

14

In [46]:
# Number of elements in rdd7
rdd7.count()

10

### Pair RDD

#### Creating pair RDDs

**There are 2 ways of creating pair RDDs**

1. From a list of key-value tuples
2. From a regular RDD, by mapping each element to a tuple

*Method 1: A list of key-value tuples*

In [47]:
# A pair RDD built directly from (name, number) tuples
list_tuple1 = [
    ('Tuan', 23),
    ('Quyen', 20),
    ('Ken', 12),
    ('Huy', 18),
    ('Mother', 48),
]
rdd8 = sc.parallelize(list_tuple1)
rdd8.collect()

[('Tuan', 23), ('Quyen', 20), ('Ken', 12), ('Huy', 18), ('Mother', 48)]

In [48]:
# 3-field tuples: parallelize accepts them, but key-based operations such as
# reduceByKey or join expect (key, value) 2-tuples.
list_tuple2 = [
    ('Minecraft', 5, 'Expensive'),
    ('Terraria', 4, 'Fair'),
    ('GTA5', 5, 'Very Expensive'),
    ('Stardew Valley', 3, 'Cheap AF'),
]
rdd9 = sc.parallelize(list_tuple2)
rdd9.collect()

[('Minecraft', 5, 'Expensive'),
 ('Terraria', 4, 'Fair'),
 ('GTA5', 5, 'Very Expensive'),
 ('Stardew Valley', 3, 'Cheap AF')]

In [49]:
# Integer keys work the same way as string keys
list_tuple3 = [
    (1, 3),
    (3, 5),
    (5, 7),
    (7, 9),
    (9, 11),
    (11, 1),
]
rdd10 = sc.parallelize(list_tuple3)
rdd10.collect()

[(1, 3), (3, 5), (5, 7), (7, 9), (9, 11), (11, 1)]

*Method 2: From a regular RDD*

In [50]:
# A regular RDD of space-separated strings ...
list_na = [
    'Tuan 29 0',
    'Quyen 23 1',
    'Ken 12 0',
    'Huy 18 1',
    'Mother 48 3',
    'Grandpa 82 18',
    'Earth 100000000 9000',
]
rdd11_regular = sc.parallelize(list_na)
# ... turned into a pair RDD of its first two fields (both remain strings)
rdd11 = (
    rdd11_regular
    .map(lambda line: line.split(" "))
    .map(lambda parts: (parts[0], parts[1]))
)
rdd11.collect()

[('Tuan', '29'),
 ('Quyen', '23'),
 ('Ken', '12'),
 ('Huy', '18'),
 ('Mother', '48'),
 ('Grandpa', '82'),
 ('Earth', '100000000')]

In [51]:
# The same source RDD, keeping all three fields as a 3-tuple of strings
rdd12 = (
    rdd11_regular
    .map(lambda line: line.split(" "))
    .map(lambda parts: (parts[0], parts[1], parts[2]))
)
rdd12.collect()

[('Tuan', '29', '0'),
 ('Quyen', '23', '1'),
 ('Ken', '12', '0'),
 ('Huy', '18', '1'),
 ('Mother', '48', '3'),
 ('Grandpa', '82', '18'),
 ('Earth', '100000000', '9000')]

In [52]:
# Total elements across both RDDs (rdd12 here is the 3-field RDD from the previous cell)
rdd11.count() + rdd12.count()

14

#### reduceByKey() - **Like SUM() + Group By in SQL**

In [53]:
# Create an RDD of (player, value) pairs in which some keys ('Ronaldo', 'Messi')
# appear more than once, so that reduceByKey has values to combine.
# NOTE: this rebinds rdd12, which previously held the 3-field tuples built
# from rdd11_regular.
list_player = [('Ronaldo', 30), ('Messi', 32), ('Pele', 40), ('Ronaldinho', 32), ('Ganarcho', 30), ('Ronaldo', 36), ('Messi', 40)]
rdd12 = sc.parallelize(list_player)
rdd12.collect()

[('Ronaldo', 30),
 ('Messi', 32),
 ('Pele', 40),
 ('Ronaldinho', 32),
 ('Ganarcho', 30),
 ('Ronaldo', 36),
 ('Messi', 40)]

In [54]:
# Sum the values of each key (like SUM(value) ... GROUP BY key in SQL)
rdd12_reduce = rdd12.reduceByKey(lambda total, value: total + value)
rdd12_reduce.collect()

[('Ronaldo', 66),
 ('Pele', 40),
 ('Messi', 72),
 ('Ronaldinho', 32),
 ('Ganarcho', 30)]

#### sortByKey() - **Like Order By in SQL**

In [55]:
# Swap each (player, total) pair to (total, player) so that sortByKey orders
# by the total; ascending order first.
rdd12_reduce_revese = rdd12_reduce.map(lambda pair: (pair[1], pair[0]))
rdd12_reduce_revese.sortByKey(ascending=True).collect()

[(30, 'Ganarcho'),
 (32, 'Ronaldinho'),
 (40, 'Pele'),
 (66, 'Ronaldo'),
 (72, 'Messi')]

In [56]:
rdd12_reduce_revese.sortByKey(ascending=False).collect()   # Descending order of the summed values

[(72, 'Messi'),
 (66, 'Ronaldo'),
 (40, 'Pele'),
 (32, 'Ronaldinho'),
 (30, 'Ganarcho')]

#### groupByKey() - **Like GROUP BY in SQL**

In [57]:
# (city, country) pairs
cities = [
    ('Hanoi', 'Vietnam'),
    ('Hue', 'Vietnam'),
    ('HCM city', 'Vietnam'),
    ('Shanghai', 'China'),
    ('Beijing', 'China'),
    ('Kyoto', 'Japan'),
    ('Nagashima', 'Japan'),
]

# Re-key by country: (city, country) -> (country, city)
rdd13 = sc.parallelize(cities).map(lambda pair: (pair[1], pair[0]))
pair_rdd13_group = rdd13.groupByKey().collect()

# groupByKey yields (key, iterable of values); list() materialises each group for printing
for nation, nation_cities in pair_rdd13_group:
    print(nation, list(nation_cities))

Vietnam ['Hanoi', 'Hue', 'HCM city']
China ['Shanghai', 'Beijing']
Japan ['Kyoto', 'Nagashima']


In [58]:
# (region, airport) pairs
airports = [
    ('North', 'Noi Bai'),
    ('South', 'Tan Son Nhat'),
    ('Middle', 'Phu Bai'),
    ('North', 'Dien Bien'),
    ('North', 'Van Don'),
    ('South', 'Dong Thap'),
    ('Middle', 'Hue'),
    ('Middle', 'Dong Hoi'),
    ('North', 'Cat Bi'),
    ('South', 'Can Tho'),
    ('South', 'Phu Quoc'),
]

rdd14 = sc.parallelize(airports)
# The data is already (key, value), so it can be grouped without re-keying
rdd14_group = rdd14.groupByKey().collect()

for region, region_airports in rdd14_group:
    print(region, list(region_airports))

North ['Noi Bai', 'Dien Bien', 'Van Don', 'Cat Bi']
Middle ['Phu Bai', 'Hue', 'Dong Hoi']
South ['Tan Son Nhat', 'Dong Thap', 'Can Tho', 'Phu Quoc']


#### join() - **Like JOIN in SQL**

*Example 1*

In [59]:
# The airports above plus two 'Global' entries and a repeated
# ('South', 'Dong Thap'), used for the join examples
airports2 = [
    ('North', 'Noi Bai'),
    ('South', 'Tan Son Nhat'),
    ('Middle', 'Phu Bai'),
    ('North', 'Dien Bien'),
    ('North', 'Van Don'),
    ('South', 'Dong Thap'),
    ('Middle', 'Hue'),
    ('Middle', 'Dong Hoi'),
    ('North', 'Cat Bi'),
    ('South', 'Can Tho'),
    ('South', 'Phu Quoc'),
    ('South', 'Dong Thap'),
    ('Global', 'Changi'),
    ('Global', 'Shanghai'),
]

rdd15 = sc.parallelize(airports2)
rdd15.collect()

[('North', 'Noi Bai'),
 ('South', 'Tan Son Nhat'),
 ('Middle', 'Phu Bai'),
 ('North', 'Dien Bien'),
 ('North', 'Van Don'),
 ('South', 'Dong Thap'),
 ('Middle', 'Hue'),
 ('Middle', 'Dong Hoi'),
 ('North', 'Cat Bi'),
 ('South', 'Can Tho'),
 ('South', 'Phu Quoc'),
 ('South', 'Dong Thap'),
 ('Global', 'Changi'),
 ('Global', 'Shanghai')]

In [60]:
# One numeric code per region key
areas = [
    ('North', 1),
    ('Middle', 2),
    ('South', 3),
    ('Global', 4),
]
rdd16 = sc.parallelize(areas)
rdd16.collect()

[('North', 1), ('Middle', 2), ('South', 3), ('Global', 4)]

In [61]:
# Inner join on the key: every (k, v) in rdd15 is paired with every (k, w) in
# rdd16, producing (k, (v, w)). A key repeated on one side yields one output
# row per repetition.
rdd15.join(rdd16).collect()

[('South', ('Tan Son Nhat', 3)),
 ('South', ('Dong Thap', 3)),
 ('South', ('Can Tho', 3)),
 ('South', ('Phu Quoc', 3)),
 ('South', ('Dong Thap', 3)),
 ('North', ('Noi Bai', 1)),
 ('North', ('Dien Bien', 1)),
 ('North', ('Van Don', 1)),
 ('North', ('Cat Bi', 1)),
 ('Middle', ('Phu Bai', 2)),
 ('Middle', ('Hue', 2)),
 ('Middle', ('Dong Hoi', 2)),
 ('Global', ('Changi', 4)),
 ('Global', ('Shanghai', 4))]

In [62]:
# Same join with the operands swapped: the value tuples become (w, v) instead of (v, w)
rdd16.join(rdd15).collect()

[('South', (3, 'Tan Son Nhat')),
 ('South', (3, 'Dong Thap')),
 ('South', (3, 'Can Tho')),
 ('South', (3, 'Phu Quoc')),
 ('South', (3, 'Dong Thap')),
 ('North', (1, 'Noi Bai')),
 ('North', (1, 'Dien Bien')),
 ('North', (1, 'Van Don')),
 ('North', (1, 'Cat Bi')),
 ('Middle', (2, 'Phu Bai')),
 ('Middle', (2, 'Hue')),
 ('Middle', (2, 'Dong Hoi')),
 ('Global', (4, 'Changi')),
 ('Global', (4, 'Shanghai'))]

*Example 2: Some keys exist in only one of the RDDs — `join()` keeps only keys present in both (an inner join)*

In [66]:
# (park, year) pairs; 'Yen So' appears twice
parks = [
    ('Thu Le', 1992),
    ('Yen So', 1998),
    ('Thanh Cong', 2012),
    ('Nhat Le', 2011),
    ('Yen So', 1989),
    ('Hoa Binh', 2002),
]
rdd17 = sc.parallelize(parks)

# (park, place) pairs; only 'Thu Le' and 'Yen So' overlap with parks
park_places = [
    ('Thu Le', 'Ha Noi'),
    ('Yen So', 'Ha Tay'),
    ('Minh Khai', 'Ha Nam'),
]
rdd18 = sc.parallelize(park_places)

In [65]:
# JOIN 17 x 18: an inner join, so only keys present in both RDDs ('Thu Le',
# 'Yen So') survive. Keys found on only one side are dropped.
# NOTE(review): this cell's execution count (65) precedes that of the cell
# defining rdd17/rdd18 (66); re-check with Restart & Run All.
rdd17.join(rdd18).collect()

[('Yen So', (1998, 'Ha Tay')),
 ('Yen So', (1989, 'Ha Tay')),
 ('Thu Le', (1992, 'Ha Noi'))]

In [67]:
# JOIN 18 x 17: same matching keys, with the value tuples in (place, year) order
rdd18.join(rdd17).collect()

[('Yen So', ('Ha Tay', 1998)),
 ('Yen So', ('Ha Tay', 1989)),
 ('Thu Le', ('Ha Noi', 1992))]

#### reduce() - **Combine all elements of the RDD with a binary function**

*Example 1*

In [69]:
# Peek at the first 10 elements of rdd3
rdd3.take(10)

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [71]:
# reduce() folds all elements pairwise with the given function (which should
# be associative and commutative); addition yields the total of rdd3.
rdd3.reduce(lambda acc, value: acc + value)

499500

*Example 2*

In [72]:
x = [2, 3, 4, 2, 5, 1, 7, 4]
rdd_numbers = sc.parallelize(x)

# Product of all elements; multiplication is associative and commutative, so
# it is safe to combine across partitions with reduce().
rdd_numbers.reduce(lambda left, right: left * right)

6720

#### saveAsTextFile() - **Save each partition as a separate part file inside an output directory**

In [82]:
import shutil

# saveAsTextFile writes a *directory* named TempFile containing one part file
# per partition, and it fails if that directory already exists. Remove output
# left by a previous run first so the cell can be re-run.
# NOTE(review): assumes 'TempFile' resolves to the local filesystem relative to
# the current working directory (master is local[*]). On HDFS/S3 the old output
# would have to be removed with that filesystem's tools instead.
shutil.rmtree('TempFile', ignore_errors=True)
rdd_numbers.saveAsTextFile('TempFile')

#### coalesce() - **Reduce to 1 partition so the output is saved as a single part file**

In [84]:
import shutil

# coalesce(1) merges the data into a single partition, so the TempFile2
# directory receives one part file. As with saveAsTextFile above, the target
# directory must not exist, so output from a previous run is removed first.
# NOTE(review): assumes 'TempFile2' is on the local filesystem (master is local[*]).
shutil.rmtree('TempFile2', ignore_errors=True)
rdd_numbers.coalesce(1).saveAsTextFile('TempFile2')

#### countByKey() - **Like COUNT() + GROUP BY in SQL**

In [85]:
airports2 = [
    ('North', 'Noi Bai'),
    ('South', 'Tan Son Nhat'),
    ('Middle', 'Phu Bai'),
    ('North', 'Dien Bien'),
    ('North', 'Van Don'),
    ('South', 'Dong Thap'),
    ('Middle', 'Hue'),
    ('Middle', 'Dong Hoi'),
    ('North', 'Cat Bi'),
    ('South', 'Can Tho'),
    ('South', 'Phu Quoc'),
    ('South', 'Dong Thap'),
    ('Global', 'Changi'),
    ('Global', 'Shanghai'),
]

rdd19 = sc.parallelize(airports2)
# countByKey() is an action returning a {key: number of elements with that key}
# mapping to the driver
for region, n_airports in rdd19.countByKey().items():
    print(region, n_airports)

North 4
South 5
Middle 3
Global 2


#### collectAsMap() - **Collect a pair RDD into a Python dictionary**

*Note: an alternative to collect() for pair RDDs; if a key appears more than once, only one of its values is kept in the dictionary.*

In [89]:
cities = [
    ('Hanoi', 'Vietnam'),
    ('Hue', 'Vietnam'),
    ('HCM city', 'Vietnam'),
    ('Shanghai', 'China'),
    ('Beijing', 'China'),
    ('Kyoto', 'Japan'),
    ('Nagashima', 'Japan'),
]

# Every city is a distinct key, so no entries are lost in the dict
sc.parallelize(cities).collectAsMap()

{'Hanoi': 'Vietnam',
 'Hue': 'Vietnam',
 'HCM city': 'Vietnam',
 'Shanghai': 'China',
 'Beijing': 'China',
 'Kyoto': 'Japan',
 'Nagashima': 'Japan'}