In [1]:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Reuse the session's SparkContext (creating one if none exists yet) and wrap it
# in a GlueContext, which the cells below use for catalog reads and S3 writes.
spark_context = SparkContext.getOrCreate()
glueContext = GlueContext(spark_context)


Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Trying to create a Glue session for the kernel.
Session Type: etl
Worker Type: G.1X
Number of Workers: 5
Session ID: 01249033-6f0e-49f1-8fc7-c507a9efdfb7
Applying the following default arguments:
--glue_kernel_version 1.0.4
--enable-glue-datacatalog true
Waiting for session 01249033-6f0e-49f1-8fc7-c507a9efdfb7 to get into ready status...
Session 01249033-6f0e-49f1-8fc7-c507a9efdfb7 has been created.



In [3]:
# Preview the CSV crawler table. Its printed schema is (zipcode, group), so it is
# bound to `zip_group` rather than `persons`. `persons` is the JSON people table
# loaded in the next cell, and reusing the name for two different tables hides
# which one a later cell actually sees.
zip_group = glueContext.create_dynamic_frame.from_catalog(
    database="ltm893-crawler-demo-database",
    table_name="ltm893-crawler-csv")
print("Count: ", zip_group.count())
zip_group.printSchema()

Count:  28580
root
|-- zipcode: long
|-- group: long


In [4]:
# Load the JSON crawler table of people from the Glue Data Catalog and report its
# row count and inferred schema (printed below as name, usage, zip).
persons = glueContext.create_dynamic_frame.from_catalog(
    table_name="ltm893-crawler-json",
    database="ltm893-crawler-demo-database",
)
person_count = persons.count()
print("Count: ", person_count)
persons.printSchema()

Count:  999999
root
|-- name: string
|-- usage: string
|-- zip: string


In [7]:
# Load the CSV crawler table of (zipcode, group) rows, per the printed schema.
# It is the right-hand side of the join in the next cell.
catalog_db = "ltm893-crawler-demo-database"
zip_group = glueContext.create_dynamic_frame.from_catalog(
    database=catalog_db,
    table_name="ltm893-crawler-csv",
)
zip_group_count = zip_group.count()
print("Count: ", zip_group_count)
zip_group.printSchema()

Count:  28580
root
|-- zipcode: long
|-- group: long


In [8]:
# Equality-join each person's `zip` against the lookup's `zipcode`. Then drop the
# right-hand key, so only `zip` remains in the result.
# NOTE(review): per the printed schemas the keys differ in type (`zip` is string,
# `zipcode` is long). Persons without a matching group are absent from the result
# (999999 -> 922976 rows in the recorded run). Confirm both are intended.
joined = Join.apply(persons, zip_group, 'zip', 'zipcode')
l_person_zip = joined.drop_fields(['zipcode'])
print("Count: ", l_person_zip.count())
l_person_zip.printSchema()

Count:  922976
root
|-- usage: string
|-- group: long
|-- name: string
|-- zip: string


In [9]:
# Persist the joined frame as Parquet through the Glue S3 sink.
# NOTE(review): the target prefix has no run-specific component. Re-running this
# cell writes into the same location again, so confirm the sink's write semantics
# before re-running.
people_zipgroup_path = "s3://ltm893-gluedemo/output/people_zipgroup"
glueContext.write_dynamic_frame.from_options(
    frame=l_person_zip,
    connection_type="s3",
    connection_options={"path": people_zipgroup_path},
    format="parquet",
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f731a93fd30>


In [11]:
# Write the joined data as a single Parquet file. repartition(1) moves every row
# into one partition, so Spark emits one part file under the prefix.
# mode("overwrite") makes the cell re-runnable: Spark's default save mode raises
# an error when the target path already exists. Note that overwrite replaces
# whatever is currently stored under this prefix.
person_zip = l_person_zip.toDF().repartition(1)
person_zip.write.mode("overwrite").parquet('s3://ltm893-gluedemo/output-single/people_zipgroup')




In [12]:
import sys
from datetime import date

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Date-stamped version of the join job. It reads both crawler tables, joins
# persons to their zip-code group, and writes Parquet under a YYYYMMDD prefix.
today = date.today()
# Leave DATEDIR_OVERRIDE empty ('' or None) to write under today's date, or set it
# to a YYYYMMDD string to (re)produce a specific day. It is pinned to the date
# this cell originally targeted.
DATEDIR_OVERRIDE = '20240405'
datedir = DATEDIR_OVERRIDE or today.strftime("%Y%m%d")
# Python f-strings interpolate `{name}`. Shell-style `${name}` is not expanded and
# would end up verbatim in the S3 key.
# NOTE(review): the following cell writes under ".../output/<date>/...", while
# this one writes directly under the bucket root. Confirm which prefix is intended.
daily_output = f"s3://ltm893-gluedemo/{datedir}/people_zipgroup"

glueContext = GlueContext(SparkContext.getOrCreate())

persons = glueContext.create_dynamic_frame.from_catalog(
    database="ltm893-crawler-demo-database",
    table_name="ltm893-crawler-json")

zip_group = glueContext.create_dynamic_frame.from_catalog(
    database="ltm893-crawler-demo-database",
    table_name="ltm893-crawler-csv")

# Join person.zip to zip_group.zipcode and drop the duplicated right-hand key.
person_zip = Join.apply(persons, zip_group, 'zip', 'zipcode').drop_fields(['zipcode'])

glueContext.write_dynamic_frame.from_options(
    frame=person_zip,
    connection_type="s3",
    connection_options={"path": daily_output},
    format="parquet")


<awsglue.dynamicframe.DynamicFrame object at 0x7f731a798e80>


In [14]:
import sys
from datetime import date


from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Date-stamped join job. It reads both crawler tables, joins persons to their
# zip-code group, and writes Parquet under output/<YYYYMMDD>/people_zipgroup.
today = date.today()
# Leave DATEDIR_OVERRIDE empty ('' or None) to write under today's date, or set it
# to a YYYYMMDD string to (re)produce a specific day. It is pinned to the date
# this cell originally targeted. Previously the computed date was silently
# overwritten on the next line.
DATEDIR_OVERRIDE = '20240404'
datedir = DATEDIR_OVERRIDE or today.strftime("%Y%m%d")
daily_output = "s3://ltm893-gluedemo/output/" + datedir + "/people_zipgroup"

glueContext = GlueContext(SparkContext.getOrCreate())

persons = glueContext.create_dynamic_frame.from_catalog(
    database="ltm893-crawler-demo-database",
    table_name="ltm893-crawler-json")

zip_group = glueContext.create_dynamic_frame.from_catalog(
    database="ltm893-crawler-demo-database",
    table_name="ltm893-crawler-csv")

# Join person.zip to zip_group.zipcode and drop the duplicated right-hand key.
person_zip = Join.apply(persons, zip_group, 'zip', 'zipcode').drop_fields(['zipcode'])

glueContext.write_dynamic_frame.from_options(
    frame=person_zip,
    connection_type="s3",
    connection_options={"path": daily_output},
    format="parquet")

<awsglue.dynamicframe.DynamicFrame object at 0x7f731a799030>
