In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder \
  .remote("sc://192.168.1.7:15002") \
  .appName("UDFTransformation") \
  .config("spark.sql.ansi.enabled", "false") \
  .config("spark.sql.execution.pythonUDF.arrow.enabled", "true") \
  .getOrCreate()

# limit() shows a nice HTML table in Jupyter, while show() prints plain text  
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

spark

<pyspark.sql.connect.session.SparkSession at 0x24595594cd0>

In [2]:
DataStreamReader = spark.readStream \
  .format("socket") \
  .option("host", "192.168.1.7") \
  .option("port", 9999)

linesDF = DataStreamReader.load()
lines_writer = linesDF.writeStream \
  .outputMode("append") \
  .format("memory") \
  .queryName("lines_table")

DataStreamReader

<pyspark.sql.connect.streaming.readwriter.DataStreamReader at 0x245955953f0>

In [3]:
def stop_queries():
  for query in spark.streams.active:
    query.stop()

# Task 1

In [None]:
@udf
def name_transform(sentence, first="André", last="Plancha"):
  words = sentence.split()
  return ','.join([f"{first}{word}{last}" for word in words])

transformedDF = linesDF.withColumn("value", name_transform(linesDF.value))


stop_queries()

task1_writer = transformedDF.writeStream.outputMode("append").format("memory").queryName("task1_table")
task1_query = task1_writer.start()

task1_writer

<pyspark.sql.connect.streaming.readwriter.DataStreamWriter at 0x1932daf3ee0>

```console
ssh plancha@192.168.1.7 -t /usr/bin/nc -lk 9999
plancha@192.168.1.7's password: 
Sweden is a good country
Finland is a better country
Denmark is an awesome country
I love programming in Spark
Data is the new oil
```

In [None]:
task1_query.stop()

spark.sql("SELECT * FROM task1_table").show(20, truncate=False)

+-----------------------------------------------------------------------------------------+
|value                                                                                    |
+-----------------------------------------------------------------------------------------+
|AndréSwedenPlancha,AndréisPlancha,AndréaPlancha,AndrégoodPlancha,AndrécountryPlancha     |
|AndréFinlandPlancha,AndréisPlancha,AndréaPlancha,AndrébetterPlancha,AndrécountryPlancha  |
|AndréDenmarkPlancha,AndréisPlancha,AndréanPlancha,AndréawesomePlancha,AndrécountryPlancha|
|AndréIPlancha,AndrélovePlancha,AndréprogrammingPlancha,AndréinPlancha,AndréSparkPlancha  |
|AndréDataPlancha,AndréisPlancha,AndréthePlancha,AndrénewPlancha,AndréoilPlancha          |
+-----------------------------------------------------------------------------------------+



# Task 2

In [41]:
stop_queries()

linesDF.createOrReplaceTempView("lines_table")

task2_DF = spark.sql("""
  from lines_table |>
  select value, split(value, ' ') as phrase |> 
  select explode(phrase) as word |>
  aggregate avg(length(word)) as average_word_length
""")
task2_writer = task2_DF.writeStream.outputMode("complete").format("memory").queryName("task2_table")
task2_query = task2_writer.start()
task2_writer

<pyspark.sql.connect.streaming.readwriter.DataStreamWriter at 0x245a7fc76d0>

```console
ssh plancha@192.168.1.7 -t /usr/bin/nc -lk 9999
plancha@192.168.1.7's password: 
Sweden is a good country
Finland is a better country
Denmark is an awesome country
I love programming in Spark
Data is the new oil
```

In [39]:
lines = [
  "Sweden is a good country",
  "Finland is a better country",
  "Denmark is an awesome country",
  "I love programming in Spark",
  "Data is the new oil"
]
words = " ".join(lines).split(" ")
lengths = [len(word) for word in words]
average_length = sum(lengths) / len(lengths)
average_length

4.24

In [42]:
task2_query.stop()
spark.sql("SELECT * FROM task2_table").show(20, truncate=False)

+-------------------+
|average_word_length|
+-------------------+
|4.24               |
+-------------------+



# Task 3