In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Bootstrap the Glue session objects used by the rest of the notebook.
# Reuse the SparkContext if one is already active, otherwise create it.
sc = SparkContext.getOrCreate()
# Wrap the SparkContext in a GlueContext.
glueContext = GlueContext(sc)
# Spark session handle, used later for spark.sql(...).
spark = glueContext.spark_session
# NOTE(review): `job` is created here but is not referenced again in the visible cells.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::148922931563:role/AdminRole
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 3b03815e-2acd-430a-b919-a16e4d4bfcac
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session 3b03815e-2acd-430a-b919-a16e4d4bfcac to get into ready status...
Session 3b03815e-2acd-430a-b919-a16e4d4bfcac has been created.



In [2]:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
import time





In [3]:

# Rebuild the GlueContext and Spark session from the active SparkContext.
# NOTE(review): `glueContext` and `spark` are already bound by the first cell;
# this rebinding looks redundant - confirm before removing.
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session





In [None]:
# Load the two source tables from the "summitdb" Data Catalog database.
# NOTE(review): the table listing printed at the end of this notebook shows
# "raw2023" and "reference_data" but no "raw2024" - confirm the table name.
raw_data = glueContext.create_dynamic_frame.from_catalog(
    database="summitdb",
    table_name="raw2024",
)

reference_data = glueContext.create_dynamic_frame.from_catalog(
    database="summitdb",
    table_name="reference_data",
)




In [5]:
# Print the schema of the raw frame.
raw_data.printSchema()

root
|-- uuid: string
|-- device_ts: string
|-- device_id: int
|-- device_temp: int
|-- track_id: int
|-- activity_type: string
|-- partition_0: string
|-- partition_1: string
|-- partition_2: string


In [6]:
# Print the schema of the reference frame.
reference_data.printSchema()

root
|-- track_id: string
|-- track_name: string
|-- artist_name: string


In [7]:
# Report the number of records in each source frame.
raw_count = raw_data.count()
print('raw_data (Count) = ' + str(raw_count))
reference_count = reference_data.count()
print('reference_data (Count) = ' + str(reference_count))

raw_data (Count) = 105000
reference_data (Count) = 100


In [8]:
# Convert the raw frame to a Spark DataFrame and preview five rows.
raw_data.toDF().show(5)

+--------------------+--------------------+---------+-----------+--------+-------------+-----------+-----------+-----------+
|                uuid|           device_ts|device_id|device_temp|track_id|activity_type|partition_0|partition_1|partition_2|
+--------------------+--------------------+---------+-----------+--------+-------------+-----------+-----------+-----------+
|084404a8-0438-457...|2023-01-22 12:35:...|       25|         34|      25|    Traveling|         01|         22|         12|
|c3bb0a21-41bc-4e4...|2023-01-22 12:35:...|       22|         32|      23|    Traveling|         01|         22|         12|
|e5ca0c66-3394-4e8...|2023-01-22 12:35:...|       29|         32|      29|    Traveling|         01|         22|         12|
|f4eb814b-2906-4d7...|2023-01-22 12:35:...|       35|         32|      27|    Traveling|         01|         22|         12|
|157c32d8-0599-421...|2023-01-22 12:35:...|       25|         32|      27|    Traveling|         01|         22|         12|


In [9]:
# Convert the reference frame to a Spark DataFrame and preview five rows.
reference_data.toDF().show(5)

+--------+-----------+--------------------+
|track_id| track_name|         artist_name|
+--------+-----------+--------------------+
|       1| God's Plan|               Drake|
|       2|Meant To Be|Bebe Rexha & Flor...|
|       3|    Perfect|          Ed Sheeran|
|       4|    Finesse|Bruno Mars & Cardi B|
|       5|     Psycho|Post Malone Featu...|
+--------+-----------+--------------------+
only showing top 5 rows


In [10]:
# Register the DataFrame form of raw_data as the temporary view
# "temp_raw_data" so it can be queried with Spark SQL.
raw_df = raw_data.toDF()
raw_df.createOrReplaceTempView("temp_raw_data")

# Run the SQL statement which keeps only the rows whose activity_type is
# 'Running', print how many there are, then preview five of them.
running_query = "select * from temp_raw_data where activity_type = 'Running'"
runningDF = spark.sql(running_query)
running_count = runningDF.count()
print("Running (count) : " + str(running_count))

runningDF.show(5)


Running (count) : 10546
+--------------------+--------------------+---------+-----------+--------+-------------+-----------+-----------+-----------+
|                uuid|           device_ts|device_id|device_temp|track_id|activity_type|partition_0|partition_1|partition_2|
+--------------------+--------------------+---------+-----------+--------+-------------+-----------+-----------+-----------+
|710cd618-79b9-43b...|2023-01-22 12:35:...|       49|         32|      11|      Running|         01|         22|         12|
|66a9ea56-a161-4b1...|2023-01-22 12:35:...|       33|         28|      26|      Running|         01|         22|         12|
|4480d3d0-1ec5-48d...|2023-01-22 12:35:...|       34|         34|      22|      Running|         01|         22|         12|
|cc6ef83a-ef67-4aa...|2023-01-22 12:35:...|       45|         32|      19|      Running|         01|         22|         12|
|f31193cd-21d9-47a...|2023-01-22 12:35:...|       12|         32|      16|      Running|         01| 

In [11]:
# Run the SQL statement which keeps only the rows of the temp_raw_data view
# whose activity_type is 'Working', print how many there are, then preview five.
working_query = "select * from temp_raw_data where activity_type = 'Working'"
workingDF = spark.sql(working_query)
working_count = workingDF.count()
print("Working (count) : " + str(working_count))

workingDF.show(5)


Working (count) : 21051
+--------------------+--------------------+---------+-----------+--------+-------------+-----------+-----------+-----------+
|                uuid|           device_ts|device_id|device_temp|track_id|activity_type|partition_0|partition_1|partition_2|
+--------------------+--------------------+---------+-----------+--------+-------------+-----------+-----------+-----------+
|4ba13649-70b2-433...|2023-01-22 12:35:...|       41|         32|      11|      Working|         01|         22|         12|
|7a670cc5-c6e0-420...|2023-01-22 12:35:...|       27|         32|      25|      Working|         01|         22|         12|
|2b936660-640a-462...|2023-01-22 12:35:...|       29|         40|      25|      Working|         01|         22|         12|
|5883205c-e984-4ac...|2023-01-22 12:35:...|       37|         40|      30|      Working|         01|         22|         12|
|7abb64f1-7d28-440...|2023-01-22 12:35:...|       14|         40|      10|      Working|         01| 

In [12]:

def filter_function(dynamicRecord):
    """Return True when the record's 'activity_type' field equals 'Running'.

    The record only needs to support item access by field name.
    """
    return dynamicRecord['activity_type'] == 'Running'
# Select the 'Running' records again, this time with the Glue Filter
# transform applied directly to raw_data and driven by filter_function.
# NOTE(review): this rebinds runningDF, which an earlier cell assigned from
# spark.sql(...); the name now refers to the Filter result instead.
runningDF = Filter.apply(frame=raw_data, f=filter_function)

running_total = runningDF.count()
print("Running (count) : " + str(running_total))

Running (count) : 10546


In [13]:

# Select the 'Working' records with the Glue Filter transform, using an
# inline predicate on the activity_type field.
# NOTE(review): this rebinds workingDF, which an earlier cell assigned from
# spark.sql(...); the name now refers to the Filter result instead.
workingDF = Filter.apply(
    frame=raw_data,
    f=lambda x: x['activity_type'] == 'Working',
)

working_total = workingDF.count()
print("Working (count) : " + str(working_total))

Working (count) : 21051


In [14]:

# Join each raw record to its reference row on track_id.
# The schemas printed earlier show track_id as int in raw_data and string in
# reference_data; the joined schema keeps both key columns, named
# "track_id" and ".track_id".
joined_data = Join.apply(raw_data,reference_data, 'track_id', 'track_id')





In [15]:
# Print the schema of the joined frame.
joined_data.printSchema()

root
|-- track_id: string
|-- partition_2: string
|-- activity_type: string
|-- .track_id: int
|-- partition_1: string
|-- device_temp: int
|-- track_name: string
|-- artist_name: string
|-- device_ts: string
|-- device_id: int
|-- partition_0: string
|-- uuid: string


In [16]:

# Remove the partition_* columns from the joined frame.
# NOTE(review): the joined schema printed above lists only partition_0,
# partition_1 and partition_2; 'partition_3' appears to match no column -
# confirm whether it is still needed.
# NOTE(review): the duplicate join key ".track_id" is not in this list, so it
# is still present in joined_data_clean (see the schema printed below).
joined_data_clean = DropFields.apply(frame = joined_data, paths = ['partition_0','partition_1','partition_2','partition_3'])






In [17]:
# Print the schema of the cleaned frame to confirm the partition columns are gone.
joined_data_clean.printSchema()

root
|-- track_id: string
|-- activity_type: string
|-- .track_id: int
|-- device_temp: int
|-- track_name: string
|-- artist_name: string
|-- device_ts: string
|-- device_id: int
|-- uuid: string


In [18]:
# Convert the cleaned frame to a Spark DataFrame and preview five rows.
joined_data_clean.toDF().show(5)

+--------+-------------+---------+-----------+----------+---------------+--------------------+---------+--------------------+
|track_id|activity_type|.track_id|device_temp|track_name|    artist_name|           device_ts|device_id|                uuid|
+--------+-------------+---------+-----------+----------+---------------+--------------------+---------+--------------------+
|      22|    Traveling|       22|         32|   Thunder|Imagine Dragons|2023-01-22 12:35:...|       16|5f56514e-97ce-472...|
|      22|    Traveling|       22|         32|   Thunder|Imagine Dragons|2023-01-22 12:35:...|       11|4445f190-ec53-436...|
|      22|    Traveling|       22|         28|   Thunder|Imagine Dragons|2023-01-22 12:35:...|       29|a8ac8c21-2ec4-446...|
|      22|    Traveling|       22|         28|   Thunder|Imagine Dragons|2023-01-22 12:35:...|       39|fbccab71-0d09-4db...|
|      22|    Traveling|       22|         34|   Thunder|Imagine Dragons|2023-01-22 12:35:...|       34|f24d5e7d-c4d7-

In [None]:
# Write the cleaned, joined frame to S3 in Parquet format.
# NOTE(review): presumably each run adds new files under this prefix rather
# than replacing them - confirm before re-running against the same path.
try:
    datasink = glueContext.write_dynamic_frame.from_options(
        frame=joined_data_clean,
        connection_type="s3",
        connection_options={"path": "s3://asg-datalake-demo-2024/data/processed-data/"},
        format="parquet",
    )
    print('Transformed data written to S3')
except Exception as ex:
    print('Something went wrong')
    print(ex)
    # Re-raise after logging: swallowing the error would let the following
    # cells (crawler run, table listing) execute as if the write succeeded.
    raise

Transformed data written to S3


In [None]:

# Start the 'summitcrawler' Glue crawler and block until its run has finished.
glueclient = boto3.client('glue',region_name='ap-southeast-1')

try:
    response = glueclient.start_crawler(Name='summitcrawler')
except glueclient.exceptions.CrawlerRunningException:
    # The crawler is already running (for example, this cell was re-executed);
    # fall through and wait for that run to finish instead of failing.
    response = None

print('---')

# A crawler run goes RUNNING -> STOPPING -> READY. Waiting for READY, rather
# than for the transient STOPPING state, means a missed poll cannot leave the
# loop spinning forever and the catalog update is complete when we continue.
CRAWLER_POLL_SECONDS = 5
CRAWLER_TIMEOUT_SECONDS = 30 * 60
crawler_deadline = time.monotonic() + CRAWLER_TIMEOUT_SECONDS

crawler_state = ''
while crawler_state != 'READY':
    if time.monotonic() > crawler_deadline:
        raise TimeoutError(
            'Crawler summitcrawler did not finish within '
            + str(CRAWLER_TIMEOUT_SECONDS) + ' seconds (last state: '
            + crawler_state + ')'
        )
    # Sleep before polling so the first check happens after the run has started.
    time.sleep(CRAWLER_POLL_SECONDS)
    response = glueclient.get_crawler(Name='summitcrawler')
    crawler_state = str(response['Crawler']['State'])

print('Crawler : Stopped')
print('---')


---
Crawler : Stopped
---


In [22]:

# List every table registered in the 'summitdb' catalog database.
print('** Summitdb has following tables**')

# A single get_tables call returns only one page of results; the paginator
# follows NextToken so no table is left out when the database grows.
table_pages = glueclient.get_paginator('get_tables').paginate(
    DatabaseName='summitdb',
)

for response in table_pages:
    for table in response['TableList']:
        print(table['Name'])



** Summitdb has following tables**
raw2023
reference_data
