## Content

- Why Distributed File System and Computing?
	- Distributed File System
	- Distributed Computing
	- Resource Manager 

- What is Spark?
	- RDD
	- Dataframe & SQL

---

## Why Distributed File System and Computing?

- Solve big data problems, where the data is too big to store or process on a single machine
- Increase Parallelism in Computing
- Higher Availability

<img src="doc\1_spark_introduction\1_single_node_filesystem.png" width="400"><img src="doc\1_spark_introduction\2_distributed_filesystem.png" width="400">


### Distributed File System
##### Examples of DFS
- Hadoop Distributed File System
- MinIO
- Isilon OneFS
- Amazon S3
- Google Cloud Storage (GCS)
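In a DFS, a file larger than a predefined block size is split into blocks, which are then distributed across the disks of multiple nodes (HDFS, for example, also replicates each block on several nodes).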

<img src="doc\1_spark_introduction\3_hdfs_filesystem.png" width="400">
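To make the block idea concrete, here is a minimal pure-Python sketch, not code from any real DFS. It splits a byte string into fixed-size blocks and places each block on several distinct nodes in round-robin order. The block size, replication factor, node names and the `split_into_blocks`/`place_blocks` helpers are all hypothetical illustration values; real systems such as HDFS use much larger blocks (128 MB by default in recent Hadoop versions) and rack-aware placement rather than plain round robin.

```python
from typing import Dict, List


def split_into_blocks(data: bytes, block_size: int) -> List[bytes]:
    """Split a file's bytes into fixed-size blocks; the last block may be shorter."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]


def place_blocks(num_blocks: int, nodes: List[str], replication: int) -> Dict[int, List[str]]:
    """Assign each block to `replication` distinct nodes using a toy round-robin policy."""
    if replication > len(nodes):
        raise ValueError("replication factor cannot exceed the number of nodes")
    placement = {}
    for block_id in range(num_blocks):
        placement[block_id] = [nodes[(block_id + r) % len(nodes)] for r in range(replication)]
    return placement


# Hypothetical example: a 10-byte "file", 4-byte blocks, 3 data nodes, 2 replicas per block
blocks = split_into_blocks(b"abcdefghij", block_size=4)
placement = place_blocks(len(blocks), ["node1", "node2", "node3"], replication=2)
print(blocks)     # [b'abcd', b'efgh', b'ij']
print(placement)  # {0: ['node1', 'node2'], 1: ['node2', 'node3'], 2: ['node3', 'node1']}
```

Because every block lives on two different nodes, losing any single node still leaves at least one copy of each block, which is where the higher availability listed above comes from.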

---

### Distributed Computing

- Uses a functional programming approach, which makes a stateless design easy
	- Map, Reduce, Filter
- Enables parallelism
	- Utilizes multiple processors
- Example distributed processing frameworks
	- MapReduce
	- Spark
	- Flink
	- Beam
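The three operators above also exist in plain Python. This small sketch (ordinary Python, not Spark) shows why stateless functions parallelize well: `map` and `filter` look at one element at a time, and `functools.reduce` combines results with an associative operator, so the input can be split into partitions, processed separately, and the partial results merged.

```python
from functools import reduce
from operator import add

numbers = list(range(1, 11))

squares = map(lambda n: n * n, numbers)                # Map: transform each element independently
even_squares = filter(lambda n: n % 2 == 0, squares)   # Filter: keep elements matching a predicate
total = reduce(add, even_squares, 0)                   # Reduce: combine with an associative operator
print(total)  # 220

# The same computation on two "partitions", then combining the partial results
partials = [
    reduce(add, filter(lambda n: n % 2 == 0, map(lambda n: n * n, part)), 0)
    for part in (numbers[:5], numbers[5:])
]
print(partials, sum(partials))  # [20, 200] 220
```

Since addition is associative, summing per partition and then summing the partial results gives the same answer as one sequential pass, which is what lets a framework run each partition on a different processor or machine.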
    


#### Word Count Example
<img src="doc\1_spark_introduction\4_mapreduce_wordcount.png" width="800">
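The word count flow in the figure can be mimicked on a single machine. The sketch below is a plain-Python simulation, not Hadoop code: a map phase emits `(word, 1)` pairs for each input split, a shuffle phase groups the pairs by key, and a reduce phase sums each group. The function names and the sample splits are illustrative assumptions.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def map_phase(split: str) -> List[Tuple[str, int]]:
    """Mapper: emit (word, 1) for every word in one input split."""
    return [(word.lower(), 1) for word in split.split()]


def shuffle_phase(mapped: Iterable[List[Tuple[str, int]]]) -> Dict[str, List[int]]:
    """Shuffle: group the values from all mappers by key."""
    groups: Dict[str, List[int]] = defaultdict(list)
    for pairs in mapped:
        for key, value in pairs:
            groups[key].append(value)
    return groups


def reduce_phase(groups: Dict[str, List[int]]) -> Dict[str, int]:
    """Reducer: sum the counts collected for each word."""
    return {key: sum(values) for key, values in groups.items()}


splits = ["Deer Bear River", "Car Car River", "Deer Car Bear"]  # hypothetical input splits
mapped = [map_phase(s) for s in splits]   # each split could be mapped on a different node
counts = reduce_phase(shuffle_phase(mapped))
print(sorted(counts.items()))
# [('bear', 2), ('car', 3), ('deer', 2), ('river', 2)]
```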


#### Left Outer Join Example
<img src="doc\1_spark_introduction\5_mapreduce_join.png" width="700">
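A left outer join follows the same map/shuffle/reduce shape. In this reduce-side join sketch (plain Python; the table contents and `reduce_left_outer` helper are hypothetical), mappers tag each record with its source table and emit it under the join key. After the shuffle, each reducer pairs every left record with the matching right records, or with `None` when there is no match.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# Hypothetical tables of (key, value) records
customers = [(1, "Alice"), (2, "Bob"), (3, "Carol")]         # left side
orders = [(1, "book"), (1, "pen"), (3, "lamp"), (4, "cup")]  # right side; key 4 has no customer

# Map: tag each record with the table it came from, keyed by the join key
mapped = [(k, ("L", v)) for k, v in customers] + [(k, ("R", v)) for k, v in orders]

# Shuffle: bring all records with the same key together
groups: Dict[int, List[Tuple[str, str]]] = defaultdict(list)
for key, tagged in mapped:
    groups[key].append(tagged)


def reduce_left_outer(key: int, tagged: List[Tuple[str, str]]) -> List[Tuple[int, str, Optional[str]]]:
    """Reduce: emit every (left, right) pair, or (left, None) when the right side is empty."""
    lefts = [v for side, v in tagged if side == "L"]
    rights = [v for side, v in tagged if side == "R"]
    if not rights:
        return [(key, left, None) for left in lefts]
    return [(key, left, right) for left in lefts for right in rights]  # empty if no left rows


joined = [row for key in sorted(groups) for row in reduce_left_outer(key, groups[key])]
print(joined)
# [(1, 'Alice', 'book'), (1, 'Alice', 'pen'), (2, 'Bob', None), (3, 'Carol', 'lamp')]
```

Key 4 only appears on the right side, so a left outer join drops it, while Bob (no orders) is kept with `None`.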

---

### Resource Manager

A resource manager allocates cluster resources (such as CPU cores and memory) and schedules the containers that run the computation.

##### Example Resource Managers Supported by Spark:
- Spark Standalone
- Apache Hadoop YARN
- Kubernetes
- Apache Mesos (deprecated since Spark 3.2)

<img src="doc\1_spark_introduction\6_yarn_cluster_workflow.png" width="500"><img src="doc\1_spark_introduction\7_kube_workflow_spark.png" width="500">
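As a rough mental model of what a resource manager does when an application asks for containers, here is a toy first-fit scheduler in plain Python. It is a hypothetical sketch, not how YARN or Kubernetes actually schedule (both add queues, priorities, data locality and much more); the `Node` class, node names, capacities and request sizes are made up.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Node:
    name: str
    free_cores: int
    free_mem_gb: int


def schedule(requests: List[Dict[str, int]], nodes: List[Node]) -> List[Optional[str]]:
    """Place each container request on the first node with enough free cores and memory.

    Returns the chosen node name per request, or None when nothing fits
    (a real resource manager would typically keep the request pending).
    """
    placements: List[Optional[str]] = []
    for req in requests:
        target = next(
            (n for n in nodes if n.free_cores >= req["cores"] and n.free_mem_gb >= req["mem_gb"]),
            None,
        )
        if target is None:
            placements.append(None)
            continue
        # Reserve the resources on the chosen node
        target.free_cores -= req["cores"]
        target.free_mem_gb -= req["mem_gb"]
        placements.append(target.name)
    return placements


cluster = [Node("worker-1", free_cores=4, free_mem_gb=8), Node("worker-2", free_cores=2, free_mem_gb=16)]
# e.g. one small driver container plus three executor containers
requests = [{"cores": 1, "mem_gb": 2}, {"cores": 2, "mem_gb": 4},
            {"cores": 2, "mem_gb": 4}, {"cores": 2, "mem_gb": 4}]
print(schedule(requests, cluster))  # ['worker-1', 'worker-1', 'worker-2', None]
```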

---

## What is Apache Spark?
Apache Spark is a data processing framework that:
- Is a unified analytics engine
- Processes large-scale datasets across multiple computers
- Is fault tolerant
- Provides an easy-to-use interface for distributed computing with implicit data parallelism

<img src="doc\1_spark_introduction\8_spark_stack.png" width="500">

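Spark keeps intermediate results in memory where possible, whereas Hadoop MapReduce writes them to disk between the map and reduce phases and between chained jobs. This can make Spark much faster, especially for iterative and multi-step workloads.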

| Spark   | Hadoop MapReduce    |
|-------------|-------------|
| <img src="doc\1_spark_introduction\9_spark_highlevel_workflow.jpg" width="600">         |  <img src="doc\1_spark_introduction\10_hadoop_mapreduce_workflow.png" width="460">         |


---


## Spark Basics - RDD

RDD (Resilient Distributed Dataset) is the fundamental data structure of Apache Spark.
- It is an immutable collection of objects, split into partitions that are computed on different nodes of the cluster.
- It is resilient: on failure, Spark restores the data by using the RDD lineage graph (DAG)
	- to recompute partitions that are missing or damaged due to node failures.
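The idea of recovering through lineage can be sketched without Spark. In this hypothetical `ToyRDD` class (plain Python, not the PySpark API), each dataset records its parent and the function that derives a partition from the parent's partition. A lost partition is therefore rebuilt by re-running the recorded functions on the source partition, without touching the other partitions.

```python
from typing import Callable, List, Optional


class ToyRDD:
    """Hypothetical RDD-like object that stores its lineage instead of its data."""

    def __init__(self, source: Optional[List[list]] = None,
                 parent: Optional["ToyRDD"] = None,
                 fn: Optional[Callable[[list], list]] = None):
        self.source = source  # only set on the root, e.g. data read from storage
        self.parent = parent  # lineage: the RDD this one was derived from
        self.fn = fn          # lineage: how one partition is derived from the parent's

    def map(self, f: Callable) -> "ToyRDD":
        return ToyRDD(parent=self, fn=lambda part: [f(x) for x in part])

    def filter(self, pred: Callable) -> "ToyRDD":
        return ToyRDD(parent=self, fn=lambda part: [x for x in part if pred(x)])

    def compute(self, index: int) -> list:
        """Recompute partition `index` by walking the lineage back to the source."""
        if self.parent is None:
            return list(self.source[index])
        return self.fn(self.parent.compute(index))


base = ToyRDD(source=[[1, 2, 3], [4, 5, 6]])  # two source partitions
result = base.map(lambda x: x * 10).filter(lambda x: x > 20)

# Pretend each computed partition lives on a different worker
materialized = {i: result.compute(i) for i in range(2)}
print(materialized)  # {0: [30], 1: [40, 50, 60]}

# The worker holding partition 1 fails: only that partition is rebuilt from the lineage
del materialized[1]
materialized[1] = result.compute(1)
print(materialized[1])  # [40, 50, 60]
```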

### Start SparkSession

```python
from pyspark.sql import SparkSession

# Start a local Spark session that uses all available cores
spark = SparkSession.builder.appName("simpleApp") \
        .master("local[*]") \
        .getOrCreate()
sc = spark.sparkContext
# URL of the Spark web UI for this application
sc.uiWebUrl
```

```
'http://DESKTOP-1A80NHD.mshome.net:4040'
```

### Create RDD

```python
data = ["First sample data", "Also a sample"]
rdd = sc.parallelize(data)
print(type(rdd))
print(rdd.collect())
```

```
<class 'pyspark.rdd.RDD'>
['First sample data', 'Also a sample']
```


```python
rdd = sc.textFile("./data/raw")
print(type(rdd))
```

```
<class 'pyspark.rdd.RDD'>
```


```python
collected_list = rdd.take(5)
print(type(collected_list))
```

```
<class 'list'>
```


```python
collected_list
```

```
['The park have statues that was crafted in the early 70s and have  foretold stories from Chinese mythology, folklore, legends etc. I have been here since my early childhood with my parent , then with my schoolmate  and later with family and children. Very nice place to walk asit fill with heritages.',
 'Visited after reopening in 1 July. Although the two main sections (Hell museum, 10 stages) were not open it still had a couple of statues with descriptions. Seems like management is trying to open new f&b venues inside the park but not open at this point. Quite an exotic experience free of charge!',
 'Good Place to know about the Chinese Ancient Scripts, Tradition and their Sulptures. Most people feel this place is creepy but its not. The Architectures and the Sculptures are really good to admire. There is also a small Buddha temple inside. There is no proper restaurants though. Restrooms are available. Its free of charge, People visiting Chinese Garden also should visit this place.',
```


```python
def describeRdd(rdd, description=""):
    # Count the records in each partition without collecting the records themselves
    def countRecord(splitIndex, iterator):
        yield (splitIndex, sum(1 for _ in iterator))

    numRecordPerPartition = rdd.mapPartitionsWithIndex(countRecord).collect()
    if description:
        print(description)
    print(f"number of records in rdd = {rdd.count()}")
    print(f"number of partitions = {rdd.getNumPartitions()}")
    print(f"number of records in each partition = {numRecordPerPartition}")
    print("")

describeRdd(rdd)
```

```
number of records in rdd = 39
number of partitions = 3
number of records in each partition = [(0, 21), (1, 14), (2, 4)]
```



---

### RDD Transformations & Actions

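A Spark transformation is a function that produces a new RDD from existing RDDs.
- Lazy evaluation
	- No data is loaded or computed when a transformation is defined
	- Each RDD keeps a reference to its parent RDD(s); this chain is known as the RDD lineage

A Spark action returns a result to the driver or writes it to storage
- Transformations are only computed when an action is called
- Because Spark sees the whole lineage before running it, it can optimize execution, e.g. by pipelining consecutive narrow transformations within one stage and moving data over the network only where a wide transformation (shuffle) requires it
- Computed RDD partitions are not kept by default: each action recomputes the lineage unless the RDD is persisted (cached), in which case its partitions are stored, in memory by default, for later actions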


<img src="doc\1_spark_introduction\11_spark_rdd_transfromation_action.png" width="600">   

#### Example of RDD Transformations

```python
import re

# Stop words that are dropped from the word count
INSIGNIFICANT_WORDS = {"the", "and", "to", "of", "a", "is", "it", "in", "this", "my", "you", "as",
                       "its", "are", "was", "were", "can", "i", "but", "with", "an", "there",
                       "here", "has", "not", "many", "for"}

def clean_words(words):
    # Remove every character that is not a letter, digit or space
    return map(lambda word: re.sub("[^0-9a-zA-Z ]+", "", word), words)

def is_insignificant(word):
    return word in INSIGNIFICANT_WORDS

# Split on whitespace and on commas/periods with no digit on either side (so "1,000" stays intact)
tokenized_rdd = rdd.map(lambda record: re.split(r"\s|(?<!\d)[,.](?!\d)", record))
cleaned_words_rdd = tokenized_rdd.flatMap(clean_words)
cleaned_non_empty_rdd = cleaned_words_rdd.filter(lambda word: len(word) > 0)
normalized_rdd = cleaned_non_empty_rdd.map(lambda word: word.lower())
cleaned_rdd = normalized_rdd.filter(lambda word: not is_insignificant(word))

cleaned_rdd.persist()
```

#### Example of RDD Action

```python
# count is an action
cleaned_rdd.count()
```

```
1068
```

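```python
# Pair each word with 1, then sum the counts per word.
# aggregateByKey(zeroValue, seqFunc, combFunc): seqFunc adds counts within a partition,
# combFunc merges the per-partition sums
word_count_rdd = cleaned_rdd.map(lambda x: (x, 1)) \
    .aggregateByKey(0, lambda x, y: x + y, lambda x, y: x + y)
```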

```python
# collect is an action
word_count_list = word_count_rdd.collect()
word_count_list.sort(key=lambda kv: kv[1], reverse=True)
word_count_list[:15]
```

```
[('park', 22),
 ('place', 22),
 ('chinese', 19),
 ('free', 14),
 ('statues', 13),
 ('hell', 13),
 ('scared', 12),
 ('visit', 12),
 ('scary', 10),
 ('villa', 9),
 ('par', 9),
 ('tiger', 9),
 ('very', 8),
 ('balm', 8),
 ('interesting', 8)]
```

---

### RDD - DAG and Lineage
A Directed Acyclic Graph (DAG) is a finite directed graph with no cycles. Spark uses it to describe the sequence of computations performed on the data:
each node is an RDD, and each edge is a transformation that derives one RDD from another.


<img src="doc\1_spark_introduction\12_rdd_dag_lineage_example.png" width="400">   
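The word-count lineage from the transformation example is such a DAG. As a plain-Python illustration, independent of Spark's own scheduler, the standard-library `graphlib.TopologicalSorter` (Python 3.9+) lists the RDDs in an order that respects every parent-to-child edge, i.e. an order in which they can be computed. The node labels are descriptive names chosen for this sketch, not Spark identifiers; inside PySpark, `RDD.toDebugString()` returns a text description of an RDD's actual lineage.

```python
from graphlib import CycleError, TopologicalSorter

# Each key is derived from the RDDs in its set (labels mirror the word-count steps)
lineage = {
    "tokenized (map)": {"raw text (textFile)"},
    "words (flatMap)": {"tokenized (map)"},
    "non-empty words (filter)": {"words (flatMap)"},
    "lower-cased words (map)": {"non-empty words (filter)"},
    "significant words (filter)": {"lower-cased words (map)"},
    "(word, 1) pairs (map)": {"significant words (filter)"},
    "word counts (aggregateByKey)": {"(word, 1) pairs (map)"},
}

order = list(TopologicalSorter(lineage).static_order())
print(" -> ".join(order))

# "Acyclic" matters: a graph with a cycle has no valid computation order
try:
    list(TopologicalSorter({"a": {"b"}, "b": {"a"}}).static_order())
    cycle_detected = False
except CycleError:
    cycle_detected = True
print(cycle_detected)  # True
```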




### RDD Cache/Persist

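```python
# persist() marks cleaned_rdd so that its partitions are kept (in memory by default) once the
# next action computes them; later jobs then reuse them instead of re-running the lineage
cleaned_rdd.persist()
word_count_rdd = cleaned_rdd.map(lambda x: (x, 1)).aggregateByKey(0, lambda x, y: x + y, lambda x, y: x + y)
word_count_list = word_count_rdd.collect()
```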

### RDD Repartition

```python
describeRdd(tokenized_rdd, "tokenized_rdd")
describeRdd(cleaned_rdd, "cleaned_rdd")
describeRdd(cleaned_non_empty_rdd, "cleaned_non_empty_rdd")
describeRdd(normalized_rdd, "normalized_rdd")
describeRdd(word_count_rdd, "word_count_rdd")
```

```
tokenized_rdd
number of records in rdd = 39
number of partitions = 3
number of records in each partition = [(0, 21), (1, 14), (2, 4)]

cleaned_rdd
number of records in rdd = 1068
number of partitions = 3
number of records in each partition = [(0, 566), (1, 364), (2, 138)]

cleaned_non_empty_rdd
number of records in rdd = 1618
number of partitions = 3
number of records in each partition = [(0, 866), (1, 540), (2, 212)]

normalized_rdd
number of records in rdd = 1618
number of partitions = 3
number of records in each partition = [(0, 866), (1, 540), (2, 212)]

word_count_rdd
number of records in rdd = 531
number of partitions = 3
number of records in each partition = [(0, 196), (1, 161), (2, 174)]
```



```python
cleaned_non_empty_rdd = cleaned_non_empty_rdd.repartition(6)

describeRdd(cleaned_non_empty_rdd, "cleaned_non_empty_rdd")
```

```
cleaned_non_empty_rdd
number of records in rdd = 1618
number of partitions = 6
number of records in each partition = [(0, 270), (1, 272), (2, 270), (3, 266), (4, 270), (5, 270)]
```
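`repartition(n)` shuffles the data so that records end up spread roughly evenly over `n` partitions, regardless of how skewed the input partitions were. A simplified plain-Python version of that idea is a round-robin distribution; the `round_robin_repartition` helper is hypothetical. Spark's actual placement is not a strict record-by-record round robin, which is why the Spark output has slightly uneven counts while this sketch is perfectly balanced.

```python
from typing import List


def round_robin_repartition(partitions: List[list], num_partitions: int) -> List[list]:
    """Simplified repartition: deal records out one by one across the new partitions."""
    output: List[list] = [[] for _ in range(num_partitions)]
    i = 0
    for partition in partitions:
        for record in partition:
            output[i % num_partitions].append(record)
            i += 1
    return output


# Same partition sizes as cleaned_non_empty_rdd before repartitioning: 866, 540 and 212 records
skewed = [list(range(866)), list(range(540)), list(range(212))]
balanced = round_robin_repartition(skewed, 6)
print([len(p) for p in balanced])  # [270, 270, 270, 270, 269, 269]
```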



### Write RDD to File

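```python
# Write one "word,count,starting_letter" line per record; no spaces after the commas,
# so the CSV reader below gets clean fields without leading whitespace
word_count_rdd.map(lambda x: f"{x[0]},{x[1]},{x[0][:1]}") \
  .saveAsTextFile("data/rdd_example/word_count_result")
```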

---
## SparkSQL and Dataframe

Spark SQL is a module for working with structured data
- Unified Data Access
	- Data sources like Hive, Avro, Parquet, ORC, Kafka, JSON, as well as JDBC
- Enables SQL queries
- Query plan optimization
- A common abstraction for batch and streaming operations


<img src="doc\1_spark_introduction\13_spark_architecture.png" width="400"> 

### Dataframe
A DataFrame is a distributed collection of data organized into named columns, with an easy-to-use API that simplifies distributed big data processing.
- Can be constructed from many sources, including structured data files, tables in Hive, external databases, or existing RDDs
- Provides SQL-like data manipulations and aggregations
- Builds on the Spark RDD API; DataFrame queries are optimized by the Catalyst optimizer before they run
- Similar to pandas and R data frames, except that it is designed for large-scale distributed data and query optimization

### Create Dataframe from Python List

```python
from pyspark.sql.types import (
    StructField, StructType, StringType, IntegerType, DoubleType, BooleanType,
)
from pyspark.sql.functions import col, upper, substring

data_1 = [
    {"Category": 'A', "ID": 1, "Value": 121.44, "Truth": True},
    {"Category": 'B', "ID": 2, "Value": 300.01, "Truth": False},
    {"Category": 'C', "ID": 3, "Value": 10.99, "Truth": None},
    {"Category": 'E', "ID": 4, "Value": 33.87, "Truth": True}
]

schema_1 = StructType([
    StructField(dataType=StringType(), name="Category"),
    StructField(dataType=IntegerType(), name="ID"),
    StructField(dataType=DoubleType(), name="Value"),
    StructField(dataType=BooleanType(), name="Truth"),
])
df1 = spark.createDataFrame(data_1, schema=schema_1)
```

```python
df1.show()
df1.printSchema()
```

```
+--------+---+------+-----+
|Category| ID| Value|Truth|
+--------+---+------+-----+
|       A|  1|121.44| true|
|       B|  2|300.01|false|
|       C|  3| 10.99| null|
|       E|  4| 33.87| true|
+--------+---+------+-----+

root
 |-- Category: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Value: double (nullable = true)
 |-- Truth: boolean (nullable = true)
```



### Create Dataframe from a source

```python
schema = StructType([
    StructField(dataType=StringType(), name="word"),
    StructField(dataType=IntegerType(), name="count"),
    StructField(dataType=StringType(), name="starting_letter"),
])
word_count_df = spark.read.format("csv") \
    .option("delimiter", ",") \
    .option("header", False) \
    .schema(schema) \
    .load("data/rdd_example/word_count_result")
```

```python
word_count_df.show()
```

```
+----------+-----+---------------+
|      word|count|starting_letter|
+----------+-----+---------------+
|      have|    5|              h|
|   statues|   13|              s|
|      from|    5|              f|
|   legends|    5|              l|
|       etc|    1|              e|
|    family|    2|              f|
|  children|    3|              c|
|      very|    8|              v|
|      nice|    7|              n|
|      walk|    2|              w|
|   visited|    3|              v|
| reopening|    1|              r|
|  although|    1|              a|
|       two|    1|              t|
|      hell|   13|              h|
|    museum|    2|              m|
|      open|    3|              o|
|     still|    4|              s|
|      like|    6|              l|
|management|    1|              m|
+----------+-----+---------------+
only showing top 20 rows
```



---

### Transform Data using SparkSQL & Dataframe API

Register each DataFrame as a temporary view so that it can be queried with SQL

```python
df1.createOrReplaceTempView("testing_table")
word_count_df.createOrReplaceTempView("word_count")

spark.catalog.listTables()
```

```
[Table(name='testing_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='word_count', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
```

#### SQL queries

```python
dataframe_sql_query = spark.sql("""
    SELECT UPPER(word) as word, count
    FROM word_count
    WHERE (word IN ('tiger','balm', 'villa'))
        AND (count > 5)
    ORDER BY word DESC
""")
dataframe_sql_query.show()
```

```
+-----+-----+
| word|count|
+-----+-----+
|VILLA|    9|
|TIGER|    9|
| BALM|    8|
+-----+-----+
```



#### Pyspark Dataframe API

```python
df = word_count_df.filter(col("word").isin(["tiger", "villa", "balm"]))
df = df.withColumn("word", upper(col("word")))
df = df.orderBy("word", ascending=False)
df = df.filter(col("count") > 5)
df = df.select(col("word"), col("count"))
df.show()
```

```
+-----+-----+
| word|count|
+-----+-----+
|VILLA|    9|
|TIGER|    9|
| BALM|    8|
+-----+-----+
```



---

### Catalyst Optimizer

<img src="doc\1_spark_introduction\14_catalyst_optimizer.png" width="800"> 

- The Catalyst optimizer converts each SQL query (or DataFrame operation) into a logical plan and optimizes it with rules
    - e.g. projection pruning and predicate pushdown
- It then converts the optimized logical plan into one or more physical execution plans
- It selects a physical plan (using a cost model) and generates code that runs as RDD operations
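Predicate pushdown can be illustrated with a toy rewrite rule in plain Python. The `PlanNode` representation and `push_down_filters` function are hypothetical and far simpler than Catalyst's rule engine: a filter is swapped below a sort, and below a projection as long as the filter does not read a column that the projection computes. Applied to the plan of the DataFrame API example, it moves the `count > 5` filter down next to the other filter, close to the data source. Unlike Catalyst, this sketch does not merge filters or prune columns.

```python
from dataclasses import dataclass
from typing import FrozenSet, List


@dataclass(frozen=True)
class PlanNode:
    op: str                                 # "scan", "filter", "project" or "sort"
    desc: str                               # human-readable description
    reads: FrozenSet[str] = frozenset()     # columns a filter reads
    computes: FrozenSet[str] = frozenset()  # columns a project derives (not just passes through)


def push_down_filters(plan: List[PlanNode]) -> List[PlanNode]:
    """Plan is ordered from the data source (index 0) up to the final operator."""
    plan = list(plan)
    changed = True
    while changed:
        changed = False
        for i in range(1, len(plan)):
            below, node = plan[i - 1], plan[i]
            if node.op != "filter":
                continue
            movable = below.op == "sort" or (
                below.op == "project" and not (node.reads & below.computes)
            )
            if movable:
                plan[i - 1], plan[i] = node, below  # evaluate the filter earlier
                changed = True
    return plan


# Plan of the DataFrame API example, from the source (bottom) to the result (top)
original = [
    PlanNode("scan", "read csv"),
    PlanNode("filter", "word IN (tiger, villa, balm)", reads=frozenset({"word"})),
    PlanNode("project", "upper(word) AS word", computes=frozenset({"word"})),
    PlanNode("sort", "ORDER BY word DESC"),
    PlanNode("filter", "count > 5", reads=frozenset({"count"})),
    PlanNode("project", "SELECT word, count"),
]

for node in push_down_filters(original):
    print(node.desc)
# read csv
# word IN (tiger, villa, balm)
# count > 5
# upper(word) AS word
# ORDER BY word DESC
# SELECT word, count
```

The check against `computes` is what keeps the rule safe: a filter on the upper-cased `word` could not simply be moved below the projection that creates it.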

```python
df.explain(True)
```

```
== Parsed Logical Plan ==
'Project [unresolvedalias('word, None), unresolvedalias('count, None)]
+- Filter (count#1808 > 5)
   +- Sort [word#2173 DESC NULLS LAST], true
      +- Project [upper(word#1807) AS word#2173, count#1808, starting_letter#1809]
         +- Filter word#1807 IN (tiger,villa,balm)
            +- Relation[word#1807,count#1808,starting_letter#1809] csv

== Analyzed Logical Plan ==
word: string, count: int
Project [word#2173, count#1808]
+- Filter (count#1808 > 5)
   +- Sort [word#2173 DESC NULLS LAST], true
      +- Project [upper(word#1807) AS word#2173, count#1808, starting_letter#1809]
         +- Filter word#1807 IN (tiger,villa,balm)
            +- Relation[word#1807,count#1808,starting_letter#1809] csv

== Optimized Logical Plan ==
Sort [word#2173 DESC NULLS LAST], true
+- Project [upper(word#1807) AS word#2173, count#1808]
   +- Filter ((isnotnull(count#1808) && word#1807 IN (tiger,villa,balm)) && (count#1808 > 5))
      +- Relation[word#1807,count#1808,starti
```

         
<table>
    <tr>
        <th>Original Plan</th>
        <th>Optimized Plan</th>
   </tr>
    <tr>
        <td>1. filter by "tiger","villa","balm" </td>
        <td>1. filter by "tiger","villa","balm"  </td>
   </tr>
    <tr>
        <td>2. upper case "word" column </td>
        <td>2. filter by "count" > 5 🔼</td>
   </tr>
    <tr>
        <td>3. order by "word" column </td>
        <td>3. select "word" and "count" column 🔼</td>
   </tr>
    <tr>
        <td>4. filter by "count" > 5 🔼</td>
        <td>4. upper case "word" column</td>
   </tr>
    <tr>
        <td>5. select "word" and "count" column 🔼</td>
        <td>5. order by "word" column </td>
   </tr>
    </table>



### Write Dataframe to File 

```python
describeRdd(df.rdd)
```

```
number of records in rdd = 3
number of partitions = 3
number of records in each partition = [(0, 1), (1, 1), (2, 1)]
```



```python
# repartition(1) moves all rows into a single partition, so a single CSV part file is written
df.repartition(1) \
    .write.format("csv") \
    .option("header", True).save("data/dataframe_example/word_count_result")
```