# Data Ingestion, Cleaning and Exploration with Delta Lake 

Databricks provides a notebook interface compatible with Python, SQL, PySpark, Scala, R, and more. In this notebook, we discuss some basic data operations that can be performed on a table using PySpark. We also introduce Delta Lake and Delta tables. 

All code and descriptions below are written by Zoya Shafique, unless otherwise noted.

## <img src = 'https://www.svgrepo.com/show/176852/pin-signs.svg' style="height: 50px; margin: 5px; padding: 5px"/> Overview
---

In this tutorial, you'll learn how to use Databricks' Delta Lake and PySpark functionality for handling data. This tutorial is intended for users with some experience with data handling, Python, and machine learning. 

By the end of this tutorial, you'll be able to:

* Create and manage Delta Tables
* Use PySpark for understanding and cleaning data

Note that the goal of this tutorial is not to provide a walkthrough of data cleaning but rather to show how Databricks' lakehouse storage, Delta Lake, can be used in combination with PySpark for data handling.

## <img src = 'https://www.svgrepo.com/show/176852/pin-signs.svg' style="height: 50px; margin: 5px; padding: 5px"/> Before you begin
---
Before you start the tutorial, you should:

* Download the dataset from <a href="https://www.kaggle.com/datasets/nehalbirla/vehicle-dataset-from-cardekho" target="_blank">this link</a>.
* Create a compute resource. For information on how to initialize your compute, please <a href="https://github.com/zoyashaf/DataLakehouses101/blob/21011a4ffd7e4eb7f045f393720a428b940a3b3b/docs/create_compute.pdf" target="_blank">check here</a>.
 


### <img src='https://www.svgrepo.com/show/122877/presentation.svg' style="height: 65px; margin: 5px; padding: 5px"/> Loading Data
---
This section covers basics of loading data in a notebook in Databricks. Users familiar with Pandas will see many similarities between the PySpark interface and Pandas operations.

In [0]:
'''
To begin, we must first initialize our Spark session, which will allow us to use the DataFrame API to handle our data. 
The last line of this cell creates our Spark session instance. We can use the SparkSession.Builder object to configure our Spark session, 
but for the sake of this tutorial, we are using the default settings. 
'''
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 

<img src='https://www.svgrepo.com/show/530436/help.svg' style="height: 50px; margin: 0px; padding: 0px"/> Please refer to <a href="https://github.com/zoyashaf/DataLakehouses101/blob/5e5423427db745aa64ea52e3efe7fcd1fb04a288/figures/catalog.png" target="_blank">this image</a> to see how to import data into your Databricks account and notebook.



In [0]:
# Reading in data from our file storage 
## .option("header", "true") tells Spark to treat the first row of the csv file as column names.
## Since we do not ask Spark to infer the schema, every column is read in as a string.
car_data = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/zshafiq001@citymail.cuny.edu/car_details_v4_edited-1.csv")
display(car_data.limit(5))
# NOTE: You may also use car_data.show(), however display() showcases the data in an easy to read table whereas show() provides a raw output. 

#### <img src='https://www.svgrepo.com/show/170412/notebook.svg' style="height: 65px; margin: 5px; padding: 5px"/> Task 1: 


Click on the '+' sign next to 'Table' in the cell above. You will see options for visualization and data profile. 
  * First generate a data profile. Do you notice anything weird about the data? 
  * Next, generate a bar graph using the visualization option. Use 'Make' as the x-axis and 'Price' as the y-axis. Does there seem to be a discernable pattern between makes of cars and their price?




!!!
< Your answer here > 
!!!

We can also use PySpark commands to learn more about our data*: 
* **`describe()`**:  displays count, mean, stddev, min, max. 
* **`summary()`**:  displays the 25th, 50th, and 75th percentiles (from which the interquartile range, IQR, can be derived) in addition to the attributes from describe.
* **`printSchema()`**: prints the table schema in a tree format, with each column name followed by the data type and the nullability indicator, which shows whether the column allows nulls.


*Note: adapted from Databricks Academy ML 01- Data Cleansing tutorial 
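
To make the relationship between these statistics concrete, here is a minimal plain-Python sketch (not PySpark) that computes the same quantities by hand. The `prices` list is hypothetical and is not taken from the dataset; Spark may compute percentiles approximately on large tables, so the figures it reports can differ slightly from an exact calculation like this one.

```python
import statistics

# Hypothetical values standing in for a numeric column (not from the dataset).
prices = [1, 2, 3, 4, 5, 6, 7, 8, 9]

# describe() reports these five values for each column.
count = len(prices)
mean = statistics.mean(prices)
stddev = statistics.stdev(prices)  # sample standard deviation
minimum, maximum = min(prices), max(prices)

# summary() additionally reports the 25%, 50% and 75% percentiles.
q1, q2, q3 = statistics.quantiles(prices, n=4, method="inclusive")

# The IQR is not printed as its own row; it is the gap between the 75% and 25% rows.
iqr = q3 - q1

print(count, mean, round(stddev, 3), minimum, q1, q2, q3, maximum, iqr)
```

A wide IQR relative to the mean, or a maximum far above the 75% row, is often a first hint that a column contains extreme values worth inspecting.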


In [0]:
display(car_data.describe())

In [0]:
display(car_data.summary())

In [0]:
car_data.printSchema()
# Since printSchema() is meant to print, we don't need to add a display wrapper

#### <img src='https://upload.wikimedia.org/wikipedia/commons/6/68/Exclamation_Point.svg' style="height: 45px; margin: 5px; padding: 5px"/> Concept Review

* <b>Data Profile</b> allows users to quickly and easily gain an understanding of their data. The tool provides a complete overview of the dataset's characteristics, statistics, and more. Furthermore, the data profile along with any visualizations can easily be added to a dashboard to quickly create effective summaries of the data. With these features, users can easily understand the basic structure of their data, explore the data distribution, identify missing values and more. 

### <img src='https://www.svgrepo.com/show/229520/lake.svg' style="height: 80px; margin: 5px; padding: 5px"/> Delta Lake Tables 

Currently, our data exists as a DataFrame that we built from our .csv file. To take full advantage of Databricks' software, we can convert our DataFrame into a Delta Lake table. Delta Lake is an open-source storage layer used by Databricks. It adds organization and ACID transaction support to traditional lake storage, such as a distributed file system. As such, saving our data as a Delta table can provide extra functionality for efficient processing. 

More information about Delta Lake can be found <a href="https://docs.databricks.com/en/delta/index.html" target="_blank">here. </a> 



In [0]:
# Write the data to a table.
table_name = "car_data_v2"
car_data.write.saveAsTable(table_name)

In [0]:
## We can use a spark.sql query to get a quick overview of the table we created in the previous cell. The following command shows us the metadata associated with our table. 
display(spark.sql('DESCRIBE DETAIL car_data_v2'))


To view the created table, navigate to the catalog tab in the sidebar. You will see "car_data_v2" listed underneath tables in the "Database Tables" tab. You can find more information about how to view your table in the catalog here. 

One of the main advantages of using a Delta table as opposed to a DataFrame is that the Delta table keeps a historical record of your data. This helps with data versioning and reproducibility. Furthermore, a historical record can help you monitor your data quality and keep track of any and all changes. Best of all, Delta Lake integrates with the Spark DataFrame API, so we can treat our Delta table just like a normal table if we choose to. For more information, <a href="https://github.com/zoyashaf/DataLakehouses101/blob/a6aecacba9a429d2eca83c2442928067ac9e9001/docs/Delta%20Lake%20Tables.pdf" target="_blank"> please see the associated documentation.  </a> 



Another advantage of storing our DataFrame as a Delta table is that it allows us to store and manage metadata along with our actual data. 

More information about the functionality of Delta tables can be found <a href="https://docs.databricks.com/en/delta/tutorial.html#create-a-table" target="_blank">here. </a> 






### <img src = 'https://www.svgrepo.com/show/503651/vacuum-cleaner.svg' style="height: 80px; margin: 5px; padding: 5px"/> Data Cleaning 
---
In this section, we will explore how we can use Delta tables and the tools PySpark provides for analyzing and cleaning data.



In [0]:
# First, let's load in our Delta table 
## Note: We can load our table using the table name we specified earlier or by using the direct path to the table. 
car_table = spark.read.table(table_name)
display(car_table)

#### <img src = 'https://www.svgrepo.com/show/499853/idea.svg' style="height: 60px; margin: 5px; padding: 5px"/> Looking at Datatypes

As we saw from our data profile and the describe method above, many of the numerical columns were read in as strings. We need to fix this before we can use our dataset.

In [0]:
from pyspark.sql.functions import col, translate

fixed_price_df = car_table.withColumn("price", translate(col("price"), " ", "").cast("double"))

## Let's confirm that the change worked as expected 
fixed_price_df.printSchema()

#### <img src='https://www.svgrepo.com/show/170412/notebook.svg' style="height: 65px; margin: 5px; padding: 5px"/> Task 2: 
Which other columns should be numerical but were read as strings? Convert these to the correct data type. 
  * Hint: Some columns contain characters alongside numerical values. For these columns, use translate(col("Column Name"), "Characters", "") to remove them before casting the column to a numerical datatype.
  * Consider: Can we simply apply col and translate to the 'Max Power' and 'Max Torque' columns?

In [0]:
'''
!!! Your answer here !!!
'''

In [0]:
## NOTE: This is just one solution. 
from pyspark.sql.functions import split #function used for splitting strings 

## Separating the Max Torque and Max Power columns into their parts 
### First, we split the strings at '@'. Then we assign each part of the split string to a separate column. Finally, we drop the original column from the dataframe. 

fixed_dtype_df = fixed_price_df.withColumn("max_torque_Nm", split("max_torque", "@")[0]) \
                   .withColumn("max_torque_rpm", split("max_torque", "@")[1])
fixed_dtype_df = fixed_dtype_df.drop("max_torque")  # use the same column name we split above

fixed_dtype_df = fixed_dtype_df.withColumn("max_power_bhp", split("max_power", "@")[0]) \
                   .withColumn("max_power_rpm", split("max_power", "@")[1])
fixed_dtype_df = fixed_dtype_df.drop("max_power")

## After splitting the columns, we need to remove the units from the rows 
columns_with_strings = [['max_power_bhp', ' bhp', 'double'],
                  ['max_power_rpm', ' rpm', 'double'],  ['max_torque_rpm', ' rpm', 'double'], ['max_torque_Nm', ' Nm', 'double']]

for column, string, dtype in columns_with_strings:
  fixed_dtype_df = fixed_dtype_df.withColumn(column, translate(col(column), string, '').cast(dtype))

## We use translate() to remove the characters ' ' and 'c' (i.e., the ' cc' unit) and then convert the column to the correct data type. 
fixed_dtype_df = fixed_dtype_df.withColumn("engine_cc", translate(col("engine"), " cc", "").cast("double")) 
fixed_dtype_df = fixed_dtype_df.drop("engine")

## Converting string columns to int/double types
columns_to_fix = [['year', 'int'], ['kilometer', 'double'], ['length', 'double'],
                  ['width', 'double'], ['height', 'double'], ['seating_capacity', 'int'],
                  ['fuel_tank_capacity', 'double']] 

for column, dtype in columns_to_fix:
  fixed_dtype_df = fixed_dtype_df.withColumn(column, translate(col(column), ' ', '').cast(dtype))

fixed_dtype_df.printSchema()
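
A note on why the solution splits before stripping units: `translate()` works on individual characters rather than whole substrings. The plain-Python sketch below mimics that behaviour with `str.translate`. The helper name `strip_chars` and the sample strings are hypothetical illustrations, not values taken from the dataset or part of the PySpark API.

```python
def strip_chars(value: str, chars: str) -> str:
    """Delete every character listed in `chars` from `value`.

    This mirrors translate(col, chars, "") in PySpark: each listed character
    is removed wherever it appears, not only where it forms a whole word.
    """
    return value.translate(str.maketrans("", "", chars))


# Hypothetical sample values shaped like the engine and max_power columns.
engine = "1198 cc"
max_power = "87 bhp @ 6000 rpm"

# A single unit is easy: removing ' ' and 'c' leaves only the digits.
engine_clean = strip_chars(engine, " cc")

# Stripping the unit characters without splitting leaves the '@' behind,
# so the result is not a valid number.
unsplit = strip_chars(max_power, " bhprm")

# Splitting at '@' first gives two strings that each clean up to a number.
bhp_part, rpm_part = max_power.split("@")
bhp = float(strip_chars(bhp_part, " bhp"))
rpm = float(strip_chars(rpm_part, " rpm"))

print(engine_clean, unsplit, bhp, rpm)
```

In Spark, casting a leftover string such as `87@6000` to `double` typically produces a null rather than a number (or an error if ANSI mode is enabled), which is why the combined columns need to be split first.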


In [0]:
# Now that our data is in the correct format, let's recalculate the statistics:
display(fixed_dtype_df.describe())

In [0]:
## Now let's save this data with a note that we fixed the schema
fixed_dtype_df.write.format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .option("userMetadata", "overwritten-for-fixing-incorrect-dtypes-in-columns") \
  .saveAsTable("default.car_data_v2")

In the above command, we overwrite the existing table with our updated data. Delta Lake records this as a new version of the table rather than discarding the previous one.

* **`.option("overwriteSchema", "true") `**:  replaces the existing schema with the updated one. We need this as we changed the datatypes of the columns, which is recorded as part of the schema. 
* **`.option("userMetadata", <comment>)`**: attaches a user-defined message to the commit, which is recorded in the table history. 

####  <img src = 'https://www.svgrepo.com/show/499853/idea.svg' style="height: 60px; margin: 5px; padding: 5px"/> Handling extreme and null values 

##### Looking into extreme values 
From our describe output above, we can see some strange data, such as a minimum of 0 km in the Kilometer column as well as a Fuel Tank Capacity of 15. A max price of 3.5e7 also seems extreme for a used car. Let's explore this more. 

In [0]:
# Let's take a look at the price column 
display(fixed_dtype_df
        .groupBy("price").count()
        .orderBy(col("price").desc(), col("count"))
       )

## NOTE: Adding a visualization to our table  here can help us quickly understand the distribution of our data and the outliers 
### Some cars seem very expensive compared to the majority of the dataset

In [0]:
display(fixed_dtype_df.filter(col("kilometer") == 0))
# Mini Coopers can be expensive. The model year is 2022 and the car is unregistered, so perhaps it is a new car for sale? 

In [0]:
pos_km_df = fixed_dtype_df.filter(col("kilometer") > 0) # only keeping rows with km greater than 0 

In [0]:
# Now let's take a look at the minimum and maximum values
display(pos_km_df
        .groupBy("kilometer").count()
        .orderBy(col("kilometer").desc(), col("count"))
       )

#### <img src='https://www.svgrepo.com/show/170412/notebook.svg' style="height: 65px; margin: 5px; padding: 5px"/> Task 3: 


Click on the '+' sign next to 'Table' in the cell above. You will see options for visualization.
  * Create two histogram plots: one with a bin size of 10 and one with a bin size of 100. What do these visualizations tell you about the data?



!!!
< Your answer here > 
!!!

In [0]:
# Two used cars with 1 km (a little more than 0.5 miles) seems strange. Let's look into these rows further
display(pos_km_df.filter(col("kilometer") == 1))
# These are both 2022 Audis. Considering the high price, perhaps these are also new cars for sale? 

##### Looking into null values 
We also have many columns with null values. How we approach these depends greatly on the domain and task at hand. 

For a moment, let's consider a different example: a survey dataset where respondents were asked about their income level, education level, and whether they own a car. In this dataset, the "car ownership" column contains null values for some respondents. Some key considerations for this dataset are listed below. 

  * <b> Missing Data: </b> Null values in the "car ownership" column could indicate missing data or non-response. This could be due to various reasons such as respondents choosing not to answer the question, data entry errors, or survey design issues. Understanding the missing data mechanism is crucial for assessing data quality and potential biases in the dataset.
  * <b> Imputation Strategy: </b> The presence of null values in the "car ownership" column could influence the choice of imputation strategy if the goal is to fill in missing values. For example, if null values are more prevalent among respondents with lower income levels, simply imputing the mean or median car ownership rate may not be appropriate as it could bias the analysis.
  * <b> Analyzing Patterns: </b> If null values in the "car ownership" column are associated with certain demographic characteristics such as age or location, it could indicate differences in car ownership rates among different groups of respondents. Understanding these patterns could provide valuable insights for targeted marketing strategies or policy interventions.

How we handle null values changes depending on the context. Some approaches are:*
* Drop all rows with null values 
* Replace them with mean/median/zero/etc. 
* Replace them with the mode 
* Create a new column to denote rows that have null values 
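
As a rough illustration of the options listed above, the plain-Python sketch below applies each one to a single hypothetical column, with `None` standing in for null. The `seats` values are made up for illustration and the code is not PySpark.

```python
import statistics

# Hypothetical column with missing entries (None plays the role of null).
seats = [5.0, None, 7.0, 5.0, None, 4.0]
present = [v for v in seats if v is not None]

# Option 1: drop the rows that contain nulls.
dropped = present

# Option 2: replace nulls with the mean or the median of the observed values.
mean_value = statistics.mean(present)
median_value = statistics.median(present)
mean_filled = [mean_value if v is None else v for v in seats]
median_filled = [median_value if v is None else v for v in seats]

# Option 3: replace nulls with the most frequent observed value (the mode).
mode_value = statistics.mode(present)
mode_filled = [mode_value if v is None else v for v in seats]

# Option 4: add an indicator column recording which rows were originally null.
was_null = [1.0 if v is None else 0.0 for v in seats]

print(dropped, mean_filled, median_filled, mode_filled, was_null, sep="\n")
```

Note how options 2 and 3 change the data itself, while option 4 only records where the gaps were, which is why the indicator column is usually combined with one of the fill strategies.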

For the purposes of this tutorial, we will replace the missing values with a representative value such as the mean or median. This process is known as imputing. For this, we will use Spark's <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Imputer.html?highlight=imputer#pyspark.ml.feature.Imputer" target="_blank"><b> Imputer </b></a> class. Note that whenever a field is imputed, it is important to include an extra column denoting which values were imputed*.


*Note: adapted from Databricks Academy ML 01- Data Cleansing tutorial 

#### <img src='https://www.svgrepo.com/show/170412/notebook.svg' style="height: 65px; margin: 5px; padding: 5px"/> Task 4: 


Take a moment to look through the Imputer function's documentation (linked above).
  * Are there any requirements for the data type of the inputs? 
  * Can Imputer perform any type of imputation (e.g., numerical, categorical)? 


!!!
< Your answer here > 
!!!

In the following cells, we will prepare our data for imputing. 
  * First, we need to convert any integer columns into double 
  * We need to denote rows where null values are present so we can keep track of imputed values. 

In [0]:
'''
The code in this cell is taken from: Databricks Academy ML 01 Data Cleansing 
'''
from pyspark.sql.types import IntegerType

integer_columns = [x.name for x in pos_km_df.schema.fields if x.dataType == IntegerType()]
doubles_df = pos_km_df

for c in integer_columns:
    doubles_df = doubles_df.withColumn(c, col(c).cast("double"))

columns = "\n - ".join(integer_columns)
print(f"Columns converted from Integer to Double:\n - {columns}")

In [0]:
# We need to denote which rows had null values before we impute our data. 
from pyspark.sql.functions import when

impute_cols = [
    "kilometer",
    "max_power_bhp", 
    "max_torque_Nm",
    "max_power_rpm", 
    "max_torque_rpm",
    "length",
    "width",
    "height",
    "seating_capacity",
    "fuel_tank_capacity",
    "engine_cc"
]

# We will put a 0 if the given column has a value for that row and a 1 if the value is null. 
for c in impute_cols:
    doubles_df = doubles_df.withColumn(c + "_na", when(col(c).isNull(), 1.0).otherwise(0.0))

In [0]:
display(doubles_df.limit(10))

We are now ready to impute our data! Users familiar with scikit-learn will recognize the syntax for applying the Imputer function. 
 * We first create an instance of the Imputer object, specifying the method that we want to use to impute our data. 
 * Next, we 'fit' the impute instance on our data.
 * Finally, we call Imputer's transform method to convert our existing dataframe with its null values into a dataframe with all the null values filled in. 

More specifically, Spark ML's APIs are standardized in much the same way as scikit-learn's. This allows different methods to be packaged into one pipeline. More details on two of the key components of the Spark ML API are described below: 

**
* **Transformer**: Converts one DataFrame into another. It takes a DataFrame as input and returns an updated DataFrame. Transformers do not learn any parameters from the data and simply apply rule-based transformations. A Transformer has a **`.transform()`** method. 

* **Estimator**: An algorithm that can be fit on a DataFrame to produce a Transformer. It has a **`.fit()`** method because it learns parameters from your DataFrame in order to transform it.   

**

It is important to note that any call to a '.fit()' function should only be applied to training data. 

** Note: Descriptions taken from Databricks Academy ML 01- Data Cleansing tutorial 
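
The Estimator/Transformer split described above can be sketched in a few lines of plain Python. This is only a toy analogue of the pattern, not Spark ML's implementation: the class names `MedianImputer` and `MedianImputerModel`, the dictionary-based rows, and the sample values are all hypothetical.

```python
import statistics
from dataclasses import dataclass


@dataclass
class MedianImputerModel:
    """Transformer: holds the learned medians and applies them; it learns nothing new."""
    medians: dict

    def transform(self, rows):
        # Return new rows, filling a null only in columns we have a median for.
        return [
            {k: (self.medians[k] if v is None and k in self.medians else v)
             for k, v in row.items()}
            for row in rows
        ]


@dataclass
class MedianImputer:
    """Estimator: fit() learns one median per input column and returns a model."""
    input_cols: list

    def fit(self, rows):
        medians = {
            c: statistics.median(r[c] for r in rows if r[c] is not None)
            for c in self.input_cols
        }
        return MedianImputerModel(medians)


# Hypothetical rows with missing values (None plays the role of null).
rows = [
    {"width": 1700.0, "height": 1500.0},
    {"width": None, "height": 1600.0},
    {"width": 1800.0, "height": None},
]

model = MedianImputer(["width", "height"]).fit(rows)  # learning step
filled = model.transform(rows)                        # rule-based step
print(model.medians, filled, sep="\n")
```

Because the medians live in the fitted model rather than in the estimator, the same model can later be applied to new rows without recomputing anything, which is what makes it possible to fit on training data only.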


In [0]:
from pyspark.ml.feature import Imputer

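# Imputer is an Estimator: fit() learns the medians and returns a fitted model (a Transformer).
imputer = Imputer(strategy="median", inputCols=impute_cols, outputCols=impute_cols)

imputer_model = imputer.fit(doubles_df)
imputed_df = imputer_model.transform(doubles_df)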

In [0]:
## Let's display our imputed data 
display(imputed_df.limit(10))

In [0]:
## Let's take another look at the summary statistics for the dataset. 
display(imputed_df.describe())

Consider: We could also have approached this problem by calculating statistics based on Make and using those statistics to fill in null values for relevant fields. What other ways could we have approached this problem? 


Now that our data is cleaned, we can save our DataFrame to the Delta Lake. As our table already exists as a Delta table, we can use the 'overwrite' flag when saving our data to ensure that the existing file is replaced with our updated version. Saving our cleaned data to Delta Lake ensures that the data is stored in a reliable and efficient manner, making it ready for subsequent analysis, including building machine learning models. We will not have to save it to our local machine and reupload it again but can access it directly from our catalog. 

In [0]:
## As we made changes to our schema (i.e., we added the "_na" indicator columns and changed the data type of some columns), we need to include the overwriteSchema option to write our updated table. 

imputed_df.write.format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("default.car_data_v2")

If we go back to our catalog and view the history of our table, we can see that a new version has been added, denoting all of the changes we made. This is a more compact and efficient way to process data than saving new files for all updated tables as it allows us to easily track changes. 

## <img src = 'https://www.svgrepo.com/show/176852/pin-signs.svg' style="height: 50px; margin: 5px; padding: 5px"/> Summary
---
In this tutorial, you learned how to:

* Create Delta tables
* Use Delta tables with PySpark to clean data 
* Update your tables to maintain a consistent record of your data


## <img src = 'https://www.svgrepo.com/show/199671/next.svg' style="height: 50px; margin: 5px; padding: 5px"/> Next steps
---

Take a look into the docs for:
* <a href="https://github.com/zoyashaf/DataLakehouses101/blob/5736beeea55e96813750e154f69e37c7ca0e6de0/docs/Delta%20Lake%20Tables.pdf" target="_blank">  A Look at the Delta Table Interface  </a> 

Or continue to the next tutorial: 
* <a href="https://github.com/zoyashaf/DataLakehouses101/blob/5736beeea55e96813750e154f69e37c7ca0e6de0/Data%20Processing%20with%20Delta%20Lakes/Delta%20Lake%20Tutorial.ipynb" target="_blank"> Delta Lake Tutorial </a> 


