# Recreating the AdventureWorks database
In this notebook, we perform all steps needed to 
1. clean out any previous iteration of the adventureworks database. 
2. download a zip file of the Adventure Works Database provided in parquet format. 
3. use that zip file to create tables in the `adventureworks` schema. 

This script should be run whenever a user wants to create the `adventureworks` database for the first time or reset the all 
tables. 

## Initial Setup
We start with importing all the libraries we are going to need for this script and ensure we are using the correct catalog. 

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType, FloatType
import os
import requests
import zipfile

## Cleanup and Recreate
In this first step, we begin by completely dropping and recreating the `adventureworks` catalog. This is going to simulate the sample database provided by Microsoft. This should be renamed to whatever database/catalog you wish to refer to.

In [0]:
%sql 
DROP CATALOG IF EXISTS adventureworks CASCADE;
CREATE CATALOG adventureworks;

## Download Data
With the catalog set up We now have to create the volume filesystem for our flat files. We also initiate the download to get our pipeline moving along. The files used in this pipeline were provided by [Michael Olafusi (olafusimichael/AdventureWorksParquet)](https://github.com/olafusimichael/AdventureWorksParquet).

All these values should be renamed if using a different naming convention. 

In [0]:
catalog = 'adventureworks'
schema = 'default'
volume = 'adventureworks_parquet'
url = 'https://github.com/olafusimichael/AdventureWorksParquet/archive/main.zip'
workspace_dir = f'/Volumes/{catalog}/{schema}/{volume}'
zip_path = f'{workspace_dir}/main.zip'

spark.sql(
    f"""
    CREATE VOLUME IF NOT EXISTS {catalog}.{schema}.{volume}
    """)

In [0]:
with open(zip_path, 'wb') as f:
    f.write(requests.get(url).content)
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(workspace_dir)

## Table Creation
This is the meat and potatoes of this script. With everything else done, we loop through all parquete files, and create a table out of the data within. 

In [0]:
parquet_dir = f'{workspace_dir}/AdventureWorksParquet-main'
for file in dbutils.fs.ls(parquet_dir):
    if file.name.endswith('.parquet'):
        table_name = file.name.replace('.parquet', '')
        schema_name = table_name.split('.')[0]
        spark.sql(f'CREATE SCHEMA IF NOT EXISTS {catalog}.{schema_name}')
        df = spark.read.parquet(file.path)
        df.write.mode('overwrite').saveAsTable(f'{catalog}.{table_name}')

### Adding missing files
A couple files seem to be missing from the above mentioned parquet dataset. Therefore, I'll be importing the 2022 version of the CSV files from microsoft and add them in as I need more tables for the data warehouse.
I've attempted to create these from the provided install scripts inside the zip file provided by microsoft below. 

In [0]:
spark.sql(
    f'''
    CREATE VOLUME IF NOT EXISTS {catalog}.{schema}.csv_files
    ''')
csv_url = 'https://github.com/Microsoft/sql-server-samples/releases/download/adventureworks/AdventureWorks-oltp-install-script.zip'
csv_workspace_dir = f'/Volumes/{catalog}/{schema}/csv_files'
zip_path_csv = f'{csv_workspace_dir}/AdventureWorksCSV.zip'    

In [0]:
with open(zip_path_csv, 'wb') as f:
    f.write(requests.get(csv_url).content)
with zipfile.ZipFile('/Volumes/adventureworks/default/csv_files/AdventureWorksCSV.zip', 'r') as zip_ref:
    zip_ref.extractall(csv_workspace_dir) 

In [0]:
df_schema = StructType([
    StructField('EmployeeID', IntegerType(), True),
    StructField('NationalIDNumber', StringType(), True),
    StructField('LoginID', StringType(), True),
    StructField('OrganizationNode', StringType(), True),
    StructField('OrganizationLevel', IntegerType(), True),
    StructField('JobTitle', StringType(), True),
    StructField('BirthDate', DateType(), True),
    StructField('MaritalStatus', StringType(), True),
    StructField('Gender', StringType(), True),
    StructField('HireDate', DateType(), True),
    StructField('SalariedFlag', IntegerType(), True),
    StructField('VacationHours', IntegerType(), True),
    StructField('SickLeaveHours', IntegerType(), True),
    StructField('CurrentFlag', IntegerType(), True),
    StructField('rowguid', StringType(), True),
    StructField('ModifiedDate', DateType(), True)
])

df = spark.read.csv(f'{csv_workspace_dir}/Employee.csv',header=False,schema=df_schema,inferSchema=True,sep='\t',encoding='utf-16')

In [0]:
spark.sql("DROP TABLE IF EXISTS adventureworks.humanresources.employee")


In [0]:
df.write.mode('overwrite').saveAsTable("adventureworks.humanresources.employee")

In [0]:
spark.sql("select EmployeeID from adventureworks.humanresources.employee").count() == df.count()

Once completed, you should be able to see all the tables in the `adventureworks` catalog of your workspace, inside their respective schemas. This will effectively turn the `adventureworks` catalog into a sample database for any further queries. This verison of the database does lack, any simblance of partitions, indicies, primary and foreign keys, constraints, and many other standard database features. These could likely be included, especially if utilizing the upcoming [Lakebase](https://docs.databricks.com/aws/en/oltp) feature I found, but more efford would need to be put into importing these files, which is not the point of this repo and its exercises. 