In [1]:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import os
import numpy as np
import pyspark.sql.functions as f
import subprocess
from pyspark.sql.functions import min, max, col
from pyspark.sql.functions import *

In [2]:
sc = SparkContext()
sqlcontext = SQLContext(sc)

In [3]:
dong_df = sqlcontext.read.option("header","true").csv("data/cur_dongCode.csv")
dong_df.show()

+--------+--------+---+----+-----------+
|통계청행정동코드|행자부행정동코드|시도명|시군구명|       행정동명|
+--------+--------+---+----+-----------+
| 1101053|11110530| 서울| 종로구|        사직동|
| 1101054|11110540| 서울| 종로구|        삼청동|
| 1101055|11110550| 서울| 종로구|        부암동|
| 1101056|11110560| 서울| 종로구|        평창동|
| 1101057|11110570| 서울| 종로구|        무악동|
| 1101058|11110580| 서울| 종로구|        교남동|
| 1101060|11110600| 서울| 종로구|        가회동|
| 1101061|11110615| 서울| 종로구|종로1.2.3.4가동|
| 1101063|11110630| 서울| 종로구|    종로5.6가동|
| 1101064|11110640| 서울| 종로구|        이화동|
| 1101067|11110670| 서울| 종로구|       창신1동|
| 1101068|11110680| 서울| 종로구|       창신2동|
| 1101069|11110690| 서울| 종로구|       창신3동|
| 1101070|11110700| 서울| 종로구|       숭인1동|
| 1101071|11110710| 서울| 종로구|       숭인2동|
| 1101072|11110515| 서울| 종로구|      청운효자동|
| 1101073|11110650| 서울| 종로구|        혜화동|
| 1102052|11140520| 서울|  중구|        소공동|
| 1102054|11140540| 서울|  중구|        회현동|
| 1102055|11140550| 서울|  중구|         명동|
+--------+--------+---+----+-----------+
only showing top

In [4]:
dir_in = "/user/hadoop/data/dong_population "
args = "hdfs dfs -ls "+dir_in+" | awk '{print $8}'"
proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)

s_output, s_err = proc.communicate()
all_dart_dirs = s_output.decode('utf-8').split()
print(all_dart_dirs)

['/user/hadoop/data/dong_population/DONG_POPULATION_2017_.csv', '/user/hadoop/data/dong_population/DONG_POPULATION_2018_.csv', '/user/hadoop/data/dong_population/DONG_POPULATION_2019_.csv']


In [5]:
df = sqlcontext.read.format("com.databricks.spark.csv")\
    .option("header", "true").option("inferSchema", "true").load(all_dart_dirs)

In [7]:
#df = df.groupBy("행정동").agg(mean('한국인'))
df = df.withColumn('Dong', regexp_replace('행정동', '(^종로5·6가동)', '종로5.6가동'))
#df.show(424)
print(df.count())

1347


In [8]:
joined = df.join(dong_df, df.Dong == dong_df.행정동명, how='right')
joined.show()
print(joined.count())

+---+-----+-----+----+--------+--------+---+----+----+
|행정동|   합계|  한국인|Dong|통계청행정동코드|행자부행정동코드|시도명|시군구명|행정동명|
+---+-----+-----+----+--------+--------+---+----+----+
|사직동| 9818| 9546| 사직동| 1101053|11110530| 서울| 종로구| 사직동|
|사직동| 9782| 9504| 사직동| 1101053|11110530| 서울| 종로구| 사직동|
|사직동| 9717| 9441| 사직동| 1101053|11110530| 서울| 종로구| 사직동|
|삼청동| 3073| 2836| 삼청동| 1101054|11110540| 서울| 종로구| 삼청동|
|삼청동| 3045| 2804| 삼청동| 1101054|11110540| 서울| 종로구| 삼청동|
|삼청동| 3097| 2907| 삼청동| 1101054|11110540| 서울| 종로구| 삼청동|
|부암동|10537|10149| 부암동| 1101055|11110550| 서울| 종로구| 부암동|
|부암동|10523|10111| 부암동| 1101055|11110550| 서울| 종로구| 부암동|
|부암동|10726|10317| 부암동| 1101055|11110550| 서울| 종로구| 부암동|
|평창동|19105|18951| 평창동| 1101056|11110560| 서울| 종로구| 평창동|
|평창동|19002|18844| 평창동| 1101056|11110560| 서울| 종로구| 평창동|
|평창동|19163|18995| 평창동| 1101056|11110560| 서울| 종로구| 평창동|
|무악동| 8260| 8215| 무악동| 1101057|11110570| 서울| 종로구| 무악동|
|무악동| 8585| 8542| 무악동| 1101057|11110570| 서울| 종로구| 무악동|
|무악동| 8375| 8320| 무악동| 1101057|11110570| 서울| 종로구| 무악동|
|교남동|10846

In [9]:
df_ = sqlcontext.read.csv("/user/hadoop/0614korean_/part-00000")
df_.show()

+--------+------------------+------------------+
|     _c0|               _c1|               _c2|
+--------+------------------+------------------+
|11440710|20365.234847316482|16223.886127148015|
|11530510|34055.502021853186|33991.773300818284|
|11305630|27127.469100312872|28427.690106281585|
|11215840|24339.137178170873|22620.401528953073|
|11110680|10673.320696365825| 8939.117836558364|
|11305606|19331.100754584833|13524.485296486158|
|11200520|14064.368803032488| 9665.649934223826|
|11590620|28185.191364765335| 24300.56854604091|
|11680700|48072.257263489766|36860.178391624555|
|11140550|12014.007605270755| 84084.10366454872|
|11380530| 27045.24168630566| 20114.18668238267|
|11530550|18511.409822623347|15621.102345150422|
|11530560|  34592.4629323225| 41075.05364079422|
|11620775| 23660.85507022864|16851.503311841152|
|11740530| 27553.49757121541|  26502.7172122503|
|11200790|19550.109761636588|  22961.8473189651|
|11350695| 38837.16987200963| 43633.99753078219|
|11650510|27570.4431

In [10]:
#번1동 11305590 -> 11305595
#번2동 11305600 -> 11305603
#번3동 11305606->  11305608
#수유1동 11305610 -> 11305615
#수유2동 11305620 -> 11305625
#수유3동 11305630 -> 11305635


#df_.withColum("_c0",
 # when(col("_c0") == '11305590', "113").otherwise("Y"))

df_ = df_.withColumn("_c0",regexp_replace(col("_c0"), "(^11305590)", "11305595"))
df_ = df_.withColumn("_c0",regexp_replace(col("_c0"), "(^11305600)", "11305603"))
df_ = df_.withColumn("_c0",regexp_replace(col("_c0"), "(^11305598)", "11305608"))
df_ = df_.withColumn("_c0",regexp_replace(col("_c0"), "(^11305610)", "11305615"))
df_ = df_.withColumn("_c0",regexp_replace(col("_c0"), "(^11305620)", "11305625"))
df_ = df_.withColumn("_c0",regexp_replace(col("_c0"), "(^11305630)", "11305635"))

df_.show()
df_.count()

+--------+------------------+------------------+
|     _c0|               _c1|               _c2|
+--------+------------------+------------------+
|11440710|20365.234847316482|16223.886127148015|
|11530510|34055.502021853186|33991.773300818284|
|11305635|27127.469100312872|28427.690106281585|
|11215840|24339.137178170873|22620.401528953073|
|11110680|10673.320696365825| 8939.117836558364|
|11305606|19331.100754584833|13524.485296486158|
|11200520|14064.368803032488| 9665.649934223826|
|11590620|28185.191364765335| 24300.56854604091|
|11680700|48072.257263489766|36860.178391624555|
|11140550|12014.007605270755| 84084.10366454872|
|11380530| 27045.24168630566| 20114.18668238267|
|11530550|18511.409822623347|15621.102345150422|
|11530560|  34592.4629323225| 41075.05364079422|
|11620775| 23660.85507022864|16851.503311841152|
|11740530| 27553.49757121541|  26502.7172122503|
|11200790|19550.109761636588|  22961.8473189651|
|11350695| 38837.16987200963| 43633.99753078219|
|11650510|27570.4431

424

In [11]:
joined_ = joined.join(df_, joined.행자부행정동코드 == df_._c0, how='right')
joined_.show(424)
print(joined_.count())

+-----------+-----+-----+-----------+--------+--------+----+----+-----------+--------+------------------+------------------+
|        행정동|   합계|  한국인|       Dong|통계청행정동코드|행자부행정동코드| 시도명|시군구명|       행정동명|     _c0|               _c1|               _c2|
+-----------+-----+-----+-----------+--------+--------+----+----+-----------+--------+------------------+------------------+
|       홍은1동|24131|23814|       홍은1동| 1113066|11410660|  서울|서대문구|       홍은1동|11410660|  18104.4739711432|11895.078196534296|
|       홍은1동|23731|23430|       홍은1동| 1113066|11410660|  서울|서대문구|       홍은1동|11410660|  18104.4739711432|11895.078196534296|
|       홍은1동|23321|23008|       홍은1동| 1113066|11410660|  서울|서대문구|       홍은1동|11410660|  18104.4739711432|11895.078196534296|
|       오류1동|22651|21952|       오류1동| 1117067|11530770|  서울| 구로구|       오류1동|11530770| 18878.71699679904|15905.694264596872|
|       오류1동|22686|21989|       오류1동| 1117067|11530770|  서울| 구로구|       오류1동|11530770| 18878.71699679904|15905.694264596872|


1276


In [12]:
joined_ =joined_.withColumn("result_night", col("_c1") -  col("avg(한국인)") )
save_df = joined_.withColumn("result_sun", col("_c2") -  col("avg(한국인)") )
save_df.select(["행자부행정동코드","result_sun", "result_night"]).show(424)

AnalysisException: "cannot resolve '`avg(한국인)`' given input columns: [시군구명, Dong, 행정동명, 한국인, _c2, 통계청행정동코드, 행자부행정동코드, _c0, _c1, 합계, 행정동, 시도명];;\n'Project [행정동#47, 합계#48, 한국인#49, Dong#54, 통계청행정동코드#12, 행자부행정동코드#13, 시도명#14, 시군구명#15, 행정동명#16, _c0#191, _c1#152, _c2#153, (_c1#152 - 'avg(한국인)) AS result_night#302]\n+- Join RightOuter, (행자부행정동코드#13 = _c0#191)\n   :- Join RightOuter, (Dong#54 = 행정동명#16)\n   :  :- Project [행정동#47, 합계#48, 한국인#49, regexp_replace(행정동#47, (^종로5·6가동), 종로5.6가동) AS Dong#54]\n   :  :  +- Relation[행정동#47,합계#48,한국인#49] csv\n   :  +- Relation[통계청행정동코드#12,행자부행정동코드#13,시도명#14,시군구명#15,행정동명#16] csv\n   +- Project [regexp_replace(_c0#186, (^11305630), 11305635) AS _c0#191, _c1#152, _c2#153]\n      +- Project [regexp_replace(_c0#181, (^11305620), 11305625) AS _c0#186, _c1#152, _c2#153]\n         +- Project [regexp_replace(_c0#176, (^11305610), 11305615) AS _c0#181, _c1#152, _c2#153]\n            +- Project [regexp_replace(_c0#171, (^11305598), 11305608) AS _c0#176, _c1#152, _c2#153]\n               +- Project [regexp_replace(_c0#166, (^11305600), 11305603) AS _c0#171, _c1#152, _c2#153]\n                  +- Project [regexp_replace(_c0#151, (^11305590), 11305595) AS _c0#166, _c1#152, _c2#153]\n                     +- Relation[_c0#151,_c1#152,_c2#153] csv\n"

In [76]:
save_df.select(["행자부행정동코드","result_sun", "result_night"]).write.csv("sun_night") #.select(['Dong', 'scale']).sort('Dong').write.csv("t1_")

In [77]:
sc.textFile("hdfs://master:9000/user/hadoop/sun_night/part*").coalesce(1).saveAsTextFile("hdfs://master:9000/user/hadoop/sun_night_")