#### The Python script performs the following tasks:
* Ingests the data from a CSV file and creates a dataframe
* Transforms the data (converts the "id" column to the int datatype)
* Creates a database in the lakehouse
* Creates a database table and loads the data

#### Parameters:
* **filename** - Name of the CSV file that serves as the data source
* **databasename** - Name of the database to be created for storing the data
* **tablename** - Name of the table to be created inside the database for loading the data from the dataframe

#### 1. Data Extraction

##### Define the widgets to enter the values of the parameters

In [0]:
#dbutils.widgets.remove("TempTableName")
dbutils.widgets.text("filename", "SRDataEngineerChallenge_DATASET.csv", "Enter file name in csv format")
dbutils.widgets.text("databasename", "sfldatabase", "Enter database name")
dbutils.widgets.text("tablename", "sfltable", "Enter Table name")

##### Assign parameter values to variables

In [0]:
import pandas as pd
filename = dbutils.widgets.get("filename")
databasename = dbutils.widgets.get("databasename")
tablename = dbutils.widgets.get("tablename")
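
Because `databasename` and `tablename` are later interpolated into SQL strings, it can help to check them before use. The following is a minimal sketch, not part of the original notebook: `validate_identifier` is a hypothetical helper, and the rule it enforces (letters, digits, and underscores, not starting with a digit) is an assumed convention that is stricter than what Spark itself accepts.

```python
import re

# Hypothetical helper (not in the original notebook): accept only simple
# identifiers made of letters, digits, and underscores.
_IDENTIFIER_PATTERN = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def validate_identifier(name: str) -> str:
    """Return the name unchanged if it is a simple identifier, else raise."""
    if not _IDENTIFIER_PATTERN.fullmatch(name):
        raise ValueError(f"Unsafe database or table name: {name!r}")
    return name


# The widget defaults from this notebook pass the check.
print(validate_identifier("sfldatabase"))
print(validate_identifier("sfltable"))
```

A name containing spaces, quotes, or semicolons raises `ValueError` instead of reaching `spark.sql`.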

##### Read the csv file and create pandas dataframe

In [0]:
# Read every column as a string (the converters mapping covers column positions 0 to 99)
hrdata = pd.read_csv(filename, converters={i: str for i in range(100)})
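
To see why the columns are read as strings first, here is a small self-contained sketch that uses hypothetical in-memory data instead of the real file. It uses `dtype=str`, an alternative to the `converters` mapping above, so that values containing symbols or leading zeros survive unchanged until they are cleaned.

```python
import io

import pandas as pd

# Hypothetical sample data standing in for the real CSV file.
sample_csv = io.StringIO('id,first_name\n"$1,234",Ann\n007,Bob\n')

# dtype=str keeps every column as text, so symbols and leading zeros are preserved.
sample = pd.read_csv(sample_csv, dtype=str)

print(sample["id"].tolist())
```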

#### 2. Data Transformation

##### Convert column datatype to int

In [0]:
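# Strip "$" and "," from the id values, then cast to int (regex=True is required in recent pandas)
hrdata['id'] = hrdata['id'].str.replace(r"[$,]", "", regex=True).astype(int)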
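
As a minimal sketch of this cleaning step on hypothetical sample values, the character class `[$,]` matches dollar signs and commas so they can be removed before the cast. `regex=True` is passed explicitly because pandas 2.0 and later treat the pattern as a literal string by default.

```python
import pandas as pd

# Hypothetical sample values; the real column comes from the CSV file.
ids = pd.Series(["$1", "1,234", "56"])

# Remove every "$" and "," and then cast the remaining digits to integers.
clean_ids = ids.str.replace(r"[$,]", "", regex=True).astype(int)

print(clean_ids.tolist())
```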



##### Convert pandas dataframe to spark dataframe

In [0]:
# Convert the pandas DataFrame to a Spark DataFrame
df_hrdata = spark.createDataFrame(hrdata)

#### 3. Data Loading

##### Define source, database name and table name

In [0]:
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
source = f"dbfs:/user/{username}/copy-into-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.databasename={databasename}")
spark.sql(f"SET c.source='{source}'")

Out[72]: DataFrame[key: string, value: string]

##### Database Creation
* Drop the database if one with the same name already exists
* Create the database
* Switch to the newly created database

In [0]:
spark.sql("DROP DATABASE IF EXISTS ${c.databasename} CASCADE")
spark.sql("CREATE DATABASE ${c.databasename}")
spark.sql("USE ${c.databasename}")

# Recursively delete any files left under the source path
dbutils.fs.rm(source, True)

Out[73]: False
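
The database statements in this step rely on Spark SQL substituting `${c.databasename}` from the session configuration. As an alternative sketch, the same three statements can be built as ordinary Python strings. `build_database_statements` is a hypothetical helper, and wrapping the name in backticks is an assumption about how the identifier should be quoted; it does not escape backticks inside the name.

```python
from typing import List


def build_database_statements(databasename: str) -> List[str]:
    """Build the drop, create, and use statements for one database name."""
    quoted = f"`{databasename}`"  # Spark SQL quotes identifiers with backticks
    return [
        f"DROP DATABASE IF EXISTS {quoted} CASCADE",
        f"CREATE DATABASE {quoted}",
        f"USE {quoted}",
    ]


for statement in build_database_statements("sfldatabase"):
    # In the notebook, each statement would be passed to spark.sql(...).
    print(statement)
```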

##### Write/Load the data from the dataframe to the table
* Drop the table if one with the same name already exists

In [0]:
# Write the data to a table.
spark.sql("DROP TABLE IF EXISTS " + tablename)
df_hrdata.write.saveAsTable(tablename)

#### Query the data from the table

In [0]:
spark.sql("select * from " + tablename).show()

+---+----------+-----------+--------------------+-----------+---------------+
| id|first_name|  last_name|               email|     gender|     ip_address|
+---+----------+-----------+--------------------+-----------+---------------+
|  1|Margaretta| Laughtisse|mlaughtisse0@medi...|Genderfluid| 34.148.232.131|
|  2|     Vally|    Garment|  vgarment1@wisc.edu|   Bigender|  15.158.123.36|
|  3|     Tessa|      Curee|     tcuree2@php.net|   Bigender|132.209.143.225|
|  4|     Arman|  Heineking|aheineking3@tutto...|       Male| 157.110.61.233|
|  5|   Roselia|    Trustie|    rtrustie4@ft.com| Non-binary|   49.55.218.81|
|  6|     Roxie|  Springett|rspringett5@devia...|       Male| 51.206.104.138|
|  7|      Gabi|    Kernell|gkernell6@hugedom...|     Female|  223.30.27.146|
|  8|      Dino|   Kentwell|  dkentwell7@com.com|    Agender| 107.244.52.181|
|  9|Petronilla|     Jandel|pjandel8@amazon.c...|     Female| 187.54.208.203|
| 10|  Courtnay|Zecchinelli|czecchinelli9@cam...|Genderfluid|  8

#### Conclusion
We have successfully extracted the data from a CSV file, transformed it, and loaded it into a **database table** in the Databricks **data lakehouse**.