In [0]:
'''
Author: Martand Singh
Date: 20 June 2021
Email: martandsays@gmail.com
Phone: +971-509185250
Task 1:
Create table/file named “customers” with following columns.
• CustomerId
• PropertyId
• CustomerName
• PropertyName
• Email
• Lease_Start_Date
• Lease_End_Date
• location
2) Consider below set of data for the table/file created above.
1,1001,Stephen, xyz,stephen@gmail.com,2020-06-04 09:34:35, 2021-06-03 12:34:35, MussaffahAbuDhabi
2,1231,Neelima,abc ,neelima123@yahoo.com,2021-02-017 07:48:53, 2022-02-16 06:34:35,HattaDubai
3,1345,Ahmed, klo,ahme67@hotmail.com,2019-04-05 04:21:30, 2020-04-04 12:40:05,
JebelAli-Dubai
4,1001,Mohammad,pou ,moha4@gmail.com,2018-11-04 04:23:15, 2021-11-03 10:51:35,
CreativeCity-Fujairah
5,1567,Wazir, tef,12waz@midas.ae,2021-09-23 07:14:25, 2022-09-22 01:42:21,
MediaCity-Sharjah


## I am creating a CSV named customers.csv and saving it to my DBFS location dbfs/FileStore/tables/customers.csv
## CSV contains exact same data and column names as given in the document.

## NOTE: As I am using Databricks, in some places I use display() to show a formatted dataset. This function is available only
in Databricks, so if you are using an environment other than Databricks, it may throw an error.
'''

In [0]:
# Task 2: Using the above data create temp table in SQL or save file in temp location in dbfs zone
# We have created a file with the given data and uploaded it to DBFS at the path /FileStore/tables/customers.csv.

In [0]:
# We can explore the file here. Top 3 rows of the file
!cat /dbfs/FileStore/tables/customers.csv  | head -3  

In [0]:
# Step 2 / Task 1: load the customers CSV into a DataFrame whose column names
# match the file headers.
# option("header", "true") makes Spark take the column names from the first
# line of the file. No explicit schema is passed and schema inference is not
# enabled, so with Spark's default CSV settings the columns load as strings.
dfRaw = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("/FileStore/tables/customers.csv")
)
  

In [0]:
dfRaw.show()  # Print the raw rows as loaded, to check headers and values before transforming

In [0]:
'''
Task 3: Change the type for columns lease_start_date and lease_end_date to date datatype. Rename the
column to start_date and end_date and drop the original columns in the new dataframe.

-- As required, the columns must be retyped, renamed, and the originals dropped. I therefore create two new date-typed columns named start_date and end_date and drop the original columns from the new dataset, as this needs fewer operations and is therefore faster.

Task 4:  Extract only date from the above two date columns
-- As in above step our start_date & end_date has date type, so automatically it will show date only.
'''

# Tasks 3 & 4: add start_date / end_date as DATE-typed copies of the lease
# columns, then drop the original Lease_* columns. Casting to "date" discards
# the time-of-day part, so only the calendar date remains.
# NOTE(review): customer 2's raw Lease_Start_Date is '2021-02-017 07:48:53'
# (three-digit day). The recorded output shows it cast to 2021-02-17; confirm
# the cast does not yield NULL on the Spark version you run.
df_trans = (
    dfRaw
    .withColumn("start_date", dfRaw["Lease_Start_Date"].cast("date"))
    .withColumn("end_date", dfRaw["Lease_End_Date"].cast("date"))
    .drop("Lease_Start_Date", "Lease_End_Date")
)

# Confirm the two new columns carry the date type.
df_trans.printSchema()

In [0]:
df_trans.show()  # Inspect the rows after the date cast and the drop of the original lease columns

In [0]:
# Task 5: split location into two columns, towns and cities, and drop the
# original column.
#
# split() from pyspark.sql.functions breaks the location string on the hyphen
# ('-') and returns an array: element 0 becomes the town, element 1 the city.
# The location column is dropped afterwards.
from pyspark.sql.functions import split

df_trans_address = (
    df_trans
    .withColumn("towns", split(df_trans["location"], '-').getItem(0))
    .withColumn("cities", split(df_trans["location"], '-').getItem(1))
    .drop("location")
)

df_trans_address.show()

In [0]:
'''
Task 6: Get the leasing period length in number of days - column name lease_diff_days. We use the datediff function for this, which returns the date difference in days.
 
Task 7: Get the leasing period length in number of months - column name lease_diff_months. We use the months_between function here, which returns the number of months between two dates.

Task 8: Get the leasing period length in number of months with two decimal point -column name lease_diff_months_2decimal. We use round function which round off any float number to the given decimal places.

Task 9: Get the leasing period length in number of months as absolute month and always round up to
higher no for eg 5.3 should be 6 and 4.7 should be 5 - column name lease_diff_months_absolute. We use ceil function, it always returns next whole integer.

'''
# Spark's round() is imported under an alias: a bare
# `from pyspark.sql.functions import round` rebinds the name `round` for the
# rest of the notebook and hides Python's built-in round().
from pyspark.sql.functions import ceil, datediff, months_between
from pyspark.sql.functions import round as spark_round

# Fractional number of months from start_date to end_date. Built once and
# reused for the three month-based columns below.
lease_months = months_between(
    df_trans_address["end_date"], df_trans_address["start_date"]
)

# Tasks 6-9: lease length in days, in fractional months, in months rounded to
# two decimals, and in whole months rounded up.
# The chain is wrapped in parentheses instead of using backslash
# continuations: the original ended with a dangling "\" after the last call,
# which only parses while a blank line follows it.
df_date_operated = (
    df_trans_address
    .withColumn(
        "lease_diff_days",
        datediff(df_trans_address["end_date"], df_trans_address["start_date"]),
    )
    .withColumn("lease_diff_months", lease_months)
    .withColumn("lease_diff_months_2decimal", spark_round(lease_months, 2))
    # ceil() always rounds up to the next whole month (e.g. 11.97 -> 12).
    .withColumn("lease_diff_months_absolute", ceil(lease_months))
)



In [0]:
# display() is only available on Databricks, where it renders the DataFrame as a formatted table.
# If it does not work in your environment, comment it out and uncomment the show() call below.
display(df_date_operated)
# df_date_operated.show()

CustomerId,PropertyId,CustomerName,PropertyName,Email,start_date,end_date,towns,cities,lease_diff_days,lease_diff_months,lease_diff_months_2decimal,lease_diff_months_absolute
1,1001,Stephen,xyz,stephen@gmail.com,2020-06-04,2021-06-03,Mussaffah,AbuDhabi,364,11.96774194,11.97,12
2,1231,Neelima,abc,neelima123@yahoo.com,2021-02-17,2022-02-16,Hatta,Dubai,364,11.96774194,11.97,12
3,1345,Ahmed,klo,ahme67@hotmail.com,2019-04-05,2020-04-04,JebelAli,Dubai,365,11.96774194,11.97,12
4,1001,Mohammad,pou,moha4@gmail.com,2018-11-04,2021-11-03,CreativeCity,Fujairah,1095,35.96774194,35.97,36
5,1567,Wazir,tef,12waz@midas.ae,2021-09-23,2022-09-22,MediaCity,Sharjah,364,11.96774194,11.97,12


In [0]:
'''
 All those customers staying in Dubai would get an lease extension of 30 days show their new
extended lease date. Final display of the date should be in DD-MON-YYYY format.

- For this we are using CASE expression to decide our final lease date. date_format & date_add to format date & extending lease date respectively.
'''

# Only expr() is called from Python here; date_add and date_format are used
# inside the SQL text passed to expr().
from pyspark.sql.functions import expr, date_add, date_format

# extended_lease is a formatted string (pattern 'dd-MMM-yyyy', e.g. 18-Mar-2022):
#   - cities equal to 'dubai' (compared in lower case): end_date plus 30 days
#   - every other city: end_date unchanged
df_final = df_date_operated.withColumn(
    "extended_lease",
    expr(
        "CASE LOWER(cities) "
        "WHEN 'dubai' THEN date_format(date_add(end_date, 30), 'dd-MMM-yyyy') "
        "ELSE  date_format( end_date, 'dd-MMM-yyyy') "
        "END"
    ),
)

In [0]:
display(df_final)

CustomerId,PropertyId,CustomerName,PropertyName,Email,start_date,end_date,towns,cities,lease_diff_days,lease_diff_months,lease_diff_months_2decimal,lease_diff_months_absolute,extended_lease
1,1001,Stephen,xyz,stephen@gmail.com,2020-06-04,2021-06-03,Mussaffah,AbuDhabi,364,11.96774194,11.97,12,03-Jun-2021
2,1231,Neelima,abc,neelima123@yahoo.com,2021-02-17,2022-02-16,Hatta,Dubai,364,11.96774194,11.97,12,18-Mar-2022
3,1345,Ahmed,klo,ahme67@hotmail.com,2019-04-05,2020-04-04,JebelAli,Dubai,365,11.96774194,11.97,12,04-May-2020
4,1001,Mohammad,pou,moha4@gmail.com,2018-11-04,2021-11-03,CreativeCity,Fujairah,1095,35.96774194,35.97,36,03-Nov-2021
5,1567,Wazir,tef,12waz@midas.ae,2021-09-23,2022-09-22,MediaCity,Sharjah,364,11.96774194,11.97,12,22-Sep-2022


In [0]:
'''
Write the final dataframe with all above changes into SQL DB/Newfile location.
- The task document offers a choice between a SQL DB and a new file location. I am choosing the new-file option, so the file is saved to a new DBFS location.
'''

# Write the final DataFrame as CSV to a new DBFS location.
# - mode("overwrite"): the default save mode raises an error when the target
#   path already exists, so re-running the notebook would fail on this cell.
# - option("header", "true"): writes the column names as the first line so the
#   output stays self-describing, like the input file.
(
    df_final.write
    .format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save("/dubai_holding/final_lease_data")
)

In [0]:
# Verify the write: list the output directory through the /dbfs mount to see the saved CSV file(s).
# The output could be partitioned by date or city depending on how it will be filtered, but with
# so little data that would not make sense now.
!ls /dbfs/dubai_holding/final_lease_data

In [0]:
!cat  /dbfs/dubai_holding/final_lease_data/'part-00000-tid-1757011429137729195-d2bb2160-81cb-42b8-8e59-c0123f8aafc4-19-1-c000.csv'

In [0]:
# Optional summary of the final data: one row per city with its count.
# agg({'cities': "count"}) produces a column named "count(cities)", which is
# renamed to Total_Properties before printing.
(
    df_final
    .groupBy(df_final["cities"])
    .agg({'cities': "count"})
    .withColumnRenamed("count(cities)", "Total_Properties")
    .show()
)

In [0]:
# FINISH - My submission ends here. What follows is just a short summary of how to create a temp table with PySpark and use it for the same task.

In [0]:
# Everything above can also be done through the SQL API. For that, the data must first be exposed
# to SQL, either as a Delta table or as a temp view.
# NOTE(review): the original comment stated that temp tables are slower than Delta tables; nothing
# in this notebook measures that, so treat it as unverified.
dfRaw.createOrReplaceTempView("lease_data_temp")  # Registers dfRaw as a temp view named lease_data_temp, queried by the %sql cells below
# The task allowed PySpark or any other language, so this assessment is submitted in PySpark only.

In [0]:
%sql
-- Show every row and column of the temp view registered from dfRaw.
SELECT * FROM lease_data_temp

CustomerId,PropertyId,CustomerName,PropertyName,Email,Lease_Start_Date,Lease_End_Date,location
1,1001,Stephen,xyz,stephen@gmail.com,2020-06-04 09:34:35,2021-06-03 12:34:35,Mussaffah-AbuDhabi
2,1231,Neelima,abc,neelima123@yahoo.com,2021-02-017 07:48:53,2022-02-16 06:34:35,Hatta-Dubai
3,1345,Ahmed,klo,ahme67@hotmail.com,2019-04-05 04:21:30,2020-04-04 12:40:05,JebelAli-Dubai
4,1001,Mohammad,pou,moha4@gmail.com,2018-11-04 04:23:15,2021-11-03 10:51:35,CreativeCity-Fujairah
5,1567,Wazir,tef,12waz@midas.ae,2021-09-23 07:14:25,2022-09-22 01:42:21,MediaCity-Sharjah


In [0]:
%sql
-- List the column names of the temp view.
SHOW COLUMNS IN lease_data_temp


col_name
CustomerId
PropertyId
CustomerName
PropertyName
Email
Lease_Start_Date
Lease_End_Date
location
