In [11]:
import findspark
findspark.init("/opt/homebrew/opt/apache-spark/libexec/")


In [12]:
#pysparkに必要なライブラリを読み込む
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [13]:
#pysparkに必要なライブラリを読み込む

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

#spark sessionの作成
# spark.ui.enabled trueとするとSparkのGUI画面を確認することができます
# spark.eventLog.enabled true　とすると　GUIで実行ログを確認することができます
# GUIなどの確認は最後のセクションで説明を行います。
spark = SparkSession.builder \
    .appName("chapter1") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("spark.sql.session.timeZone", "JST") \
    .config("spark.ui.enabled","true") \
    .config("spark.eventLog.enabled","true") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("show tables").show() 

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|jinko_avg|      false|
+---------+---------+-----------+



In [53]:
df = spark.read.option("multiLine","true").option("encoding","SJIS").csv("/Users/isomasaki/pyspark_batch/dataset/jinko.csv",header=True,sep=",",inferSchema=False)
df.count()

982

In [15]:
df.show(truncate=False)


+--------------+----------+----+----------+----------+----+------------+----------+----------+
|都道府県コード|都道府県名|元号|和暦（年）|西暦（年）|注  |人口（総数）|人口（男）|人口（女）|
+--------------+----------+----+----------+----------+----+------------+----------+----------+
|00            |全国      |大正|9         |1920      |null|55963053    |28044185  |27918868  |
|01            |北海道    |大正|9         |1920      |null|2359183     |1244322   |1114861   |
|02            |青森県    |大正|9         |1920      |null|756454      |381293    |375161    |
|03            |岩手県    |大正|9         |1920      |null|845540      |421069    |424471    |
|04            |宮城県    |大正|9         |1920      |null|961768      |485309    |476459    |
|05            |秋田県    |大正|9         |1920      |null|898537      |453682    |444855    |
|06            |山形県    |大正|9         |1920      |null|968925      |478328    |490597    |
|07            |福島県    |大正|9         |1920      |null|1362750     |673525    |689225    |
|08            |茨城県    |大正|9       

In [56]:
#日本語のカラム名をローマ字へ変換
from pyspark.sql.types import StructType,StructField,StringType
from pyspark.sql.functions import col

struct = StructType([
    StructField("code",StringType(),False),
    StructField("kenmei",StringType(),False),
    StructField("gengo",StringType(),False),
    StructField("wareki",StringType(),False),
    StructField("seireki",StringType(),False),
    StructField("chu",StringType(),False),
    StructField("sokei",StringType(),False),
    StructField("jinko_male",StringType(),False),
    StructField("jinko_female",StringType(),False)
])

df = spark.read.option("multiLine","true").option("encoding","SJIS") \
.csv("/Users/isomasaki/pyspark_batch/dataset/jinko.csv",header=False,sep=",",inferSchema=False,schema=struct)
df.show()

+--------------+----------+-----+----------+----------+----+------------+----------+------------+
|          code|    kenmei|gengo|    wareki|   seireki| chu|       sokei|jinko_male|jinko_female|
+--------------+----------+-----+----------+----------+----+------------+----------+------------+
|都道府県コード|都道府県名| 元号|和暦（年）|西暦（年）|  注|人口（総数）|人口（男）|  人口（女）|
|            00|      全国| 大正|         9|      1920|null|    55963053|  28044185|    27918868|
|            01|    北海道| 大正|         9|      1920|null|     2359183|   1244322|     1114861|
|            02|    青森県| 大正|         9|      1920|null|      756454|    381293|      375161|
|            03|    岩手県| 大正|         9|      1920|null|      845540|    421069|      424471|
|            04|    宮城県| 大正|         9|      1920|null|      961768|    485309|      476459|
|            05|    秋田県| 大正|         9|      1920|null|      898537|    453682|      444855|
|            06|    山形県| 大正|         9|      1920|null|      968925|    478328|      49059

In [17]:
df.where(df.gengo == "平成").show()


+----+----------------------+-----+------+-------+----+---------+----------+------------+
|code|                kenmei|gengo|wareki|seireki| chu|    sokei|jinko_male|jinko_female|
+----+----------------------+-----+------+-------+----+---------+----------+------------+
|  00|                  全国| 平成|     2|   1990|null|123611167|  60696724|    62914443|
|  0A|          人口集中地区| 平成|     2|   1990|null| 78152452|  38564229|    39588223|
|  0B|人口集中地区以外の地区| 平成|     2|   1990|null| 45458715|  22132495|    23326220|
|  01|                北海道| 平成|     2|   1990|null|  5643647|   2722988|     2920659|
|  02|                青森県| 平成|     2|   1990|null|  1482873|    704758|      778115|
|  03|                岩手県| 平成|     2|   1990|null|  1416928|    680197|      736731|
|  04|                宮城県| 平成|     2|   1990|null|  2248558|   1105103|     1143455|
|  05|                秋田県| 平成|     2|   1990|null|  1227478|    584678|      642800|
|  06|                山形県| 平成|     2|   1990|null|  1258390|

In [18]:
import pyspark.sql.functions as sf
df.where(df.gengo == "平成").groupBy("kenmei")\
    .agg(sf.avg("jinko_male").alias("male_avg"),sf.avg("jinko_female").alias("female_avg")).show() 

+----------------------+--------------------+--------------------+
|                kenmei|            male_avg|          female_avg|
+----------------------+--------------------+--------------------+
|人口集中地区以外の地区|2.0976203166666668E7|2.2272045166666668E7|
|                佐賀県|            408192.5|            456442.5|
|                栃木県|   987741.8333333334|            999415.5|
|                京都府|  1268325.3333333333|  1360099.3333333333|
|                香川県|   485871.8333333333|   523763.6666666667|
|                愛媛県|   692188.3333333334|   774376.1666666666|
|                秋田県|   542928.3333333334|            604578.5|
|                広島県|           1387308.5|  1478006.8333333333|
|                宮崎県|            542386.5|            608793.0|
|              鹿児島県|            818506.0|            929134.0|
|                埼玉県|  3492880.3333333335|  3443447.8333333335|
|                三重県|            893167.5|   944959.6666666666|
|                島根県|            356034.

In [19]:
df_after_t = df.where(df.gengo == "平成").groupBy("kenmei")\
    .agg(sf.avg("jinko_male").alias("male_avg"),sf.avg("jinko_female").alias("jinko_female_avg"))\
        .filter(df.kenmei != "人口集中地区以外の地区").sort("male_avg")

In [20]:
df_after_t.show()

+--------+-----------------+-----------------+
|  kenmei|         male_avg| jinko_female_avg|
+--------+-----------------+-----------------+
|  鳥取県|287885.3333333333|314291.3333333333|
|  島根県|         356034.5|388621.6666666667|
|  高知県|372268.1666666667|         418517.0|
|  徳島県|383399.1666666667|         423152.0|
|  福井県|         395512.5|420182.6666666667|
|  佐賀県|         408192.5|         456442.5|
|  山梨県|425777.8333333333|441831.1666666667|
|  香川県|485871.8333333333|523763.6666666667|
|和歌山県|         490624.0|547112.3333333334|
|  富山県|         532857.0|573049.8333333334|
|  宮崎県|         542386.5|         608793.0|
|  秋田県|542928.3333333334|         604578.5|
|  石川県|         566064.0|         604518.5|
|  大分県|571530.6666666666|638773.6666666666|
|  山形県|         583603.5|627811.6666666666|
|  沖縄県|         654622.0|679050.6666666666|
|  岩手県|659592.6666666666|714973.1666666666|
|  滋賀県|         662391.0|         680326.0|
|  奈良県|671178.6666666666|734736.6666666666|
|  青森県|675238.6666666666

In [21]:
#カラムナーフォーマットへの変換
df_after_t.write.mode("overwrite").parquet("/Users/isomasaki/pyspark_batch/dataset/parquet")


                                                                                

In [22]:
!ls -l /Users/isomasaki/pyspark_batch/dataset/parquet

/Users/isomasaki/.zshenv:4: command not found: goenv
total 8
-rw-r--r--  1 isomasaki  staff     0 Jul 21 09:19 _SUCCESS
-rw-r--r--  1 isomasaki  staff  2083 Jul 21 09:19 part-00000-1f96165f-5412-4cc1-87e3-4200f6fe7bfb-c000.snappy.parquet


In [23]:
#スモールファイル問題に対処するためにファイルをまとめる(ノードの数)
df_after_t.repartition(1).write.mode("overwrite").parquet("/Users/isomasaki/pyspark_batch/dataset/parquet")

In [24]:
!ls -l /Users/isomasaki/pyspark_batch/dataset/parquet

/Users/isomasaki/.zshenv:4: command not found: goenv
total 8
-rw-r--r--  1 isomasaki  staff     0 Jul 21 09:19 _SUCCESS
-rw-r--r--  1 isomasaki  staff  2083 Jul 21 09:19 part-00000-e4f5624d-8fa8-4c24-b8ae-1f49d59da26f-c000.snappy.parquet


In [25]:
df_after_t.repartition(1).write.partitionBy("kenmei").mode("overwrite").parquet("/Users/isomasaki/pyspark_batch/dataset/parquet")

                                                                                

In [26]:
!ls -l /Users/isomasaki/pyspark_batch/dataset/parquet

/Users/isomasaki/.zshenv:4: command not found: goenv
total 0
-rw-r--r--  1 isomasaki  staff    0 Jul 21 09:19 _SUCCESS
drwxr-xr-x  4 isomasaki  staff  128 Jul 21 09:19 [34mkenmei=三重県[m[m
drwxr-xr-x  4 isomasaki  staff  128 Jul 21 09:19 [34mkenmei=京都府[m[m
drwxr-xr-x  4 isomasaki  staff  128 Jul 21 09:19 [34mkenmei=人口集中地区[m[m
drwxr-xr-x  4 isomasaki  staff  128 Jul 21 09:19 [34mkenmei=佐賀県[m[m
drwxr-xr-x  4 isomasaki  staff  128 Jul 21 09:19 [34mkenmei=全国[m[m
drwxr-xr-x  4 isomasaki  staff  128 Jul 21 09:19 [34mkenmei=兵庫県[m[m
drwxr-xr-x  4 isomasaki  staff  128 Jul 21 09:19 [34mkenmei=北海道[m[m
drwxr-xr-x  4 isomasaki  staff  128 Jul 21 09:19 [34mkenmei=千葉県[m[m
drwxr-xr-x  4 isomasaki  staff  128 Jul 21 09:19 [34mkenmei=和歌山県[m[m
drwxr-xr-x  4 isomasaki  staff  128 Jul 21 09:19 [34mkenmei=埼玉県[m[m
drwxr-xr-x  4 isomasaki  staff  128 Jul 21 09:19 [34mkenmei=大分県[m[m
drwxr-xr-x  4 isomasaki  staff  128 Jul 21 09:19 [34mkenmei=大阪府[m[m
drwxr-xr-x  4 isomasaki  s

In [27]:
!ls -l /Users/isomasaki/pyspark_batch/dataset/parquet/kenmei=香川県

/Users/isomasaki/.zshenv:4: command not found: goenv
total 8
-rw-r--r--  1 isomasaki  staff  786 Jul 21 09:19 part-00000-af44ab2c-c3e8-4e15-b63d-6ed105633941.c000.snappy.parquet


In [28]:
#香川県のみのデータ
parquet_df = spark.read.parquet("/Users/isomasaki/pyspark_batch/dataset/parquet/kenmei=香川県")
parquet_df.show()

+-----------------+-----------------+
|         male_avg| jinko_female_avg|
+-----------------+-----------------+
|485871.8333333333|523763.6666666667|
+-----------------+-----------------+



In [29]:
spark.sql(
    """
    CREATE EXTERNAL TABLE IF NOT EXISTS default.jinko_avg (male_avg double,jinko_female_avg double)
    PARTITIONED BY (kenmei String)
    STORED AS PARQUET
    LOCATION '/Users/isomasaki/pyspark_batch/dataset/parquet/';
    """
)

DataFrame[]

In [30]:
spark.sql("show tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|jinko_avg|      false|
+---------+---------+-----------+



In [31]:
df_result = spark.sql("select * from default.jinko_avg")
df_result.show()
#partitionつきのテーブルでは、最初はテーブルの中身を確認できない

22/07/21 09:19:40 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
22/07/21 09:19:40 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
22/07/21 09:19:40 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
22/07/21 09:19:40 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
+--------------------+--------------------+------------+
|            male_avg|    jinko_female_avg|      kenmei|
+--------------------+--------------------+------------+
|            893167.5|   944959.6666666666|      三重県|
|  1268325.3333333333|  1360099.3333333333|      京都府|
|4.0840519833333336E7|4.2415789666666664E7|人口集中地区|
|         6.1816723E7|6.4687834833333336E7|        全国|
|           2650310.5|           2861527.0|      兵庫県|
|  2665781.3333333335|  2923371.8333333335|      北海道|
|           2987847.0|           2974638.5|      千葉県|


In [32]:
#メタデータにパーティションを登録
spark.sql("msck repair table jinko_avg")

DataFrame[]

In [33]:
df_result = spark.sql("select * from default.jinko_avg")
df_result.show()
#partitionつきのテーブルでは、最初はテーブルの中身を確認できない

+--------------------+--------------------+------------+
|            male_avg|    jinko_female_avg|      kenmei|
+--------------------+--------------------+------------+
|            893167.5|   944959.6666666666|      三重県|
|  1268325.3333333333|  1360099.3333333333|      京都府|
|4.0840519833333336E7|4.2415789666666664E7|人口集中地区|
|         6.1816723E7|6.4687834833333336E7|        全国|
|           2650310.5|           2861527.0|      兵庫県|
|  2665781.3333333335|  2923371.8333333335|      北海道|
|           2987847.0|           2974638.5|      千葉県|
|            490624.0|   547112.3333333334|    和歌山県|
|  3492880.3333333335|  3443447.8333333335|      埼玉県|
|   571530.6666666666|   638773.6666666666|      大分県|
|   4292675.833333333|           4517115.0|      大阪府|
|   671178.6666666666|   734736.6666666666|      奈良県|
|           1139561.5|           1191255.0|      宮城県|
|            542386.5|            608793.0|      宮崎県|
|            532857.0|   573049.8333333334|      富山県|
|   709497.8333333334|

In [34]:
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("chaper2")\
    .config("hive.exec.dynamic.partition","true")\
    .config("hive.exec.dynamic.partition.mode","nonstrict")\
    .config("spark.sql.session.timeZone","JST")\
    .config("spark.ui.enabled","true")\
    .enableHiveSupport()\
    .getOrCreate()
    

22/07/21 09:19:44 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [35]:
#仮想テーブルの作成
df.createOrReplaceTempView("jinko")

In [36]:
#sparkSQLの実行

spark.sql("select * from jinko").show()

+--------------+----------+-----+----------+----------+----+------------+----------+------------+
|          code|    kenmei|gengo|    wareki|   seireki| chu|       sokei|jinko_male|jinko_female|
+--------------+----------+-----+----------+----------+----+------------+----------+------------+
|都道府県コード|都道府県名| 元号|和暦（年）|西暦（年）|  注|人口（総数）|人口（男）|  人口（女）|
|            00|      全国| 大正|         9|      1920|null|    55963053|  28044185|    27918868|
|            01|    北海道| 大正|         9|      1920|null|     2359183|   1244322|     1114861|
|            02|    青森県| 大正|         9|      1920|null|      756454|    381293|      375161|
|            03|    岩手県| 大正|         9|      1920|null|      845540|    421069|      424471|
|            04|    宮城県| 大正|         9|      1920|null|      961768|    485309|      476459|
|            05|    秋田県| 大正|         9|      1920|null|      898537|    453682|      444855|
|            06|    山形県| 大正|         9|      1920|null|      968925|    478328|      49059

In [37]:
df_after_t_by_sql = spark.sql(
    """
    select 
      kenmei,
      avg(jinko_male) as male_avg,
      avg(jinko_female) as female_avg
    from 
      jinko
    where
      gengo = "平成"
      and kenmei != "人口集中地区以外の地区"
    group by kenmei
    order by male_avg
    """
)

df_after_t_by_sql.show()

+--------+-----------------+-----------------+
|  kenmei|         male_avg|       female_avg|
+--------+-----------------+-----------------+
|  鳥取県|287885.3333333333|314291.3333333333|
|  島根県|         356034.5|388621.6666666667|
|  高知県|372268.1666666667|         418517.0|
|  徳島県|383399.1666666667|         423152.0|
|  福井県|         395512.5|420182.6666666667|
|  佐賀県|         408192.5|         456442.5|
|  山梨県|425777.8333333333|441831.1666666667|
|  香川県|485871.8333333333|523763.6666666667|
|和歌山県|         490624.0|547112.3333333334|
|  富山県|         532857.0|573049.8333333334|
|  宮崎県|         542386.5|         608793.0|
|  秋田県|542928.3333333334|         604578.5|
|  石川県|         566064.0|         604518.5|
|  大分県|571530.6666666666|638773.6666666666|
|  山形県|         583603.5|627811.6666666666|
|  沖縄県|         654622.0|679050.6666666666|
|  岩手県|659592.6666666666|714973.1666666666|
|  滋賀県|         662391.0|         680326.0|
|  奈良県|671178.6666666666|734736.6666666666|
|  青森県|675238.6666666666

In [38]:
spark.sql(
    """
   insert overwrite table jinko_avg partition(kenmei)
   select 
      avg(jinko_male) as male_avg,
      avg(jinko_female),kenmei as female_avg
    from 
      jinko
    where
      gengo = "平成"
      and kenmei != "人口集中地区以外の地区"
    group by kenmei
    order by male_avg
    """
)

                                                                                

DataFrame[]

In [39]:
spark.sql("select * from jinko_avg").show()

+--------------------+--------------------+------------+
|            male_avg|    jinko_female_avg|      kenmei|
+--------------------+--------------------+------------+
|            893167.5|   944959.6666666666|      三重県|
|  1268325.3333333333|  1360099.3333333333|      京都府|
|4.0840519833333336E7|4.2415789666666664E7|人口集中地区|
|         6.1816723E7|6.4687834833333336E7|        全国|
|           2650310.5|           2861527.0|      兵庫県|
|  2665781.3333333335|  2923371.8333333335|      北海道|
|           2987847.0|           2974638.5|      千葉県|
|            490624.0|   547112.3333333334|    和歌山県|
|  3492880.3333333335|  3443447.8333333335|      埼玉県|
|   571530.6666666666|   638773.6666666666|      大分県|
|   4292675.833333333|           4517115.0|      大阪府|
|   671178.6666666666|   734736.6666666666|      奈良県|
|           1139561.5|           1191255.0|      宮城県|
|            542386.5|            608793.0|      宮崎県|
|            532857.0|   573049.8333333334|      富山県|
|   709497.8333333334|

In [40]:
#hint文でスモールファイル問題の解決 /** repartition(1) */ 忘れずに
#df2 でキャッシュ
df2 = spark.sql(
   """
   insert overwrite table jinko_avg partition(kenmei)
   select /** repartition(1) */
      avg(jinko_male) as male_avg,
      avg(jinko_female),kenmei as female_avg
    from 
      jinko
    where
      gengo = "平成"
      and kenmei != "人口集中地区以外の地区"
    group by kenmei
    order by male_avg
    """
)

                                                                                

In [41]:
#本番環境を想定したpyspark 昭和
spark.sql("select * from jinko_avg").show()

+--------------------+--------------------+------------+
|            male_avg|    jinko_female_avg|      kenmei|
+--------------------+--------------------+------------+
|            893167.5|   944959.6666666666|      三重県|
|  1268325.3333333333|  1360099.3333333333|      京都府|
|4.0840519833333336E7|4.2415789666666664E7|人口集中地区|
|         6.1816723E7|6.4687834833333336E7|        全国|
|           2650310.5|           2861527.0|      兵庫県|
|  2665781.3333333335|  2923371.8333333335|      北海道|
|           2987847.0|           2974638.5|      千葉県|
|            490624.0|   547112.3333333334|    和歌山県|
|  3492880.3333333335|  3443447.8333333335|      埼玉県|
|   571530.6666666666|   638773.6666666666|      大分県|
|   4292675.833333333|           4517115.0|      大阪府|
|   671178.6666666666|   734736.6666666666|      奈良県|
|           1139561.5|           1191255.0|      宮城県|
|            542386.5|            608793.0|      宮崎県|
|            532857.0|   573049.8333333334|      富山県|
|   709497.8333333334|

In [None]:
#localhost:4040から

In [43]:
df2.cache()
df2.is_cached

22/07/21 09:36:12 WARN CacheManager: Asked to cache already cached data.


True

In [48]:
#テーブルをキャッシュ
spark.catalog.cacheTable("jinko_avg")
spark.catalog.isCached("jinko_avg")

True

In [54]:
#メタデータについて mysqlにmetadatastoreを作成
df.show()


+--------------+----------+----+----------+----------+----+------------+----------+----------+
|都道府県コード|都道府県名|元号|和暦（年）|西暦（年）|  注|人口（総数）|人口（男）|人口（女）|
+--------------+----------+----+----------+----------+----+------------+----------+----------+
|            00|      全国|大正|         9|      1920|null|    55963053|  28044185|  27918868|
|            01|    北海道|大正|         9|      1920|null|     2359183|   1244322|   1114861|
|            02|    青森県|大正|         9|      1920|null|      756454|    381293|    375161|
|            03|    岩手県|大正|         9|      1920|null|      845540|    421069|    424471|
|            04|    宮城県|大正|         9|      1920|null|      961768|    485309|    476459|
|            05|    秋田県|大正|         9|      1920|null|      898537|    453682|    444855|
|            06|    山形県|大正|         9|      1920|null|      968925|    478328|    490597|
|            07|    福島県|大正|         9|      1920|null|     1362750|    673525|    689225|
|            08|    茨城県|大正|        

In [59]:
#日本語のカラム名をローマ字へ変換
from pyspark.sql.types import StructType,StructField,StringType
from pyspark.sql.functions import col

struct = StructType([
    StructField("code",StringType(),False),
    StructField("kenmei",StringType(),False),
    StructField("gengo",StringType(),False),
    StructField("wareki",StringType(),False),
    StructField("seireki",StringType(),False),
    StructField("chu",StringType(),False),
    StructField("sokei",StringType(),False),
    StructField("jinko_male",StringType(),False),
    StructField("jinko_female",StringType(),False)
])

df = spark.read.option("multiLine","true").option("encoding","SJIS") \
.csv("/Users/isomasaki/pyspark_batch/dataset/jinko.csv",header=False,sep=",",inferSchema=False,schema=struct)

data_t = df.filter("code!='都道府県コード'").filter("gengo='平成'")
data_t.show()

+----+----------------------+-----+------+-------+----+---------+----------+------------+
|code|                kenmei|gengo|wareki|seireki| chu|    sokei|jinko_male|jinko_female|
+----+----------------------+-----+------+-------+----+---------+----------+------------+
|  00|                  全国| 平成|     2|   1990|null|123611167|  60696724|    62914443|
|  0A|          人口集中地区| 平成|     2|   1990|null| 78152452|  38564229|    39588223|
|  0B|人口集中地区以外の地区| 平成|     2|   1990|null| 45458715|  22132495|    23326220|
|  01|                北海道| 平成|     2|   1990|null|  5643647|   2722988|     2920659|
|  02|                青森県| 平成|     2|   1990|null|  1482873|    704758|      778115|
|  03|                岩手県| 平成|     2|   1990|null|  1416928|    680197|      736731|
|  04|                宮城県| 平成|     2|   1990|null|  2248558|   1105103|     1143455|
|  05|                秋田県| 平成|     2|   1990|null|  1227478|    584678|      642800|
|  06|                山形県| 平成|     2|   1990|null|  1258390|

In [62]:
spark.sql(
    """
    create database if not exists data_management_c
    """
)

22/07/22 09:19:58 WARN ObjectStore: Failed to get database data_management_c, returning NoSuchObjectException
22/07/22 09:19:58 WARN ObjectStore: Failed to get database data_management_c, returning NoSuchObjectException
22/07/22 09:19:58 WARN ObjectStore: Failed to get database data_management_c, returning NoSuchObjectException


DataFrame[]

In [65]:
spark.sql(
    """
    create table if not exists data_management_c.jinko_t (
        code String,
        gengo String,
        wareki String,
        seireki String,
        chu String,
        sokei String,
        jinko_male String,
        jinko_female String
    )
    PARTITIONED BY (kenmei String)
    STORED AS PARQUET
    LOCATION '/Users/isomasaki/pyspark_datamanagement_metadata/dataset/data_management_c.db/jinko_t';
    """
)

22/07/22 09:46:19 ERROR ConnectionHandle: Database access problem. Killing off this connection and all remaining connections in the connection pool. SQL State = 08S01
22/07/22 09:46:19 WARN ObjectStore: Falling back to ORM path due to direct SQL failure (this is not an error): Communications link failure

The last packet successfully received from the server was 1,581,243 milliseconds ago. The last packet sent successfully to the server was 1,581,243 milliseconds ago. at org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:543) at org.datanucleus.api.jdo.JDOPersistenceManager.getDataStoreConnection(JDOPersistenceManager.java:2284) at org.apache.hadoop.hive.metastore.MetaStoreDirectSql.executeNoResult(MetaStoreDirectSql.java:252) at org.apache.hadoop.hive.metastore.MetaStoreDirectSql.prepareTxn(MetaStoreDirectSql.java:1783) at org.apache.hadoop.hive.metastore.ObjectStore$GetHelper.run(ObjectStore.java:2964) at org.apache.hadoop.hive.metastore

DataFrame[]

In [66]:
spark.sql(
    """
    create table if not exists data_management_c.jinko_code (code String , kenmei String)
    row format delimited
    fields terminated by ','
    lines terminated by '\n'
    LOCATION '/Users/isomasaki/pyspark_datamanagement_metadata/dataset/data_management_c.db/jinko_code';
    """
)

DataFrame[]

In [67]:
spark.sql("show tables in data_management_c").show(truncate=False)

+-----------------+----------+-----------+
|namespace        |tableName |isTemporary|
+-----------------+----------+-----------+
|data_management_c|jinko_code|false      |
|data_management_c|jinko_t   |false      |
|                 |jinko     |true       |
+-----------------+----------+-----------+



In [68]:
#テーブル定義
spark.sql("show create table data_management_c.jinko_t").show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|createtab_stmt                                                                                                                                                                                                                                                                                                                                                                                                             |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [69]:
#dfからテーブルにデータを登録
data_t.createOrReplaceTempView("jinko_table_tmp") 
#tmpテーブルからテーブルへ移動
spark.sql(""" 
          insert overwrite table data_management_c.jinko_t partition(kenmei)
          /** repartition(1) */
          select 
            code,
            gengo,
            wareki,
            seireki,
            chu,
            sokei,
            jinko_male,
            jinko_female,
            kenmei
          from 
            jinko_table_tmp
          """)

                                                                                

22/07/23 14:11:02 WARN log: Updating partition stats fast for: jinko_t
22/07/23 14:11:02 WARN log: Updating partition stats fast for: jinko_t
22/07/23 14:11:02 WARN log: Updating partition stats fast for: jinko_t
22/07/23 14:11:02 WARN log: Updating partition stats fast for: jinko_t
22/07/23 14:11:02 WARN log: Updating partition stats fast for: jinko_t
22/07/23 14:11:02 WARN log: Updating partition stats fast for: jinko_t
22/07/23 14:11:02 WARN log: Updated size to 2357
22/07/23 14:11:02 WARN log: Updated size to 2344
22/07/23 14:11:02 WARN log: Updated size to 2378
22/07/23 14:11:02 WARN log: Updated size to 2348
22/07/23 14:11:02 WARN log: Updated size to 2349
22/07/23 14:11:02 WARN log: Updated size to 2334
22/07/23 14:11:02 WARN log: Updating partition stats fast for: jinko_t
22/07/23 14:11:02 WARN log: Updated size to 2334
22/07/23 14:11:02 WARN log: Updating partition stats fast for: jinko_t
22/07/23 14:11:02 WARN log: Updating partition stats fast for: jinko_t
22/07/23 14:11:02 

DataFrame[]

In [70]:
spark.sql("""
          select
            kenmei,
            count(1) as cnt
          from 
            data_management_c.jinko_t
          group by kenmei
          """).show()



+----------------------+---+
|                kenmei|cnt|
+----------------------+---+
|人口集中地区以外の地区|  6|
|              神奈川県|  6|
|          人口集中地区|  6|
|                  全国|  6|
|                愛知県|  6|
|                東京都|  6|
|                千葉県|  6|
|                埼玉県|  6|
|                長野県|  6|
|                大阪府|  6|
|                静岡県|  6|
|                宮城県|  6|
|                北海道|  6|
|                福岡県|  6|
|                栃木県|  6|
|                京都府|  6|
|                秋田県|  6|
|                新潟県|  6|
|                兵庫県|  6|
|                茨城県|  6|
+----------------------+---+
only showing top 20 rows



                                                                                

In [79]:
#codeにデータを登録

data_t.createOrReplaceTempView("jinko_code_tmp")
spark.sql("""
          insert overwrite table data_management_c.jinko_code
          /** repartition(1) */
          select
            case kenmei
              when "人口集中地区" then "elsecode"
              when "全国" then "allct"
              when "神奈川県" then "AA"
            else code 
            end as code,
            kenmei
          from 
            jinko_code_tmp
          """)

DataFrame[]

In [80]:
spark.sql("""
          select
            *
          from 
            data_management_c.jinko_code
          """).show()

+--------+----------------------+
|    code|                kenmei|
+--------+----------------------+
|   allct|                  全国|
|elsecode|          人口集中地区|
|      0B|人口集中地区以外の地区|
|      01|                北海道|
|      02|                青森県|
|      03|                岩手県|
|      04|                宮城県|
|      05|                秋田県|
|      06|                山形県|
|      07|                福島県|
|      08|                茨城県|
|      09|                栃木県|
|      10|                群馬県|
|      11|                埼玉県|
|      12|                千葉県|
|      13|                東京都|
|      AA|              神奈川県|
|      15|                新潟県|
|      16|                富山県|
|      17|                石川県|
+--------+----------------------+
only showing top 20 rows



In [81]:
spark.stop()
spark.sparkContext.stop()

In [None]:
#ビジネスメタデータ(カラム名確認)
#select co.COLUMN_NAME from TBLS as t inner join SDS as s on t.SD = s.SD_ID inner join CDS as c on s.CD=c.CD_ID inner join COLUMNS_V2 as co on c.CD_ID = co.CD_ID where TBL_NAME = "jinko_t";



In [None]:
--メタデータを保存するためのデータベースとテーブル
create database if not exists metadata;
use metadata;
create table if not exists metadatas (
    database_name varchar(255),
    table_name varchar(255),
    table_definition varchar(255),
    sammary varchar(255),
    row_num varchar(255),
    selectivity varchar(255),
    consistency_flag varchar(255),
    frequency_flag varchar(255),
    primary key (database_name,table_name)
);

In [82]:
import findspark
findspark.init("/opt/homebrew/opt/apache-spark/libexec/")

In [83]:
#pysparkに必要なライブラリを読み込む

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

#spark sessionの作成
# spark.ui.enabled trueとするとSparkのGUI画面を確認することができます
# spark.eventLog.enabled true　とすると　GUIで実行ログを確認することができます
# GUIなどの確認は最後のセクションで説明を行います。
spark = SparkSession.builder \
    .appName("chapter2") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("spark.sql.session.timeZone", "JST") \
    .config("spark.ui.enabled","true") \
    .config("spark.eventLog.enabled","true") \
    .enableHiveSupport() \
    .getOrCreate()

In [96]:
spark.sql("""
          create database if not exists metadata_tmp
          """)
spark.sql("""
           create table if not exists metadata_tmp.sample_metadata(
    database_name String,
    table_name String,
    table_definition String,
    sammary String)
STORED AS PARQUET
LOCATION "/Users/isomasaki/pyspark_datamanagement_metadata/dataset/metadata_tmp.db/sample_metadata";
          """)

DataFrame[]

In [None]:
create table if not exists metadata_tmp.sample_metadata(
    database_name String,
    table_name String,
    table_definition String,
    sammary String,
    row_num String,
    selectivity String,
    consistency_flag String,
    frequency_flag String)
STORED AS PARQUET
LOCATION "/Users/isomasaki/pyspark_datamanagement_metadata/dataset/metadata_tmp.db/sample_metadata";

In [93]:
spark.sql("show tables in metadata_tmp").show()

+------------+---------------+-----------+
|   namespace|      tableName|isTemporary|
+------------+---------------+-----------+
|metadata_tmp|sample_metadata|      false|
|            |     sample_tmp|       true|
+------------+---------------+-----------+



In [87]:
from pyspark.sql.types import StructType,StructField,StringType,BooleanType
from pyspark.sql.functions import when

#spark経由でテーブル定義を取得
table_def = spark.sql("""
                      show create table data_management_c.jinko_t
                      """).collect()[0].asDict()["createtab_stmt"]

struct = StructType([StructField("database_name",StringType(),True),
                     StructField("table_name",StringType(),True),
                     StructField("table_definition",StringType(),True),
                     StructField("sammary",StringType(),True),
                     StructField("row_num",StringType(),True),
                     StructField("selectivity",StringType(),True),
                     StructField("consistency_flag",StringType(),True),
                     StructField("frequency_flag",StringType(),True)
                     ])
df_meta = spark.createDataFrame([(None,None,None,None,None,None,None,None)],struct) 

In [89]:
df_meta2 = df_meta.withColumn("database_name",when(df_meta.database_name.isNull(),"data_management_c").otherwise(df_meta.database_name))
df_meta2 = df_meta2.withColumn("table_name",when(df_meta.table_name.isNull(),"jinko_t").otherwise(df_meta.table_name))
df_meta2 = df_meta2.withColumn("table_definition",when(df_meta.table_definition.isNull(),table_def).otherwise(df_meta.table_definition))
df_meta2 = df_meta2.withColumn("sammary",when(df_meta.sammary.isNull(),"一旦説明は空").otherwise(df_meta.sammary))

df_meta2.show(truncate=False)

+-----------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-------+-----------+----------------+--------------+
|database_name    |table_name|table_definition                                                                                                                                                                                                                                                                                                                                                                                                           |sammary     |row_num|selectivity|consistency_flag|

In [97]:
df_meta2.createOrReplaceTempView("sample_tmp")
spark.sql("""
          insert overwrite table metadata_tmp.sample_metadata
          select /** repartition(1) */
            *
          from 
            sample_tmp
          """)

DataFrame[]

In [98]:
spark.sql("""
          select 
            *
          from 
            metadata_tmp.sample_metadata
          """).show()

+-----------------+----------+--------------------+------------+-------+-----------+----------------+--------------+
|    database_name|table_name|    table_definition|     sammary|row_num|selectivity|consistency_flag|frequency_flag|
+-----------------+----------+--------------------+------------+-------+-----------+----------------+--------------+
|data_management_c|   jinko_t|CREATE TABLE data...|一旦説明は空|   null|       null|            null|          null|
+-----------------+----------+--------------------+------------+-------+-----------+----------------+--------------+



In [99]:
spark.stop()
spark.sparkContext.stop()