# Exercises for Apache Spark™ and Scala Workshops PART - I

These are my own PySpark solutions to the exercises proposed by Jacek Laskowski in https://github.com/jaceklaskowski/spark-workshop/tree/gh-pages/exercises

## Exercises:

1. [Split function with variable delimiter per row](#1) 
2. [Selecting the most important rows per assigned priority](#2) 
3. [Adding count to the source DataFrame](#3) 
4. [Limiting collect_set Standard Function](#4) 
5. [Structs for column names and values](#5) 
6. [Merging two rows](#6)
7. [Exploding structs array](#7)
8. [Finding Ids of Rows with Word in Array Column](#8)
9. [Using Dataset.flatMap Operator](#9)
10. [Flattening Array Columns (From Datasets of Arrays to Datasets of Array Elements)](#10)

# SET UP

In [1]:
!pip install findspark

import findspark
findspark.init()



In [2]:
# Load PySpark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf


spark = SparkSession.builder.appName("Test_spark").master("local[*]").getOrCreate()

spark

# LIBRARIES

In [3]:
from pyspark.sql.functions import *
from pyspark.sql import Window
from pyspark.sql.types import *

## 1. Split function with variable delimiter per row <a id='1'></a>

In [4]:
# DataFrame Input
dept = spark.createDataFrame([
  ["50000.0#0#0#", "#"],
  ["0@1000.0@", "@"],
  ["1$", "$"],
  ["1000.00^Test_string", "^"]],("VALUES", "Delimiter"))
# For the script version
dept_sql = dept

dept.show()

+-------------------+---------+
|             VALUES|Delimiter|
+-------------------+---------+
|       50000.0#0#0#|        #|
|          0@1000.0@|        @|
|                 1$|        $|
|1000.00^Test_string|        ^|
+-------------------+---------+



Using UDFs:

In [5]:
# DataFrame Output API
## Declare functions
def splitbydelimiter(array,delimiter):
  # Split the string on the row's own delimiter (literal match, not a regex)
  return array.split(delimiter)

def removeblank(array):
  # Drop empty strings; equivalent to [x for x in array if x]
  return list(filter(None, array))
## Set UDFs (udf() defaults to StringType, so declare the array return type explicitly)
splitbydelimiter = udf(splitbydelimiter, ArrayType(StringType()))
removeblank = udf(removeblank, ArrayType(StringType()))
## Apply
dept = dept.withColumn('split_values',splitbydelimiter(col("VALUES"),col("Delimiter"))) ## Split by special character and add the array as column 'split_values'
dept = dept.withColumn('extra',removeblank(col('split_values'))) ## Remove blanks from the array and add it as column 'extra'
dept.show()

+-------------------+---------+--------------------+--------------------+
|             VALUES|Delimiter|        split_values|               extra|
+-------------------+---------+--------------------+--------------------+
|       50000.0#0#0#|        #|   [50000.0, 0, 0, ]|     [50000.0, 0, 0]|
|          0@1000.0@|        @|       [0, 1000.0, ]|         [0, 1000.0]|
|                 1$|        $|               [1, ]|                 [1]|
|1000.00^Test_string|        ^|[1000.00, Test_st...|[1000.00, Test_st...|
+-------------------+---------+--------------------+--------------------+



Using UDFs with SQL scripts:

In [6]:
# DataFrame Output SQL
## Declare table
dept_sql.createOrReplaceTempView('dept_sql')
## Set UDFs
spark.udf.register("splitbydelimiter", splitbydelimiter)
spark.udf.register("removeblank", removeblank)
## Define script
script = "SELECT *, removeblank(split_values) as extra FROM ( \
          SELECT VALUES, Delimiter, splitbydelimiter(VALUES,Delimiter) as split_values FROM dept_sql);"
## Apply
dept_sql = spark.sql(script)
dept_sql.show()

+-------------------+---------+--------------------+--------------------+
|             VALUES|Delimiter|        split_values|               extra|
+-------------------+---------+--------------------+--------------------+
|       50000.0#0#0#|        #|   [50000.0, 0, 0, ]|     [50000.0, 0, 0]|
|          0@1000.0@|        @|       [0, 1000.0, ]|         [0, 1000.0]|
|                 1$|        $|               [1, ]|                 [1]|
|1000.00^Test_string|        ^|[1000.00, Test_st...|[1000.00, Test_st...|
+-------------------+---------+--------------------+--------------------+
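The logic inside the two UDFs can be checked without a Spark session. The sketch below is a plain-Python version of the same two steps; the names `split_by_delimiter` and `remove_blank` are hypothetical helpers used only for this illustration. Note that Python's `str.split` treats the delimiter literally, so `$` and `^` need no escaping, whereas Spark's built-in `split` function interprets its pattern as a regular expression.

```python
def split_by_delimiter(values, delimiter):
    """Split a string on a literal (non-regex) delimiter."""
    return values.split(delimiter)


def remove_blank(parts):
    """Drop the empty strings left by leading or trailing delimiters."""
    return [p for p in parts if p]


# Same (VALUES, Delimiter) pairs as the input DataFrame
rows = [
    ("50000.0#0#0#", "#"),
    ("0@1000.0@", "@"),
    ("1$", "$"),
    ("1000.00^Test_string", "^"),
]

for values, delimiter in rows:
    parts = split_by_delimiter(values, delimiter)
    print(values, parts, remove_blank(parts))
```

Keeping the row-level logic in ordinary functions like these makes it possible to unit test it before wrapping it in `udf(...)`.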



## 2. Selecting the most important rows per assigned priority <a id='2'></a>

In [7]:
# DataFrame Input
df_input = spark.createDataFrame([
  [1, "MV1"],
  [1, "MV2"],
  [2, "VPV"],
  [2, "Others"]],("id", "value"))
# For the script version
df_input_sql = df_input

df_input.show()

+---+------+
| id| value|
+---+------+
|  1|   MV1|
|  1|   MV2|
|  2|   VPV|
|  2|Others|
+---+------+



In [8]:
# DataFrame Output API
df_input.dropDuplicates(subset=["id"]).show()

+---+-----+
| id|value|
+---+-----+
|  1|  MV1|
|  2|  VPV|
+---+-----+



In [9]:
# Another Approach
window = Window.partitionBy("id").orderBy("id",'tiebreak')
(df_input
 .withColumn('tiebreak', monotonically_increasing_id())
 .withColumn('rank', rank().over(window))
 .filter(col('rank') == 1).drop('rank','tiebreak')
 .show()
)

+---+-----+
| id|value|
+---+-----+
|  1|  MV1|
|  2|  VPV|
+---+-----+



In [10]:
# Using SQL
# DataFrame Output SQL
## Declare table
df_input_sql.createOrReplaceTempView('df_input_sql')
## Define script
script = " \
SELECT * \
FROM ( \
  SELECT \
      *, \
      ROW_NUMBER() \
          OVER (PARTITION BY id ORDER BY id) \
          row_number \
  FROM df_input_sql \
) \
WHERE row_number = 1;"

## Apply
spark.sql(script).show()

+---+-----+----------+
| id|value|row_number|
+---+-----+----------+
|  1|  MV1|         1|
|  2|  VPV|         1|
+---+-----+----------+



## 3. Adding count to the source DataFrame <a id='3'></a>

In [11]:
df_input = spark.createDataFrame([
  ["05:49:56.604899", "10.0.0.2.54880", "10.0.0.3.5001",  2],
  ["05:49:56.604900", "10.0.0.2.54880", "10.0.0.3.5001",  2],
  ["05:49:56.604899", "10.0.0.2.54880", "10.0.0.3.5001",  2],
  ["05:49:56.604900", "10.0.0.2.54880", "10.0.0.3.5001",  2],
  ["05:49:56.604899", "10.0.0.2.54880", "10.0.0.3.5001",  2],
  ["05:49:56.604900", "10.0.0.2.54880", "10.0.0.3.5001",  2],
  ["05:49:56.604899", "10.0.0.2.54880", "10.0.0.3.5001",  2],
  ["05:49:56.604900", "10.0.0.2.54880", "10.0.0.3.5001",  2],
  ["05:49:56.604899", "10.0.0.2.54880", "10.0.0.3.5001",  2],
  ["05:49:56.604900", "10.0.0.2.54880", "10.0.0.3.5001",  2],
  ["05:49:56.604899", "10.0.0.2.54880", "10.0.0.3.5001",  2],
  ["05:49:56.604900", "10.0.0.2.54880", "10.0.0.3.5001",  2],
  ["05:49:56.604899", "10.0.0.2.54880", "10.0.0.3.5001",  2],
  ["05:49:56.604908", "10.0.0.3.5001",  "10.0.0.2.54880", 2],
  ["05:49:56.604908", "10.0.0.3.5001",  "10.0.0.2.54880", 2],
  ["05:49:56.604908", "10.0.0.3.5001",  "10.0.0.2.54880", 2],
  ["05:49:56.604908", "10.0.0.3.5001",  "10.0.0.2.54880", 2],
  ["05:49:56.604908", "10.0.0.3.5001",  "10.0.0.2.54880", 2],
  ["05:49:56.604908", "10.0.0.3.5001",  "10.0.0.2.54880", 2],
  ["05:49:56.604908", "10.0.0.3.5001",  "10.0.0.2.54880", 2]],("column0", "column1", "column2", "label"))

df_input.show()

+---------------+--------------+--------------+-----+
|        column0|       column1|       column2|label|
+---------------+--------------+--------------+-----+
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|
|05:49:56.604908| 10.0.0.3.5

In [12]:
# Create aggregation table
df_input_agg = df_input.select(col("column2")).groupby("column2").count()
df_input_agg = df_input_agg.withColumnRenamed("column2","key")
# Join
df_input.join(df_input_agg,df_input["column2"]==df_input_agg["key"],"left").drop("key").show()

+---------------+--------------+--------------+-----+-----+
|        column0|       column1|       column2|label|count|
+---------------+--------------+--------------+-----+-----+
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2

In [13]:
df_input.createOrReplaceTempView("df_input_agg")
script = "\
SELECT\
    column0,\
    column1,\
    column2,\
    label,\
    count \
FROM\
    df_input_agg \
LEFT JOIN \
(SELECT\
    column2 AS key_agg,\
        count(column2) AS count\
     FROM\
        df_input_agg\
    GROUP BY\
        1) \
ON\
    column2 == key_agg;"
spark.sql(script).show()

+---------------+--------------+--------------+-----+-----+
|        column0|       column1|       column2|label|count|
+---------------+--------------+--------------+-----+-----+
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2
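As a sanity check on the expected counts, the same "count per group, attached to every row" logic can be sketched in plain Python. This is only an illustration under a simplifying assumption: the rows below keep just `column1` and `column2` from the input (13 rows pointing at `10.0.0.3.5001` and 7 rows pointing at `10.0.0.2.54880`), and the variable names are hypothetical.

```python
from collections import Counter

# Simplified copy of the input: only (column1, column2) are kept
rows = (
    [("10.0.0.2.54880", "10.0.0.3.5001")] * 13
    + [("10.0.0.3.5001", "10.0.0.2.54880")] * 7
)

# Aggregation step: number of rows per column2 value
counts = Counter(column2 for _, column2 in rows)

# Join step: attach the group count to every original row
rows_with_count = [(c1, c2, counts[c2]) for c1, c2 in rows]

print(counts)
print(rows_with_count[0], rows_with_count[-1])
```

Every input row is preserved, and each one carries the size of its `column2` group.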

## 4. Limiting collect_set Standard Function <a id='4'></a>

In [14]:
df_input = spark.createDataFrame([
[0,0],
[1,1],
[2,2],
[3,3],
[4,4],
[5,0],
[6,1],
[7,2],
[8,3],
[9,4],
[10,0],
[11,1],
[12,2],
[13,3],
[14,4],
[15,0],
[16,1],
[17,2],
[18,3],
[19,4],
[20,0],
[21,1],
[22,2],
[23,3],
[24,4],
[25,0],
[26,1],
[27,2],
[28,3],
[29,4],
[30,0],
[31,1],
[32,2],
[33,3],
[34,4],
[35,0],
[36,1],
[37,2],
[38,3],
[39,4],
[40,0],
[41,1],
[42,2],
[43,3],
[44,4],
[45,0],
[46,1],
[47,2],
[48,3],
[49,4],
],("id","key"))

df_input.show(20)

+---+---+
| id|key|
+---+---+
|  0|  0|
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
|  5|  0|
|  6|  1|
|  7|  2|
|  8|  3|
|  9|  4|
| 10|  0|
| 11|  1|
| 12|  2|
| 13|  3|
| 14|  4|
| 15|  0|
| 16|  1|
| 17|  2|
| 18|  3|
| 19|  4|
+---+---+
only showing top 20 rows



In [15]:
df = df_input.groupby("key").agg(collect_set("id").alias("all")) # Get All
df = df.withColumn("only_first_three",slice(col("all"),1,3)) # Get first three elements
df.show()

+---+--------------------+----------------+
|key|                 all|only_first_three|
+---+--------------------+----------------+
|  0|[0, 15, 30, 45, 5...|     [0, 15, 30]|
|  1|[1, 16, 31, 46, 6...|     [1, 16, 31]|
|  3|[33, 48, 13, 38, ...|    [33, 48, 13]|
|  2|[12, 27, 37, 2, 1...|    [12, 27, 37]|
|  4|[9, 19, 34, 49, 2...|     [9, 19, 34]|
+---+--------------------+----------------+
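`collect_set` gives no guarantee about element order, which is why the arrays above start with values such as 33 or 12. Which three ids end up in `only_first_three` can therefore vary between runs. If a deterministic result is wanted, one option is to sort before slicing (in Spark, for instance, by wrapping the set in `sort_array`). The plain-Python sketch below illustrates that idea on the same data; it is an assumption about the desired behavior, not part of the original exercise.

```python
from collections import defaultdict

# Same (id, key) pairs as the input DataFrame: key cycles through 0..4
rows = [(i, i % 5) for i in range(50)]

# Equivalent of groupby("key") + collect_set("id")
groups = defaultdict(set)
for id_, key in rows:
    groups[key].add(id_)

# Sort each set first so that "first three" is well defined
first_three = {key: sorted(ids)[:3] for key, ids in groups.items()}

print(first_three)
```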



## 5. Structs for column names and values <a id='5'></a>

In [16]:
df_input = spark.createDataFrame([
    ["Manuel",[["Logan",1.5],["Zoolander",3.0], ["John Wick",2.5]]],
    ["John",[["Logan",2.0], ["Zoolander",3.5], ["John Wick",3.0]]],
],("name","movieRatings"))

df_input.show()

+------+--------------------+
|  name|        movieRatings|
+------+--------------------+
|Manuel|[[Logan, 1.5], [Z...|
|  John|[[Logan, 2.0], [Z...|
+------+--------------------+



In [17]:
# Explode array
df_output = (df_input
             .withColumn("explode",explode(col("movieRatings")))
             .drop("movieRatings"))
# Get array elements into columns
df_output = (df_output
             .withColumn("Movie",element_at("explode",1))
             .withColumn("Rate",element_at("explode",2))
             .drop("explode"))
# Transpose the table by pivoting it
df_output = (df_output
             .groupby("name")
             .pivot("Movie")
             .agg(sum("Rate")))
df_output.show()

+------+---------+-----+---------+
|  name|John Wick|Logan|Zoolander|
+------+---------+-----+---------+
|  John|      3.0|  2.0|      3.5|
|Manuel|      2.5|  1.5|      3.0|
+------+---------+-----+---------+



## 6. Merging two rows <a id='6'></a>

In [18]:
df_input = spark.createDataFrame([
  ["100","John", 35,None],
  ["100","John", None,"Georgia"],
  ["101","Mike", 25,None],
  ["101","Mike", None,"New York"],
  ["103","Mary", 22,None],
  ["103","Mary", None,"Texas"],
  ["104","Smith", 25,None],
  ["105","Jake", None,"Florida"]],("id", "name", "age", "city"))

df_input.show()

+---+-----+----+--------+
| id| name| age|    city|
+---+-----+----+--------+
|100| John|  35|    null|
|100| John|null| Georgia|
|101| Mike|  25|    null|
|101| Mike|null|New York|
|103| Mary|  22|    null|
|103| Mary|null|   Texas|
|104|Smith|  25|    null|
|105| Jake|null| Florida|
+---+-----+----+--------+



In [19]:
# Take the first non-null value of each remaining column so the grouped rows contain as few nulls as possible
# (alias each aggregate with its source column name instead of renaming the generated "first(...)" columns)
df_output = (df_input
             .groupBy("id","name")
             .agg(*[first(x,ignorenulls=True).alias(x) for x in df_input.columns if x!='id' and x!='name'])
             .orderBy("id"))
df_output.show()

+---+-----+----+--------+
| id| name| age|    city|
+---+-----+----+--------+
|100| John|  35| Georgia|
|101| Mike|  25|New York|
|103| Mary|  22|   Texas|
|104|Smith|  25|    null|
|105| Jake|null| Florida|
+---+-----+----+--------+
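The merge rule used above is "group by `id` and `name`, then keep the first non-null value of every other column". A minimal plain-Python sketch of that rule, run on the same rows, is shown below; `first_not_none` is a hypothetical helper standing in for `first(..., ignorenulls=True)`.

```python
# Same rows as the input DataFrame: (id, name, age, city)
rows = [
    ("100", "John", 35, None),
    ("100", "John", None, "Georgia"),
    ("101", "Mike", 25, None),
    ("101", "Mike", None, "New York"),
    ("103", "Mary", 22, None),
    ("103", "Mary", None, "Texas"),
    ("104", "Smith", 25, None),
    ("105", "Jake", None, "Florida"),
]


def first_not_none(values):
    """Return the first value that is not None, or None if there is none."""
    return next((v for v in values if v is not None), None)


# Collect the age and city values seen for each (id, name) group
merged = {}
for id_, name, age, city in rows:
    ages, cities = merged.setdefault((id_, name), ([], []))
    ages.append(age)
    cities.append(city)

# One output row per group, ordered by id
result = [
    (id_, name, first_not_none(ages), first_not_none(cities))
    for (id_, name), (ages, cities) in sorted(merged.items())
]

for row in result:
    print(row)
```

Groups that never see a value for a column (Smith's city, Jake's age) keep `None`, matching the nulls in the table above.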



## 7. Exploding structs array <a id='7'></a>

In [20]:
df_input = spark.read.json("exercise_7.json")
df_input.show()

+-----------+--------------+--------------------+
|business_id|  full_address|               hours|
+-----------+--------------+--------------------+
|        abc|random_address|[[02:00, 11:00], ...|
+-----------+--------------+--------------------+



In [21]:
# Get struct days and hours
df_output = df_input.select(col("business_id"),col("full_address"),col("hours.*"))
# Unpivot the table to get one row per day, with the day name and its hours struct in separate columns
df_output = df_output.selectExpr("business_id","full_address","stack(7,'Friday',Friday,'Monday',Monday,'Saturday',Saturday,'Sunday',Sunday,'Thursday',Thursday,'Tuesday',Tuesday,'Wednesday',Wednesday) as (day, hour)")
# Get open and close time from struct hour
df_output = (df_output
             .withColumn("open_time",col("hour").getField("open"))
             .withColumn("close_time",col("hour").getField("close"))
             .drop("hour"))
df_output.show()

+-----------+--------------+---------+---------+----------+
|business_id|  full_address|      day|open_time|close_time|
+-----------+--------------+---------+---------+----------+
|        abc|random_address|   Friday|    11:00|     02:00|
|        abc|random_address|   Monday|    11:00|     02:00|
|        abc|random_address| Saturday|    11:00|     02:00|
|        abc|random_address|   Sunday|    11:00|     00:00|
|        abc|random_address| Thursday|    11:00|     02:00|
|        abc|random_address|  Tuesday|    11:00|     02:00|
|        abc|random_address|Wednesday|    11:00|     02:00|
+-----------+--------------+---------+---------+----------+



## 8. Finding Ids of Rows with Word in Array Column <a id='8'></a>

In [22]:
df_input = spark.createDataFrame([
    [1,"one,two,three","one"],
    [2,"four,one,five","six"],
    [3,"seven,nine,one,two","eight"],
    [4,"two,three,five","five"],
    [5,"six,five,one","seven"],
],("id","words","word"))

df_input.show()

+---+------------------+-----+
| id|             words| word|
+---+------------------+-----+
|  1|     one,two,three|  one|
|  2|     four,one,five|  six|
|  3|seven,nine,one,two|eight|
|  4|    two,three,five| five|
|  5|      six,five,one|seven|
+---+------------------+-----+



In [23]:
# Get words as an array
df_output = df_input.withColumn("split",split(col("words"),",")).drop("words").drop("word")
# Flatten array
df_output = df_output.withColumn("explode",explode("split")).drop("split")
# Get word as key
df_input_used = df_input.select("word")
# Join keys with the flattened words
df_output = df_input_used.join(df_output,df_input.word == df_output.explode,"inner")
# Select main columns
df_output = df_output.select("word","id")
# Collect ids
df_output = df_output.groupby("word").agg(collect_set("id").alias("ids")).orderBy("word")
df_output.show()

+-----+------------+
| word|         ids|
+-----+------------+
| five|   [5, 2, 4]|
|  one|[1, 5, 2, 3]|
|seven|         [3]|
|  six|         [5]|
+-----+------------+
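The split, explode, join and collect steps above amount to building a lookup from each search word to the ids of the rows whose `words` list contains it. The following plain-Python sketch reproduces that logic on the same rows as a way to check the expected result; the variable names are hypothetical.

```python
# Same rows as the input DataFrame: (id, words, word)
rows = [
    (1, "one,two,three", "one"),
    (2, "four,one,five", "six"),
    (3, "seven,nine,one,two", "eight"),
    (4, "two,three,five", "five"),
    (5, "six,five,one", "seven"),
]

# The distinct values of the "word" column act as join keys
keys = {word for _, _, word in rows}

# Split + explode each "words" string, keeping only tokens that are keys
ids_by_word = {}
for id_, words, _ in rows:
    for token in words.split(","):
        if token in keys:
            ids_by_word.setdefault(token, set()).add(id_)

# Sort for a stable display; "eight" never appears in any words list
result = {word: sorted(ids) for word, ids in sorted(ids_by_word.items())}
print(result)
```

As with the inner join in the Spark version, a word that matches no row (`eight`) does not appear in the result.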



## 9. Using Dataset.flatMap Operator <a id='9'></a>

There are no Datasets in Python, so `Dataset.flatMap` is not available. That method is more efficient than `explode()`, which is what the solution below uses instead.

In [24]:
df_input = spark.createDataFrame([
    [1,[1,2,3]],
],("id","nums"))

df_input.show()

+---+---------+
| id|     nums|
+---+---------+
|  1|[1, 2, 3]|
+---+---------+



In [25]:
df_output = df_input.withColumn("explode",explode("nums"))
df_output.show()

+---+---------+-------+
| id|     nums|explode|
+---+---------+-------+
|  1|[1, 2, 3]|      1|
|  1|[1, 2, 3]|      2|
|  1|[1, 2, 3]|      3|
+---+---------+-------+
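For readers unfamiliar with the operator, `flatMap` applies a function that returns a sequence to every element and concatenates the results. The sketch below imitates that in plain Python on the same single row; `flat_map` is a hypothetical helper written for this illustration, not a PySpark API.

```python
# Same row as the input DataFrame: (id, nums)
rows = [(1, [1, 2, 3])]


def flat_map(func, items):
    """Apply func to each item and concatenate the sequences it returns."""
    return [out for item in items for out in func(item)]


# Emit one (id, nums, element) tuple per array element, like explode("nums")
exploded = flat_map(lambda row: [(row[0], row[1], n) for n in row[1]], rows)

for row in exploded:
    print(row)
```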



## 10. Flattening Array Columns (From Datasets of Arrays to Datasets of Array Elements) <a id='10'></a>

In [31]:
df_input = spark.createDataFrame([
  [1,["a","b","c"]],
  [2,["X","Y","Z"]]],("id","value"))

df_input.show()

+---+---------+
| id|    value|
+---+---------+
|  1|[a, b, c]|
|  2|[X, Y, Z]|
+---+---------+



In [35]:
# First approach
df_output = (df_input
             .withColumn("0",element_at("value",1))
             .withColumn("1",element_at("value",2))
             .withColumn("2",element_at("value",3))
             .drop("id","value"))
df_output.show()

+---+---+---+
|  0|  1|  2|
+---+---+---+
|  a|  b|  c|
|  X|  Y|  Z|
+---+---+---+

