# Load Data

In [None]:
%pyspark
# Root HDFS folder that holds one sub-folder of CSV files per province.
basic_folder = "hdfs://master:9000/user/hadoop/dataset/"

# Province names; each one is used both as a folder name under `basic_folder`
# and as the name of the temp view registered for that province.
provinces = [
    "강원도",
    "경기도",
    "경상남도",
    "경상북도",
    "광주광역시",
    "대구광역시",
    "대전광역시",
    "부산광역시",
    "서울특별시",
    "세종특별자치시",
    "울산광역시",
    "인천광역시",
    "전라남도",
    "전라북도",
    "제주특별자치도",
    "충청남도",
    "충청북도",
]

In [None]:
%pyspark
# Register one temp view per province, named after the province itself.
for p in provinces:
    # Glob matching every file "<province>_*" inside the province's folder.
    files = "{0}{1}/{1}_*".format(basic_folder, p)
    print(files)
    df = spark.read.csv(files, header=True, inferSchema=True)
    # The view name is back-quoted because it is not a plain ASCII identifier.
    view_name = "`{0}`".format(p)
    df.createOrReplaceTempView(view_name)

# 전국 단위 테이블 생성

In [None]:
%pyspark
# Read every file in every sub-folder of the dataset folder into a single
# DataFrame and expose it as the nationwide (`전국`) temp view.
path = "hdfs://master:9000/user/hadoop/dataset/*/*"
df = (
    spark.read
    .option("header", True)       # first line of each file holds column names
    .option("inferSchema", True)  # let Spark infer the column types
    .csv(path)
)
df.createOrReplaceTempView('`전국`')

# 전국 집값 추이

In [None]:
%sql
-- Nationwide average sale price per year.
-- The first four characters of 계약년월 are used as the year; commas are
-- removed from 거래금액(만원) before averaging and the average is cast to decimal.
-- Rows are ordered by year so the trend comes out chronologically, as in the
-- other trend queries of this notebook.
SELECT substr(`계약년월`, 1, 4) as `연도`,
       cast(avg(regexp_replace(`거래금액(만원)`, ',', '')) as decimal) as `가격`
FROM `전국`
GROUP BY `연도`
ORDER BY `연도`

# 전국 지역별 집값 추이

In [None]:
%sql
-- Average sale price per top-level region and year, nationwide.
-- The region is the part of 시군구 before its first space. substring_index
-- returns that token without a trailing space, and returns the whole value
-- when 시군구 contains no space (the previous substr/instr form produced a
-- label ending in a space, or an empty label when there was no space).
-- The secondary sort key keeps regions in a stable order within each year.
SELECT substring_index(`시군구`, ' ', 1) as `지역`,
       substr(`계약년월`, 1, 4) as `연도`,
       cast(avg(regexp_replace(`거래금액(만원)`, ',', '')) as decimal) as `가격`
FROM `전국`
GROUP BY `지역`, `연도`
ORDER BY `연도`, `지역`

# 서울시 구별 집값 추이

In [None]:
%sql
-- Average sale price per district and year within the 서울특별시 view.
-- The district is the second space-separated token of 시군구: the inner
-- substring_index keeps the first two tokens, the outer one keeps the last
-- token of that result. This returns the full district name regardless of
-- its length (the previous fixed length of 3 cut longer names short).
SELECT substring_index(substring_index(`시군구`, ' ', 2), ' ', -1) as `지역`,
       substr(`계약년월`, 1, 4) as `연도`,
       cast(avg(regexp_replace(`거래금액(만원)`, ',', '')) as decimal) as `가격`
FROM `서울특별시`
GROUP BY `지역`, `연도`
ORDER BY `연도`, `지역`

# 1. 특정시에 대한 집값 추이

In [None]:
%pyspark
# Yearly average sale price for the view named by z.get("province").
#
# The name is spliced into the SQL text as a back-quoted identifier, so any
# backtick inside it is doubled to keep the whole value inside the identifier.
province_ident = z.get("province").replace("`", "``")

# The first four characters of `계약년월` are used as the year; commas are
# removed from the price text before averaging and the average is cast to
# decimal.
df_province = sqlContext.sql("""
select substr(`계약년월`, 1, 4) as y,
       cast(avg(regexp_replace(`거래금액(만원)`, ',', '')) as decimal) as price
from `""" + province_ident + """`
group by y
order by y
""")
z.show(df_province)

# 2. 상세 행정동에 대한 집값 추이

In [None]:
%pyspark
# Yearly average sale price for rows of the selected province view whose
# `시군구` contains the text from z.get("detailAddress").
#
# Both values are spliced into the SQL text, so they are escaped first:
# backticks are doubled inside the back-quoted view name, and backslashes and
# single quotes are backslash-escaped so the search term cannot terminate the
# string literal early. `%` and `_` in the term keep their LIKE wildcard meaning.
province_ident = z.get("province").replace("`", "``")
address_literal = z.get("detailAddress").replace("\\", "\\\\").replace("'", "\\'")

df_detailAddress = sqlContext.sql("""
select substr(`계약년월`, 1, 4) as year,
       cast(avg(regexp_replace(`거래금액(만원)`, ',', '')) as decimal) as price
from `""" + province_ident + """`
where `시군구` like '%""" + address_literal + """%'
group by year
order by year
""")
z.show(df_detailAddress)

# 3. 특정 아파트 집값 추이

In [None]:
%pyspark
# Yearly average sale price for rows of the selected province view whose
# `단지명` starts with the text from z.get("apart").
#
# Both values are spliced into the SQL text, so they are escaped first:
# backticks are doubled inside the back-quoted view name, and backslashes and
# single quotes are backslash-escaped so the search term cannot terminate the
# string literal early. `%` and `_` in the term keep their LIKE wildcard meaning.
province_ident = z.get("province").replace("`", "``")
apart_literal = z.get("apart").replace("\\", "\\\\").replace("'", "\\'")

df_apart = sqlContext.sql("""
select substr(`계약년월`, 1, 4) as y,
       cast(avg(regexp_replace(`거래금액(만원)`, ',', '')) as decimal) as price
from `""" + province_ident + """`
where `단지명` like '""" + apart_literal + """%'
group by y
order by y
""")
z.show(df_apart)

# 4. 검색한 아파트의 층별 집값

In [None]:
%pyspark
# Average sale price per floor (`층`) for rows of the selected province view
# whose `단지명` starts with the text from z.get("apart").
#
# Both values are spliced into the SQL text, so they are escaped first:
# backticks are doubled inside the back-quoted view name, and backslashes and
# single quotes are backslash-escaped so the search term cannot terminate the
# string literal early. `%` and `_` in the term keep their LIKE wildcard meaning.
province_ident = z.get("province").replace("`", "``")
apart_literal = z.get("apart").replace("\\", "\\\\").replace("'", "\\'")

df_floor = sqlContext.sql("""
select `층` as floor,
       cast(avg(regexp_replace(`거래금액(만원)`, ',', '')) as decimal) as price
from `""" + province_ident + """`
where (`단지명` like '""" + apart_literal + """%')
group by floor
order by floor
""")
z.show(df_floor)

# 5. 검색한 행정동에서 최신집값이 내가 원하는 집값 사이에 있는 곳

In [None]:
%pyspark
# Apartments in the searched district with 2021 transactions whose price lies
# between the two bounds from z.get("price_up") and z.get("price_down").
#
# Every value below is spliced into the SQL text, so each is sanitised first:
# - the view name has its backticks doubled inside the back-quoted identifier;
# - the district term has backslashes and single quotes backslash-escaped so it
#   cannot terminate the string literal early (`%`/`_` stay LIKE wildcards);
# - the price bounds are parsed as numbers, so only numeric text reaches the
#   query (float() raises ValueError on anything else), and are sorted so the
#   `between` range is valid whichever of the two is the larger one.
province_ident = z.get("province").replace("`", "``")
address_literal = z.get("detailAddress").replace("\\", "\\\\").replace("'", "\\'")
price_low, price_high = sorted(
    (float(z.get("price_up")), float(z.get("price_down")))
)

# Per (apartment, area) group the query returns the latest `계약년월`, the
# highest matching price, the area rounded to 2 decimals and the max `건축년도`.
# NOTE(review): the year filter is hard-coded to '2021%' — confirm whether it
# should follow the newest data instead.
df_recent = sqlContext.sql("""
select `단지명` as apt, max(`계약년월`) as `계약년월`,
       max(cast(regexp_replace(`거래금액(만원)`, ',', '') as decimal)) as price,
       round(`전용면적(㎡)`, 2), max(`건축년도`)
from `""" + province_ident + """`
where cast(regexp_replace(`거래금액(만원)`, ',', '') as decimal)
      between """ + repr(price_low) + """ and """ + repr(price_high) + """
      and (`시군구` like '%""" + address_literal + """%') and `계약년월` like '2021%'
group by apt, `전용면적(㎡)`
order by `계약년월` desc
""")
z.show(df_recent)

# 6. 집값 예측모델

In [None]:
%pyspark
# Recent data for the searched apartment (training dataset): the average price
# for each of the 6 newest `계약년월` values, exposed as columns `year` and
# `price`.
#
# Both values are spliced into the SQL text, so they are escaped first:
# backticks are doubled inside the back-quoted view name, and backslashes and
# single quotes are backslash-escaped so the search term cannot terminate the
# string literal early. `%` and `_` in the term keep their LIKE wildcard meaning.
province_ident = z.get("province").replace("`", "``")
apart_literal = z.get("apart").replace("\\", "\\\\").replace("'", "\\'")

df_apart_recent = sqlContext.sql("""
select `계약년월` as year,
       cast(avg(regexp_replace(`거래금액(만원)`, ',', '')) as decimal) as price
from `""" + province_ident + """`
where `단지명` like '""" + apart_literal + """%'
group by year
order by year desc limit 6
""")

In [None]:
%pyspark
# Show the schema of the training frame built for the prediction model
# (df_apart_recent), since that is the frame the model cell consumes.
df_apart_recent.printSchema()

In [None]:
%pyspark
# Search an apartment -> return the predicted price.
#
# Fits a linear regression of `price` on the period column of df_apart_recent
# and prints the prediction for the period one past the newest one.
import pyspark.sql.functions as f
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql.types import DateType  # not used in this cell; import kept for other cells

# df_apart_recent exposes the columns `year` (aliased from `계약년월`) and
# `price`. Cast the period to int so it is a numeric feature and so that
# max(year) + 1 below is plain integer arithmetic.
df_apart_recent = df_apart_recent.withColumn(
    "year", df_apart_recent["year"].cast("int")
)
df_apart_recent.createOrReplaceTempView("temp")

# Period to predict for: one past the newest period in the training data.
# NOTE(review): `year` presumably holds YYYYMM values, in which case "+ 1"
# after a December value is not a real month — confirm the intended step.
df_pred_y = sqlContext.sql("""
select max(year) + 1 as year
from temp
""")

# max() over no rows (or over only-null periods) yields null; fail early with
# a clear message instead of an opaque error from the assembler or the model.
if df_pred_y.first()["year"] is None:
    raise ValueError("df_apart_recent has no usable rows to train on")

# trainset
# The feature list is named explicitly so the label column can never slip into
# the features, and the assembled vector gets its own column name so it does
# not collide with the existing `year` column.
feature_columns = ["year"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
dataset = assembler.transform(df_apart_recent)
train, test = dataset.randomSplit([1.0, 0.0])  # train on 100% of the dataset

# model and train
algo = LinearRegression(featuresCol="features", labelCol="price")
model = algo.fit(train)

# prediction
testset = assembler.transform(df_pred_y)
predictions = model.transform(testset)

# get predicted val (df_pred_y has exactly one row)
pred_val = predictions.first()["prediction"]
print(round(pred_val, 2), " (단위 : 만원)")