In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
import datetime


# Start Spark Session

In [2]:
# By default, Spark’s standalone scheduler runs jobs in FIFO fashion. 
# The first job gets priority on all available resources, then the second job gets priority, etc. 

# If the jobs at the head of the queue don’t need to use the whole cluster, later jobs can start to 
# run right away, but if the jobs at the head of the queue are large, then later jobs may be delayed significantly.

# It is also possible to configure fair sharing between jobs.

# More info here : https://spark.apache.org/docs/latest/job-scheduling.html


In [3]:
# Start spark session. Not specifying what we want for executor and core resources, gets us all available resources.
# Default for 'spark.executor.memory' (memory per executor) is 1GB. Need to bump that up explicitly, if we want more.

# Ask for too many resources, and your job sits waiting for these resources to become available.
# You'll get warnings asking you to check your cluster UI to ensure that workers are registered and have sufficient resources.

# Path of the jar that is handed to Spark below; the same path is used for both
# the 'spark.jars' and the 'spark.executor.extraClassPath' settings.
helloworld_jar = '/src/java/spark-jobs/helloworld/target/jv_helloworld-1.0-SNAPSHOT.jar'

# Assemble the builder first, then create (or reuse) the session from it.
session_builder = (
    SparkSession.builder
    .master('spark://spark-master:7077')
    .appName('common_ops_jupyter')
    .config('spark.jars', helloworld_jar)
    .config('spark.executor.extraClassPath', helloworld_jar)
    # Memory settings are raised explicitly for the driver and the executors.
    .config('spark.driver.memory', '2G')
    .config('spark.executor.memory', '3G')
)

spark = session_builder.getOrCreate()


22/04/01 14:32:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Add the helloworld jar to the Java Spark context. It has a UDF that is used later.
# Note: `_jsc` is a private attribute of the SparkSession (leading underscore).
# NOTE(review): the same jar path is already passed via the 'spark.jars' setting when
# the session is built, so this call may be redundant - confirm before removing.

spark._jsc.addJar("/src/java/spark-jobs/helloworld/target/jv_helloworld-1.0-SNAPSHOT.jar")


In [5]:
# Register the Java class ca.nrc.udf.StringLengthUDF under the SQL name StringLengthUDF,
# declaring LongType as its return type. After this call it can be referenced by that
# name in spark.sql queries and SQL expressions.
# NOTE(review): presumably the class is provided by the helloworld jar added earlier - confirm.

spark.udf.registerJavaFunction("StringLengthUDF", "ca.nrc.udf.StringLengthUDF", T.LongType())


# Create Some Toy Data - (The Person and Vehicle Relationship)

![erd](media/erd.png)

# The Person Table

In [6]:
# Create the Person Table

# (column name, Spark type) pairs for the Person table; every column is declared non-nullable.
person_fields = [
    ('person_id', T.StringType()),
    ('first_name', T.StringType()),
    ('last_name', T.StringType()),
    ('gender', T.StringType()),
    ('date_of_birth', T.DateType()),
    ('children_ages', T.ArrayType(T.IntegerType())),
]

person_schema = T.StructType(
    [T.StructField(field_name, field_type, False) for field_name, field_type in person_fields]
)

# One tuple per person, in the same column order as person_fields.
person_data = [
    ('001', 'Willy', 'Walker', 'M', datetime.date(1971, 3, 18), [21, 23]),
    ('002', 'Marry', 'Brown', 'F', datetime.date(1974, 3, 19), [4, 11, 15]),
    ('003', 'Harry', 'Snow', 'M', datetime.date(1965, 3, 20), [23, 25]),
    ('004', 'Joey', 'Jollymore', 'M', datetime.date(1971, 3, 13), [12, 15, 17]),
    ('005', 'Tim', 'Horton', 'M', datetime.date(1963, 3, 14), [11, 15]),
    ('006', 'Jenny', 'Purple', 'F', datetime.date(1971, 3, 14), [14, 18, 21]),
    ('007', 'Ronald', 'McDonald', 'M', datetime.date(1972, 3, 14), [11, 15]),
    ('008', 'Oscar', 'Grouch', 'M', datetime.date(1968, 3, 14), [20, 22]),
    ('009', 'Kermit', 'Frog', 'M', datetime.date(1969, 3, 14), [19, 25]),
    ('010', 'Anna', 'Green', 'F', datetime.date(1973, 3, 14), [13, 15]),
]

person_df = spark.createDataFrame(data=person_data, schema=person_schema)


In [7]:

# The data is mostly tabular; the one nested field is children_ages (an ArrayType).
#
# Spark can deal with different file formats (json, avro, orc, etc.)
# and has utilities to manipulate complex types (e.g. exploding a nested structure into individual rows).

person_df.printSchema()
person_df.show(n=10, truncate=False)


root
 |-- person_id: string (nullable = false)
 |-- first_name: string (nullable = false)
 |-- last_name: string (nullable = false)
 |-- gender: string (nullable = false)
 |-- date_of_birth: date (nullable = false)
 |-- children_ages: array (nullable = false)
 |    |-- element: integer (containsNull = true)



[Stage 1:>                                                          (0 + 1) / 1]

+---------+----------+---------+------+-------------+-------------+
|person_id|first_name|last_name|gender|date_of_birth|children_ages|
+---------+----------+---------+------+-------------+-------------+
|001      |Willy     |Walker   |M     |1971-03-18   |[21, 23]     |
|002      |Marry     |Brown    |F     |1974-03-19   |[4, 11, 15]  |
|003      |Harry     |Snow     |M     |1965-03-20   |[23, 25]     |
|004      |Joey      |Jollymore|M     |1971-03-13   |[12, 15, 17] |
|005      |Tim       |Horton   |M     |1963-03-14   |[11, 15]     |
|006      |Jenny     |Purple   |F     |1971-03-14   |[14, 18, 21] |
|007      |Ronald    |McDonald |M     |1972-03-14   |[11, 15]     |
|008      |Oscar     |Grouch   |M     |1968-03-14   |[20, 22]     |
|009      |Kermit    |Frog     |M     |1969-03-14   |[19, 25]     |
|010      |Anna      |Green    |F     |1973-03-14   |[13, 15]     |
+---------+----------+---------+------+-------------+-------------+



                                                                                

# The Vehicle Table

In [8]:
# Create the Vehicle Table

# Column names of the Vehicle table; all three are non-nullable strings.
vehicle_columns = ['person_id', 'vehicle_id', 'vehicle_name']

vehicle_schema = T.StructType(
    [T.StructField(column_name, T.StringType(), False) for column_name in vehicle_columns]
)

# One tuple per vehicle: (owner's person_id, vehicle_id, vehicle_name).
# NOTE(review): 'v-005' appears on two rows (Toyota Camry and Toyota Rav4) and
# 'Doge Caravan' looks like a misspelling of 'Dodge' - confirm whether either is intentional.
vehicle_data = [
    ('001', 'v-001', 'Honda Civic'),
    ('001', 'v-002', 'Honda Fit'),
    ('001', 'v-003', 'Honda Ruckus'),
    ('001', 'v-004', 'Toyota Camry'),
    ('002', 'v-005', 'Toyota Camry'),
    ('002', 'v-005', 'Toyota Rav4'),
    ('002', 'v-010', 'Geo Metro'),
    ('005', 'v-006', 'Doge Caravan'),
    ('005', 'v-007', 'Toyota Camry'),
    ('007', 'v-008', 'Honda Civic'),
    ('010', 'v-009', 'Honda Civic'),
]

vehicle_df = spark.createDataFrame(data=vehicle_data, schema=vehicle_schema)


In [9]:
# Show the vehicle dataframe's schema, then up to 10 of its rows without truncating cell values.

vehicle_df.printSchema()
vehicle_df.show(n=10, truncate=False)


root
 |-- person_id: string (nullable = false)
 |-- vehicle_id: string (nullable = false)
 |-- vehicle_name: string (nullable = false)



                                                                                

+---------+----------+------------+
|person_id|vehicle_id|vehicle_name|
+---------+----------+------------+
|001      |v-001     |Honda Civic |
|001      |v-002     |Honda Fit   |
|001      |v-003     |Honda Ruckus|
|001      |v-004     |Toyota Camry|
|002      |v-005     |Toyota Camry|
|002      |v-005     |Toyota Rav4 |
|002      |v-010     |Geo Metro   |
|005      |v-006     |Doge Caravan|
|005      |v-007     |Toyota Camry|
|007      |v-008     |Honda Civic |
+---------+----------+------------+
only showing top 10 rows



# Feature Engineering

In [10]:
# Lifted from : https://stackoverflow.com/questions/2217488/age-from-birthdate-in-python

# Function to calculate your age in years. Input is the date of birth.
def calculate_age(date_born, today=None):
    """Return the age in completed years for a date of birth.

    Args:
        date_born: Date of birth (an object with ``year``, ``month`` and ``day``
            attributes, e.g. ``datetime.date``), or ``None``.
        today: Optional reference date to measure the age against. Defaults to
            ``datetime.date.today()``; pass an explicit date for reproducible results.

    Returns:
        The number of completed years as an ``int``, or ``None`` when
        ``date_born`` is ``None`` (e.g. a null value handed to a UDF).
    """
    if date_born is None:
        return None
    if today is None:
        today = datetime.date.today()
    # One year is subtracted when this year's birthday has not been reached yet.
    birthday_reached = (today.month, today.day) >= (date_born.month, date_born.day)
    return today.year - date_born.year - (0 if birthday_reached else 1)


In [11]:
# Wrap calculate_age as a user-defined function, a.k.a. a UDF
# (i.e. a user-programmable routine that acts on one row). Declared return type: IntegerType.
calc_age_udf = F.udf(
    lambda date_born: calculate_age(date_born),
    returnType=T.IntegerType(),
)


In [12]:

# Add three derived columns to person_df in a single chained expression.
person_df = (
    person_df
    # person_age: computed by the Python UDF from date_of_birth.
    .withColumn('person_age', calc_age_udf(F.col('date_of_birth')))
    # birth_dow: birth day of week, computed by the built-in PySpark utility function dayofweek.
    .withColumn('birth_dow', F.dayofweek('date_of_birth'))
    # fname_length: first name length, computed by the Java function registered as
    # StringLengthUDF (it lives in the jar added to Spark earlier).
    .withColumn("fname_length", F.expr("StringLengthUDF(first_name)"))
)


In [13]:
# Display the transformed dataframe (up to 10 rows, untruncated).
# The new columns should be present: [person_age, birth_dow, fname_length]

person_df.show(n=10, truncate=False)


[Stage 6:>                                                          (0 + 1) / 1]

+---------+----------+---------+------+-------------+-------------+----------+---------+------------+
|person_id|first_name|last_name|gender|date_of_birth|children_ages|person_age|birth_dow|fname_length|
+---------+----------+---------+------+-------------+-------------+----------+---------+------------+
|001      |Willy     |Walker   |M     |1971-03-18   |[21, 23]     |51        |5        |5           |
|002      |Marry     |Brown    |F     |1974-03-19   |[4, 11, 15]  |48        |3        |5           |
|003      |Harry     |Snow     |M     |1965-03-20   |[23, 25]     |57        |7        |5           |
|004      |Joey      |Jollymore|M     |1971-03-13   |[12, 15, 17] |51        |7        |4           |
|005      |Tim       |Horton   |M     |1963-03-14   |[11, 15]     |59        |5        |3           |
|006      |Jenny     |Purple   |F     |1971-03-14   |[14, 18, 21] |51        |1        |5           |
|007      |Ronald    |McDonald |M     |1972-03-14   |[11, 15]     |50        |3   

                                                                                

In [14]:
# Do some transformations with SparkSQL.

# Create function to calculate max from list.
def max_age(i_list):
    """Return the largest value in a list of ages.

    Args:
        i_list: Iterable of ages, or ``None``. Individual elements may be ``None``
            (the children_ages array column allows null elements).

    Returns:
        The maximum non-null element, or ``None`` when ``i_list`` is ``None``,
        empty, or contains only ``None`` values.
    """
    if not i_list:
        # None or an empty list: there is no eldest child to report.
        return None
    # Null elements cannot be compared with ints, so they are skipped.
    return max((age for age in i_list if age is not None), default=None)

# Wrap max_age as a user defined function (i.e. a user-programmable routine that acts on one row).
max_age_udf = F.udf(
    lambda i_list: max_age(i_list),
    returnType=T.IntegerType(),
)

# The UDF has to be registered by name before it can be used with the spark.sql command.
spark.udf.register(name="max_age_udf", f=max_age_udf)


<function __main__.<lambda>(i_list)>

In [15]:
# Expose person_df to Spark SQL as a temporary view named PERSON (replacing any
# existing view of that name). The view's lifetime is tied to the SparkSession.

person_df.createOrReplaceTempView("PERSON")

In [16]:
# Add a column called eldest_child_age to the person_df dataframe.
# Do this using Spark's SQL interface: the query selects every column of the PERSON
# view plus the result of max_age_udf applied to children_ages.

sql_str = '''SELECT 
                *, 
                max_age_udf(children_ages) as eldest_child_age
            FROM 
                PERSON'''

# person_df is rebound to the query result, which carries the extra column.
person_df = spark.sql(sql_str)


In [17]:

# Display the transformed dataframe (up to 10 rows, untruncated).
# The new column should be present: [eldest_child_age]

person_df.show(n=10, truncate=False)


[Stage 8:>                                                          (0 + 1) / 1]

+---------+----------+---------+------+-------------+-------------+----------+---------+------------+----------------+
|person_id|first_name|last_name|gender|date_of_birth|children_ages|person_age|birth_dow|fname_length|eldest_child_age|
+---------+----------+---------+------+-------------+-------------+----------+---------+------------+----------------+
|001      |Willy     |Walker   |M     |1971-03-18   |[21, 23]     |51        |5        |5           |23              |
|002      |Marry     |Brown    |F     |1974-03-19   |[4, 11, 15]  |48        |3        |5           |15              |
|003      |Harry     |Snow     |M     |1965-03-20   |[23, 25]     |57        |7        |5           |25              |
|004      |Joey      |Jollymore|M     |1971-03-13   |[12, 15, 17] |51        |7        |4           |17              |
|005      |Tim       |Horton   |M     |1963-03-14   |[11, 15]     |59        |5        |3           |15              |
|006      |Jenny     |Purple   |F     |1971-03-1

                                                                                

In [18]:

# Create vehicle_count Column - (Aggregation and Joining)


In [19]:

# Need list of person_ids and their vehicle counts.
# Example of aggregation. Can do sums, min, max etc. 
# Many aggregations included in API.

# Count the vehicle rows per person_id and expose the result as 'vehicle_count'.
# The chain is wrapped in parentheses instead of backslash continuations: the
# original ended with a dangling '\' that only worked because a blank line followed it.
person_vehicle_count_df = (
    vehicle_df
    .select('person_id', 'vehicle_id')
    .groupBy('person_id')
    .count()
    .withColumnRenamed('count', 'vehicle_count')
)

person_vehicle_count_df.show()


+---------+-------------+
|person_id|vehicle_count|
+---------+-------------+
|      001|            4|
|      005|            2|
|      002|            3|
|      010|            1|
|      007|            1|
+---------+-------------+



In [20]:

# Join person_df and person_vehicle_count_df dataframes. 
# Need people with and without vehicles.

# A left outer join keeps every row of person_df, whether or not a vehicle count exists for it.
people_with_counts = person_df.join(person_vehicle_count_df, on=['person_id'], how='left_outer')

# Keep the joined vehicle_count where it is present; otherwise use 0.
vehicle_count_or_zero = (
    F.when(F.col('vehicle_count').isNotNull(), F.col('vehicle_count'))
    .otherwise(F.lit(0))
)

person_and_cars_df = people_with_counts.withColumn("vehicle_count", vehicle_count_or_zero)

person_and_cars_df.orderBy('person_id', ascending=True).show()


+---------+----------+---------+------+-------------+-------------+----------+---------+------------+----------------+-------------+
|person_id|first_name|last_name|gender|date_of_birth|children_ages|person_age|birth_dow|fname_length|eldest_child_age|vehicle_count|
+---------+----------+---------+------+-------------+-------------+----------+---------+------------+----------------+-------------+
|      001|     Willy|   Walker|     M|   1971-03-18|     [21, 23]|        51|        5|           5|              23|            4|
|      002|     Marry|    Brown|     F|   1974-03-19|  [4, 11, 15]|        48|        3|           5|              15|            3|
|      003|     Harry|     Snow|     M|   1965-03-20|     [23, 25]|        57|        7|           5|              25|            0|
|      004|      Joey|Jollymore|     M|   1971-03-13| [12, 15, 17]|        51|        7|           4|              17|            0|
|      005|       Tim|   Horton|     M|   1963-03-14|     [11, 15]|  

# Filtering 

In [21]:

# People with at least 1 vehicle
# Select person_id, name (format the name as : last, first) and the vehicle count

# Display name built as "last_name, first_name".
owner_name_col = F.concat(
    F.col('last_name'), F.lit(', '), F.col('first_name')
).alias('person_name (last,first)')

# Keep only rows whose vehicle_count is 1 or more.
(
    person_and_cars_df
    .select('person_id', owner_name_col, 'vehicle_count')
    .filter(F.col('vehicle_count') >= 1)
    .show()
)


+---------+------------------------+-------------+
|person_id|person_name (last,first)|vehicle_count|
+---------+------------------------+-------------+
|      005|             Horton, Tim|            2|
|      001|           Walker, Willy|            4|
|      002|            Brown, Marry|            3|
|      010|             Green, Anna|            1|
|      007|        McDonald, Ronald|            1|
+---------+------------------------+-------------+



In [22]:
# People who don't have a vehicle

# Display name built as "last_name, first_name".
non_owner_name_col = F.concat(
    F.col('last_name'), F.lit(', '), F.col('first_name')
).alias('person_name (last,first)')

# Keep only rows whose vehicle_count is exactly 0.
(
    person_and_cars_df
    .select('person_id', non_owner_name_col, 'vehicle_count')
    .filter(F.col('vehicle_count') == 0)
    .show()
)

+---------+------------------------+-------------+
|person_id|person_name (last,first)|vehicle_count|
+---------+------------------------+-------------+
|      003|             Snow, Harry|            0|
|      004|         Jollymore, Joey|            0|
|      009|            Frog, Kermit|            0|
|      006|           Purple, Jenny|            0|
|      008|           Grouch, Oscar|            0|
+---------+------------------------+-------------+



# Windowing

In [23]:

# Ref : https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html


# Window functions let us operate on a group of rows while still returning a single value for every input row. 
# With windowing, we can calculate moving averages, cumulative sums, or access the values of a row appearing before 
# the current row. 

# “Rank people by the number of vehicles they own within each gender. What are the top 2 vehicle counts in each gender category?”

from pyspark.sql.window import Window


# Window definition: one partition per gender, rows ordered by vehicle_count, highest first.
windowSpec = (
    Window
    .partitionBy(F.col('gender'))
    .orderBy(F.col('vehicle_count').desc())
)

# Rank every person inside their gender partition, then keep ranks 1 and 2.
top_two_per_gender = (
    person_and_cars_df
    .select('person_id', 'first_name', 'last_name', 'gender', 'vehicle_count')
    .withColumn("rank", F.rank().over(windowSpec))
    .orderBy(F.col('gender').desc(), F.col('rank').asc())
    .filter(F.col('rank') <= 2)
)

top_two_per_gender.show()


+---------+----------+---------+------+-------------+----+
|person_id|first_name|last_name|gender|vehicle_count|rank|
+---------+----------+---------+------+-------------+----+
|      001|     Willy|   Walker|     M|            4|   1|
|      005|       Tim|   Horton|     M|            2|   2|
|      002|     Marry|    Brown|     F|            3|   1|
|      010|      Anna|    Green|     F|            1|   2|
+---------+----------+---------+------+-------------+----+



In [24]:
# Stop the SparkSession now that the walkthrough is finished.
spark.stop()