# Wikipedia definition

Apache Spark is an open-source unified analytics engine for large-scale data processing.

In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, collect_list, rank
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, StringType



In [24]:
# Build the SparkSession used by every cell below (app name "DockerizedDBDemo").
spark = SparkSession.builder.appName("DockerizedDBDemo").getOrCreate()

# PostgreSQL connection details.
# The URL and credentials are read from environment variables so that real
# secrets never have to be committed with the notebook. Each fallback is the
# original demo value, so the cell behaves exactly as before when the
# variables are not set.
db_url = os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://postgres:5432/mydatabase")
db_properties = {
    "user": os.environ.get("POSTGRES_USER", "myuser"),
    "password": os.environ.get("POSTGRES_PASSWORD", "mypassword"),
    "driver": "org.postgresql.Driver",
}

# Read the "employees" table from PostgreSQL over JDBC and preview it.
employees_df = spark.read.jdbc(url=db_url, table="employees", properties=db_properties)
employees_df.show()

+---+-------+---+-----------+--------------------+
| id|   name|age| department|              skills|
+---+-------+---+-----------+--------------------+
|  1|  Alice| 30|Engineering|     Python,Java,SQL|
|  2|    Bob| 25|      Sales|Communication,Neg...|
|  3|Charlie| 40|  Marketing|SEO,Content Marke...|
|  4|  David| 28|Engineering|        Java,C++,AWS|
|  5|    Eve| 35|      Sales|CRM,Salesforce,Le...|
+---+-------+---+-----------+--------------------+



In [22]:
# 1. Explode and analyze skills data

# The 'skills' column holds one comma-separated string per employee;
# turn it into an array column by splitting on ",".
split_employees_df = employees_df.withColumn("skills", split("skills", ","))

# Produce one (name, skill) row per array element.
exploded_employees_df = split_employees_df.select(
    "name",
    explode("skills").alias("skill"),
)
exploded_employees_df.show()

# Tally how many employees list each skill, most common first.
skill_counts = (
    exploded_employees_df
    .groupBy("skill")
    .count()
    .orderBy("count", ascending=False)
)

skill_counts.show()

+-------+-----------------+
|   name|            skill|
+-------+-----------------+
|  Alice|           Python|
|  Alice|             Java|
|  Alice|              SQL|
|    Bob|    Communication|
|    Bob|      Negotiation|
|Charlie|              SEO|
|Charlie|Content Marketing|
|Charlie|     Social Media|
|  David|             Java|
|  David|              C++|
|  David|              AWS|
|    Eve|              CRM|
|    Eve|       Salesforce|
|    Eve|  Lead Generation|
+-------+-----------------+

+-----------------+-----+
|            skill|count|
+-----------------+-----+
|             Java|    2|
|              SEO|    1|
|              AWS|    1|
|              C++|    1|
|       Salesforce|    1|
|     Social Media|    1|
|  Lead Generation|    1|
|              SQL|    1|
|           Python|    1|
|    Communication|    1|
|Content Marketing|    1|
|              CRM|    1|
|      Negotiation|    1|
+-----------------+-----+



In [25]:
# 2. Window function to rank employees within departments

# One window per department, with rows ordered from the highest age to the lowest.
window_spec = (
    Window
    .partitionBy(col("department"))
    .orderBy(col("age").desc())
)

# rank() is evaluated over that window, so rank 1 is the oldest employee
# in each department.
ranked_df = employees_df.withColumn(
    "rank_in_dept",
    rank().over(window_spec),
)

ranked_df.show()

+---+-------+---+-----------+--------------------+------------+
| id|   name|age| department|              skills|rank_in_dept|
+---+-------+---+-----------+--------------------+------------+
|  1|  Alice| 30|Engineering|     Python,Java,SQL|           1|
|  4|  David| 28|Engineering|        Java,C++,AWS|           2|
|  3|Charlie| 40|  Marketing|SEO,Content Marke...|           1|
|  5|    Eve| 35|      Sales|CRM,Salesforce,Le...|           1|
|  2|    Bob| 25|      Sales|Communication,Neg...|           2|
+---+-------+---+-----------+--------------------+------------+



In [26]:
# 3. User-defined function (UDF) to categorize employees

# Define the Python function behind the UDF that categorizes employees by age
def categorize_pension_messaging(age):
    """Map an employee's age to a pension-messaging category.

    Args:
        age: The employee's age, or None when the age is missing
            (a SQL NULL reaches a Python UDF as None).

    Returns:
        "NewJoiners" for ages below 30, "MidCareer" for ages from 30 up to
        (but not including) 50, "LateCareer" for 50 and above, and None when
        no age is available.
    """
    # Comparing None with an int raises TypeError, which would fail the whole
    # Spark job; pass a missing age through as a missing category instead.
    if age is None:
        return None
    if age < 30:
        return "NewJoiners"
    if age < 50:
        return "MidCareer"
    return "LateCareer"

# Register the Python function with Spark under the name
# "categorize_pension_messaging" and keep the returned callable for DataFrame use.
categorize_age_udf = spark.udf.register(
    name="categorize_pension_messaging",
    f=categorize_pension_messaging,
)

# Apply the UDF to the 'age' column to create a new 'pension_category' column
categorized_df = employees_df.withColumn(
    "pension_category",
    categorize_age_udf(employees_df["age"]),
)

categorized_df.show()

+---+-------+---+-----------+--------------------+----------------+
| id|   name|age| department|              skills|pension_category|
+---+-------+---+-----------+--------------------+----------------+
|  1|  Alice| 30|Engineering|     Python,Java,SQL|       MidCareer|
|  2|    Bob| 25|      Sales|Communication,Neg...|      NewJoiners|
|  3|Charlie| 40|  Marketing|SEO,Content Marke...|       MidCareer|
|  4|  David| 28|Engineering|        Java,C++,AWS|      NewJoiners|
|  5|    Eve| 35|      Sales|CRM,Salesforce,Le...|       MidCareer|
+---+-------+---+-----------+--------------------+----------------+



In [None]:
# Stop the SparkSession (`spark`) now that the demo cells have finished.
spark.stop()


# Key takeaways on Spark

- Offers a rich set of utility methods for working with data (an obvious Hyble use case: parsing useful fields out of the order JSON).
- Can be used for any stage of ETL pipelines.
- Optimised to run on a distributed cluster, doing as much processing in memory as possible, which makes it fast and scalable.
- Generally a bit easier to get started with than Hadoop/MapReduce.