# Apache Spark

- Open Source Distributed Computing Framework
- Fast and General-Purpose Big Data Processing
- Developed at UC Berkeley's AMPLab in 2009
- Donated to the Apache Software Foundation in 2013; became a top-level Apache project in 2014
- Features:
   - Speed through in-memory computation
   - Ease of Use with APIs
   - Unified Engine for batch, streaming, machine learning and graph workloads
- Is it a replacement for Hadoop?
   - No, but it is a powerful alternative to MapReduce.
   - Works on top of HDFS (Hadoop Distributed File System) or other supported file systems

## Hadoop
- Open-source framework
- Written in Java
- Designed for storing and processing large datasets in a distributed environment
- Cluster of computers -> task to be done -> break the task into small tasks that can be processed in parallel
   - Distributing these small tasks across the nodes of the cluster makes processing fast, efficient and cost-effective.

### Traditional System [DBMS, Hadoop MapReduce]
- Not very flexible or fast

### Features: Hadoop vs. Apache Spark

| # | Feature | Hadoop | Apache Spark |
|---|---------|--------|--------------|
| 1 | Processing | Batch | Batch + real-time streaming |
| 2 | Speed | Bound by disk I/O | In-memory computation (up to 100x faster) |
| 3 | Ease of use | Java, complex | APIs in Python, Scala, Java, R |
| 4 | Fault tolerance | Checkpoint-based | RDD lineage and DAG-based |
| 5 | Libraries | Very limited (Pig, Hive) | Spark SQL, MLlib, GraphX, Streaming |


Task: word count of a document (size: 1 TB)
- Hadoop: ~2 hours (120 min)
- Spark: ~15 min

# Spark Architecture

- Apache Spark follows a master-slave architecture.
- 3 main components
   - Driver Program
   - Cluster Manager
   - Executors

User Program (PySpark or Scala Code)
     |
Spark Driver [Driver Program] (Create DAG, send tasks)
     |
Cluster Manager (Allocate Resources)
     |
Executors (Perform operations in parallel)

In [22]:
from pyspark.sql import SparkSession

In [57]:
spark = (
    SparkSession.builder 
    .appName("ArchDemo")
    .master("local[*]")
    .getOrCreate()
)

data = [("Ram", 100), ("Mohan", 200), ("Shyam", 300)]
df = spark.createDataFrame(data, ["Name", "Sales"])

updated_df = df.withColumn("Bonus", df.Sales * 0.10)

updated_df.show()

spark.stop()





+-----+-----+-----+
| Name|Sales|Bonus|
+-----+-----+-----+
|  Ram|  100| 10.0|
|Mohan|  200| 20.0|
|Shyam|  300| 30.0|
+-----+-----+-----+



# PySpark
- Python API of Apache Spark
- Allows us to interact with Spark using Python
- Also runs on a single machine in local mode, which is convenient for learning and testing

## Installation
- pip install pyspark [installs the PySpark libraries together with the bundled Spark binaries and JARs; Java itself must be installed separately]
- In a notebook cell: !pip install pyspark

# RDDs - Resilient Distributed Datasets

- What are RDDs?
- Why do they matter?
- Create and Manipulate RDDs
- Key Terminologies
- Use Cases

## What are RDDs?
- Resilient Distributed Datasets.
- Core Data Structure of Apache Spark.
- Foundation for all higher level APIs

## Features
- Resilient -> fault-tolerant; lost partitions can be recomputed from lineage
- Distributed -> data is split across a cluster of machines
- Immutable -> once created, an RDD can't be changed; transformations return new RDDs
- Lazily evaluated -> transformations are not executed until an action is triggered
- Typed -> holds data of a specific type (ints, strings, tuples, ...)


rdd = spark.sparkContext.parallelize([1,2,3,4,5,6])
rdd.collect()

In [61]:
spark = (
    SparkSession.builder 
    .appName("ArchDemo")
    .master("local[*]")
    .getOrCreate()
)

rdd = spark.sparkContext.parallelize([1,2,3,4,5,6])
print(rdd.collect())

spark.stop()

[1, 2, 3, 4, 5, 6]


# Data Cleaning using PySpark

In [76]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, lower, to_date, mean, regexp_replace

spark = (
    SparkSession.builder 
    .appName("Synthetic Data Cleaning")
    .master("local[*]")
    .getOrCreate()
)


In [77]:
synthetic_data = """id,name,age,address,join_date
1, Alice ,25,123 Main St.,2023-01-01
2,BOB,30,456@Broadway,2023-05-15
3,Charlie,,789 Elm St.,invalid_date
4,dave,45,1011 Park Ave,2022-12-01
5,,22,PO Box 123,2023-07-07
6,Eve,,42 Wallaby Way,2023-03-03
7,Frank,29,!!NoWhere,2023-08-08
8,Gina,34,Somewhere-Else,2022-11-11
9,Hank,19,Unknown,2023-06-06
10, Ivy ,50, ,2023-02-02"""

In [80]:
synthetic_data

'id,name,age,address,join_date\n1, Alice ,25,123 Main St.,2023-01-01\n2,BOB,30,456@Broadway,2023-05-15\n3,Charlie,,789 Elm St.,invalid_date\n4,dave,45,1011 Park Ave,2022-12-01\n5,,22,PO Box 123,2023-07-07\n6,Eve,,42 Wallaby Way,2023-03-03\n7,Frank,29,!!NoWhere,2023-08-08\n8,Gina,34,Somewhere-Else,2022-11-11\n9,Hank,19,Unknown,2023-06-06\n10, Ivy ,50, ,2023-02-02'

In [82]:
with open("synthetic_data.csv", "w") as f:
    f.write(synthetic_data)

In [100]:
df = spark.read.csv("synthetic_data.csv", header=True, inferSchema=True)
df.show()

+---+-------+----+--------------+------------+
| id|   name| age|       address|   join_date|
+---+-------+----+--------------+------------+
|  1| Alice |  25|  123 Main St.|  2023-01-01|
|  2|    BOB|  30|  456@Broadway|  2023-05-15|
|  3|Charlie|NULL|   789 Elm St.|invalid_date|
|  4|   dave|  45| 1011 Park Ave|  2022-12-01|
|  5|   NULL|  22|    PO Box 123|  2023-07-07|
|  6|    Eve|NULL|42 Wallaby Way|  2023-03-03|
|  7|  Frank|  29|     !!NoWhere|  2023-08-08|
|  8|   Gina|  34|Somewhere-Else|  2022-11-11|
|  9|   Hank|  19|       Unknown|  2023-06-06|
| 10|   Ivy |  50|              |  2023-02-02|
+---+-------+----+--------------+------------+



In [98]:
df = df.withColumn("name",lower(trim(col("name"))) )
df.show()

+---+-------+----+--------------+------------+
| id|   name| age|       address|   join_date|
+---+-------+----+--------------+------------+
|  1|  alice|  25|  123 Main St.|  2023-01-01|
|  2|    bob|  30|  456@Broadway|  2023-05-15|
|  3|charlie|NULL|   789 Elm St.|invalid_date|
|  4|   dave|  45| 1011 Park Ave|  2022-12-01|
|  5|   NULL|  22|    PO Box 123|  2023-07-07|
|  6|    eve|NULL|42 Wallaby Way|  2023-03-03|
|  7|  frank|  29|     !!NoWhere|  2023-08-08|
|  8|   gina|  34|Somewhere-Else|  2022-11-11|
|  9|   hank|  19|       Unknown|  2023-06-06|
| 10|    ivy|  50|              |  2023-02-02|
+---+-------+----+--------------+------------+




In [102]:
df = df.fillna({"name":'Dev'})
df.show()

+---+-------+----+--------------+------------+
| id|   name| age|       address|   join_date|
+---+-------+----+--------------+------------+
|  1| Alice |  25|  123 Main St.|  2023-01-01|
|  2|    BOB|  30|  456@Broadway|  2023-05-15|
|  3|Charlie|NULL|   789 Elm St.|invalid_date|
|  4|   dave|  45| 1011 Park Ave|  2022-12-01|
|  5|    Dev|  22|    PO Box 123|  2023-07-07|
|  6|    Eve|NULL|42 Wallaby Way|  2023-03-03|
|  7|  Frank|  29|     !!NoWhere|  2023-08-08|
|  8|   Gina|  34|Somewhere-Else|  2022-11-11|
|  9|   Hank|  19|       Unknown|  2023-06-06|
| 10|   Ivy |  50|              |  2023-02-02|
+---+-------+----+--------------+------------+



In [110]:
mean_age = df.select(mean("age")).first()[0]

In [113]:
mean_age

31.75

In [115]:
# age is an integer column, so the mean (31.75) is cast to 31 when filling
df = df.fillna({'age': mean_age})

In [117]:
df.show()

+---+-------+---+--------------+------------+
| id|   name|age|       address|   join_date|
+---+-------+---+--------------+------------+
|  1| Alice | 25|  123 Main St.|  2023-01-01|
|  2|    BOB| 30|  456@Broadway|  2023-05-15|
|  3|Charlie| 31|   789 Elm St.|invalid_date|
|  4|   dave| 45| 1011 Park Ave|  2022-12-01|
|  5|    Dev| 22|    PO Box 123|  2023-07-07|
|  6|    Eve| 31|42 Wallaby Way|  2023-03-03|
|  7|  Frank| 29|     !!NoWhere|  2023-08-08|
|  8|   Gina| 34|Somewhere-Else|  2022-11-11|
|  9|   Hank| 19|       Unknown|  2023-06-06|
| 10|   Ivy | 50|              |  2023-02-02|
+---+-------+---+--------------+------------+



In [127]:
# Remove every character that is not a letter or digit (this also strips spaces)
df = df.withColumn("address", regexp_replace("address", "[^a-zA-Z0-9]", ""))

In [129]:
df.show()

+---+-------+---+-------------+------------+
| id|   name|age|      address|   join_date|
+---+-------+---+-------------+------------+
|  1| Alice | 25|    123MainSt|  2023-01-01|
|  2|    BOB| 30|  456Broadway|  2023-05-15|
|  3|Charlie| 31|     789ElmSt|invalid_date|
|  4|   dave| 45|  1011ParkAve|  2022-12-01|
|  5|    Dev| 22|     POBox123|  2023-07-07|
|  6|    Eve| 31| 42WallabyWay|  2023-03-03|
|  7|  Frank| 29|      NoWhere|  2023-08-08|
|  8|   Gina| 34|SomewhereElse|  2022-11-11|
|  9|   Hank| 19|      Unknown|  2023-06-06|
| 10|   Ivy | 50|             |  2023-02-02|
+---+-------+---+-------------+------------+



In [137]:
df = df.dropna(subset=["join_date","address"])

In [141]:
df.show()  # No rows are dropped: "invalid_date" and the empty address are strings, not NULL

+---+-------+---+-------------+------------+
| id|   name|age|      address|   join_date|
+---+-------+---+-------------+------------+
|  1| Alice | 25|    123MainSt|  2023-01-01|
|  2|    BOB| 30|  456Broadway|  2023-05-15|
|  3|Charlie| 31|     789ElmSt|invalid_date|
|  4|   dave| 45|  1011ParkAve|  2022-12-01|
|  5|    Dev| 22|     POBox123|  2023-07-07|
|  6|    Eve| 31| 42WallabyWay|  2023-03-03|
|  7|  Frank| 29|      NoWhere|  2023-08-08|
|  8|   Gina| 34|SomewhereElse|  2022-11-11|
|  9|   Hank| 19|      Unknown|  2023-06-06|
| 10|   Ivy | 50|             |  2023-02-02|
+---+-------+---+-------------+------------+



In [143]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = false)
 |-- age: integer (nullable = true)
 |-- address: string (nullable = true)
 |-- join_date: string (nullable = true)

