# DS-610 Week 4 Homework: Structured APIs in Apache Spark
This week we will work with the VERIS Community Database (VCDB), a dataset in which each row represents a reported security incident. Each incident is time-stamped and records the types of security attack involved and the potential financial impact on the reporting firm.

This is an interesting dataset because it contains over 2,500 columns and its column names contain special characters such as '.' and whitespace. We will get more hands-on practice with the DataFrame APIs by cleaning up the DataFrame for further analysis.

### Loading VCDB Data
We will first load the VCDB data. Because the data is over 100 MB, it is recommended that you run this notebook in the Saint Peter's Databricks environment. If you have a powerful machine, you may instead download the data and run the notebook locally.

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.dataframe import DataFrame
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

# This is the default path to the data. If you are running locally, change the following line.
df = spark.read.format('csv').option('header', 'true').load('dbfs:/FileStore/shared_uploads/dlee5@saintpeters.edu/vcdb.csv')
#df = spark.read.format('csv').option('header', 'true').load('vcdb.csv')

## Part 1: Warmup
Let us check the column names, the number of columns, and the number of rows. The column names are conveniently stored in `df.columns` as a Python list. The number of rows can be obtained via the `count` method:
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.count.html

In [0]:
# Your code for Part 1 goes here.

print(len(df.columns))
print(df.columns)
print(df.count())  # count is a method; it must be called to return the row count

2550
['action.environmental.notes', 'action.environmental.variety.Deterioration', 'action.environmental.variety.Earthquake', 'action.environmental.variety.EMI', 'action.environmental.variety.ESD', 'action.environmental.variety.Fire', 'action.environmental.variety.Flood', 'action.environmental.variety.Hazmat', 'action.environmental.variety.Humidity', 'action.environmental.variety.Hurricane', 'action.environmental.variety.Ice', 'action.environmental.variety.Landslide', 'action.environmental.variety.Leak', 'action.environmental.variety.Lightning', 'action.environmental.variety.Meteorite', 'action.environmental.variety.Other', 'action.environmental.variety.Particulates', 'action.environmental.variety.Pathogen', 'action.environmental.variety.Power failure', 'action.environmental.variety.Temperature', 'action.environmental.variety.Tornado', 'action.environmental.variety.Tsunami', 'action.environmental.variety.Unknown', 'action.environmental.variety.Vermin', 'action.environmental.variety.Volc

## Part 2: Renaming the Columns
You may have noticed that the column names contain special characters such as '.' and white space ' '. Your task is to rename the columns using the following rule:
```
'.' -> '-'  (minus sign)
' ' -> '_'  (underscore sign)
```

For this you may find the `toDF` function useful:
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.toDF.html

Set the renamed DataFrame result to `df`, overwriting the previous variable.
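
The renaming rule can be tried out on a few names before touching the DataFrame. The following is a minimal plain-Python sketch that needs no Spark session; `rename_column`, `sample_names`, and `renamed` are hypothetical names introduced here for illustration, and the sample names are copied from the column list printed in Part 1.

```python
def rename_column(name):
    """Apply the Part 2 rule: '.' becomes '-', ' ' becomes '_'."""
    return name.replace('.', '-').replace(' ', '_')

# A few names copied from the Part 1 output.
sample_names = [
    'action.environmental.notes',
    'action.environmental.variety.Power failure',
]

renamed = [rename_column(name) for name in sample_names]
print(renamed)
```

`toDF` accepts the new names as separate arguments, so a list such as `renamed` would be unpacked with `*` when it is passed to `toDF`.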

In [0]:
# Your code for Part 2 goes here.

# Apply the renaming rule: '.' -> '-' and ' ' -> '_'.
new_col = [name.replace('.', '-').replace(' ', '_') for name in df.columns]
print(new_col)

# toDF renames every column in a single call, which avoids chaining
# one withColumnRenamed call per column across 2,550 columns.
df = df.toDF(*new_col)

['action-environmental-notes', 'action-environmental-variety-Deterioration', 'action-environmental-variety-Earthquake', 'action-environmental-variety-EMI', 'action-environmental-variety-ESD', 'action-environmental-variety-Fire', 'action-environmental-variety-Flood', 'action-environmental-variety-Hazmat', 'action-environmental-variety-Humidity', 'action-environmental-variety-Hurricane', 'action-environmental-variety-Ice', 'action-environmental-variety-Landslide', 'action-environmental-variety-Leak', 'action-environmental-variety-Lightning', 'action-environmental-variety-Meteorite', 'action-environmental-variety-Other', 'action-environmental-variety-Particulates', 'action-environmental-variety-Pathogen', 'action-environmental-variety-Power_failure', 'action-environmental-variety-Temperature', 'action-environmental-variety-Tornado', 'action-environmental-variety-Tsunami', 'action-environmental-variety-Unknown', 'action-environmental-variety-Vermin', 'action-environmental-variety-Volcano',

### Interlude
The dataset contains security incidents from all over the world. Let us restrict our focus to incidents where the primary location of the business group is in the U.S. We will also look only at attacks that are related to *hacking*. This is implemented for you below.

In [0]:
# DO NOT MODIFY THIS CODE.

# Restrict ourselves to attacks whose primary location of the business group is in the US.
df = df.filter(df["`victim-country-US`"].rlike("(?i)^*TRUE$"))
print("Attacks in US: %d rows." % df.count())
df = df.select(df.colRegex("`.+hacking[-]{1}variety.+`"), df.colRegex("`timeline[-]{1}incident[-]{1}year`"))
print("After filtering to U.S. location, there are %d rows." % df.count())

Attacks in US: 5860 rows.
After filtering to U.S. location, there are 5860 rows.
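
The regular expressions in the cell above can be explored outside Spark. The sketch below uses Python's `re` module as a stand-in for Spark's regex engine, so it is only an approximation: it assumes a column is selected when the pattern matches its entire name, and it simplifies the `rlike` pattern to `(?i)^TRUE$`. The names `sample_columns`, `selected`, and `flags` are hypothetical, and the sample column names are illustrative.

```python
import re

# Illustrative column names in the renamed (hyphen/underscore) style.
sample_columns = [
    'action-hacking-variety-Abuse_of_functionality',
    'action-environmental-variety-Fire',
    'timeline-incident-year',
    'victim-country-US',
]

# Same column patterns as in the cell above, without the surrounding backticks.
hacking_pattern = re.compile(r'.+hacking[-]{1}variety.+')
year_pattern = re.compile(r'timeline[-]{1}incident[-]{1}year')

# Assumption: a column is kept when a pattern matches its entire name.
selected = [
    name for name in sample_columns
    if hacking_pattern.fullmatch(name) or year_pattern.fullmatch(name)
]
print(selected)

# Case-insensitive test for the string TRUE (simplified from the filter above).
is_true = re.compile(r'(?i)^TRUE$')
flags = [bool(is_true.search(value)) for value in ['TRUE', 'true', 'FALSE', 'NA']]
print(flags)
```

Only the hacking-variety column and the incident-year column survive the selection, and the `(?i)` flag makes both `TRUE` and `true` count as matches.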


In [0]:
# DO NOT REMOVE: This will show the first row.
df.take(1)

[Row(action.environmental.notes='NA', action.environmental.variety.Deterioration='FALSE', action.environmental.variety.Earthquake='FALSE', action.environmental.variety.EMI='FALSE', action.environmental.variety.ESD='FALSE', action.environmental.variety.Fire='FALSE', action.environmental.variety.Flood='FALSE', action.environmental.variety.Hazmat='FALSE', action.environmental.variety.Humidity='FALSE', action.environmental.variety.Hurricane='FALSE', action.environmental.variety.Ice='FALSE', action.environmental.variety.Landslide='FALSE', action.environmental.variety.Leak='FALSE', action.environmental.variety.Lightning='FALSE', action.environmental.variety.Meteorite='FALSE', action.environmental.variety.Other='FALSE', action.environmental.variety.Particulates='FALSE', action.environmental.variety.Pathogen='FALSE', action.environmental.variety.Power failure='FALSE', action.environmental.variety.Temperature='FALSE', action.environmental.variety.Tornado='FALSE', action.environmental.variety.Ts

### Interlude 2
As you have seen above, the format of the data is unusual: each column is essentially a boolean flag denoting whether an incident is of that particular type. For example, if `action-hacking-variety-Abuse_of_functionality` has the value `TRUE` (an all-caps string), then the incident corresponding to the row is of that type. Of course, each incident can belong to more than one hacking type (it could also be a backdoor attempt, etc.).

What may be useful here is to create a single column that captures all of the columns that have a `TRUE` value, so that it can later be expanded into multiple rows. For example, if an incident is of type `"backdoor", "bruteforce", "buffer_overflow"`, then a Python list `["backdoor", "bruteforce", "buffer_overflow"]` is created.

There are many ways of achieving this effect and one way is to utilize the UDF functionality. This is shown below. 

Note that `df_result` will contain two columns: `year`, which denotes the time of the security incident, and `hacking_types`, which contains a Python list of the hacking types that correspond to the incident.
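
The per-row logic of the UDF can be illustrated without Spark. The following is a plain-Python sketch in which a dictionary stands in for a Spark `Row`; `collect_true_columns` and the sample `row` are hypothetical, and apart from `action-hacking-variety-Abuse_of_functionality` the column names are made up for illustration.

```python
def collect_true_columns(row, column_names):
    """Return the 'action' columns whose value is the string 'TRUE'."""
    return [
        name for name in column_names
        if 'action' in name and row[name] == 'TRUE'
    ]

# Hypothetical row: one incident with two hacking flags set.
row = {
    'action-hacking-variety-Abuse_of_functionality': 'TRUE',
    'action-hacking-variety-Brute_force': 'TRUE',
    'action-hacking-variety-SQLi': 'FALSE',
    'timeline-incident-year': '2010',
}

hacking_types = collect_true_columns(row, list(row))
print(hacking_types)
```

The year column is skipped because its name does not contain `action`, and the `FALSE` flag is skipped because the comparison is against the exact string `'TRUE'`.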

In [0]:
# DO NOT MODIFY.
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import pyspark.sql.functions as F
broadcast_column_names = sc.broadcast(df.columns)
def process_row(row):    
    hacking_types = []
    for i, column_name in enumerate(broadcast_column_names.value):
        if "action" in column_name and row[column_name] == 'TRUE':
            hacking_types += [ column_name ]
    return hacking_types

udf_func = udf(lambda row : process_row(row), ArrayType(StringType(), True))

# This is the DataFrame that should be used in Part 3.
df_result = df.select(F.col("timeline-incident-year").alias("year"), udf_func(F.struct(*list(df.columns))).alias("hacking_types"))
df_result = df_result.filter(F.size(df_result.hacking_types) > 0)




In [0]:
# DO NOT REMOVE: This will show the last 50 rows.
df_result.orderBy("year").tail(50)

[Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='NA'),
 Row(year='TRUE'),
 Row(year='TRUE'),
 Row(year='TRUE')]

## Part 3
Your final step is to perform a `flatMap` operation on the `hacking_types` column of `df_result`.
For example, if a row in `df_result` has the following values:
`{year: 2010, hacking_types: [backdoor, bruteforce, buffer_overflow]}`
then the following three records will be created:
```
{year: 2010, type: backdoor }
{year: 2010, type: bruteforce }
{year: 2010, type: buffer_overflow }
```
For this you can utilize:
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.explode.html

For a better example:
https://sparkbyexamples.com/pyspark/pyspark-flatmap-transformation/
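
The flattening described above can be sketched in plain Python before writing the Spark version. In this sketch, dictionaries stand in for rows; the first row is the example from the text, the second row is hypothetical, and `rows` and `flattened` are names introduced here for illustration.

```python
rows = [
    {'year': 2010, 'hacking_types': ['backdoor', 'bruteforce', 'buffer_overflow']},
    {'year': 2011, 'hacking_types': ['backdoor']},  # hypothetical extra row
]

# One output record per (row, list element) pair: this is what flatMap does.
flattened = [
    {'year': row['year'], 'type': hacking_type}
    for row in rows
    for hacking_type in row['hacking_types']
]

for record in flattened:
    print(record)
```

Each input row contributes as many output records as its list has elements, so the two rows above yield four records in total.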

In [0]:
# Your code for Part 3 goes here.
from pyspark.sql.functions import explode

# explode emits one output row per element of hacking_types,
# which is the DataFrame counterpart of an RDD flatMap.
df_result = df_result.select('year', explode('hacking_types').alias('type'))




### Conclusion
If you have finished Part 3 correctly, you will see an output below. A logical next step would be a group-level analysis of the different types of hacking, but this is beyond the scope of this assignment.

In [0]:
# DO NOT REMOVE.
df_result.tail(10)


