# 環境変数読み込み

In [1]:
import os

es_spark_jar = os.getenv('ES_SPARK_JAR', '')
print("es_spark_jar: %s" % es_spark_jar)

es_spark_jar: /Users/matsushitas/dev/src/github.com/romiogaku/jupyter-notebook-spark-sample/elasticsearch-hadoop-6.3.2/dist/elasticsearch-spark-20_2.11-6.3.2.jar


# SparkをJupyter上で実行するための準備
※ Jupyterから起動する実行方法3と4のみ必要になりますので、コメントインしてください。

In [2]:
# import sys

# spark_home = os.environ.get('SPARK_HOME', None)
# sys.path.insert(0, spark_home + "/python")
# # py4j-0.10.7-src.zipのバージョンは適宜変更してください
# sys.path.insert(0, os.path.join(spark_home, "python/lib/py4j-0.10.4-src.zip"))

# Spark Session作成

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("sample") \
    .master(master="local[*]")\
    .config("spark.jars", es_spark_jar)\
    .getOrCreate()

df = spark.read.format("csv").option("header", "true").load("sample.csv")

df.show()

+---+--------+
| id|category|
+---+--------+
|  0|       a|
|  1|       b|
|  2|       c|
|  3|       a|
|  4|       a|
|  5|       c|
+---+--------+



# Spark MLlibを使った処理

In [4]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()

+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+



# Spark DataFrameをElasticsearchに書き込む

In [5]:
from elasticsearch import Elasticsearch

es = Elasticsearch(["localhost:9200"])

index_name = "sample_index"
type_name = "sample_type"

indexed.write.format("es").option("es.nodes.wan.only", "true") \
    .option("es.write.operation", "index") \
    .save("{}/{}".format(index_name, type_name), mode="append")

# Kibanaで確認
[http://localhost:5601/app/kibana](http://localhost:5601/app/kibana)