
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# Create a Parquet Table

## Notebook Configuration

Before you run this cell, make sure to add a unique user name to the file
<a href="$./includes/configuration" target="_blank">
includes/configuration</a>, for example:

`username = "yourfirstname_yourlastname"`

In [0]:
%run ./includes/configuration

Reload the data into a DataFrame.

In [0]:
file_path = health_tracker + "raw/health_tracker_data_2020_1.json"
health_tracker_data_2020_1_df = spark.read.format("json").load(file_path)

## Create a Parquet Table

#### Step 1: Make the Notebook Idempotent
First, we remove the files in the `healthtracker/processed` directory.

Then, we drop the table we are about to create from the metastore, if it exists.

This step makes the notebook idempotent: it can be run more than once without throwing errors or leaving extra files behind.

🚨 **NOTE** Throughout this lesson, we'll be writing files to the root location of the Databricks File System (DBFS). In general, best practice is to write files to your cloud object storage. We use DBFS root here for demonstration purposes.

In [0]:
dbutils.fs.rm(health_tracker + "processed", recurse=True)

spark.sql("DROP TABLE IF EXISTS health_tracker_processed")
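The reset cell relies on deleting the output directory in a way that does not fail when the directory is already gone. As a minimal local sketch of that idea (assuming a plain local filesystem instead of DBFS; `reset_directory` and the temporary paths are hypothetical stand-ins, not Databricks APIs), running the reset twice in a row leaves the same empty state and raises nothing:

```python
import shutil
import tempfile
from pathlib import Path


def reset_directory(path: Path) -> None:
    """Recursively delete `path` if it exists (hypothetical local helper).

    Plays the role of dbutils.fs.rm(path, recurse=True) in this sketch.
    ignore_errors=True means a missing path is not an error; note that it
    also silences other deletion errors, such as permission problems.
    """
    shutil.rmtree(path, ignore_errors=True)


# Hypothetical local stand-in for the `health_tracker + "processed"` directory.
base = Path(tempfile.mkdtemp())
processed = base / "processed"
processed.mkdir()
(processed / "stale-part-00000.parquet").write_text("old data")

reset_directory(processed)  # first run removes the stale output
reset_directory(processed)  # second run finds nothing and does not raise
print(processed.exists())   # False
```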

#### Step 2: Transform the Data
We transform the data by selecting the following columns:
- `"time"`, converted with `from_unixtime`, cast as a `date`, and aliased to `dte`
- `"time"`, converted with `from_unixtime`, cast as a `timestamp`, and aliased to `time`
- `heartrate`, selected as is
- `name`, selected as is
- `"device_id"`, cast as an integer and aliased to `p_device_id`

In [0]:
from pyspark.sql.functions import col, from_unixtime

def process_health_tracker_data(dataframe):
  return (
    dataframe
    .select(
        from_unixtime("time").cast("date").alias("dte"),
        from_unixtime("time").cast("timestamp").alias("time"),
        "heartrate",
        "name",
        col("device_id").cast("integer").alias("p_device_id")
    )
  )

processedDF = process_health_tracker_data(health_tracker_data_2020_1_df)
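To make the column mapping concrete, here is a plain-Python sketch of what `process_health_tracker_data` does to a single row. It assumes the Spark session time zone is UTC (`from_unixtime` renders epoch seconds in the session time zone, so `dte` and `time` would shift otherwise); `process_record` and the sample record are hypothetical and only illustrate the mapping:

```python
from datetime import datetime, timezone


def process_record(record: dict) -> dict:
    """Hypothetical single-row mirror of process_health_tracker_data.

    Assumes a UTC session time zone. int() truncates fractional seconds,
    as casting a double to the bigint that from_unixtime expects would.
    """
    moment = datetime.fromtimestamp(int(record["time"]), tz=timezone.utc)
    return {
        "dte": moment.date(),                     # from_unixtime(...).cast("date")
        "time": moment.replace(tzinfo=None),      # from_unixtime(...).cast("timestamp")
        "heartrate": record["heartrate"],         # selected as is
        "name": record["name"],                   # selected as is
        "p_device_id": int(record["device_id"]),  # cast("integer")
    }


# Hypothetical raw record; the values are illustrative, not taken from the dataset.
raw = {"time": 1577836800.0, "heartrate": 52.8, "name": "Alice", "device_id": 0}
print(process_record(raw))
```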

#### Step 3: Write the Files to the Processed Directory
Note that we partition the data by device ID.

1. Use `.format("parquet")`.
1. Partition by `"p_device_id"`.

In [0]:
(processedDF.write
 .mode("overwrite")
 .format("parquet")
 .partitionBy("p_device_id")
 .save(health_tracker + "processed"))
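Partitioning by `p_device_id` produces Hive-style subdirectories named `p_device_id=<value>`, and the partition column is not stored inside the data files because its value is encoded in the directory name. The sketch below models only that layout in plain Python; `partition_layout` and the sample rows are hypothetical and do not reproduce Spark's file naming or Parquet encoding:

```python
from collections import defaultdict


def partition_layout(rows: list[dict], partition_col: str) -> dict[str, list[dict]]:
    """Group rows the way a Hive-style partitioned write lays them out.

    Each distinct value of `partition_col` becomes a directory named
    "<col>=<value>", and the partition column is dropped from the stored
    rows because the directory name already carries its value.
    """
    layout = defaultdict(list)
    for row in rows:
        directory = f"{partition_col}={row[partition_col]}"
        stored = {k: v for k, v in row.items() if k != partition_col}
        layout[directory].append(stored)
    return dict(layout)


# Hypothetical rows; only p_device_id matters for the layout.
rows = [
    {"heartrate": 52.8, "p_device_id": 0},
    {"heartrate": 61.3, "p_device_id": 1},
    {"heartrate": 53.1, "p_device_id": 0},
]
for directory, stored in sorted(partition_layout(rows, "p_device_id").items()):
    print(directory, len(stored))
```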

#### Step 4: Register the Table in the Metastore
Next, use Spark SQL to register the table in the metastore.
When creating the table, we specify the format as Parquet and point the table's location at the directory where the Parquet files were written.

In [0]:
spark.sql("DROP TABLE IF EXISTS health_tracker_processed")

spark.sql(
    f"""
CREATE TABLE health_tracker_processed
USING PARQUET
LOCATION "{health_tracker}processed"
"""
)

#### Step 5: Verify the Parquet-Based Data Lake Table

Count the records in the `health_tracker_processed` table.

In [0]:
health_tracker_processed = spark.read.table("health_tracker_processed")
health_tracker_processed.count()

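Note that the count returns 0: the table is registered in the metastore, but its partitions are not, so Spark SQL does not see any of the data files yet.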

#### Step 6: Register the Partitions

Per best practice, we have created a partitioned table. However, if you create a partitioned table from existing data,
Spark SQL does not automatically discover the partitions and register them in the Metastore.

`MSCK REPAIR TABLE` will register the partitions in the Hive Metastore. Learn more about this command in <a href="https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-syntax-ddl-repair-table.html" target="_blank">
the docs</a>.

In [0]:
spark.sql("MSCK REPAIR TABLE health_tracker_processed")
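Conceptually, the repair step scans the table location for `<column>=<value>` subdirectories and registers each one as a partition. The plain-Python sketch below covers only that discovery half, for a single partition column on a local filesystem; `discover_partitions` and the temporary directory are hypothetical, and the real command also writes the discovered partitions to the metastore:

```python
import tempfile
from pathlib import Path


def discover_partitions(table_location: Path) -> list[dict[str, str]]:
    """Return one partition spec per "<col>=<value>" subdirectory.

    Hypothetical sketch of partition discovery; it ignores plain files
    (such as _SUCCESS markers) and handles a single partition column only.
    """
    specs = []
    for child in sorted(table_location.iterdir()):
        if child.is_dir() and "=" in child.name:
            column, value = child.name.split("=", 1)
            specs.append({column: value})
    return specs


# Hypothetical table location with five device partitions and one marker file.
location = Path(tempfile.mkdtemp())
for device_id in range(5):
    (location / f"p_device_id={device_id}").mkdir()
(location / "_SUCCESS").touch()

print(discover_partitions(location))
```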

#### Step 7: Count the Records in the `health_tracker_processed` table

Count the records in the `health_tracker_processed` table.

With the table repaired and the partitions registered, we now have results.
We expect 3720 records: five devices, each recording one measurement per hour, 24 hours a day for 31 days (5 × 24 × 31 = 3720).
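The expected count can be double-checked with a few lines of arithmetic. This sketch assumes the file covers January 2020 (as its name `health_tracker_data_2020_1` suggests) and that every device reports exactly once per hour with no gaps or duplicates:

```python
from datetime import datetime, timedelta

# Assumption: five devices, one reading per device per hour, all of January 2020.
DEVICES = 5
start = datetime(2020, 1, 1)
end = datetime(2020, 2, 1)
hours_in_january = int((end - start) / timedelta(hours=1))

expected_records = DEVICES * hours_in_january
print(hours_in_january, expected_records)  # 744 3720
```

If the count in the next cell differs from this figure, the assumption of complete hourly readings is the first thing to question.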

In [0]:
health_tracker_processed.count()

&copy; 2020 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>