In [None]:
# We start by importing SparkContext and creating a context
# which we will use in class.
# NOTE(review): both lines below are commented out, so `sc` is presumably
# predefined by the notebook runtime - uncomment them when it is not.

# from pyspark import SparkContext
# sc = SparkContext()

In [None]:
from itertools import chain

In [None]:
# Build a nested list, grow it by one more sub-list, then flatten it lazily.
x = [[1, 2, 3], [4, 5, 6], [7, 8]]
x += [[9, 10]]
# chain.from_iterable walks the sub-lists one after another.
y = chain.from_iterable(x)
# Materialise the flattened values (this consumes the iterator `y`).
list(y)

### Core RDD Functions in Spark
* Beyond basic functions like `map`, `flatMap`, `filter`, and `reduce`, Spark offers a variety of other essential RDD methods (both transformations and actions).
* These functions enable easier and more efficient programming in the map-reduce paradigm.
* Some transformative examples include:
    * `distinct`: Yields an RDD with the unique elements from the source RDD.
    * `union`: Combines two RDDs.
    * `intersection`: Identifies common elements between two RDDs.
    * `foreach`: Applies a function to each RDD element without returning any value.
      * Similar to `map` but without returning results; strictly speaking, it is an action rather than a transformation.
    * `cartesian`: Generates the cartesian product of the given RDD with another RDD.
    * And many more...


### Core RDD Functions in Spark - Cont'd

* Spark also provides a range of actions for RDD, such as:
    * `count`: Gives the total number of RDD elements.
    * `sum`: Calculates the total sum of RDD elements (requires numeric data).
    * `mean`: Determines the average of RDD elements (requires numeric data).
    * `stats`: Returns comprehensive statistics, including count, mean, stdev, max, and min.


In [None]:
# Distinct example 
dataset_1 = sc.parallelize(["A", "B", "C", "A", "C", "D"])
dataset_1.distinct().collect()

In [None]:
dir(dataset_1)

In [None]:
dataset_1.getNumPartitions()

In [None]:
# Union example

dataset_1 = sc.parallelize(["A", "B", "C"])
dataset_2 = sc.parallelize(["D", "E", "F"])

both_datasets = dataset_1.union(dataset_2)

both_datasets.collect()

In [None]:
# intersection example
dataset_1 = sc.parallelize(["A", "B", "C"])
dataset_2 = sc.parallelize(["N", "B", "C", "M"])
dataset_1.intersection(dataset_2).collect()

In [None]:
dataset_1.collect()

In [None]:
# foreach example - what doesn't work!
# foreach applies the function for its side effect only and returns no value,
# so `out` holds nothing useful to display or reuse.

out = dataset_1.foreach(lambda x: print(x))
out


In [None]:
type(out)

In [None]:
# foreach example - what works!
# Here foreach is used only for its side effect (printing); its result is not kept.
dataset_1.foreach(lambda x: print(x + "-suffix"))

In [None]:
modified_rdd = dataset_1.map(lambda x: x + "-suffix")    
modified_rdd.collect()

In [None]:
# cartesian example
dataset_1 = sc.parallelize(["A", "B"])

dataset_2 = sc.parallelize(["D", "E", "F"])

dataset_1.cartesian(dataset_2).collect()

In [None]:
# sum and mean examples (both require numeric elements)

dataset_1 = sc.parallelize([1, 2, 3, 4, 5, 6])

(dataset_1.sum(), dataset_1.mean())

In [None]:
# stats example

dataset_1.stats()

In [None]:
type([1, [1,2,3,4,5]])

### Essential RDD Functions for Tuples

* Remember, key-value pairs (tuples) are a core component in the Map Reduce paradigm.

  * e.g. `[("THE", 12), ("HI", 2), ("COURSE", 2), ("STUDENTS", 3), ... ]`
  * Spark offers a collection of methods tailored for these tuples, both as transformations and actions.

* Transformations on Tuples:
    * `sortByKey`: yields a new RDD ordered by its keys.
    * `reduceByKey`: applies a function `f` to combine values by key, producing a new RDD.
    * `groupByKey`: Creates a new RDD where values are assembled under their respective keys.
    * `join`: Generates a new RDD by pairing values with matching keys from two datasets.
        * Variants include: `leftOuterJoin`, `rightOuterJoin`, and `fullOuterJoin`.

* Actions for Tuples:
  * General actions like count are adaptable to any data type.
  * To perform arithmetic actions on a tuple RDD, you can first transform it using map

In [None]:
# sortByKey example

data_1  = sc.parallelize([("C", 12), ("D", 2), ("A", 2), ("B", 3)])
data_1.sortByKey().collect()


In [None]:
type(range(0,5))

In [None]:
for i in range(1,1000000000000000000000000000000000000000000000000000000000000000000):
    print(i)
    break


In [None]:
range(1,10)

In [None]:
for i in range(1,10):
    print(i)


In [None]:
# groupByKey example

data_1  = sc.parallelize([("A", 12), ("A", 2), ("C", 2), ("B", 3), ("C", 3), ("A", 5)])
grouped_data = data_1.groupByKey().collect()
grouped_data


In [None]:
for key, val in grouped_data:
    print(f"{key}\t{list(val)}")


In [None]:
# reduceByKey example

data_1  = sc.parallelize([("A", 12), ("A", 2), ("C", 2), ("B", 3), ("C", 3)])
data_1.reduceByKey(lambda x,y: x+y).collect()


In [None]:
# join example
# Pairs up the values of keys that appear in both RDDs.
# Note: "D" exists only in data_1 and "E" only in data_2.

data_1  = sc.parallelize([("A", 1), ("A", 3), ("B", 4), ("C", 6), ("D", 11)          ])
data_2  = sc.parallelize([("A", 2),           ("B", 5), ("C", 7),           ("E", 11)])

data_1.join(data_2).collect()

In [None]:
# leftOuterJoin example
# Note: key "D" is present only in data_1 (the left side).

data_1  = sc.parallelize([("A", 1), ("D", 3), ("B", 4), ("C", 6), ])
data_2  = sc.parallelize([("A", 2),           ("B", 5), ("C", 7), ])

data_1.leftOuterJoin(data_2).collect()

In [None]:
# rightOuterJoin example
# Note: key "D" is present only in data_1 (the left side).

data_1  = sc.parallelize([("A", 1), ("D", 3), ("B", 4), ("C", 6), ])
data_2  = sc.parallelize([("A", 2),           ("B", 5), ("C", 7), ])

data_1.rightOuterJoin(data_2).collect()

In [None]:
# Apply a numeric (non-tuple) action to a tuple RDD:
# first use map to extract the value of each (key, value) pair, then sum.

data_1  = sc.parallelize([("A", 1), ("A", 3), ("B", 4), ("C", 6)])
data_1.map(lambda x: x[1]).sum()

### Conclusion
* An RDD (Resilient Distributed Dataset) is an immutable distributed collection of various data objects.
  * This can include lines, tuples, JSON objects, and more.

* RDDs are distributed across multiple nodes, enabling parallel operations through a low-level API.
  * Any transformation on an RDD yields a new RDD.
  * To explore the many methods available for RDDs, use the `dir` function or consult the documentation or a cheatsheet.

* Importantly, RDDs don't enforce a specific data structure.
  * For instance, you can have:
```python
sc.parallelize([("A", 1), {"First": "John", "Salary": 125_000}, ("B", 4), ("C", 6), ])
```
  * This is not a desired situation when working with structured data.

* Spark also offers a data structure that enforces data validation and structure.
   * This aids in optimizing data operations more effectively.


### Dive Into Spark DataFrame
* Spark DataFrames in pySpark represent immutable, distributed collections of data neatly organized into named columns.
  * They can be visualized as tables in relational databases or analogous to Pandas's DataFrame.
  * Facilitates querying either via SQL or Python-style syntax.

* Example using SQL:

```SQL
session.sql("SELECT * from users WHERE age < 21")
```

* Example using Python-style:

```
users.filter(users.age < 21)
```

* Given their structured nature, DataFrames enable enhanced optimizations internally.
  * They can be created from various sources, including:
    * Structured data files (json, csv, etc.)
    * Parquet Files
    * External databases
    * Pre-existing RDDs
* While RDDs are seen as collections of objects, DataFrames can be understood as collections of rows (instances).
  * Similar to Pandas DataFrames

### DataFrame Operations in Spark

* Spark offers a variety of high-level functions tailored for DataFrames.
  * Rooted in the map-reduce model, these functions address common tasks efficiently.
  * While they serve as shortcuts, remember they're underpinned by core functions: map, flatmap, filter, and reduce.
* While SparkContext is essential for RDDs, DataFrames rely on SparkSession.
  * Tasks such as creating, registering, and executing SQL queries necessitate SparkSession.
* Multiple methods are available to instantiate a DataFrame:
  * From a CSV file: each row is an object.
  * From a JSON file: every record is an object.
    * Ensure every line contains a distinct, valid JSON object.
      * This is the format used in assignment 1.
  * From a text file: here too, each row becomes an object.


### Understanding Schemas
* Schemas outline the data type structure of your fields.
* They play an important role in Spark's optimization processes.
* Schemas are essential in achieving the right in-memory compression.
  * Post-compression, the data size in memory might be more compact than its uncompressed counterpart on disk.

In [None]:
# from pyspark import SparkContext
# sc = SparkContext()
from pyspark.sql import SparkSession

# Obtain the session through the documented builder entry point instead of
# calling the SparkSession constructor directly. getOrCreate() returns the
# session that is already active (reusing the existing SparkContext) and only
# creates a new one when none exists.
session = SparkSession.builder.getOrCreate()

In [None]:
text_df  = session.read.text('dbfs:/FileStore/pride_and_prejudice.txt')
print(text_df.count())
text_df.first()

In [None]:
text_df

In [None]:
list(range(0,10))

In [None]:
%%time

csv_df = session.read.csv("dbfs:/FileStore/flight_info.csv", header=True)
print(csv_df.count())
csv_df.head(1)


In [None]:
csv_df.schema

* In Spark's StructField, the third parameter indicates whether the field is nullable or not.
  * I.e., whether the field can be null or not.

```python
StructType(
	List(StructField(year,StringType,true),
		StructField(month,StringType,true),
		StructField(day,StringType,true),
		StructField(dep_time,StringType,true),
		StructField(dep_delay,StringType,true),
		StructField(arr_time,StringType,true),
		StructField(arr_delay,StringType,true),
		StructField(carrier,StringType,true),
		StructField(tailnum,StringType,true),
		StructField(flight,StringType,true),
		StructField(origin,StringType,true),
		StructField(dest,StringType,true),
		StructField(air_time,StringType,true),
		StructField(distance,StringType,true),
		StructField(hour,StringType,true),
		StructField(minute,StringType,true)
	)
)
```


In [None]:
%%time

csv_df = session.read.options(inferSchema = True).csv("dbfs:/FileStore/flight_info.csv", header=True)

print(csv_df.count())

csv_df.head(2)


In [None]:
csv_df.schema

```
StructType(
	List(
		StructField(year,IntegerType,true),
		StructField(month,IntegerType,true),
		StructField(day,IntegerType,true),
		StructField(dep_time,IntegerType,true),
		StructField(dep_delay,IntegerType,true),
		StructField(arr_time,IntegerType,true),
		StructField(arr_delay,IntegerType,true),
		StructField(carrier,StringType,true),
		StructField(tailnum,StringType,true),
		StructField(flight,IntegerType,true),
		StructField(origin,StringType,true),
		StructField(dest,StringType,true),
		StructField(air_time,IntegerType,true),
		StructField(distance,IntegerType,true),
		StructField(hour,IntegerType,true),
		StructField(minute,IntegerType,true)
	)
)
```

In [None]:
json_df  = session.read.json('dbfs:/FileStore/random_user_dicts.json')


In [None]:
json_df.show()

In [None]:
print(json_df.count())


In [None]:
json_df.printSchema()


In [None]:
json_df.select("first_name").take(10)

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Explicit schema for the user JSON records.
# Only first_name and last_name are declared non-nullable; every other field
# may be null. lat_long is a nested struct holding two float fields.
json_struct = StructType([
    StructField("first_name", StringType(), nullable=False, metadata=None),
    StructField("last_name", StringType(),  nullable=False, metadata=None),
    StructField("lat_long", 
                StructType([
                    StructField("latitude", FloatType(), metadata=None, nullable=True),
                    StructField("longitude", FloatType(), metadata=None, nullable=True)
                ]), nullable=True, metadata=None),
    StructField("state", StringType(),  nullable=True, metadata=None),
    StructField("user_id", StringType(),  nullable=True, metadata=None),
    StructField("zip", StringType(),  nullable=True, metadata=None),    
])


In [None]:
import pyspark
dir(pyspark.sql.types)

In [None]:
import pprint
import json

json_df  = session.read.schema(json_struct).json('dbfs:/FileStore/random_user_dicts.json')
print(json_df.count())
json_df.printSchema()


In [None]:
# Note the lat_long field. It has its own format
json_df.head(1)

### Insights into Data Formats: Focus on Avro

* For stable datasets, structured data formats like CSV/TSV are commonly used.
  * However, data often evolves, affecting dataset structure over time.
  * Avro, a row-based data serialization format, offers flexibility for evolving schemas.

* Recall that Avro excels at handling complex data structures:
  * Supports nested data and hierarchical structures efficiently.
  * Example Avro schema for complex user data:
```json
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "employed", "type": ["boolean", "null"]},
    {"name": "salary", "type": ["long", "null"]},
    {"name": "cars", "type": {"type": "array", "items": "string"}},
    {"name": "children", "type": {
      "type": "map",
      "values": {
        "type": "record",
        "name": "Child",
        "fields": [
          {"name": "age", "type": "int"},
          {"name": "school", "type": "string"}
        ]
      }
    }}
  ]
}
```

* Avro advantages over JSON:
  * Compact binary format: Reduces storage requirements and improves read/write performance.
  * Schema evolution: Allows adding, removing, or changing fields without full data rewrite.
  * Built-in compression: Supports various compression codecs for further storage optimization.

* Avro in data processing:
  * Natively supported by Apache Spark for efficient reading and writing.
  * Enables schema-on-read, allowing for flexible data interpretation.
  * Facilitates easy conversion to Spark DataFrames for SQL-like querying.

* Considerations when using Avro:
  * Requires schema management, but this ensures data consistency and enables validation.
  * May require additional processing time for serialization/deserialization compared to raw text formats.
  * Excellent for systems with frequent reads, as the schema is only transmitted once per file.

* Apache Spark's integration with Avro allows for seamless processing of complex, evolving datasets while maintaining efficiency and schema flexibility.


### SQL Capabilities in Apache Spark

* SQL functionality is deeply integrated with Spark DataFrames
    * Supports both DataFrame API and SQL syntax for data manipulation

* SQL queries can be applied directly to DataFrames

* Two main approaches for SQL-like operations in Spark:
    1. DataFrame API methods (e.g., `.select()`, `.filter()`)
    2. Spark SQL queries using `spark.sql()`

* For complex queries or named references:
    * Register a temporary view with `.createOrReplaceTempView()`
    * Use `spark.sql()` method for querying

* All queries (DataFrame API or SQL) benefit from optimization
    * Utilizes the Catalyst optimizer for query planning and execution
    * [Catalyst Optimizer Details](https://databricks.com/glossary/catalyst-optimizer)

* Important to remember:
    * DataFrames are immutable
      * Each operation returns a new DataFrame
    * To add or modify columns, create a new DataFrame


### SQL with Spark DataFrames
```python
# Using the DataFrame API (filter first, so the predicate column is still
# present when it is evaluated, then project the column we want)
result = df.filter("column2 > 10").select("column1")

# Using SQL with a temporary view
df.createOrReplaceTempView("my_table")
result = spark.sql("SELECT column1 FROM my_table WHERE column2 > 10")

# Using SQL directly on a DataFrame (PySpark 3.3+): the DataFrame must be
# passed as a keyword argument so that `{df}` in the query can be resolved
result = spark.sql("SELECT column1 FROM {df} WHERE column2 > 10", df=df)
```

## Example of Query Optimization

* Consider the following two ways of writing the same query:

```python
from pyspark.sql.functions import col

# Small lookup table: one row per customer.
customers = spark.createDataFrame([
    (1, "Alice"),
    (2, "Bob"),
    (3, "Charlie"),
    (4, "David")
], ["customer_id", "name"])

# Fact table: one row per order; a customer may have several orders.
orders = spark.createDataFrame([
    (1, 100.0),
    (2, 150.0),
    (3, 200.0),
    (1, 120.0),
    (4, 80.0)
], ["customer_id", "amount"])

# Scenario 1: Join then Filter
def join_then_filter():
    """Join customers to all orders, then keep the orders above 100.

    Returns:
        The resulting DataFrame, so the caller can inspect it
        (e.g. with ``.show()`` or ``.explain()``).
    """
    return customers.join(orders, "customer_id") \
                    .filter(col("amount") > 100)

# Scenario 2: Filter then Join
def filter_then_join():
    """Keep the orders above 100 first, then join them to customers.

    Returns:
        The resulting DataFrame, so the caller can inspect it
        (e.g. with ``.show()`` or ``.explain()``).
    """
    filtered_orders = orders.filter(col("amount") > 100)
    return customers.join(filtered_orders, "customer_id")
```

## Example of Query Optimization - Cont'd
![Unoptimized vs. optimized query plan](https://www.dropbox.com/scl/fi/bnkxkbfrtotpvgfd5ybxd/unoptimized_optimized.png?rlkey=m4goppj4l2ny2r1g9uwippbh2&dl=1)
    

In [None]:
# Register the DataFrame as a temporary view so it can be queried with SQL.
# createOrReplaceTempView keeps this cell re-runnable: createTempView raises
# an error when a view named "users" already exists in the session.
json_df.createOrReplaceTempView("users")

# Count how many users share each first name.
session.sql("""
SELECT first_name, COUNT(*)
FROM users
GROUP BY first_name; 
""").show()

In [None]:
session.sql("""
SELECT *
FROM users
WHERE first_name IN ("Evan", "Sarah", "John"); 
""").show()

### DataFrame and Select Queries

* The same functionality is available using Python
* Many additional functions, including analytics-specific ones, are available through a dedicated library:

    ```from pyspark.sql import functions as F```
    * The functions are used with a `select`
  * Can use `agg` to do specific operations and rename columns.

In [None]:
# `F` is imported in this cell so that it runs in a top-to-bottom execution;
# otherwise the first import of `F` only appears in a later cell.
from pyspark.sql import functions as F

# Keep only the users whose first name is shorter than four characters.
json_df.filter(F.length(json_df.first_name) < 4).show()

In [None]:
json_df.groupby("first_name").count().show()

In [None]:
from pyspark.sql import functions as F
dir(F)

In [None]:
json_df.groupby("first_name").agg(F.count("first_name").alias('First Name Counts')).show()

### More on Optimization

1. **Caching Data**:
   * Caching is a powerful optimization technique in Spark that can significantly improve performance for iterative algorithms or repeated queries.
   * Cache table contents or query outputs to expedite repeated data access.
2. **Spark's Tungsten Compression**:
   * Cached data, especially with Tungsten compression, can occupy less RAM than the same data takes up on disk.
     * That is, the cached copy can be smaller than the original on-disk format.
   * Employ lazy caching to cache data only as needed, and use `UNCACHE TABLE` (SQL) or `unpersist()` (DataFrame API) to free up space for caching other DataFrames.

3. **Adjusting DataFrame Partitioning**:
   * Narrow Operations (e.g., `SELECT`, `WHERE`/`filter`): Operate independently on each partition, no data shuffle required.
   * Wide Operations (e.g., `GROUP BY`): Require data shuffling as they need data from multiple partitions.
   * Minimize data shuffling by tuning the number of partitions; excessive shuffling can lead to performance bottlenecks.

4. **Additional Optimizations**:
   * Explore further optimizations in Spark's [Performance Tuning documentation](https://spark.apache.org/docs/latest/sql-performance-tuning.html).


In [None]:
# Caching benchmark (left disabled): times the same two filter+count queries
# ten times, once with the DataFrame cached and once without.
# NOTE(review): the code below calls time.time(), so `import time` is also
# needed when it is uncommented.

# session.catalog.clearCache()
# df = session.range(0, 5_000_000).withColumn("square", F.col("id") * F.col("id"))

# def with_caching():
#     df.cache()
#     df.count()  # I want to force caching to complete before moving to next chunk of code

#     start_time = time.time()
#     for _ in range(10):
#         result1 = df.filter(F.col("square") > 1000000).count()
#         result2 = df.filter(F.col("square") < 100).count()
#     return time.time() - start_time

# def without_caching():
#     start_time = time.time()
#     for _ in range(10):
#         result1 = df.filter(F.col("square") > 1000000).count()
#         result2 = df.filter(F.col("square") < 100).count()
#     return time.time() - start_time

# # Run the tests
# cache_time = with_caching()
# session.catalog.clearCache()
# no_cache_time = without_caching()

# print(f"With caching: Time = {cache_time:.2f} seconds")
# print(f"Without caching: Time = {no_cache_time:.2f} seconds")

# With the benchmark disabled, the cell only prints blank lines.
print("""

""")

## Optimizing Spark Partitioning: E-commerce Sales Analysis

* Given an e-commerce website analyzing sales data:
  * Large dataset with customer ID, product ID, sale amount, date
  * Goal is to calculate total sales per customer

* Default Partitioning:
  * 100 partitions across a 10-node cluster
  * Uneven customer distribution despite having the same number of rows.
  * Significant data shuffling required

* Optimized Partitioning:
  * Repartition by customer ID: 1000 partitions
  * Even distribution of customers per partition
  * 100 partitions per node

* Benefits:

  * Reduced data shuffling
  * Balanced workload across nodes
  * Improved data locality and parallelism

* Result:
  * Faster analysis
  * Efficient use of cluster resources
  * Scalable solution for large datasets