In [9]:
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql.functions import explode, col, get_json_object, from_json
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

# パラメータの取得
# args = getResolvedOptions(sys.argv, ['JOB_NAME', 'INPUT_PATH', 'OUTPUT_PATH'])

# args = getResolvedOptions(sys.argv, ['JOB_NAME'])
args = {}
args['JOB_NAME'] = 'sample'
args['INPUT_PATH'] = 's3://bucket001/sample/input/'
args['OUTPUT_PATH'] = 's3://bucket001/sample/output/'



sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# 入力データの読み込み
input_path = args['INPUT_PATH']
output_path = args['OUTPUT_PATH']

data = spark.read.format("csv").option("header", "true").option("delimiter","\t").option("escape","\"").load(input_path)
df = data

# JSONスキーマを定義する
item_schema = StructType([
    StructField("type_kbn", StringType(), True),
    StructField("uuid", StringType(), True),
    StructField("answer", StructType([
        StructField("uuid", StringType(), False),
        StructField("value", StringType(), False),
        StructField("uuids", ArrayType(StringType()), False)
    ]), True)
])

json_schema = StructType([
    StructField("items", ArrayType(item_schema), True),
    StructField("survey_uuid", StringType(), False)
])



df = df.withColumn("item_json_decoded", from_json(col("item_json"),json_schema)).drop("item_json")
df = df.withColumn("survey_uuid", col("item_json_decoded.survey_uuid"))
df = df.withColumn("item", col("item_json_decoded.items"))
df = df.withColumn("item", explode(col("item_json_decoded.items")))
df = df.withColumn("item_uuid", col("item.uuid"))
df = df.withColumn("type_kbn", col("item.type_kbn"))

# 出力パスの取得

df.filter(col("type_kbn") == '00101')\
    .withColumn("option_uuids", col("item.answer.uuids")) \
    .select("uuid", "user_id_code", 'entried_at', 'survey_uuid', 'item_uuid', 'type_kbn', 'option_uuids') \
    .write.mode('overwrite').partitionBy('type_kbn', 'item_uuid', 'survey_uuid').parquet(f"{output_path}/large/00101")
    
df.filter(col("type_kbn") == '00102')\
    .withColumn("value", col("item.answer.value")) \
    .select("uuid", "user_id_code", 'entried_at', 'survey_uuid', 'item_uuid', 'type_kbn', 'value') \
    .write.mode('overwrite').partitionBy('type_kbn', 'item_uuid', 'survey_uuid').parquet(f"{output_path}/large/00102")
    
df.filter(col("type_kbn") == '00104')\
    .withColumn("value", col("item.answer.value")) \
    .select("uuid", "user_id_code", 'entried_at', 'survey_uuid', 'item_uuid', 'type_kbn', 'value') \
    .write.mode('overwrite').partitionBy('type_kbn', 'item_uuid', 'survey_uuid').parquet(f"{output_path}/large/00104")

df.filter(col("type_kbn") == '00105')\
    .withColumn("option_uuid", col("item.answer.uuid")) \
    .select("uuid", "user_id_code", 'entried_at', 'survey_uuid', 'item_uuid', 'type_kbn', 'option_uuid') \
    .write.mode('overwrite').partitionBy('type_kbn', 'item_uuid', 'survey_uuid').parquet(f"{output_path}/large/00105")
    


job.commit()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
An error occurred while calling z:com.amazonaws.services.glue.util.Job.commit.
: java.lang.RuntimeException: Not initialized
	at com.amazonaws.services.glue.util.JobBookmark$.preCheck(JobBookmarkUtils.scala:22)
	at com.amazonaws.services.glue.util.JobBookmark$.commit(JobBookmarkUtils.scala:87)
	at com.amazonaws.services.glue.util.Job$.commit(Job.scala:122)
	at com.amazonaws.services.glue.util.Job.commit(Job.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.command

In [13]:
%%sql

create schema minio.bucket with(location = 's3a://bucket001/');

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:

missing {'DBPROPERTIES', 'PROPERTIES'} at '('(line 2, pos 31)

== SQL ==

create schema minio.bucket with(location = 's3a://bucket001/');
-------------------------------^^^
 

Traceback (most recent call last):
  File "/home/glue_user/spark/python/pyspark/sql/session.py", line 723, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/home/glue_user/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/glue_user/spark/python/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.ParseException: 
missing {'DBPROPERTIES', 'PROPERTIES'} at '('(line 2, pos 31)

== SQL ==

create schema minio.bucket with(location = 's3a://bucket001/');
-------------------------------^^^
 




In [22]:
%%sql


CREATE EXTERNAL TABLE IF NOT EXISTS tmp_1_00101 (
  `participant_id` string,
  `option_uuid_array` array < string >,
  `entried_at` timestamp,
  `answer_uuid` char(36)
)
PARTITIONED BY (
  `item_uuid` char(36),
  `survey_uuid` char(36)
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://bucket001/sample/output/large/00101/type_kbn=00101/'
TBLPROPERTIES ('classification' = 'parquet');

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [14]:
%%sql


CREATE EXTERNAL TABLE IF NOT EXISTS tmp_00101_1 (
  `participant_id` string,
  `option_uuid_array` array < string >,
  `entried_at` timestamp,
  `answer_uuid` char(36)
)
PARTITIONED BY (
  `item_uuid` char(36),
  `survey_uuid` char(36)
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
EXTERNAL_LOCATION 's3a://bucket001/sample/output/large/00101/type_kbn=00101/item_uuid=552703b5-a22f-47fe-9f2d-51b1f40cbe26/'
TBLPROPERTIES ('classification' = 'parquet');

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:

mismatched input 'EXTERNAL_LOCATION' expecting {<EOF>, ';'}(line 15, pos 0)

== SQL ==


CREATE EXTERNAL TABLE IF NOT EXISTS tmp_00101_2 (
  `participant_id` string,
  `option_uuid_array` array < string >,
  `entried_at` timestamp,
  `answer_uuid` char(36)
)
PARTITIONED BY (
  `item_uuid` char(36),
  `survey_uuid` char(36)
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
EXTERNAL_LOCATION 's3a://bucket001/sample/output/large/00101/type_kbn=00101/item_uuid=552703b5-a22f-47fe-9f2d-51b1f40cbe26/'
^^^
TBLPROPERTIES ('classification' = 'parquet');
 

Traceback (most recent call last):
  File "/home/glue_user/spark/python/pyspark/sql/session.py", line 723, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/home/glue_user/spark/python/lib

In [19]:
%%sql


CREATE TABLE IF NOT EXISTS s3.default.tmp_00101_2 (
  `participant_id` string,
  `option_uuid_array` array < string >,
  `entried_at` timestamp,
  `answer_uuid` char(36)
)
WITH (
  format = 'parquet',
  external_location = 's3a://bucket001/sample/output/large/00101/type_kbn=00101/item_uuid=552703b5-a22f-47fe-9f2d-51b1f40cbe26/'
);

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:

extraneous input '(' expecting {'ADD', 'AFTER', 'ALL', 'ALTER', 'ANALYZE', 'AND', 'ANTI', 'ANY', 'ARCHIVE', 'ARRAY', 'AS', 'ASC', 'AT', 'AUTHORIZATION', 'BETWEEN', 'BOTH', 'BUCKET', 'BUCKETS', 'BY', 'CACHE', 'CASCADE', 'CASE', 'CAST', 'CHANGE', 'CHECK', 'CLEAR', 'CLUSTER', 'CLUSTERED', 'CODEGEN', 'COLLATE', 'COLLECTION', 'COLUMN', 'COLUMNS', 'COMMENT', 'COMMIT', 'COMPACT', 'COMPACTIONS', 'COMPUTE', 'CONCATENATE', 'CONSTRAINT', 'COST', 'CREATE', 'CROSS', 'CUBE', 'CURRENT', 'CURRENT_DATE', 'CURRENT_TIME', 'CURRENT_TIMESTAMP', 'CURRENT_USER', 'DATA', 'DATABASE', DATABASES, 'DBPROPERTIES', 'DEFINED', 'DELETE', 'DELIMITED', 'DESC', 'DESCRIBE', 'DFS', 'DIRECTORIES', 'DIRECTORY', 'DISTINCT', 'DISTRIBUTE', 'DIV', 'DROP', 'ELSE', 'END', 'ESCAPE', 'ESCAPED', 'EXCEPT', 'EXCHANGE', 'EXISTS', 'EXPLAIN', 'EXPORT', 'EXTENDED', 'EXTERNAL', 'EXTRACT', 'FALSE', 'FETCH', 'FIELDS', 'FILTER', 'FILEFORMAT', 'FIRST', 'FOLLOWING', 'FOR', 'FOREIGN', 'FORMAT', 'FORMATTED', 'FROM', 'FU