In [None]:
from functools import reduce

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType

## Connect to Azure Data Lake using OAuth 2.0 with an Azure Active Directory (Azure AD) service principal

In [None]:
# Identifiers used by the OAuth configuration below, named once instead of
# being repeated inside every configuration key.
STORAGE_ACCOUNT_HOST = "minedata.dfs.core.windows.net"
CLIENT_ID = "fec6b5a7-ad07-45e2-b61b-dc037c92904f"
TENANT_ID = "3417d36b-fa61-4b84-b95e-8414a4e5753f"

# The client secret is read from the secret scope rather than written in the notebook.
service_credential = dbutils.secrets.get(scope="key_vault_scope", key="service-credential-key")

# Setting name (without the account host suffix) -> value, in the order they are applied.
oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": CLIENT_ID,
    "fs.azure.account.oauth2.client.secret": service_credential,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/token",
}

# Every setting is scoped to the storage account by appending its host to the key.
for setting_name, setting_value in oauth_settings.items():
    spark.conf.set(f"{setting_name}.{STORAGE_ACCOUNT_HOST}", setting_value)

## Read mining data from two machines

In [None]:
# Load the CSV export of machine one; the first row of the file is used as the header.
machine_one_data = spark.read.csv(
    "abfss://miningdata@minedata.dfs.core.windows.net/plant_one/MiningProcess_Flotation_Plant_Database_1.csv",
    header=True,
)

# Load the CSV export of machine two the same way.
machine_two_data = spark.read.csv(
    "abfss://miningdata@minedata.dfs.core.windows.net/plant_two/MiningProcess_Flotation_Plant_Database_2.csv",
    header=True,
)


## Check the schema of both machines' data

In [None]:
# Print the column names and data types of the machine one DataFrame.
machine_one_data.printSchema()

In [None]:
# Print the column names and data types of the machine two DataFrame.
machine_two_data.printSchema()

## Data exploration

- Drop unwanted columns
- Convert string into float
- Merge both plants' data and load it into the data lake

In [None]:
# Report how many columns each raw DataFrame currently has.
for machine_label, machine_frame in (("one", machine_one_data), ("two", machine_two_data)):
    column_count = len(machine_frame.columns)
    print(f"Number of machine {machine_label} data features: {column_count}")

In [None]:
# Preview the first row of the machine one data.
machine_one_data.show(1)

### Drop unwanted columns

In [None]:
# Start from every column name of the machine one DataFrame.
all_columns = machine_one_data.columns
print(all_columns)
print(f"Length is :{len(all_columns)}")

# Keep only the columns at zero-based positions 3 through 22; everything
# before and after that window is treated as unwanted.
wanted_positions = slice(3, 23)
col_list = all_columns[wanted_positions]
print(col_list)

In [None]:
# Project both DataFrames onto the same ordered list of required columns.
# NOTE(review): col_list is derived from machine one only; this assumes machine
# two exposes the same column names - confirm against the printed schemas.
machine_one_data = machine_one_data.select(col_list)
machine_two_data = machine_two_data.select(col_list)

In [None]:
# Re-check the column counts now that the unwanted columns have been dropped.
one_feature_count = len(machine_one_data.columns)
two_feature_count = len(machine_two_data.columns)
print(f"Number of machine one data features: {one_feature_count}")
print(f"Number of machine two data features: {two_feature_count}")

### Convert string data to float

In [None]:
# Preview the first row of machine one before the string-to-float conversion.
machine_one_data.show(1)

In [None]:
# Define user defined function
def convert_float(str):
    """Parse a decimal-comma numeric string into a Python float.

    Args:
        str: Text such as "55,2" or " 3,14 ", or None for a missing value.
            The name shadows the built-in ``str``; it is kept unchanged so
            existing callers are not affected.

    Returns:
        The parsed float, or None when the input is None or blank.

    Raises:
        ValueError: If the text is non-blank but is not a valid number.
    """
    # A missing value must stay missing instead of raising AttributeError
    # on None.replace(...).
    if str is None:
        return None

    # Swap the decimal comma for a dot, then drop surrounding whitespace.
    text = str.replace(",", ".").strip()

    # Blank text carries no number; treat it as missing rather than failing in float("").
    if not text:
        return None

    return float(text)

# Expose convert_float as a Spark UDF whose declared return type is FloatType.
# The function is passed directly; wrapping it in a lambda adds nothing.
convert_float_udf = udf(convert_float, FloatType())

In [None]:
# Convert every column of both DataFrames with convert_float_udf.
# A single select() builds one projection per DataFrame; calling withColumn()
# once per column in a loop stacks a projection for each call and can produce
# very large query plans. Aliasing each result with its source name keeps the
# column names and their order unchanged.
machine_one_data = machine_one_data.select(
    [convert_float_udf(col(col_name)).alias(col_name) for col_name in machine_one_data.columns]
)

machine_two_data = machine_two_data.select(
    [convert_float_udf(col(col_name)).alias(col_name) for col_name in machine_two_data.columns]
)

In [None]:
# Preview the first row of machine two after the conversion step.
machine_two_data.show(1)

### Merge both plants' data and load it into the data lake

In [None]:
# Stack the rows of both machines into one DataFrame.
# union() replaces the deprecated unionAll(); it matches columns by position
# and keeps duplicate rows. Both frames were selected with the same col_list,
# so their column order lines up.
df = machine_one_data.union(machine_two_data)

# count() returns the number of rows, so the labels say "rows", not "features".
print(f"Number of machine one data rows: {machine_one_data.count()}")
print(f"Number of machine two data rows: {machine_two_data.count()}")
print(f"Total number of machine data:{df.count()}")

In [None]:
# Write the merged data to the data lake as CSV with a header row.
# mode("overwrite") keeps the notebook re-runnable: the default save mode
# raises an error when the target path already exists. Any previous output at
# this path is replaced.
# Spark writes a directory of part files at this path, not a single CSV file.
df.write.mode("overwrite").option("header",True).csv("abfss://miningdata@minedata.dfs.core.windows.net/processed.csv")