In [0]:
# Mount
import json
# Set up the configurations for mounting the GCS bucket
gcs_bucket_name = "dybucket"
mount_point = "/mnt/dy"
project_id = "mentorsko-1725955324975"
service_account_key = "/dbfs/FileStore/tables/mentorsko_1725955324975_032294f8805c.json"

# Read the service account key file
with open(service_account_key, 'r') as key_file:
    service_account_info = json.load(key_file)

# Define the GCS service account credentials
config = {
    "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "fs.gs.auth.service.account.enable": "true",
    "fs.gs.auth.service.account.email": service_account_info["client_email"],
    "fs.gs.auth.service.account.private.key.id": service_account_info["private_key_id"],
    "fs.gs.auth.service.account.private.key": service_account_info["private_key"],
    "fs.gs.project.id": project_id
}

# Mount the GCS bucket
dbutils.fs.mount(
    source=f"gs://{gcs_bucket_name}",
    mount_point=mount_point,
    extra_configs=config
)

In [0]:
display(dbutils.fs.ls("/mnt/dy/dataset"))

In [0]:
%sql
create schema if not exists rohit_databricks_npmentorskool_onmicrosoft_com.bronze;

In [0]:
# The source directory containing your CSV files
source_path = '/mnt/dy/dataset'

# The destination catalog and schema where you want to create the new tables
catalog = 'rohit_databricks_npmentorskool_onmicrosoft_com'
schema = 'bronze'

# Get a list of files in the source directory
try:
    files = dbutils.fs.ls(source_path)
except Exception as e:
    print(f"Error accessing source path {source_path}: {e}")
    # Stop execution if the source directory is not accessible
    dbutils.notebook.exit("Stopping notebook execution due to inaccessible source path.")

# Loop through each file
for file_info in files:
    file_path = file_info.path
    file_name = file_info.name

    # Check if the file is a CSV file to avoid processing subdirectories
    if file_name.endswith('.csv'):
        # Create a table name from the file name by removing the '.csv' extension
        # and ensuring it's a valid table name (e.g., replacing spaces)
        table_name = file_name.replace('.csv', '').replace(' ', '_')
        
        # Build the full destination table path
        destination_table = f"{catalog}.{schema}.{table_name}"
        
        print(f"Processing file: {file_name}")

        try:
            # Step 1: Read the CSV file into a Spark DataFrame
            # The schema, including column names with special characters, is inferred here.
            df = spark.read.format('csv').options(
                header='true',
                inferSchema='true',
                timestampFormat='dd-MM-yyyy HH.mm'
            ).load(file_path)

            # Step 2: Write the DataFrame to a Delta table with column mapping enabled
            # This single operation creates the table and loads the data.
            print(f"Writing data to Delta table: {destination_table} with column mapping enabled.")
            df.write.format('delta').mode('overwrite').option(
                "delta.columnMapping.mode", "name"
            ).option("mergeSchema", "true").saveAsTable(destination_table)
            
            print(f"Successfully created and loaded data into table: {destination_table}")

        except Exception as e:
            print(f"An error occurred while processing {file_name}: {e}")

In [0]:
%sql
SELECT * FROM rohit_databricks_npmentorskool_onmicrosoft_com.bronze.addresses LIMIT 100;