# Modules

In [1]:
import findspark
findspark.init()

In [2]:
from delta import *

In [3]:
import os
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
import configparser
import realstate_utils as rs


In [4]:
config = configparser.ConfigParser()
config.read('config.ini')

path = config.get('PATH', 'PATH_DATA')
url = config.get('URL', 'URL')
page = int(config.get('PAGE', 'page'))

# Processing

In [5]:
builder = pyspark.sql.SparkSession.builder \
    .appName("Real_state") \
    .config("spark.hadoop.home", "C:\hadoop-3.3.5\bin") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [5]:
spark.active()

In [4]:
print(url)

https://www.citya.com/annonces/vente/appartement,maison?sort=b.dateMandat&direction=desc


In [6]:
data = rs.scrapping(url,page)



  soup = bs(html, "html")


In [7]:
# Définissez le schéma du DataFrame
schema = StructType([
    StructField("id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("surface", FloatType(), True),
    StructField("price", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("postal_code", StringType(), True),
    StructField("number_pieces", IntegerType(), True),
    StructField("type", StringType(), True),
    StructField("description", StringType(), True)
    
])
df = spark.createDataFrame(data,schema)

In [8]:
df.count()

5567

In [9]:
df.show(1,False)

+---+-----------------------------------------+-------+-------+-------------+-----------+-------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [10]:
df_final= df.select("id","type","city","postal_code","number_pieces","surface","price")


In [12]:
df_final.show()

+---+-----------+-------------------+-----------+-------------+-------+------+
| Id|       type|               city|postal_code|number_pieces|surface| price|
+---+-----------+-------------------+-----------+-------------+-------+------+
|  0|Appartement|            Quimper|      29000|            2|   45.0| 98500|
|  1|     Maison|           Bergerac|      24100|            5|   92.0|183000|
|  2|Appartement|             Fréjus|      83600|            3|   63.0|399000|
|  3|Appartement|             Fréjus|      83600|            3|   63.0|399000|
|  4|Appartement|Saint Cyr sur Loire|      37540|            5|   96.0|255000|
|  5|Appartement|           Archamps|      74160|            2|  51.16|320000|
|  6|Appartement|           Archamps|      74160|            3|   64.0|393500|
|  7|Appartement|           Archamps|      74160|            2|   48.0|302500|
|  8|Appartement|           Archamps|      74160|            2|   46.0|302500|
|  9|Appartement|           Archamps|      74160|   

In [29]:
df.groupby("type").count().show()

+-----------+-----+
|       type|count|
+-----------+-----+
|Appartement| 4244|
|     Maison| 1311|
|    Demeure|   12|
+-----------+-----+



In [30]:
# Enrégistrement des résultats dans un parquet en fonction du type de logement 
df_final.write.partitionBy("type")\
        .mode("overwrite").parquet("../output/parquet/realstate.parquet")

In [11]:
print(spark.sparkContext.getConf().get("spark.jars.packages"))

io.delta:delta-spark_2.12:3.1.0


In [12]:
# Enrégistrement des résultats dans un delta lake  
df_final.write.format("delta")\
  .mode("overwrite").save("../output/delta/realstate")