<a href="https://colab.research.google.com/github/miriammazzeo95/BigData_and_Timeseries_in_Pyspark/blob/main/DataOrdering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# **Set Up Pyspark, Imports and Functions**

In [None]:
# Mount Google Drive inside the Colab VM at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# TO CHANGE CURRENT DIR
# %cd /content/drive/MyDrive/Colab\ Notebooks/Big\ Data\ with\ Pyspark

/content/drive/MyDrive/Colab Notebooks/Big Data with Pyspark


In [None]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate the Colab user and build a PyDrive client that uses the
# application-default Google credentials.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
# NOTE(review): this rebinds the name `drive`, which the first cell imported
# from google.colab; after this line `drive` is the PyDrive client.
drive = GoogleDrive(gauth)

# https://colab.research.google.com/drive/1YGZCoCGw632dFe_yxAViQYIsXnFKqEdA?usp=sharing
# Drive file id of the helper notebook (the id that appears in the link above).
colb_script_id = '1YGZCoCGw632dFe_yxAViQYIsXnFKqEdA'
link_to_file_in_drive = drive.CreateFile({'id':colb_script_id})

link_to_file_in_drive.GetContentFile('Pyspark_ConfigurationImportsFunctions.ipynb') # creates a local copy in the VM
!jupyter nbconvert --to python 'Pyspark_ConfigurationImportsFunctions.ipynb' # converts the local copy from notebook to .py
!rm Pyspark_ConfigurationImportsFunctions.ipynb # deletes the local copy
import Pyspark_ConfigurationImportsFunctions as pyspark_config # binds the converted script as the module `pyspark_config`; its names are reached as pyspark_config.<name>
# List the names the helper module exposes.
dir(pyspark_config)

# **Ordering Time Series file in folders**

> This script is meant to 'reorder' files into the right monthly and daily folders. The code processes parquet data files that meters have collected daily over 4 years. The files are stored in month and day folders; these folders also contain files with wrong timestamps. The code processes the files relatively slowly to avoid exceeding memory.






In [None]:
# THE DATASET HAS COLUMNS THAT CHANGE OVER TIME
# this function checks the number of columns and fixes the schema by adding the missing columns with empty values
'''
['Dato', 'Time-Kvarter', 'InstallationsID', 'InstallationAdresse', 'InstallationPostNr', 'MålerArtBeskrivelse', 'MålerEgenskab', 'MeterRegister', 'MålerVærdier', 'ProduktkomponentType', 'Dataset', 'day'] ->
['Dato', 'Time-Kvarter', 'MeterKey', 'InstallationsID', 'InstallationAdresse', 'InstallationPostNr', 'MålerArtBeskrivelse', 'MålerEgenskab', 'MeterRegister', 'MålerVærdier', 'Produktkomponent', 'ProduktkomponentType', 'InstallationKabelskab', 'InstallationLS_Udføring', 'InstallationNetstation', 'InstallationStation60KV', 'Type', 'Dataset', 'day']
'''
def fix_cols(df, path):
    """Bring a meter DataFrame to the 18-column layout.

    The layout is recognised purely by the number of columns in ``df``:

    * 11 or 14 columns: the columns missing from that layout are added,
      filled with empty strings, and the result is selected in the fixed
      18-column order.
    * 18 columns: the frame is kept as read, but only rows whose ``Type``
      equals ``'FED'`` are retained.
    * any other count: a diagnostic is printed and ``NameError`` is raised.

    Args:
        df: DataFrame read from one input file.
        path: Location the frame was read from; only used in the diagnostic
            printed for an unexpected column count.

    Returns:
        The normalised (or filtered) DataFrame.

    Raises:
        NameError: If ``df`` has a column count other than 11, 14 or 18.
    """
    # Final column order shared by both padded layouts.
    target_order = [
        'Dato', 'Time-Kvarter', 'MeterKey', 'InstallationsID',
        'InstallationAdresse', 'InstallationPostNr', 'MålerArtBeskrivelse',
        'MålerEgenskab', 'MeterRegister', 'MålerVærdier', 'Produktkomponent',
        'ProduktkomponentType', 'InstallationKabelskab',
        'InstallationLS_Udføring', 'InstallationNetstation',
        'InstallationStation60KV', 'Type', 'Dataset',
    ]
    # Columns that each older layout lacks, keyed by that layout's column count.
    absent_by_width = {
        11: (
            'Produktkomponent', 'MeterKey', 'InstallationKabelskab',
            'InstallationLS_Udføring', 'InstallationNetstation',
            'InstallationStation60KV', 'Type',
        ),
        14: ('Produktkomponent', 'MeterKey', 'Type', 'InstallationStation60KV'),
    }
    width = len(df.columns)

    # Already complete: nothing to add, just keep the 'FED' rows.
    if width == 18:
        return df.where(df.Type == 'FED')

    if width not in absent_by_width:
        print(['Found columns: ', df.columns, "Columns' number: ", len(df.columns), 'UNEXPECTED!',path ])
        raise NameError('Columns number unexpected!')

    padded = df
    for name in absent_by_width[width]:
        padded = padded.withColumn(name, lit(''))
    return padded.select(*target_order)

%pyspark

# Re-files every input parquet file under
# <saving_folder>/year=<yyyy>/month=<mm>/day=<d>: rows dated inside the folder's
# own month go to that month, rows with any other date ("extra dates") go to
# the month their date belongs to. All writes use append mode.

# SKIP CORRUPTED FILES
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
all_trefor_months = get_filePaths('hdfs dfs -ls /feddl/landing/trefor/quarter_hourly/*/*/')
saving_folder = '/feddl/work/CDK/miriam/trefor/2109/0_formatting'
num_of_paths = len(all_trefor_months)

######################################### MONTH PATH INDEX
for i, month_path in enumerate(all_trefor_months):

    print([i, month_path])
    # Year and month are sliced from fixed offsets at the end of the path
    # (presumably paths end like '.../2021/month=03' -- TODO confirm).
    month_str = month_path[-2:]
    year_str = month_path[-13:-9]
    saving_path = saving_folder + f'/year={year_str}/month={month_str}'

    daylyPaths = get_filePaths('hdfs dfs -ls ' + month_path +'/')

    ############################################## DAY PATH INDEX
    for day_path in daylyPaths:

        df = spark.read.parquet(day_path)
        # ADDING EMPTY COLUMNS WHEN NEEDED (fix_cols is defined earlier in this file)
        df = fix_cols(df, day_path)

        # MARCH 2021 DATE BY DATE
        # (presumably to keep each write small for this month -- TODO confirm)
        if (year_str == '2021') and (month_str == '03'):

            # CREATE LIST WITH THE RIGHT DATES
            df_dates = df.select(col('Dato')).where((month(col('Dato')) == month_str) & (year(col('Dato')) == year_str)).distinct()
            df_dates = df_dates.withColumn('Dato', col("Dato").cast(StringType()))
            list_dates = df_dates.collect()

            # FILTER DF ON EACH DATE AND SAVE
            ########################################################### DATES INDEX
            for date_row in list_dates:

                # FILTER ON RIGHT DATES
                right_date = date_row['Dato']
                df_day = df.where(col('Dato') == right_date)

                # ADD DAY COLUMN
                df_day = df_day.withColumn('day', dayofmonth('Dato'))

                # SAVE DAY FILE
                df_day.write.parquet(saving_path, mode='append', partitionBy='day')

                # DEBUGGING
                print(['FILE', day_path, 'DATE', right_date, 'SAVED', saving_path])
        else:

            # FILTER DATA WITH THE RIGHT DATES
            df_month = df.where((month(col('Dato')) == month_str) & (year(col('Dato')) == year_str))

            # ADD DAY COLUMN
            df_month = df_month.withColumn('day', dayofmonth('Dato'))

            # SAVE MONTH BY DATE
            df_month.write.parquet(saving_path, mode='append', partitionBy='day')

            # DEBUGGING
            print(['FILE', day_path, 'SAVED', saving_path])

        # FIND ALL EXTRA DATES
        # Done for every day file, inside the day loop, so that misplaced rows
        # of each file are re-filed and no DataFrame left over from another
        # file or month is ever reused.
        df_extraDates = df.select(col('Dato')).where((month(col('Dato')) != month_str) | (year(col('Dato')) != year_str)).distinct()
        df_extraDates = df_extraDates.withColumn('Dato', col("Dato").cast(StringType()))
        list_extraDates = df_extraDates.collect()

        # DEBUGGING
        print(['Extra dates: ', list_extraDates])

        # IF THERE ARE CORRECTIONS WRITE THEM IN THE RIGHT DATE-FOLDER
        for extra_row in list_extraDates:

            # FILTER ON CORRECTION DATES
            extra_date = extra_row['Dato']
            df_correctionDay = df.where(col('Dato') == extra_date)

            # SAVE EXTRA DAY IN THE RIGHT DATE-PATH
            # Sliced from the front of the 'YYYY-MM-DD...' string so the month
            # is still right if a time part follows the date.
            year_correctionDay = extra_date[:4]
            month_correctionDay = extra_date[5:7]

            # ADD DAY COLUMN
            df_correctionDay = df_correctionDay.withColumn('day', dayofmonth('Dato'))

            # SAVE DATAFRAME AS PARQUET
            # A separate variable keeps saving_path (the folder month's target)
            # intact for the remaining day files of this month.
            correction_path = saving_folder + f'/year={year_correctionDay}/month={month_correctionDay}'
            df_correctionDay.write.parquet(correction_path, mode='append', partitionBy='day')

            # DEBUGGING
            print(['EXTRA DATE', extra_date, 'SAVED', correction_path])