
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A JSON-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your AWS configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma separated list of pip packages, S3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma separated list of additional Jars to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (e.g. %glue_version 2.0).                              |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %spark_conf                 |  String      |  Specify custom spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer.                      |

In [2]:
%number_of_workers 2

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Previous number of workers: 5
Setting new number of workers to: 2


In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import StructType,StructField,ArrayType,MapType,StringType,TimestampType
import pyspark.sql.functions as F

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::425511518222:role/jenny-glue-etl-test
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 2
Session ID: 62ded44a-d437-49dd-aea6-cc2288d1d603
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session 62ded44a-d437-49dd-aea6-cc2288d1d603 to get into ready status...
Session 62ded44a-d437-49dd-aea6-cc2288d1d603 has been created.



In [3]:
# typelist=['sr_df','ss_df','cr_df','cs_df']
# for i in typelist:
#     print(f'select * from split_etl.{i}')

select * from split_etl.sr_df
select * from split_etl.ss_df
select * from split_etl.cr_df
select * from split_etl.cs_df


In [2]:
schema = StructType(
    [
        StructField('sequence_number', StringType(), True),
        StructField('table_name', StringType(), True),
        StructField('timestamp', StringType(), True),
        StructField('data', ArrayType(MapType(StringType(),MapType(StringType(),StringType(),True),True),True), True)
    ]
)
# TODO: re-check this structure
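
To make the nested `data` type easier to picture, here is a minimal plain-Python sketch of one record shaped like the schema above. The sample values and the helper `matches_data_shape` are hypothetical and exist only for illustration; the key names `wholesale_cost_a` and `wholesale_cost_b` are borrowed from output printed elsewhere in this notebook, while `example_field_c` is made up.

```python
# One record shaped like the schema: three string fields plus `data`,
# an array of maps whose values are themselves string-to-string maps.
# All values here are invented for illustration.
sample_record = {
    "sequence_number": "1",
    "table_name": "ss_df",
    "timestamp": "2022-01-01 00:00:00",
    "data": [
        {"A": {"wholesale_cost_a": "72.31"}},
        {"B": {"wholesale_cost_b": "0.95"}},
        {"C": {"example_field_c": "nan"}},
    ],
}


def matches_data_shape(data):
    """Return True if `data` looks like a list of {str: {str: str}} maps."""
    if not isinstance(data, list):
        return False
    for outer in data:
        if not isinstance(outer, dict):
            return False
        for key, inner in outer.items():
            if not isinstance(key, str) or not isinstance(inner, dict):
                return False
            if not all(isinstance(k, str) and isinstance(v, str)
                       for k, v in inner.items()):
                return False
    return True


print(matches_data_shape(sample_record["data"]))
```

A check like this can be run on a handful of parsed rows before calling `toDF(schema)`, since a non-string leaf value would not fit the `StringType` leaves declared in the schema.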




In [22]:
# print(df2.toPandas())

                                                  data
0    [{'A': {'wholesale_cost_a': '72.31', 'ext_disc...
1    [{'A': {'wholesale_cost_a': '57.52', 'ext_disc...
2    [{'A': {'wholesale_cost_a': '73.36', 'ext_disc...
3    [{'A': {'wholesale_cost_a': '60.98', 'ext_disc...
4    [{'A': {'wholesale_cost_a': '97.06', 'ext_disc...
..                                                 ...
98   [{'A': {'wholesale_cost_a': '96.82', 'ext_disc...
99   [{'A': {'wholesale_cost_a': '82.2', 'ext_disco...
100  [{'A': {'wholesale_cost_a': '18.69', 'ext_disc...
101  [{'A': {'wholesale_cost_a': '8.21', 'ext_disco...
102  [{'A': {'wholesale_cost_a': '28.33', 'ext_disc...

[103 rows x 1 columns]


In [None]:
typelist = ['sr_df', 'ss_df', 'cr_df', 'cs_df']
for i in typelist:
    # Read only the raw `data` column (a string) from each catalog table.
    df = spark.sql(f'select * from split_etl.{i}').select(F.col('data'))
    # Replace every `nan` with the string "nan" so eval() can parse the text, then apply the schema.
    df1 = df.rdd.map(lambda x: eval(x['data'].replace('nan', '"nan"'))).toDF(schema)
    df2 = df1.select(F.col('data'))
    # Split the array into its three elements, then pull the inner map out of each.
    df3 = df2.select(df2.data.getItem(0).alias('part0'), df2.data.getItem(1).alias('part1'), df2.data.getItem(2).alias('part2'))
    df4 = df3.select(df3.part0.getItem('A').alias('part00'), df3.part1.getItem('B').alias('part01'), df3.part2.getItem('C').alias('part02'))
    # Merge the three maps into one, then turn its keys into columns.
    df5 = df4.withColumn('partA', F.map_concat('part00', 'part01', 'part02')).select('partA')
    df6 = df5.rdd.map(lambda x: x['partA']).toDF()
    print(df6.toPandas())
    df6.coalesce(1).write.format('parquet').mode('overwrite').save(f's3://jenny-etl-test/glue_split_etl/one_result/{i}')
# With show(), only one shows up.
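
The parsing step in this cell relies on `eval()` and a plain `str.replace('nan', ...)`. As a hedged alternative, the sketch below uses `ast.literal_eval` from the standard library and a word-boundary regex, so that `nan` inside a longer word (for example `banana`) is left alone. The helper name `parse_record` and the sample string are hypothetical, and this assumes each `data` string is a plain Python literal. A value that is already the quoted string `'nan'` would still be rewritten, just as with the original `replace`.

```python
import ast
import re

# Matches a bare `nan` token, i.e. one that is not part of a longer word.
_BARE_NAN = re.compile(r"\bnan\b")


def parse_record(text):
    """Parse a Python-literal string, turning bare `nan` tokens into "nan"."""
    quoted = _BARE_NAN.sub('"nan"', text)
    # literal_eval accepts only literals (dict, list, str, numbers, ...),
    # so it does not execute arbitrary expressions the way eval() would.
    return ast.literal_eval(quoted)


sample_text = "{'data': [{'A': {'fruit': 'banana', 'zip': nan}}]}"
print(parse_record(sample_text))
```

In the Spark pipeline this would slot in as `df.rdd.map(lambda x: parse_record(x['data']))`, provided the function is defined in the same session.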

In [26]:
df = spark.sql(f'select * from split_etl.cr_df').select(F.col('data'))
df1 = df.rdd.map(lambda x:eval(x.__getitem__('data').replace('nan','"nan"'))).toDF(schema)
df2 = df1.select(F.col('data'))
df3 = df2.select(df2.data.getItem(0).alias('part0'), df2.data.getItem(1).alias('part1'), df2.data.getItem(2).alias('part2'))
df4 = df3.select(df3.part0.getItem('A').alias('part00'), df3.part1.getItem('B').alias('part01'), df3.part2.getItem('C').alias('part02'))
df5 = df4.withColumn('partA', F.map_concat('part00','part01','part02')).select('partA')
df6 = df5.rdd.map(lambda x: x.__getitem__('partA')).toDF()
df6.coalesce(1).write.option('header','true').csv('s3://jenny-etl-test/glue_split_etl/complete2/')
print(df6.toPandas())
# Catalog return records

             address_id         birth_country  ... warehouse_sk      zip
0      AAAAAAAANBFECFAA  NETHERLANDS ANTILLES  ...          9.0  51387.0
1      AAAAAAAAEINFEFAA         LIECHTENSTEIN  ...          8.0  38370.0
2      AAAAAAAADCMEJBAA             GUATEMALA  ...         18.0  80169.0
3      AAAAAAAANKCIOCAA            MOZAMBIQUE  ...         18.0  46971.0
4      AAAAAAAAEEDJKDAA              CAMBODIA  ...         16.0  43511.0
...                 ...                   ...  ...          ...      ...
15466  AAAAAAAAMHAMMDAA               COMOROS  ...         11.0  58048.0
15467  AAAAAAAAIKHABCAA                   nan  ...          1.0  45124.0
15468  AAAAAAAANHMBHEAA             NICARAGUA  ...          3.0  29431.0
15469  AAAAAAAAEDIMMCAA                ZAMBIA  ...         14.0      nan
15470  AAAAAAAAJAGIPAAA              MONGOLIA  ...          5.0  68605.0

[15471 rows x 56 columns]


In [27]:
df = spark.sql(f'select * from split_etl.sr_df').select(F.col('data'))
df1 = df.rdd.map(lambda x:eval(x.__getitem__('data').replace('nan','"nan"'))).toDF(schema)
df2 = df1.select(F.col('data'))
df3 = df2.select(df2.data.getItem(0).alias('part0'), df2.data.getItem(1).alias('part1'), df2.data.getItem(2).alias('part2'))
df4 = df3.select(df3.part0.getItem('A').alias('part00'), df3.part1.getItem('B').alias('part01'), df3.part2.getItem('C').alias('part02'))
df5 = df4.withColumn('partA', F.map_concat('part00','part01','part02')).select('partA')
df6 = df5.rdd.map(lambda x: x.__getitem__('partA')).toDF()
df6.coalesce(1).write.option('header','true').csv('s3://jenny-etl-test/glue_split_etl/complete3/')
print(df6.toPandas())
# Store return records

      addr_sk         birth_country  ... ticket_number      zip
0   3638139.0              SLOVAKIA  ...       1692817  97057.0
1   1219126.0     WALLIS AND FUTUNA  ...     207997757  36871.0
2   1827261.0     WALLIS AND FUTUNA  ...      27401269  44536.0
3   4570468.0               ROMANIA  ...     103368137  40059.0
4    354066.0                 GABON  ...       6051988  73604.0
5   2249261.0                 GABON  ...      64218649  27057.0
6    506957.0              ETHIOPIA  ...     232237788  33003.0
7     55542.0              CAMEROON  ...      73383807  33003.0
8   1912041.0              CAMEROON  ...      73383807  99003.0
9   4621327.0              CAMEROON  ...      73383807  91904.0
10  2602725.0            GUADELOUPE  ...     208550850  66192.0
11  2347875.0          BURKINA FASO  ...     155584435  58482.0
12  5353296.0               IRELAND  ...      44147312  89454.0
13  4298223.0               IRELAND  ...     142247766  46534.0
14  4723535.0               FINLAND  ...

In [28]:
df = spark.sql(f'select * from split_etl.ss_df').select(F.col('data'))
df1 = df.rdd.map(lambda x:eval(x.__getitem__('data').replace('nan','"nan"'))).toDF(schema)
df2 = df1.select(F.col('data'))
df3 = df2.select(df2.data.getItem(0).alias('part0'), df2.data.getItem(1).alias('part1'), df2.data.getItem(2).alias('part2'))
df4 = df3.select(df3.part0.getItem('A').alias('part00'), df3.part1.getItem('B').alias('part01'), df3.part2.getItem('C').alias('part02'))
df5 = df4.withColumn('partA', F.map_concat('part00','part01','part02')).select('partA')
df6 = df5.rdd.map(lambda x: x.__getitem__('partA')).toDF()
df6.coalesce(1).write.option('header','true').csv('s3://jenny-etl-test/glue_split_etl/complete3/ss_df')
print(df6.toPandas())
# Store sales records

        addr_sk               brand  ... wholesale_cost_b      zip
0     1781021.0  edu packexporti #1  ...             0.95  19584.0
1     2438687.0  importonameless #5  ...             3.65  37838.0
2     1099416.0   exportiimporto #2  ...             2.12  35752.0
3     1781021.0      scholarcorp #7  ...             1.39  19584.0
4     2438687.0     edu packmaxi #5  ...             2.75  37838.0
...         ...                 ...  ...              ...      ...
5216  3145688.0       brandbrand #2  ...             1.52  35752.0
5217  1781021.0     exportibrand #1  ...             3.87  19584.0
5218  1099416.0      scholarmaxi #8  ...            36.45  35752.0
5219  2438687.0    univunivamalg #5  ...            51.84  37838.0
5220  3145688.0       amalgamalg #1  ...             9.19  35752.0

[5221 rows x 72 columns]


In [None]:
## The code below is the expanded (step-by-step) version.

In [13]:
import pandas as pd

# df6.coalesce(1).write.option('header','true').csv('s3://jenny-etl-test/glue_split_etl/complete/test')
# Catalog sales records




In [2]:
schema = StructType(
    [
        StructField('sequence_number', StringType(), True),
        StructField('table_name', StringType(), True),
        StructField('timestamp', StringType(), True),
        StructField('data', ArrayType(MapType(StringType(),MapType(StringType(),StringType(),True),True),True), True)
    ]
)
# TODO: re-check this structure




In [3]:
df = spark.sql(f'select * from split_etl.ss_df').select(F.col('data'))
df.limit(1).show(truncate=False)


In [4]:
df1 = df.rdd.map(lambda x:eval(x.__getitem__('data').replace('nan','"nan"'))).toDF(schema)
# An arrow written as `->` in the output means the value was correctly recognized as a map.
# __getitem__ -> Python method (fetches an item) | TODO: review how this method works
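
On the `__getitem__` note above: in Python, square-bracket indexing such as `x['data']` is just another spelling of `x.__getitem__('data')`. The sketch below shows this with a tiny stand-in class; `Record` is hypothetical and is not the real `pyspark.sql.Row`, although `Row` supports the same bracket syntax.

```python
class Record:
    """Tiny stand-in for a row object; not the real pyspark.sql.Row."""

    def __init__(self, **fields):
        self._fields = fields

    def __getitem__(self, key):
        # Python calls this method whenever code writes record[key].
        return self._fields[key]


row = Record(data="{'A': 1}")

# Both spellings call the same method, so they return the same value.
print(row["data"])
print(row.__getitem__("data"))
```

The bracket form is the usual way to write it, so `lambda x: x['data']` behaves the same as `lambda x: x.__getitem__('data')`.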




In [5]:
df2 = df1.select(F.col('data'))
df2.show(truncate=False)


In [6]:
df2.select(df2.data.getItem(0).alias('part0')).show(truncate=False)
# Fetch only A into part0

+-----+
|part0|

In [7]:
df3 = df2.select(df2.data.getItem(0).alias('part0'), df2.data.getItem(1).alias('part1'), df2.data.getItem(2).alias('part2'))
df3.show(truncate=False)


In [8]:
df4 = df3.select(df3.part0.getItem('A').alias('part00'), df3.part1.getItem('B').alias('part01'), df3.part2.getItem('C').alias('part02'))
df4.show(truncate=False)


In [32]:
df5 = df4.withColumn('partA', F.map_concat('part00','part01','part02')).select('partA')
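
To show what the `getItem` and `map_concat` steps do to a single row, here is a plain-Python sketch under stated assumptions. The helper `flatten_parts` and the sample values are hypothetical. Raising on duplicate keys mirrors what is, to my understanding, the Spark 3.x default for `map_concat` (governed by `spark.sql.mapKeyDedupPolicy`); treat that detail as an assumption rather than a statement about this session.

```python
def flatten_parts(data):
    """Merge data[0]['A'], data[1]['B'] and data[2]['C'] into one flat dict."""
    merged = {}
    for index, label in enumerate(("A", "B", "C")):
        # Same idea as getItem(index) followed by getItem(label).
        part = data[index][label]
        duplicates = merged.keys() & part.keys()
        if duplicates:
            raise ValueError(f"duplicate keys across parts: {sorted(duplicates)}")
        merged.update(part)
    return merged


# Invented sample row in the same nested shape as the `data` column.
sample_data = [
    {"A": {"wholesale_cost_a": "72.31"}},
    {"B": {"wholesale_cost_b": "0.95"}},
    {"C": {"example_field_c": "nan"}},
]
print(flatten_parts(sample_data))
```

Each key of the merged dict corresponds to one column after the later `toDF()` call.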




In [None]:
df5.rdd.map(lambda x: x.__getitem__('partA')).toDF().toPandas()
# +--+--+-- => toDF
# DF => call toPandas() only when using print()

NameError: name 'df5' is not defined
