# Spark
- A fast, general-purpose compute engine, often used to process [Hadoop](https://hadoop.apache.org/) data.
    - Often paired with Hadoop for its distributed filesystem (HDFS), cluster resource management and parallel processing.
- Spark provides a simple and expressive programming model that supports a wide range of applications, including ETL (extract, transform, load), machine learning, stream processing, and graph computation.
- Also connects to several databases and other external data sources.

## Spark and Cassandra
- Cassandra is one of the databases that work well with Spark.
    - Both spread data and work across the nodes of a cluster.
    - Both are built to tolerate node failures (Cassandra by replicating data, Spark by recomputing lost partitions).
- Spark can be deployed on the same nodes as Cassandra for:
    - local data manipulation, so data travels only a short distance, and
    - combining the partial results at a central hub ([MapReduce](https://en.wikipedia.org/wiki/MapReduce)).
- Requires the Spark Cassandra Connector from DataStax.
    - Downloaded and applied automatically by the configuration below.
- A [SparkSession](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html) instantiates Spark, applies configurations and connects to a data source.

In [22]:
from pyspark.sql import SparkSession
# Build the session; the connector package is resolved from its Maven coordinates.
spark = (
    SparkSession.builder.appName('SparkCassandraApp')
    .config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.1')
    .config('spark.cassandra.connection.host', 'localhost')
    .config('spark.cassandra.connection.port', '9042')
    .config('spark.sql.extensions', 'com.datastax.spark.connector.CassandraSparkExtensions')
    .config('spark.sql.catalog.mycatalog', 'com.datastax.spark.connector.datasource.CassandraCatalog')
    .getOrCreate()
)
# Some warnings are to be expected.
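
The "local manipulation, then combination at a central hub" idea from the list above can be illustrated without Spark. The following is a minimal plain-Python sketch, not how Spark is implemented: the three partitions and the names `local_sums` and `combine` are made up for illustration, and the prices are borrowed from the car table used later in this document.

```python
from collections import defaultdict
from functools import reduce

# Hypothetical split of rows over three nodes (the partitioning is made up).
partitions = [
    [("Tesla", 21000.0)],
    [("Oldsmobile", 135000.0)],
    [("Tesla", 20000.0)],
]


def local_sums(rows):
    """Map step: a node reduces its own rows to (sum, count) per company."""
    acc = defaultdict(lambda: (0.0, 0))
    for company, price in rows:
        total, count = acc[company]
        acc[company] = (total + price, count + 1)
    return dict(acc)


def combine(left, right):
    """Reduce step: merge two partial results at the central hub."""
    merged = dict(left)
    for company, (total, count) in right.items():
        prev_total, prev_count = merged.get(company, (0.0, 0))
        merged[company] = (prev_total + total, prev_count + count)
    return merged


# Each node works on local data; only the small partial results travel.
partials = [local_sums(rows) for rows in partitions]
totals = reduce(combine, partials, {})
averages = {company: total / count for company, (total, count) in totals.items()}
print(averages)  # {'Tesla': 20500.0, 'Oldsmobile': 135000.0}
```

Sums and counts are shipped instead of averages because averages of averages would be wrong when the partitions differ in size.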

In [23]:
# .load() reads the Cassandra table into a Spark DataFrame.
spark.read.format("org.apache.spark.sql.cassandra").options(table="my_first_table", keyspace="my_first_keyspace").load().show()


+---+--------+-------+
|ind| company|  model|
+---+--------+-------+
|  3|Polestar|      3|
|  1|   Tesla|Model S|
|  2|   Tesla|Model 3|
+---+--------+-------+
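
The same read chain recurs several times in this document, so it may be convenient to wrap it. A possible sketch follows; the helper name `read_table` and the constant `CASSANDRA_FORMAT` are hypothetical and not part of PySpark or the connector, and `spark` is assumed to be the session created above.

```python
# Data source name used with spark.read.format() throughout this document.
CASSANDRA_FORMAT = "org.apache.spark.sql.cassandra"


def read_table(spark, keyspace, table):
    """Return a Cassandra table as a Spark DataFrame (hypothetical helper)."""
    return (
        spark.read.format(CASSANDRA_FORMAT)
        .options(table=table, keyspace=keyspace)
        .load()
    )


# Usage, assuming `spark` is the session created earlier:
# read_table(spark, "my_first_keyspace", "my_first_table").show()
```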



### Database views
- A temporary view registers a DataFrame under a name so that it can be queried with SQL.
- Useful for "setting the scene" before a simpler data extraction.
- The example below simply attaches to the correct keyspace and table.
    - The _view_ could also be a selection from that table, to be queried further.

In [25]:
# Create view for simpler SQL queries
spark.read.format("org.apache.spark.sql.cassandra").options(table="table_with_uuid", keyspace="my_first_keyspace").load().createOrReplaceTempView("my_first_table_view")
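
As noted above, a view can also hold a selection rather than the whole table. A minimal sketch, assuming the view `my_first_table_view` registered above and a `company` column; the function name `create_company_view` and the view name `tesla_view` are hypothetical.

```python
def create_company_view(spark, company, source="my_first_table_view", name="company_view"):
    """Register a temp view with only one company's rows (hypothetical helper)."""
    df = spark.table(source)  # look up the existing view as a DataFrame
    df.filter(df["company"] == company).createOrReplaceTempView(name)
    return name


# Usage, assuming `spark` is the session created earlier:
# create_company_view(spark, "Tesla", name="tesla_view")
# spark.sql("select model, price from tesla_view").show()
```

Later queries against the narrower view no longer need to repeat the `company` condition.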

### [Spark DataFrame](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html)
- Similar to a pandas DataFrame, but can be distributed over compute nodes.
- Offers filters, statistical calculations, groupBy, pandas functions (mapInPandas), joins, etc.
- Exports to pandas and JSON.
- Reads many formats, including CSV, JSON, Parquet and SQL sources; Excel needs the pandas API on Spark or an extra package.

In [35]:
# Read CSV file into Spark DataFrame
planets = spark.read.csv("../../data/planets.csv", header=True, inferSchema=True)
planets.show()

+-------+---------+
| planet| distance|
+-------+---------+
|Mercury| 0.387 AU|
|  Venus| 0.723 AU|
|  Earth| 1.000 AU|
|   Mars| 1.524 AU|
|Jupiter| 5.203 AU|
| Saturn| 9.546 AU|
| Uranus|19.218 AU|
|Neptune|30.069 AU|
+-------+---------+



In [33]:
# Select only the Tesla rows.
# spark.sql() returns a DataFrame, which is then filtered with a SQL-style condition.
spark.sql("select * from my_first_table_view").filter("company = 'Tesla'").show()

+--------------------+-------+-------+-------+
|                  id|company|  model|  price|
+--------------------+-------+-------+-------+
|d99d3340-4b02-11e...|  Tesla|Model S|21000.0|
|d99bd3b0-4b02-11e...|  Tesla|Model S|20000.0|
+--------------------+-------+-------+-------+



In [27]:
# Equivalent to the above but in pure SQL
spark.sql("select * from my_first_table_view where company = 'Tesla'").show()

+--------------------+-------+-------+-------+
|                  id|company|  model|  price|
+--------------------+-------+-------+-------+
|d99bd3b0-4b02-11e...|  Tesla|Model S|20000.0|
|d99d3340-4b02-11e...|  Tesla|Model S|21000.0|
+--------------------+-------+-------+-------+



In [28]:
# Select all data from the view and convert it to Pandas DataFrame
spark.sql("select * from my_first_table_view").toPandas()

|   | id | company | model | price |
|---|----|---------|-------|-------|
| 0 | d99d3340-4b02-11ee-8fb7-47776a2dd8a7 | Tesla | Model S | 21000.0 |
| 1 | d99da870-4b02-11ee-8fb7-47776a2dd8a7 | Oldsmobile | Model 6C | 135000.0 |
| 2 | d99bd3b0-4b02-11ee-8fb7-47776a2dd8a7 | Tesla | Model S | 20000.0 |


In [29]:
# View data as a table and select only Tesla company
df = spark.sql("select * from my_first_table_view")
df.filter(df.company == 'Tesla').toPandas() # Equivalent to "company = 'Tesla'"


|   | id | company | model | price |
|---|----|---------|-------|-------|
| 0 | d99bd3b0-4b02-11ee-8fb7-47776a2dd8a7 | Tesla | Model S | 20000.0 |
| 1 | d99d3340-4b02-11ee-8fb7-47776a2dd8a7 | Tesla | Model S | 21000.0 |


In [30]:
# Filter also on price > 20000
df.filter((df.company == 'Tesla') & (df.price > 20000)).toPandas()

|   | id | company | model | price |
|---|----|---------|-------|-------|
| 0 | d99d3340-4b02-11ee-8fb7-47776a2dd8a7 | Tesla | Model S | 21000.0 |


### Aggregation, grouping and filtering
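- These operations can be combined in many ways.
- A chain of calls is applied from left to right.
- Order matters: a filter placed before an aggregation limits the rows that are aggregated, while a filter placed after it acts on the aggregated values.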

In [32]:
# Aggregate prices by company and sort by company name
df.groupBy("company").agg({"price": "avg"}).orderBy('company').toPandas()

|   | company | avg(price) |
|---|---------|------------|
| 0 | Oldsmobile | 135000.0 |
| 1 | Tesla | 20500.0 |
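
To see why order matters, the same three rows can be processed in two different orders. This is a plain-Python sketch of the idea rather than PySpark code; `cars` copies the prices shown above, and `avg_by_company` is a hypothetical stand-in for `groupBy("company").agg({"price": "avg"})`.

```python
from statistics import mean

# The three rows of the table shown earlier (company and price only).
cars = [
    {"company": "Tesla", "price": 21000.0},
    {"company": "Oldsmobile", "price": 135000.0},
    {"company": "Tesla", "price": 20000.0},
]


def avg_by_company(rows):
    """Group rows by company and average the prices of each group."""
    groups = {}
    for row in rows:
        groups.setdefault(row["company"], []).append(row["price"])
    return {company: mean(prices) for company, prices in groups.items()}


# Filter first, then aggregate: only rows above 20000 enter the average.
# In PySpark, roughly:
# df.filter(df.price > 20000).groupBy("company").agg({"price": "avg"})
filter_then_avg = avg_by_company([r for r in cars if r["price"] > 20000])

# Aggregate first, then filter: the condition is applied to the averages.
avg_then_filter = {c: p for c, p in avg_by_company(cars).items() if p > 20000}

print(filter_then_avg)  # {'Tesla': 21000.0, 'Oldsmobile': 135000.0}
print(avg_then_filter)  # {'Tesla': 20500.0, 'Oldsmobile': 135000.0}
```

The Tesla average differs (21000.0 versus 20500.0) because the early filter removes the 20000.0 row before the average is taken.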


### Write data to Cassandra
- Data can be appended to, or overwritten in, existing database tables.
- PySpark is strict about data types: the column names and types of the DataFrame need to be compatible with the target table.
    - The schema can be read from the existing table and reused.
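
Reusing the schema of the existing table, as mentioned in the last point, could look roughly like the sketch below. The helper name `to_table_schema` is hypothetical, `spark` is assumed to be the session created earlier, and the pandas DataFrame is assumed to contain exactly the table's columns with values that fit the table's types.

```python
def to_table_schema(spark, pandas_df, keyspace, table):
    """Build a Spark DataFrame that uses the target table's schema (hypothetical helper)."""
    target = (
        spark.read.format("org.apache.spark.sql.cassandra")
        .options(table=table, keyspace=keyspace)
        .load()
    )
    schema = target.schema  # StructType describing the existing table
    # Put the pandas columns in the table's column order before applying the schema.
    ordered = pandas_df[schema.fieldNames()]
    return spark.createDataFrame(ordered, schema=schema)


# Usage, assuming `spark` and a pandas DataFrame `newCars` with matching columns:
# to_table_schema(spark, newCars, "my_first_keyspace", "my_first_table")
```

The result can then be written with the same `.write.format(...).mode("append").save()` chain used in this section.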

In [66]:
# Create two new cars in a Pandas DataFrame
import pandas as pd
newCars = pd.DataFrame([[459, 'Ford', 'Escort'], [460, 'Ford', 'Transit']], columns=['ind', 'company', 'model'])
newCars

|   | ind | company | model |
|---|-----|---------|-------|
| 0 | 459 | Ford | Escort |
| 1 | 460 | Ford | Transit |


In [67]:
# Convert the Pandas DataFrame to Spark DataFrame and save it to Cassandra (append mode)
spark.createDataFrame(newCars).write.format("org.apache.spark.sql.cassandra").options(table="my_first_table", keyspace="my_first_keyspace").mode("append").save()

In [68]:
# Check if the new cars are in the table
spark.read.format("org.apache.spark.sql.cassandra").options(table="my_first_table", keyspace="my_first_keyspace").load().createOrReplaceTempView("my_first_table_view2")
spark.sql("select * from my_first_table_view2").toPandas()

|   | ind | company | model |
|---|-----|---------|-------|
| 0 | 3 | Polestar | 3 |
| 1 | 460 | Ford | Transit |
| 2 | 459 | Ford | Escort |
| 3 | 1 | Tesla | Model S |
| 4 | 2 | Tesla | Model 3 |


In [21]:
# Stop Spark session
try:
    spark.stop()
except ConnectionRefusedError:
    print("Spark session already stopped.")

## Resources
- [PySpark Tutorial For Beginners (sparkbyexamples.com)](https://sparkbyexamples.com/pyspark-tutorial/)
- [PySpark documentation](https://spark.apache.org/docs/latest/api/python/index.html)
    - [PySpark DataFrame](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html)
    - [PySpark SparkSession](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html)
- [YouTube: PySpark Tutorial: Spark SQL & DataFrame Basics](https://youtu.be/3-pnWVWyH-s?si=5AfOao23gqgh19en) (17m:12s)