# 載入 sqlContext 函式庫
## 抓取 Mongodb 資料，例如: 2019063020 (YYYYMMDDHH)

In [1]:
from pyspark.sql import SQLContext, SparkSession
from pyspark import SparkContext, SparkConf

# Spark configuration: run on a local master under the app name
# "mysql-mongodb-etl".
sparkConf = SparkConf()
sparkConf.setMaster("local")
sparkConf.setAppName("mysql-mongodb-etl")

# A single SparkContext for the notebook, wrapped in a SQLContext that the
# later cells use for DataFrame reads and writes.
sc = SparkContext(conf=sparkConf)
sqlContext = SQLContext(sc)

# 設定遠端 Mongodb 連線資訊
## 修改抓資料日期

In [2]:
# Source collections seen so far (suffix looks like YYYYMMDDHH):
#   data_2019063020
#   data_2019063021
# To load a different hour, change the collection name at the end of the URI.
df_mongo = (
    sqlContext.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("spark.mongodb.input.uri", "mongodb://mongodb:27017/ltu_demo.data_2019063020")
    .load()
)

# 試試看，是否有資料

In [3]:
# Sanity check: fetch the first Row to confirm the MongoDB read returns data.
df_mongo.first()

Row(Humidity=66, Temperature=27.0, _id=Row(oid='5d1b8d40be567be7cc90353b'), device_id='001', timestamp='2019-07-03 00:37:12')

# 設定遠端 Mysql 連線資訊

In [4]:
# Read the MySQL `device` table over JDBC. The sub-select is wrapped in
# parentheses and aliased as `tmp` so it can be passed as `dbtable`.
# NOTE(review): user and password are hard-coded here; fine for a local
# demo, but move them out of the notebook before pointing at a real server.
df_mysql = (
    sqlContext.read
    .format("jdbc")
    .option("url", "jdbc:mysql://mysql:3306/ltu")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "(SELECT * FROM device) tmp")
    .option("user", "root")
    .option("password", "iii")
    .load()
)

# 試試看，是否有資料

In [5]:
# Sanity check: print the rows read from MySQL as a text table.
df_mysql.show()

+---------+--------+--------------------+
|device_id|location|         description|
+---------+--------+--------------------+
|      001|Taichung|Semiconductor-Mac...|
|      002|Taichung|Semiconductor-Mac...|
|      003|Taichung|Semiconductor-Mac...|
|      004|Taichung|Semiconductor-Mac...|
|      005|Taichung|Semiconductor-Mac...|
+---------+--------+--------------------+



# 將 Spark DataFrame 轉成 pandas 資料格式
## 因為我們希望透過 pandas 幫忙 merge 兩個資料集

In [6]:
# Convert both Spark DataFrames to pandas so they can be joined with pandas.
# NOTE: toPandas() collects every row onto the driver, so this approach only
# suits data that fits in driver memory.
pandas_mysql = df_mysql.toPandas()
pandas_mongo = df_mongo.toPandas()

# merge 兩個資料集
## 以 device_id 合併 

In [7]:
import pandas as pd

# Inner-join the device metadata (MySQL) with the sensor readings (MongoDB)
# on their shared `device_id` column; "inner" is pd.merge's default.
pandas_result = pd.merge(
    left=pandas_mysql,
    right=pandas_mongo,
    how="inner",
    on="device_id",
)

# 查看 merge 之後的結果

In [8]:
# Bare expression as the cell's last statement: the notebook renders the
# merged pandas DataFrame.
pandas_result

Unnamed: 0,device_id,location,description,Humidity,Temperature,_id,timestamp
0,001,Taichung,Semiconductor-Machine-01,66,27.0,"(5d1b8d40be567be7cc90353b,)",2019-07-03 00:37:12
1,001,Taichung,Semiconductor-Machine-01,66,27.0,"(5d1b8d40be567be7cc90353c,)",2019-07-03 00:41:30
2,001,Taichung,Semiconductor-Machine-01,64,27.0,"(5d1b8d40be567be7cc90353d,)",2019-07-03 00:42:47
3,001,Taichung,Semiconductor-Machine-01,64,27.0,"(5d1b8d40be567be7cc90353e,)",2019-07-03 00:44:49
4,001,Taichung,Semiconductor-Machine-01,64,27.0,"(5d1b8d40be567be7cc90353f,)",2019-07-03 00:45:10
5,001,Taichung,Semiconductor-Machine-01,63,27.0,"(5d1b8d40be567be7cc903540,)",2019-07-03 00:45:51
6,001,Taichung,Semiconductor-Machine-01,64,27.0,"(5d1b8d40be567be7cc903541,)",2019-07-03 00:46:43
7,001,Taichung,Semiconductor-Machine-01,65,27.0,"(5d1b8d40be567be7cc903542,)",2019-07-03 00:47:24
8,001,Taichung,Semiconductor-Machine-01,65,27.0,"(5d1b8d40be567be7cc903543,)",2019-07-03 00:47:35
9,001,Taichung,Semiconductor-Machine-01,67,27.0,"(5d1b8d40be567be7cc903544,)",2019-07-03 00:48:42


# 將 pandas DataFrame 轉回 Spark DataFrame
## 因為我們要透過 SQLContext 回寫 Mongodb

In [9]:
# Turn the merged pandas DataFrame back into a Spark DataFrame so it can be
# written out through Spark. The `sqlContext` built during setup is reused:
# constructing a second SQLContext over the same SparkContext added nothing.
df_result = sqlContext.createDataFrame(pandas_result)

# 查看經過整合後的資料樣貌

In [10]:
# Print the merged Spark DataFrame as a text table to inspect the result.
df_result.show()

+---------+--------+--------------------+--------+-----------+--------------------+-------------------+
|device_id|location|         description|Humidity|Temperature|                 _id|          timestamp|
+---------+--------+--------------------+--------+-----------+--------------------+-------------------+
|      001|Taichung|Semiconductor-Mac...|      66|       27.0|[5d1b8d40be567be7...|2019-07-03 00:37:12|
|      001|Taichung|Semiconductor-Mac...|      66|       27.0|[5d1b8d40be567be7...|2019-07-03 00:41:30|
|      001|Taichung|Semiconductor-Mac...|      64|       27.0|[5d1b8d40be567be7...|2019-07-03 00:42:47|
|      001|Taichung|Semiconductor-Mac...|      64|       27.0|[5d1b8d40be567be7...|2019-07-03 00:44:49|
|      001|Taichung|Semiconductor-Mac...|      64|       27.0|[5d1b8d40be567be7...|2019-07-03 00:45:10|
|      001|Taichung|Semiconductor-Mac...|      63|       27.0|[5d1b8d40be567be7...|2019-07-03 00:45:51|
|      001|Taichung|Semiconductor-Mac...|      64|       27.0|[5d1b8d40be567be7...|2019-07-03 00:46:43|

# 查看 timestamp 資料

In [11]:
# Show only the `timestamp` column of the merged data.
df_result.select("timestamp").show()

+-------------------+
|          timestamp|
+-------------------+
|2019-07-03 00:37:12|
|2019-07-03 00:41:30|
|2019-07-03 00:42:47|
|2019-07-03 00:44:49|
|2019-07-03 00:45:10|
|2019-07-03 00:45:51|
|2019-07-03 00:46:43|
|2019-07-03 00:47:24|
|2019-07-03 00:47:35|
|2019-07-03 00:48:42|
|2019-07-03 00:49:13|
|2019-07-03 00:49:23|
|2019-07-03 00:49:38|
|2019-07-03 00:49:43|
|2019-07-03 00:50:40|
|2019-07-03 00:51:26|
|2019-07-03 00:52:28|
|2019-07-03 00:52:33|
|2019-07-03 00:52:38|
|2019-07-03 00:52:49|
+-------------------+
only showing top 20 rows



# 查看 humidity 資料

In [12]:
# Show only the humidity readings of the merged data.
# NOTE(review): the schema column is `Humidity` (capital H); the lowercase
# name resolves here, presumably because Spark SQL column resolution is
# case-insensitive by default (spark.sql.caseSensitive=false) - confirm
# before relying on it.
df_result.select("humidity").show()

+--------+
|humidity|
+--------+
|      66|
|      66|
|      64|
|      64|
|      64|
|      63|
|      64|
|      65|
|      65|
|      67|
|      67|
|      67|
|      66|
|      66|
|      66|
|      65|
|      65|
|      65|
|      65|
|      65|
+--------+
only showing top 20 rows



# 寫入 Mongodb，寫入位置: ltu_demo.final
## 附加到原有資料: mode("append")（若要覆蓋原有資料，請改用 mode("overwrite")）

In [13]:
# Write the merged rows to the `final` collection of the `ltu_demo` database.
# mode("append") adds to whatever the collection already holds instead of
# replacing it.
(
    df_result.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("spark.mongodb.output.uri", "mongodb://mongodb:27017/ltu_demo.final")
    .mode("append")
    .save()
)

In [14]:
# Stop the SparkContext created at the top of the notebook now that the
# write has finished.
sc.stop()