In [None]:
# Check files: list the entries under /mnt/input, the directory the cleaning
# step reads its input from. As the last expression of the cell, the listing
# is what the notebook displays.
dbutils.fs.ls('/mnt/input')

In [None]:
from pyspark.sql.functions import col, split, when, current_timestamp, lit
from datetime import datetime

# Remove every *.csv entry that sits directly in the output directory before
# new output is written. Only names ending in '.csv' are touched; anything
# else in the directory is left alone.
existing_directory_path = '/mnt/outputdatabricks/'
csv_files = [
    entry.name
    for entry in dbutils.fs.ls(existing_directory_path)
    if entry.name.endswith('.csv')
]
for stale_csv in csv_files:
    dbutils.fs.rm(existing_directory_path + stale_csv)

# Per-country reference data, keyed by the two-letter code that the processing
# loop takes from the first two characters of each input file name.
# Each row is (code, TotalPopulation, LandArea).
# NOTE(review): LandArea is presumably in square kilometres, and the all-zero
# "mk" row looks like a placeholder -- confirm both.
country_data = {
    code: {"TotalPopulation": population, "LandArea": land_area}
    for code, population, land_area in (
        ("at", 8958960, 82409.00),
        ("be", 11686140, 30280.00),
        ("ch", 8796669, 39516.00),
        ("de", 83294633, 348560.00),
        ("dk", 5910913, 42430.00),
        ("es", 47519628, 498800.00),
        ("fr", 64756584, 547557.00),
        ("gb", 67736802, 241930.00),
        ("ie", 5056935, 68890.00),
        ("it", 58870762, 294140.00),
        ("lu", 654768, 2590.00),
        ("nl", 17618299, 33720.00),
        ("no", 5474360, 365268.00),
        ("pt", 10247605, 91590.00),
        ("se", 10612086, 410340.00),
        ("mk", 0, 0.00),
    )
}


# Input and output locations used by the processing loop.
directory_path = '/mnt/input/'
existing_directory_path = '/mnt/outputdatabricks/'

# 1. Reading the files
# Collect the names of the input entries, leaving out any whose name contains
# "already_loaded".
file_list = [
    entry.name
    for entry in dbutils.fs.ls(directory_path)
    if "already_loaded" not in entry.name
]

# Process every pending input file: read it as CSV, add the derived and
# reference columns, write the result as one CSV into the output directory,
# and remove the source file only once its output exists.
for file in file_list:
    # The first two characters of the file name are the country code used to
    # look up the reference data and to name the output file.
    country_name = file[:2]
    if country_name not in country_data:
        # An unknown prefix used to raise KeyError and abort the whole run.
        # Skip just this file and leave it untouched in the input directory.
        print(f"Skipping {file}: no country_data entry for '{country_name}'")
        continue

    # Read the file
    df = spark.read.option("header", "true").csv(directory_path + file)

    # 2. Cleaning the data
    df = df.withColumn("Last_Cleaned", current_timestamp())

    if "start" in df.columns and "end" in df.columns:
        # Split each timestamp string on spaces: element 0 becomes the date
        # column, element 1 the time column.
        start_parts = split(col("start"), " ")
        end_parts = split(col("end"), " ")
        df = df.withColumn("Start_Date", start_parts[0])
        df = df.withColumn("Start_Time", start_parts[1])
        df = df.withColumn("End_Date", end_parts[0])
        df = df.withColumn("End_Time", end_parts[1])

        # Season and Day_Night are derived from Start_Date / Start_Time, so
        # they can only be computed when those columns were created above.
        # Characters 6-7 of Start_Date are read as an integer month
        # (presumably a YYYY-MM-DD date -- TODO confirm the input format).
        start_month = col("Start_Date").substr(6, 2).cast("int")
        df = df.withColumn(
            "Season",
            when(start_month.isin([12, 1, 2]), "Winter")
            .when(start_month.isin([3, 4, 5]), "Spring")
            .when(start_month.isin([6, 7, 8]), "Summer")
            .otherwise("Autumn"),
        )

        # The first two characters of Start_Time are read as an integer hour;
        # 6 through 17 inclusive count as "Day", everything else as "Night".
        start_hour = col("Start_Time").substr(1, 2).cast("int")
        df = df.withColumn(
            "Day_Night",
            when(start_hour.between(6, 17), "Day").otherwise("Night"),
        )
    else:
        print(f"{file}: no 'start'/'end' columns, date/time-derived columns not added")

    df = df.withColumn("Country", lit(country_name))

    # Add the new columns based on the country_data dictionary
    df = df.withColumn("TotalPopulation", lit(country_data[country_name]["TotalPopulation"]))
    df = df.withColumn("LandArea", lit(country_data[country_name]["LandArea"]))

    df.show()

    # 3. Saving the cleaned data
    # Spark writes a directory of part files. coalesce(1) forces exactly one
    # part file so that moving a single file below cannot drop rows that
    # would otherwise land in additional part files.
    temp_dir = existing_directory_path + "temp/"
    df.coalesce(1).write.format("csv").option("header", "true").mode("overwrite").save(temp_dir)

    current_utc_time = datetime.utcnow().strftime('%Y%m%d%H%M%S')
    output_file_path = existing_directory_path + f"{country_name}_cleaned_{current_utc_time}.csv"

    # Match on the "part-" prefix so sidecar files whose names merely contain
    # "part-" (e.g. ".part-....crc") are never picked up as the data file.
    part_files = [f.name for f in dbutils.fs.ls(temp_dir) if f.name.startswith('part-')]
    if part_files:
        part_file_path = temp_dir + part_files[0]
        dbutils.fs.mv(part_file_path, output_file_path)

    # Clean up the temporary directory
    dbutils.fs.rm(temp_dir, recurse=True)

    if not part_files:
        # No output was produced: keep the source file so no data is lost.
        print(f"{file}: no part file found in {temp_dir}, source file kept")
        continue

    # 4. Deleting the original files after processing
    dbutils.fs.rm(directory_path + file)

    # Alternative to deleting: rename the original file instead
    #new_file_name = f"{file}_already_loaded_{current_utc_time}"
    #dbutils.fs.mv(directory_path + file, directory_path + new_file_name)