In [1]:
# SparkSessionの作成
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Hiveテーブルからデータを読み込む
df = spark.sql("SELECT * FROM test_db_20250407.can_data_ex")
df.printSchema()
df.show(5, truncate=False)

# データの基本的な操作
from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import *

# JSON文字列からデータを抽出するサンプル
# metadataカラムをJSONとしてパース
schema = StructType([
    StructField("model", StringType(), True),
    StructField("driver_id", StringType(), True),
    StructField("scenario", StringType(), True)
])

# JSONカラムを展開
df_expanded = df.withColumn("metadata_json", from_json(col("metadata"), schema)) \
                .select("video_id", "vin", "metadata_json.*", "recording_duration")

df_expanded.show()

# 結果をCSVとして保存
df_expanded.write.mode("overwrite").csv("/home/glue_user/output/expanded_data.csv", header=True)

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/26 17:42:45 INFO HiveConf: Found configuration file file:/home/glue_user/spark/conf/hive-site.xml
25/04/26 17:42:46 

AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to instantiate a metastore client factory  due to: java.lang.ClassNotFoundException: Class  not found)

In [2]:
# スパークの設定でHiveメタストアの接続情報を指定
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GlueLocal") \
    .config("spark.sql.warehouse.dir", "/tmp/hive/warehouse") \
    .config("hive.metastore.uris", "thrift://hive_metastore_20250407:9083") \
    .enableHiveSupport() \
    .getOrCreate()

# テストクエリ
try:
    # データベース一覧を表示
    spark.sql("SHOW DATABASES").show()
    
    # test_db_20250407が存在するか確認
    dbs = [row[0] for row in spark.sql("SHOW DATABASES").collect()]
    if "test_db_20250407" in dbs:
        print("テストデータベースが見つかりました")
        # テーブル一覧を表示
        spark.sql("SHOW TABLES IN test_db_20250407").show()
    else:
        print("テストデータベースが見つかりません")
        
except Exception as e:
    print("エラーが発生しました:", e)

25/04/26 17:43:30 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/04/26 17:43:33 INFO HiveConf: Found configuration file file:/home/glue_user/spark/conf/hive-site.xml
25/04/26 17:43:33 ERROR HiveUtils: Unable to instantiate a metastore client factory  due to: java.lang.ClassNotFoundException: Class  not found
java.lang.ClassNotFoundException: Class  not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592)
	at org.apache.hadoop.hive.ql.metadata.HiveUtils.createMetaStoreClientFactory(HiveUtils.java:517)
	at org.apache.hadoop.hive.ql.metadata.HiveUtils.createMetaStoreClient(HiveUtils.java:506)
	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3856)
	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3836)
	at org.apache.hadoop.hive.ql.metadata.Hive.getDatabase(Hive.java:1605)
	at org.apache.hadoop.hive.ql.metadata.Hive.databaseExists(Hive.java:1594)
	at org.apache.spark.sql.hive.

エラーが発生しました: An error occurred while calling o65.toString. Trace:
java.lang.IllegalArgumentException: object is not an instance of declaring class
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)




In [3]:
# SparkSessionの作成（シンプルなもの）
from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# CSVファイルを直接読み込む
try:
    # CSVファイルの場所を探す
    import subprocess
    result = subprocess.run(["find", "/home/glue_user", "-name", "*.csv"], capture_output=True, text=True)
    print("見つかったCSVファイル:")
    print(result.stdout)
    
    # 見つかったファイルのパスを使用してCSVを読み込む
    # 例として、先ほど作成したcan_data_exのパスを推測
    csv_path = "/home/glue_user/warehouse/test_db_20250407.db/can_data_ex"  # 推測したパス
    
    # 他の可能性のあるパス
    alternative_paths = [
        "/home/glue_user/data/input/csv/can_data_ex.csv",
        "/data/input/csv/can_data_ex.csv",
        "/opt/glue/data/can_data_ex.csv"
    ]
    
    # まずメインのパスを試す
    try:
        df = spark.read.csv(csv_path, header=True, inferSchema=True)
        print("メインパスからCSVを読み込みました")
        df.printSchema()
        df.show(5, truncate=False)
    except:
        print(f"{csv_path}からの読み込みに失敗しました。代替パスを試します...")
        
        # 代替パスを試す
        for path in alternative_paths:
            try:
                df = spark.read.csv(path, header=True, inferSchema=True)
                print(f"{path}からCSVを読み込みました")
                df.printSchema()
                df.show(5, truncate=False)
                break
            except:
                print(f"{path}からの読み込みに失敗しました")
        
except Exception as e:
    print("エラー発生:", e)

見つかったCSVファイル:
/home/glue_user/column_mappings/input_ap_mapping.csv
/home/glue_user/column_mappings/master_device_attribute_mapping.csv
/home/glue_user/column_mappings/can_data_ex_mapping.csv
/home/glue_user/column_mappings/master_status_mapping.csv
/home/glue_user/column_mappings/can_data_mapping.csv
/home/glue_user/column_mappings/master_user_attribute_mapping.csv
/home/glue_user/column_mappings/input_connection_mapping.csv
/home/glue_user/column_mappings/extended_can_data.xlsx_mapping.csv
/home/glue_user/spark/python/test_support/sql/ages_newlines.csv
/home/glue_user/spark/python/test_support/sql/ages.csv
/home/glue_user/.local/lib/python3.10/site-packages/numpy/core/tests/data/umath-validation-set-log10.csv
/home/glue_user/.local/lib/python3.10/site-packages/numpy/core/tests/data/umath-validation-set-log2.csv
/home/glue_user/.local/lib/python3.10/site-packages/numpy/core/tests/data/umath-validation-set-log.csv
/home/glue_user/.local/lib/python3.10/site-packages/numpy/core/tests/data

In [4]:
# データを直接定義する方法
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import json

# スキーマ定義
schema = StructType([
    StructField("video_id", StringType(), True),
    StructField("vin", StringType(), True),
    StructField("metadata", StringType(), True),
    StructField("recording_duration", DoubleType(), True),
    StructField("start_datetime", StringType(), True),
    StructField("end_datetime", StringType(), True),
    StructField("url", StringType(), True),
    StructField("thumbnail", StringType(), True),
    StructField("labels", StringType(), True),
    StructField("can_data", StringType(), True)
])

# サンプルデータ
data = [
    ["VID20250301001", "JH4DB7550SS801338", json.dumps({"model": "TestCar A", "driver_id": "D001", "scenario": "city_driving"}), 300.5, "2025-03-01T09:30:00", "2025-03-01T09:35:00", "https://test-storage.example.com/videos/20250301001.mp4", "https://test-storage.example.com/thumbnails/20250301001.jpg", json.dumps([{"label_id": "L0001", "frame_number": 120, "class_id": "car", "bbox": [100, 200, 150, 300], "score": 0.95}]), json.dumps([{"senddate": "2025-03-01T09:30:10", "candata": [{"speed": "45", "brake": "0", "steering": "5"}]}])],
    
    ["VID20250301002", "5UXWX7C5XEL982401", json.dumps({"model": "TestCar B", "driver_id": "D002", "scenario": "highway"}), 180.2, "2025-03-01T10:15:00", "2025-03-01T10:18:00", "https://test-storage.example.com/videos/20250301002.mp4", "https://test-storage.example.com/thumbnails/20250301002.jpg", json.dumps([{"label_id": "L0003", "frame_number": 45, "class_id": "car", "bbox": [200, 150, 300, 200], "score": 0.98}]), json.dumps([{"senddate": "2025-03-01T10:15:30", "candata": [{"speed": "95", "brake": "0", "steering": "2"}]}])]
]

# データフレーム作成
df = spark.createDataFrame(data, schema)
df.printSchema()
df.show(truncate=False)

# JSONフィールドを展開したデータ分析の例
from pyspark.sql.functions import col, from_json
import pyspark.sql.functions as F

# メタデータの展開
metadata_schema = StructType([
    StructField("model", StringType(), True),
    StructField("driver_id", StringType(), True),
    StructField("scenario", StringType(), True)
])

# メタデータを展開したデータフレーム
df_expanded = df.withColumn("metadata_json", from_json(col("metadata"), metadata_schema)) \
                .select("video_id", "vin", "metadata_json.*", "recording_duration")

print("メタデータを展開した結果:")
df_expanded.show(truncate=False)

# 車種と走行シナリオの集計例
print("シナリオ別の録画時間合計:")
df_expanded.groupBy("scenario").agg(F.sum("recording_duration").alias("total_duration")).show()

print("車種別の録画件数:")
df_expanded.groupBy("model").count().show()

root
 |-- video_id: string (nullable = true)
 |-- vin: string (nullable = true)
 |-- metadata: string (nullable = true)
 |-- recording_duration: double (nullable = true)
 |-- start_datetime: string (nullable = true)
 |-- end_datetime: string (nullable = true)
 |-- url: string (nullable = true)
 |-- thumbnail: string (nullable = true)
 |-- labels: string (nullable = true)
 |-- can_data: string (nullable = true)



                                                                                

+--------------+-----------------+-----------------------------------------------------------------------+------------------+-------------------+-------------------+-------------------------------------------------------+-----------------------------------------------------------+------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------+
|video_id      |vin              |metadata                                                               |recording_duration|start_datetime     |end_datetime       |url                                                    |thumbnail                                                  |labels                                                                                                      |can_data                                                                                          |
+--------------+----