# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue interactive session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [2]:
%help

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.4 



# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session. 
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0, 3.0 and 4.0. 
                                      Default: 2.0.
----

## Selecting Session Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %glue_ray           String        Sets the session type to Glue Ray.
    %session_type       String        Specify a session_type to be used. Supported values: streaming, etl and glue_ray. 
----

## Glue Config Magic 
*(common across all session types)*

----

    %%configure         Dictionary    A json-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from Pypi or S3).
    %%tags        Dictionary          Specify a json-formatted dictionary consisting of tags to use in the session.
    
    %%assume_role Dictionary, String  Specify a json-formatted dictionary or an IAM role ARN string to create a session 
                                      for cross account access.
                                      E.g. {valid arn}
                                      %%assume_role 
                                      'arn:aws:iam::XXXXXXXXXXXX:role/AWSGlueServiceRole' 
                                      E.g. {credentials}
                                      %%assume_role
                                      {
                                            "aws_access_key_id" : "XXXXXXXXXXXX",
                                            "aws_secret_access_key" : "XXXXXXXXXXXX",
                                            "aws_session_token" : "XXXXXXXXXXXX"
                                       }
----

                                      
## Magic for Spark Sessions (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files From S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----
                                      
## Magic for Ray Session

----
    %min_workers        Int           The minimum number of workers that are allocated to a Ray session. 
                                      Default: 1.
    %object_memory_head Int           The percentage of free memory on the instance head node after a warm start. 
                                      Minimum: 0. Maximum: 100.
    %object_memory_worker Int         The percentage of free memory on the instance worker nodes after a warm start. 
                                      Minimum: 0. Maximum: 100.
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
    %matplot      Matplotlib figure   Visualize your data using the matplotlib library.
                                      E.g. 
                                      import matplotlib.pyplot as plt
                                      # Set X-axis and Y-axis values
                                      x = [5, 2, 8, 4, 9]
                                      y = [10, 4, 8, 5, 2]
                                      # Create a bar chart 
                                      plt.bar(x, y) 
                                      # Show the plot
                                      %matplot plt    
    %plotly            Plotly figure  Visualize your data using the plotly library.
                                      E.g.
                                      import plotly.express as px
                                      #Create a graphical figure
                                      fig = px.line(x=["a","b","c"], y=[1,3,2], title="sample figure")
                                      #Show the figure
                                      %plotly fig

  
                
----



####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.4 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Session ID: 3e987287-fc08-4775-bb5d-1e5e82f35b2a
Applying the following default arguments:
--glue_kernel_version 1.0.4
--enable-glue-datacatalog true
Waiting for session 3e987287-fc08-4775-bb5d-1e5e82f35b2a to get into ready status...
Session 3e987287-fc08-4775-bb5d-1e5e82f35b2a has been created.



#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [2]:
dyf = glueContext.create_dynamic_frame.from_catalog(database='db_sales_raw_catalog', table_name='sales_table_raw')
dyf.printSchema()

root
|-- name: string
|-- address: string
|-- city: string
|-- country: string
|-- region: string
|-- productname: string
|-- productcategory: string
|-- productcategorydescription: string
|-- productunitprice: string
|-- quantityorderded: string
|-- orderdate: string


#### Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [3]:
df = dyf.toDF()
df.show(5)

+--------------------+--------------------+--------------+-------+--------------+--------------------+--------------------+--------------------------+--------------------+--------------------+--------------------+
|                name|             address|          city|country|        region|         productname|     productcategory|productcategorydescription|    productunitprice|    quantityorderded|           orderdate|
+--------------------+--------------------+--------------+-------+--------------+--------------------+--------------------+--------------------------+--------------------+--------------------+--------------------+
|Zbyszek Piestrzen...|     ul. Filtrowa 68|      Warszawa| Poland|Eastern Europe|Gorgonzola Telino...|Dairy Products;Se...|      Cheeses;Seaweed a...|12.5;13.25;18;19;...|30;15;6;10;15;10;...|20121205;20121205...|
|    Jonas Bergulfsen|Erling Skakkes ga...|       Stavern| Norway|   Scandinavia|Guarana Fantastic...|Beverages;Produce...|      Soft drinks, co

#### Create DataFrames for each table in the normalized schema below

- Region
  - [RegionID] integer not null primary key
  - [Region] text not null
- Country
  - [CountryID] integer not null primary key
  - [Country] text not null
  - [RegionID] integer not null, foreign key to the Region table
- Customer
  - [CustomerID] integer not null primary key
  - [FirstName] text not null
  - [LastName] text not null
  - [Address] text not null
  - [City] text not null
  - [CountryID] integer not null, foreign key to the Country table
- ProductCategory
  - [ProductCategoryID] integer not null primary key
  - [ProductCategory] text not null
  - [ProductCategoryDescription] text not null
- Product
  - [ProductID] integer not null primary key
  - [ProductName] text not null
  - [ProductUnitPrice] real not null
  - [ProductCategoryID] integer not null, foreign key to the ProductCategory table
- OrderDetail
  - [OrderID] integer not null primary key
  - [CustomerID] integer not null, foreign key to the Customer table
  - [ProductID] integer not null, foreign key to the Product table
  - [OrderDate] integer not null
  - [QuantityOrdered] integer not null
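The foreign keys listed above can be sanity-checked once the tables exist. Below is a minimal pure-Python sketch. It assumes each table has already been collected to the driver as a list of dicts, for example with `[row.asDict() for row in region_df.collect()]`, which is only sensible for small tables. The function `foreign_key_violations` and the sample rows are hypothetical illustrations, not part of the notebook:

```python
from typing import Any, Dict, List


def foreign_key_violations(
    child_rows: List[Dict[str, Any]],
    fk_column: str,
    parent_rows: List[Dict[str, Any]],
    pk_column: str,
) -> List[Any]:
    """Return foreign-key values in child_rows that have no matching primary key in parent_rows."""
    parent_keys = {row[pk_column] for row in parent_rows}
    missing: List[Any] = []
    for row in child_rows:
        value = row[fk_column]
        # Report each dangling key once, in first-seen order
        if value not in parent_keys and value not in missing:
            missing.append(value)
    return missing


# Hypothetical sample rows shaped like the Region and Country tables
regions = [{"RegionID": 1, "Region": "British Isles"}, {"RegionID": 4, "Region": "North America"}]
countries = [
    {"CountryID": 19, "Country": "UK", "RegionID": 1},
    {"CountryID": 20, "Country": "USA", "RegionID": 4},
    {"CountryID": 99, "Country": "Atlantis", "RegionID": 42},  # hypothetical dangling reference
]
print(foreign_key_violations(countries, "RegionID", regions, "RegionID"))  # [42]
```

On the full data, the same check could be expressed in Spark as a `left_anti` join of the child table against the parent table on the key column.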

In [4]:
# Remove exact duplicate rows, then work on the underlying RDD of Row objects
df = df.dropDuplicates()
rdd_df = df.rdd




In [5]:
# Region DataFrame: (RegionID, Region)
region_rdd = (
    rdd_df.map(lambda line: line[4])       # select the 'region' column (index 4, 0-based)
    .distinct()                            # deduplicate the regions
    .sortBy(lambda x: x)                   # order the regions alphabetically
    .zipWithIndex()                        # pair each region with a 0-based index
    .map(lambda x: (x[1] + 1, x[0]))       # shift to 1-based IDs: (RegionID, Region)
)
region_df = region_rdd.toDF(["RegionID", "Region"])
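The `distinct → sortBy → zipWithIndex → +1` chain is the pattern every dimension table in this notebook uses to assign consecutive surrogate keys starting at 1. Below is a plain-Python sketch of the same logic that lets you reason about or unit-test the key assignment without a Spark session. The function name `assign_surrogate_keys` is hypothetical:

```python
from typing import Hashable, List, Tuple


def assign_surrogate_keys(values: List[Hashable]) -> List[Tuple[int, Hashable]]:
    """Mimic rdd.distinct().sortBy(identity).zipWithIndex().map(lambda x: (x[1] + 1, x[0]))."""
    unique_sorted = sorted(set(values))              # distinct() + sortBy()
    return [(index + 1, value)                       # zipWithIndex() is 0-based, so shift to 1-based IDs
            for index, value in enumerate(unique_sorted)]


print(assign_surrogate_keys(["Scandinavia", "British Isles", "Scandinavia", "Eastern Europe"]))
# [(1, 'British Isles'), (2, 'Eastern Europe'), (3, 'Scandinavia')]
```

`zipWithIndex()` numbers elements in the RDD's current order, so the `sortBy` step has to come first to make the IDs reproducible across runs. Per the PySpark documentation, `zipWithIndex()` also triggers an extra Spark job when the RDD has more than one partition.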




In [6]:
region_df.show()

+--------+---------------+
|RegionID|         Region|
+--------+---------------+
|       1|  British Isles|
|       2|Central America|
|       3| Eastern Europe|
|       4|  North America|
|       5|Northern Europe|
|       6|    Scandinavia|
|       7|  South America|
|       8|Southern Europe|
|       9| Western Europe|
+--------+---------------+


In [5]:
# Region Dataframe
# from pyspark.sql.types import *
# from pyspark.sql.functions import monotonically_increasing_id

# schema = StructType([
#     StructField("RegionID", IntegerType(), True),
#     StructField("Region", StringType(), True)
# ])
# region_df=spark.createDataFrame([], schema)
# region_df = (
#     df.select("Region")              # Select the 'Region' column
#       .distinct()                    # Deduplicate the rows
#       .orderBy("Region")             # Order the DataFrame by 'Region'
#       .withColumn("RegionID", monotonically_increasing_id() + 1)  # Add 'RegionID' column starting from 1
#       .select("RegionID", "Region")  # Select only 'RegionID' and 'Region' columns
# )
# region_df.show(30)
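The commented-out DataFrame version differs from the RDD version in one important way. Spark documents that `monotonically_increasing_id()` puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The resulting IDs are unique and increasing, but they are not consecutive once the data spans several partitions. Below is a small pure-Python simulation of that layout. The function name is hypothetical:

```python
from typing import List


def simulated_monotonic_ids(partition_sizes: List[int]) -> List[int]:
    """Reproduce the documented ID layout: (partition index << 33) + row position within the partition."""
    ids = []
    for partition_index, size in enumerate(partition_sizes):
        for row_in_partition in range(size):
            ids.append((partition_index << 33) + row_in_partition)
    return ids


# Three rows split over two partitions: the third ID jumps to 2**33
print(simulated_monotonic_ids([2, 1]))  # [0, 1, 8589934592]
```

If consecutive IDs are needed from the DataFrame API, the usual alternative is `row_number()` over a window ordered by the key column. Without a `partitionBy`, that window moves all rows into a single partition, which is acceptable for small dimension tables like these.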

In [7]:
# Region to RegionID Dict
region_dict = region_df.select('Region', 'RegionID').rdd.collectAsMap()
region_dict

{'British Isles': 1, 'Central America': 2, 'Eastern Europe': 3, 'North America': 4, 'Northern Europe': 5, 'Scandinavia': 6, 'South America': 7, 'Southern Europe': 8, 'Western Europe': 9}


In [8]:
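# Country DataFrame: (CountryID, Country, RegionID)
country_rdd = (
    rdd_df.map(lambda line: (line[3], line[4]))   # (country, region) pairs
    .distinct()                                   # one row per (country, region)
    .sortBy(lambda x: x[0])                       # order by country name
    .zipWithIndex()                               # add a 0-based index
    .map(lambda x: (x[1] + 1, x[0][0], region_dict[x[0][1]]))  # (CountryID, Country, RegionID); KeyError if the region is unknown
)
country_df = country_rdd.toDF(["CountryID", "Country", "RegionID"])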
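`country_rdd` deduplicates (country, region) pairs, so it silently assumes each country belongs to exactly one region. If a country appeared under two regions, it would receive two CountryIDs, and `collectAsMap()` in the next cell would keep only one of them. Below is a pure-Python sketch of a check for that functional dependency. `conflicting_parents` and the conflicting sample pair are hypothetical:

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List, Tuple


def conflicting_parents(pairs: Iterable[Tuple[Hashable, str]]) -> Dict[Hashable, List[str]]:
    """Return child keys that map to more than one parent value (e.g. a country listed under two regions)."""
    parents = defaultdict(set)
    for child, parent in pairs:
        parents[child].add(parent)
    # Sort the parent values so the result is deterministic
    return {child: sorted(values) for child, values in parents.items() if len(values) > 1}


# Hypothetical input shaped like the (country, region) pairs country_rdd is built from
sample = [("Poland", "Eastern Europe"), ("Norway", "Scandinavia"), ("Norway", "Northern Europe")]
print(conflicting_parents(sample))  # {'Norway': ['Northern Europe', 'Scandinavia']}
```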




In [9]:
country_df.show(25)

+---------+-----------+--------+
|CountryID|    Country|RegionID|
+---------+-----------+--------+
|        1|  Argentina|       7|
|        2|    Austria|       9|
|        3|    Belgium|       9|
|        4|     Brazil|       7|
|        5|     Canada|       4|
|        6|    Denmark|       5|
|        7|    Finland|       6|
|        8|     France|       9|
|        9|    Germany|       9|
|       10|    Ireland|       1|
|       11|      Italy|       8|
|       12|     Mexico|       2|
|       13|     Norway|       6|
|       14|     Poland|       3|
|       15|   Portugal|       8|
|       16|      Spain|       8|
|       17|     Sweden|       5|
|       18|Switzerland|       9|
|       19|         UK|       1|
|       20|        USA|       4|
|       21|  Venezuela|       7|
+---------+-----------+--------+


In [6]:
# # Country Dataframe
# from pyspark.sql.functions import col, create_map, lit
# from itertools import chain
# mapping_expr = create_map([lit(x) for x in chain(*region_dict.items())]) 

# schema2 = StructType([
#     StructField("CountryID", IntegerType(), True),
#     StructField("Country", StringType(), True),
#     StructField("RegionID", IntegerType(), True)
# ])
# country_df=spark.createDataFrame([], schema2)
# country_df = (
#     df.select("Country", "Region")
#       .distinct()
#       .orderBy("Country")
#       .withColumn("CountryID", monotonically_increasing_id() + 1)
#       .withColumn("RegionID", mapping_expr[col("Region")])
#       .select("CountryID", "Country","RegionID")
# )
# country_df.show(25)

In [10]:
# Country Dictionary
country_dict = country_df.select('Country', 'CountryID').rdd.collectAsMap()
country_dict

{'Argentina': 1, 'Austria': 2, 'Belgium': 3, 'Brazil': 4, 'Canada': 5, 'Denmark': 6, 'Finland': 7, 'France': 8, 'Germany': 9, 'Ireland': 10, 'Italy': 11, 'Mexico': 12, 'Norway': 13, 'Poland': 14, 'Portugal': 15, 'Spain': 16, 'Sweden': 17, 'Switzerland': 18, 'UK': 19, 'USA': 20, 'Venezuela': 21}
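The ID lookups in the remaining cells assume that every country, category, product, and customer name is present in its dictionary. Below is a small hypothetical helper that fails with a message naming the dimension table, rather than a bare `KeyError` or `TypeError` raised from inside a Spark task. It is a sketch, not a Glue or Spark API:

```python
from typing import Hashable, Mapping


def lookup_id(mapping: Mapping[Hashable, int], key: Hashable, table: str) -> int:
    """Return the surrogate ID for key, raising a KeyError that names the dimension table."""
    try:
        return mapping[key]
    except KeyError:
        raise KeyError("{!r} not found in {} lookup".format(key, table)) from None


print(lookup_id({"UK": 19, "USA": 20}, "USA", "Country"))  # 20
```

Inside a transformation, it would replace a direct lookup, e.g. `lookup_id(country_dict, x[0][3], "Country")`.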


In [11]:
# Customer DataFrame: (CustomerID, FirstName, LastName, Address, City, CountryID)
customer_rdd = (
    rdd_df.map(lambda line: (tuple(line[0].split(' ', 1)), line[1], line[2], line[3]))  # ((first, last), address, city, country)
    .distinct()
    .sortBy(lambda x: (x[0][0], x[0][1]))         # order by (FirstName, LastName)
    .zipWithIndex()
    .map(lambda x: (x[1] + 1, x[0][0][0], x[0][0][1], x[0][1], x[0][2], country_dict[x[0][3]]))
)
customer_df = customer_rdd.toDF(["CustomerID", "FirstName", "LastName", "Address", "City", "CountryID"])
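`line[0].split(' ', 1)` treats the first space-separated token as the first name and everything after it as the last name. This is why a multi-word name such as 'Jose Pedro Freyre' keeps 'Pedro Freyre' as its last name. A single-token name would produce a one-element tuple and make `x[0][0][1]` raise an `IndexError`. Below is a sketch of a safer splitter using `str.partition`. The helper name and the single-token example are hypothetical, and whether an empty LastName satisfies the `not null` constraint is a modelling decision:

```python
from typing import Tuple


def split_name(full_name: str) -> Tuple[str, str]:
    """Split on the first space (ignoring outer whitespace): FirstName, then the rest (possibly empty) as LastName."""
    first, _, last = full_name.strip().partition(" ")
    return first, last


print(split_name("Jose Pedro Freyre"))  # ('Jose', 'Pedro Freyre')
print(split_name("Isabel de Castro"))   # ('Isabel', 'de Castro')
print(split_name("Cher"))               # ('Cher', '')
```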




In [12]:
customer_df.show(100)

+----------+----------+---------------+--------------------+---------------+---------+
|CustomerID| FirstName|       LastName|             Address|           City|CountryID|
+----------+----------+---------------+--------------------+---------------+---------+
|         1| Alejandra|         Camino|         Gran Via, 1|         Madrid|       16|
|         2| Alexander|          Feuer|         Heerstr. 22|        Leipzig|        9|
|         3|       Ana|       Trujillo|Avda. de la Const...|    Mexico D.F.|       12|
|         4|   Anabela|      Domingues|Av. Ines de Castr...|      Sao Paulo|        4|
|         5|     Andre|        Fonseca|     Av. Brasil, 442|       Campinas|        4|
|         6|       Ann|          Devon|      35 King George|         London|       19|
|         7|   Annette|         Roulet|1 rue Alsace-Lorr...|       Toulouse|        8|
|         8|   Antonio|         Moreno|     Mataderos  2312|    Mexico D.F.|       12|
|         9|      Aria|           Cruz|    

In [13]:
# # Customer Dataframe
# schema3 = StructType([
#     StructField("CustomerID", IntegerType(), True),
#     StructField("FirstName", StringType(), True),
#     StructField("LastName", StringType(), True),
#     StructField("Address", StringType(), True),
#     StructField("City", StringType(), True),
#     StructField("CountryID", IntegerType(), True)
# ])
# mapping_expr = create_map([lit(x) for x in chain(*country_dict.items())]) 
# customer_df=spark.createDataFrame([], schema3)
# # Splitting Name column into FirstName and LastName
# from pyspark.sql.functions import split

# name_split = split(df['Name'], ' ',2)
# df = df.withColumn('FirstName', name_split.getItem(0))
# df = df.withColumn('LastName', name_split.getItem(1))
# customer_df = (
#       df.select('FirstName','LastName','Country','Address','City')
#       .distinct()
#       .orderBy('FirstName','LastName')
#     .withColumn("CustomerID", monotonically_increasing_id() + 1)
#     .withColumn('CountryID',mapping_expr[col("Country")])
#     .select("CustomerID","FirstName","LastName","Address","City","CountryID")
      
# )
# customer_df.show(100)




In [14]:
customer_dict = customer_df.rdd.map(lambda row: (row['FirstName'] + ' ' + row['LastName'], row['CustomerID'])).collectAsMap()
customer_dict

{'Alejandra Camino': 1, 'Alexander Feuer': 2, 'Ana Trujillo': 3, 'Anabela Domingues': 4, 'Andre Fonseca': 5, 'Ann Devon': 6, 'Annette Roulet': 7, 'Antonio Moreno': 8, 'Aria Cruz': 9, 'Art Braunschweiger': 10, 'Bernardo Batista': 11, 'Carine Schmitt': 12, 'Carlos Gonzalez': 13, 'Carlos Hernandez': 14, 'Catherine Dewey': 15, 'Christina Berglund': 16, 'Daniel Tonini': 17, 'Diego Roel': 18, 'Dominique Perrier': 19, 'Eduardo Saavedra': 20, 'Elizabeth Brown': 21, 'Elizabeth Lincoln': 22, 'Felipe Izquierdo': 23, 'Fran Wilson': 24, 'Francisco Chang': 25, 'Frederique Citeaux': 26, 'Georg Pipps': 27, 'Giovanni Rovelli': 28, 'Guillermo Fernandez': 29, 'Hanna Moos': 30, 'Hari Kumar': 31, 'Helen Bennett': 32, 'Helvetius Nagy': 33, 'Henriette Pfalzheim': 34, 'Horst Kloss': 35, 'Howard Snyder': 36, 'Isabel de Castro': 37, 'Jaime Yorres': 38, 'Janete Limeira': 39, 'Janine Labrune': 40, 'Jean Fresniere': 41, 'John Steel': 42, 'Jonas Bergulfsen': 43, 'Jose Pavarotti': 44, 'Jose Pedro Freyre': 45, 'Jytte
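`collectAsMap()` keeps only one value per key. If two customers shared a full name (with different addresses, for example), `customer_dict` would silently map both to a single CustomerID, and their orders would be attributed to that one customer. Below is a pure-Python sketch of a pre-check on the (name, ID) pairs. The function name and sample pairs are hypothetical; on Spark, the pairs could come from the same `customer_df.rdd.map(...)` used above, with `.collect()` instead of `.collectAsMap()`:

```python
from collections import Counter
from typing import Iterable, List, Tuple


def duplicate_keys(pairs: Iterable[Tuple[str, object]]) -> List[str]:
    """Return keys that occur more than once, i.e. keys that a dict or collectAsMap() would collapse."""
    counts = Counter(key for key, _ in pairs)
    return sorted(key for key, count in counts.items() if count > 1)


# Hypothetical pairs: the second "Ann Devon" would overwrite the first in a dict
pairs = [("Ann Devon", 6), ("Aria Cruz", 9), ("Ann Devon", 99)]
print(duplicate_keys(pairs))  # ['Ann Devon']
```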

In [15]:
# ProductCategory DataFrame: pair the parallel ';'-separated category and description lists
productcategory_rdd = (
    rdd_df.flatMap(lambda line: list(zip(line[6].split(';'), line[7].split(';'))))  # (category, description) pairs
    .distinct()
    .sortBy(lambda x: (x[0], x[1]))
    .zipWithIndex()
    .map(lambda x: (x[1] + 1, x[0][0], x[0][1]))   # (ProductCategoryID, ProductCategory, ProductCategoryDescription)
)

# Convert RDD to DataFrame
productcategory_df = productcategory_rdd.toDF(["ProductCategoryID", "ProductCategory", "ProductCategoryDescription"])
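Each raw row stores several items as parallel `;`-separated lists (product names, categories, descriptions, prices, quantities, dates), and these cells pair the lists up positionally with `zip`. Because `zip` stops at the shortest input, a row whose lists have different lengths would silently lose items. Below is a hedged pure-Python sketch of a stricter splitter. The helper name is hypothetical; on Python 3.10 and later, `zip(..., strict=True)` performs the same length check:

```python
from typing import List, Tuple


def explode_parallel(*fields: str, sep: str = ";") -> List[Tuple[str, ...]]:
    """Split each field on sep and zip the parts, raising if the lists have different lengths."""
    parts = [field.split(sep) for field in fields]
    lengths = {len(p) for p in parts}
    if len(lengths) > 1:
        raise ValueError("parallel lists have different lengths: {}".format(sorted(lengths)))
    return list(zip(*parts))


print(explode_parallel("Dairy Products;Seafood", "Cheeses;Seaweed and fish"))
# [('Dairy Products', 'Cheeses'), ('Seafood', 'Seaweed and fish')]
```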




In [16]:
productcategory_df.show()

+-----------------+---------------+--------------------------+
|ProductCategoryID|ProductCategory|ProductCategoryDescription|
+-----------------+---------------+--------------------------+
|                1|      Beverages|      Soft drinks, coff...|
|                2|     Condiments|      Sweet and savory ...|
|                3|    Confections|      Desserts, candies...|
|                4| Dairy Products|                   Cheeses|
|                5| Grains/Cereals|      Breads, crackers,...|
|                6|   Meat/Poultry|            Prepared meats|
|                7|        Produce|      Dried fruit and b...|
|                8|        Seafood|          Seaweed and fish|
+-----------------+---------------+--------------------------+


In [17]:
pc_dict = productcategory_df.select('ProductCategory', 'ProductCategoryID').rdd.collectAsMap()
pc_dict

{'Beverages': 1, 'Condiments': 2, 'Confections': 3, 'Dairy Products': 4, 'Grains/Cereals': 5, 'Meat/Poultry': 6, 'Produce': 7, 'Seafood': 8}


In [18]:
# Product DataFrame: (ProductID, ProductName, ProductUnitPrice, ProductCategoryID)
product_rdd = (
    rdd_df.flatMap(lambda line: list(zip(line[5].split(';'), line[8].split(';'), line[6].split(';'))))  # (name, unit price, category)
    .distinct()
    .sortBy(lambda x: x[0])                       # order by product name
    .zipWithIndex()
    .map(lambda x: (x[1] + 1, x[0][0], float(x[0][1]), pc_dict[x[0][2]]))
)
product_df = product_rdd.toDF(["ProductID", "ProductName", "ProductUnitPrice", "ProductCategoryID"])




In [19]:
product_df.show(100)

+---------+--------------------+----------------+-----------------+
|ProductID|         ProductName|ProductUnitPrice|ProductCategoryID|
+---------+--------------------+----------------+-----------------+
|        1|        Alice Mutton|            39.0|                6|
|        2|       Aniseed Syrup|            10.0|                2|
|        3|    Boston Crab Meat|            18.4|                8|
|        4|   Camembert Pierrot|            34.0|                4|
|        5|    Carnarvon Tigers|            62.5|                8|
|        6|                Chai|            18.0|                1|
|        7|               Chang|            19.0|                1|
|        8|    Chartreuse verte|            18.0|                1|
|        9|Chef Anton's Caju...|            22.0|                2|
|       10|Chef Anton's Gumb...|           21.35|                2|
|       11|           Chocolade|           12.75|                3|
|       12|       Cote de Blaye|           263.5

In [20]:
p_dict = product_df.select('ProductName', 'ProductID').rdd.collectAsMap()
p_dict

{'Alice Mutton': 1, 'Aniseed Syrup': 2, 'Boston Crab Meat': 3, 'Camembert Pierrot': 4, 'Carnarvon Tigers': 5, 'Chai': 6, 'Chang': 7, 'Chartreuse verte': 8, "Chef Anton's Cajun Seasoning": 9, "Chef Anton's Gumbo Mix": 10, 'Chocolade': 11, 'Cote de Blaye': 12, 'Escargots de Bourgogne': 13, 'Filo Mix': 14, 'Flotemysost': 15, 'Geitost': 16, 'Genen Shouyu': 17, 'Gnocchi di nonna Alice': 18, 'Gorgonzola Telino': 19, "Grandma's Boysenberry Spread": 20, 'Gravad lax': 21, 'Guarana Fantastica': 22, 'Gudbrandsdalsost': 23, 'Gula Malacca': 24, 'Gumbar Gummibarchen': 25, "Gustaf's Knackebrod": 26, 'Ikura': 27, 'Inlagd Sill': 28, 'Ipoh Coffee': 29, "Jack's New England Clam Chowder": 30, 'Konbu': 31, 'Lakkalikoori': 32, 'Laughing Lumberjack Lager': 33, 'Longlife Tofu': 34, 'Louisiana Fiery Hot Pepper Sauce': 35, 'Louisiana Hot Spiced Okra': 36, 'Manjimup Dried Apples': 37, 'Mascarpone Fabioli': 38, 'Maxilaku': 39, 'Mishi Kobe Niku': 40, 'Mozzarella di Giovanni': 41, 'Nord-Ost Matjeshering': 42, 'Nort

In [21]:
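import datetime

def explode_orders(line):
    # Turn one raw row into one record per ordered item: (customer name, ProductID, ISO order date, quantity)
    product_ids = [p_dict.get(p) for p in line[5].split(';')]
    order_dates = [datetime.datetime.strptime(d, "%Y%m%d").strftime("%Y-%m-%d") for d in line[10].split(';')]
    quantities = [int(q) for q in line[9].split(';')]   # QuantityOrdered is an integer in the target schema
    return [(line[0], pid, od, qty) for pid, od, qty in zip(product_ids, order_dates, quantities)]

orders_rdd = (
    rdd_df.flatMap(explode_orders)
    .sortBy(lambda x: x[0])                       # order by customer name
    .zipWithIndex()                               # 0-based row position
    .map(lambda x: (x[1] + 1, customer_dict.get(x[0][0]), x[0][1], x[0][2], x[0][3]))  # (OrderID, CustomerID, ProductID, OrderDate, QuantityOrdered)
)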
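The order-date conversion turns the raw `yyyymmdd` strings into ISO `yyyy-mm-dd` strings, so OrderDate is stored as a string even though the target schema lists it as an integer. Below is a pure-Python sketch of both representations. The helper names are hypothetical. Note that `strptime` rejects malformed input with a `ValueError` in either case:

```python
import datetime


def to_iso_date(raw: str) -> str:
    """Convert '20121205' to '2012-12-05'; raises ValueError on malformed input."""
    return datetime.datetime.strptime(raw, "%Y%m%d").strftime("%Y-%m-%d")


def to_int_date(raw: str) -> int:
    """Keep the yyyymmdd form as an integer, matching OrderDate in the target schema."""
    datetime.datetime.strptime(raw, "%Y%m%d")  # validate only
    return int(raw)


print(to_iso_date("20121205"))  # 2012-12-05
print(to_int_date("20121205"))  # 20121205
```

A third option is to return `datetime.date` objects (`datetime.datetime.strptime(raw, "%Y%m%d").date()`), which would let `toDF()` infer a `DateType` column.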




In [22]:
orderdetail_df = orders_rdd.toDF(["OrderID","CustomerID","ProductID", "OrderDate", "QuantityOrdered"])
orderdetail_df.show()

+-------+----------+---------+----------+---------------+
|OrderID|CustomerID|ProductID| OrderDate|QuantityOrdered|
+-------+----------+---------+----------+---------------+
|      1|         1|       68|2012-08-14|              1|
|      2|         1|       22|2012-08-14|              6|
|      3|         1|       66|2012-08-14|              4|
|      4|         1|       42|2012-08-15|              6|
|      5|         1|       53|2012-08-15|              2|
|      6|         1|       42|2012-09-16|             10|
|      7|         1|       49|2012-09-16|             10|
|      8|         1|       71|2012-09-16|              5|
|      9|         1|       42|2014-03-02|              1|
|     10|         1|        4|2014-03-02|             10|
|     11|         1|       72|2014-04-09|             10|
|     12|         1|       61|2014-04-09|              4|
|     13|         1|       56|2014-04-09|             20|
|     14|         1|       60|2014-04-09|              2|
|     15|     

In [23]:
orderdetail_df.count()

621806


#### Write the normalized tables to Amazon S3 as Parquet and register them in the AWS Glue Data Catalog


In [24]:
dfs=[region_df,country_df,customer_df,productcategory_df,product_df,orderdetail_df]




In [28]:
dfs[0].show()

+--------+---------------+
|RegionID|         Region|
+--------+---------------+
|       1|  British Isles|
|       2|Central America|
|       3| Eastern Europe|
|       4|  North America|
|       5|Northern Europe|
|       6|    Scandinavia|
|       7|  South America|
|       8|Southern Europe|
|       9| Western Europe|
+--------+---------------+


In [25]:
dfs[0].columns[0].removesuffix("ID").lower()

'region'
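`str.rstrip("ID")` strips any run of trailing `I` and `D` characters rather than the literal suffix `ID`, whereas `str.removesuffix("ID")` (Python 3.9+) removes exactly the suffix. Both return `region` for `RegionID` and agree on every key column in this notebook, but they diverge on a column such as the hypothetical `DeviceUDID`. Below is a sketch of a version-independent helper (`table_name_for` is hypothetical):

```python
def table_name_for(key_column: str) -> str:
    """Derive a table name from a surrogate-key column, removing only a literal trailing 'ID'."""
    base = key_column[:-2] if key_column.endswith("ID") else key_column
    return "{}_table".format(base.lower())


print("DeviceUDID".rstrip("ID"))        # DeviceU  (strips every trailing 'I'/'D' character)
print(table_name_for("DeviceUDID"))     # deviceud_table
print(table_name_for("RegionID"))       # region_table
```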


In [32]:
import os
from awsglue.dynamicframe import DynamicFrame

# Write each normalized DataFrame to S3 as Parquet (no Data Catalog update).
# Note: these S3 writes add files rather than overwrite, so rerunning this cell
# (or also running the catalog-updating cell below on the same paths) duplicates the data.
for df_table in dfs:
    table_name = "{}_table".format(df_table.columns[0].removesuffix("ID").lower())
    file_path = os.path.join("s3://sales-data-parquet-ss/sales_db_parquet/", "{}.parquet".format(table_name))
    dyf_table = DynamicFrame.fromDF(df_table, glueContext, table_name)
    glueContext.write_dynamic_frame.from_options(
        frame=dyf_table,
        connection_type="s3",
        connection_options={"path": file_path},
        format="glueparquet",
        transformation_ctx="s3output_{}".format(table_name),
    )
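The S3 paths and transformation contexts used by the write loops can be computed and inspected before any data is written. `os.path.join` uses the local path separator, which is `/` on the Linux hosts Glue runs on; `posixpath.join` makes that intent explicit for S3 URIs. Glue keys job-bookmark state on `transformation_ctx`, so giving each operation a distinct name is a common convention. Below is a sketch with a hypothetical helper, `write_targets`:

```python
import posixpath
from typing import List, Tuple

BASE_PATH = "s3://sales-data-parquet-ss/sales_db_parquet/"


def write_targets(table_names: List[str], base_path: str = BASE_PATH) -> List[Tuple[str, str]]:
    """Return (S3 path, transformation_ctx) pairs, with one unique context per table."""
    return [
        (posixpath.join(base_path, "{}.parquet".format(name)), "s3output_{}".format(name))
        for name in table_names
    ]


for path, ctx in write_targets(["region_table", "country_table"]):
    print(path, ctx)
# s3://sales-data-parquet-ss/sales_db_parquet/region_table.parquet s3output_region_table
# s3://sales-data-parquet-ss/sales_db_parquet/country_table.parquet s3output_country_table
```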




In [34]:
import os
from awsglue.dynamicframe import DynamicFrame

# Write each table to S3 as Snappy-compressed Parquet and create or update its table in the sales_parquet_db catalog database
for df_table in dfs:
    table_name = "{}_table".format(df_table.columns[0].removesuffix("ID").lower())
    file_path = os.path.join("s3://sales-data-parquet-ss/sales_db_parquet/", "{}.parquet".format(table_name))
    dyf_table = DynamicFrame.fromDF(df_table, glueContext, table_name)
    sink = glueContext.getSink(
        path=file_path,
        connection_type="s3",
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=[],
        compression="snappy",
        enableUpdateCatalog=True,
        transformation_ctx="sink_{}".format(table_name),
    )
    sink.setCatalogInfo(catalogDatabase="sales_parquet_db", catalogTableName=table_name)
    sink.setFormat("glueparquet")
    sink.writeFrame(dyf_table)

<awsglue.dynamicframe.DynamicFrame object at 0x7fbbcb025270>
<awsglue.dynamicframe.DynamicFrame object at 0x7fbbcb025390>
<awsglue.dynamicframe.DynamicFrame object at 0x7fbbcb025570>
<awsglue.dynamicframe.DynamicFrame object at 0x7fbbdb89fe20>
<awsglue.dynamicframe.DynamicFrame object at 0x7fbbcb0401c0>
<awsglue.dynamicframe.DynamicFrame object at 0x7fbbcb040f70>
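After the catalog-updating cell runs, you can list the tables it registered from the session with boto3. Below is a minimal sketch. It assumes `boto3` is available in the session and that the role has the `glue:GetTables` permission. `list_catalog_tables` is a hypothetical helper that follows the `get_tables` paginator, so databases with many tables are handled:

```python
from typing import Any, List


def list_catalog_tables(glue_client: Any, database: str) -> List[str]:
    """Return the sorted table names in a Glue Data Catalog database, following pagination."""
    names: List[str] = []
    paginator = glue_client.get_paginator("get_tables")
    for page in paginator.paginate(DatabaseName=database):
        names.extend(table["Name"] for table in page["TableList"])
    return sorted(names)


# Usage inside the session (requires AWS credentials):
# import boto3
# print(list_catalog_tables(boto3.client("glue"), "sales_parquet_db"))
```

If the loop completed, the result should include `region_table`, `country_table`, `customer_table`, `productcategory_table`, `product_table` and `order_table`.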
