### Merging all paquet files at start of job...


In [1]:
import pandas as pd
import numpy as np
import os
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import lit

conf = SparkConf().setAppName('idealista_format').setMaster(f"local[*]")
sc = SparkContext(conf=conf)

spark = SparkSession.builder.appName('idealista_format').getOrCreate()

directory = "landing/persistent/idealista"

parq_files = {}  # List which will store all of the full filepaths.
# Walk the tree.
for root, directories, files in os.walk(directory):
    for filename in files:
        if filename[-7:] == 'parquet':
            parq_files[root[29:39]] = (root+'/'+filename)
            

22/05/26 01:40:13 WARN Utils: Your hostname, m1Mac-5.local resolves to a loopback address: 127.0.0.1; using 192.168.0.159 instead (on interface en0)
22/05/26 01:40:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/05/26 01:40:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [33]:
def transform_idealista(rdd_in):
    '''
    
    '''
    transform_rdd = rdd_in \
        .map(lambda x: (x['propertyCode'], 
                        x['propertyType'],
                        x['operation'],
                        x['country'],
                        x['municipality'],
                        x['province'],
                        x['district'], 
                        x['neighborhood'],
                        x['price'],
                        x['priceByArea'],
                        x['rooms'],
                        x['bathrooms'],
                        x['size'],
                        x['status'],
                        x['floor'],
                        x['hasLift'],
                        x['newDevelopment'],
                        x['numPhotos'],
                        x['distance'], 
                        x['exterior'])) \
        .filter(lambda row: all(x is not None for x in row)) \
        .distinct()
    
    return transform_rdd


# spark transformations in sequence for each parquet file
i = 0 # special loop counter
for key in parq_files:
    # read spark df from parquet file
    df = spark.read.parquet(parq_files[key])
    rdd_addDate = df.withColumn("date", lit(key)).rdd # add 'date' attribute and transform into rdd
    transform_rdd = transform_idealista(rdd_addDate) # remove duplicates and select attributes
    if i == 0:
        union_idealista_rdd = transform_rdd
    else:
        union_idealista_rdd = union_idealista_rdd.union(transform_rdd)
    i += 1

### Misc testing...

In [15]:
allTest = sc.textFile("landing/persistent/idealista/*")

In [16]:
allTest.count()

                                                                                

39655

In [2]:
df = spark.read.parquet("landing/persistent/idealista/2021_02_22_idealista/part-00000-10c3d7de-11b8-44a9-907f-6f0a7d7051d0-c000.snappy.parquet")

In [3]:
df

DataFrame[address: string, bathrooms: bigint, country: string, detailedType: struct<subTypology:string,typology:string>, distance: string, district: string, exterior: boolean, externalReference: string, floor: string, has360: boolean, has3DTour: boolean, hasLift: boolean, hasPlan: boolean, hasStaging: boolean, hasVideo: boolean, latitude: double, longitude: double, municipality: string, neighborhood: string, newDevelopment: boolean, numPhotos: bigint, operation: string, parkingSpace: struct<hasParkingSpace:boolean,isParkingSpaceIncludedInPrice:boolean,parkingSpacePrice:double>, price: double, priceByArea: double, propertyCode: string, propertyType: string, province: string, rooms: bigint, showAddress: boolean, size: double, status: string, suggestedTexts: struct<subtitle:string,title:string>, thumbnail: string, topNewDevelopment: boolean, url: string]

In [20]:
rdd_addDate = df.withColumn("date", lit('2021_02_22')).rdd.drop(1)

AttributeError: 'RDD' object has no attribute 'drop'

In [17]:
rdd_addDate.first()

Row(address="barrio La Dreta de l'Eixample", bathrooms=3, country='es', detailedType=Row(subTypology=None, typology='flat'), distance='679', district='Eixample', exterior=True, externalReference='BCN21749', floor=None, has360=False, has3DTour=True, hasLift=True, hasPlan=True, hasStaging=False, hasVideo=False, latitude=41.3983667, longitude=2.169047, municipality='Barcelona', neighborhood="La Dreta de l'Eixample", newDevelopment=False, numPhotos=36, operation='sale', parkingSpace=None, price=700000.0, priceByArea=3415.0, propertyCode='88080251', propertyType='flat', province='Barcelona', rooms=6, showAddress=False, size=205.0, status='renew', suggestedTexts=Row(subtitle="La Dreta de l'Eixample, Barcelona", title='Piso'), thumbnail='https://img3.idealista.com/blur/WEB_LISTING/0/id.pro.es.image.master/3e/a4/94/743378551.jpg', topNewDevelopment=False, url='https://www.idealista.com/inmueble/88080251/', date='2021_02_22')

In [14]:
transform_rdd = rdd_addDate.map(lambda x: (x['propertyCode'],
                                           x['propertyType'],
                                            x['date'],
                                            x['operation'],
                                            x['country'],
                                            x['municipality'],
                                            x['province'],
                                            x['district'], 
                                            x['neighborhood'],
                                            x['price'],
                                            x['priceByArea'],
                                            x['rooms'],
                                            x['bathrooms'],
                                            x['size'],
                                            x['status'],
                                            x['floor'],
                                            x['hasLift'],
                                            x['parkingSpace'],
                                            x['newDevelopment'],
                                            x['numPhotos'],
                                            x['distance'], 
                                            x['exterior']))

#.filter(lambda row: all(x is not None for x in row)).distinct()

In [15]:
transform_rdd.take(2)

[('88080251',
  'flat',
  '2021_02_22',
  'sale',
  'es',
  'Barcelona',
  'Barcelona',
  'Eixample',
  "La Dreta de l'Eixample",
  700000.0,
  3415.0,
  6,
  3,
  205.0,
  'renew',
  None,
  True,
  None,
  False,
  36,
  '679',
  True),
 ('91956445',
  'flat',
  '2021_02_22',
  'sale',
  'es',
  'Barcelona',
  'Barcelona',
  'Eixample',
  "La Dreta de l'Eixample",
  1975000.0,
  8700.0,
  4,
  3,
  227.0,
  'good',
  None,
  True,
  None,
  False,
  30,
  '634',
  True)]

In [16]:
transform_rdd.count()

50

In [17]:
df2 = spark.read.parquet("landing/persistent/idealista/2020_09_05_idealista/part-00000-8f1ef1af-429d-4348-b6f5-3466876f0efa-c000.snappy.parquet")

In [18]:
rdd_addDate2 = df2.withColumn("date", lit('2020_09_05')).rdd

In [19]:
rdd_addDate2.count()

347

In [20]:
transform_rdd2 = rdd_addDate2.map(lambda x: (x['propertyCode'],
                                             x['propertyType'],
                                             x['date'],
                                            x['operation'],
                                            x['country'],
                                            x['municipality'],
                                            x['province'],
                                            x['district'], 
                                            x['neighborhood'],
                                            x['price'],
                                            x['priceByArea'],
                                            x['rooms'],
                                            x['bathrooms'],
                                            x['size'],
                                            x['status'],
                                            x['floor'],
                                            x['hasLift'],
                                            x['parkingSpace'],
                                            x['newDevelopment'],
                                            x['numPhotos'],
                                            x['distance'], 
                                            x['exterior']))


In [21]:
test_rdd2 = transform_rdd2.filter(lambda row: all(x is not None for x in row)).distinct()
test_rdd2.count()

7

In [22]:
transform_rdd2.take(2)

[('91224583',
  'flat',
  '2020_09_05',
  'sale',
  'es',
  'Hospitalet de Llobregat',
  'Barcelona',
  'La Florida - Les Planes',
  'Les Planes',
  180000.0,
  2195.0,
  3,
  1,
  82.0,
  'good',
  '5',
  True,
  None,
  False,
  25,
  '431',
  True),
 ('91944839',
  'flat',
  '2020_09_05',
  'sale',
  'es',
  'Hospitalet de Llobregat',
  'Barcelona',
  'Centre',
  'Sant Josep',
  159000.0,
  2208.0,
  3,
  1,
  72.0,
  'good',
  '8',
  True,
  None,
  False,
  9,
  '427',
  True)]

In [23]:
transform_rdd2.count()

347

In [24]:
union_rdd = transform_rdd.union(transform_rdd2)
union_rdd.count()

397

In [70]:
parq_files

{'2021_02_22': 'landing/persistent/idealista/2021_02_22_idealista/part-00000-10c3d7de-11b8-44a9-907f-6f0a7d7051d0-c000.snappy.parquet',
 '2020_03_16': 'landing/persistent/idealista/2020_03_16_idealista/part-00000-88f16d66-e7f6-4fa0-9ab8-bac88dc5adfd-c000.snappy.parquet',
 '2020_01_13': 'landing/persistent/idealista/2020_01_13_idealista/part-00000-0b5efacb-48eb-472a-9b87-a863fe92e503-c000.snappy.parquet',
 '2020_02_10': 'landing/persistent/idealista/2020_02_10_idealista/part-00000-5dcf60c1-1a7f-41f2-967e-5cead88ecf36-c000.snappy.parquet',
 '2020_06_13': 'landing/persistent/idealista/2020_06_13_idealista/part-00000-c6a1025d-3527-418f-a90d-eb45a03f51ed-c000.snappy.parquet',
 '2020_04_16': 'landing/persistent/idealista/2020_04_16_idealista/part-00000-5271a859-1abd-4195-8c2c-a6cf2a4ab961-c000.snappy.parquet',
 '2020_07_15': 'landing/persistent/idealista/2020_07_15_idealista/part-00000-a946f2f4-f32a-405c-aebe-5a78c8a06979-c000.snappy.parquet',
 '2020_10_25': 'landing/persistent/idealista/202