# Tasks

In [1]:
import datetime
import random
import re
import string
from time import sleep

import pyspark
from IPython.display import clear_output, display
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg,
    col,
    collect_list,
    count,
    desc,
    explode,
    from_json,
    lit,
    max,
    min,
    monotonically_increasing_id,
    udf,
    window,
)
from pyspark.sql.streaming import StreamingQuery
from pyspark.sql.types import (
    BooleanType,
    FloatType,
    IntegerType,
    StringType,
    StructType,
    TimestampType,
)

Create SparkSession in a local setup. Select your own application name. Add the `org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1` library using the `config` function during a Spark session initialization.

In [2]:
# Create Spark Session
spark = (
    SparkSession.builder.master("local[1]")
    .appName("Tutorial App")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1")
    .getOrCreate()
)

# Get Spark Context
print(spark.sparkContext)

<SparkContext master=local[1] appName=Tutorial App>


### Spark Batch Processing

1. Read the data stream from file `data/titanic.csv`. Define your own schema based on information about the dataset: https://www.kaggle.com/competitions/titanic/data. Dataset was modified to include the `Timestamp` column and update schema accordingly. Display schema and the first five columns.

**Don't overwrite the original DataFrame in the following exercises unless specified otherwise.**

In [3]:
titanic_schema = (
    StructType()
    .add("PassengerId", IntegerType())
    .add("Survived", IntegerType())
    .add("Pclass", IntegerType())
    .add("Name", StringType())
    .add("Sex", StringType())
    .add("Age", IntegerType())
    .add("SibSp", IntegerType())
    .add("Parch", IntegerType())
    .add("Ticket", StringType())
    .add("Fare", FloatType())
    .add("Cabin", StringType())
    .add("Embarked", StringType())
    .add("Timestamp", TimestampType())
)

df_batch = spark.read.csv("./data/titanic.csv", schema=titanic_schema)
display(df_batch.show(5))
df_batch.printSchema()

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|          Timestamp|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| null|       S|2020-01-01 13:45:25|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|2020-01-01 13:44:48|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|2020-01-01 13:38:11|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|2020-01-01 13:32:00|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35| 

None

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: float (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Timestamp: timestamp (nullable = true)



2. Selecting columns. For every exercise, display the first 5 rows of created DataFrame.
    * Select the `PassengerId` and `Name` columns using built-in `col` function. Add aliases for those columns: `id` and `fullname`.
    * Select all columns between the third and fifth (including those two).
    * Select columns whose names end with `-ed` (use regular expressions).

In [4]:
# 1
display(
    df_batch.select(col("PassengerId").alias("id"), col("Name").alias("fullname")).show(
        5
    )
)

# 2
display(df_batch.select(df_batch.columns[2:5]).show(5))

# 3
display(df_batch.select(df_batch.colRegex("`^.*ed`")).show(5))

+---+--------------------+
| id|            fullname|
+---+--------------------+
|  1|Braund, Mr. Owen ...|
|  2|Cumings, Mrs. Joh...|
|  3|Heikkinen, Miss. ...|
|  4|Futrelle, Mrs. Ja...|
|  5|Allen, Mr. Willia...|
+---+--------------------+
only showing top 5 rows



None

+------+--------------------+------+
|Pclass|                Name|   Sex|
+------+--------------------+------+
|     3|Braund, Mr. Owen ...|  male|
|     1|Cumings, Mrs. Joh...|female|
|     3|Heikkinen, Miss. ...|female|
|     1|Futrelle, Mrs. Ja...|female|
|     3|Allen, Mr. Willia...|  male|
+------+--------------------+------+
only showing top 5 rows



None

+--------+--------+
|Survived|Embarked|
+--------+--------+
|       0|       S|
|       1|       C|
|       1|       S|
|       1|       S|
|       0|       S|
+--------+--------+
only showing top 5 rows



None

3. Columns transformations
    * Create an additional column `Constant` with the same string value `empty` in every row.
    * Create an additional column `CustomId` with new unique ids. Use `monotonicallyIncreasingId` function.
    * Rename column `Pclass` to `Class`.
    * Remove the `Cabin` and `Name` columns.

In [5]:
# 1
display(df_batch.withColumn("Constant", lit("empty")).show(5))

# 2
display(df_batch.withColumn("CustomId", monotonically_increasing_id()).show(5))

# 3
display(df_batch.withColumnRenamed("Pclass", "Class").show(5))

# 4
display(df_batch.drop("Cabin", "Name").show(5))

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|          Timestamp|Constant|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| null|       S|2020-01-01 13:45:25|   empty|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|2020-01-01 13:44:48|   empty|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|2020-01-01 13:38:11|   empty|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|2020-01-01 13:32:00|   empty|

None

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|          Timestamp|CustomId|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| null|       S|2020-01-01 13:45:25|       0|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|2020-01-01 13:44:48|       1|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|2020-01-01 13:38:11|       2|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|2020-01-01 13:32:00|       3|

None

+-----------+--------+-----+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+
|PassengerId|Survived|Class|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|          Timestamp|
+-----------+--------+-----+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+
|          1|       0|    3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| null|       S|2020-01-01 13:45:25|
|          2|       1|    1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|2020-01-01 13:44:48|
|          3|       1|    3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|2020-01-01 13:38:11|
|          4|       1|    1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|2020-01-01 13:32:00|
|          5|       0|    3|Allen, Mr. Willia...|  male| 35|    0|   

None

+-----------+--------+------+------+---+-----+-----+----------------+-------+--------+-------------------+
|PassengerId|Survived|Pclass|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Embarked|          Timestamp|
+-----------+--------+------+------+---+-----+-----+----------------+-------+--------+-------------------+
|          1|       0|     3|  male| 22|    1|    0|       A/5 21171|   7.25|       S|2020-01-01 13:45:25|
|          2|       1|     1|female| 38|    1|    0|        PC 17599|71.2833|       C|2020-01-01 13:44:48|
|          3|       1|     3|female| 26|    0|    0|STON/O2. 3101282|  7.925|       S|2020-01-01 13:38:11|
|          4|       1|     1|female| 35|    1|    0|          113803|   53.1|       S|2020-01-01 13:32:00|
|          5|       0|     3|  male| 35|    0|    0|          373450|   8.05|       S|2020-01-01 13:36:30|
+-----------+--------+------+------+---+-----+-----+----------------+-------+--------+-------------------+
only showing top 5 rows



None

4. Filtering data - **modify original DataFrame in every step**
    * Select only passengers older than 25 years.
    * Based on min and max `Timestamp` column values (use the `collect` function to retrieve values from all nodes and save those in separate variables for filtering purposes). Select only rows that "started" 3 minutes after the first timestamp and 3 minutes before the last timestamp.
    
**Reminder**: `collect` function is used to retrieve all elements from nodes to the driver node. Be cautious with using this function on big datasets. It might cause `OutofMemoryError`.

In [6]:
df_batch = df_batch.filter(col("Age") > 25)  # where() == filter()
df_batch.show(3)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|          Timestamp|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|2020-01-01 13:44:48|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|2020-01-01 13:38:11|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|2020-01-01 13:32:00|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+
only showing top 3 rows



In [7]:
min_boundary, max_boundary = df_batch.agg(min("Timestamp"), max("Timestamp")).collect()[
    0
]
print(min_boundary, max_boundary)

time_change = datetime.timedelta(0, 3 * 60)

df_batch = df_batch.filter(
    col("Timestamp").between(min_boundary + time_change, max_boundary - time_change)
)
df_batch.show(3)

2020-01-01 13:30:01 2020-01-01 13:49:55
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|          Timestamp|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|2020-01-01 13:44:48|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|2020-01-01 13:38:11|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450|   8.05| null|       S|2020-01-01 13:36:30|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+
only showing top 3 row

5. Use the `pivot` function to create a table showing the average age of <ins>survivors</ins> based on their sex and travel class (`Pclass`).

The result table should look like this:

![obraz.png](attachment:243d33cc-5460-44a9-bbe6-8d743a06c5f6.png)

In [8]:
df_batch.filter(col("Survived") == 1).groupBy("Pclass").pivot("Sex").avg("Age").show()

+------+------------------+------------------+
|Pclass|            female|              male|
+------+------------------+------------------+
|     1| 41.64102564102564|42.285714285714285|
|     3|32.833333333333336|              32.0|
|     2| 36.76923076923077|              36.0|
+------+------------------+------------------+



6. Explode array
    * Create a separate DataFrame with the average fare per travel class. Include the `Name` column that should be aggregated into a list of names during transformation. Use the `groupby` function.
    * Use DataFrame created in a previous step to explode the column with a list of names for the average fare. Should we expect the number of rows to be the same in the results DataFrame as in the original DataFrame before `groupby` transformation?

In [9]:
df_batch_ex6_1 = df_batch.groupby("Pclass").agg(avg("Fare"), collect_list("Name"))
df_batch_ex6_1.show()

+------+------------------+--------------------+
|Pclass|         avg(Fare)|  collect_list(Name)|
+------+------------------+--------------------+
|     1| 75.70833505060254|[Cumings, Mrs. Jo...|
|     3|12.067460939158565|[Heikkinen, Miss....|
|     2|19.789383561643834|[Hewlett, Mrs. (M...|
+------+------------------+--------------------+



In [10]:
df_batch_ex6_2 = df_batch_ex6_1.select(
    col("avg(Fare)"), explode(col("collect_list(Name)"))
)

assert (
    df_batch.count() == df_batch_ex6_2.count()
)  # Original and exploded number of rows

df_batch_ex6_2.show(5)

+-----------------+--------------------+
|        avg(Fare)|                 col|
+-----------------+--------------------+
|75.70833505060254|Cumings, Mrs. Joh...|
|75.70833505060254|McCarthy, Mr. Tim...|
|75.70833505060254|Uruchurtu, Don. M...|
|75.70833505060254|Holverson, Mr. Al...|
|75.70833505060254| Icard, Miss. Amelie|
+-----------------+--------------------+
only showing top 5 rows



7. In this task we want to use the exploded table from a previous task and merge information about average fare into the original DataFrame. Use the `join` function with proper parameters. Name a new column `AvgFare` and remove the additional name column appended with the join operation.

In [11]:
df_batch = (
    df_batch.join(df_batch_ex6_2, col("Name") == col("col"), "left")
    .withColumnRenamed("avg(Fare)", "AvgFare")
    .drop("col")
)

df_batch.show(5)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+------------------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|          Timestamp|           AvgFare|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+------------------+
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|2020-01-01 13:44:48| 75.70833505060254|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|2020-01-01 13:38:11|12.067460939158565|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450|   8.05| null|       S|2020-01-01 13:36:30|12.067460939158565|
|          7|       0|     1|McCarthy, Mr. Tim...|  male| 54|    0|    0|         

8. Register DataFrame as a SQL temporary view using the `createOrReplaceTempView` function. Write SQL query selecting only the `PassengerId` and `Name` columns.

9. [Optional] You can register more SQL temporary views and try other SQL queries like joining and grouping.

In [12]:
df_batch.createOrReplaceTempView("titanic")

spark.sql("SELECT PassengerId, Name FROM titanic").show(5)

+-----------+--------------------+
|PassengerId|                Name|
+-----------+--------------------+
|          2|Cumings, Mrs. Joh...|
|          3|Heikkinen, Miss. ...|
|          5|Allen, Mr. Willia...|
|          7|McCarthy, Mr. Tim...|
|          9|Johnson, Mrs. Osc...|
+-----------+--------------------+
only showing top 5 rows



In [13]:
# Optional example
df_batch_ex6_2.createOrReplaceTempView("titanic_ex6")

spark.sql(
    """
    SELECT T1.PassengerId, T1.Name, T2.col AS Name2
    FROM titanic AS T1
    LEFT JOIN titanic_ex6 AS T2 ON T1.Name = T2.col
    """
).show(5)

+-----------+--------------------+--------------------+
|PassengerId|                Name|               Name2|
+-----------+--------------------+--------------------+
|          2|Cumings, Mrs. Joh...|Cumings, Mrs. Joh...|
|          3|Heikkinen, Miss. ...|Heikkinen, Miss. ...|
|          5|Allen, Mr. Willia...|Allen, Mr. Willia...|
|          7|McCarthy, Mr. Tim...|McCarthy, Mr. Tim...|
|          9|Johnson, Mrs. Osc...|Johnson, Mrs. Osc...|
+-----------+--------------------+--------------------+
only showing top 5 rows



10. Use the `map` function to create a new string column from the combined `Name`, `Sex`, and `Fare` columns. Show a few first elements of a new column. You have to convert DataFrame to RDD before using the `map` function.

In [14]:
rdd_column = df_batch.rdd.map(
    lambda x: f"{x['Name']} - {x['Sex']} - {x['Fare']}"
).collect()

rdd_column[:5]

['Cumings, Mrs. John Bradley (Florence Briggs Thayer) - female - 71.2833023071289',
 'Heikkinen, Miss. Laina - female - 7.925000190734863',
 'Allen, Mr. William Henry - male - 8.050000190734863',
 'McCarthy, Mr. Timothy J - male - 51.86249923706055',
 'Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg) - female - 11.133299827575684']

11. Create UDF that will combine and transform the `Ticket`, `Cabin`, and `Embarked` columns.
    * Extract the last number from the `Ticket` column. If there isn't any number use `not_found`.
    * Convert `Cabin` letters to lowercase. If there isn't assigned cabin use `not_defined`.
    * Use full names for the `Embarked` column (use provided dictionary with mapping). If there isn't assigned embarkation location use `not_defined`.

Use UDF to merge mentioned columns into one (separated with a semicolon). Overwrite base DataFrame and display a few first rows.

In [15]:
EMBARKED_MAPPING = {"C": "Cherbourg", "Q": "Queenstown", "S": "Southampton"}

In [16]:
def custom_convert(ticket: str, cabin: str, embarked: str) -> str:
    try:
        found_numbers = re.findall(r"\d+", ticket)[-1]
    except IndexError:
        found_numbers = "not_found"
    return (
        f"{found_numbers};"
        f"{cabin.lower() if cabin else 'not_defined'};"
        f"{EMBARKED_MAPPING[embarked] if embarked else 'not_defined'}"
    )


custom_covert_udf = udf(custom_convert, StringType())

df_batch = df_batch.withColumn(
    "Result", custom_covert_udf("Ticket", "Cabin", "Embarked")
)
df_batch.show(5)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+------------------+--------------------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|          Timestamp|           AvgFare|              Result|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-------------------+------------------+--------------------+
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|2020-01-01 13:44:48| 75.70833505060254| 17599;c85;Cherbourg|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|2020-01-01 13:38:11|12.067460939158565|3101282;not_defin...|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450|   8.05| null|       S|2020-01-01 13:36:

12. Save to Parquet. Do you notice any changes in the saved file structure?

If you will get an error here, usually it means that previous transformations on data can't be applied to all rows. Spark uses lazy evaluation, you have to force Spark to make calculations on all datasets (for example by using the `collect` function) unless of course you are doing transformations that must be calculated based on the whole dataset (`mean`, `min`, `max`, `groupby`).

To try to solve the problem revisit the previous point with UDF row-wise transform.

In [17]:
df_batch.write.parquet("output.parquet")

### Spark Streaming

1. Read the `titanic.csv` file as a data stream from the `data/` directory. Reuse previously implemented schema.

In [18]:
df_stream = spark.readStream.option("sep", ",").schema(titanic_schema).csv("./data/")

df_stream.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: float (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Timestamp: timestamp (nullable = true)



2. Create `writeStream` and pass a query to the `check_query` function to display transformed data. Check if all the data from CSV was correctly read. In the following tasks, you can reuse this to check the changes that you applied to your dataset. Remember to change query names between executions!

**Remember to stop the execution of the cell before moving to the next exercises (the `check_query` function is running in an infinite loop).**

In [19]:
def check_query(query: StreamingQuery, sleep_time: int = 5) -> None:
    while True:
        clear_output(wait=True)
        display(query.status)
        display(spark.sql(f"SELECT * FROM {query.name}").show())
        sleep(sleep_time)

In [20]:
query = (
    df_stream.writeStream.outputMode("append")
    .format("memory")
    .queryName("".join(random.choices(string.ascii_uppercase, k=10)))
    .start()
)

In [21]:
# check_query(query)

3. From the dataset, select only passengers that:
    * did not survive the crash,
    * are older than 20 years,
    * had a third class tickets (`Pclass` column).

In [22]:
df_stream_ex3 = df_stream.filter(
    (col("Survived") == 0) & (col("Age") > 20) & (col("Pclass").isin([2, 3]))
)

In [23]:
# query = df_stream_ex3 \
#     .writeStream \
#     .outputMode("append") \
#     .format("memory") \
#     .queryName("".join(random.choices(string.ascii_uppercase, k=10))) \
#     .start()
# check_query(query)

4. Use DataFrame from the previous point. Group results by sex and port of embarkation (`Embarked` column). Count people, calculate average passenger fare and the maximum and minimum age for every group. Name every new column as `<aggregation function>_<original_column_name>`. Remember to use the `complete` output mode to check results for aggregation queries.

In [24]:
df_stream_ex4 = df_stream_ex3.groupBy("Sex", "Embarked").agg(
    count("PassengerId").alias("count_PassengerId"),
    avg("Fare").alias("avg_Fare"),
    min("Age").alias("min_Age"),
    max("Age").alias("max_Age"),
)

In [25]:
# query = df_stream_ex4 \
#     .writeStream \
#     .outputMode("complete") \
#     .format("memory") \
#     .queryName("".join(random.choices(string.ascii_uppercase, k=10))) \
#     .start()
# check_query(query)

5. Sort DataFrame from the previous exercise in descending order by the groups' keys.

In [26]:
df_stream_ex5 = df_stream_ex4.sort(desc("Sex"), desc("Embarked"))

In [27]:
# query = df_stream_ex5 \
#     .writeStream \
#     .outputMode("complete") \
#     .format("memory") \
#     .queryName("".join(random.choices(string.ascii_uppercase, k=10))) \
#     .start()
# check_query(query)

6. [Optional] Read about windowing and watermarking in Spark Streaming (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time). Use the same aggregation function from the fourth task and add tumbling windows with 5 min interval. What changed in results?

<img src="attachment:938f1999-ebc5-4ac8-a5da-e7b27350203a.png" alt="obraz.png" style="width:800px;"/>


In [28]:
df_stream_ex6 = (
    df_stream_ex3.groupBy(window(col("Timestamp"), "5 minutes"), "Sex", "Embarked")
    .agg(
        count("PassengerId").alias("count_PassengerId"),
        avg("Fare").alias("avg_Fare"),
        min("Age").alias("min_Age"),
        max("Age").alias("max_Age"),
    )
    .orderBy("Sex", "Embarked", "window")
)

In [29]:
# query = df_stream_ex6 \
#     .writeStream \
#     .outputMode("complete") \
#     .format("memory") \
#     .queryName("".join(random.choices(string.ascii_uppercase, k=10))) \
#     .start()
# check_query(query)