# Clean house-listing data with PySpark

In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
import numpy as np

# Create (or reuse) the Hive-enabled Spark session, running on YARN in client deploy mode.
spark = (
    SparkSession.builder
    .appName("Clean Data")
    .master("yarn")
    .config("spark.submit.deployMode", "client")
    .enableHiveSupport()
    .getOrCreate()
)

# Explicit schema applied to the CSV input; every field is declared nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("Width", DoubleType()),
        ("Length", DoubleType()),
        ("Bedrooms", IntegerType()),
        ("Bathrooms", IntegerType()),
        ("District", StringType()),
        ("Province", StringType()),
        ("Price", LongType()),
    )
])

# Read every CSV file under the HDFS input directory, treating the first line as a header.
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .format("csv")
    .load("/user/hadoopuser/csv_data/*")
)

# Quick sanity check of what was loaded: row count, schema, first rows.
print(df.count())
df.printSchema()
df.show()

# spark.stop()

158195
root
 |-- Width: double (nullable = true)
 |-- Length: double (nullable = true)
 |-- Bedrooms: integer (nullable = true)
 |-- Bathrooms: integer (nullable = true)
 |-- District: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- Price: long (nullable = true)

+-----+------+--------+---------+-------------------+---------------+-----------+
|Width|Length|Bedrooms|Bathrooms|           District|       Province|      Price|
+-----+------+--------+---------+-------------------+---------------+-----------+
| null|  null|    null|     null|District Binh Thanh|    Ho Chi Minh| 8900000000|
| null|  null|    null|     null|  District Cau Giay|         Ha Noi| 2980000000|
| null|  null|    null|     null| District Long Bien|         Ha Noi| 4500000000|
| null|  null|    null|     null| District Thanh Khe|        Da Nang| 4100000000|
| null|  null|    null|     null|         District 1|    Ho Chi Minh|10500000000|
| null|  null|    null|     null|         District 1|    H

In [2]:
# Drop rows whose District, Province or Price is null.
# Keep only rows that have at least one non-null value among the first four columns.
# Drop rows whose price is implausible.

def fill_negative(value):
    """Return ``value`` only when it is strictly positive.

    None, zero and negative inputs all yield None; any input for which
    ``value > 0`` is false (NaN included) also yields None.
    """
    return value if value is not None and value > 0 else None

# Spark UDF wrappers around fill_negative: one returning doubles (Width/Length),
# one returning integers (Bedrooms/Bathrooms).
fill_negative_wl_udf = udf(fill_negative, DoubleType())
fill_negative_bb_udf = udf(fill_negative, IntegerType())


df1 = (
    df.filter("District is not NULL")
    .filter("Province is not NULL")
    .filter("Price is not NULL")
    # Keep prices inside the inclusive [100000000, 100000000000] range.
    .filter(col('price') >= 100000000)
    .filter(col('price') <= 100000000000)
    # Null out non-positive measurements BEFORE checking that a row still has
    # at least one feature. Previously the check ran first, so a row whose only
    # non-null features were <= 0 passed it and then ended up with all four
    # feature columns null, contradicting the "at least one non-null" rule.
    .withColumn('Width', fill_negative_wl_udf(col('Width')))
    .withColumn('Length', fill_negative_wl_udf(col('Length')))
    .withColumn('Bedrooms', fill_negative_bb_udf(col('Bedrooms')))
    .withColumn('Bathrooms', fill_negative_bb_udf(col('Bathrooms')))
    .filter(
        col("Width").isNotNull()
        | col("Length").isNotNull()
        | col("Bedrooms").isNotNull()
        | col("Bathrooms").isNotNull()
    )
    .distinct()
)


print("num rows: "+ str(df1.count()))
df1.show()


num rows: 67442
+-----+------+--------+---------+--------------------+---------------+-----------+
|Width|Length|Bedrooms|Bathrooms|            District|       Province|      Price|
+-----+------+--------+---------+--------------------+---------------+-----------+
|  3.0|  null|       4|        3|     District Tay Ho|         Ha Noi| 2350000000|
|  4.2|  12.5|       4|        5|          District 7|    Ho Chi Minh| 6200000000|
| null|  null|       3|     null|  District Hoang Mai|         Ha Noi| 2100000000|
|  5.0|  14.0|       4|        5|   District Thuan An|     Binh Duong| 2500000000|
| null|  null|       5|        4|          District 9|    Ho Chi Minh| 9800000000|
|  4.0|  16.0|       3|        2|         District 12|    Ho Chi Minh|  800000000|
|  5.5|  null|       3|     null|  District Hoang Mai|         Ha Noi| 2500000000|
| null|  null|       5|     null|            Vung Tau|Ba Ria Vung Tau| 6500000000|
| 10.0|  null|       5|        6|District Nam Tu Liem|         Ha Noi|1

In [3]:
# Normalise the District field.
def process_dis(dis):
    """Normalise a district name.

    The name is lower-cased, a fixed list of administrative prefixes/suffixes
    is removed, two spelling variants are unified, and surrounding whitespace
    is stripped. If what remains parses as an integer, it is returned as
    ``"quan <n>"``; otherwise the cleaned name is returned.

    Args:
        dis: raw district string, or None.

    Returns:
        The normalised district string, or None when ``dis`` is None.
    """
    # Guard nulls like the other UDF helpers in this notebook do, so a null
    # District yields null instead of failing the Spark task.
    if dis is None:
        return None

    dis = dis.lower()
    # Order matters: "thi xa lagi" has to be rewritten before the generic
    # "thi xa" removal would destroy it.
    # NOTE(review): these are plain substring replacements, so a token that
    # occurs inside a name is removed too (e.g. "tam dao" -> "tam") -- confirm
    # that this is intended.
    replacements = (
        # ("quan ", ""),  # disabled in the original code
        ("district", ""),
        ("thi xa lagi", "la gi"),
        ("thi xa", ""),
        ("town", ""),
        ("dao", ""),
        ("huyen", ""),
        ("thanh pho", ""),
        ("city", ""),
        ("tp.", ""),
        ("qui nhon", "quy nhon"),
    )
    for old, new in replacements:
        dis = dis.replace(old, new)
    dis = dis.strip()

    # int() on a str can only fail with ValueError, so catch exactly that.
    try:
        int(dis)
    except ValueError:
        return dis
    return "quan " + dis

# Wrap process_dis as a Spark UDF that returns a string.
process_dis_udf = udf(process_dis, StringType())

# Normalise District, then drop the rows whose district is one of the
# excluded names (quan 2, quan 9, ...).
df1 = (
    df1.withColumn('District', process_dis_udf(col('District')))
    .filter(
        ~col('District').isin(
            'quan 2', 'quan 9', 'krong pak', 'hoanh bo', 'ky son', 'tinh gia'
        )
    )
)


# Normalise the Province field.
def process_pro(pro):
    """Normalise a province name.

    The name is lower-cased, a few spelling variants are unified, the word
    "city" is removed and surrounding whitespace is stripped. Any result that
    contains "ho chi minh" is replaced by exactly ``"tp ho chi minh"``.

    Args:
        pro: raw province string, or None.

    Returns:
        The normalised province string, or None when ``pro`` is None.
    """
    # Guard nulls like the other UDF helpers in this notebook do, so a null
    # Province yields null instead of failing the Spark task.
    if pro is None:
        return None

    pro = pro.lower()
    pro = pro.replace("daklak", "dak lak")
    pro = pro.replace("ba ria vung tau", "ba ria-vung tau")
    pro = pro.replace("thua thien hue", "thua thien-hue")
    pro = pro.replace("city", "")
    pro = pro.strip()

    # Collapse every Ho Chi Minh variant into one canonical spelling.
    if 'ho chi minh' in pro:
        return 'tp ho chi minh'
    return pro

# Wrap process_pro as a Spark UDF that returns a string.
process_pro_udf = udf(process_pro, StringType())

# Overwrite Province with its normalised form.
df1 = df1.withColumn('Province', process_pro_udf(df1['Province']))

print("num rows: {}".format(df1.count()))
df1.show()

num rows: 65420
+-----+------+--------+---------+------------+---------------+-----------+
|Width|Length|Bedrooms|Bathrooms|    District|       Province|      Price|
+-----+------+--------+---------+------------+---------------+-----------+
|  3.0|  null|       4|        3|      tay ho|         ha noi| 2350000000|
|  4.2|  12.5|       4|        5|      quan 7| tp ho chi minh| 6200000000|
| null|  null|       3|     null|   hoang mai|         ha noi| 2100000000|
|  5.0|  14.0|       4|        5|    thuan an|     binh duong| 2500000000|
|  4.0|  16.0|       3|        2|     quan 12| tp ho chi minh|  800000000|
|  5.5|  null|       3|     null|   hoang mai|         ha noi| 2500000000|
| null|  null|       5|     null|    vung tau|ba ria-vung tau| 6500000000|
| 10.0|  null|       5|        6| nam tu liem|         ha noi|15000000000|
|  3.8|  16.0|       4|        3|     ha dong|         ha noi| 3600000000|
|  4.0|  12.5|       3|        2|     thu duc| tp ho chi minh| 3000000000|
|  4.3|  

In [4]:
# Median of Width, Length, Bedrooms and Bathrooms, each computed on the driver
# over the non-null values that do not exceed that column's cap
# (100 for Width/Length, 30 for Bedrooms, 10 for Bathrooms).

w_m = np.median(
    df1.select('Width')
    .where(col('Width').isNotNull() & (col('Width') <= 100))
    .collect()
)
l_m = np.median(
    df1.select('Length')
    .where(col('Length').isNotNull() & (col('Length') <= 100))
    .collect()
)
# The room counts are truncated to plain Python ints.
bed_m = int(np.median(
    df1.select('Bedrooms')
    .where(col('Bedrooms').isNotNull() & (col('Bedrooms') <= 30))
    .collect()
))
bath_m = int(np.median(
    df1.select('Bathrooms')
    .where(col('Bathrooms').isNotNull() & (col('Bathrooms') <= 10))
    .collect()
))

print(w_m, l_m, bed_m, bath_m, sep="\n")


4.2
13.0
4
3


In [5]:
# Values larger than the allowed cap are replaced with the median.

def fill_over(value, check, avg):
    """Replace a value that exceeds ``check`` with ``avg``.

    Returns None for a None input, ``value`` itself when ``value <= check``,
    and ``avg`` when ``value > check``. When neither comparison holds
    (for example a NaN input) the result is None.
    """
    if value is None:
        return None
    if value > check:
        return avg
    if value <= check:
        return value
    return None

# Wrap fill_over as Spark UDFs, one per return type.
fill_over_double = udf(fill_over, DoubleType())
fill_over_int = udf(fill_over, IntegerType())

# Replace values above each column's cap with that column's median.
df1 = (
    df1.withColumn("Width", fill_over_double(col('Width'), lit(100), lit(w_m)))
    .withColumn("Length", fill_over_double(col('Length'), lit(100), lit(l_m)))
    .withColumn("Bedrooms", fill_over_int(col('Bedrooms'), lit(30), lit(bed_m)))
    .withColumn("Bathrooms", fill_over_int(col('Bathrooms'), lit(10), lit(bath_m)))
)

print('num rows: {}'.format(df1.count()))
df1.show()

num rows: 65420
+-----+------+--------+---------+------------+---------------+-----------+
|Width|Length|Bedrooms|Bathrooms|    District|       Province|      Price|
+-----+------+--------+---------+------------+---------------+-----------+
|  3.0|  null|       4|        3|      tay ho|         ha noi| 2350000000|
|  4.2|  12.5|       4|        5|      quan 7| tp ho chi minh| 6200000000|
| null|  null|       3|     null|   hoang mai|         ha noi| 2100000000|
|  5.0|  14.0|       4|        5|    thuan an|     binh duong| 2500000000|
|  4.0|  16.0|       3|        2|     quan 12| tp ho chi minh|  800000000|
|  5.5|  null|       3|     null|   hoang mai|         ha noi| 2500000000|
| null|  null|       5|     null|    vung tau|ba ria-vung tau| 6500000000|
| 10.0|  null|       5|        6| nam tu liem|         ha noi|15000000000|
|  3.8|  16.0|       4|        3|     ha dong|         ha noi| 3600000000|
|  4.0|  12.5|       3|        2|     thu duc| tp ho chi minh| 3000000000|
|  4.3|  

In [6]:

def get_median_wl(lists):
    """Median of the truthy entries of ``lists`` as a float.

    Falsy entries (None, 0) are discarded first. Returns None when nothing
    is left, and also when anything goes wrong (e.g. ``lists`` is None).
    """
    try:
        kept = [entry for entry in lists if entry]
        return float(np.median(kept)) if kept else None
    except Exception:
        return None


get_median_wl_udf = udf(get_median_wl, DoubleType())

# Per (Province, District) medians of Width and Length. The grouping columns
# are renamed to P / D so they do not clash with df1's columns in the join.
avg_wl_group = (
    df1.groupBy('Province', 'District')
    .agg(collect_list('Width').alias('Ws'), collect_list('Length').alias('Ls'))
    .withColumn('avg_w', get_median_wl_udf(col('Ws')))
    .withColumn('avg_l', get_median_wl_udf(col('Ls')))
    .withColumnRenamed("Province", "P")
    .withColumnRenamed("District", "D")
)

avg_wl_group.show()

# Attach the group medians to every row of df1, then discard the helper columns.
df2 = (
    df1.join(
        avg_wl_group,
        (df1.Province == avg_wl_group.P) & (df1.District == avg_wl_group.D),
        "left_outer",
    )
    .drop('P', 'D', 'Ws', 'Ls')
)

df2.show()

+--------------+----------+--------------------+--------------------+-----+-----+
|             P|         D|                  Ws|                  Ls|avg_w|avg_l|
+--------------+----------+--------------------+--------------------+-----+-----+
|        ha noi|  quoc oai|              [4.32]|              [24.5]| 4.32| 24.5|
|      dong nai|long khanh|[6.6, 9.0, 6.0, 5...|[23.2, 20.0, 14.0...|  6.3| 22.6|
|    tien giang|    my tho|[5.0, 8.0, 6.0, 2...|[11.0, 30.0, 22.0...|  5.0| 19.0|
|    quang ninh|   cam pha|        [16.0, 16.0]|        [20.0, 20.0]| 16.0| 20.0|
|tp ho chi minh|  binh tan|[4.0, 4.0, 3.6, 3...|[16.0, 8.0, 9.0, ...|  4.0| 11.0|
|tp ho chi minh|   can gio|[8.0, 5.0, 30.0, ...|              [22.0]| 19.0| 22.0|
|        ha noi|thuong tin|          [4.0, 4.0]|        [20.0, 15.0]|  4.0| 17.5|
|      dong nai|  xuan loc|         [10.0, 5.0]|        [86.0, 35.0]|  7.5| 60.5|
|       can tho| binh thuy|[5.0, 4.8, 4.5, 4...|[13.5, 20.0, 42.0...| 4.95| 14.0|
|      lam dong|

In [7]:
# Step 1: nulls are replaced with the per-group value.
# Step 2: nulls that remain are replaced with the overall value.

def fill_null(value, avg):
    """Return ``avg`` when ``value`` is None, otherwise ``value`` unchanged."""
    return avg if value is None else value

# Wrap fill_null as Spark UDFs, one per return type.
fill_null_wl_udf = udf(fill_null, DoubleType())
fill_null_bb_udf = udf(fill_null, IntegerType())

# Fill null Width / Length from the per-group columns avg_w / avg_l,
# which are no longer needed afterwards.
df2 = (
    df2.withColumn("Width", fill_null_wl_udf(col('Width'), col('avg_w')))
    .withColumn("Length", fill_null_wl_udf(col('Length'), col('avg_l')))
    .drop('avg_w', 'avg_l')
)

# Recompute the overall Width / Length medians from the partially filled frame.
w_m = np.median(df2.select('Width').where(col('Width').isNotNull()).collect())
l_m = np.median(df2.select('Length').where(col('Length').isNotNull()).collect())

# Whatever is still null gets the overall median.
df2 = (
    df2.withColumn("Width", fill_null_wl_udf(col('Width'), lit(w_m)))
    .withColumn("Length", fill_null_wl_udf(col('Length'), lit(l_m)))
)
df2.show()


+-----+------+--------+---------+------------+---------------+-----------+
|Width|Length|Bedrooms|Bathrooms|    District|       Province|      Price|
+-----+------+--------+---------+------------+---------------+-----------+
|  3.0|  10.3|       4|        3|      tay ho|         ha noi| 2350000000|
|  4.2|  12.5|       4|        5|      quan 7| tp ho chi minh| 6200000000|
|  4.0|  10.0|       3|     null|   hoang mai|         ha noi| 2100000000|
|  5.0|  14.0|       4|        5|    thuan an|     binh duong| 2500000000|
|  4.0|  16.0|       3|        2|     quan 12| tp ho chi minh|  800000000|
|  5.5|  10.0|       3|     null|   hoang mai|         ha noi| 2500000000|
|  5.0|  18.0|       5|     null|    vung tau|ba ria-vung tau| 6500000000|
| 10.0|  10.0|       5|        6| nam tu liem|         ha noi|15000000000|
|  3.8|  16.0|       4|        3|     ha dong|         ha noi| 3600000000|
|  4.0|  12.5|       3|        2|     thu duc| tp ho chi minh| 3000000000|
|  4.3|  10.0|    null|  

In [8]:
# Add an "area" column: Length * Width, rounded.
# NOTE: `round` here is the PySpark column function brought in by
# `from pyspark.sql.functions import *`, not Python's built-in round.
df3 = df2.withColumn("area", round(col('Length') * col('Width') ))
df3.show()

def get_median_bb(lists):
    """Median of the truthy entries of ``lists``, truncated to an int.

    Falsy entries (None, 0) are discarded first. Returns None when nothing
    is left, and also when anything goes wrong (e.g. ``lists`` is None).
    """
    try:
        kept = [entry for entry in lists if entry]
        return int(np.median(kept)) if kept else None
    except Exception:
        return None
    
get_median_bb_udf = udf(get_median_bb, IntegerType())

# Per-area medians of Bedrooms and Bathrooms.
avg_b_group = (
    df3.groupBy('area')
    .agg(collect_list('Bedrooms').alias('BEs'), collect_list('Bathrooms').alias('BAs'))
    .withColumn('avg_be', get_median_bb_udf(col('BEs')))
    .withColumn('avg_bb', get_median_bb_udf(col('BAs')))
    .drop('BAs')
    .drop('BEs')
)
avg_b_group.show()

# Attach the per-area medians to every row. avg_b_group is derived from df3,
# so a condition written as `df3.area == avg_b_group.area` refers to the same
# underlying column on both sides and only works through Spark's implicit
# self-join disambiguation. Joining on the column name is unambiguous and
# yields a single `area` column, which is dropped because it was only needed
# as the join key.
df4 = df3.join(avg_b_group, on='area', how='left_outer').drop('area')
df4.show()

+-----+------+--------+---------+------------+---------------+-----------+-----+
|Width|Length|Bedrooms|Bathrooms|    District|       Province|      Price| area|
+-----+------+--------+---------+------------+---------------+-----------+-----+
|  3.0|  10.3|       4|        3|      tay ho|         ha noi| 2350000000| 31.0|
|  4.2|  12.5|       4|        5|      quan 7| tp ho chi minh| 6200000000| 53.0|
|  4.0|  10.0|       3|     null|   hoang mai|         ha noi| 2100000000| 40.0|
|  5.0|  14.0|       4|        5|    thuan an|     binh duong| 2500000000| 70.0|
|  4.0|  16.0|       3|        2|     quan 12| tp ho chi minh|  800000000| 64.0|
|  5.5|  10.0|       3|     null|   hoang mai|         ha noi| 2500000000| 55.0|
|  5.0|  18.0|       5|     null|    vung tau|ba ria-vung tau| 6500000000| 90.0|
| 10.0|  10.0|       5|        6| nam tu liem|         ha noi|15000000000|100.0|
|  3.8|  16.0|       4|        3|     ha dong|         ha noi| 3600000000| 61.0|
|  4.0|  12.5|       3|     

In [9]:
# Step 1: nulls are replaced with the per-area median.
# Step 2: nulls that remain are replaced with the overall median.

# Fill null Bedrooms / Bathrooms from the per-area columns avg_be / avg_bb,
# which are no longer needed afterwards.
df5 = (
    df4.withColumn("Bedrooms", fill_null_bb_udf(col('Bedrooms'), col('avg_be')))
    .withColumn("Bathrooms", fill_null_bb_udf(col('Bathrooms'), col('avg_bb')))
    .drop('avg_be')
    .drop('avg_bb')
)

# Recompute the overall medians from the frame that was just filled (df5).
# The previous version read the older `df1` frame here, which ignored the
# per-area fill above and differed from how the Width / Length medians are
# recomputed (from the partially filled frame) earlier in the notebook.
bed_m = int(np.median(df5.select('Bedrooms').filter(col('Bedrooms').isNotNull()).collect()))
bath_m = int(np.median(df5.select('Bathrooms').filter(col('Bathrooms').isNotNull()).collect()))

# Whatever is still null gets the overall median.
df5 = (
    df5.withColumn("Bedrooms", fill_null_bb_udf(col('Bedrooms'), lit(bed_m)))
    .withColumn("Bathrooms", fill_null_bb_udf(col('Bathrooms'), lit(bath_m)))
)

print(df5.count())
df5.show()

65420
+-----+------+--------+---------+-----------+---------------+-----------+
|Width|Length|Bedrooms|Bathrooms|   District|       Province|      Price|
+-----+------+--------+---------+-----------+---------------+-----------+
|15.75|  19.0|       6|        6|     ba ria|ba ria-vung tau| 8000000000|
| 13.0|  23.0|       5|        6|     quan 1| tp ho chi minh|65000000000|
|  4.4|  68.0|       4|        6|    thu duc| tp ho chi minh| 6600000000|
| 12.2|  25.0|       6|        3|   tan binh| tp ho chi minh|45000000000|
| 12.2|  25.0|       6|        3|   tan binh| tp ho chi minh|45000000000|
| 18.0|  31.0|      25|        3|     quan 8| tp ho chi minh|35000000000|
|  7.0|  21.0|       5|        5|  phu nhuan| tp ho chi minh|16000000000|
|  6.0|  24.5|       3|        4|   vung tau|ba ria-vung tau|11000000000|
|  6.0|  24.5|       5|        6|   tan binh| tp ho chi minh|35000000000|
|  6.0|  24.5|       6|        5|   binh tan| tp ho chi minh|25000000000|
|  7.0|  21.0|       4|        4

In [10]:
# Debug helper, kept disabled:
# df5.select('District').filter(col('District').contains('quan')).show(100)

# Write the cleaned data as comma-separated CSV with a header row, using a
# single partition and overwriting any existing output at the target path.
(
    df5.repartition(1)
    .write
    .format('csv')
    .option('sep', ',')
    .option('header', True)
    .mode('overwrite')
    .save('/output')
)

In [28]:
# Stop the Spark session once the cleaned data has been written.
spark.stop()