# Pair RDD - Transformations

## Setup Spark environment

In [1]:
from pathlib import Path

installation_folder = Path("/content/spark-3.5.0-bin-hadoop3")

if not installation_folder.exists():

  # Install Java locally
  !apt-get install openjdk-8-jdk-headless -qq > /dev/null

  # Download & decompress Spark
  !wget -q https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz -O spark-3.5.0-bin-hadoop3.tgz
  !tar xf spark-3.5.0-bin-hadoop3.tgz

  # Install finspark
  !pip install -q findspark

  # Setup required environment variables
  import os
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
  os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

  print("Spark setup finished!")

else:
  print("Skipping Spark setup")

Spark setup finished!


## Prepare the Spark context

In [2]:
# Make pyspark importable; with no arguments findspark locates Spark via SPARK_HOME
import findspark
findspark.init()

# The Spark components needed to build the context (importable only after init)
from pyspark import SparkConf, SparkContext

# Every SparkConf setter returns the conf itself, so the configuration can be chained
conf = (
    SparkConf()
    .setAppName('mds-session')
    .setMaster('local[*]')  # run locally using all available cores
)

# Reuse an already-running context if there is one, otherwise create it
sc = SparkContext.getOrCreate(conf=conf)

## sortByKey

In [3]:
rdd1 = sc.parallelize([
    '2014-12-31', '2015-01-25', '2016-05-17',
    '2016-11-08', '2017-01-05', '2014-08-06',
])

# Group the date strings by their year prefix (the text before the first '-')
rdd2 = rdd1.groupBy(lambda date: date.partition('-')[0])
# Grouped values are iterables; turn each one into a list so it prints readably
print(rdd2.mapValues(list).collect())

# Re-order the groups from the newest year to the oldest
rdd3 = rdd2.sortByKey(ascending=False)
print(rdd3.mapValues(list).collect())

[('2014', ['2014-12-31', '2014-08-06']), ('2015', ['2015-01-25']), ('2016', ['2016-05-17', '2016-11-08']), ('2017', ['2017-01-05'])]
[('2017', ['2017-01-05']), ('2016', ['2016-05-17', '2016-11-08']), ('2015', ['2015-01-25']), ('2014', ['2014-12-31', '2014-08-06'])]


## reduceByKey

In [4]:
# Same six 'YYYY-MM-DD' strings as in the sortByKey example
rdd1 = sc.parallelize([
    '2014-12-31', '2015-01-25', '2016-05-17',
    '2016-11-08', '2017-01-05', '2014-08-06',
])

def parseDate(date):
    """Split a 'YYYY-MM-DD' string into a ('YYYY', 'MM-DD') key/value pair.

    Raises ValueError when the string does not contain exactly two '-'.
    """
    yyyy, mm, dd = date.split('-')
    return yyyy, '-'.join((mm, dd))

# Turn each date string into a (year, 'MM-DD') pair so it can be reduced by key
rdd2 = rdd1.map(f=parseDate)

def maxDate(date1, date2):
    """Return the greater of two values; on a tie (or when not comparable as greater) return date2.

    For zero-padded 'MM-DD' strings the lexicographic maximum is the later day.
    """
    # max() keeps its first argument unless a later one compares strictly greater,
    # so passing date2 first reproduces "date1 if date1 > date2 else date2".
    return max(date2, date1)

# Combine each year's values pairwise with maxDate, keeping the largest 'MM-DD'
rdd3 = rdd2.reduceByKey(func=maxDate)
print(rdd2.collect())  # (year, 'MM-DD') pairs before the reduction
print(rdd3.collect())  # a single pair per year after the reduction

[('2014', '12-31'), ('2015', '01-25'), ('2016', '05-17'), ('2016', '11-08'), ('2017', '01-05'), ('2014', '08-06')]
[('2014', '12-31'), ('2015', '01-25'), ('2016', '11-08'), ('2017', '01-05')]


## keys and values

In [5]:
# keys() keeps only the first element (the year) of each pair produced by reduceByKey
rdd4 = rdd3.keys()
print(rdd4.collect())

['2014', '2015', '2016', '2017']


In [6]:
# values() keeps only the second element (the retained 'MM-DD') of each pair
rdd5 = rdd3.values()
print(rdd5.collect())

['12-31', '01-25', '11-08', '01-05']


## join, leftOuterJoin, rightOuterJoin, fullOuterJoin

In [8]:
# (year, value) pairs: 2015 appears only on the left, 2017 only on the right
rdd1 = sc.parallelize([
    (2014, 10000),
    (2015, 20000),
    (2016, 30000),
])
rdd2 = sc.parallelize([
    (2014, -500),
    (2016, -6000),
    (2017, -9000),
])

In [9]:
# Inner join: only keys present in both RDDs survive, each with a (left, right) tuple
rdd3 = (
    rdd1
    .join(rdd2)
    .sortByKey(ascending=True)  # sort by year so the printed order is deterministic
)
print(rdd3.collect())

[(2014, (10000, -500)), (2016, (30000, -6000))]


In [10]:
# Left outer join: every key of rdd1 is kept; a missing right-hand value becomes None
rdd3 = (
    rdd1
    .leftOuterJoin(rdd2)
    .sortByKey(ascending=True)
)
print(rdd3.collect())

[(2014, (10000, -500)), (2015, (20000, None)), (2016, (30000, -6000))]


In [11]:
# Right outer join: every key of rdd2 is kept; a missing left-hand value becomes None
rdd3 = (
    rdd1
    .rightOuterJoin(rdd2)
    .sortByKey(ascending=True)
)
print(rdd3.collect())

[(2014, (10000, -500)), (2016, (30000, -6000)), (2017, (None, -9000))]


In [12]:
# Full outer join: keys from either RDD are kept; whichever side is missing becomes None
rdd3 = (
    rdd1
    .fullOuterJoin(rdd2)
    .sortByKey(ascending=True)
)
print(rdd3.collect())

[(2014, (10000, -500)), (2015, (20000, None)), (2016, (30000, -6000)), (2017, (None, -9000))]


## Close the Spark context

In [None]:
# Shut down the SparkContext; re-run the context-creation cell before using `sc` again
sc.stop()