[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://githubtocolab.com/jkanclerz/data-science-workshop-2024/blob/main/40--spark/01--rdd.ipynb)

In [None]:
!apt-get install openjdk-17-jdk-headless -qq > /dev/null
!wget https://dlcdn.apache.org/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz -O spark-3.5.4-bin-hadoop3.tgz
!tar xf spark-3.5.4-bin-hadoop3.tgz

In [2]:
!pip install -q pyspark

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.4-bin-hadoop3"

In [40]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("DDD")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [None]:
spark

In [42]:
sc = spark.sparkContext

### Resilient Distributed Dataset or RDD

An RDD is a distributed collection of elements. All work in Spark is expressed as either creating new RDDs, transforming existing RDDs, or calling actions on RDDs to compute a result. Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.

https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds
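
One consequence of this model is that transformations are lazy: Spark only records them, and nothing is computed until an action is called. The block below is a loose plain-Python analogy (it does not use Spark): a `map`/`filter` pipeline plays the role of transformations and `list(...)` plays the role of an action. The names `traced_square` and `calls` are hypothetical and exist only for this illustration.

```python
# Plain-Python analogy for lazy transformations vs. actions (no Spark involved).
calls = []  # records every time the "transformation" function actually runs


def traced_square(x):
    calls.append(x)
    return x * x


numbers = [1, 2, 3, 4, 5]

# "Transformations": building the pipeline does not run traced_square yet.
squared = map(traced_square, numbers)
even_squares = filter(lambda v: v % 2 == 0, squared)
calls_before_action = len(calls)

# "Action": consuming the pipeline triggers the whole computation.
result = list(even_squares)
calls_after_action = len(calls)
```

In Spark terms, `map` and `filter` on an RDD behave like the first part, while `collect`, `count`, `take`, and `saveAsTextFile` behave like the final `list(...)` call.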

#### Creating an RDD using parallelize
One way of creating an RDD is to parallelize an existing Python collection, such as a list.

In [43]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

In [None]:
type(distData)

In [None]:
type(distData.collect())

In [None]:
distData.take(2)
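
`parallelize` splits the collection into partitions that can be processed independently; `collect` brings every element back to the driver, while `take(n)` returns only the first `n`. The sketch below shows the idea of slicing a list into contiguous chunks in plain Python. It illustrates the concept only and is not Spark's actual partitioning code; `split_into_slices` is a hypothetical helper.

```python
def split_into_slices(data, num_slices):
    """Split a list into num_slices contiguous chunks of near-equal size."""
    n = len(data)
    return [
        data[i * n // num_slices:(i + 1) * n // num_slices]
        for i in range(num_slices)
    ]


data = [1, 2, 3, 4, 5]
slices = split_into_slices(data, 2)

# Concatenating the slices in order gives back the original list,
# which is roughly what collecting all partitions does.
flattened = [x for chunk in slices for x in chunk]
```

In Spark, the number of partitions can be requested with `sc.parallelize(data, numSlices)`, and `distData.glom().collect()` shows how the elements were actually grouped.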

In [47]:
A = ((a, a*a) for a in range(1000))

In [48]:
data = sc.parallelize(A)

In [None]:
data.count()

In [None]:
data.take(10)

In [55]:
# saveAsTextFile fails if the output path already exists, so remove it first.
!rm -rf var/data.txt

In [57]:
data.saveAsTextFile('var/data.txt')
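
`saveAsTextFile` writes a directory (here `var/data.txt/`) with one `part-*` file per partition. In PySpark each non-string element is written as its `str()` form, so the tuples become lines such as `(3, 9)`, and reading the directory back with `sc.textFile` yields strings. A minimal sketch of turning such a line back into a tuple, assuming the lines came from tuples of integers; `parse_pair` is a hypothetical helper:

```python
import ast


def parse_pair(line):
    """Parse a line such as '(3, 9)' back into a tuple of Python literals."""
    value = ast.literal_eval(line.strip())
    if not isinstance(value, tuple):
        raise ValueError(f"expected a tuple, got: {line!r}")
    return value


# Simulate the lines that would be written for the first few elements.
example_lines = [str((a, a * a)) for a in range(4)]
pairs = [parse_pair(line) for line in example_lines]
```

With Spark the same function could be applied as `sc.textFile('var/data.txt').map(parse_pair)`.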

#### Creating an RDD from a file
The most common way of creating an RDD is to load it from a file. Note that Spark's `textFile` can also read compressed files (for example gzip) directly.

In [None]:
!mkdir -p var
!wget https://wolnelektury.pl/media/book/txt/krzyzacy-tom-pierwszy.txt -O var/krzyzacy-1.txt

In [60]:
# Remove output from a previous run; saveAsTextFile refuses to overwrite an existing path.
!rm -rf var/krzyzacy-1-upper.txt/

In [61]:
file = sc.textFile('var/krzyzacy-1.txt')

(file
  .map(lambda line: line.upper())
  .saveAsTextFile('var/krzyzacy-1-upper.txt')
)
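
To try the compressed-file support mentioned above, a gzip copy of a text file is needed. The sketch below creates one with the standard library only; `gzip_copy` is a hypothetical helper, and the demo runs on a temporary file so it does not touch the notebook's `var/` directory.

```python
import gzip
import os
import shutil
import tempfile


def gzip_copy(src_path, dst_path):
    """Write a gzip-compressed copy of src_path to dst_path."""
    with open(src_path, "rb") as src, gzip.open(dst_path, "wb") as dst:
        shutil.copyfileobj(src, dst)


# Self-contained demo on a temporary file.
tmp_dir = tempfile.mkdtemp()
plain_path = os.path.join(tmp_dir, "sample.txt")
gz_path = plain_path + ".gz"
with open(plain_path, "w", encoding="utf-8") as f:
    f.write("first line\nsecond line\n")

gzip_copy(plain_path, gz_path)

# Read the compressed copy back to confirm the content survived.
with gzip.open(gz_path, "rt", encoding="utf-8") as f:
    lines_read_back = f.read().splitlines()
```

In the notebook, something like `gzip_copy('var/krzyzacy-1.txt', 'var/gz-krzyzacy-1.txt.gz')` followed by `sc.textFile('var/gz-krzyzacy-1.txt.gz')` should yield the same lines as the plain file. The output name here is deliberately chosen so that it does not match the `var/krzyzacy*` pattern used later. A gzip file cannot be split, so Spark reads it as a single partition.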

## Transformations
https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

In [62]:
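# Match only the plain-text volumes. The broader "var/krzyzacy*" pattern would also
# read the upper-cased copy in var/krzyzacy-1-upper.txt/ and count every line twice.
raw_data = sc.textFile("var/krzyzacy-?.txt")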

In [None]:
raw_data.count()

In [64]:
sentences = raw_data.filter(lambda x: "" != x)

In [65]:
words_batch = sentences.map(lambda x: x.split(" "))

In [None]:
words_batch.take(6)

In [67]:
words = sentences.flatMap(lambda x: x.split(" "))
words = words.map(lambda x: x.lower())

In [None]:
words.take(10)
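
The difference between `map` and `flatMap` in the cells above can be reproduced on a tiny input in plain Python (no Spark involved). The two sample lines are made up for illustration.

```python
from itertools import chain

lines = ["ala ma kota", "kot ma ale"]

# map: exactly one output element per input element (here, one list per line).
mapped = [line.split(" ") for line in lines]

# flatMap: each input element yields zero or more outputs,
# and all of them are flattened into a single sequence.
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))
```

This is why `words_batch` holds one list of words per line, while `words` holds the individual words.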

In [69]:
words_occurence = words.map(lambda word: (word, 1))

In [None]:
words_occurence.take(3)

In [71]:
wordCounts = words_occurence.reduceByKey(lambda a,b: a + b)

In [None]:
wordCounts.map(lambda x: (x[1], x[0])) \
    .sortByKey(ascending=False) \
    .take(5)
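
As a rough model of what `reduceByKey` followed by the swap-and-sort step computes, the same logic can be written in plain Python. This is a sketch of the semantics only, not of how Spark executes it; `reduce_by_key` is a hypothetical helper and the sample pairs are made up.

```python
def reduce_by_key(pairs, func):
    """Merge the values of each key with func, like RDD.reduceByKey on one machine."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


pairs = [("kot", 1), ("ma", 1), ("kot", 1), ("ale", 1), ("kot", 1)]
counts = reduce_by_key(pairs, lambda a, b: a + b)

# Swap to (count, word) and sort descending, mirroring map + sortByKey(ascending=False).
top = sorted(((n, w) for w, n in counts.items()), reverse=True)[:2]
```

Spark first combines values inside each partition and then merges the partial results, so the function passed to `reduceByKey` should be associative and commutative, as addition is here.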

In [None]:
!wget https://raw.githubusercontent.com/bieli/stopwords/master/polish.stopwords.txt -O var/stopwords.txt

In [74]:
# Collect into a set so that each "not in stop_words" check is a constant-time lookup.
stop_words = set(sc.textFile("var/stopwords.txt").collect())

In [75]:
counter = (wordCounts
  .map(lambda x: (x[1], x[0]))
  .filter(lambda x: x[1] not in stop_words)
  .filter(lambda x: x[1] != '—')
  .sortByKey(ascending=False)
)

In [None]:
counter.take(15)
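
Because the lines are split on single spaces, punctuation stays attached to words (for example `kot,` and `kot` are counted separately), and the dash has to be filtered out by hand. One possible refinement is to strip punctuation from both ends of every token. The sketch below is an assumption about what a reasonable cleanup might look like, not part of the original pipeline; `normalize_word`, `PUNCTUATION`, and the sample sentence are hypothetical.

```python
import string

# Characters stripped from both ends of a token. The extra ones are typographic
# marks assumed to be common in Polish texts.
PUNCTUATION = string.punctuation + "—–„”…«»"


def normalize_word(token):
    """Lower-case a token and strip leading/trailing punctuation."""
    return token.strip(PUNCTUATION).lower()


tokens = "— Bóg zapłać, panie! — rzekł Zbyszko.".split(" ")

# Tokens that consist only of punctuation become empty strings and are dropped.
cleaned = [w for w in map(normalize_word, tokens) if w]
```

In the Spark pipeline this could be used as `words.map(normalize_word).filter(lambda w: w != "")` before counting.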

## Sampling

In [None]:
words.count()

In [78]:
sample_words = words.sample(False, 0.005, 1234)

In [None]:
sample_words.count()

In [None]:
sample_words.take(10)

In [None]:
words.takeSample(False, 10, 1234)
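
The two calls above behave differently: `sample(False, fraction, seed)` keeps each element independently with probability `fraction`, so the size of the result is only approximately `fraction * count`, whereas `takeSample(False, num, seed)` returns exactly `num` elements to the driver. A plain-Python sketch of that difference (no Spark involved; `bernoulli_sample` is a hypothetical helper and will not reproduce Spark's actual random choices):

```python
import random


def bernoulli_sample(items, fraction, seed):
    """Keep each item independently with probability `fraction`."""
    rng = random.Random(seed)
    return [item for item in items if rng.random() < fraction]


items = list(range(10_000))

# Size varies around fraction * len(items) = 50.
approx = bernoulli_sample(items, 0.005, seed=1234)

# Fixed-size sample without replacement: always exactly 10 items.
exact = random.Random(1234).sample(items, 10)
```

Using the same seed makes both kinds of sample reproducible between runs.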

In [None]:
!mkdir -p var
!wget https://wolnelektury.pl/media/book/txt/krzyzacy-tom-drugi.txt -O var/krzyzacy-2.txt

In [83]:
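# Read both volumes, but not the upper-cased copy in var/krzyzacy-1-upper.txt/,
# which the broader 'var/krzyzacy*' pattern would also match.
file = sc.textFile('var/krzyzacy-?.txt')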

In [None]:
file.count()

In [103]:
counter = (
  file
    .filter(lambda x: "" != x)
    .flatMap(lambda x: x.split(" "))
    .map(lambda x: x.lower())
    .filter(lambda x: x not in stop_words)
    .filter(lambda x: x != '—')
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a,b: a + b)
    .map(lambda x: (x[1], x[0]))
    .sortByKey(ascending=False)
)

In [None]:
counter.take(15)
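
For checking the pipeline on a small input, the same steps can be written as a single-machine reference in plain Python. This is a sketch for cross-checking results, not a replacement for the Spark job; `top_words` is a hypothetical helper and the sample lines are made up.

```python
from collections import Counter


def top_words(lines, stop_words, n):
    """Return the n most frequent (count, word) pairs, mirroring the RDD pipeline."""
    counts = Counter(
        word
        for line in lines
        if line != ""                          # filter out empty lines
        for word in line.lower().split(" ")    # flatMap + lower
        if word not in stop_words and word != "—"
    )
    # Swap to (count, word) and sort descending; ties are broken by word here.
    return sorted(((c, w) for w, c in counts.items()), reverse=True)[:n]


sample_lines = ["Kot ma kota", "", "— kot i pies"]
result = top_words(sample_lines, stop_words={"ma", "i"}, n=2)
```

Running the Spark pipeline on the same lines with `sc.parallelize(sample_lines)` should give the same counts, although the order of words with equal counts may differ.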

In [105]:
spark.stop()