# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

In [6]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5
%%configure
{
  "--job-bookmark-option": "job-bookmark-enable",
  "--enable-spark-ui": "true",
}

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
The following configurations have been updated: {'--job-bookmark-option': 'job-bookmark-enable', '--enable-spark-ui': 'true'}


In [19]:
from awsglue.dynamicframe import DynamicFrame




####  Run this cell to set up and start your interactive session.


In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Bootstrap the objects every later cell relies on.
# getOrCreate() reuses an existing SparkContext instead of failing when one is already active.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext and exposes the Glue readers/writers used below.
glueContext = GlueContext(sc)
# Plain SparkSession, available for DataFrame/SQL work.
spark = glueContext.spark_session
# Job handle bound to this GlueContext.
# NOTE(review): job.init()/job.commit() are never called in the visible cells, so the
# job-bookmark option configured for the session presumably has no effect — confirm.
job = Job(glueContext)

Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 4f88cc36-2476-4ba1-917b-b5124bd5f59d
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
--job-bookmark-option job-bookmark-enable
--enable-spark-ui true
Waiting for session 4f88cc36-2476-4ba1-917b-b5124bd5f59d to get into ready status...
Session 4f88cc36-2476-4ba1-917b-b5124bd5f59d has been created.



#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [3]:
# Load the raw sales transactions table from the Glue Data Catalog as a DynamicFrame.
# transformation_ctx names this read so that job bookmarks can track it.
# NOTE(review): "badRecordspath" is passed through as an additional option; confirm that the
# Glue catalog reader actually honours it for this table's format.
dyf = glueContext.create_dynamic_frame.from_catalog(
    database="new_data",
    table_name="sales_transactions_csv",
    transformation_ctx="dyf1",
    additional_options={"badRecordspath": "s3://practice-spark-shu/bad-data/"},
)

# Show the inferred schema so the column types can be checked before cleaning.
dyf.printSchema()

root
|-- transaction_id: long
|-- order_id: string
|-- customer_id: string
|-- customer_name: string
|-- customer_email: string
|-- customer_city: string
|-- customer_state: string
|-- product_id: string
|-- product_name: string
|-- product_category: string
|-- quantity: long
|-- unit_price: long
|-- discount_pct: long
|-- tax_pct: long
|-- payment_mode: string
|-- order_status: string
|-- order_date: string
|-- ingestion_timestamp: string
|-- gross_amount: long
|-- net_amount: long


In [4]:
# Convert the Glue DynamicFrame into a Spark DataFrame so the pyspark.sql column
# functions imported in the next cell can be applied to it.
df = dyf.toDF()




In [5]:
from pyspark.sql.functions import col,lit,when, avg, mean, mode,upper, rank, row_number, dense_rank,count, to_date, to_timestamp, current_date, lower, regexp_extract, length, percentile_approx,year,month,day
from pyspark.sql.window import Window





In [6]:
# Starting point of the cleaning pipeline: df1 is the working DataFrame that the
# following cells transform step by step (df keeps the raw data).
# NOTE(review): this cell was labelled "Null check on primary keys", but no null check
# is performed here — TODO add one (e.g. on transaction_id / order_id) or drop the label.
df1 = df





#### Clean and validate the data before writing the DynamicFrame to a location in Amazon S3


In [7]:
# Create rejection_flag: True when the quantity is negative or the unit price is zero,
# False for every other row (including rows where both comparisons evaluate to null).
df1 = df1.withColumn(
    "rejection_flag",
    when((col("quantity") < 0) | (col("unit_price") == 0), lit(True)).otherwise(lit(False)),
)




In [7]:
#w = Window.partitionBy("order_id").orderBy(col("order_id").asc())
#df1.withColumn("rows",row_number().over(w)).filter(col("rows")>1).count()

In [8]:
#df1.select("*").groupBy("order_id").agg(count("*").alias("counts")).filter(col("counts") > 1).show()

In [9]:
#df1.filter(col("customer_city").isNull()).count()

In [8]:
# Parse order_date from its "dd-MM-yyyy" string form into a proper date column.
# NOTE(review): values that do not match the pattern are expected to come out as null —
# confirm under the session's time-parser settings. This cell is also not safe to
# re-run, because the second run would be parsing an already-converted date column.
df1 = df1.withColumn(
    "order_date",
    to_date("order_date", "dd-MM-yyyy"),
)




In [10]:
#df1.filter(col("order_date") > current_date()).show()

In [9]:
# Additionally reject orders dated after today. Rows that are not in the future keep
# whatever rejection_flag they already had, so earlier rejections are preserved.
# Rows whose order_date is null do not satisfy the comparison and are left unchanged.
df1 = df1.withColumn(
    "rejection_flag",
    when(current_date() < col("order_date"), lit(True)).otherwise(col("rejection_flag")),
)




In [10]:
# Normalise e-mail addresses to lower case so the same address always compares equal.
df1 = df1.withColumn(
    "customer_email",
    lower("customer_email"),
)




In [11]:
# Normalise payment_mode to upper case so differently cased spellings collapse into one value.
df1 = df1.withColumn(
    "payment_mode",
    upper("payment_mode"),
)




In [12]:
# Keep only the first run of ASCII letters found in customer_name (capture group 1).
# NOTE(review): this also truncates multi-word names to their first word
# ("John Smith" -> "John") — confirm that is intended.
df1 = df1.withColumn(
    "customer_name",
    regexp_extract("customer_name", r"([a-zA-Z]+)", 1),
)




In [13]:
# Choose the category used to replace 'INVALID' entries in the next step:
# count rows per product_category, keep the two most frequent categories, and of
# those two take the less frequent one — i.e. the second most common category
# (or the only category if there is just one).
# NOTE(review): presumably the top category is skipped on purpose — confirm; ties are
# broken arbitrarily, and collect()[0] raises IndexError when df1 has no rows.
value = (
    df1.groupBy("product_category")
    .agg(count("*").alias("count"))
    .orderBy("count", ascending=False)
    .limit(2)
    .orderBy("count")
    .limit(1)
    .collect()[0][0]
)




In [14]:
# Replace the placeholder category 'INVALID' with the category picked in `value`;
# every other row (including null categories) keeps its current product_category.
df1 = df1.withColumn(
    "product_category",
    when(col("product_category") == "INVALID", lit(value)).otherwise(col("product_category")),
)




In [15]:
# Normalise negative quantities to their magnitude (e.g. -3 -> 3); non-negative and
# null quantities are left untouched.
# The sign is flipped with unary minus rather than multiplying the column by itself:
# squaring would turn -3 into 9 and silently change the ordered amount.
# rejection_flag was computed from the original sign before this step, so rows that
# had a negative quantity remain flagged.
df1 = df1.withColumn(
    "quantity",
    when(col("quantity") < 0, -col("quantity")).otherwise(col("quantity")),
)




In [None]:
#df1.withColumn("Year",year(col("order_date")))\
#.withColumn("month",month(col("order_date")))\
#.withColumn("day",day(col("order_date"))).show()

In [20]:
# Convert the cleaned Spark DataFrame back into a Glue DynamicFrame (named "df") so it
# can be handed to glueContext.write_dynamic_frame. This rebinds `dyf`, which previously
# held the raw catalog read. DynamicFrame comes from the awsglue.dynamicframe import cell.
dyf = DynamicFrame.fromDF(df1,glueContext,"df")




In [None]:
# Write the cleaned DynamicFrame to S3 as CSV files, one sub-folder per customer_city.
# NOTE(review): "optut_data" in the target prefix looks like a typo for "output_data";
# it is left unchanged here because renaming it would change where the data lands.
glueContext.write_dynamic_frame.from_options(
    frame=dyf,
    connection_type="s3",
    connection_options={
        # The S3 sink takes a single "path" string; "paths" (a list) is the read-side option.
        "path": "s3://practice-spark-shu/optut_data/",
        "partitionKeys": ["customer_city"],
    },
    format="csv",
    # "writeHeader" controls the header row when writing CSV; "withHeader" applies to reads only.
    format_options={"writeHeader": True},
)