
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A JSON-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your AWS configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma-separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma-separated list of pip packages, S3 paths, or private pip arguments.                                                                                |
| %extra_py_files             |  List        |  Comma-separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma-separated list of additional JARs to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (e.g. %glue_version 2.0).                              |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %spark_conf                 |  String      |  Specify custom spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer                       |

In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)


Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.4 
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Session ID: fcb82baf-9639-45c4-affa-2787177aa1dd
Applying the following default arguments:
--glue_kernel_version 1.0.4
--enable-glue-datacatalog true
Waiting for session fcb82baf-9639-45c4-affa-2787177aa1dd to get into ready status...
Session fcb82baf-9639-45c4-affa-2787177aa1dd has been created.



## What is RDD (Resilient Distributed Dataset)?

An RDD (Resilient Distributed Dataset) is a fundamental building block of PySpark: a fault-tolerant, immutable, distributed collection of objects. 
Immutable means that once you create an RDD, you cannot change it; transformations such as `map` or `filter` return a new RDD instead. An RDD's records are divided into logical partitions, which can be computed on different nodes of the cluster. 

In other words, an RDD is a collection of objects similar to a Python list, with one difference: an RDD is computed by several processes spread across multiple physical servers (the nodes of a cluster), while a Python list lives and is processed in a single process.
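To make the partition and immutability ideas concrete, here is a plain-Python model, not Spark's actual code: a list is cut into contiguous slices, each standing in for a partition that a worker could process, and a "transformation" builds new slices instead of modifying the old ones. The helper names `split_into_partitions` and `map_partitions` are hypothetical.

```python
def split_into_partitions(data, num_partitions):
    """Cut `data` into `num_partitions` contiguous slices of near-equal size."""
    n = len(data)
    return [
        data[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
        for i in range(num_partitions)
    ]


def map_partitions(partitions, func):
    """Return NEW partitions with `func` applied to every element; the input is untouched."""
    return [[func(x) for x in part] for part in partitions]


sample_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
parts = split_into_partitions(sample_data, 4)
doubled = map_partitions(parts, lambda x: x * 2)

print(parts)    # [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
print(doubled)  # [[2, 4, 6], [8, 10, 12], [14, 16, 18], [20, 22, 24]]
```

In the Glue session you can inspect the real layout with `spark.sparkContext.parallelize(sample_data, 4).glom().collect()`; the split Spark chooses may differ from this simplified model.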

## Creating RDD

RDDs are created in two main ways:

* by parallelizing an existing collection, and
* by referencing a dataset in an external storage system (HDFS, S3, and many more). 

#### Create RDD from parallelize 

In [4]:
# Create an RDD from a Python list with parallelize()

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

rdd = spark.sparkContext.parallelize(data)





In [5]:
type(rdd)

<class 'pyspark.rdd.RDD'>


In [6]:
rdd.count()

12


#### Creating RDD referencing a dataset in S3

In [2]:
# Retrieve the list of existing buckets

import boto3

s3 = boto3.client('s3')
response = s3.list_buckets()

# Output the bucket names
print('Existing buckets:')
for bucket in response['Buckets']:
    print(f'  {bucket["Name"]}')

Existing buckets:
  aws-glue-assets-030798167757-us-east-1
  aws-glue-bootcamp-030798167757-us-east-1
  bluejeans59
  cdk-hnb659fds-assets-030798167757-us-east-1
  cf-templates-leyxvlmxcxmr-us-east-1
  deh-awsbootcamp-030798167757-us-east-1
  deh-awsdataengineering-030798167757-us-east-1


In [12]:


## rdd = spark.sparkContext.textFile("s3://${BUCKET_NAME}/input/lab2/sample.csv")

rdd = spark.sparkContext.textFile("s3://aws-glue-bootcamp-030798167757-us-east-1/raw/sales/input/sales_rds_excercise.csv")




#### RDD Actions 

##### count() --> Returns the number of records in an RDD

In [13]:

# Action - count

print("Count : "+str(rdd.count()))


Count : 900


##### first() --> Returns the first record.

In [14]:

# Action - first

firstRec = rdd.first()

print("First Record : "+ firstRec)


First Record : uuid,Country,ItemType,SalesChannel,OrderPriority,OrderDate,Region,ShipDate,UnitsSold,UnitPrice,UnitCost,TotalRevenue,TotalCost,TotalProfit


In [18]:
## rdd.first() shows that the first row of this RDD is the CSV header.
## Keep only the data rows by filtering out every line equal to that header.

rddheader = rdd.first()

rddwithoutheader = rdd.filter(lambda line: line != rddheader)
rddwithoutheader.take(2)

['874708545,Panama,Cosmetics,Offline,L,2/22/2015,Central America and the Caribbean,2/27/2015,4551,437.2,263.33,1989697.2,1198414.83,791282.37', '854349935,Sao Tome and Principe,Fruits,Offline,M,12/9/2015,Sub-Saharan Africa,1/18/2016,9986,9.33,6.92,93169.38,69103.12,24066.26']
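The `filter` approach compares every line with the header, which also conveniently removes the header of each file when several CSV files are read together. When the input is a single file, an alternative is to skip only the first line of partition 0, where that file's first line lands. The sketch below assumes a single input file; `skip_header` is a hypothetical name, it has the `(partition_index, iterator)` signature that `RDD.mapPartitionsWithIndex` expects, and the partitions are simulated with toy lists so the logic can be checked without a cluster.

```python
from itertools import islice


def skip_header(partition_index, rows):
    """Drop the first row of partition 0 only; other partitions pass through unchanged.

    Assumes a single input file, so its header is the first row of partition 0.
    """
    if partition_index == 0:
        return islice(rows, 1, None)
    return rows


# Simulated partitions of one small CSV file (toy data, for local testing only)
simulated = [
    iter(["id,Region", "1,Asia", "2,Europe"]),
    iter(["3,Asia"]),
]
cleaned = [list(skip_header(i, part)) for i, part in enumerate(simulated)]
print(cleaned)  # [['1,Asia', '2,Europe'], ['3,Asia']]

# In the Glue session, the same function would be applied as:
# rddwithoutheader = rdd.mapPartitionsWithIndex(skip_header)
```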


In [20]:
# Split each line into a list of column values
rdd = rddwithoutheader.map(lambda line: line.split(","))
rdd.take(2)

[['874708545', 'Panama', 'Cosmetics', 'Offline', 'L', '2/22/2015', 'Central America and the Caribbean', '2/27/2015', '4551', '437.2', '263.33', '1989697.2', '1198414.83', '791282.37'], ['854349935', 'Sao Tome and Principe', 'Fruits', 'Offline', 'M', '12/9/2015', 'Sub-Saharan Africa', '1/18/2016', '9986', '9.33', '6.92', '93169.38', '69103.12', '24066.26']]
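`line.split(",")` works for the rows shown above because none of their fields contains a comma. If a field could hold a quoted comma (for example a country written as `"Korea, South"`), plain splitting would shift every later column. Python's standard `csv` module handles quoting; `parse_csv_line` below is a hypothetical helper that parses one line at a time and could be passed to `rddwithoutheader.map(...)` in place of the lambda.

```python
import csv


def parse_csv_line(line):
    """Parse a single CSV line, honouring double-quoted fields that contain commas."""
    # csv.reader accepts any iterable of lines, so wrap the single line in a list
    return next(csv.reader([line]))


plain = "874708545,Panama,Cosmetics,Offline,L"
quoted = '1,"Korea, South",Cosmetics'

print(plain.split(","))        # 5 fields, as expected
print(quoted.split(","))       # 4 fields: the quoted country is split in two
print(parse_csv_line(quoted))  # ['1', 'Korea, South', 'Cosmetics']

# In the Glue session: rdd = rddwithoutheader.map(parse_csv_line)
```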


In [23]:
# Create a tuple of (Region, (UnitsSold, TotalRevenue, TotalCost, TotalProfit))
grouped_data = rdd.map(lambda row: (row[6], (int(row[8]), float(row[11]), float(row[12]), float(row[13]))))

grouped_data.take(2)

[('Middle East and North Africa', (934, 142509.72, 91008.96, 51500.76)), ('Central America and the Caribbean', (4551, 1989697.2, 1198414.83, 791282.37))]
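The positions 6, 8, 11, 12 and 13 used above come from the header printed by `first()`. Looking them up by column name, as sketched below, keeps the mapping readable and raises a `KeyError` if a column is ever renamed. `COLUMNS` and `to_region_pair` are hypothetical names; the header and the sample row are copied from the outputs earlier in this notebook.

```python
# Header line as printed by rdd.first() earlier in this notebook
header = ("uuid,Country,ItemType,SalesChannel,OrderPriority,OrderDate,Region,ShipDate,"
          "UnitsSold,UnitPrice,UnitCost,TotalRevenue,TotalCost,TotalProfit")

# Map each column name to its position in a split row
COLUMNS = {name: i for i, name in enumerate(header.split(","))}


def to_region_pair(row):
    """Build (Region, (UnitsSold, TotalRevenue, TotalCost, TotalProfit)) from a split row."""
    return (
        row[COLUMNS["Region"]],
        (
            int(row[COLUMNS["UnitsSold"]]),
            float(row[COLUMNS["TotalRevenue"]]),
            float(row[COLUMNS["TotalCost"]]),
            float(row[COLUMNS["TotalProfit"]]),
        ),
    )


sample_row = ['874708545', 'Panama', 'Cosmetics', 'Offline', 'L', '2/22/2015',
              'Central America and the Caribbean', '2/27/2015', '4551', '437.2',
              '263.33', '1989697.2', '1198414.83', '791282.37']
print(to_region_pair(sample_row))
# ('Central America and the Caribbean', (4551, 1989697.2, 1198414.83, 791282.37))

# In the Glue session: grouped_data = rdd.map(to_region_pair)
```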


In [24]:
# Group data by region and sum the aggregations
region_aggregates = grouped_data.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2], x[3] + y[3]))
region_aggregates.take(2)

[('Australia and Oceania', (317097, 77734113.97, 54654560.129999995, 23079553.84)), ('Sub-Saharan Africa', (1128765, 285499363.1299999, 199361113.02000007, 86138250.11000001))]
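`reduceByKey` expects an associative and commutative function, because Spark first combines the values for each key inside every partition and then merges those partial results after the shuffle. The plain-Python model below follows those two stages with the same element-wise tuple sum; it is an illustration, not Spark's implementation, and `reduce_by_key_model` and `add_metrics` are hypothetical names. The long tails such as `54654560.129999995` in the output above are ordinary binary floating-point rounding from adding many `float` values, not an aggregation error.

```python
def add_metrics(x, y):
    """Element-wise sum of two metric tuples, like the lambda passed to reduceByKey above."""
    return tuple(a + b for a, b in zip(x, y))


def reduce_by_key_model(partitions, func):
    """Plain-Python model of reduceByKey's two stages (not Spark's real implementation)."""
    # Stage 1: combine values for each key inside every partition (map-side combine)
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)

    # Stage 2: merge the per-partition partial results for each key (after the shuffle)
    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged


# Toy (region, (units, revenue, cost, profit)) pairs split across two partitions
toy_partitions = [
    [("Asia", (1, 10.0, 6.0, 4.0)), ("Europe", (2, 20.0, 15.0, 5.0))],
    [("Asia", (3, 30.0, 20.0, 10.0))],
]
print(reduce_by_key_model(toy_partitions, add_metrics))
# {'Asia': (4, 40.0, 26.0, 14.0), 'Europe': (2, 20.0, 15.0, 5.0)}
```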


In [25]:

# Optionally, collect the results as a list for immediate display
results = region_aggregates.collect()
for region, (unitsSold, totalRevenue, totalCost, totalProfit) in results:
    print(f"Region: {region}, Units Sold: {unitsSold}, Total Revenue: ${totalRevenue:.2f}, Total Cost: ${totalCost:.2f}, Total Profit: ${totalProfit:.2f}")

# Keep the RDD for further processing or saving


Region: Australia and Oceania, Units Sold: 317097, Total Revenue: $77734113.97, Total Cost: $54654560.13, Total Profit: $23079553.84
Region: Sub-Saharan Africa, Units Sold: 1128765, Total Revenue: $285499363.13, Total Cost: $199361113.02, Total Profit: $86138250.11
Region: Middle East and North Africa, Units Sold: 563371, Total Revenue: $152260199.03, Total Cost: $109301663.24, Total Profit: $42958535.79
Region: Central America and the Caribbean, Units Sold: 448709, Total Revenue: $136074348.13, Total Cost: $95515203.56, Total Profit: $40559144.57
Region: Asia, Units Sold: 587833, Total Revenue: $168269165.41, Total Cost: $119429549.32, Total Profit: $48839616.09
Region: Europe, Units Sold: 1242668, Total Revenue: $366587296.27, Total Cost: $266285239.92, Total Profit: $100302056.35
Region: North America, Units Sold: 90173, Total Revenue: $15429562.63, Total Cost: $10437286.35, Total Profit: $4992276.28
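`collect()` returns every element to the driver, which is fine for seven regions but can exhaust driver memory on a large RDD. Once the results are a local list, ordinary Python applies, for instance to rank regions by total profit. The tuples below are a subset of the printed output (rounded as displayed); `sample_results` and `ranked` are illustrative names. On the cluster, `region_aggregates.sortBy(lambda kv: kv[1][3], ascending=False)` would produce the same ordering before collecting.

```python
# (region, (unitsSold, totalRevenue, totalCost, totalProfit)) tuples from the output above
sample_results = [
    ("Australia and Oceania", (317097, 77734113.97, 54654560.13, 23079553.84)),
    ("Europe", (1242668, 366587296.27, 266285239.92, 100302056.35)),
    ("North America", (90173, 15429562.63, 10437286.35, 4992276.28)),
]

# Sort by total profit (index 3 of the metrics tuple), highest first
ranked = sorted(sample_results, key=lambda kv: kv[1][3], reverse=True)

for region, (units, revenue, cost, profit) in ranked:
    # The ',' format option adds thousands separators for readability
    print(f"{region:<25} profit ${profit:,.2f}")
```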


### Saving the data to S3

**saveAsTextFile(path)**: Writes the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS, or any other Hadoop-supported file system. Spark converts each element to one line of text in the file; in PySpark this is done with `str()`, so a tuple is written in its Python form rather than as CSV.

In [29]:
## Save the aggregates to S3 as Gzip-compressed text files (replace the bucket name).
## saveAsTextFile takes an output directory; Spark writes part-00000.gz, part-00001.gz, ... inside it.

sc._jsc.hadoopConfiguration().set("mapred.output.committer.class", "org.apache.hadoop.mapred.DirectFileOutputCommitter")
codec = "org.apache.hadoop.io.compress.GzipCodec"

region_aggregates.saveAsTextFile("s3://aws-glue-bootcamp-030798167757-us-east-1/raw/sales/output", compressionCodecClass=codec)
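Because PySpark writes each element via `str()`, the files above contain lines like `('Australia and Oceania', (317097, ...))`. To store real CSV rows, each tuple can be formatted as a comma-separated string before saving. `to_csv_line` and `output_dir` are hypothetical names; the sketch assumes region names contain no commas or quotes (no CSV quoting is applied) and that the output directory does not already exist, since `saveAsTextFile` fails on an existing path by default.

```python
def to_csv_line(pair):
    """Flatten (region, (units, revenue, cost, profit)) into one CSV line.

    Assumes the region name contains no commas or quotes (no CSV quoting is applied).
    """
    region, (units, revenue, cost, profit) = pair
    return ",".join([region, str(units), f"{revenue:.2f}", f"{cost:.2f}", f"{profit:.2f}"])


print(to_csv_line(("Asia", (587833, 168269165.41, 119429549.32, 48839616.09))))
# Asia,587833,168269165.41,119429549.32,48839616.09

# In the Glue session (output_dir is a placeholder for a new S3 prefix):
# region_aggregates.map(to_csv_line).saveAsTextFile(output_dir, compressionCodecClass=codec)
```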




### Converting to DataFrame from RDD

In the next section of this lab we work with DataFrames, so let's see how to convert an RDD into a DataFrame. The easiest way is the **rdd.toDF()** function.

In [42]:
# run one by one
type(region_aggregates)

region_aggregatesDF = region_aggregates.toDF()

type(region_aggregatesDF)


<class 'pyspark.sql.dataframe.DataFrame'>


In [43]:
region_aggregatesDF.show(2)

+--------------------+--------------------+
|                  _1|                  _2|
+--------------------+--------------------+
|Australia and Oce...|{317097, 7.773411...|
|  Sub-Saharan Africa|{1128765, 2.85499...|
+--------------------+--------------------+
only showing top 2 rows


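Because an RDD has no schema (no column names or declared data types), converting it to a DataFrame with toDF() gives default column names _1, _2, and so on, and Spark infers the data types from the values: here the region becomes a string and the metrics tuple becomes a struct of long and double fields.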

In [31]:
region_aggregatesDF.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: struct (nullable = true)
 |    |-- _1: long (nullable = true)
 |    |-- _2: double (nullable = true)
 |    |-- _3: double (nullable = true)
 |    |-- _4: double (nullable = true)


In [44]:
from pyspark.sql.functions import col

# Flatten the struct column
flattened_df = region_aggregatesDF.select(col("_1").alias("Region"),
                                          col("_2._1").alias("UnitsSold"),
                                          col("_2._2").alias("TotalRevenue"),
                                          col("_2._3").alias("TotalCost"),
                                          col("_2._4").alias("TotalProfit"))

# Show the dataframe
flattened_df.show(2)


+--------------------+---------+-------------------+--------------------+-------------------+
|              Region|UnitsSold|       TotalRevenue|           TotalCost|        TotalProfit|
+--------------------+---------+-------------------+--------------------+-------------------+
|Australia and Oce...|   317097|      7.773411397E7|5.4654560129999995E7|      2.307955384E7|
|  Sub-Saharan Africa|  1128765|2.854993631299999E8|1.9936111302000007E8|8.613825011000001E7|
+--------------------+---------+-------------------+--------------------+-------------------+
only showing top 2 rows


Another option is SparkSession's createDataFrame(), which takes the RDD as an argument; chain it with toDF() to give the columns names.
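A sketch of that route, assuming you want flat columns straight away instead of the nested `_2` struct: flatten each tuple first, then supply the column names. `flatten_row` and `COLUMN_NAMES` are hypothetical names; only the pure-Python part runs outside Glue, and the Spark calls are shown as comments.

```python
COLUMN_NAMES = ["Region", "UnitsSold", "TotalRevenue", "TotalCost", "TotalProfit"]


def flatten_row(pair):
    """Turn (region, (units, revenue, cost, profit)) into a flat 5-tuple."""
    region, metrics = pair
    return (region, *metrics)


print(flatten_row(("Asia", (587833, 168269165.41, 119429549.32, 48839616.09))))

# In the Glue session, either of these should yield the same five flat columns:
# df = spark.createDataFrame(region_aggregates.map(flatten_row), COLUMN_NAMES)
# df = spark.createDataFrame(region_aggregates.map(flatten_row)).toDF(*COLUMN_NAMES)
```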

### Writing as Parquet to Amazon S3

In [46]:
flattened_df.write.parquet('s3://aws-glue-bootcamp-030798167757-us-east-1/raw/sales/parquetresult/')




In [2]:
## stop the current session 

%stop_session

Stopping session: fcb82baf-9639-45c4-affa-2787177aa1dd
Stopped session.
