In [1]:
import os
import sys
import socket

import sys,uuid,datetime
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

# MinIO (S3-compatible object store) credentials come from the environment so
# they are never written into the notebook; a KeyError here means one is unset.
key = os.environ["MINIO_ROOT_USER"]
secret = os.environ["MINIO_ROOT_PASSWORD"]

# S3A endpoint used by this notebook. It pins the local address rather than
# using MINIO_SECRET_ENDPOINT, so that variable is not read at all: reading it
# only to discard the value would still raise KeyError whenever it is unset.
# Edit this constant to point the session at a different MinIO instance.
MINIO_ENDPOINT = "http://127.0.0.1:9000"
endpoint = MINIO_ENDPOINT
print(endpoint)

# Spark-on-Kubernetes session in client deploy mode, with Delta Lake and the
# S3A connector pulled in through spark.jars.packages.
spark = (
    SparkSession.builder
    .master("k8s://https://kubernetes.docker.internal:6443")
    .appName("querying")
    # --- S3A / MinIO access ---
    .config("spark.hadoop.fs.s3a.access.key", key)
    .config("spark.hadoop.fs.s3a.secret.key", secret)
    .config("spark.hadoop.fs.s3a.endpoint", endpoint)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # The endpoint above is plain http, so TLS is switched off.
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    # --- Dependencies: Delta Lake 0.7.0 (Scala 2.12 build), hadoop-aws and
    # the AWS SDK bundle version hadoop-aws 3.2.0 was built against ---
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0,org.apache.hadoop:hadoop-aws:3.2.0,com.amazonaws:aws-java-sdk-bundle:1.11.375")
    # --- Delta Lake ---
    # S3 gives Delta no mutual exclusion between writers; this LogStore is only
    # safe when a single driver writes to a given table at a time.
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # --- Kubernetes / executors ---
    .config("spark.submit.deployMode", "client")
    .config("spark.kubernetes.container.image", "spark:spark-docker")
    .config("spark.kubernetes.pyspark.pythonVersion", "3")
    .config("spark.kubernetes.authenticate.driver.serviceAccountName", "default")
    .config("spark.executor.instances", "1")
    .config("spark.kubernetes.executor.request.cores", "0.5")
    .config("spark.kubernetes.executor.limit.cores", "1")
    # NOTE(review): the two keys below lack the `spark.` prefix, so Spark itself
    # is unlikely to use them; they look like deployment settings — TODO
    # confirm whether anything reads them, otherwise remove.
    .config("jupyterService.jupyterPort", "30888")
    .config("serviceAccount", "spark")
    .getOrCreate()
)

# If executors cannot connect back to this client-mode driver,
# spark.driver.host / spark.driver.port may need to be set explicitly.
sc = spark.sparkContext

http://127.0.0.1:9000


In [2]:
# Sanity check: total number of rows in the bronze property Delta table.
bronze_row_count = spark.sql("SELECT COUNT(*) FROM delta.`s3a://real-estate/lake/bronze/property`")
bronze_row_count.show()

+--------+
|count(1)|
+--------+
|      25|
+--------+



In [3]:
# Load the whole bronze property table as a DataFrame and print its schema.
# (To list only the column names, put `df_p.columns` as the last line of a
# cell; elsewhere in a cell the bare expression is evaluated and discarded.)
df_p = spark.sql("SELECT * FROM delta.`s3a://real-estate/lake/bronze/property`")
df_p.printSchema()

root
 |-- propertyDetails_accountId: long (nullable = true)
 |-- propertyDetails_availableFrom: string (nullable = true)
 |-- propertyDetails_availableFromFormatted: string (nullable = true)
 |-- propertyDetails_cityId: long (nullable = true)
 |-- propertyDetails_cityName: string (nullable = true)
 |-- propertyDetails_contactFormTypeId: long (nullable = true)
 |-- propertyDetails_countryId: long (nullable = true)
 |-- propertyDetails_description: string (nullable = true)
 |-- propertyDetails_geoAccuracy: long (nullable = true)
 |-- propertyDetails_hasNewBuildingProject: boolean (nullable = true)
 |-- propertyDetails_hasVirtualTour: boolean (nullable = true)
 |-- propertyDetails_id: long (nullable = true)
 |-- propertyDetails_isBuyRent: string (nullable = true)
 |-- propertyDetails_isHighlighted: boolean (nullable = true)
 |-- propertyDetails_isNew: boolean (nullable = true)
 |-- propertyDetails_isNewEndDate: string (nullable = true)
 |-- propertyDetails_isOnline: boolean (nullable = tr

In [None]:
# Listing count and average selling / normalized price per search city, split
# by the child-friendly flag. ORDER BY keeps the row order (and therefore which
# rows show()'s default 20-row preview displays) stable across re-runs.
spark.sql("""SELECT propertyDetails_searchCity
                  , COUNT(*) AS cnt_of_properties
                  , propertyDetails_attributesOutside_propChildFriendly AS childFriendly
                  , AVG(propertyDetails_sellingPrice) AS AVG_sellingPrice
                  , AVG(propertyDetails_normalizedPrice) AS AVG_normalizedPrice
            FROM delta.`s3a://real-estate/lake/bronze/property`
            GROUP BY propertyDetails_searchCity
                    , propertyDetails_attributesOutside_propChildFriendly
            ORDER BY propertyDetails_searchCity
                    , propertyDetails_attributesOutside_propChildFriendly
            """).show()

In [None]:
# Spot-check two known listings: their stored normalized price and the
# "<id>_<normalizedPrice>" fingerprint derived from it.
fingerprint_check_sql = """SELECT propertyDetails_searchCity
                  , propertyDetails_id
                  , CAST(propertyDetails_id AS STRING) || '_' || propertyDetails_normalizedPrice as fingerprint
                  , propertyDetails_normalizedPrice
            FROM delta.`s3a://real-estate/lake/bronze/property`
            WHERE propertyDetails_id IN (6246235, 6224170)
            """
fingerprint_check = spark.sql(fingerprint_check_sql)
fingerprint_check.show()

In [5]:
# Sample incoming listings. Each fingerprint is "<id>_<normalized price>", the
# same shape the bronze-table queries build from propertyDetails_id and
# propertyDetails_normalizedPrice.
properties = [{
                'id': 6246235,
                'fingerprint': '6246235_1090000',
                'is_prefix': False,
                'rentOrBuy': 'buy',
                'city': 'Bern',
                'propertyType': 'house',
                'radius': 0,
                'last_normalized_price': 1090000,
            },{
                'id': 6224170,
                'fingerprint': '6224170_1405000',
                'is_prefix': False,
                'rentOrBuy': 'buy',
                'city': 'Bern',
                'propertyType': 'house',
                'radius': 0,
                'last_normalized_price': 1405000,
            }
            ,{
                'id': 123,
                'fingerprint': '123_1405000',
                'is_prefix': False,
                'rentOrBuy': 'buy',
                'city': 'Bern',
                'propertyType': 'house',
                'radius': 0,
                'last_normalized_price': 1405000,
            }]

# Comma-separated literals for interpolation into a SQL `IN ( ... )` list.
# Ids go through int() so only numbers ever reach the SQL text; fingerprints
# are single-quoted with any embedded quote doubled (SQL literal escaping).
fingerprints: str = ", ".join(
    "'" + p["fingerprint"].replace("'", "''") + "'" for p in properties
)
ids: str = ", ".join(str(int(p["id"])) for p in properties)

print(ids)

6246235, 6224170, 123


In [24]:
# Fetch the current fingerprint of every incoming property (ids from `ids`)
# that already exists in the bronze table. Properties not in the table yet
# simply produce no row.
# NOTE(review): the fingerprint relies on propertyDetails_normalizedPrice
# rendering as an integer (the captured output shows e.g. '6246235_1090000');
# a fractional/double value would never match the incoming fingerprints.
cols = ['propertyDetails_id', 'fingerprint']
existing_props: list = []
if ids:
    existing_props = (
        spark.sql(
            """SELECT propertyDetails_id
            , CAST(propertyDetails_id AS STRING)
                || '_'
                || propertyDetails_normalizedPrice AS fingerprint
        FROM delta.`s3a://real-estate/lake/bronze/property`
        WHERE propertyDetails_id IN ( {ids} )
        """.format(
                ids=ids
            )
        )
        .select(*cols)
        .collect()
    )
# With no ids there is nothing to look up, and `IN ( )` would be a SQL syntax
# error, so the query is skipped and existing_props stays empty.
print(existing_props)

[Row(propertyDetails_id=6246235, fingerprint='6246235_1090000'), Row(propertyDetails_id=6224170, fingerprint='6224170_1455000')]


In [85]:
import pandas as pd
import pandasql as ps

# Work out which incoming properties are new (id absent from the bronze table)
# or changed (stored fingerprint differs from the incoming one).
# A pandas left merge is used instead of SQL through pandasql: the SQLite
# round-trip there turns the boolean `is_prefix` column into 0/1 integers.
pd_existing_props = pd.DataFrame(existing_props, columns=cols).astype(
    # Keeps the merge key int64 even when existing_props is empty (an empty
    # frame would otherwise default to object dtype).
    {"propertyDetails_id": "int64"}
)
cols_PropertyDataFrame = [ 'id',
            'fingerprint',
            'is_prefix',
            'rentOrBuy',
            'city',
            'propertyType',
            'radius',
            'last_normalized_price']
pd_properties = pd.DataFrame(properties, columns=cols_PropertyDataFrame)

# Left merge keeps every incoming property in its original order; new
# properties get NaN in existing_fingerprint.
props_vs_existing = pd_properties.merge(
    pd_existing_props.rename(
        columns={"propertyDetails_id": "id", "fingerprint": "existing_fingerprint"}
    ),
    on="id",
    how="left",
)
is_new = props_vs_existing["existing_fingerprint"].isna()
is_changed = props_vs_existing["fingerprint"] != props_vs_existing["existing_fingerprint"]
df_changed = (
    props_vs_existing
    .loc[is_new | is_changed, cols_PropertyDataFrame]
    .reset_index(drop=True)
)

# One plain dict per new/changed property, keyed by cols_PropertyDataFrame.
changed_properties = df_changed.to_dict("records")

In [91]:
# Ids of the new/changed properties, comma-separated like `ids`.
changed_id_values = df_changed["id"].tolist()
ids_changed = ", ".join(map(str, changed_id_values))
print(ids_changed)

6224170, 123


In [None]:
# Shut down the Spark session and its underlying SparkContext. Stopping via the
# session (rather than sc.stop()) also clears the active/default SparkSession,
# so re-running the setup cell builds a fresh one.
spark.stop()