# Spark source code inspection
> based on Apache Zeppelin


## make the csv file from hdfs

- POSCO PoC : 청정기 상 데이터 다운로드
```spark
// Source layout: /user/hdfs/parquet/<model name>/iaq/<date folder>
// Read one day of IAQ parquet data and keep only the COWAY_CAPTAIN_IN device model.
val df = spark.read
  .parquet("/user/hdfs/parquet/captain/iaq/20180501")
  .filter("devicemodelid='COWAY_CAPTAIN_IN'")

// coalesce(1) collapses the result to one partition so a single CSV part file is written.
df.coalesce(1).write.option("header", "true").csv("/tmp/iaq_0501.csv")
```

- select expression on spark
```spark
// Load three consecutive daily folders, keep five specific devices,
// project the columns of interest and export them as a single CSV part file.
val df = spark.read
  .parquet(
    "parquet/captain/iaq/20180607",
    "parquet/captain/iaq/20180608",
    "parquet/captain/iaq/20180609"
  )
  .where("dev_id in ('Z01LTEMA08A01P9009','Z01LTEMA08A01P8449','Z01LTEMA08A01P9405','Z01LTEMA08A01P8282','Z01LTEMA08A01P8704')")
  .selectExpr("dt", "dev_dt", "dev_id", "pm10", "co2")

df
  .coalesce(1)
  .write
  .option("header", "true")
  .csv("/tmp/ante_0607_0609.csv")
```

## data handling on hdfs
```bash
%sh

# Housekeeping for the CSV exports written to HDFS /tmp by the Spark paragraphs.
# Only the listing command is active; the other steps are kept commented out
# and are meant to be enabled one at a time when needed.

# (disabled) merge the part files under the HDFS path into one local file
#hdfs dfs -getmerge /tmp/iaq_201805.csv /data/khe/iaq_201805.csv
# (disabled) count the rows of the merged local file
#wc -l /data/khe/iaq_201805.csv
# list the contents of HDFS /tmp
hdfs dfs -ls /tmp
# (disabled) delete the temporary exports from HDFS, bypassing the trash
#hdfs dfs -rm -r -skipTrash /tmp/iaq_0501.csv /tmp/iaq_0501_0507.csv /tmp/iaq_201805.csv
# (disabled) copy the merged file to the cwsparkdev host
#scp /data/khe/iaq_201805.csv cwsparkdev:/data/download/cospark/iaq_201805.csv
```

## PPAS connection (via the Oracle JDBC thin driver)
```spark
import spark.implicits._

// Append three days of water-purifier error-status parquet data to an Oracle table over JDBC.
val oracle_url = "jdbc:oracle:thin:@10.101.1.212:1521/CDW"

// NOTE(review): the credentials below are hard-coded in the notebook; consider moving them to
// interpreter settings or another secret store.
val prop = new java.util.Properties()
Seq(
  "driver"   -> "oracle.jdbc.OracleDriver",
  "user"     -> "WJV",
  "password" -> "wjvc3005"
).foreach { case (key, value) => prop.setProperty(key, value) }

val df = spark.read.parquet(
  "/user/hdfs/parquet/wtrprfr/wtrprfr_err_sttus/20180421",
  "/user/hdfs/parquet/wtrprfr/wtrprfr_err_sttus/20180420",
  "/user/hdfs/parquet/wtrprfr/wtrprfr_err_sttus/20180419"
)

// Column names are upper-cased before the write; rows are appended to the target table.
df.toDF(df.columns.map(_.toUpperCase): _*)
  .write
  .mode("append")
  .jdbc(oracle_url, "wtrprfr_err_sttus_20180426", prop)
```

## spark group by, select, aggregation
```spark
// Per-device summary over the 201705* .. 201709* folders for two device models:
// number of distinct dt values plus average humidity and temperature, ordered by dev_id.
val df = spark.read
  .parquet(
    "/user/hdfs/parquet/captain/iaq/201705*",
    "/user/hdfs/parquet/captain/iaq/201706*",
    "/user/hdfs/parquet/captain/iaq/201707*",
    "/user/hdfs/parquet/captain/iaq/201708*",
    "/user/hdfs/parquet/captain/iaq/201709*"
  )
  .where("devicemodelid in ('COWAY_CAPTAIN_IN','COWAY_BLUEMOON')")
  .selectExpr("dt", "dev_id", "humi", "temp")
  .groupBy("dev_id")
  .agg(
    countDistinct("dt").as("cnt"),
    avg("humi").as("avg_humi"),
    avg("temp").as("avg_temp")
  )
  .orderBy("dev_id")

// Write the summary as a single CSV part file with a header row.
df
  .coalesce(1)
  .write
  .option("header", "true")
  .csv("/tmp/iaq_humi_temp_05_09.csv")
```

In [None]:
// Build temp view "a1": up to 100 randomly chosen devices of the two AIS models whose
// row count over the seven listed daily folders is strictly between 10000 and 10200.
spark.read
  .parquet(
    "/user/hdfs/parquet/wtrprfr/wtrprfr_snsr/20170814",
    "/user/hdfs/parquet/wtrprfr/wtrprfr_snsr/20170815",
    "/user/hdfs/parquet/wtrprfr/wtrprfr_snsr/20170816",
    "/user/hdfs/parquet/wtrprfr/wtrprfr_snsr/20170817",
    "/user/hdfs/parquet/wtrprfr/wtrprfr_snsr/20170818",
    "/user/hdfs/parquet/wtrprfr/wtrprfr_snsr/20170819",
    "/user/hdfs/parquet/wtrprfr/wtrprfr_snsr/20170820"
  )
  .where("devicemodelid in ('COWAY_AIS_HOT', 'COWAY_AIS_SODA' )")
  .groupBy("dev_id")
  .count()
  .where("count > 10000 and count < 10200")
  .orderBy(rand()) // random order, so limit picks an arbitrary sample
  .limit(100)
  .createOrReplaceTempView("a1")

In [None]:
// Daily per-device humidity/temperature statistics for the 201809* folders:
// distinct dev_dt count plus min/avg/max of humi and temp, ordered by device then day.
val df = spark.read
  .parquet("/user/hdfs/parquet/captain/iaq/201809*")
  .where("devicemodelid in ('COWAY_CAPTAIN_IN','COWAY_BLUEMOON')")
  .selectExpr("dt", "dev_dt", "dev_id", "humi", "temp")
  .groupBy("dt", "dev_id")
  .agg(
    countDistinct("dev_dt").as("cnt"),
    min("humi").as("min_humi"),
    avg("humi").as("avg_humi"),
    max("humi").as("max_humi"),
    min("temp").as("min_temp"),
    avg("temp").as("avg_temp"),
    max("temp").as("max_temp")
  )
  .orderBy("dev_id", "dt")

// Write the statistics as a single CSV part file with a header row.
df
  .coalesce(1)
  .write
  .option("header", "true")
  .csv("/tmp/iaq_humi_temp_201809.csv")

In [None]:
// Read the Oracle table TM_IOT_DEV_ID over JDBC and expose it to %sql paragraphs
// as a temp view with the same name.
val url = "jdbc:oracle:thin:@10.101.1.212:1521/CDW"
val prop = new java.util.Properties
// Use oracle.jdbc.OracleDriver, the class name already used by the JDBC write paragraph in this
// notebook; the oracle.jdbc.driver.OracleDriver spelling is the legacy name.
prop.setProperty("driver", "oracle.jdbc.OracleDriver")
// NOTE(review): the credentials below are hard-coded in the notebook; consider moving them to
// interpreter settings or another secret store.
prop.setProperty("user","WJV")
prop.setProperty("password","wjvc3005")

val df_tk = spark.read.jdbc(url, "TM_IOT_DEV_ID", prop)
df_tk.createOrReplaceTempView("TM_IOT_DEV_ID")


In [None]:
// Register the 20181023 IAQ rows of model COWAY_CAPTAIN_IN as temp view "iaq_capt_1023".
spark.read
  .parquet("parquet/captain/iaq/20181023")
  .where("devicemodelid='COWAY_CAPTAIN_IN'")
  .createOrReplaceTempView("iaq_capt_1023")

In [None]:
%sql
-- Distinct devices in iaq_capt_1023 per b.ct_nm, matched against TM_IOT_DEV_ID rows
-- with PROD_SUB_CD = CAPT and DO_NM = 서울.
-- The Oracle (+) outer-join marker is not valid Spark SQL, so the join is written in ANSI form.
-- The conditions on b stay in the ON clause so that devices without a match are still kept
-- (they fall into the group where ct_nm is NULL), as with the original (+) semantics.
select b.ct_nm, count(distinct a.dev_id)
from iaq_capt_1023 a
left outer join TM_IOT_DEV_ID b
  on a.dev_id = b.sernr
 and b.PROD_SUB_CD = 'CAPT'
 and b.DO_NM = '서울'
group by b.ct_nm
order by 2 desc

In [None]:
%sql
-- Number of non-null USE_CUST_ID values per DO_NM for product sub-code CAPT, largest first.
SELECT
    DO_NM,
    count(USE_CUST_ID)
FROM TM_IOT_DEV_ID
WHERE PROD_SUB_CD = 'CAPT'
GROUP BY DO_NM
ORDER BY 2 DESC

In [None]:
%sql
-- Number of non-null USE_CUST_ID values per CT_NM for product sub-code CAPT
-- within DO_NM = 울산, largest first.
SELECT
    CT_NM,
    count(USE_CUST_ID)
FROM TM_IOT_DEV_ID
WHERE PROD_SUB_CD = 'CAPT'
  AND DO_NM = '울산'
GROUP BY CT_NM
ORDER BY 2 DESC

In [None]:
%sql
-- Number of non-null USE_CUST_ID values per CT_NM for product sub-code CAPT
-- within DO_NM = 서울, largest first.
SELECT
    CT_NM,
    count(USE_CUST_ID)
FROM TM_IOT_DEV_ID
WHERE PROD_SUB_CD = 'CAPT'
  AND DO_NM = '서울'
GROUP BY CT_NM
ORDER BY 2 DESC

In [None]:
%sql
-- Row count per tm and per 5-character segment of dev_id starting at position 4.
-- NOTE(review): the view aa is not created in the visible part of this notebook.
SELECT
    tm,
    substr(dev_id, 4,5),
    count(*)
FROM aa
GROUP BY
    tm,
    substr(dev_id, 4,5)
ORDER BY 1, 2

In [None]:
// Display 201805* air-purifier status rows whose light_stng is none of '0', '1', '2'.
z.show(
  spark.read
    .parquet("/user/hdfs/parquet/captain/airprfr_sttus/201805*")
    .where("light_stng not in ('0','1','2')")
)


In [None]:
%sql
-- Sensor sums and averages from view bb, grouped by characters 12-13 of dev_dt (tm)
-- and by the 5-character segment of dev_id starting at position 4 (dev_id45).
-- NOTE(review): tm is presumably the hour of day, assuming dev_dt looks like
-- yyyy-MM-dd HH:mm:ss. Confirm against the data.
-- sum(lux) and avg(lux) were each selected twice under the same alias;
-- the duplicate output columns have been removed.
select substr(dev_dt,12,2) as tm, substr(dev_id,4,5) as dev_id45
     , count(distinct dev_id) as cnt
     , sum(pm10) as sum_pm10
     , sum(pm25) as sum_pm25
     , sum(voc_ref) as sum_voc_ref
     , sum(voc_now) as sum_voc_now
     , sum(humi) as sum_humi
     , sum(temp) as sum_temp
     , sum(lux) as sum_lux
     , sum(co2) as sum_co2
     , sum(elect_watt) as sum_elect_watt
     , sum(iaq_idx) as sum_iaq_idx
     , avg(pm10) as pm10
     , avg(pm25) as pm25
     , avg(voc_ref) as voc_ref
     , avg(voc_now) as voc_now
     , avg(humi) as humi
     , avg(temp) as temp
     , avg(lux) as lux
     , avg(co2) as co2
     , avg(elect_watt) as elect_watt
     , avg(iaq_idx) as iaq_idx
from bb
group by substr(dev_dt,12,2), substr(dev_id,4,5)
order by 1, 2

In [None]:
%sql
-- Per device model: number of distinct devices and number of rows
-- that have a positive, non-null extrt_wtr_qnt.
SELECT
    devicemodelid,
    count(DISTINCT dev_id),
    count(dev_id)
FROM wtr
WHERE extrt_wtr_qnt IS NOT NULL
  AND extrt_wtr_qnt>0
GROUP BY devicemodelid

In [None]:
// Display the 20180811 iaq_sttus rows of one device, ordered by dev_dt.
z.show(
  spark.read
    .parquet("/user/hdfs/parquet/captain/iaq_sttus/20180811")
    .where("dev_id='19302D8F1731700121'")
    .orderBy("dev_dt")
)


In [None]:
// Display daily humidity/temperature statistics of one device over the 201805* folders:
// distinct dev_dt count plus min/avg/max of humi and temp, ordered by device then day.
z.show(
  spark.read
    .parquet("/user/hdfs/parquet/captain/iaq/201805*")
    .where("dev_id='17202ES817C1300006'")
    .selectExpr("dt", "dev_dt", "dev_id", "humi", "temp")
    .groupBy("dt", "dev_id")
    .agg(
      countDistinct("dev_dt").as("cnt"),
      min("humi").as("min_humi"),
      avg("humi").as("avg_humi"),
      max("humi").as("max_humi"),
      min("temp").as("min_temp"),
      avg("temp").as("avg_temp"),
      max("temp").as("max_temp")
    )
    .orderBy("dev_id", "dt")
)


In [None]:
// Display water-purifier status rows for three specific devices.
// The IN list needs parentheses; without them the filter expression does not parse.
// NOTE(review): other paragraphs read daily folders (yyyymmdd) or a yyyymm* glob. Confirm that
// a folder literally named 201806 exists; otherwise this path probably needs a trailing *.
z.show(
  spark.read
    .parquet("parquet/wtrprfr/wtrprfr_sttus/201806")
    .filter("dev_id in ('10602EUL17A1700003','10702EUL1770500636','10702EUL1771100027')")
)

In [None]:
// Display dev_id and dev_dt of one device's 201803* status rows where wtr_sel = 7.
// The original select passed a third, empty column name (""), which cannot be resolved
// against the schema; it has been dropped.
// NOTE(review): if a third column was intended, add its name to the select below.
z.show(
  spark.read
    .parquet("parquet/wtrprfr/wtrprfr_sttus/201803*")
    .filter("dev_id='10602EUL17A1700003' and wtr_sel=7")
    .select("dev_id", "dev_dt")
)