In [7]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import types
from bucket_utils import Bucket
from datetime import datetime
import os
import pickle

In [8]:
region = 'uk-london-1'
namespace = 'lrqgbz9z6zlj'
bucket_name = 'london-property-sales-price'
ppd_download_bucket_folder = 'ppd-download-chunks/'
epc_download_bucket_folder = 'epc-download-chunks/'
# save_as_table_name
# write_mode

In [9]:
match_rate_path = "../../../../data/match_rate_log.txt"
index_bounds_path = "../../../../data/index-bound.pkl"

OCI_ACCESS_KEY_ID = os.environ['OCI_ACCESS_KEY_ID']
OCI_SECRET_ACCESS_KEY = os.environ['OCI_SECRET_ACCESS_KEY']
OCI_REGION = region
OCI_NAMESPACE = namespace
BUCKET_NAME = bucket_name
DWH_USER = os.environ["DWH_USER"]
DWH_PASSWORD = os.environ["DWH_PASSWORD"]
DWH_DB = os.environ["DWH_DB"]

db_url = f"jdbc:postgresql://pgwarehouse:5432/{DWH_DB}"
db_properties = {
    "user": DWH_USER,
    "password": DWH_PASSWORD,
    "driver": "org.postgresql.Driver"
}

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('Transform') \
    .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.506') \
    .config('spark.hadoop.fs.s3a.endpoint', f'https://{OCI_NAMESPACE}.compat.objectstorage.{OCI_REGION}.oraclecloud.com') \
    .config('spark.hadoop.fs.s3a.access.key', OCI_ACCESS_KEY_ID) \
    .config('spark.hadoop.fs.s3a.secret.key', OCI_SECRET_ACCESS_KEY) \
    .config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem') \
    .config('spark.hadoop.fs.s3a.path.style.access', 'true') \
    .config('spark.hadoop.fs.s3a.connection.ssl.enabled', 'true') \
    .config("spark.jars", "/opt/spark/jars/postgresql-42.7.4.jar") \
    .getOrCreate()

In [10]:
ppd_path = f's3a://{BUCKET_NAME}/{ppd_download_bucket_folder}/*.parquet'
ppd = spark.read.parquet(ppd_path, header=True, inferSchema=True)

bucket_session = Bucket(OCI_REGION, OCI_NAMESPACE, bucket_name)
objects = bucket_session.list_objects()

folders = [x for x in objects if epc_download_bucket_folder in x]

folders = set([x.split("/")[1] for x in folders])

epc_path = f"s3a://{BUCKET_NAME}/"

epc_file_paths = [f"{epc_path}/{epc_download_bucket_folder}/{folder}/*.parquet" for folder in folders]

epc = spark.read.parquet(*epc_file_paths)

epc = epc.withColumn("TOTAL_FLOOR_AREA", F.col("TOTAL_FLOOR_AREA").cast(types.IntegerType()))
epc = epc.na.fill({"TOTAL_FLOOR_AREA": 0})

def create_key(df, column_list):
    new_cols = []
    for col in column_list:
        new_col = col+"_fmt"
        df = df.withColumn(new_col, F.nvl(col, F.lit("")))
        df = df.withColumn(new_col, F.trim(new_col))
        df = df.withColumn(new_col, F.lower(new_col))
        df = df.withColumn(new_col, F.regexp_replace(new_col," ", ""))
        df = df.withColumn(new_col, F.regexp_replace(new_col,",", ""))
        new_cols.append(new_col)
    df = df.withColumn("KEY", F.concat(*new_cols))
    df = df.na.fill({"KEY": ""})
    df = df.drop(*new_cols)
    return df

ppd_cols_keys = ["SAON", "PAON", "STREET","POSTCODE"]
ppd = create_key(ppd, ppd_cols_keys)

epc_cols_keys = ['ADDRESS', 'POSTCODE']
epc = create_key(epc, epc_cols_keys)

# DEDUPE EPC
# For addresses that have multiple values for "TOTAL_FLOOR_AREA", keep only the highest.
max_floor_area = epc.groupby('key').agg(F.max("TOTAL_FLOOR_AREA").alias("TOTAL_FLOOR_AREA"))
epc = epc.join(max_floor_area, on=['key', 'TOTAL_FLOOR_AREA'])[['key', 'TOTAL_FLOOR_AREA']].distinct()

data = ppd.join(epc, how='left', on='key')

In [20]:
match_agg = data.groupby(data['TOTAL_FLOOR_AREA'].isNotNull().alias("is_match")).count()

In [21]:
cnt_match = match_agg[match_agg["is_match"]==True].collect()[0][1]

In [22]:
cnt_tot = data.count()
match_rate = cnt_match/cnt_tot

In [29]:
cnt_tot

1599224

In [35]:
1000 / (cnt_tot-cnt_match)

0.000861729473819366

In [24]:
match_rate

0.27436244078378014

In [26]:
nulls = data[data['TOTAL_FLOOR_AREA'].isNull()]

In [27]:
nulls.columns

['KEY',
 'TRANSACTION_UNIQUE_IDENTIFIER',
 'PRICE',
 'DATE_OF_TRANSFER',
 'POSTCODE',
 'PROPERTY_TYPE',
 'OLD_NEW',
 'DURATION',
 'PAON',
 'SAON',
 'STREET',
 'LOCALITY',
 'TOWN_CITY',
 'DISTRICT',
 'COUNTY',
 'PPD_CATEGORY_TYPE',
 'RECORD_STATUS_MONTHLY_FILE_ONLY',
 '__index_level_0__',
 'TOTAL_FLOOR_AREA']

In [38]:
unmatched = nulls.sample(withReplacement=True, fraction=0.0009, seed=3).toPandas()

In [39]:
unmatched

Unnamed: 0,KEY,TRANSACTION_UNIQUE_IDENTIFIER,PRICE,DATE_OF_TRANSFER,POSTCODE,PROPERTY_TYPE,OLD_NEW,DURATION,PAON,SAON,STREET,LOCALITY,TOWN_CITY,DISTRICT,COUNTY,PPD_CATEGORY_TYPE,RECORD_STATUS_MONTHLY_FILE_ONLY,__index_level_0__,TOTAL_FLOOR_AREA
0,52orbiswharfbridgescourtroadsw113gw,{5A9D8B55-C1C5-68EB-E053-6B04A8C0D293},141000,2017-09-22 00:00,SW11 3GW,F,N,L,ORBIS WHARF,52,BRIDGES COURT ROAD,,LONDON,WANDSWORTH,GREATER LONDON,A,A,22301937,
1,69hamptonhouse2michaelroadsw62rn,{152AB733-8175-E651-E063-4704A8C061D9},845424,2022-12-19 00:00,SW6 2RN,F,Y,L,"HAMPTON HOUSE, 2",69,MICHAEL ROAD,,LONDON,HAMMERSMITH AND FULHAM,GREATER LONDON,A,A,27992631,
2,apartment13mulberryapartmentscosteravenuen42ld,{FF538F41-4EDC-49AB-90C7-439AAEBE67D1},149750,2015-03-27 00:00,N4 2LD,F,Y,L,MULBERRY APARTMENTS,APARTMENT 13,COSTER AVENUE,,LONDON,HACKNEY,GREATER LONDON,A,A,20378801,
3,apartment14021stgabrielwalkse16fb,{3E0330F0-6C5D-8D89-E050-A8C062052140},650000,2016-06-23 00:00,SE1 6FB,F,Y,L,1,APARTMENT 1402,ST GABRIEL WALK,,LONDON,SOUTHWARK,GREATER LONDON,A,A,21036754,
4,apartment68switchhousewestbatterseapowerstatio...,{EA3278AA-9487-2676-E053-6B04A8C015F8},760000,2021-06-18 00:00,SW11 8BD,F,Y,L,SWITCH HOUSE WEST BATTERSEA POWER STATION,APARTMENT 68,CIRCUS ROAD WEST,,LONDON,WANDSWORTH,GREATER LONDON,A,A,26245103,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1083,nanlincolnplazahotelindesconcourtnan,{AE4D86D4-991B-4619-E053-6C04A8C03CD0},30075903,2019-07-22 00:00,,O,N,L,LINCOLN PLAZA HOTEL,,INDESCON COURT,,LONDON,TOWER HAMLETS,GREATER LONDON,B,A,24751351,
1084,theflat124stjohnstreetec1v4js,{593E753B-D424-46B2-B7AD-D61412823666},525000,2012-06-01 00:00,EC1V 4JS,F,N,L,124,THE FLAT,ST JOHN STREET,,LONDON,ISLINGTON,GREATER LONDON,A,A,17760957,
1085,unit18railwayarchesmunsterroadsw64ry,{B82222ED-8F58-6691-E053-6B04A8C02FB2},200955,2020-03-31 00:00,SW6 4RY,O,N,L,RAILWAY ARCHES,UNIT 18,MUNSTER ROAD,,LONDON,HAMMERSMITH AND FULHAM,GREATER LONDON,B,A,25634107,
1086,unit4eastcoteindustrialestatefieldendroadha49xg,{EC7AD09A-8B96-9200-E053-6C04A8C0E306},1025000,2022-06-21 00:00,HA4 9XG,O,N,F,EASTCOTE INDUSTRIAL ESTATE,UNIT 4,FIELD END ROAD,,RUISLIP,HILLINGDON,GREATER LONDON,B,A,27647703,


In [43]:
epc = spark.read.parquet(*epc_file_paths)

epc = epc.withColumn("TOTAL_FLOOR_AREA", F.col("TOTAL_FLOOR_AREA").cast(types.IntegerType()))
epc = epc.na.fill({"TOTAL_FLOOR_AREA": 0})

epc_cols_keys = ['ADDRESS', 'POSTCODE']
epc = create_key(epc, epc_cols_keys)

In [44]:
epc.columns

['ADDRESS', 'POSTCODE', 'TOTAL_FLOOR_AREA', '__index_level_0__', 'KEY']

In [45]:
epc[epc['KEY']=="52orbiswharfbridgescourtroadsw113gw"].show()

+-------+--------+----------------+-----------------+---+
|ADDRESS|POSTCODE|TOTAL_FLOOR_AREA|__index_level_0__|KEY|
+-------+--------+----------------+-----------------+---+
+-------+--------+----------------+-----------------+---+



In [57]:
tmp = epc[
    (epc['POSTCODE'].contains("SW11 3GW"))
    # &(epc['ADDRESS'].contains("rbis"))
    # &(epc['ADDRESS'].startswith("52"))
].toPandas()

In [58]:
tmp.to_csv("epc_tmp.csv", index=None)

In [56]:
tmp_data = data[data['POSTCODE']=="SW11 3GW"].toPandas()

In [60]:
tmp_data.to_csv("data_tmp.csv", index=None)

In [None]:
epc[epc['KEY']=="52orbiswharfbridgescourtroadsw113gw"].show()

In [4]:

    
    timestamp = str(datetime.today()).split(".")[0]
    with open(match_rate_path, "a") as f:
        f.write(f"\n{timestamp}|{match_rate:.2f}")
    
    data = data.withColumn("PARTIAL_POSTCODE", F.split_part(F.col('postcode'), F.lit(' '), F.lit(1)))
    
    data = data.withColumn(
        "PROPERTY_TYPE",
        F.when(data["PROPERTY_TYPE"] == "D", "Detached")
         .when(data["PROPERTY_TYPE"] == "S", "Semi-Detached")
         .when(data["PROPERTY_TYPE"] == "T", "Terraced")
         .when(data["PROPERTY_TYPE"] == "F", "Flats/Maisonettes")
         .when(data["PROPERTY_TYPE"] == "O", "Other")
         .otherwise(data["PROPERTY_TYPE"])
    )
    
    data = data.withColumn(
        "OLD_NEW",
        F.when(data["OLD_NEW"] == "Y", "newly built")
         .when(data["OLD_NEW"] == "N", "old")
         .otherwise(data["OLD_NEW"])
    )
    
    data = data.withColumn(
        "DURATION",
        F.when(data["DURATION"] == "F", "Freehold")
         .when(data["DURATION"] == "L", "Leasehold")
         .otherwise(data["DURATION"])
    )
    
    data = data.drop('key')
    
    data = data.na.fill({"TOTAL_FLOOR_AREA": 0})
    
    data = data.withColumn('DATE_OF_TRANSFER',F.col('DATE_OF_TRANSFER').cast(types.DateType()))
    data = data.withColumn('PRICE',F.col('PRICE').cast(types.FloatType()))
    
    data = data.withColumn("__index__", F.monotonically_increasing_id())
    data = data.drop('__index_level_0__')

    index_bounds = data.select(
                F.min("__index__").alias('MIN_INDEX'),
                F.max("__index__").alias('MAX_INDEX')
            ).collect()
    
    bounds_list = [index_bounds[0][0], index_bounds[0][1]]
    with open(index_bounds_path, "wb") as f:
        pickle.dump(bounds_list, f)

    data.write.jdbc(url=db_url, table=save_as_table_name, mode=write_mode, properties=db_properties)
    
    spark.stop()

In [1]:
import pandas as pd

In [2]:
df = pd.DataFrame({
    'A':[1,2,None],
    'B':[1,2,3],
    'c':[1,2,3]
})

In [3]:
df

Unnamed: 0,A,B,c
0,1.0,1,1
1,2.0,2,2
2,,3,3


In [5]:
for col in df.columns:
    df = df[df[col].notnull()]

In [6]:
df

Unnamed: 0,A,B,c
0,1.0,1,1
1,2.0,2,2
