<a href="https://colab.research.google.com/github/nceder/qpb4e/blob/main/code/Chapter%2020/Chapter_20.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 20 Basic file wrangling

# 20.2 Scenario: The product feed from hell

### Quick Check: Consider the choices

What are your options for handling the tasks I've identified? What modules in the standard library can you think of that will do the job? If you want, you can even stop right now and work out the code to do it. Then compare your solution with the one you develop later.

#### Discussion

From the standard library, use `datetime` for managing the dates and times of the files, and either `os.path` and `os` or `pathlib` for renaming and archiving the files.


In [39]:
import pathlib

for filename in ['item_info.txt', 'item_attributes.txt', 'related_items.txt']:
    pathlib.Path(filename).touch()


In [11]:
import pathlib
cur_path = pathlib.Path(".")
FILE_PATTERN = "*.txt"
path_list = cur_path.glob(FILE_PATTERN)
print(list(path_list))

[WindowsPath('item_attributes.txt'), WindowsPath('item_info.txt'), WindowsPath('related_items.txt')]


In [None]:
import datetime
import pathlib

date_string = datetime.date.today()
print(date_string)

date_string = datetime.date.today().strftime("%Y-%m-%d")
print(date_string)

2025-08-18
2025-08-18


### Listing 20.1 File files_01.py


In [None]:
# Listing 20.1 File files_01.py

import datetime
import pathlib

FILE_PATTERN = "*.txt"             #A
ARCHIVE = "archive"

def main():

    date_string = datetime.date.today().strftime("%Y-%m-%d")    # Today's date in YYYY-MM-DD format

    cur_path = pathlib.Path(".")
    archive_path = cur_path.joinpath(ARCHIVE)
    archive_path.mkdir(exist_ok=True)        #C

    paths = cur_path.glob(FILE_PATTERN)

    for path in paths:
        new_filename = f"{path.stem}_{date_string}{path.suffix}"
        new_path = archive_path.joinpath(new_filename)        #D
        path.rename(new_path)                      #E

if __name__ == '__main__':
    main()

### Quick Check: Potential Problems
Because the preceding solution is very simple, there are likely to be many situations that it won’t handle well. What are some potential issues or problems that might arise with the example script? How might you remedy these problems?

Consider the naming convention used for the files, which is based on the year, month and day, in that order. What advantages do you see in that convention? What might be the disadvantages? Can you make any arguments for putting the date string somewhere else in the filename, such as the beginning or the end?

#### Discussion

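Multiple files arriving on the same day would be a problem, for one thing: the second file would be given the same archive name as the first. On POSIX systems `Path.rename()` silently replaces an existing file, and on Windows it raises `FileExistsError`. If you have lots of files, navigating the archive directory will also become increasingly difficult.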

Using year-month-day date formats makes a text-based sort of the files sort by date as well. Putting the date at the end of the filename but before the extension makes it more difficult to parse the date element visually.
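One possible remedy for same-day collisions is to check whether the archive name is already taken and add a counter when it is. The sketch below is only an illustration: the helper name `unique_archive_path` and the `_1`, `_2` suffix scheme are assumptions, not part of the chapter's listings. The last lines also show the sorting advantage of year-month-day names.

```python
import pathlib
import tempfile


def unique_archive_path(archive_dir, path, date_string):
    """Return an unused path in archive_dir for path, dated with date_string.

    Hypothetical helper: the counter suffix is an assumed convention.
    """
    candidate = archive_dir / f"{path.stem}_{date_string}{path.suffix}"
    counter = 1
    while candidate.exists():
        candidate = archive_dir / f"{path.stem}_{date_string}_{counter}{path.suffix}"
        counter += 1
    return candidate


# Demonstrate in a throwaway directory so no real files are touched
with tempfile.TemporaryDirectory() as tmp:
    archive_dir = pathlib.Path(tmp)
    source = pathlib.Path("item_info.txt")

    first = unique_archive_path(archive_dir, source, "2025-08-18")
    first.touch()    # simulate the first delivery of the day
    second = unique_archive_path(archive_dir, source, "2025-08-18")
    first_name, second_name = first.name, second.name

print(first_name)     # item_info_2025-08-18.txt
print(second_name)    # item_info_2025-08-18_1.txt

# Year-month-day names sort chronologically under a plain text sort
names = ["item_info_2025-08-18.txt", "item_info_2024-12-31.txt", "item_info_2025-01-05.txt"]
print(sorted(names))
```

In Listing 20.1, a helper like this would replace the direct `archive_path.joinpath(new_filename)` call, so that a second run on the same day neither overwrites nor fails.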

# 20.3 More organization

### Listing 20.2 File files_02.py

In [5]:
import pathlib

for filename in ['item_info.txt', 'item_attributes.txt', 'related_items.txt']:
    pathlib.Path(filename).touch()

In [10]:
# Variation on Listing 20.1: move the files into a subdirectory of "archive" named for today's date

import datetime
import pathlib

FILE_PATTERN = "*.txt"
ARCHIVE = "archive"

def main():

    cur_path = pathlib.Path(".")
    archive_path = cur_path.joinpath(ARCHIVE)    # The "archive" directory
    archive_path.mkdir(exist_ok=True)

    date_string = datetime.date.today().strftime("%Y-%m-%d")    # Today's date in YYYY-MM-DD format
    date_path = archive_path.joinpath(date_string)    # Dated subdirectory inside "archive"
    date_path.mkdir(exist_ok=True)

    paths = cur_path.glob(FILE_PATTERN)    # Files in the current directory that match the pattern

    for path in paths:
        path.rename(date_path.joinpath(path.name))    # Move each matching file into today's subdirectory

if __name__ == '__main__':
    main()

### Try This: Implementation of multiple directories

How would you modify the code that you developed to archive each set of files in subdirectories named according to date received? Feel free to take the time to implement the code and test it.

### Quick Check: Alternate solutions
How might you create a script that does the same thing without using pathlib? What libraries and functions would you use?

#### Discussion
You'd use the `os.path` and `os` modules, specifically `os.path.join()`, `os.mkdir()`, and `os.rename()`.
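As a rough sketch of what that could look like, the cell below repeats the dated-subdirectory archive using only `os`, `os.path`, and `glob`. The function name `archive_files` and its `directory` parameter are assumptions made for this illustration. It also uses `os.makedirs()` rather than `os.mkdir()` so that `archive` and the dated directory are created in one call, and it runs against a temporary directory so nothing in your working directory is moved.

```python
import datetime
import glob
import os
import tempfile

FILE_PATTERN = "*.txt"
ARCHIVE = "archive"


def archive_files(directory="."):
    """Move files matching FILE_PATTERN in directory into archive/<YYYY-MM-DD>/."""
    date_string = datetime.date.today().strftime("%Y-%m-%d")
    new_dir = os.path.join(directory, ARCHIVE, date_string)
    # os.makedirs() creates every missing level; os.mkdir() would need one call per level
    os.makedirs(new_dir, exist_ok=True)

    moved = []
    for filename in glob.glob(os.path.join(directory, FILE_PATTERN)):
        target = os.path.join(new_dir, os.path.basename(filename))
        os.rename(filename, target)
        moved.append(target)
    return moved


# Try it in a throwaway directory so no real files are moved
with tempfile.TemporaryDirectory() as tmp:
    for name in ("item_info.txt", "related_items.txt"):
        open(os.path.join(tmp, name), "w").close()
    moved = archive_files(tmp)
    moved_names = sorted(os.path.basename(target) for target in moved)
    originals_left = glob.glob(os.path.join(tmp, FILE_PATTERN))

print(moved_names)
print(originals_left)
```

Calling `archive_files()` with no argument would work on the current directory, as the `pathlib` listings do.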

In [None]:
import datetime
import pathlib

FILE_PATTERN = "*.txt"
ARCHIVE = "archive"

if __name__ == '__main__':

    date_string = datetime.date.today().strftime("%Y-%m-%d")

    cur_path = pathlib.Path(".")

    new_path = cur_path.joinpath(ARCHIVE, date_string)
    new_path.mkdir(parents=True, exist_ok=True)    # Also creates "archive" if needed; safe to rerun on the same day

    paths = cur_path.glob(FILE_PATTERN)

    for path in paths:
        path.rename(new_path.joinpath(path.name))

## 20.4.1 Compressing files

### Listing 20.3 File files_03.py

### Try This: Archiving to zip files pseudocode

Write the pseudocode for a solution that stores data files in zip files. What modules and functions or methods do you intend to use? Try coding your solution to make sure that it works.

#### Discussion
Pseudocode:
```
create path for zip file
create empty zipfile
for each file
    write into zipfile
    remove original file
```
(See the next section for sample code that does this.)

In [36]:
import pathlib

for filename in ['item_info.txt', 'item_attributes.txt', 'related_items.txt']:
    pathlib.Path(filename).touch()

In [None]:
import datetime
import pathlib
from zipfile import ZipFile

ARCHIVE = "archive"

cur_path = pathlib.Path(".")
archive_path = cur_path.joinpath(ARCHIVE)
archive_path.mkdir(exist_ok=True)

zip_name = datetime.date.today().strftime("%Y-%m-%d")    # Today's date in YYYY-MM-DD format
zip_path = archive_path.joinpath(zip_name)    # The zip file goes inside "archive"
with ZipFile(f'{zip_path}.zip', 'w') as myzip:
    pass    # Create an empty zip file

In [37]:
# Variation on Listing 20.1: store the files in a dated zip file

import datetime
import pathlib
from zipfile import ZipFile

FILE_PATTERN = "*.txt"
ARCHIVE = "archive"

def main():

    cur_path = pathlib.Path(".")
    archive_path = cur_path.joinpath(ARCHIVE)
    archive_path.mkdir(exist_ok=True)

    zip_name = datetime.date.today().strftime("%Y-%m-%d")    # Today's date in YYYY-MM-DD format
    zip_path = archive_path.joinpath(f"{zip_name}.zip")    # The zip file goes inside "archive" and needs the .zip suffix

    paths = cur_path.glob(FILE_PATTERN)    # Path objects for the .txt files in the current directory

    with ZipFile(zip_path, 'w') as myzip:
        for path in paths:
            myzip.write(path)     # Add the file to the zip archive
            path.unlink()         # Delete the original file

if __name__ == '__main__':
    main()

In [8]:
# Listing 20.3 File files_03.py

import datetime
import pathlib
import zipfile          #A

FILE_PATTERN = "*.txt"
ARCHIVE = "archive"

def main():

    date_string = datetime.date.today().strftime("%Y-%m-%d")

    cur_path = pathlib.Path(".")
    archive_path = cur_path.joinpath(ARCHIVE)
    archive_path.mkdir(exist_ok=True)

    paths = cur_path.glob(FILE_PATTERN)

    zip_file_path = cur_path.joinpath(ARCHIVE, date_string + ".zip")   #B
    # The with block closes the zip file, which writes its central directory
    with zipfile.ZipFile(str(zip_file_path), "w") as zip_file:       #C
        for path in paths:
            zip_file.write(str(path))                                 #D
            path.unlink()             #E

if __name__ == '__main__':
    main()

## 20.4.2 Grooming files

### Listing 20.4 File files_04.py

In [45]:
# run this before running cell below
# create zip files in archive directory
from datetime import datetime, timedelta
import pathlib

ARCHIVE = "archive"


def populate_archive(zip_file_path, current_date):
    zip_file_path.mkdir(exist_ok=True)    # Make sure the archive directory exists
    for days in range(30, 40):
        zip_date = current_date - timedelta(days=days)
        new_zip_path = zip_file_path.joinpath(f"{zip_date.strftime('%Y-%m-%d')}.zip")
        new_zip_path.write_text("Test")    # Placeholder content: only the filename matters here

cur_path = pathlib.Path(".")
zip_file_path = cur_path.joinpath(ARCHIVE)
current_date = datetime.today()
populate_archive(zip_file_path, current_date)

In [44]:
# Listing 20.4 File files_04.py

from datetime import datetime, timedelta
import pathlib
import zipfile

FILE_PATTERN = "*.zip"
ARCHIVE = "archive"
ARCHIVE_WEEKDAY = 1

def main():
    cur_path = pathlib.Path(".")
    zip_file_path = cur_path.joinpath(ARCHIVE)

    paths = zip_file_path.glob(FILE_PATTERN)
    current_date = datetime.today()    #A

    for path in paths:
        name = path.stem              #B
        path_date = datetime.strptime(name, "%Y-%m-%d")     #C
        path_timedelta = current_date - path_date          #D
        if (path_timedelta > timedelta(days=30)
                and path_date.weekday() != ARCHIVE_WEEKDAY):    #E
            path.unlink()

if __name__ == '__main__':
    main()

### Quick Check: Consider different parameters

Take some time to consider different grooming options. How would you modify the code in the previous Try This to keep only one file a month? How would you change the code so that files from the previous month and older are groomed to save one a week? (Note: This is not the same as older than 30 days!)

#### Discussion
You could use something similar to the code above but also check the month of the file against the current month.
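To make the difference between "previous month and older" and "older than 30 days" concrete, here is a minimal sketch of the month check. The function name `is_before_current_month` is an assumption for this illustration, and the dates are fixed so the result is reproducible.

```python
from datetime import date, timedelta


def is_before_current_month(file_date, today):
    """True if file_date falls in a calendar month earlier than today's."""
    return (file_date.year, file_date.month) < (today.year, today.month)


today = date(2025, 8, 18)          # fixed date so the example is reproducible
late_july = date(2025, 7, 25)      # previous month, but only 24 days old
early_august = date(2025, 8, 1)    # current month

for d in (late_july, early_august):
    older_than_30_days = (today - d) > timedelta(days=30)
    print(d, is_before_current_month(d, today), older_than_30_days)
# 2025-07-25 True False
# 2025-08-01 False False
```

The file from late July belongs to the previous month even though it is less than 30 days old, which is why the two rules groom different sets of files. Comparing `(year, month)` tuples also handles the December-to-January boundary.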

In [25]:
# run this before running cell below
# create zip files in archive directory
from datetime import datetime, timedelta
import pathlib
import zipfile

ARCHIVE = "archive"

def populate_archive(zip_file_path, current_date):
    zip_file_path.mkdir(exist_ok=True)    # Make sure the archive directory exists
    for days in range(30, 40):
        zip_date = current_date - timedelta(days=days)
        new_zip_path = zip_file_path.joinpath(f"{zip_date.strftime('%Y-%m-%d')}.zip")
        new_zip_path.write_text("Test")    # Placeholder content: only the filename matters here

cur_path = pathlib.Path(".")
zip_file_path = cur_path.joinpath(ARCHIVE)
current_date = datetime.today()
populate_archive(zip_file_path, current_date)

In [None]:
# Keep only one file per month: the most recent one.
# Track the newest file seen so far for each (year, month) and delete every file
# that loses that comparison.

from datetime import datetime
import pathlib

FILE_PATTERN = "*.zip"
ARCHIVE = "archive"

def main():

    # List the zip files currently in the archive
    cur_path = pathlib.Path(".")
    zip_file_path = cur_path.joinpath(ARCHIVE)
    paths = zip_file_path.glob(FILE_PATTERN)

    # Newest file seen so far per month: (year, month) -> (file date, path)
    latest_per_month = {}

    for path in paths:
        file_date = datetime.strptime(path.stem, "%Y-%m-%d")
        year_month = (file_date.year, file_date.month)

        kept = latest_per_month.get(year_month)
        if kept is None or file_date > kept[0]:
            # This file is the newest so far for its month: delete the one it replaces
            if kept is not None:
                kept[1].unlink(missing_ok=True)
            latest_per_month[year_month] = (file_date, path)
        else:
            # glob() does not guarantee order, so an older file can turn up after a newer one
            path.unlink(missing_ok=True)

if __name__ == '__main__':
    main()

In [27]:
from datetime import datetime, timedelta
import pathlib
import zipfile

ARCHIVE = "archive"

def populate_archive(zip_file_path, start_date, end_date):
    zip_file_path.mkdir(exist_ok=True)
    current_date = start_date
    while current_date <= end_date:
        zip_filename = f"{current_date.strftime('%Y-%m-%d')}.zip"
        zip_full_path = zip_file_path / zip_filename
        with zipfile.ZipFile(zip_full_path, mode="w") as zf:
            zf.writestr("Test.txt", "Test")
        current_date += timedelta(days=1)

cur_path = pathlib.Path(".")
zip_file_path = cur_path / ARCHIVE
start_date = datetime(2025, 5, 1)
end_date = datetime.today()  # 2025-08-18

populate_archive(zip_file_path, start_date, end_date)


In [None]:
# Keep one file per week for files from the previous month and older.
# "Previous month and older" is a calendar test: compare the file's (year, month)
# with the current (year, month) instead of counting 30 days.
# The dictionary key is the ISO (year, week) pair.

from datetime import datetime
import pathlib

FILE_PATTERN = "*.zip"
ARCHIVE = "archive"

def main():

    # List the zip files currently in the archive
    cur_path = pathlib.Path(".")
    zip_file_path = cur_path.joinpath(ARCHIVE)
    paths = zip_file_path.glob(FILE_PATTERN)

    today = datetime.today()
    current_month = (today.year, today.month)

    # Newest file seen so far per week: (ISO year, ISO week) -> (file date, path)
    weekly_archives = {}

    for path in paths:
        file_date = datetime.strptime(path.stem, "%Y-%m-%d")

        # Files from the current month are never groomed
        if (file_date.year, file_date.month) >= current_month:
            continue

        # Pair the ISO week with the ISO year so a week that spans New Year stays in one group
        year_week = file_date.isocalendar()[:2]

        kept = weekly_archives.get(year_week)
        if kept is None or file_date > kept[0]:
            # This file is the newest so far for its week: delete the one it replaces
            if kept is not None:
                kept[1].unlink(missing_ok=True)
            weekly_archives[year_week] = (file_date, path)
        else:
            # glob() does not guarantee order, so an older file can turn up after a newer one
            path.unlink(missing_ok=True)

if __name__ == '__main__':
    main()