Tasks:
* Create custom schema for json files
* Read files 
* Add new column via UDF - timestamp 
* Add new column - Soldier's high salary 
* Rename column 
* Append rows (concatenation) 
* Join all file types 
* Write to JSON 
* Filtering 
* Sorting 
* Generate new rows 
* Aggregations
* Grouping 

### Import Pyspark package

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import udf

### Initialize SparkSession

In [2]:
spark = SparkSession.builder\
	.appName("Lesson 2 Spark Exercise 01")\
	.getOrCreate()

### Task 01: Read Event types CSV file 

Requirements: 
* Read source data from S3 bucket: event_types.csv -> path: s3a://wix-pyspark-labs/data/war-data/event_types.csv
* Use a read mode that throws an exception when it encounters corrupted records.
* Apply delimiter option via '|'
* Rename 'event type' column to 'event_type'

Expected table:
```
+---+--------------+
| id|    event_type|
+---+--------------+
|  1|          kill|
|...|          ... |
+---+--------------+
```

In [3]:
dfEventTypes = spark.read \
.format("csv") \
.option("header","true") \
.option("mode", "FAILFAST") \
.option("delimiter", "|") \
.load("data/event_types.csv") \
.withColumnRenamed("event type", "event_type")

Show the existing schema on the current DataFrame. Then print all the data. 

Please provide the code for the following task:

In [4]:
dfEventTypes.printSchema()
dfEventTypes.show()

root
 |-- id: string (nullable = true)
 |-- event_type: string (nullable = true)

+---+--------------+
| id|    event_type|
+---+--------------+
|  1|          kill|
|  2|         wound|
|  3|           hit|
|  4|          shot|
|  5|       misfire|
|  6|   close range|
|  7|avgerage range|
|  8|    long range|
+---+--------------+



### Task 02: Read Weapon types CSV file 

Requirements: 
* Read source data from S3 bucket: weapon_types.csv -> path: `s3a://wix-pyspark-labs/data/war-data/weapon_types.csv`
* Use a read mode that throws an exception when it encounters corrupted records.
* Add custom schema: 
    * 'in range' should be int type value
* Rename 'name', 'in range' columns to 'weapon_name','weapon_range'.

Expected table:
```
+---+--------------+------------+
| id|   weapon_name|weapon_range|
+---+--------------+------------+
|  1|          m 16|        2000|
|  2|           ...|         ...|
+---+--------------+------------+
```

Create a new custom schema 'weaponTypesSchema' for the weapon types DataFrame

Please provide the code for the following task:

In [5]:


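# Field names follow the source CSV header ('name', 'in range');
# the columns are renamed to weapon_name / weapon_range after the read.
weaponTypesSchema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("in range", IntegerType(), True)
])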

Read Weapon types CSV file 

In [6]:
dfWeaponTypes = spark.read \
.format("csv") \
.option("header","true") \
.option("mode", "FAILFAST") \
.schema(weaponTypesSchema) \
.load("data/weapon_types.csv") \
.withColumnRenamed("in range", "weapon_range") \
.withColumnRenamed("name", "weapon_name")

Show the existing schema on the current DataFrame. Then print all the data.

Please provide the code for the following task:

In [7]:
dfWeaponTypes.printSchema()
dfWeaponTypes.show()

root
 |-- id: integer (nullable = true)
 |-- weapon_name: string (nullable = true)
 |-- weapon_range: integer (nullable = true)

+---+--------------+------------+
| id|   weapon_name|weapon_range|
+---+--------------+------------+
|  1|          m 16|        2000|
|  2|           uzi|         200|
|  3|           akm|        2200|
|  4|      revolver|         100|
|  5|Smith & Wesson|         150|
+---+--------------+------------+



### Task 03: Read Soldiers JSON file 


Requirements: 
* Read source data from S3 bucket: soldiers.json -> path: S3://
* Use a read mode that throws an exception when it encounters corrupted records.
* Use inferSchema.
* Rename 'name' column to 'soldier_name'.

Expected table:
```
+---+-------------------+------+
| id|       soldier_name|salary|
+---+-------------------+------+
|  1|   Haegon Blackfyre| 18477|
+---+-------------------+------+
```

In [8]:
dfSoldiers = spark.read \
.format("json") \
.option("mode", "FAILFAST") \
.option("inferSchema", "true") \
.load("data/soldiers.json") \
.withColumnRenamed("name", "soldier_name")

Show the existing schema on the current DataFrame. Then print all the data.

Please provide the code for the following task:

In [9]:
dfSoldiers.printSchema()
dfSoldiers.show()

root
 |-- id: long (nullable = true)
 |-- soldier_name: string (nullable = true)
 |-- salary: long (nullable = true)

+---+-------------------+------+
| id|       soldier_name|salary|
+---+-------------------+------+
|  1|   Haegon Blackfyre| 18477|
|  2|   Walder Goodbrook| 11371|
|  3|              Quent| 18689|
|  4|        Androw Frey| 13961|
|  5|         Blind Doss| 18662|
|  6|    Victaria Tyrell| 13073|
|  7|Belaquo Bonebreaker| 16006|
|  8|       Mariya Darry| 17818|
|  9|    Alyn Connington| 18486|
| 10|             Lharys| 11102|
+---+-------------------+------+



### Task 04: Read raw data JSON file 


Requirements: 
* Read source data from S3 bucket: raw_data.json -> path: S3://
* Use a read mode that throws an exception when it encounters corrupted records.
* Add custom schema.



Expected schema:
```
root
 |-- distance: double (nullable = true)
 |-- eventId: integer (nullable = true)
 |-- soldierId: integer (nullable = true)
 |-- type: integer (nullable = true)
 |-- weaponId: integer (nullable = true)
 |-- when: double (nullable = true)
```

Task: Create a new custom schema for the raw data DataFrame

Please provide the code for the following task:

In [10]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Fields are declared nullable to match the expected schema (nullable = true).
rawDataSchema = StructType([
    StructField("distance", DoubleType(), True),
    StructField("eventId", IntegerType(), True),
    StructField("soldierId", IntegerType(), True),
    StructField("type", IntegerType(), True),
    StructField("weaponId", IntegerType(), True),
    StructField("when", DoubleType(), True)
])

Task: Read raw data JSON file 

In [11]:
dfRawData = spark.read \
.format("json") \
.option("mode", "FAILFAST") \
.schema(rawDataSchema) \
.load("data/raw_data.json") 

Task: Show the existing schema on the current DataFrame. Then print all data.

Please provide the code for the following task:

In [12]:
dfRawData.printSchema()
dfRawData.show()

root
 |-- distance: double (nullable = true)
 |-- eventId: integer (nullable = true)
 |-- soldierId: integer (nullable = true)
 |-- type: integer (nullable = true)
 |-- weaponId: integer (nullable = true)
 |-- when: double (nullable = true)

+------------------+-------+---------+----+--------+-----------------+
|          distance|eventId|soldierId|type|weaponId|             when|
+------------------+-------+---------+----+--------+-----------------+
| 846.9120172371518|      1|        3|   4|       5|1.563230808105E12|
| 37.45444429727007|      2|        5|   3|       5|1.563230808169E12|
| 515.0792479596358|      3|        1|   1|       4|1.563230808171E12|
| 992.0908485570353|      4|       10|   5|       6|1.563230808173E12|
| 827.5958171434006|      5|        5|   4|       9|1.563230808175E12|
| 812.9223072173637|      6|        6|   1|       7|1.563230808177E12|
|  914.714275003622|      7|        3|   3|       4|1.563230808179E12|
|20.228233112572603|      8|        8|   2|     

### Task 05: Rename multiple columns at once
* `eventId` into event_id
* `soldierId` into soldier_id
* `type` into event_type_id
* `weaponId` into weapon_id
* `when` into epochTimestamp

In [13]:
dfRawDataRenamed = dfRawData \
.withColumnRenamed("eventId", "event_id") \
.withColumnRenamed("soldierId", "soldier_id") \
.withColumnRenamed("type", "event_type_id") \
.withColumnRenamed("weaponId", "weapon_id") \
.withColumnRenamed("when", "epochTimestamp")
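The chained calls above can also be driven from a mapping, which keeps the rename list in one place. The following is a minimal sketch; the names `RENAMES` and `rename_columns` are illustrative and not part of the original notebook. It relies only on the `withColumnRenamed` method of the DataFrame that is passed in.

```python
from functools import reduce

# Mapping of source column names to target names, taken from the task list.
RENAMES = {
    "eventId": "event_id",
    "soldierId": "soldier_id",
    "type": "event_type_id",
    "weaponId": "weapon_id",
    "when": "epochTimestamp",
}


def rename_columns(df, renames):
    """Apply withColumnRenamed once per (old, new) pair and return the result."""
    return reduce(
        lambda acc, pair: acc.withColumnRenamed(pair[0], pair[1]),
        renames.items(),
        df,
    )

# Usage with the notebook's DataFrame:
# dfRawDataRenamed = rename_columns(dfRawData, RENAMES)
```

Columns that are not listed in the mapping, such as `distance`, pass through unchanged.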

Task: Show the existing schema on the current DataFrame. Then print all data.

Please provide the code for the following task:

In [14]:
dfRawDataRenamed.printSchema()
dfRawDataRenamed.show()

root
 |-- distance: double (nullable = true)
 |-- event_id: integer (nullable = true)
 |-- soldier_id: integer (nullable = true)
 |-- event_type_id: integer (nullable = true)
 |-- weapon_id: integer (nullable = true)
 |-- epochTimestamp: double (nullable = true)

+------------------+--------+----------+-------------+---------+-----------------+
|          distance|event_id|soldier_id|event_type_id|weapon_id|   epochTimestamp|
+------------------+--------+----------+-------------+---------+-----------------+
| 846.9120172371518|       1|         3|            4|        5|1.563230808105E12|
| 37.45444429727007|       2|         5|            3|        5|1.563230808169E12|
| 515.0792479596358|       3|         1|            1|        4|1.563230808171E12|
| 992.0908485570353|       4|        10|            5|        6|1.563230808173E12|
| 827.5958171434006|       5|         5|            4|        9|1.563230808175E12|
| 812.9223072173637|       6|         6|            1|        7|1.563230

### Task 06: Add a new `timestamp` column via UDF to dfRawData 


In [15]:
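import pyspark.sql.functions as F
# The built-in to_timestamp is used here rather than a Python UDF.
# epochTimestamp is in milliseconds, so divide by 1000 to get seconds.
dfRawData = dfRawDataRenamed.withColumn("timestamp",F.to_timestamp(dfRawDataRenamed["epochTimestamp"]/1000))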
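The task title asks for a UDF, while the cell above uses the built-in `F.to_timestamp`. For comparison, a UDF-based variant might look like the sketch below. The names `epoch_millis_to_datetime` and `make_timestamp_udf` are hypothetical, and the converter assumes the `epochTimestamp` values are epoch milliseconds in UTC. Built-in functions are generally preferable to Python UDFs for performance, so treat this as an illustration only.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis_to_datetime(epoch_millis):
    """Convert epoch milliseconds to a timezone-aware UTC datetime (None stays None)."""
    if epoch_millis is None:
        return None
    return EPOCH + timedelta(milliseconds=epoch_millis)


def make_timestamp_udf():
    """Wrap the converter as a Spark UDF; pyspark is imported only when this is called."""
    from pyspark.sql.functions import udf
    from pyspark.sql.types import TimestampType
    return udf(epoch_millis_to_datetime, TimestampType())

# Usage with the notebook's DataFrame:
# to_timestamp_udf = make_timestamp_udf()
# dfRawData = dfRawDataRenamed.withColumn(
#     "timestamp", to_timestamp_udf(dfRawDataRenamed["epochTimestamp"]))
```

Spark renders timestamps in the session time zone when showing them, so the displayed values can differ from UTC by the local offset.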

Task: Print the data with `truncate` disabled

Please provide the code for the following task:

In [16]:
dfRawData.show(truncate=False)

+------------------+--------+----------+-------------+---------+-----------------+-----------------------+
|distance          |event_id|soldier_id|event_type_id|weapon_id|epochTimestamp   |timestamp              |
+------------------+--------+----------+-------------+---------+-----------------+-----------------------+
|846.9120172371518 |1       |3         |4            |5        |1.563230808105E12|2019-07-16 01:46:48.105|
|37.45444429727007 |2       |5         |3            |5        |1.563230808169E12|2019-07-16 01:46:48.169|
|515.0792479596358 |3       |1         |1            |4        |1.563230808171E12|2019-07-16 01:46:48.171|
|992.0908485570353 |4       |10        |5            |6        |1.563230808173E12|2019-07-16 01:46:48.173|
|827.5958171434006 |5       |5         |4            |9        |1.563230808175E12|2019-07-16 01:46:48.175|
|812.9223072173637 |6       |6         |1            |7        |1.563230808177E12|2019-07-16 01:46:48.177|
|914.714275003622  |7       |3       

### Task 07: Register the created DataFrames as temporary views
Please provide the code for the following task:

In [17]:
dfWeaponTypes.createOrReplaceTempView("weaponTypes")
dfEventTypes.createOrReplaceTempView("eventTypes")
dfSoldiers.createOrReplaceTempView("soldiers")
dfRawData.createOrReplaceTempView("rawData")

### Task 08: Create new rows based on existing row data

Create the rows based on the event types:

```
+---+--------------+
| id|    event_type|
+---+--------------+
|  1|          kill|
|  2|         wound|
|  3|           hit|
|  4|          shot|
|  5|       misfire|
|  6|   close range|
|  7|avgerage range|
|  8|    long range|
+---+--------------+
```

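Task: Create a findEventType function. The function should filter the rows by event type, replace the event type value, and return the resulting DataFrame.

It takes 3 parameters: df - the raw data DataFrame, eventTypes - an array of event type ids to match, selected_eventType - the event type id to assign.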

In [18]:
def findEventType(df, eventTypes, selected_eventType):
    return df.filter((df["event_type_id"]).isin(eventTypes)).withColumn("event_type_id",F.lit(selected_eventType))

Task: Create new rows based on existing row data using the created function

In [19]:

dfEventType3 = findEventType(dfRawData, [1,2], 3)
dfEventType4 = findEventType(dfRawData, [1,2,3,6,7,8], 4)


dfRawAddedData = dfRawData \
    .union(dfEventType3) \
    .union(dfEventType4) 
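To see what the filter, relabel and union steps do row by row, here is a plain-Python analogue that works on a list of dictionaries instead of a DataFrame. It is only an illustration: `find_event_type` and the sample rows are hypothetical and not taken from raw_data.json.

```python
def find_event_type(rows, event_types, selected_event_type):
    """Keep rows whose event_type_id is in event_types and relabel them."""
    return [
        {**row, "event_type_id": selected_event_type}
        for row in rows
        if row["event_type_id"] in event_types
    ]


# Hypothetical sample rows, not taken from raw_data.json.
rows = [
    {"event_id": 1, "event_type_id": 1},
    {"event_id": 2, "event_type_id": 5},
    {"event_id": 3, "event_type_id": 7},
]

hits = find_event_type(rows, [1, 2], 3)
shots = find_event_type(rows, [1, 2, 3, 6, 7, 8], 4)

# Same role as DataFrame.union: rows are appended and duplicates are kept.
all_rows = rows + hits + shots
print(len(all_rows))
```

The original rows are left untouched; each derived row is a copy with only `event_type_id` replaced.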

### Task 09: Join all file types
* Join the DataFrames and drop the 'id' column after each join
* Join dfRawData with dfSoldiers, dfWeaponTypes and dfEventTypes
* Use dfRawData as the left DataFrame and reference the right DataFrame in each JOIN expression


In [20]:
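# Reference the columns of dfRawAddedData, the DataFrame actually being joined.
# These are inner joins, so rows without a match (e.g. weapon_id above 5) are dropped.
dfRawDataJoined = dfRawAddedData \
.join(dfSoldiers, dfRawAddedData["soldier_id"] == dfSoldiers["id"]) \
.drop("id") \
.join(dfWeaponTypes, dfRawAddedData["weapon_id"] == dfWeaponTypes["id"]) \
.drop("id") \
.join(dfEventTypes, dfRawAddedData["event_type_id"] == dfEventTypes["id"]) \
.drop("id") 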

Print total count

Then print selected columns: 
* distance
* soldier_name
* event_id
* event_type_id
* event_type
* weapon_id
* weapon_name

In [21]:
print(dfRawDataJoined.count())
dfRawDataJoined.select(F.col("distance"), \
                       F.col("soldier_name"), \
                       F.col("event_id"), \
                       F.col("event_type_id"), \
                       F.col("event_type"), \
                       F.col("weapon_id"), \
                       F.col("weapon_name")
                      ).show(n=1000, truncate=False)

149391
+------------------+-------------------+--------+-------------+--------------+---------+--------------+
|distance          |soldier_name       |event_id|event_type_id|event_type    |weapon_id|weapon_name   |
+------------------+-------------------+--------+-------------+--------------+---------+--------------+
|846.9120172371518 |Quent              |1       |4            |shot          |5        |Smith & Wesson|
|37.45444429727007 |Blind Doss         |2       |3            |hit           |5        |Smith & Wesson|
|515.0792479596358 |Haegon Blackfyre   |3       |1            |kill          |4        |revolver      |
|914.714275003622  |Quent              |7       |3            |hit           |4        |revolver      |
|20.228233112572603|Mariya Darry       |8       |2            |wound         |3        |akm           |
|127.47432429426387|Lharys             |9       |4            |shot          |5        |Smith & Wesson|
|771.4875672556907 |Haegon Blackfyre   |12      |8       

### Task 10: Write JSON files 

* Create a `single JSON file` from multiple partitions in Amazon S3
* Overwrite files 
* S3 path: S3 


In [22]:
pathtarget = 'enriched_data.json'

dfRawDataJoined \
.coalesce(1) \
.write \
.mode('overwrite') \
.format('json') \
.json(pathtarget)

### Task 11: Group Soldiers and sort by rating


* Add a UDF function that calculates the rating of shooting skills.
* The rating is calculated from the event type:
    * 1 for event_type = 6 (close range)
    * 2 for event_type = 7 (avgerage range)
    * 3 for event_type = 8 (long range)
* Add a new column `rating`
* Group by soldier id and count the rating values



In [23]:


def calculate_rating(event):
    if (event== 6):
        return 1
    elif event == 7:
        return 2
    elif event == 8:
        return 3
    else:
        return 0

calculate_rating_udf = udf(lambda event: calculate_rating(event), IntegerType())

dfRawAddedData = dfRawAddedData.withColumn("rating", calculate_rating_udf(F.col("event_type_id")))


dfRawAddedData \
.groupBy(F.col("soldier_id")).agg(F.count(F.col("rating"))) \
.orderBy(F.col("count(rating)").desc()) \
.show()


+----------+-------------+
|soldier_id|count(rating)|
+----------+-------------+
|         7|        30293|
|         3|        30273|
|         9|        30050|
|         2|        30041|
|         6|        30035|
|         1|        29995|
|         8|        29921|
|         5|        29880|
|        10|        29839|
|         4|        29713|
+----------+-------------+
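As an alternative to the if/elif chain, the rating rule can be expressed as a dictionary lookup, which also returns 0 for a null event type. This is a sketch under the same rating values as the task list; `RATING_BY_EVENT_TYPE` and `make_rating_udf` are illustrative names that do not appear in the original notebook.

```python
# Rating per event type id, as listed in the task: 6 -> 1, 7 -> 2, 8 -> 3.
RATING_BY_EVENT_TYPE = {6: 1, 7: 2, 8: 3}


def calculate_rating(event):
    """Return the rating for an event type id, or 0 for any other value (including None)."""
    return RATING_BY_EVENT_TYPE.get(event, 0)


def make_rating_udf():
    """Wrap calculate_rating as a Spark UDF; pyspark is imported only when this is called."""
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType
    return udf(calculate_rating, IntegerType())

# Usage with the notebook's DataFrame:
# rating_udf = make_rating_udf()
# dfRated = dfRawAddedData.withColumn("rating", rating_udf(dfRawAddedData["event_type_id"]))
```

Note that `F.count` counts every non-null `rating`, including zeros, so the result equals the number of rows per soldier. If a total score per soldier is wanted instead, `F.sum` over `rating` would be the aggregate to try.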



### Task 12: Create validations to check for invalid data


* Add two validations:
    * Weapons validations 
    * Event validations
* Weapons validations:
    * There are only 5 weapons, but the raw data contains more than 5 weapon ids. 
    * Add a new column `weapon_validation` with values: `1` - invalid, `0` - valid
* Event validations:
    * Compare distance with event types.
    * Event types are:
        * 6 - close range
        * 7 - avgerage range
        * 8 - long range
    * Add a new column `event_validation` with values:
        * `1` - A shot was recorded as `close range` but was fired from farther away (distance more than 100).
        * `2` - A shot was recorded as `avgerage range` but was fired from close distance (distance less than 100).
        * `3` - A shot was recorded as `avgerage range` but was fired from long distance (distance more than 500).
        * `4` - A shot was recorded as `long range` but was fired from a shorter distance (distance less than 500).
        * `0` - No mismatch was detected.
* Print columns: distance, event_id, event_type_id, weapon_id, weapon_validation, event_validation. 
* Use the raw data DataFrame without joins.
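The event rules can be sanity-checked in plain Python before they are wrapped in a UDF. The sketch below assumes the thresholds 100 and 500 from the list above and interprets code `3` as an event of type 7 recorded at a distance above 500, which is how the solution cell below implements it. The constant names are illustrative, and the null check is an addition that the notebook's version does not have.

```python
CLOSE_MAX = 100  # assumed upper bound of a close-range distance
LONG_MIN = 500   # assumed lower bound of a long-range distance


def validate_event(event, distance):
    """Return a mismatch code (1-4) or 0 when the event type fits the distance."""
    if event is None or distance is None:
        return 0  # nothing to compare; treated as "no mismatch" in this sketch
    if event == 6 and distance > CLOSE_MAX:
        return 1
    if event == 7 and distance < CLOSE_MAX:
        return 2
    if event == 7 and distance > LONG_MIN:
        return 3
    if event == 8 and distance < LONG_MIN:
        return 4
    return 0


# A few hand-picked cases, one per rule plus a consistent one.
for event, distance in [(6, 150.0), (7, 50.0), (7, 800.0), (8, 300.0), (8, 900.0)]:
    print(event, distance, validate_event(event, distance))
```

Distances exactly equal to 100 or 500 fall through to `0`, because every comparison is strict.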

In [24]:
def validate_weapon(weapon):
    if (weapon > 5):
        return 1
    else:
        return 0

validate_weapon_udf = udf(lambda weapon: validate_weapon(weapon), IntegerType())

dfRawAddedData = dfRawAddedData.withColumn("weapon_validation", validate_weapon_udf(F.col("weapon_id")))

def validate_event(event,distance):
    if (event == 6 and distance>100):
        return 1
    elif event == 7 and distance<100:
        return 2
    elif event == 7 and distance>500:
        return 3
    elif event == 8 and distance<500:
        return 4
    else:
        return 0

validate_event_udf = udf(lambda event,distance: validate_event(event, distance), IntegerType())

dfRawAddedData = dfRawAddedData.withColumn("event_validation", validate_event_udf(F.col("event_type_id"), F.col("distance")))


dfRawAddedData.select(F.col("distance"), \
                       F.col("event_id"), \
                       F.col("event_type_id"), \
                       F.col("weapon_id"), \
                       F.col("weapon_validation"), \
                       F.col("event_validation"), \
                      ).show(40)

+------------------+--------+-------------+---------+-----------------+----------------+
|          distance|event_id|event_type_id|weapon_id|weapon_validation|event_validation|
+------------------+--------+-------------+---------+-----------------+----------------+
| 846.9120172371518|       1|            4|        5|                0|               0|
| 37.45444429727007|       2|            3|        5|                0|               0|
| 515.0792479596358|       3|            1|        4|                0|               0|
| 992.0908485570353|       4|            5|        6|                1|               0|
| 827.5958171434006|       5|            4|        9|                1|               0|
| 812.9223072173637|       6|            1|        7|                1|               0|
|  914.714275003622|       7|            3|        4|                0|               0|
|20.228233112572603|       8|            2|        3|                0|               0|
|127.47432429426387| 