
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A json-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your aws configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session                                                                                                  |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma separated list of pip packages, s3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma separated list of additional Jars to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X                                                                            |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (eg: %glue_version 2.0)                                |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |   Changes the session type to Glue ETL.                                                                                                                   |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %spark_conf                 |  String      |  Specify custom spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer                       |

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)


## What is RDD (Resilient Distributed Dataset)?

An RDD (Resilient Distributed Dataset) is the fundamental building block of PySpark: a fault-tolerant, immutable, distributed collection of objects. 
Immutable means that once you create an RDD you cannot change it. Each RDD is divided into logical partitions, which can be computed on different nodes of the cluster. 

In other words, an RDD is a collection of objects similar to a Python list. The difference is that an RDD is computed by several processes scattered across multiple physical servers (the nodes of a cluster), whereas a Python collection lives and is processed in a single process.

## PySpark RDD Benefits

**In-Memory Processing** - PySpark loads data from disk, processes it in memory, and keeps it in memory. This is the main difference between PySpark and MapReduce, which is I/O intensive. Between transformations, you can also cache or persist an RDD in memory to reuse earlier computations.

**Immutability** - PySpark RDDs are immutable: once an RDD is created, you cannot modify it. When you apply a transformation to an RDD, PySpark creates a new RDD and records the RDD lineage.

**Fault Tolerance** - PySpark operates on fault-tolerant data stores such as HDFS and S3, so if an RDD operation fails, the lost partitions can be reloaded from the source and recomputed using the RDD lineage. In addition, when a PySpark application runs on a cluster, failed tasks are retried automatically a configurable number of times so that the application can finish seamlessly.

**Lazy Evaluation** - PySpark does not evaluate RDD transformations as the driver encounters them. Instead, it records them in a DAG (directed acyclic graph) and evaluates them only when it reaches the first RDD action.

**Partitioning** - When you create an RDD from data, PySpark partitions its elements automatically. By default, the number of partitions equals the number of cores available.
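
As a rough illustration of partitioning, the following pure-Python sketch splits a list into contiguous chunks, one per partition. The helper `split_into_partitions` is hypothetical and runs locally without Spark; it only approximates how a collection might be sliced and should not be read as Spark's actual implementation.

```python
def split_into_partitions(data, num_partitions):
    """Split data into num_partitions contiguous chunks of near-equal size."""
    n = len(data)
    return [
        data[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]


data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

# Pretend the cluster has 4 cores, so we ask for 4 partitions
partitions = split_into_partitions(data, 4)
print(partitions)
```

In a Spark session, `rdd.getNumPartitions()` reports the real partition count, and `rdd.glom().collect()` shows which elements ended up in each partition.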

## Creating RDD

RDDs are created primarily in two ways:

* parallelizing an existing collection, and
* referencing a dataset in an external storage system (HDFS, S3, and many more). 

#### Create RDD from parallelize 

In [None]:
# Create an RDD by parallelizing a local Python list

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

rdd = spark.sparkContext.parallelize(data)


In [None]:
type(rdd)

In [None]:
rdd.count()

#### Creating RDD referencing a dataset in S3

In [None]:
# Retrieve the list of existing buckets

import boto3

s3 = boto3.client('s3')
response = s3.list_buckets()

# Output the bucket names
print('Existing buckets:')
for bucket in response['Buckets']:
    print(f'  {bucket["Name"]}')

In [None]:
## Replace the bucket placeholder below with the name of your own S3 bucket, for example:

## rdd = spark.sparkContext.textFile("s3://${BUCKET_NAME}/input/lab2/sample.csv")

rdd = spark.sparkContext.textFile("s3://===$bucket===/input/lab2/sample.csv")

#### RDD Actions 

##### count() --> Returns the number of records in an RDD

In [None]:

# Action - count

print("Count : "+str(rdd.count()))


##### first() --> Returns the first record.

In [None]:

# Action - first

firstRec = rdd.first()

print("First Record : "+ firstRec)


#### Spark RDD Transformations

Transformations performed on Spark RDDs are called RDD transformations; each one produces one or more new RDDs. Because RDDs are immutable, a transformation always creates a new RDD rather than updating the existing one.

**Lazy Transformation**

RDD transformations are lazy: none of the operations are executed until you call an action on the RDD.
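
Python's built-in `map` and `filter` are lazy too, so they offer a loose local analogy for this behaviour. The sketch below runs without Spark and uses a hypothetical `double` function that records each call, to show that nothing is computed until the result is actually requested.

```python
calls = []


def double(x):
    """Double x and record that the function actually ran."""
    calls.append(x)
    return x * 2


data = [1, 2, 3, 4]

# "Transformations": these only describe the work, nothing runs yet
doubled = map(double, data)
multiples_of_four = filter(lambda v: v % 4 == 0, doubled)
calls_before_action = len(calls)

# "Action": asking for the results forces the whole chain to run
result = list(multiples_of_four)

print(calls_before_action)
print(result)
print(calls)
```

The analogy is limited: Spark additionally plans the recorded transformations as a DAG and distributes the work across partitions.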

**RDD Transformation Type**

**1. Narrow Transformation**

Narrow transformations, such as map() and filter(), compute on data that lives in a single partition, so no data moves between partitions to execute them.

Functions such as map(), mapPartitions(), flatMap(), filter(), and union() are examples of narrow transformations.

**2. Wide/Shuffle Transformations**

Wide transformations, such as groupByKey() and reduceByKey(), compute on data that lives in many partitions, so data must move between partitions to execute them. Because they shuffle the data, they are also called shuffle transformations.

You can learn more about transformations in the Spark RDD Programming Guide.
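
To make the narrow versus wide distinction concrete, here is a small pure-Python sketch that models partitions as lists. The data and the toy `partition_for` function are assumptions made for illustration; Spark's real partitioner and shuffle machinery are considerably more involved.

```python
from collections import defaultdict

# Two hypothetical partitions of (key, value) pairs
partitions = [
    [("a", 1), ("b", 1)],
    [("a", 1), ("c", 1)],
]

# Narrow: each output partition depends on exactly one input partition
narrow = [[(k, v * 10) for k, v in part] for part in partitions]


def partition_for(key, num_partitions):
    """Toy deterministic partitioner (an assumption, not Spark's own)."""
    return ord(key[0]) % num_partitions


# Wide: records with the same key must first be brought together (the shuffle)
shuffled = [[] for _ in partitions]
for part in partitions:
    for k, v in part:
        shuffled[partition_for(k, len(shuffled))].append((k, v))

# After the shuffle, each partition can reduce its own keys independently
reduced = []
for part in shuffled:
    totals = defaultdict(int)
    for k, v in part:
        totals[k] += v
    reduced.append(dict(totals))

print(narrow)
print(shuffled)
print(reduced)
```

Note that both `("a", 1)` records start in different partitions and only meet after the shuffle step, which is the data movement that makes wide transformations more expensive.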

**filter()** returns a new dataset formed by selecting those elements of the source for which the function returns true.

In [None]:
## rdd.first() showed that the first row of the RDD you just created is a header. In this example, you filter that header row out.

rddheader = rdd.first()

rddwithoutheader = rdd.filter(lambda c: c != rddheader)


#### Next, we show how to develop a transformation on a single record from the RDD; once it works, you can apply it to the entire dataset.

In [None]:
#### Each record in the RDD is a string, as the type check below shows

type(rddwithoutheader.first())

#### Step 1 - Convert string into list

In [None]:
# Read the first record from the RDD after removing the header
s = rddwithoutheader.first()

# Check the type (printed so that it is shown even in the middle of a cell)
print(type(s))

# Print the value of s
print(s)

# Convert the string into a list
sl = s.split(',')

# Check the type
print(type(sl))

# Print each field of the list
for i in sl:
    print(i)
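
One caveat with `split(',')` is that it also splits on commas inside quoted fields. Whether the lab's sample.csv contains such fields is not stated here, so the line below is a made-up example. The sketch compares the naive split with Python's standard `csv` module, which understands quoting.

```python
import csv

# Hypothetical record whose second field contains a comma inside quotes
line = 'Asia,"Korea, North",Snacks'

# Naive split breaks the quoted field into two pieces
naive = line.split(',')

# csv.reader accepts any iterable of lines and honours the quotes
parsed = next(csv.reader([line]))

print(naive)
print(parsed)
```

If the data is known to contain no quoted commas, the simpler `split(',')` used in this lab is sufficient.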



#### Step 2 - Find distinct countries

As rdd.first() shows, Country is the second column, so we can access it by its index (note: list indexes start at 0). The command below returns the country name.

In [None]:
sl[1]

Now let's find all distinct countries; for that we are going to use the distinct() RDD function.

**distinct()** returns a new dataset that contains the distinct elements of the source dataset.

In [None]:
rddcountry = rddwithoutheader.map(lambda r : r.split(',')[1]).distinct()

print(rddcountry.count())

for i in rddcountry.take(10):
    print(i)

#### Step 3 - Filter all the records where the country is United Kingdom

In [None]:
# Compare with == for an exact match on the Country column
rddcountryuk = rddwithoutheader.filter(lambda o : o.split(',')[1] == 'United Kingdom')

print(rddcountryuk.count())

for i in rddcountryuk.take(10):
    print(i)
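
When filtering on one or more country names, keep in mind that parentheses alone do not create a tuple in Python: `('United Kingdom')` is a plain string, so the `in` operator performs a substring test against it. The sketch below uses a hypothetical value to show the difference.

```python
# Hypothetical value that is only part of a country name
country = "Kingdom"

# Parentheses without a trailing comma: still a str, so this is a substring test
substring_match = country in ("United Kingdom")

# A real one-element tuple: this is an exact membership test
tuple_match = country in ("United Kingdom",)

# Plain equality is the simplest exact test for a single value
equality_match = country == "United Kingdom"

print(substring_match, tuple_match, equality_match)
```

For several countries, a tuple or set with more than one element, such as `("United Kingdom", "France")`, behaves as an exact membership test.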



#### Working with Key-Value Pairs

While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs. The most common ones are distributed "shuffle" operations, such as grouping or aggregating the elements by a key.

In Python, these operations work on RDDs containing built-in Python tuples such as (1, 2). Simply create such tuples and then call your desired operation.

In [None]:
## Convert the RDD into key-value pairs: the key is "Item Type" and the value is 1, 
## which lets us count the records for each "Item Type"

rddcountryukkv = rddcountryuk.map(lambda r : (r.split(',')[2],1))

# Show 10 (item_type, 1) pairs
print(rddcountryukkv.take(10))

## Count the records per "Item Type" by using reduceByKey

rddcountryukkvrkey = rddcountryukkv.reduceByKey(lambda a, b: a + b)

## Print each item_type with its count

for i in rddcountryukkvrkey.take(20):
    print(i)
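
To see what the map-then-reduceByKey pattern computes, here is a local pure-Python sketch. The `reduce_by_key` helper and the sample rows are hypothetical; the helper mimics the result of `reduceByKey` on a small list and says nothing about how Spark distributes the work.

```python
from collections import defaultdict
from functools import reduce


def reduce_by_key(pairs, func):
    """Group values by key, then fold each group with func."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}


# Hypothetical records: Region, Country, Item Type
rows = [
    "Europe,United Kingdom,Snacks",
    "Europe,United Kingdom,Cereal",
    "Europe,United Kingdom,Snacks",
]

# Same shape as the notebook: (Item Type, 1) pairs, then sum the ones per key
pairs = [(r.split(',')[2], 1) for r in rows]
counts = reduce_by_key(pairs, lambda a, b: a + b)

print(counts)
```

The function passed to `reduceByKey` should be associative and commutative, like addition here, because Spark may combine values in any order across partitions.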



#### Sorting the value of Spark RDD

**sortByKey([ascending], [numPartitions])** : When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

In [None]:
## Sort the Item_Type counts; on an RDD you can use sortByKey to perform the sort

rddcountryukkvrkeysort = rddcountryukkvrkey.sortByKey()

for i in rddcountryukkvrkeysort.take(10):
    print(i)

# The previous output is sorted by Item_Type, not by count, because sortByKey
# sorts on the key rather than the value. To sort by count, first swap each
# pair so that the count becomes the key.

rddcountryukkvrkey = rddcountryukkvrkey.map(lambda o : (o[1],o[0]))

print(rddcountryukkvrkey.take(10))

## Now sort by count in descending order
rddcountryukkvrkeysort = rddcountryukkvrkey.sortByKey(False)

for i in rddcountryukkvrkeysort.take(10):
    print(i)
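
The swap, sort, and swap-back pattern can be checked locally with plain Python. The sketch below uses hypothetical counts and compares the swap approach with sorting directly by a key function; it runs without Spark.

```python
# Hypothetical (Item Type, count) pairs
counts = [("Snacks", 3), ("Cereal", 7), ("Meat", 5)]

# Approach used in this lab: swap, sort on the new key, swap back
swapped = sorted([(v, k) for k, v in counts], reverse=True)
via_swap = [(k, v) for v, k in swapped]

# Alternative: keep the pairs as they are and sort by a key function
via_key_function = sorted(counts, key=lambda kv: kv[1], reverse=True)

print(via_swap)
print(via_key_function)
```

PySpark RDDs offer a comparable shortcut, `sortBy(keyfunc, ascending=True)`, so something like `rdd.sortBy(lambda kv: kv[1], ascending=False)` could replace the two swaps; it is not what this lab uses.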



### Saving the data to S3

**saveAsTextFile(path)**: Writes the elements of the dataset as a text file (or set of text files) to a given directory in the local filesystem, HDFS, or any other Hadoop-supported file system. Spark converts each element to its string representation (toString in Scala/Java, str() in Python) to produce a line of text in the file.

In [None]:
## Before saving to S3, swap each pair back to (Item Type, count)

rddcountryukkvrkeysort = rddcountryukkvrkeysort.map(lambda o : (o[1],o[0]))

## print the value

for i in rddcountryukkvrkeysort.take(10):
    print(i)

## Save the result to S3 as Gzip-compressed text files; replace the bucket name

sc._jsc.hadoopConfiguration().set("mapred.output.committer.class", "org.apache.hadoop.mapred.DirectFileOutputCommitter")
codec = "org.apache.hadoop.io.compress.GzipCodec"
## rddcountryukkvrkeysort.saveAsTextFile("s3a://${BUCKET_NAME}/input/lab2/result" + "/part-0000*" , codec)


rddcountryukkvrkeysort.saveAsTextFile("s3://===$bucket===/input/lab2/result" + "/part-0000*" , codec)
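
Because each element is converted with `str()`, a tuple is written in its Python representation rather than as a CSV row. The local sketch below, using hypothetical records, shows the difference and one way to build comma-separated lines first.

```python
# Hypothetical (Item Type, count) pairs, as produced earlier in the lab
records = [("Cereal", 7), ("Meat", 5)]

# What a text file would contain if the tuples were written as they are
as_written = [str(r) for r in records]

# Joining the fields explicitly gives CSV-style lines instead
as_csv = [",".join(str(field) for field in r) for r in records]

print(as_written)
print(as_csv)
```

In the notebook, the same formatting could be applied with a `map` step before calling `saveAsTextFile`, if CSV-style output is what you need.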




You have just learned how to manipulate an RDD into key-value pairs to count the records per Item Type. Now we are going to learn how to perform a summation on an RDD using **reduceByKey()**.

#### Calculate the sum of total revenue per Item Type

In [None]:
# In this part, we are going to calculate sum of total revenue per item type

# let's convert the data into key value pair to extract Item Type and Total Revenue  

rddcountryukkv = rddcountryuk.map(lambda sl : (sl.split(',')[2],float(sl.split(',')[11])))

# perform the sum by using reduceByKey()

rddcountryukkvggregateByKey = rddcountryukkv.reduceByKey(lambda total, value : total + value )

# print 10 values

for i in rddcountryukkvggregateByKey.take(10):
    print(i)
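
The lambda above splits every line twice. A small named function that splits once can be easier to read and test. The sketch below is a local, pure-Python illustration; the `to_pair` helper and the sample rows are hypothetical, with Item Type at index 2 and Total Revenue at index 11 as in the cell above.

```python
def to_pair(line):
    """Parse one CSV line into an (Item Type, Total Revenue) pair."""
    fields = line.split(',')
    return fields[2], float(fields[11])


# Hypothetical records with 14 comma-separated fields each
rows = [
    "Europe,United Kingdom,Snacks,Online,M,1/1/2015,1,1/2/2015,10,1.5,1.0,15.0,10.0,5.0",
    "Europe,United Kingdom,Snacks,Offline,L,1/3/2015,2,1/4/2015,20,1.25,1.0,25.0,20.0,5.0",
    "Europe,United Kingdom,Cereal,Online,H,1/5/2015,3,1/6/2015,5,1.5,1.0,7.5,5.0,2.5",
]

# Sum the revenue per Item Type, which is what reduceByKey does with addition
totals = {}
for item_type, revenue in map(to_pair, rows):
    totals[item_type] = totals.get(item_type, 0.0) + revenue

print(totals)
```

A function like `to_pair` can be passed directly to `rdd.map(to_pair)` in place of the lambda.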


### Converting to DataFrame from RDD

In the next section of this lab, we work with DataFrames. Let's see how to convert an RDD into a DataFrame. The easiest way is to use the **rdd.toDF()** function.

In [None]:
# run one by one
type(rddcountryukkvggregateByKey)

rddcountryukkvggregateByKeyDF = rddcountryukkvggregateByKey.toDF()

type(rddcountryukkvggregateByKeyDF)


Because an RDD has no schema (no column names or declared data types), converting it to a DataFrame with toDF() assigns default column names _1, _2, and so on, and infers each column's data type from the data.

In [None]:
rddcountryukkvggregateByKeyDF.printSchema()

Using createDataFrame() from SparkSession is another way to create a DataFrame; it takes the RDD as an argument. You can chain it with toDF() to name the columns, or pass an explicit schema as in the cell below.

In [None]:
# Create a schema for the dataframe

from pyspark.sql.types import StructField, StructType, StringType, IntegerType , FloatType

schema = StructType([
    StructField('ItemType', StringType(), True),
    StructField('SumofTotalRevenue', FloatType(), True)
])

# Create data frame

df = spark.createDataFrame(rddcountryukkvggregateByKey,schema)

# show data frame

df.show()
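
One point to keep in mind with the schema above: `FloatType` is a 32-bit float, whereas Python floats are 64-bit, so large revenue totals can lose precision when stored as `FloatType`. The local sketch below uses the standard `struct` module and a hypothetical total to show the effect; `DoubleType` is the Spark type that matches a Python float.

```python
import struct


def to_float32(value):
    """Round-trip a Python float through a 32-bit float."""
    return struct.unpack("f", struct.pack("f", value))[0]


# Hypothetical revenue total just above 2**24, where float32 starts skipping integers
total = 16777217.0

stored = to_float32(total)
print(stored)
print(stored == total)
```

Whether this matters depends on the size of the totals and the precision you need downstream.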


### Writing as Parquet to Amazon S3

In [None]:
df.write.parquet('s3://===$bucket===/input/lab2/parquetresult/')

In [None]:
## Lists all currently running sessions by name and ID.

%list_sessions

In [None]:
## stop the current session 

%stop_session