In [1]:
import gc
import re
import string
import logging
from copy import copy, deepcopy

import underthesea
import numpy as np
import pyspark
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as f
from pyspark.sql.functions import col, udf, lit, greatest, monotonically_increasing_id, concat_ws
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, StructType, StructField, MapType, BooleanType
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, MinHashLSH
from pyspark.ml.linalg import Vectors, VectorUDT

In [2]:
# spark = SparkSession.builder.master("spark://172.24.0.11:7077").getOrCreate()
spark = SparkSession.builder.appName('streaming').getOrCreate()

24/01/03 18:51:59 WARN Utils: Your hostname, haihp02 resolves to a loopback address: 127.0.1.1; using 192.168.102.7 instead (on interface wlp5s0)
24/01/03 18:51:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/03 18:52:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/01/03 18:52:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
hdfs_file_path = [
    'hdfs://192.168.102.7:10000/haihp02/real_estate_data/bds.jsonl',
    'hdfs://192.168.102.7:10000/haihp02/real_estate_data/i-batdongsan.jsonl',
    'hdfs://192.168.102.7:10000/haihp02/real_estate_data/nhadatviet.jsonl',
]
df = spark.read.json(hdfs_file_path)

                                                                                

In [4]:
df.printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- district: string (nullable = true)
 |    |-- full_address: string (nullable = true)
 |    |-- province: string (nullable = true)
 |    |-- ward: string (nullable = true)
 |-- contact_info: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- phone: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- description: string (nullable = true)
 |-- estate_type: string (nullable = true)
 |-- extra_infos: struct (nullable = true)
 |    |-- Chiều dài: string (nullable = true)
 |    |-- Chiều ngang: string (nullable = true)
 |    |-- Chính chủ: boolean (nullable = true)
 |    |-- Chổ để xe hơi: boolean (nullable = true)
 |    |-- Hướng: string (nullable = true)
 |    |-- Loại tin: string (nullable = true)
 |    |-- Lộ giới: string (nullable = true)
 |    |-- Nhà bếp: boolean (nullable = true)
 |    |-- Pháp lý: string (nullable = true)
 |    |-- Phòng ăn: boolean (nullable = true)
 |    |-- 

In [5]:
df.count()

                                                                                

250796

## De-duplicate

In [7]:
df = df.withColumn('Id', monotonically_increasing_id())

In [8]:
df = df.withColumn("text", concat_ws(' ', col('title'), col('description')))
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
df = tokenizer.transform(df)

In [9]:
hashingTF = HashingTF(inputCol="tokens", outputCol="tf")
df = hashingTF.transform(df)

idf = IDF(inputCol="tf", outputCol="tfidf")
idf_model = idf.fit(df)
df = idf_model.transform(df)

                                                                                

In [10]:
append_non_zero_udf = udf(lambda v: Vectors.sparse(len(v) + 1, list(v.indices) + [len(v)], list(v.values) + [1e-5]), VectorUDT())

df = df.withColumn("tfidf", append_non_zero_udf(col("tfidf")))

In [30]:
num_hash_tables = 5
minhashLSH = MinHashLSH(inputCol="tfidf", outputCol="hashes", numHashTables=num_hash_tables)
model = minhashLSH.fit(df)
df_duplicates = model.approxSimilarityJoin(df.select("Id", "tfidf"), df.select("Id", "tfidf"), 0.8, distCol="JaccardDistance") \
    .filter("datasetA.id < datasetB.id")  # Avoid comparing a row to itself

24/01/02 22:48:16 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB


In [31]:
df_duplicates.show()

24/01/02 22:48:17 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/01/02 22:48:18 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB


+--------------------+--------------------+------------------+
|            datasetA|            datasetB|   JaccardDistance|
+--------------------+--------------------+------------------+
|{6, (262145,[191,...|{16, (262145,[191...|0.7549019607843137|
|{6, (262145,[191,...|{14, (262145,[191...|0.6513761467889908|
|{13, (262145,[7,4...|{15, (262145,[191...|0.7669902912621359|
|{11, (262145,[191...|{12, (262145,[704...|0.7272727272727273|
|{3, (262145,[191,...|{18, (262145,[191...|0.5189873417721519|
|{2, (262145,[191,...|{18, (262145,[191...|0.7213114754098361|
|{3, (262145,[191,...|{6, (262145,[191,...|0.5544554455445545|
|{14, (262145,[191...|{16, (262145,[191...|0.6578947368421053|
|{2, (262145,[191,...|{15, (262145,[191...|0.7355371900826446|
|{16, (262145,[191...|{18, (262145,[191...|0.7272727272727273|
|{4, (262145,[191,...|{13, (262145,[7,4...|0.7966101694915254|
|{5, (262145,[191,...|{13, (262145,[7,4...|0.7699115044247787|
|{6, (262145,[191,...|{17, (262145,[191...|0.6388888888

24/01/02 22:48:19 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


In [45]:
result_df = df.join(remove_ids, df["Id"] == remove_ids["id"], "leftanti")

In [46]:
result_df.show()

24/01/02 22:53:33 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/01/02 22:53:33 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/01/02 22:53:34 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


+--------------------+--------------------+--------------------+------------+--------------------+--------------------+----------+-------+----------+------+--------------------+---+--------------------+--------------------+--------------------+--------------------+
|             address|        contact_info|         description| estate_type|         extra_infos|                link| post_date|post_id|     price|square|               title| Id|                text|              tokens|                  tf|               tfidf|
+--------------------+--------------------+--------------------+------------+--------------------+--------------------+----------+-------+----------+------+--------------------+---+--------------------+--------------------+--------------------+--------------------+
|{Hai Bà Trưng, đư...|{Lại Tiến Dũng, [...|Dòng tiền ổn định...|Nhà ngõ, hẻm|{10.7m, 4.2m, Đôn...|https://guland.vn...|2023/05/29| 464352|5500000000|    45|Dòng tiền ổn định...|  7|Dòng tiền ổn định...|

24/01/02 22:53:34 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


## Text

In [4]:
def get_special_chars(df: pyspark.sql.dataframe.DataFrame):
    # get concatenated text
    concatenated_text = df.select(f.concat_ws(' ', col('title'), col('description')).alias('concatenated_text'))
    all_characters = concatenated_text.rdd.flatMap(lambda x: x[0])
    special_characters = all_characters.filter(lambda c: not c.isalnum() and not c.isspace() and not c in string.punctuation)
    return set(special_characters.collect())

@udf(returnType=StringType())
def remove_special_chars(input_string, special_chars_list, at_once=False):
    if not input_string:
        return None
    if at_once:
        special_chars_string = ''.join(special_chars_list)
        translator = str.maketrans('', '', special_chars_string)
        result = input_string.translate(translator)
    else:
        result = input_string
        for c in special_chars_list:
            result = result.replace(c, '')
    return result

In [6]:
special_chars_list = list(get_special_chars(df))
special_chars_lit = lit(special_chars_list)
df = df.withColumn("title", remove_special_chars("title", special_chars_lit))
df = df.withColumn("description", remove_special_chars("description", special_chars_lit))

                                                                                

In [7]:
special_chars_list

['→',
 '\u202a',
 '\uf0d8',
 '✤',
 '\u200c',
 'ۣ',
 '🅖',
 '–',
 '₋',
 '●',
 '¬',
 '̶',
 '▬',
 '≈',
 '🫵',
 '◇',
 '▷',
 '🪷',
 '◊',
 '‐',
 '🫴',
 '\uf05b',
 '⦁',
 '️',
 '㎡',
 '🫰',
 '′',
 '✥',
 '✧',
 '♤',
 '🫶',
 'ۜ',
 '❃',
 '̀',
 '֍',
 '\u2060',
 '\u206e',
 '‘',
 '❈',
 '🅣',
 '🅘',
 '℅',
 '\ufeff',
 '″',
 '\u200b',
 '♚',
 '̣',
 '₫',
 '\uf06e',
 '✩',
 '🅨',
 '’',
 '\xad',
 '★',
 '±',
 '\U0001fae8',
 '︎',
 '\uf0f0',
 '∙',
 '♛',
 '̉',
 '̛',
 '❆',
 '✜',
 '÷',
 '♜',
 '·',
 '❖',
 '】',
 '❁',
 '🫱',
 '・',
 '€',
 '☛',
 '“',
 '■',
 '\uf046',
 '￼',
 '�',
 '\u200d',
 '🫠',
 '\uf0e8',
 '⁃',
 '≥',
 '～',
 '➣',
 '́',
 '🪩',
 '̃',
 '\uf02b',
 '᪥',
 '🪺',
 '♧',
 '❂',
 '。',
 '♡',
 '，',
 '🪸',
 '：',
 '¥',
 '❝',
 '̂',
 '\U0001fa77',
 '\uf0a7',
 'ৣ',
 '⚘',
 '➢',
 '⇔',
 '、',
 '－',
 '✆',
 '🫣',
 '⛫',
 '►',
 '̆',
 '✎',
 '❯',
 '《',
 '\uf076',
 '❮',
 '❀',
 '̵',
 '🥹',
 '❉',
 '̷',
 '\uf028',
 '✽',
 '«',
 '⇒',
 '➤',
 '\uf0e0',
 '\U0001faad',
 '♙',
 '\uf0fc',
 '【',
 '➥',
 '¤',
 '＆',
 '🛇',
 '\x7f',
 '）',
 '—',
 '”',
 '❞',
 '》',
 '

In [9]:
@udf(returnType=StringType())
def remove_duplicate_punctuation_sequence(input_string):
    def remove_duplicate_sequence(text, target_char, max_length):
        pattern_1 = re.escape(target_char) + '{' + str(max_length) + ',}'
        pattern_2 = '(' + '\s' + re.escape(target_char) + ')' + '{' + str(max_length) + ',}'
        result = re.sub(pattern_2, target_char, re.sub(pattern_1, target_char, text))
        return result
    
    if not input_string:
        return None
    result = input_string
    for punc in string.punctuation:
        if punc == '\\':
            continue
        max_length = 3 if punc == '.' else 1
        reuslt = remove_duplicate_sequence(result, punc, max_length)
    return reuslt

In [10]:
df = df.withColumn("title", remove_duplicate_punctuation_sequence("title"))
df = df.withColumn("description", remove_duplicate_punctuation_sequence("description"))

In [11]:
def get_estate_types(df: pyspark.sql.dataframe.DataFrame):
    df = df.filter(df['estate_type'].isNotNull())
    all_estate_types = df.select('estate_type').rdd.map(lambda x: x[0])
    estate_types_set = set(all_estate_types.collect())
    return estate_types_set

@udf(returnType=StringType())
def normalize_estate_type(input_estate_type):
    if not input_estate_type:
        return None
    estate_type_prefix = ['Cho thuể', 'Mua bán', 'Căn hộ']
    estate_type_map = {
        'Biệt thự, liền k`ề': 'Biệt thự liền kề',
        'Nhà biệt thự liền kề': 'Biệt thự liền kề',
        'Nhà mặt phố': 'Nhà mặt tiền',
        'Phòng trọ, nhà trọ, nhà trọ': 'Phòng trọ, nhà trọ',
        'Phòng trọ': 'Phòng trọ, nhà trọ',
        'Trang trại, khu nghỉ dưỡng': 'Trang trại khu nghỉ dưỡng',
        'Kho nhà xưởng': 'Kho xưởng',
        'Kho, xưởng': 'Kho xưởng'
    }
    result = input_estate_type
    for prefix in estate_type_prefix:
        result = result.replace(prefix, '').strip().capitalize()
    for estate_type in estate_type_map.keys():
        if result == estate_type:
            result = estate_type_map[estate_type]
    return result

In [12]:
df = df.withColumn("estate_type", normalize_estate_type("estate_type"))

## Numeric

In [17]:
@udf(returnType=FloatType())
def price_normalize(price, square):
    if price is None:
        return None
    if isinstance(price, int) or isinstance(price, float):
        return price
    elif isinstance(price, str):
        if price.isnumeric():
            return float(price)
        if square is not None:
            price = underthesea.text_normalize(price)
            # Các trường hợp thực sự điền giá / m2
            if 'triệu/ m' in price or 'triệu / m' in price:
                price = float(price.split()[0]) * 1e6 * square
            # Các trường hợp điền nhầm giá sang giá / m2
            elif 'tỷ/ m' in price or 'tỷ / m' in price:
                price = float(price.split()[0]) * 1e9
            else:
                price = None
        elif square is None:
            price = None
    return price

def get_lower_upper_bound(df, col_name, lower_percent=5, upper_percent=95, outlier_threshold=5):
    lower_percentile, upper_percentile = df.approxQuantile(col_name, [lower_percent/100, upper_percent/100], 0.01)
    quantile_range = upper_percentile - lower_percentile
    lower_bound = np.max([0, lower_percentile - outlier_threshold * quantile_range])
    upper_bound = upper_percentile + outlier_threshold * quantile_range
    return lower_bound, upper_bound

def get_detail_lower_upper_bound(df, col_name, lower_percent=5, upper_percent=95, outlier_threshold=5, overprice=1e15):
    quantiles_by_estate_type = (
        df.groupBy("estate_type")
        .agg(f.percentile_approx(col_name, [lower_percent/100, upper_percent/100], 100).alias("percentile_approx"))
    )
    quantiles_by_estate_type = quantiles_by_estate_type.withColumn("lower_percentile", col("percentile_approx").getItem(0)) \
                                                       .withColumn("upper_percentile", col("percentile_approx").getItem(1)) \
                                                       .withColumn("quantile_range", col("upper_percentile") - col("lower_percentile"))
    quantiles_by_estate_type = quantiles_by_estate_type.withColumn("lower_bound", greatest(col("lower_percentile") - outlier_threshold * col("quantile_range"), lit(0))) \
                                                       .withColumn("upper_bound", col("upper_percentile") + outlier_threshold * col("quantile_range"))
    
    return quantiles_by_estate_type

In [19]:
get_detail_lower_upper_bound(df, "price").show()



+--------------------+--------------------+----------------+----------------+--------------+-----------+-----------+
|         estate_type|   percentile_approx|lower_percentile|upper_percentile|quantile_range|lower_bound|upper_bound|
+--------------------+--------------------+----------------+----------------+--------------+-----------+-----------+
|             Nhà phố|    [4.0E9, 1.22E11]|           4.0E9|         1.22E11|       1.18E11|        0.0|    7.12E11|
|           Văn phòng|    [7.8E9, 3.83E11]|           7.8E9|         3.83E11|      3.752E11|        0.0|   2.259E12|
|Đất nông, lâm nghiệp|[3000000.0, 1.125...|       3000000.0|        1.125E11|    1.12497E11|        0.0| 6.74985E11|
|  Mặt bằng, cửa hàng|     [2.0E7, 1.2E11]|           2.0E7|          1.2E11|     1.1998E11|        0.0|   7.199E11|
|   Biệt thự, liền kề|     [6.3E9, 8.0E10]|           6.3E9|          8.0E10|       7.37E10|        0.0|   4.485E11|
|    Đất nền, phân lô|     [5.0E8, 3.7E10]|           5.0E8|    

                                                                                

## Extra infos

In [87]:
def get_extra_info_labels(df):
    extra_infos_df = df.select("extra_infos")
    extra_infos_labels = extra_infos_df.rdd.flatMap(lambda x: list(x[0].asDict().keys())).collect()
    return set(extra_infos_labels)

def cast_to_string(value):
    try:
        return str(value)
    except (ValueError, TypeError):
        return None

def cast_to_boolean(value):
    try:
        return bool(value)
    except (ValueError, TypeError):
        return None

def cast_to_integer(value):
    try:
        return int(value)
    except (ValueError, TypeError):
        return None
    
def cast_to_float(value):
    try:
        return float(value)
    except (ValueError, TypeError):
        return None
    
def normalize_text_field_in_dict(dict_obj):
    result_dict = dict_obj
    for key in result_dict.keys():
        if isinstance(result_dict[key], str):
            result_dict[key] = result_dict[key].replace(',', '.')
            new_val = ''
            for c in result_dict[key]:
                if c.isalpha() or c.isnumeric() or c == '.' or c == ' ':
                    new_val += c
            result_dict[key] = new_val
    return result_dict

old_keys = ['Chiều dài', 'Chiều ngang', 'Chính chủ', 'Chổ để xe hơi', 'Hướng', 'Loại tin', 'Lộ giới', 'Nhà bếp', 'Pháp lý', 'Phòng ăn', 'Sân thượng', 'Số lầu', 'Số phòng ngủ', 'Số phòng ngủ :', 'Số toilet :', 'Tầng :']
new_keys = ['Chiều dài', 'Chiều ngang', 'Chính chủ', 'Chỗ để xe hơi', 'Hướng', 'remove', 'Lộ giới', 'Nhà bếp', 'Pháp lý', 'Phòng ăn', 'Sân thượng', 'Số lầu', 'Số phòng ngủ', 'Số phòng ngủ', 'Số toilet', 'Tầng']
remove_keys = ['remove']
@udf(returnType=StructType([
    StructField('Chiều dài', FloatType()),
    StructField('Chiều ngang', FloatType()),
    StructField('Chính chủ', BooleanType()),
    StructField('Chỗ để xe hơi', BooleanType()),
    StructField('Hướng', StringType()),
    StructField('Lộ giới', FloatType()),
    StructField('Nhà bếp', BooleanType()),
    StructField('Pháp lý', StringType()),
    StructField('Phòng ăn', BooleanType()),
    StructField('Sân thượng', BooleanType()),
    StructField('Số lầu', IntegerType()),
    StructField('Số phòng ngủ', IntegerType()),
    StructField('Số toilet', IntegerType()),
    StructField('Tầng', IntegerType()),
]))
def normalize_extra_infos_dict(input_extra_infos_row, old_keys, new_keys, remove_keys):
    old_keys = list(old_keys)
    new_keys = list(new_keys)
    remove_keys = list(remove_keys)
    assert len(old_keys) == len(new_keys)

    # Normalize dict keys
    extra_infos_dict = input_extra_infos_row.asDict()
    dict_nomalized_keys = {}

    for old_key, new_key in zip(old_keys, new_keys):
        if old_key in extra_infos_dict.keys():
            if new_key in dict_nomalized_keys.keys() and dict_nomalized_keys[new_key] is None \
                or new_key not in dict_nomalized_keys.keys():
                dict_nomalized_keys[new_key] = extra_infos_dict[old_key]
        else:
            dict_nomalized_keys[new_key] = None
    for key in remove_keys:
        if key in dict_nomalized_keys.keys():
            dict_nomalized_keys.pop(key)
    # Normalize dict values
    result_dict = normalize_text_field_in_dict(dict_nomalized_keys)
    result_dict['Chiều dài'] = cast_to_float(dict_nomalized_keys['Chiều dài'].replace('m', ''))
    result_dict['Chiều ngang'] = cast_to_float(dict_nomalized_keys['Chiều ngang'].replace('m', ''))
    result_dict['Chính chủ'] = cast_to_boolean(dict_nomalized_keys['Chính chủ'])
    result_dict['Chỗ để xe hơi'] = cast_to_boolean(dict_nomalized_keys['Chỗ để xe hơi'])
    result_dict['Hướng'] = cast_to_string(dict_nomalized_keys['Hướng'])
    result_dict['Lộ giới'] = cast_to_float(dict_nomalized_keys['Lộ giới'].replace('m', ''))
    result_dict['Nhà bếp'] = cast_to_boolean(dict_nomalized_keys['Nhà bếp'])
    result_dict['Pháp lý'] = cast_to_string(dict_nomalized_keys['Pháp lý'])
    result_dict['Phòng ăn'] = cast_to_boolean(dict_nomalized_keys['Phòng ăn'])
    result_dict['Sân thượng'] = cast_to_boolean(dict_nomalized_keys['Sân thượng'])
    result_dict['Số lầu'] = cast_to_integer(dict_nomalized_keys['Số lầu'])
    result_dict['Số phòng ngủ'] = cast_to_integer(dict_nomalized_keys['Số phòng ngủ'])
    result_dict['Số toilet'] = cast_to_integer(dict_nomalized_keys['Số toilet'])
    result_dict['Tầng'] = cast_to_integer(dict_nomalized_keys['Tầng'])
    return result_dict

In [83]:
df = df.withColumn("extra_infos", normalize_extra_infos_dict("extra_infos", lit(old_keys), lit(new_keys), lit(remove_keys)))