# Check MinIO

In [2]:
!pip install minio delta-spark==2.2.0

Collecting minio
  Downloading minio-7.1.13-py3-none-any.whl (76 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 76.2/76.2 kB 1.1 MB/s eta 0:00:00
Collecting delta-spark==2.2.0
  Downloading delta_spark-2.2.0-py3-none-any.whl (20 kB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 199.7/199.7 kB 3.4 MB/s eta 0:00:00
Installing collected packages: py4j, minio, delta-spark
Successfully installed delta-spark-2.2.0 minio-7.1.13 py4j-0.10.9.5


In [3]:
from minio import Minio

In [4]:
import os

# Connection settings are read from the environment so that real credentials
# never have to be written into the notebook. The fallbacks are the values
# this notebook originally hardcoded.
client = Minio(
    os.environ.get("MINIO_ENDPOINT", "minio:9000"),
    access_key=os.environ.get("MINIO_ACCESS_KEY", "minio"),
    secret_key=os.environ.get("MINIO_SECRET_KEY", "minio123"),
    secure=False,  # connect without TLS
)

bucket = "warehouse"
# Report both outcomes: a cell that prints nothing gives no hint that the
# bucket is missing.
if client.bucket_exists(bucket):
    print(f"{bucket} exists")
else:
    print(f"{bucket} does not exist")


warehouse exists


# Init SparkContext

In [1]:
from datetime import datetime
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext

In [2]:
# Create (or reuse) a session on the standalone master. The application name
# embeds the current timestamp.
spark = (
    SparkSession.builder
    .appName("pyspark-rdd-demo-{}".format(datetime.today()))
    .master("spark://spark-master:7077")
    .getOrCreate()
)

# SQLContext expects a SparkContext as its first argument, not a SparkSession.
# Passing the session explicitly binds the context to this session.
sqlContext = SQLContext(spark.sparkContext, sparkSession=spark)

# Show the effective configuration.
# NOTE(review): this list can contain credentials (e.g. S3A keys); check the
# rendered output before sharing the notebook.
spark.sparkContext.getConf().getAll()




[('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'),
 ('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false'),
 ('spark.repl.local.jars',
  'file:///usr/local/spark-3.3.2-bin-hadoop3/jars/delta-core_2.12-2.2.0.jar,file:///usr/local/spark-3.3.2-bin-hadoop3/jars/hadoop-aws-3.3.2.jar,file

In [3]:
# Short handle to the SparkContext behind the session; the RDD cells below
# are all called through it.
sc = spark.sparkContext
sc

In [4]:
# Smoke-test Delta Lake: write a 500-row range as a Delta table, replacing
# whatever was stored at that path before.
delta_path = "s3a://warehouse/deltafile"
spark.range(500).write.format("delta").mode("overwrite").save(delta_path)

# Create RDDs

## By loading dataset

In [5]:
# Read the text file from the `warehouse` bucket as an RDD of lines.
fdd = sc.textFile("s3a://warehouse/testfile.txt")
fdd

s3a://warehouse/testfile.txt MapPartitionsRDD[25] at textFile at NativeMethodAccessorImpl.java:0

In [14]:
# Number of partitions the RDD is split into.
fdd.getNumPartitions()

2

In [13]:
# Number of elements (lines) in the RDD.
fdd.count()

16

In [15]:
# Redistribute the data across 10 partitions. Note that this rebinds `fdd`,
# so later cells work on the repartitioned RDD.
fdd = fdd.repartition(10)
fdd.getNumPartitions()

10

In [16]:
# Repartitioning changes the layout, not the content: the count is unchanged.
fdd.count()

16

In [6]:
# Bring every line back to the driver (acceptable here: the file has 16 lines).
fdd.collect()

['',
 'What is Lorem Ipsum?',
 '',
 "Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book. It has survived not only five centuries, but also the leap into electronic typesetting, remaining essentially unchanged. It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing software like Aldus PageMaker including versions of Lorem Ipsum.",
 'Why do we use it?',
 '',
 "It is a long established fact that a reader will be distracted by the readable content of a page when looking at its layout. The point of using Lorem Ipsum is that it has a more-or-less normal distribution of letters, as opposed to using 'Content here, content here', making it look like readable English. Many desktop publishing packages and web page edito

In [6]:
# Tokenise: split each line on whitespace and flatten into one RDD of words.
words = fdd.flatMap(lambda text: text.split())

# Pair each word with 1, then add up the ones per word to get its frequency.
word_pairs = words.map(lambda token: (token, 1))
word_counts = word_pairs.reduceByKey(lambda left, right: left + right)
word_counts.collect()

[('The', 4),
 ('chunk', 1),
 ('of', 21),
 ('Lorem', 17),
 ('Ipsum', 13),
 ('used', 1),
 ('is', 8),
 ('below', 1),
 ('interested.', 1),
 ('Finibus', 2),
 ('Bonorum', 2),
 ('Malorum"', 2),
 ('are', 3),
 ('in', 10),
 ('form,', 2),
 ('English', 1),
 ('versions', 3),
 ('Rackham.', 1),
 ('some?', 1),
 ('passages', 1),
 ('but', 2),
 ('have', 2),
 ('alteration', 1),
 ('injected', 2),
 ('look', 2),
 ('even', 1),
 ('believable.', 1),
 ('use', 3),
 ('Ipsum,', 2),
 ('sure', 1),
 ('there', 1),
 ('anything', 1),
 ('embarrassing', 1),
 ('hidden', 1),
 ('middle', 1),
 ('generators', 1),
 ('repeat', 1),
 ('as', 3),
 ('necessary,', 1),
 ('making', 3),
 ('this', 1),
 ('true', 1),
 ('It', 5),
 ('uses', 1),
 ('dictionary', 1),
 ('Latin', 4),
 ('handful', 1),
 ('model', 2),
 ('generate', 1),
 ('looks', 1),
 ('generated', 1),
 ('always', 1),
 ('free', 1),
 ('repetition,', 1),
 ('non-characteristic', 1),
 ('etc.', 1),
 ('What', 1),
 ('dummy', 2),
 ('printing', 1),
 ('typesetting', 1),
 ("industry's", 1),
 ('e

In [8]:
# Keep only the lines containing "There are", then tokenise them on whitespace.
matching_lines = fdd.filter(lambda text: "There are" in text)
words_2 = matching_lines.flatMap(lambda text: text.split())

# Pair each word with 1, then add up the ones per word to get its frequency.
word_pairs_2 = words_2.map(lambda token: (token, 1))
word_counts_2 = word_pairs_2.reduceByKey(lambda left, right: left + right)
word_counts_2.collect()

[('are', 2),
 ('of', 6),
 ('passages', 1),
 ('Lorem', 5),
 ('Ipsum', 4),
 ('but', 1),
 ('have', 1),
 ('alteration', 1),
 ('in', 2),
 ('form,', 1),
 ('injected', 2),
 ('look', 1),
 ('even', 1),
 ('believable.', 1),
 ('use', 1),
 ('Ipsum,', 1),
 ('sure', 1),
 ('there', 1),
 ('anything', 1),
 ('embarrassing', 1),
 ('hidden', 1),
 ('middle', 1),
 ('generators', 1),
 ('repeat', 1),
 ('as', 1),
 ('necessary,', 1),
 ('making', 1),
 ('this', 1),
 ('true', 1),
 ('It', 1),
 ('uses', 1),
 ('dictionary', 1),
 ('Latin', 1),
 ('handful', 1),
 ('model', 1),
 ('generate', 1),
 ('looks', 1),
 ('The', 1),
 ('generated', 1),
 ('is', 1),
 ('always', 1),
 ('free', 1),
 ('repetition,', 1),
 ('non-characteristic', 1),
 ('etc.', 1),
 ('There', 1),
 ('many', 1),
 ('variations', 1),
 ('available,', 1),
 ('the', 6),
 ('majority', 1),
 ('suffered', 1),
 ('some', 1),
 ('by', 1),
 ('humour,', 2),
 ('or', 2),
 ('randomised', 1),
 ('words', 2),
 ('which', 2),
 ("don't", 1),
 ('slightly', 1),
 ('If', 1),
 ('you', 2),


In [9]:
# Inner join on the word: keeps only words present in both RDDs, giving
# (word, (count in all lines, count in the filtered lines)).
word_counts.join(word_counts_2).collect()

[('is', (8, 1)),
 ('Lorem', (17, 5)),
 ('Ipsum', (13, 4)),
 ('of', (21, 6)),
 ('It', (5, 1)),
 ('but', (2, 1)),
 ('in', (10, 2)),
 ('use', (3, 1)),
 ('The', (4, 1)),
 ('as', (3, 1)),
 ('making', (3, 1)),
 ('look', (2, 1)),
 ('model', (2, 1)),
 ('have', (2, 1)),
 ('Latin', (4, 1)),
 ('Ipsum,', (2, 1)),
 ('are', (3, 2)),
 ('form,', (2, 1)),
 ('passages', (1, 1)),
 ('alteration', (1, 1)),
 ('injected', (2, 2)),
 ('even', (1, 1)),
 ('believable.', (1, 1)),
 ('sure', (1, 1)),
 ('there', (1, 1)),
 ('anything', (1, 1)),
 ('embarrassing', (1, 1)),
 ('hidden', (1, 1)),
 ('middle', (1, 1)),
 ('generators', (1, 1)),
 ('repeat', (1, 1)),
 ('necessary,', (1, 1)),
 ('this', (1, 1)),
 ('true', (1, 1)),
 ('uses', (1, 1)),
 ('dictionary', (1, 1)),
 ('handful', (1, 1)),
 ('generate', (1, 1)),
 ('looks', (1, 1)),
 ('generated', (1, 1)),
 ('always', (1, 1)),
 ('free', (1, 1)),
 ('repetition,', (1, 1)),
 ('non-characteristic', (1, 1)),
 ('etc.', (1, 1)),
 ('the', (23, 6)),
 ('a', (15, 3)),
 ('to', (7, 4)),

In [12]:
# Two single-partition pair RDDs; only keys 1 and 2 occur in both.
left_pairs = [(1, "hello"), (2, "world"), (3, "foo")]
right_pairs = [(1, "bar"), (2, "baz"), (4, "qux")]
rdd1 = sc.parallelize(left_pairs, 1)
rdd2 = sc.parallelize(right_pairs, 1)

# Inner join by key: keys 3 and 4 have no partner and are dropped.
rdd_joined = rdd1.join(rdd2)

# Materialise the joined pairs on the driver.
rdd_joined.collect()

[(2, ('world', 'baz')), (1, ('hello', 'bar'))]

## By using parallelize

In [28]:
# Distribute a local Python list as an RDD with 2 partitions.
data = [1, 2, 3, 3]
rdd = sc.parallelize(data, 2)
rdd

ParallelCollectionRDD[27] at readRDDFromFile at PythonRDD.scala:274

In [29]:
# Confirms the partition count requested in parallelize().
rdd.getNumPartitions()

2

In [30]:
# Return all elements to the driver as a Python list.
rdd.collect()

[1, 2, 3, 3]

# RDD operations

## Transformation

* Element-wise transformations
* Transformation filter(): takes a function and returns an RDD containing only the elements for which that function returns True

In [31]:
# Keep only the elements that differ from 1.
rdd.filter(lambda x: x != 1).collect()

[2, 3, 3]

* Element-wise transformations
* Transformation map(): takes a function and applies it to each element in the RDD; the function's result becomes the value of the corresponding element in the resulting RDD

In [32]:
# Add 1 to every element.
rdd.map(lambda x: x + 1).collect()

[2, 3, 4, 4]

In [33]:
# map() yields exactly one output per input, so each element becomes a two-item list.
rdd.map(lambda x: [x, x + 5]).collect()

[[1, 6], [2, 7], [3, 8], [3, 8]]

In [34]:
# flatMap() flattens the returned lists into a single RDD of values.
rdd.flatMap(lambda x: [x, x + 5]).collect()

[1, 6, 2, 7, 3, 8, 3, 8]

* Sampling Transformation
* sample(): draws a random subset of an RDD; we specify whether to sample with or without replacement and the fraction of elements to keep

In [35]:
# Sample without replacement. `fraction` is a per-element probability, so the
# result size is not fixed. The seed makes the draw repeatable across runs.
rdd.sample(False, 0.5, seed=42).collect()

[2, 3, 3]

* Pseudo Set Operations
* RDDs support many operations of mathematical sets: 
* distinct, union, intersection, subtract
* All of them except union are expensive because they involve shuffling

In [36]:
# Two small RDDs that overlap only in the value 3 (this rebinds `rdd`).
rdd = sc.parallelize([1, 2, 3])
other = sc.parallelize([3, 4, 5])

In [37]:
# union() keeps duplicates: 3 appears once from each side.
rdd.union(other).collect()

[1, 2, 3, 3, 4, 5]

In [38]:
# Elements present in both RDDs.
rdd.intersection(other).collect()

[3]

In [39]:
# Elements of `rdd` that do not occur in `other`.
rdd.subtract(other).collect()

[1, 2]

In [40]:
# Every (rdd element, other element) pair: 3 x 3 = 9 tuples.
rdd.cartesian(other).collect()

[(1, 3), (1, 4), (1, 5), (2, 3), (3, 3), (2, 4), (2, 5), (3, 4), (3, 5)]

## Actions

* fold( )
* takes a function just as reduce( ) does, plus a “zero value” that is used for the initial call on each partition
* should be the identity element for the operation
* 0 for +, 1 for *, etc.
* return type the same as RDD elements 


In [46]:
# Rebind `rdd` to [1, 2, 3, 3] and sum it with fold(); 0 is the identity for +.
rdd = sc.parallelize([1, 2, 3, 3])
rdd.fold(0, lambda a, b: a + b)

9

* reduce( )
* takes a function that operates on two elements of the type in the RDD and returns a new element of the same type
* should be commutative and associative so that it can be computed correctly in parallel

In [47]:
# Same sum via reduce(), which needs no zero value.
rdd.reduce(lambda a, b: a + b)

9

aggregate( )
* we can also supply an initial zero value of the type we want to return
* a 1st function to combine the elements from RDD with the accumulator
* a 2nd function to merge two accumulators given that each node accumulates its own results locally

In [52]:
# The accumulator is a (running sum, running count) tuple.
# seq_op folds one RDD element into an accumulator.
seq_op = lambda acc, value: (acc[0] + value, acc[1] + 1)
# comb_op merges two accumulators.
comb_op = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])

sumCount = rdd.aggregate((0, 0), seq_op, comb_op)

# Show the sum, the count and the mean derived from them.
total, n_items = sumCount
total, n_items, total / float(n_items)

(9, 4, 2.25)

collect( )
* return the entire RDD’s contents to the driver program

take( )
* returns n elements from the RDD
* attempts to minimize the number of partitions it accesses, so may be biased

top(n)
* return the top n elements of the RDD

count( )
* returns the number of elements in the RDD

In [62]:
# Entire RDD contents returned to the driver.
rdd.collect()

[1, 2, 3, 3]

In [59]:
# First 2 elements of the RDD.
rdd.take(2)

[1, 2]

In [67]:
# The 2 largest elements: negating the key reverses takeOrdered's ascending order.
rdd.takeOrdered(2, key=lambda x: -x)

[3, 3]

In [69]:
# Draw 1 element without replacement. The seed makes the draw repeatable
# across runs.
rdd.takeSample(False, 1, seed=42)

[3]

In [60]:
# The 2 largest elements.
rdd.top(2)

[3, 3]

In [61]:
# Number of elements in the RDD.
rdd.count()

4

In [63]:
# Occurrences of each distinct value, returned to the driver as a dictionary.
rdd.countByValue()

defaultdict(int, {1: 1, 2: 1, 3: 2})

# Caching RDDs

In [72]:
# Load the file again, this time asking for a minimum of 4 partitions.
lines = sc.textFile("s3a://warehouse/testfile.txt", 4)
lines

s3a://warehouse/testfile.txt MapPartitionsRDD[79] at textFile at NativeMethodAccessorImpl.java:0

Count will cause Spark to
* read data
* sum within partitions
* combine sums in driver

In [73]:
# Action: this is what actually triggers reading the file.
lines.count()

16

In [83]:
# Keep the non-empty lines only.
paragraphs = lines.filter(lambda line: len(line) > 0)
paragraphs

PythonRDD[85] at RDD at PythonRDD.scala:53

Count will cause Spark to
* read data (again)
* sum within partitions
* combine sums in driver

In [84]:
# Action on the derived RDD: nothing was cached, so the file is read again.
paragraphs.count()

9

In [86]:
# Fresh RDD over the same file, again with a minimum of 4 partitions.
lines = sc.textFile("s3a://warehouse/testfile.txt", 4)

# Mark the RDD for caching; cache() itself computes nothing, the data is
# stored when the first action runs.
lines.cache()

s3a://warehouse/testfile.txt MapPartitionsRDD[92] at textFile at NativeMethodAccessorImpl.java:0

In [87]:
# Derive the non-empty lines from the cache-marked RDD, then count all lines.
# This count is the first action on `lines` since cache() was called.
paragraphs = lines.filter(lambda line: len(line) > 0)
line_total = lines.count()
print(line_total)

16


In [88]:
# `lines` should now come from the cache, so only the filter is recomputed.
print(paragraphs.count())

9


In [89]:
# remove from cache
# Remove the RDD from the cache and release the storage it was using.
lines.unpersist()

s3a://warehouse/testfile.txt MapPartitionsRDD[92] at textFile at NativeMethodAccessorImpl.java:0