In [1]:
from pyspark.sql import *
from pyspark import *
from pyspark.sql import * #SparkSession, Row
from pyspark.sql.functions import * #udf, concat, col, lit, ltrim, rtrim
from pyspark.sql.types import *
from pyspark.sql.window import Window
global applicationName

applicationName = "testspark"
spark = (SparkSession
    .builder
    .appName(applicationName)
    .enableHiveSupport()
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    .getOrCreate())

## Import the library

In [1]:
from myETLcode import *

### Read in dataframe

In [11]:
df = spark.read.csv('house_prices.csv', header=True)

In [4]:
df.show(5)

+--------------------+--------------------+--------+--------------------+------------------+--------------------+----------+-----------+-------------+--------+
|             address|           amenities|     lat|                link|               lon|                name|     price|prize_range|         size|  tenure|
+--------------------+--------------------+--------+--------------------+------------------+--------------------+----------+-----------+-------------+--------+
|Jalan Cheras Inta...|3 Bedroom(s),2 Ba...|3.072998|https://www.iprop...|        101.774192|Cheras Intan, Cheras|RM 265,000|        Low|  750 sq. ft.|Freehold|
|Jalan 6/95B, Tama...|3 Bedroom(s),2 Ba...| 3.10169|https://www.iprop...|101.75050999999999|Ketumbar Heights,...|RM 295,000|        Low|  755 sq. ft.|Freehold|
|Mont Kiara, KL, K...|4+1 Bedroom(s),5 ...|     0.0|https://www.iprop...|               0.0|10 Mont Kiara @ M...| RM 10,000|        Low|3,668 sq. ft.|Freehold|
|Jalan Langkawi, S...|3 Bedroom(s),2 Ba.

### Convert datatype of the prices 
- regexp_extract( col, regex_expression, index)     : extract *265,000* from *RM 265,000*
- regexp_replace( col, regex_expression, replacing) : replace '*,*' with space
- cast biginteger type to the number  

In [12]:
df = df.withColumn('prices', regexp_extract(col('price'),'RM (.+)',1))\
        .withColumn('prices', regexp_replace(col('prices'),',',''))\
        .withColumn('prices', col('prices').cast('bigint'))

In [39]:
df.select('price','prices').show(3)

+----------+------+
|     price|prices|
+----------+------+
|RM 265,000|265000|
|RM 295,000|295000|
| RM 10,000| 10000|
+----------+------+
only showing top 3 rows



In [22]:
df = df.withColumn('lat', round('lat',3))\
       .withColumn('lon',round('lon',3))

### Simple data profile on selected columns (only applies to small data)

In [57]:
df_prof = data_profile(df_my.select('House Name','amenities','prices'), spark.sparkContext)

In [59]:
df_prof.show()

+----------+--------+--------+-----+------------------+-----------------+-----+------+----+
|    column|datatype|dist len|count|              mean|             s.d.|  min|   max|null|
+----------+--------+--------+-----+------------------+-----------------+-----+------+----+
|    prices|  bigint|      76|  239|312650.44351464434|53912.15545023094|10000|380000|   0|
| amenities|  string|      32|  239|                  |                 |     |      |   0|
|House Name|  string|     145|  239|                  |                 |     |      |   0|
+----------+--------+--------+-----+------------------+-----------------+-----+------+----+



---

I have found that when massive ETL's involved, there are a lot of work needed to write the spark codes to perform transformation. To make my job easy, I wrote functions and wrap up spark original methods.
The below will demo the difference between using spark original methods and the wrapped up methods.

- `df_sp` used standard spark methods
- `df_my` used my library methods

# Library

### Renaming 

```
rename_col(df,rename_map):
```
Rename the columns according to the renaming in dictionary

In [23]:
df_sp = df.withColumnRenamed('lat','latitude')\
          .withColumnRenamed('lon','longitude')\
          .withColumnRenamed('name','House Name')
        
df_sp.columns

['address',
 'amenities',
 'latitude',
 'link',
 'longitude',
 'House Name',
 'price',
 'prize_range',
 'size',
 'tenure',
 'prices']

In [24]:
df_my = rename_col(df, {'lat' :'latitude',\
                        'lon' :'longitude',\
                        'name':'House Name'})

df.columns

['address',
 'amenities',
 'lat',
 'link',
 'lon',
 'name',
 'price',
 'prize_range',
 'size',
 'tenure',
 'prices']

### Dropping columns
```python
    iter_drop(df, drop_cols)
```
Drop columns in the list

In [25]:
df_sp = df_sp.drop('price','tenure','link','address')
df_sp.columns

['amenities',
 'latitude',
 'longitude',
 'House Name',
 'prize_range',
 'size',
 'prices']

In [110]:
drop_cols = ['price','tenure','link','address']
df_my = iter_drop(df_my, drop_cols)
df_my.columns

['amenities',
 'latitude',
 'longitude',
 'House Name',
 'prize_range',
 'size',
 'prices',
 'price_category']

### If-Else
```python
recursive_cond(df, conditions,NA ='NA'):
```
alternative for when().otherwise()

In [28]:
df_sp = df_sp.withColumn('price_category', \
                                when((col('prices')>=200000)&(col('prices')< 250000),'200k - 250k')\
                     .otherwise(when((col('prices')>=250000)&(col('prices')< 300000),'250k - 300k')\
                     .otherwise(when((col('prices')>=300000)&(col('prices')< 350000),'300k - 350k')\
                     .otherwise(when((col('prices')>=350000)&(col('prices')< 400000),'350k - 400k')\
                     .otherwise('Others')))))

In [69]:
df_sp.select('price_category').distinct().show()

+--------------+
|price_category|
+--------------+
|   300k - 350k|
|        Others|
|   350k - 400k|
|   200k - 250k|
|   250k - 300k|
+--------------+



In [29]:
conditions = [(lambda df: (df['prices']>=200000) & (df['prices']<250000) ,'200k - 250k'),
              (lambda df: (df['prices']>=250000) & (df['prices']<300000) ,'250k - 300k'),
              (lambda df: (df['prices']>=300000) & (df['prices']<350000) ,'300k - 350k'),
              (lambda df: (df['prices']>=350000) & (df['prices']<400000) ,'350k - 400k')]

df_my = df_my.withColumn('price_category', recursive_cond(df_my, conditions, 'Others'))

In [62]:
df_my.select('price_category').distinct().show()

+--------------+
|price_category|
+--------------+
|   300k - 350k|
|        Others|
|   350k - 400k|
|   200k - 250k|
|   250k - 300k|
+--------------+



### Aggregation
```python
aggregation(df, group_col, agg_cols)
```

In [84]:
df_sp_agg = df_sp.groupBy('prize_range')\
                 .agg(count('House Name').alias('no_of_house'),
                      min('prices').alias('min_price'),
                      max('prices').alias('max_price'),
                      countDistinct('price_category').alias('price_category'))
df_sp_agg.show()

+-----------+-----------+---------+---------+--------------+
|prize_range|no_of_house|min_price|max_price|price_category|
+-----------+-----------+---------+---------+--------------+
|       High|        151|   300000|   380000|             2|
|        Low|         88|    10000|   300000|             4|
+-----------+-----------+---------+---------+--------------+



In [83]:
df_my_agg = aggregation(df_my, 'prize_range', {'House Name'    :{'no_of_house':'count'},
                                               'prices'        :{'min_price'  :'min',
                                                                 'max_price'  :'max'},
                                               'price_category':'countDistinct'})
df_my_agg.show()

+-----------+---------+---------+--------------+-----------+
|prize_range|max_price|min_price|price_category|no_of_house|
+-----------+---------+---------+--------------+-----------+
|       High|   380000|   300000|             2|        151|
|        Low|   300000|    10000|             4|         88|
+-----------+---------+---------+--------------+-----------+



### Joining

```python
joining(x,df1,df2,how,drop)

how: left, right, inner
drop: True, False
```

extra methods:
- `select_join(select_columns, rename_column_dict)`
- `join_alias(join_on_col_dict, rename_column_dict)`

In [146]:
def show_join_results(df):
    print('Columns:')
    print(','.join(df.columns))
    print('\nTable:')
    df.select('House Name','price_category','prize_range','drill_cate').sort('House Name').show(10)

#### Create an anonymous dataframe for joining

In [160]:
df_to_join = spark.createDataFrame([['Low','200k - 250k','Cheap' ,'Extra1'],\
                                    ['Low','250k - 300k','Middle Low','Extra2'],
                                    ['Low','300k - 350k','Middle','Extra3'],
                                    ['High','350k - 400k','High','Extra4']], \
                                    schema=['prize_range','category','drill','UnwantCol'])

In [161]:
df_to_join.show()

+-----------+-----------+----------+---------+
|prize_range|   category|     drill|UnwantCol|
+-----------+-----------+----------+---------+
|        Low|200k - 250k|     Cheap|   Extra1|
|        Low|250k - 300k|Middle Low|   Extra2|
|        Low|300k - 350k|    Middle|   Extra3|
|       High|350k - 400k|      High|   Extra4|
+-----------+-----------+----------+---------+



Joining the two dataframe on 


df            | df_to_join    
------------- |--------------
prize_range   | range        
price_category| category     

drop away `category` only after joining then rename `drill` --> `drill_cate`

In [165]:
df_sp_join = df_sp.join(df_to_join.select('prize_range','category','drill'), \
                        on=[df_sp['prize_range']    == df_to_join['prize_range'],
                            df_sp['price_category'] == df_to_join['category']], \
                        how='left')\
                  .drop(df_to_join['prize_range']).drop('category')\
                  .withColumnRenamed('drill','drill_cate')

show_join_results(df_sp_join)

Columns:
amenities,latitude,longitude,House Name,prize_range,size,prices,price_category,drill_cate

Table:
+--------------------+--------------+-----------+----------+
|          House Name|price_category|prize_range|drill_cate|
+--------------------+--------------+-----------+----------+
|10 Mont Kiara @ M...|        Others|        Low|      null|
|Akasia Apartments...|   250k - 300k|        Low|Middle Low|
|Akasia Apartments...|   300k - 350k|       High|      null|
|Alam Idaman, Shah...|   350k - 400k|       High|      High|
|Alam Sanjung, Sub...|        Others|        Low|      null|
|Alam Sanjung, Sub...|        Others|        Low|      null|
|    Aman Dua, Kepong|   250k - 300k|        Low|Middle Low|
|    Aman Dua, Kepong|   250k - 300k|        Low|Middle Low|
|Aman Puri Apartme...|   250k - 300k|        Low|Middle Low|
|Amara Boulevard A...|   250k - 300k|        Low|Middle Low|
+--------------------+--------------+-----------+----------+
only showing top 10 rows



In [166]:
df_my_join1 = joining(df1 = df_my, 
                      df2 = df_to_join.select('prize_range','category','drill'), 
                       x = {'prize_range':'prize_range',
                           'price_category':'category'})

df_my_join1 = rename_col(df_my_join1, {'drill':'drill_cate'})

show_join_results(df_my_join1)

Columns:
amenities,latitude,longitude,House Name,prize_range,size,prices,price_category,drill_cate

Table:
+--------------------+--------------+-----------+----------+
|          House Name|price_category|prize_range|drill_cate|
+--------------------+--------------+-----------+----------+
|10 Mont Kiara @ M...|        Others|        Low|      null|
|Akasia Apartments...|   250k - 300k|        Low|Middle Low|
|Akasia Apartments...|   300k - 350k|       High|      null|
|Alam Idaman, Shah...|   350k - 400k|       High|      High|
|Alam Sanjung, Sub...|        Others|        Low|      null|
|Alam Sanjung, Sub...|        Others|        Low|      null|
|    Aman Dua, Kepong|   250k - 300k|        Low|Middle Low|
|    Aman Dua, Kepong|   250k - 300k|        Low|Middle Low|
|Aman Puri Apartme...|   250k - 300k|        Low|Middle Low|
|Amara Boulevard A...|   250k - 300k|        Low|Middle Low|
+--------------------+--------------+-----------+----------+
only showing top 10 rows



##### OR

In [167]:
J_option = {}
J_option['sel'] = ['prize_range','category','drill']
J_option['rj'] = {'drill':'drill_cate',
                 'prize_range':'RANGE_RENAMED'}
J_option['jo'] = {'prize_range':'prize_range',
                  'price_category':'category'}

df_my_join2 = joining(df1 = df_my, 
                      df2 = df_to_join.select(*select_join(**J_option)), 
                      x = join_alias(**J_option),
                      drop=False)

show_join_results(df_my_join2)

Columns:
amenities,latitude,longitude,House Name,prize_range,size,prices,price_category,drill_cate,RANGE_RENAMED,category

Table:
+--------------------+--------------+-----------+----------+
|          House Name|price_category|prize_range|drill_cate|
+--------------------+--------------+-----------+----------+
|10 Mont Kiara @ M...|        Others|        Low|      null|
|Akasia Apartments...|   250k - 300k|        Low|Middle Low|
|Akasia Apartments...|   300k - 350k|       High|      null|
|Alam Idaman, Shah...|   350k - 400k|       High|      High|
|Alam Sanjung, Sub...|        Others|        Low|      null|
|Alam Sanjung, Sub...|        Others|        Low|      null|
|    Aman Dua, Kepong|   250k - 300k|        Low|Middle Low|
|    Aman Dua, Kepong|   250k - 300k|        Low|Middle Low|
|Aman Puri Apartme...|   250k - 300k|        Low|Middle Low|
|Amara Boulevard A...|   250k - 300k|        Low|Middle Low|
+--------------------+--------------+-----------+----------+
only showing top