In [None]:
# Dev only: emulate the job arguments by resetting argv to the script name
# followed by "--env dev".
import sys
sys.argv = [sys.argv[0], '--env', 'dev']


In [None]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField, BooleanType
import pyspark
from  pyspark.sql.functions import input_file_name
import pyspark.pandas as ps
from awsglue.utils import getResolvedOptions

# Sample schema for the pandas UDF output: every column is a nullable string.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("id", "name", "age", "file_name", "age_sai")
])

def subtract_mean(pdf):
    """Return a copy of ``pdf`` with an extra ``age_sai`` column.

    ``pdf`` is a pandas.DataFrame that must have an ``age`` column; the new
    column is ``age`` with the string "歳" appended to each value. The input
    frame itself is not modified.
    """
    return pdf.assign(age_sai=lambda frame: frame.age + "歳")

In [None]:
# `sys` is otherwise imported only by the dev-only cell at the top of the
# notebook; import it here so this cell also works when that cell is skipped.
import sys

# Spark configuration: allow Arrow-based pandas conversion to fall back to the
# non-Arrow implementation.
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")

# Resolve the job argument `--env` from the command line.
args = getResolvedOptions(sys.argv, ['env'])

# Environment switch (example): local filesystem in dev, S3 otherwise.
# Both branches must bind `prefix`, because every path below is built from it.
if args['env'] == 'dev':
    prefix = 'file://'
else:
    # NOTE(review): with this prefix the paths below become 's3:///home/...',
    # which has no bucket name - confirm the intended S3 location.
    prefix = 's3://'

# Example: read a CSV file and record the source file of each row.
spark_df = spark.read.csv(prefix + '/home/glue_user/workspace/jupyter_workspace/test.csv', header=True)
spark_df = spark_df.withColumn('file_name', input_file_name())

# Apply some processing to each group (grouped by id).
# This is where the pandas function (subtract_mean) is invoked.
kekka_spark_df = spark_df.groupby("id").applyInPandas(subtract_mean, schema=schema)


# Write the result as parquet, replacing any previous output.
kekka_spark_df.write.mode('overwrite').parquet(prefix + '/home/glue_user/workspace/jupyter_workspace/data/2023-11-11/output/')

In [None]:
# Read back the parquet output written above and print it.
(
    spark.read
    .parquet(prefix + '/home/glue_user/workspace/jupyter_workspace/data/2023-11-11/output/')
    .show()
)

In [None]:

# Three kinds of DataFrame appear in this notebook:
## pandas DataFrame (pandas_df)
## Spark DataFrame (spark_df)
## pandas-on-Spark DataFrame (pandas_df_on_spark)
## Only the latter two are distributed.

import numpy as np
import pandas as pd

# Load the CSV as a Spark DataFrame.
spark_df = spark.read.csv(
    prefix + '/home/glue_user/workspace/jupyter_workspace/test.csv',
    header=True,
)

# A pandas DataFrame (a poor choice here, because it is ordinary pandas):
#pandas_df = spark_df.select("*").toPandas()
# This is no good either (it ends up being an ordinary pandas operation):
#pandas_on_spark_df.asfreq()

# pandas dataframe on spark
import pyspark.pandas as ps
# Build the frame with the pandas-style API, convert it to a Spark DataFrame
# (the index is kept as the 'index' column), then filter it.
# The filter is fine here; note that filtering with a SQL string is the
# Spark-native API, so after this statement the name holds a Spark DataFrame.
pandas_df_on_spark = (
    ps.DataFrame({'id': range(10)}, index=range(10))
    .to_spark(index_col='index')
    .filter("id > 5")
)
# Bare name kept from the original; it is not the last expression of the cell.
pandas_df_on_spark
pandas_df_on_spark.show()

In [None]:
# Small two-column pandas-on-Spark frame used as input for transform_batch.
pandas_df_on_spark = ps.DataFrame(dict(a=[1, 2, 3], b=[4, 5, 6]))
def pandas_plus(pdf):
    """Return ``pdf + 1`` (for a pandas object, every value incremented by one)."""
    incremented = pdf + 1
    return incremented

# transform-style batch call: the function receives a pandas DataFrame chunk
# (not a Series) and must return output of the same length as its input.
(
    pandas_df_on_spark
    .pandas_on_spark
    .transform_batch(pandas_plus)
)

In [None]:
def length(pdf) -> ps.DataFrame[int, [int]]:
    """Return a one-cell pandas DataFrame holding ``len(pdf)``.

    The return annotation declares the output schema as an int index with a
    single int column.
    """
    row_count = len(pdf)
    return pd.DataFrame([row_count])

# pandas-on-Spark frame with a single column 'A' holding 0..999.
pandas_df_on_spark = ps.DataFrame(dict(A=range(1000)))

# apply-style batch call: the function receives a DataFrame and returns a DataFrame.
(
    pandas_df_on_spark
    .pandas_on_spark
    .apply_batch(length)
)