In [None]:
# コンソールで設定したSparkとNoteBookを接続します(動かす前に毎度実行する必要があります)
import findspark
findspark.init("/home/pyspark/spark")

In [None]:
#pysparkに必要なライブラリを読み込む
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

#spark sessionの作成
# spark.ui.enabled trueとするとSparkのGUI画面を確認することができます
# spark.eventLog.enabled true　とすると　GUIで実行ログを確認することができます
# GUIなどの確認は最後のセクションで説明を行います。
spark = SparkSession.builder \
    .appName("chapter1") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("spark.sql.session.timeZone", "JST") \
    .config("spark.ui.enabled","true") \
    .config("spark.eventLog.enabled","true") \
    .enableHiveSupport() \
    .getOrCreate()


# spark.xxxxxと記載することで処理を分散させることが可能です。

In [None]:
#　データの読み込みを行う

from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col
import pyspark.sql.functions as F

struct = StructType([
    StructField("code", StringType(), False),
    StructField("gengo", StringType(), False),
    StructField("wareki", StringType(), False),
    StructField("seireki", StringType(), False),
    StructField("chu", StringType(), False),
    StructField("sokei", StringType(), False),
    StructField("jinko_male", StringType(), False),
    StructField("jinko_female", StringType(), False)
])
df=spark.read.option("multiLine", "true").option("encoding", "UTF-8") \
    .csv("./datafile/jinko.csv", header=False, sep=',', inferSchema=False,schema=struct)

struct2 = StructType([
    StructField("code", StringType(), False),
    StructField("kenmei", StringType(), False)
])
df2=spark.read.option("multiLine", "true").option("encoding", "UTF-8") \
    .csv("./datafile/kenmei_master.csv", header=False, sep=',', inferSchema=False,schema=struct2)


struct3 = StructType([
    StructField("codes", StringType(), False),
    StructField("kenmei", StringType(), False)
])
df3=spark.read.option("multiLine", "true").option("encoding", "UTF-8") \
    .csv("./datafile/kenmei_master.csv", header=False, sep=',', inferSchema=False,schema=struct3)


# 今までのテスト結果をデータフレームに保存してみます

In [None]:
# 結果の結合をおこなってdf_resultデータフレームに保存します

# データのリペア
df = df.filter(F.col("code")!="都道府県コード") \
    .filter(~F.col("code").contains("1)　沖縄県は調査され"))\
    .filter(~F.col("code").contains("2)　長野県西筑摩群山口村"))

# column num
df = df.withColumn("column_num_check", F.when(F.lit(len(df.columns)) == 8,F.lit(1)))

# gengo
df = df.withColumn("gengo_wareki_if_then_check",
    F.when(F.col("gengo") == "大正", 
        ((F.col("wareki").cast("integer") > 0) & (F.col("wareki").cast("integer") <= 62)).cast("long")
    )
)

# zero_control
df = df.withColumn(
    'sokei_check_zero_control', 
    (F.col('sokei') == (F.col('jinko_male') + F.col('jinko_female'))).cast("long")
)

# dictionary
df = df.withColumn("gengo_dictionary_chek", (F.col("gengo").isin(['大正','昭和','平成'])).cast("long"))

# range
df = df.withColumn("seireki_range_check", (F.col("seireki").between(1920,2015)).cast("long"))

# null/uniqueness
df = df.withColumn("unique_ness_check", 
    F.lit(df.agg(F.countDistinct("code","gengo", "wareki").alias("countdistinct")).collect()[0][0]) / F.lit(df.count()))

# pattern
df = df.withColumn("seireki_pattern_chek", (F.col("seireki").rlike("\d{4}")).cast("long"))

# consistency
df = df.withColumn("code_consistency_check", 
    F.lit(df.select("code").distinct().count()) / F.lit(df.select("code").intersect(df2.select("code")).count()))

# ratio
df = df.withColumn("jinko_male_jinko_female_ratio_check", 
    F.col("jinko_male").cast("integer") / F.col("jinko_female").cast("integer")) \
        .withColumn("jinko_male_jinko_female_ratio_check", (F.col("jinko_male_jinko_female_ratio_check").between(0.8, 1.2)).cast("long"))

# timeliness
df = df.withColumn("timelineness_check", F.current_timestamp())

# 0 check
df_result = df.withColumn("count_check", F.when(F.lit(df.count()) > 0, F.lit(1)))

In [None]:
# check以外の項目を除くことも可能
df_result.select(df_result.colRegex("`.*check.*`"))

# 重複が多いので一つのカラムの中に一個でもFalseを含んでいたらCheckの結果としてFlaseというサマリーデータを格納するのもOK
# 詳細の確認はデータが大きくなることからmysqlに保存するのではなくS3などにそのまま保存しておくのはOK

peke = df_result.summary()

# 日付はダメなので add columnすることも考える
peke.withColmn("timelineness_check", df.timelineness_check)


In [None]:
# mysqlへのデータ書き込み
# mysqlのテーブルは勝手に作ってくれます。

df_result.write.format('jdbc').options(
      url='jdbc:mysql://db_mysql/metadata?enabledTLSProtocols=TLSv1.2',
      driver='com.mysql.jdbc.Driver',
      dbtable='metadatas',
      user='root',
      password='root').mode('overwrite').save()

# mysqlでの確認をしてみましょう

```
mysql -h db_mysql -u root -proot  metadata
select * from metadatas
```

In [None]:
# 最後はSparkをクローズする
spark.stop()
spark.sparkContext.stop()