# Custom Transformations, Aggregating and Loading

The goal of this project is to populate aggregate tables using Twitter data.  In the process, you write a custom User Defined Function (UDF), aggregate the most trafficked domains, join new records to a lookup table, and load the results into a target database.

In this project you ETL JSON Twitter data to build aggregate tables that monitor trending websites and hashtags and filter malicious users using historical data.  Use these four exercises to achieve this goal:<br><br>

1. **Parse tweeted URLs** using a custom UDF
2. **Compute aggregate statistics** of most tweeted websites and hashtags by day
3. **Join new data** to an existing dataset of malicious users
4. **Load records** into a target database

Run the following cell.

In [3]:
%run "../Includes/Classroom-Setup"

## Exercise 1: Parse Tweeted URLs

Some tweets in the dataset contain links to other websites.  Import and explore the dataset using the provided schema.  Then, parse the domain name from these URLs using a custom UDF.

### Step 1: Import and Explore

The following is the schema created as part of the capstone project for ETL Part 1.  
Run the following cell and then use this schema to import one file of the Twitter data.

In [6]:
from pyspark.sql.types import StructField, StructType, ArrayType, StringType, IntegerType, LongType
from pyspark.sql.functions import col

fullTweetSchema = StructType([
  StructField("id", LongType(), True),
  StructField("user", StructType([
    StructField("id", LongType(), True),
    StructField("screen_name", StringType(), True),
    StructField("location", StringType(), True),
    StructField("friends_count", IntegerType(), True),
    StructField("followers_count", IntegerType(), True),
    StructField("description", StringType(), True)
  ]), True),
  StructField("entities", StructType([
    StructField("hashtags", ArrayType(
      StructType([
        StructField("text", StringType(), True)
      ]),
    ), True),
    StructField("urls", ArrayType(
      StructType([
        StructField("url", StringType(), True),
        StructField("expanded_url", StringType(), True),
        StructField("display_url", StringType(), True)
      ]),
    ), True)
  ]), True),
  StructField("lang", StringType(), True),
  StructField("text", StringType(), True),
  StructField("created_at", StringType(), True)
])

Import one file of the data located at `/mnt/training/twitter/firehose/2018/01/08/18/twitterstream-1-2018-01-08-18-48-00-bcf3d615-9c04-44ec-aac9-25f966490aa4` using the schema.  Be sure to do the following:<br><br>

* Save the result to `tweetDF`
* Apply the schema `fullTweetSchema`
* Filter out null values from the `id` column

In [8]:
# TODO
path = # FILL_IN
tweetDF = # FILL_IN

In [9]:
# TEST - Run this cell to test your solution
dbTest("ET2-P-08-01-01", 1491, tweetDF.count())
dbTest("ET2-P-08-01-02", True, "text" in tweetDF.columns and "id" in tweetDF.columns)

print("Tests passed!")

### Step 2: Write a UDF to Parse URLs

The Python regular expression library `re` allows you to define a pattern that describes the strings you want to match. In this case, extract just the domain name from the URL of a link in a tweet. Take a look at the following example:

```
import re

URL = "https://www.databricks.com/"
pattern = re.compile(r"https?://(www\.)?([^/#?]+).*$")
match = pattern.search(URL)
print("The string {} matched {}".format(URL, match.group(2)))
```

This code prints `The string https://www.databricks.com/ matched databricks.com`. **Wrap this code into a function named `getDomain` that takes a parameter `URL` and returns the matched string.**

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> <a href="https://docs.python.org/3/howto/regex.html" target="_blank">You can find more on the `re` library here.</a>
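To see how the capture groups behave, here is a minimal sketch that reuses the pattern from the example above. The function name `get_domain_sketch` and the `None` handling are assumptions added for illustration (tweets without a usable URL are plausible in this dataset); this is one possible shape for the function, not the expected solution.

```python
import re

# Same pattern as the example above: group 1 is the optional "www." prefix,
# group 2 is everything up to the first "/", "#", or "?".
DOMAIN_PATTERN = re.compile(r"https?://(www\.)?([^/#?]+).*$")

def get_domain_sketch(url):
    """Return the domain of `url`, or None if the input is missing or does not match."""
    if url is None:
        return None
    match = DOMAIN_PATTERN.search(url)
    return match.group(2) if match else None

print(get_domain_sketch("https://www.databricks.com/"))           # databricks.com
print(get_domain_sketch("http://spark.apache.org/docs/latest/"))  # spark.apache.org
print(get_domain_sketch("not a url"))                             # None
```

Returning `None` instead of raising keeps the function safe to apply to every row, since `pattern.search` returns `None` when nothing matches and calling `.group(2)` on that would fail.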

In [11]:
# TODO
def getDomain(URL):
  # FILL_IN

URL = "https://www.databricks.com/"
print("The string {} matched {}".format(URL, getDomain(URL)))

In [12]:
# TEST - Run this cell to test your solution
dbTest("ET2-P-08-02-01", "databricks.com",  getDomain("https://www.databricks.com/"))

print("Tests passed!")

### Step 3: Test and Register the UDF

Now that the function works with a single URL, confirm that it works on different URL formats.

Run the following cell to test your function further.

In [15]:
urls = [
  "https://www.databricks.com/",
  "https://databricks.com/",
  "https://databricks.com/training-overview/training-self-paced",
  "http://www.databricks.com/",
  "http://databricks.com/",
  "http://databricks.com/training-overview/training-self-paced",
  "http://www.apache.org/",
  "http://spark.apache.org/docs/latest/"
]

for url in urls:
  print(getDomain(url))

Register the UDF as `getDomainUDF`.

In [17]:
# TODO
getDomainUDF = # FILL_IN

In [18]:
# TEST - Run this cell to test your solution
dbTest("ET2-P-08-03-01", True, bool(getDomainUDF))

print("Tests passed!")

### Step 4: Apply the UDF

Create a DataFrame called `urlDF` that has three columns:<br><br>

1. `URL`: The URLs from `tweetDF` (located in `entities.urls.expanded_url`)
2. `parsedURL`: The UDF applied to the column `URL`
3. `created_at`

There can be zero, one, or many URLs in any tweet.  For this step, use the `explode` function, which takes an array column such as the list of URLs and returns one row for each value in the array.  <a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=explode#pyspark.sql.functions.explode" target="_blank">See the documentation here for details.</a>
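The row-per-element behavior can be illustrated in plain Python, without Spark. The sample rows and the helper `explode_rows` below are hypothetical and exist only to show what happens to tweets with zero, one, or many URLs.

```python
# Hypothetical sample rows: (created_at, list of expanded URLs in the tweet).
tweets = [
    ("Mon Jan 08 18:47:59 +0000 2018", ["https://a.example/x", "https://b.example/y"]),
    ("Mon Jan 08 18:48:00 +0000 2018", []),  # a tweet with no URLs
    ("Mon Jan 08 18:48:01 +0000 2018", ["https://c.example/z"]),
]

def explode_rows(rows):
    """Yield one (url, created_at) pair per array element."""
    for created_at, urls in rows:
        for url in urls or []:  # empty or missing arrays contribute no rows
            yield (url, created_at)

exploded = list(explode_rows(tweets))
for row in exploded:
    print(row)
```

Three tweets become three output rows here: two from the first tweet, none from the second, and one from the third. The other columns, such as `created_at`, are repeated on each output row.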

In [20]:
# TODO
urlDF = # FILL_IN

In [21]:
# TEST - Run this cell to test your solution
cols = urlDF.columns
sample = urlDF.first()

dbTest("ET2-P-08-04-01", True, "URL" in cols and "parsedURL" in cols and "created_at" in cols)
dbTest("ET2-P-08-04-02", "https://www.youtube.com/watch?v=b4iz9nZPzAA", sample["URL"])
dbTest("ET2-P-08-04-03", "Mon Jan 08 18:47:59 +0000 2018", sample["created_at"])
dbTest("ET2-P-08-04-04", "youtube.com", sample["parsedURL"])

print("Tests passed!")

## Exercise 2: Compute Aggregate Statistics

Calculate the top 10 trending URLs by hour.

### Step 1: Parse the Timestamp

Create a DataFrame `urlWithTimestampDF` that includes the following columns:<br><br>

* `URL`
* `parsedURL`
* `timestamp`
* `hour`

Import `unix_timestamp` and `hour` from the `functions` module and `TimestampType` from the `types` module. To parse the `created_at` field, use `unix_timestamp` with the format `EEE MMM dd HH:mm:ss ZZZZZ yyyy`.
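To check how the fields of a `created_at` string line up with a format, the following sketch parses one sample value with Python's standard `datetime` module. The `strptime` format shown is a rough Python analog chosen for illustration, not the Spark pattern itself, and it assumes an English locale for the day and month names.

```python
from datetime import datetime

# Sample value in the same shape as the created_at field.
created_at = "Mon Jan 08 18:47:59 +0000 2018"

# Python analog of: day name, month name, day, time, UTC offset, year.
python_format = "%a %b %d %H:%M:%S %z %Y"

parsed = datetime.strptime(created_at, python_format)
print(parsed.isoformat())  # 2018-01-08T18:47:59+00:00
print(parsed.hour)         # 18
```

The hour printed here is relative to the offset in the string (UTC). In Spark, the value returned by `hour` also depends on the session time zone, so results can differ on a cluster that is not configured for UTC.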

In [24]:
# TODO
timestampFormat = "EEE MMM dd HH:mm:ss ZZZZZ yyyy"
urlWithTimestampDF = # FILL_IN

In [25]:
# TEST - Run this cell to test your solution
cols = urlWithTimestampDF.columns
sample = urlWithTimestampDF.first()

dbTest("ET2-P-08-05-01", True, "URL" in cols and "parsedURL" in cols and "timestamp" in cols and "hour" in cols)
dbTest("ET2-P-08-05-02", 18, sample["hour"])

print("Tests passed!")

### Step 2: Calculate Trending URLs

Create a DataFrame `urlTrendsDF` that looks at the top 10 hourly counts of domain names and includes the following columns:<br><br>

* `hour`
* `parsedURL`
* `count`

The result should sort `hour` in ascending order and `count` in descending order.
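The group, count, sort, and limit logic can be sketched in plain Python before writing it with DataFrame operations. The sample pairs below are hypothetical stand-ins for rows of `urlWithTimestampDF`.

```python
from collections import Counter

# Hypothetical (hour, parsedURL) pairs.
rows = [
    (18, "twitter.com"), (18, "youtube.com"), (18, "twitter.com"),
    (19, "twitter.com"), (18, "bit.ly"), (19, "youtube.com"), (19, "youtube.com"),
]

# Group by (hour, domain) and count occurrences.
counts = Counter(rows)

# Sort by hour ascending, then count descending, and keep at most 10 rows.
top = sorted(
    ((hour, domain, n) for (hour, domain), n in counts.items()),
    key=lambda r: (r[0], -r[2]),
)[:10]

for row in top:
    print(row)
```

The sort key is what matters: the hour comes first so that rows stay grouped by hour, and the negated count puts the busiest domains at the top within each hour.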

In [27]:
# TODO
urlTrendsDF = # FILL_IN

In [28]:
# TEST - Run this cell to test your solution
cols = urlTrendsDF.columns
sample = urlTrendsDF.first()

dbTest("ET2-P-08-06-01", True, "hour" in cols and "parsedURL" in cols and "count" in cols)
dbTest("ET2-P-08-06-02", 18, sample["hour"])
dbTest("ET2-P-08-06-03", "twitter.com", sample["parsedURL"])
dbTest("ET2-P-08-06-04", 159, sample["count"])

print("Tests passed!")

## Exercise 3: Join New Data

Flag tweets from known bad actors by joining the tweets to a lookup table.

### Step 1: Import Table of Bad Actors

Create the DataFrame `badActorsDF`, a list of bad actors that sits in `/mnt/training/twitter/supplemental/badactors.parquet`.

In [31]:
# TODO
badActorsDF = # FILL_IN

In [32]:
# TEST - Run this cell to test your solution
cols = badActorsDF.columns
sample = badActorsDF.first()

dbTest("ET2-P-08-07-01", True, "userID" in cols and "screenName" in cols)
dbTest("ET2-P-08-07-02", 4875602384, sample["userID"])
dbTest("ET2-P-08-07-03", "cris_silvag1", sample["screenName"])

print("Tests passed!")

### Step 2: Add a Column for Bad Actors

Add a new column to `tweetDF` called `maliciousAcct` that is `true` if the tweet's user is in `badActorsDF` and `false` otherwise.  Save the results to `tweetWithMaliciousDF`.
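Conceptually this is a left join followed by a membership flag: every tweet is kept, and the flag records whether a match was found in the lookup table. A plain-Python sketch with hypothetical IDs and field names:

```python
# Hypothetical data: each tweet carries its author's user ID.
tweets = [
    {"id": 1, "user_id": 100},
    {"id": 2, "user_id": 200},
    {"id": 3, "user_id": 300},
]

# Hypothetical lookup of known bad actor user IDs.
bad_actor_ids = {200}

# Keep every tweet and add a boolean flag, rather than dropping non-matching rows.
flagged = [dict(t, maliciousAcct=t["user_id"] in bad_actor_ids) for t in tweets]

for row in flagged:
    print(row)
```

An inner join would silently drop tweets from users who are not in the lookup table. The output here has the same number of rows as the input, with `False` for unmatched users instead of a missing value.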

In [34]:
# TODO
tweetWithMaliciousDF = # FILL_IN

In [35]:
# TEST - Run this cell to test your solution
cols = tweetWithMaliciousDF.columns
sample = tweetWithMaliciousDF.first()

dbTest("ET2-P-08-08-01", True, "maliciousAcct" in cols and "id" in cols)
dbTest("ET2-P-08-08-02", 950438954272096257, sample["id"])
dbTest("ET2-P-08-08-03", False, sample["maliciousAcct"])

print("Tests passed!")

## Exercise 4: Load Records

Repartition each of your two DataFrames into 4 partitions and save the results to the following endpoints:

| DataFrame              | Endpoint                            |
|:-----------------------|:------------------------------------|
| `urlTrendsDF`          | `/tmp/urlTrends.parquet`            |
| `tweetWithMaliciousDF` | `/tmp/tweetWithMaliciousDF.parquet` |

In [37]:
# TODO - FILL_IN

In [38]:
# TEST - Run this cell to test your solution
urlTrendsDFTemp = spark.read.parquet("/tmp/urlTrends.parquet")
tweetWithMaliciousDFTemp = spark.read.parquet("/tmp/tweetWithMaliciousDF.parquet")

dbTest("ET2-P-08-09-01", 4, urlTrendsDFTemp.rdd.getNumPartitions())
dbTest("ET2-P-08-09-02", 4, tweetWithMaliciousDFTemp.rdd.getNumPartitions())

print("Tests passed!")