
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A JSON-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your AWS configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma-separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma-separated list of pip packages, S3 paths, or private pip arguments.                                                                                |
| %extra_py_files             |  List        |  Comma-separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma-separated list of additional JARs to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (e.g. %glue_version 2.0).                              |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %spark_conf                 |  String      |  Specify custom Spark configurations for your session, e.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer.                      |

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)


data_bucket_url = "s3://spark.demo.data/"
lab_output_bucket_url = "s3://spark.labs.saves/"

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.35 
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::429937773353:role/AWS-Glue-S3-Bucket-Access
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 5e282089-a999-4f84-8b14-27409ef2cf4a
Applying the following default arguments:
--glue_kernel_version 0.35
--enable-glue-datacatalog true
Waiting for session 5e282089-a999-4f84-8b14-27409ef2cf4a to get into ready status...
Session 5e282089-a999-4f84-8b14-27409ef2cf4a has been created




# Exercise 1 Using the PySpark API

Add some code in the notebook as follows:

* Open the Macbeth.txt file from the lab bucket (called spark-lab-data in the example below; substitute the lab data bucket you are using, e.g. via the data_bucket_url variable defined in the first code block). This returns a DataFrame with a single string column named value, containing one row per line of the file. Assign the result to a variable named lines.

To do this, use the glueContext variable, since it can read from and write to S3 directly. The API call will look something like this:

```
lines = glueContext.read.text("s3://spark-lab-data/Macbeth.txt")
```

* Call lines.count() to count the number of rows in the DataFrame (i.e. the number of lines in the file). Print the result.
* Call lines.first() to obtain the first row (i.e. the first line in the file). The line's text is held in the row's value field, e.g. lines.first()["value"]. Print the result.
* Call lines.filter() to filter the lines, and print the result. filter() takes a Boolean condition: refer to the DataFrame's only column (named value) and call its contains() method to keep just the lines that contain a given string. So the code would look something like:

```
witchLines = lines.filter(lines["value"].contains("Witch"))
```
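Run your code block and you should see output like the following:
* Number of lines: 4102
* First line: ACT I
* Witch lines: a DataFrame containing only the matching lines (printing a DataFrame shows just its schema, so call show() or collect() on it to see the lines themselves)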

 
## Saving DataFrames
Modify the code so that it writes the witchLines DataFrame to S3.

The easiest way to do this is to use the DataFrame's own write property. Save the output to your S3 bucket with a path something like YOURBUCKET/lab1.

The code would be something like this but the bucket name would be different.

```
witchLines.write.format('csv').option('header','false').save('s3://glue-nickt/lab1')
```

Now go to the S3 service in the AWS console and locate your bucket. There should now be a new lab1 folder containing your output (Spark writes one or more part files rather than a single named file). If you want to, you can download them and review the contents.


In [1]:
# add code for exercise 1 here


# Exercise 2 RDD Operations Part 1

## Overview
In this lab you'll use various RDD transformation methods to manipulate data from several simple text files.

## Roadmap
There are 3 exercises in this lab, of which the last exercise is "if time permits". Here is a brief summary of the tasks you will perform in each exercise; more detailed instructions follow later:
1. Mapping and filtering data
2. Performing set-based operations
3. (If Time Permits) Additional transformation suggestions

## Part 1: Mapping and filtering data
In this exercise you will perform various mapping and filtering operations on RDD objects. You will do all your work in the code block below. The application makes use of data from two text files:
* klm.txt: contains a partial list of airports that KLM flies into.
* norwegian.txt: contains a partial list of airports that Norwegian Airlines flies into.

Follow the comments in the code block below to complete this exercise. After each step, save the notebook and use the play button to test. Make sure you have run the first code block in the notebook, so that glueContext and the SparkContext are configured.
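If the set-based methods are unfamiliar, the plain-Python sketch below (no Spark required) mimics what union(), distinct(), intersection() and subtract() return on an RDD. The airport lists are hypothetical samples, not the contents of klm.txt or norwegian.txt, and the de-duplicated results are sorted only to make them easy to compare, since Spark does not guarantee any particular order.

```python
# Plain-Python sketch of what union(), distinct(), intersection() and
# subtract() return on an RDD. Both airport lists are hypothetical samples.
klm = ["Oslo", "Bergen", "Paris", "Luton"]
norwegian = ["Oslo", "Bergen", "Alicante"]


def union(a, b):
    # Like RDD.union(): concatenates the two collections, keeping duplicates.
    return a + b


def distinct(a):
    # Like RDD.distinct(): removes duplicates (sorted here for readability).
    return sorted(set(a))


def intersection(a, b):
    # Like RDD.intersection(): items present in both, with no duplicates.
    return sorted(set(a) & set(b))


def subtract(a, b):
    # Like RDD.subtract(): every item of a that does not appear in b.
    excluded = set(b)
    return [x for x in a if x not in excluded]


print(union(klm, norwegian))
print(distinct(union(klm, norwegian)))
print(intersection(klm, norwegian))
print(subtract(klm, norwegian))
```

Note that union() on its own leaves "Oslo" and "Bergen" in the result twice; that is why the exercise asks for distinct() as a separate step.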

In [None]:
klmAirports = glueContext.read.text(data_bucket_url + "klm.txt").rdd
norAirports = glueContext.read.text(data_bucket_url + "norwegian.txt").rdd


# In each of the following statements, replace "None" with a suitable call to a PySpark API function...
# We've done the first one for you, to get you started...

# Get KLM airports in uppercase.
klmUpperCaseAirports = klmAirports.map(lambda airport: airport["value"].upper()).collect()
print("\nKLM airports in uppercase: %s" % klmUpperCaseAirports)

# Get KLM airports that start with "L" (hint: Python strings have a startswith() method; each item is a Row, so use airport["value"] as above).
klmLAirports = None
print("\nKLM airports that start with 'L': %s" % klmLAirports)

# Get the union of all airports.
allAirports = None
print("\nUnion of all airports: %s" % allAirports)

# Get all distinct airports.
distinctAirports = None
print("\nAll distinct airports: %s" % distinctAirports)

# Get airports in common.
commonAirports = None
print("\nAirports in common: %s" % commonAirports)

# Get airports served by KLM but not Norwegian.
klmOnlyAirports = None
print("\nAirports served by KLM but not Norwegian: %s" % klmOnlyAirports)



KLM airports in uppercase: ['LONDON HEATHROW', 'LONDON GATWICK', 'AMSTERDAM', 'CARDIFF', 'OSLO', 'DUBAI', 'SINGAPORE', 'BERGEN', 'BRISTOL', 'PARIS', 'LONDON CITY', 'LUTON', 'LIVERPOOL']

KLM airports that start with 'L': None

Union of all airports: None

All distinct airports: None

Airports in common: None

Airports served by KLM but not Norwegian: None


## Part 2: Performing set-based operations
In this exercise you will perform set-based operations on RDD objects. You will do all your work in the code block below. The application makes use of data from two text files:

* teams.txt: contains the names of Premier League teams, 2017-18 (happy days!)
* stadiums.txt: contains the stadiums of Premier League teams, 2017-18

Follow the comments in the code block below to complete this exercise. After each step, save the notebook and click the play button (remembering to ensure that the block at the top of the notebook has executed).

Here are some additional hints:
* To zip teams with stadiums, use the zip() method. You should obtain a collection of tuples such as (Arsenal, The Emirates), (Bournemouth, Vitality Stadium), etc.
* To get the Cartesian product of all teams, call the cartesian() method on the teams RDD. Also pass the teams RDD as a parameter, so that you perform a Cartesian product between all the teams. You should obtain a collection of tuples such as (Arsenal, Arsenal), (Arsenal, Bournemouth), (Arsenal, Burnley), etc.
* To get all the fixtures, it's similar to getting a Cartesian product, but you need to filter out the tuples where element [0] equals element [1]. E.g., tuples such as (Arsenal, Arsenal) should be excluded from the result.
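The following plain-Python sketch, using three teams and their stadiums as sample data, shows the shape of each result. Python's built-in zip() and itertools.product() stand in for RDD.zip() and RDD.cartesian(). One difference worth knowing is that RDD.zip() assumes both RDDs have the same number of partitions and the same number of elements in each partition.

```python
from itertools import product

# Sample data: three 2017-18 Premier League teams and their stadiums.
teams = ["Arsenal", "Bournemouth", "Burnley"]
stadiums = ["The Emirates", "Vitality Stadium", "Turf Moor"]

# Pair items positionally, as RDD.zip() does.
teams_and_stadiums = list(zip(teams, stadiums))

# Every team paired with every team, as teams.cartesian(teams) does.
cartesian = list(product(teams, teams))

# Fixtures: drop the pairings where a team would play itself.
fixtures = [pair for pair in cartesian if pair[0] != pair[1]]

print(teams_and_stadiums)
print(len(cartesian), len(fixtures))
```

With n teams the filter leaves n × (n - 1) fixtures, so the 20 teams of a Premier League season give 380, which is the number of matches in a season.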


In [None]:
teams = glueContext.read.text(data_bucket_url + "teams.txt").rdd
stadiums = glueContext.read.text(data_bucket_url + "stadiums.txt").rdd

# In each of the following statements, replace "None" with a suitable call to a PySpark API function...

# Zip teams with stadiums.
teamsAndStadiums = None
print("Teams and stadiums: %s" % teamsAndStadiums)

# Cartesian product of all teams.
cartesian = None
print("Cartesian product of all teams: %s" % cartesian)

# Fixtures.
fixtures = None
print("Fixtures: %s" % fixtures)



Teams and stadiums: None
Cartesian product of all teams: None
Fixtures: None


## Part 3 (If time permits):  Additional transformation suggestions 
In this exercise you'll perform various transformation operations on data for some chemical elements in the periodic table. The data is located in elements.txt. For each element, we've specified the following information (we've used tabs as field separators):
* Atomic number
* Atomic mass
* Group number in the periodic table
* Period number in the periodic table
* Symbol 
* Name of element

You can review the periodic table here if you are a bit rusty on your chemistry!
https://sciencenotes.org/wp-content/uploads/2013/06/PeriodicTable-NoBackground2.png

We've defined a Python class named Element in the code block below to represent an element in the periodic table. All the code you have to write goes in the block after that; complete it as per the comments.
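To see what grouping, keying and sorting produce, here is a plain-Python sketch over three hypothetical lines in the same tab-separated format. It uses dicts rather than the Element class so that it runs on its own; the comments name the RDD methods (groupBy(), keyBy() with collectAsMap(), sortBy()) that play the same role in Spark.

```python
from collections import defaultdict

# Hypothetical sample lines in the tab-separated format described above:
# atomic number, atomic mass, group, period, symbol, name.
sample_lines = [
    "1\t1.008\t1\t1\tH\tHydrogen",
    "2\t4.0026\t18\t1\tHe\tHelium",
    "3\t6.94\t1\t2\tLi\tLithium",
]


def parse(line):
    # Split one tab-separated line into typed fields.
    number, mass, group, period, symbol, name = line.split("\t")
    return {"number": int(number), "mass": float(mass), "group": int(group),
            "period": int(period), "symbol": symbol, "name": name}


elements = [parse(line) for line in sample_lines]

# Group by period (cf. RDD.groupBy(lambda e: e.period)).
by_period = defaultdict(list)
for e in elements:
    by_period[e["period"]].append(e["symbol"])

# Key by symbol (cf. RDD.keyBy(...), then collectAsMap() to get a dict).
by_symbol = {e["symbol"]: e for e in elements}

# Sort by name (cf. RDD.sortBy(lambda e: e.name)).
sorted_names = [e["name"] for e in sorted(elements, key=lambda e: e["name"])]

print(dict(by_period))
print(sorted_names)
```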


In [13]:
# Run this block once to load the class
class Element:
    def __init__(self, line):
        # Split one tab-separated line into its six fields.
        arr = line.split("\t")
        self.atomicNumber = int(arr[0])
        self.atomicMass = float(arr[1])
        self.group = int(arr[2])
        self.period = int(arr[3])
        self.symbol = arr[4]
        self.name = arr[5]

    def __repr__(self):
        return "%d | %f | %d | %d | %s | %s" % (self.atomicNumber, self.atomicMass, self.group, self.period, self.symbol, self.name)





In [None]:
lines = glueContext.read.text(data_bucket_url + "elements.txt").rdd

# In each of the following statements, replace "None" with a suitable call to a PySpark API function...

# Map each line into an Element object (each line is a Row, so pass line["value"] to the Element constructor).
elements = None

# Group elements by period.
groupedByPeriod = None
print("Elements grouped by period: %s" % groupedByPeriod)

# Create a dict, where the key is the element symbol, and the value is the element itself.
keyedBySymbol = None
print("Elements keyed by symbol: %s" % keyedBySymbol)

# Sort elements by name.
sortedByName = None
print("Elements sorted by name: %s" % sortedByName)

# Repartition elements into 5 partitions, and then add code to save them in a directory named "partitionedElements" (e.g. under lab_output_bucket_url).
repartitionedElements = None

# Exercise 3 RDD Operations Part 2

## Part 1: Mapping lines of text to a flat map of words
The code box below creates an RDD named lines, containing all the lines of text from MacbethSnippet.txt, which is in the same lab data bucket used in the other code blocks.

Where indicated by the comment, add code to create an RDD containing all the words in all the lines. Here are some hints:
* Use the RDD flatMap() function, whose purpose is to map an item into a sequence of sub-items. In this case, it will map a line into its constituent words.
* flatMap() takes a lambda expression as a parameter, indicating how to map each item into sub-items. Implement the lambda so that it splits a line at the space character (call the split(' ') function to do this). Remember that each item in lines is a Row, so the text is in line["value"].
* The flatMap() function returns all words, including words that are effectively empty. Call filter() to keep just the non-empty words. You'll need to write a lambda that tests whether a word isn't empty. How can you test whether a Python string isn't empty?

Finally, print the resultant RDD to the console, via the print() function. In order to see the actual contents of the RDD, call collect() on the RDD for simplicity.

Run the code block to verify that the application prints all the words found in the text file. 
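The sketch below, in plain Python with two sample lines (the extra spaces in the second line were added deliberately), shows why the filter step is needed: split(' ') produces an empty string wherever two spaces are adjacent or a line ends with a space, and an empty string is falsy, so a simple truthiness test removes it. The nested list comprehension plays the role of flatMap().

```python
# Sample lines; note the double space and the trailing space in the second.
lines = ["When shall we three meet again", "In thunder,  lightning, or in rain? "]

# flatMap: each line maps to several words, and the results are flattened
# into one sequence (cf. RDD.flatMap(lambda line: line.split(' '))).
raw_words = [word for line in lines for word in line.split(" ")]

# split(' ') yields '' for consecutive or trailing spaces; an empty string
# is falsy, so a truthiness test keeps only the real words.
words = [w for w in raw_words if w]

print(raw_words.count(""), len(words))
```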

In [None]:
# Add your code for part 1 here.
lines = glueContext.read.text(data_bucket_url + "MacbethSnippet.txt").rdd

# Create RDD of words using flatMap and filter
words = None

print("All words: %s" % words.collect())




## Part  2: Creating an RDD of (word, count) tuples
In the following code block, add code to create an RDD containing tuples of words and word counts (i.e. the first item in the tuple is a word, and the second item is the count of how many times that word appears). 

Follow these steps:
* Call map() to map each word into a (word, count) tuple where each count is 1 initially. 
* Call reduceByKey() to group (key, value) tuples by key and then reduce the values for each key into a single value. E.g. imagine you start off with the following (word, count) tuples:
    * ("do", 1)
    * ("you", 1)
    * ("know", 1)
    * ("the", 1)
    * ("muffin", 1)
    * ("man", 1)
    * ("the", 1)
    * ("muffin", 1)
    * ("man", 1)
    *  ("the", 1)
    * ("muffin", 1)
    * ("man", 1)

reduceByKey() first groups these tuples by key (i.e. word) as follows:
* ("do", 1)
* ("you", 1)
* ("know", 1)
* ("the", 1), ("the", 1), ("the", 1)
* ("muffin", 1), ("muffin", 1), ("muffin", 1)
* ("man", 1), ("man", 1), ("man", 1)

For each key, reduceByKey() then combines the values pairwise using the lambda expression that you provide. E.g. for the "muffin" key with values 1, 1, 1, it might call your lambda with (1, 1) to get 2, and then with (2, 1) to get 3 (values from different partitions are combined the same way). A key with a single value, such as "do", needs no calls at all.

So, implement a lambda that accumulates the total word count for each word. The lambda receives two parameters:
* The value accumulated so far (on the first call, this is simply the first value in the group; there is no default starting value such as 0)
* The value of the next item in the key group (e.g. the value 1)

Then:
* Call sortBy() to sort the (word, count) tuples by descending word count.
* Print the resultant RDD of (word, count) tuples.

Test your new code. 
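Here is a plain-Python sketch of the same pipeline, using the muffin-man words above: group the (word, 1) pairs by key, fold each group with a two-argument function, then sort by count in descending order. functools.reduce() starts from the first value rather than from 0, which mirrors how reduceByKey() uses your lambda. In PySpark, sortBy() accepts an ascending=False argument for a descending sort.

```python
from functools import reduce

words = ["do", "you", "know", "the", "muffin", "man",
         "the", "muffin", "man", "the", "muffin", "man"]

# map: each word becomes a (word, 1) pair.
pairs = [(w, 1) for w in words]

# reduceByKey, step 1: group the values by key.
groups = {}
for word, count in pairs:
    groups.setdefault(word, []).append(count)

# reduceByKey, step 2: fold each group with a two-argument function.
# With a single value, reduce() returns it without calling the function.
add = lambda a, b: a + b
counts = [(word, reduce(add, values)) for word, values in groups.items()]

# sortBy with a descending order: largest counts first.
counts.sort(key=lambda pair: pair[1], reverse=True)
print(counts)
```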

In [None]:
# Add your code for Part 2 here

## Part 3: Caching the RDD of (word, count) tuples

The exercises that follow will perform various actions on the RDD of (word, count) tuples. Under normal circumstances, Spark would re-evaluate all the intermediate steps to recreate the RDD ready for each action. This is inefficient, so add code to cache the RDD of (word, count) tuples before going any further.

In [None]:
# add code for part 3



## Part 4: Displaying the first 100 words

In the code box below, add code to display the first 100 words. Here are some hints:
* First call take(100) to get just the first 100 items. Note that take() is an action: it returns a Python list, not an RDD.
* Then pass the resultant list to the Python print() function.

---
**NOTE**
Aside: An alternative to take() is collect(). However, collect() might cause the Spark driver to run out of memory, because it fetches the entire RDD to a single machine. This is why take() is generally the safer option: it limits the number of items returned to the driver.

---

In [None]:
# Add code for part 4 here




## Part 5: Performing aggregation actions

In the code block below, add code to perform the following aggregation actions on the RDD of (word, count) tuples:
* Find the count of all items in the RDD, via the RDD count() method. This tells you the number of different words in MacbethSnippet.txt. 
* Find the most frequent word. Use the RDD max() method to do this. Note the following points about the max() method:
    * The max() method takes an optional key function (a lambda expression), which allows you to specify how to compare items.
    * In our scenario, the RDD contains (word, count) tuples, and you want max() to compare element [1] in the tuples (i.e. the counts).
    * Therefore, when you call max(), pass in a lambda that takes a (word, count) tuple and returns element [1] from the tuple.
* In the same way, find the least frequent word. Use the RDD min() method to do this.
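The key function works the same way as the key argument of Python's built-in max() and min(), so a plain-Python sketch over a few hypothetical (word, count) tuples shows the idea. It also shows the pitfall of omitting the key: tuples then compare by their first element, so you get the alphabetically last word rather than the most frequent one. When several words share the top (or bottom) count, don't rely on which of them Spark returns.

```python
# Hypothetical (word, count) tuples.
counts = [("the", 3), ("muffin", 3), ("do", 1), ("thunder", 2)]

# The key tells max()/min() to compare element [1] (the count)
# instead of the whole tuple.
most_frequent = max(counts, key=lambda pair: pair[1])
least_frequent = min(counts, key=lambda pair: pair[1])

# Without a key, tuples compare word-first, i.e. alphabetically.
alphabetical_last = max(counts)

print(most_frequent, least_frequent, alphabetical_last)
```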

In [None]:
# add code for part 5



## Part 6 (If time permits): Performing key-based actions
In the code block below, add code to look up a word and find its count.

You could, for example, set up a list of words that you would like to count.

Write code to iterate through the list. For each word, look it up in the RDD of (word, count) tuples, e.g. with the RDD lookup() method, which returns the list of values stored under a given key. This tells you the occurrence count for that word. Display the results.
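As a plain-Python sketch of the idea, the hypothetical lookup() helper below mirrors what RDD.lookup(key) returns: a list of all values stored under the key, which is empty when the word never occurs. The word list and counts are made up for illustration.

```python
# Hypothetical (word, count) tuples.
counts = [("the", 3), ("muffin", 3), ("man", 3), ("do", 1)]


def lookup(pairs, key):
    # Hypothetical helper mirroring RDD.lookup(key): every value stored
    # under key, or an empty list if the key is absent.
    return [value for k, value in pairs if k == key]


for word in ["muffin", "do", "witch"]:
    found = lookup(counts, word)
    print(word, found[0] if found else 0)
```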

In [None]:
# Add the code for part 6 here


## Part 7 (If time permits): Performing numeric actions

In the code block below, add code to perform the following numeric actions on the word counts:
* The sum of all counts (i.e. the total number of words)
* The average (mean) of all counts
* The standard deviation of all counts
* The variance of all counts

Note 1: You'll first need to create an RDD containing just the word counts but not the words themselves. How will you do this?
Note 2: Ensure that the process by which you map (word, count) tuples isn't repeated each time you invoke one of the action methods.
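A plain-Python sketch of the four statistics on a hypothetical list of counts follows. One detail to be aware of: the RDD stdev() and variance() methods compute the population versions (dividing by N), which correspond to statistics.pstdev() and statistics.pvariance(); the RDD also offers sampleStdev() and sampleVariance(), which divide by N - 1. For Note 2, besides caching the counts RDD, the RDD stats() method returns a StatCounter with the count, mean, standard deviation and variance computed in a single pass.

```python
import statistics

# Hypothetical word counts only; the words themselves have been dropped.
counts = [3, 3, 3, 1, 1, 1]

total = sum(counts)
mean = total / len(counts)
variance = statistics.pvariance(counts)   # population, like RDD.variance()
stdev = statistics.pstdev(counts)         # population, like RDD.stdev()
sample_stdev = statistics.stdev(counts)   # sample, like RDD.sampleStdev()

print(total, mean, variance, stdev, sample_stdev)
```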


In [1]:
# Add code for part 7 here


# Exercise 6 Getting Started with Spark SQL

## Overview
In this lab you'll use Spark SQL to read data from a CSV file. This is very similar to reading data from a JSON file, which we covered during the chapter.

## Roadmap
There are 5 exercises in this lab, of which the last exercise is "if time permits". Here is a brief summary of the tasks you will perform in each exercise; more detailed instructions follow later:
1. Reading data from a CSV file into a DataFrame
2. Specifying options when reading a file
3. Defining a schema for the data
4. Executing a SQL query
5. (If Time Permits) Generating computed columns
 
## Familiarization
From the lab bucket, download and review weather.csv in a text editor. This file contains real temperature and precipitation measurements for every day in Bergen in 2019 (spoiler alert, it was wet). All weather measurements are from Yr, delivered by the Norwegian Meteorological Institute and NRK. 

The CSV file has 365 rows, containing the data for every day from 1 January to 31 December. Each row has 4 values:
* The first value indicates the day number in a month (notice how this value wraps from 31 back to 1 as we move from January to February, for example).
* The next value contains the minimum temperature for a day, in degrees Celsius.
* The next value contains the maximum temperature for a day, in degrees Celsius.
* The final value contains the precipitation for a day, in mm.
 
## Part 1: Reading data from a CSV file into a DataFrame
In the code block below, add code to read data from weather.csv into a Spark SQL DataFrame object in memory. It's similar to how you read JSON (see the PowerPoint chapter for a reminder), except you call the csv() method instead:
```
df = glueContext.read.csv("s3://spark-lab-data/weather.csv")
```

Spark SQL parses each line of text in the file, using a comma as the default delimiter. It creates a DataFrame with 365 rows, with 4 columns per row. Display the DataFrame object as follows:
```
df.show()
```

Run the code as usual. Note that df.show() displays only the first 20 rows by default, and that the DataFrame's columns are named _c0, _c1, _c2, and _c3.

In [None]:
# Add code for part 1
df = glueContext.read.csv(data_bucket_url + "weather.csv")
df.show()



   
## Part 2: Specifying options when reading a file
In the previous exercise your Python application loaded weather.csv into a DataFrame. The CSV file didn't give any indication about the meaning of each field, so Spark SQL created columns named _c0, _c1, _c2, and _c3 by default.

An alternative approach is for the CSV file to include a "header line" at the top. We've done this in weatherWithHeader.csv; download this file from the S3 bucket and take a look in a text editor. Also note that the file happens to use semi-colons rather than commas as the field delimiter, just to make life interesting.

In order to read this file into a DataFrame, you have to specify a couple of options to PySpark:
* You need to tell PySpark to expect the header line.
* You need to tell PySpark that the field delimiter is a semi-colon rather than a comma.

This is how you specify options when reading a CSV file: 
```
df = glueContext.read \
                  .option(option-name-1, option-value-1) \
                  .option(option-name-2, option-value-2) \
                  .csv("s3://spark-lab-data/weatherWithHeader.csv")
```

Take a look at the online docs here to see what option names and values you need to specify:
https://spark.apache.org/docs/latest/sql-data-sources-csv.html

(Scroll down the page to see a list of all the option properties you can set. You're looking for how to specify that a header line is expected, and that the delimiter is a semi-colon.)
When you've modified your code, run the code block again and verify it works properly. You should now see columnar output with proper column names.

In [None]:
# Add code for part 2
df = glueContext.read \
               .option("delimiter", ";") \
               .option("header", "true") \
               .csv(data_bucket_url + "weatherWithHeader.csv")
               
df.show()



## Part 3: Defining a schema for the data
In your application as it stands, you don't tell Spark SQL about the schema of the CSV data in advance, so every column is read as a string. (If you instead set the inferSchema option to true, Spark SQL has to make an extra pass over the data so that it can infer the column types from the shape of the data.)

If you know the schema, you can tell Spark SQL in advance by calling the schema() method on the DataFrameReader. Modify your code to do this, then run the application again to make sure it all still works fine. The output of show() should look much the same, but the columns now have proper numeric types (check with df.printSchema()), without the cost of schema inference 👍.

In [None]:
# Add code for part 3
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

weatherSchema = StructType([
	StructField("DayOfMonth", IntegerType(), False),
	StructField("MinTemp", DoubleType(), False),
	StructField("MaxTemp", DoubleType(), False),
	StructField("Precipitation", DoubleType(), False)
])

df = glueContext.read \
               .schema(weatherSchema) \
               .option("delimiter", ";") \
               .option("header", "true") \
               .csv(data_bucket_url + "weatherWithHeader.csv")
               
df.show()



## Part 4: Executing a SQL query
So far in the lab, you've successfully managed to load CSV data into a DataFrame object. The whole reason for doing this is so that you can execute Spark SQL queries on the data. This is what you'll do now.

Follow these steps:
* On your DataFrame object, call the createOrReplaceTempView() method to register a temporary view, scoped to the current Spark session, under a name of your choice. This enables you to execute SQL statements on the data.
* On your GlueContext object (or the spark session object), call the sql() method to invoke some SQL. For example, select the DayOfMonth, MaxTemp, and MinTemp columns for each record. The sql() method returns a DataFrame, so call show() on it to display the results.

Run your code and verify it works.

In [None]:
# Add code for part 4



## Part 5 (If Time Permits): Generating computed columns
Spark SQL allows you to add computed columns to a DataFrame. For example, you could add a column named DiurnalRange that contains the difference between the maximum and minimum temperature for each day.

One way to add a column to a DataFrame is to call the withColumn() method. Pass in two parameters as follows:
* The name of the column you want to add, e.g., "DiurnalRange".
* The value you want to compute, i.e., the difference between the MaxTemp column and the MinTemp column. You can refer to a column via the col() function, e.g. col("MaxTemp") refers to the MaxTemp column. The col() function is defined in the pyspark.sql.functions module, so you'll need to import it.

Display the DataFrame to verify that it now has a DiurnalRange column. Consider how you can improve the output, e.g. via the round() function in the pyspark.sql.functions module.
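The arithmetic itself is easy to try in plain Python on a few hypothetical rows (these are not values from weather.csv). Subtracting floating-point temperatures can leave long tails of digits, which is why rounding to one decimal place improves the output. In PySpark, note that importing round from pyspark.sql.functions shadows Python's built-in round(); a common convention is import pyspark.sql.functions as F and then F.round(...).

```python
# Hypothetical daily readings in the same shape as the weather DataFrame.
rows = [
    {"DayOfMonth": 1, "MinTemp": 2.5, "MaxTemp": 6.1, "Precipitation": 12.3},
    {"DayOfMonth": 2, "MinTemp": -1.2, "MaxTemp": 3.4, "Precipitation": 0.0},
]

for row in rows:
    # Raw difference, which may carry floating-point noise.
    raw = row["MaxTemp"] - row["MinTemp"]
    # Computed column, rounded to one decimal place for display.
    row["DiurnalRange"] = round(raw, 1)

print([row["DiurnalRange"] for row in rows])
```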

In [None]:
# Add code for part 5 


