## Data Preparation

In [4]:
import pandas as pd
import numpy as np
from pandas.io.json import json_normalize
from pymongo import MongoClient
import json, re
from re import sub
from decimal import Decimal
import dask.dataframe as dd
from dask.distributed import Client, progress
import dask.dataframe as dd
from dask.delayed import delayed

### Dask Client

In [5]:
client = Client(n_workers=4, threads_per_worker=1)
client

0,1
Client  Scheduler: tcp://127.0.0.1:40679  Dashboard: http://127.0.0.1:37127/status,Cluster  Workers: 4  Cores: 4  Memory: 8.26 GB


### Database Connection

In [6]:
DB_USER = 'anutibara'
DB_PASS = 'anutibara'
DB_HOST = 'scraping-cluster-7dtgt.gcp.mongodb.net'
DB_NAME = 'scraping_db'

try:
    client = MongoClient(f'mongodb+srv://{DB_USER}:{DB_PASS}@{DB_HOST}/{DB_NAME}')
    print("Database connected successfully")
except Exception as e:
    print("Error to connect to database: ", e)
db = client.get_database(DB_NAME)
properties = db.properties

Database connected successfully


### Database Queries

In [7]:
properties.count_documents({})

14209

### Convert Mongo Collection to DataFrame

#### New Properties

In [8]:
new_properties_json = list(properties.find({ 'use': 'Nuevo' }))

In [22]:
df_general_info = json_normalize(new_properties_json, record_path='offersType', 
                                meta=['_id', 
                                    'scrapingDate', 
                                    'scrapingHour', 
                                    'modifyDate',
                                    'modifyHour', 
                                    'code', 
                                    'status', 
                                    'type', 
                                    'use', 
                                    'nameProject', 
                                    'description'])
ddf_general_info = dd.from_pandas(df_general_info, npartitions=10)

In [24]:
df_location = json_normalize(new_properties_json, record_path='location', meta='urlProperty')
ddf_location = dd.from_pandas(df_location, npartitions=10)

In [11]:
ddf_new_properties = dd.merge(ddf_general_info, ddf_location)

In [12]:
df_owner_property = json_normalize(new_properties_json, record_path='ownerProperty', meta='urlProperty')
ddf_owner_property = dd.from_pandas(df_owner_property, npartitions=10)

In [13]:
ddf_new_properties = dd.merge(ddf_new_properties, ddf_owner_property)

In [14]:
df_features = json_normalize(new_properties_json, record_path='features', meta='urlProperty')
ddf_features = dd.from_pandas(df_features, npartitions=10)
new_columns = ['admon_price', 'antiquity', 'general_bathrooms', 'condition', 'construction_area',
               'floor', 'garages', 'includes_administration', 'interior_floors', 'range_prices', 
               'range_private_area', 'general_rooms', 'square_meters', 'square_meters_price', 'stratum', 
               'weather', 'urlProperty']
ddf_features = ddf_features.rename(columns=dict(zip(ddf_features.columns, new_columns)))

In [15]:
ddf_new_properties = dd.merge(ddf_new_properties, ddf_features)

In [16]:
df_more_features = json_normalize(new_properties_json, record_path='moreFeatures', meta='urlProperty')
ddf_more_features = dd.from_pandas(df_more_features, npartitions=10)

In [17]:
ddf_new_properties = dd.merge(ddf_new_properties, ddf_more_features)

#### Old Properties

In [18]:
old_properties_json = list(properties.find({ 'use': 'Usado' }))
df_general_info = pd.DataFrame(old_properties_json, 
                                    columns=['_id', 
                                    'urlProperty', 
                                    'scrapingDate', 
                                    'scrapingHour', 
                                    'modifyDate',
                                    'modifyHour', 
                                    'code', 
                                    'status', 
                                    'type', 
                                    'use', 
                                    'nameProject', 
                                    'description',
                                    'offersType'])
df_location = json_normalize(old_properties_json, 'location')
df_owner_property = json_normalize(old_properties_json, 'ownerProperty')
df_features = json_normalize(old_properties_json, 'features')
df_more_features = json_normalize(old_properties_json, 'moreFeatures')
df_old_properties = pd.concat([df_general_info, df_location, df_owner_property, 
                df_features, df_more_features], axis=1)

### Rename DataFrame Columns

#### New Properties

In [19]:
df_new_properties.rename(columns= {
                        "_id": "id_mongoose", 
                        "urlProperty":"id_property",
                        "scrapingDate": "scraping_date",
                        "scrapingHour": "scraping_hour",
                        "modifyDate": "modify_date",
                        "modifyHour": "modify_hour",
                        "status": "active",
                        "use": "new_property",
                        "nameProject": "name_project",
                        "offersType": "offers_type",
                        "id": "id_owner_property",
                        "name": "name_owner_property",
                        "contractType": "contract_type_owner_property",
                        "financing": "financing_owner_property",
                        "schedule": "schedule_owner_property",
                        "squareMeters": "range_square_meters",
                        "constructionArea": "range_construction_area",
                        "squareMetersPrice": "square_meters_price",
                        "interiorFloors": "interior_floors",
                        "includesAdministration": "includes_administration",
                        "admonPrice": "admon_price",
                        "interiorFeatures": "interior_features",
                        "exteriorFeatures": "exterior_features",
                        "sectorFeatures": "sector_features",
                        "offerType": "offer_type",
                        "privateArea": "private_area"
                    }, 
          inplace = True)

NameError: name 'df_new_properties' is not defined

#### Old Properties

In [None]:
df_old_properties.rename(columns= {
                        "_id": "id_mongoose", 
                        "urlProperty":"id_property",
                        "scrapingDate": "scraping_date",
                        "scrapingHour": "scraping_hour",
                        "modifyDate": "modify_date",
                        "modifyHour": "modify_hour",
                        "status": "active",
                        "use": "new_property",
                        "nameProject": "name_project",
                        "offersType": "offers_type",
                        "id": "id_owner_property",
                        "name": "name_owner_property",
                        "contractType": "contract_type_owner_property",
                        "financing": "financing_owner_property",
                        "schedule": "schedule_owner_property",
                        "squareMeters": "square_meters",
                        "privateArea": "private_area",
                        "constructionArea": "construction_area",
                        "squareMetersPrice": "square_meters_price",
                        "interiorFloors": "interior_floors",
                        "includesAdministration": "includes_administration",
                        "admonPrice": "admon_price",
                        "interiorFeatures": "interior_features",
                        "exteriorFeatures": "exterior_features",
                        "sectorFeatures": "sector_features"
                    }, 
          inplace = True)

### Data Cleaning

#### New Properties

In [None]:
df_new_properties['active'] = df_new_properties['active'].apply(lambda status: True if (status == 'Active') else False)
df_new_properties['new_property'] = df_new_properties['new_property'].apply(lambda status: True if (status == 'Nuevo') else False)
df_new_properties['includes_administration'] = df_new_properties['includes_administration'].apply(lambda status: True if (status == 'Nuevo') else False)
df_new_properties['garages'] = df_new_properties['garages'].replace('', value = 0, regex = True).astype(int)
df_new_properties['stratum'] = df_new_properties['stratum'].replace('', value = 0, regex = True).astype(int)
df_new_properties['floor'] = df_new_properties['floor'].astype(int)
df_new_properties['area'] = df_new_properties['area'].astype(float)
df_new_properties['private_area'] = df_new_properties['private_area'].replace('', value = 0, regex = True)
df_new_properties['private_area'] = df_new_properties['private_area'].astype(float)
df_new_properties['rooms'] = df_new_properties['rooms'].astype(int)
df_new_properties['bathrooms'] = df_new_properties['bathrooms'].astype(int)
df_new_properties['price'] = df_new_properties['price'].apply(lambda price: Decimal(sub(r'[^\d,]', '', price))).astype(int)

#### Old Properties

In [None]:
df_old_properties['active'] = df_old_properties['active'].apply(lambda status: True if (status == 'Active') else False)
df_old_properties['new_property'] = df_old_properties['new_property'].apply(lambda status: True if (status == 'Nuevo') else False)
df_old_properties['includes_administration'] = df_old_properties['includes_administration'].apply(lambda status: True if (status == 'Nuevo') else False)
df_old_properties['garages'] = df_old_properties['garages'].replace('', value = 0, regex = True)
df_old_properties['garages'] = df_old_properties['garages'].replace('Más de 10', value = 0, regex = True)
df_old_properties['garages'] = df_old_properties['garages'].astype(int)
df_old_properties['stratum'] = df_old_properties['stratum'].replace('', value = 0, regex = True)
df_old_properties['stratum'] = df_old_properties['stratum'].replace('Campestre', value = 0, regex = True)
df_old_properties['stratum'] = df_old_properties['stratum'].astype(int)
df_old_properties['price'] = df_old_properties['price'].apply(lambda price: Decimal(sub(r'[^\d,]', '', price))).astype(int)
df_old_properties['square_meters'] = df_old_properties['square_meters'].str[0:-3]
df_old_properties['square_meters'] = df_old_properties['square_meters'].apply(lambda meters: Decimal(sub(r'[^\d,]', '', meters))).astype(float)
df_old_properties['private_area'] = df_old_properties['private_area'].str[0:-2]
df_old_properties['private_area'] = df_old_properties['private_area'].replace('', value = '0', regex = True)
df_old_properties['private_area'] = df_old_properties['private_area'].apply(lambda area: Decimal(sub(r'[^\d.]', '', area))).astype(float)
df_old_properties['construction_area'] = df_old_properties['construction_area'].str[0:-3]
df_old_properties['construction_area'] = df_old_properties['construction_area'].apply(lambda area: Decimal(sub(r'[^\d,]', '', area))).astype(float)
df_old_properties['floor'] = df_old_properties['floor'].astype(int)

### Testing

#### New Properties

In [None]:
df_new_properties

#### Old Properties

In [None]:
df_old_properties