In [23]:
import findspark
findspark.init("/opt/spark/")

In [24]:
from pyspark.sql import SparkSession

#spark sessionの作成
# spark.ui.enabled trueとするとSparkのGUI画面を確認することができます
# spark.eventLog.enabled true　とすると　GUIで実行ログを確認することができます
# GUIなどの確認は最後のセクションで説明を行います。
spark = SparkSession.builder.master("spark://master:707") \
    .appName("chapter2") \
    .config("spark.sql.session.timeZone", "JST") \
    .config("spark.ui.enabled","true") \
    .config("spark.eventLog.enabled","true") \
    .enableHiveSupport() \
    .getOrCreate()

22/02/26 10:32:01 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
22/02/26 10:32:01 ERROR AsyncEventQueue: Listener AppStatusListener threw an exception
java.lang.NullPointerException
	at org.apache.spark.status.AppStatusListener.onApplicationEnd(AppStatusListener.scala:192)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:57)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.NullPointerException
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:646)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
#データソースの読み込み
#sep='\t'とすればtsvでも読み込みが可能です
#multiLineは、CSVやTSVの各カラムに改行が含まれていた時の対策です。
df=spark.read.csv("file:///notebook/dataset/population.csv", header=True)
df.count()

In [None]:
df.show(truncate=False)

In [None]:
#df_after_t=df.where(df."和暦（年）"== "平成")

In [None]:
#うーん使いづらい。。(日本語))
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col

#スキーマ設定をしていきましょう
# カラム名、型、NullOKか？で設定していきます
struct = StructType([
    StructField("code", StringType(), False),
    StructField("kenmei", StringType(), False),
    StructField("gengo", StringType(), False),
    StructField("wareki", StringType(), False),
    StructField("seireki", StringType(), False),
    StructField("chu", StringType(), False),
    StructField("sokei", StringType(), False),
    StructField("jinko_male", StringType(), False),
    StructField("jinko_female", StringType(), False)
])
df=spark.read.csv("file:///notebook/dataset/population.csv", header=False, sep=',', inferSchema=False,schema=struct)
df.show()

In [None]:
# 大正や昭はもう不要かなと感じたら変換処理にて
df.where(df.gengo == "平成").show()
# where 以外にもfilterと呼ばれるものがあります。機能は同じなので好きな方を選んで大丈夫です
df.filter(df.gengo == "平成").show()

In [None]:
#集計をしてみます
#平成の県ごとの男女の数の平均
#groupByは県名ごとにグルーピングする記述です。
#aggはカラムごとに集計する関数で今回は男性の人口と、女性の人口毎の平均を集計しています。
#aliasは別名をつける関数です。例えば、女性の人口毎の平均の結果にはfemale_avgという別名を付けています。
import pyspark.sql.functions as sf
df.where(df.gengo == "平成").groupBy("kenmei") \
  .agg(sf.avg("jinko_male").alias("male_avg"),sf.avg("jinko_female").alias("female_avg")).show()

In [None]:
#「人口集中地区以外の地区」がいらなそうですね。
# データをクレンジングして不要なデータを除きましょう
# sortは並び替えです（デフォルトでは昇順になります）

df.where(df.gengo == "平成").groupBy("kenmei") \
  .agg(sf.avg("jinko_male").alias("male_avg"),sf.avg("jinko_female").alias("female_avg")) \
    .filter(df.kenmei != "人口集中地区以外の地区").sort("male_avg").show()

#良さそうです！

In [None]:
#結果を一度保存しておきます
df_after_t=df.where(df.gengo == "平成").groupBy("kenmei") \
  .agg(sf.avg("jinko_male").alias("male_avg"),sf.avg("jinko_female").alias("female_avg")) \
  .filter(df.kenmei != "人口集中地区以外の地区").sort("male_avg")

In [None]:
#単純に吐き出す方法
df_after_t.write.mode("overwrite").parquet("/user/spark/pyspark_batch/dataset/parquet")

In [None]:
!hdfs dfs -ls /user/spark/pyspark_batch/dataset/parquet

In [None]:
#もう少し書き込みのオプションを見ていきます
# partitionByを使うことで、データをパーティションごと(次に説明します)に分けて配置することができます。
# 今回はkenmei(県名)ごとにデータを保存してみようと思います。
df_after_t.repartition(1).write.partitionBy("kenmei").mode("overwrite").parquet("/user/spark/pyspark_batch/dataset/parquet")

In [None]:
!hdfs dfs -ls /user/spark/pyspark_batch/dataset/parquet

In [None]:
!hdfs dfs -ls /user/spark/pyspark_batch/dataset/parquet/kenmei=三重県

In [None]:
# Createテーブルを発行します
# 次のチャプターでも紹介しますが、spark.sqlという関数を使います

#jinko_avgテーブルを作成します。
#パーティションとはデータを分けるフォルダみたいなもの、パーティションを分けることで読み込むデータ量を少なくしたりできるので最適化できる
#先程確認したkenmei=の部分がパーティションになっているのでその出力結果に合わせてテーブルを作成してみます。

#ロケーションは、kenmeiを含まずに指定します。

spark.sql(""" 
CREATE EXTERNAL TABLE IF NOT EXISTS default.jinko_avg ( male_avg double, female_avg double)
PARTITIONED BY (kenmei String)
STORED AS PARQUET
LOCATION '/user/spark/pyspark_batch/dataset/parquet/';
""")

In [None]:
# テーブルを見てみます。
spark.sql("show tables").show()

# ちゃんとできているようですね。
# ちなみに実行しているSQLは実はSQLみたいなものでHiveSQLと呼ばれるものです。
# Mysqlの扱いとほとんど同じなので、Mysqlみたいに使って動かなかったところだけ検索すると効率が良いと思います。

In [None]:
#spark.sql()はdataframeを戻り値として返してくれます

df_result=spark.sql("select * from default.jinko_avg")
df_result.show()

#おや。データを見ることができません。。

In [None]:
# テーブルだけでなく、Partitionを認識させてあげないといけません
# msck repair table　テーブル名と実行するとパーティションが認識されます(ちなみにAdd partitionというコマンドもあります)。
spark.sql("msck repair table jinko_avg")

In [None]:
# 今一度検索をしてみます。
spark.sql("select * from default.jinko_avg").show()
#　今度は出ましたね！

In [None]:
# もちろんSQLなので whereも可能です
spark.sql("select * from default.jinko_avg where kenmei='東京都'").show()

In [None]:
# Spark利用の停止
spark.sparkContext.stop()
spark.stop()