In [1]:
%load_ext autoreload

In [2]:
%autoreload 2
%reload_ext autoreload

In [3]:
import logging
import csv
import sys
from pathlib import Path
import subprocess


In [4]:
logger = logging.getLogger(__name__)
logger.root.setLevel('DEBUG')

In [5]:
def csv_to_list(file):
    '''read csv file `file` into a list
    
    Guess the CSV dialect (e.g. tsv, csv, etc.)
    
    Returns `list`'''
    csvFile = Path(file).expanduser().absolute()
    file_csv = []
    
    with open(csvFile, 'r') as file:
        dialect = csv.Sniffer().sniff(file.read(1024))
        file.seek(0)
        reader = csv.reader(file, dialect)
        for row in reader:
            file_csv.append(row)

    return file_csv

In [6]:
def map_headers(csv_list, expected_headers=[]):
    '''map row 0 of a csv as formatted as a list to a dictionary of expected header values'''
    missing_headers = []
    header_map = {}
    
    csvHeader = csv_list[0]
    
    logger.debug('checking for missing headers')
    for each in expected_headers:
        if each not in csvHeader:
            missing_headers.append(each)
            
    if len(missing_headers) > 0:
        logging.warning(f'missing expected headers: {missing_headers}')
    for index, value in enumerate(csvHeader):
        if value in expected_headers:
            header_map[value] = index
        
    logging.debug('completed mapping')
    return(header_map, missing_headers)

In [7]:
def do_exit(e='unknown error in unknown module: BSoD!', exit_status=0, testing=False):
    logging.info(f'exited before completion with exit code {exit_status}')
    print('program exited due to errors')
    print(e)
    if not testing:
        sys.exit(exit_status)

In [545]:
class student_path(gd_path):
    def __init__(self, path=None, class_of=None, id_number=None, name=None):
        '''student directory in google drive; child class of gd_path:
        
        Args:
        
        Properties:
            class_of(`str`): "ClassOf-YYYY" string representation of projected graduation year
            name(`str`): "Last, First" string representation of student name
            id_number(`int`): student id number
            matches(`dict`):  name and webview link of directories that contain "id_number"
            path_parts(`dict`): path compontents stored as dictionary keys'''
        
        super(student_path, self).__init__(path=path)
        self.matches = {}
        self.path_parts = {'ClassOf': None, 'id_number': None, 'name': None}
        self.class_of = class_of
        self.name = name
        self.id_number = id_number
    
    
    def __repr__(self):
        return f'student_path({self.student_dir_name})'
        
    def __str__(self):
        return f'{self.student_dir_name}'
    
    def get_xattr(self, attribute, file=None):
        if not file:
            file = self.student_dir_name
        return super().get_xattr(attribute, file)
    
    @property
    def class_of(self):
        return self._class_of
    
    @class_of.setter
    def class_of(self, class_of):
        '''string representation of projected graduation date in format: "ClassOf-YYYY"
        
        Properties Set:
            path_parts(`dict`): dictionary of component parts of path'''
        if not class_of:
            self._class_of = None
        else:
            # attempt to coerce strings from cSV file into type int
            class_of = int(class_of)
            if not isinstance(class_of, int):
                raise TypeError('class_of must be of type `int`')
        self.path_parts['ClassOf'] = f'ClassOf-{class_of}'
        self._class_of = class_of
        
    @property
    def name(self):
        return self._name
    
    @name.setter
    def name(self, name):
        '''string representation of "Last, First" names
        
        Properties Set:
            path_parts(`dict`): dictionary of component parts of path'''
        if not name:
            self._name = None
        else:
            if not isinstance (name, str):
                raise TypeError('name must be of type `str`')
        self.path_parts['name'] = name
        self._name = name
        
    @property
    def id_number(self):
        return self._id_number
    
    @id_number.setter
    def id_number(self, number):
        '''integer of student id number
        
        Properties Set:
            path_parts(`dict`): dictionary of component parts of path'''
        if not number:
            self._id_number = None
        else:
            # try to coerce number into type int
            number = int(number)
            if not isinstance (number, int):
                raise TypeError('id_number must be of type `int`')
        self.path_parts['id_number'] = number
        self._id_number = number

    @property
    def student_dir_name(self):
        '''full absolute path to student directory in format:
            ClassOf-YYYY/Last, First - NNNNNNN'''
        d = f"/{self.path_parts['ClassOf']}/{self.path_parts['name']} - {self.path_parts['id_number']}"
        if self.path:
            # not sure why this is needed, but any joining of self.root/Path(d) fails
            d = f'{str(self.path)}/{d}'
        return Path(d)
    
    # method for checking for similarly named student folders in this ClassOf folder
    def check_similar(self):
        '''check for similarly named directories based on student id number 
        within the path/ClassOf/ directory
        
        Properties Set:
            self.matches(`dict`): dictionary of similar directories
        Returns:
            `bool`: True if matching directories found'''
        similar = False
        matches = {}
        for i in self.student_dir_name.parent.glob(f"*{self.path_parts['id_number']}*"):
            match_id = self.get_xattr('user.drive.id', self.student_dir_name.parent/i)
            if i.absolute().is_dir():
                url = '/'.join((self._dir_base, match_id[0]))
            else:
                url = '/'.join((self._file_base, match_id[0]))
            matches[str(i)] = url
        self.matches = matches
        if matches:
            similar = True
        return similar

    def mkdir(self, path=None, exist_ok=False, parents=True, kwargs={}):
        '''make a google drive directory using pathlib.Path().mkdir()
        
        Args:
            path(`str` or `Path`): defaults to self.student_dir_name
            exist_ok(`bool`): True - do not raise error if directory exists
            parents(`bool`): True - create parents if they do not exist
            kwargs({}): pathlib.Path() kwargs
            
        Returns:
            list[str]: google drive object ID string'''
        if not path:
            path = self.student_dir_name
        logging.debug(f'calling super().mkdir(path={path})')
        val = super().mkdir(path=path, exist_ok=exist_ok, parents=parents, **kwargs)
        return val
            
        

In [533]:
p = '/Volumes/GoogleDrive/Shared drives/IT Blabla I/Student Cumulative Folders (AKA Student Portfolios)'
s = student_path(id_number=234569, path=p)
# s.path = '/Volumes/GoogleDrive/Shared drives/IT Blabla I/Student Cumulative Folders (AKA Student Portfolios)'
s.class_of = 1000
s.name = "name, some"

In [534]:
print(s.path)
print(s.student_dir_name)

/Volumes/GoogleDrive/Shared drives/IT Blabla I/Student Cumulative Folders (AKA Student Portfolios)
/Volumes/GoogleDrive/Shared drives/IT Blabla I/Student Cumulative Folders (AKA Student Portfolios)/ClassOf-1000/name, some - 234569


In [535]:
print(s)

/Volumes/GoogleDrive/Shared drives/IT Blabla I/Student Cumulative Folders (AKA Student Portfolios)/ClassOf-1000/name, some - 234569


In [536]:
print(s.webview_link)

https://drive.google.com/drive/folders/1Qri-HU7OK_SEOVV_RN5dbU1MJhJnD65I


In [467]:
s.check_similar()
s.matches

{}

In [531]:
s.mkdir(exist_ok=True)

DEBUG:root:calling super().mkdir(path=/Volumes/GoogleDrive/Shared drives/IT Blabla I/Student Cumulative Folders (AKA Student Portfolios)/ClassOf-1000/name, some - 234569)
DEBUG:root:using supplied path: /Volumes/GoogleDrive/Shared drives/IT Blabla I/Student Cumulative Folders (AKA Student Portfolios)/ClassOf-1000/name, some - 234569


['1j83z1VVKtpKdy4zO3m75laUazZgXhMK6']

In [95]:
s.student_dir_name

PosixPath('/Volumes/GoogleDrive/Shared drives/IT Blabla I/Student Cumulative Folders (AKA Student Portfolios)/ClassOf-1000/a, b - 123456')

In [77]:
s.get_xattr('user.drive.id')

['1j83z1VVKtpKdy4zO3m75laUazZgXhMK6']

In [510]:
class gd_path():
    def __init__(self, path=None):
        '''google drive path class
        
        Attributes:
            path(`str`): path to google drive drive object'''
        self.confirmed = False
        self.path = path
        self._file_base = 'https://drive.google.com/file/d/'
        self._dir_base = 'https://drive.google.com/drive/folders/'
        self.is_file = False   
    
    
    def __repr__(self):
        return f'gd_path({self.path})'
    
    def __str__(self):
        return f'{self.path}'
    
    @property
    def path(self):
        return self._path
    
    @path.setter
    def path(self, path):
        '''full path to object
        
        Args:
            path(`str` or `Path`): /path/to/object
            
        Sets Attributes:
            self.path: path to object
            self.root: same as path for directories, parent directory for files
            self.is_file: true for files and file-like objects, false for directories'''
        if not path:
            self._path = None
        else:
            self._path = Path(path)
            if self._path.is_dir() and self._path.exists():
                self.root = self._path
                self.is_file = False
            if self.path.is_file() and self._path.exists():
                self.root = self._path.parent
                self.is_file = True
            
            if not self._path.exists():
                self.is_file = False
                self.root = self._path.parent

    @property
    def webview_link(self):
        '''full webview link to object in google drive'''
        self._webview_link = None
        try:
            item_id = self.get_xattr('user.drive.id')
        except FileNotFoundError as e:
            logging.debug(f'{e}')
            return None
        except ChildProcessError as e:
            logging.debug(f'{e}')
            return None

        if len(item_id) < 1:
            return None
        else:
            item_id = item_id[0]
        
        
        if not self.is_file:
            self._webview_link = f'{self._dir_base}{item_id}'
        if self.is_file:
            self._webview_link = f'{self._file_base}{item_id}'
        return self._webview_link
            
    def check_parent(self, expected):
        '''checks if the parent matches the expected parent'''
        if self.root.parents[0].name == expected:
            return True
        else:
            return False
        
    def get_xattr(self, attribute, file=None):
        '''get the extended attributes of a file or directory
        Args:
            file(`str` or Path): path to file
            attribute('`str`'): attribute key to access

        Returns:
            `list` - attribute or key: attribute pairs

        Raises:
            FileNotFoundError - file or directory does not exist
            ChildProcessError - xattr utility exits with non-zero code 
                This is common for files that have no extended attributes or do not
                have the requested attribute'''
        if not file:
            file = self.path
        else:
            file = Path(file).absolute()
            
        attributes = []
        if not file.exists():
            raise FileNotFoundError(file)

        p = subprocess.Popen(f'xattr -p  {attribute} "{file.resolve()}"', shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        for line in p.stdout.readlines():
            attributes.append(line.decode("utf-8").strip())
    #         attributes = attributes + line.decode("utf-8").strip()
        retval = p.wait()
        if retval != 0:
            raise ChildProcessError(f'xattr exited with value: {retval}')
        return attributes     

    @property
    def file_id(self, path=None):
        '''unique file id for each object (directories or file)
        
        Args:
            path(`str` or `Path`): path to object; defaults to self.path
        
        Returns:
            `list` of `str` containing the file id'''
        if not path:
            path = self.path
        try:
            file_id = self.get_xattr('user.drive.id', path)
        except FileNotFoundError as e:
            logging.info(f'{path} does not appear to exist; cannot get attributes')
            file_id = None
        return file_id
    
    def confirm(self, path=None):
        '''confirm that a created object has been sent over file stream
        
        Args:
            path(`str` or `Path`): path to object; default is self.path
        
        Returns:
            `list` of `str` containing the file id
            
        Attributes Set:
            self.confirmed: True when object has been sent'''
        
        if not path:
            path = self.path
        file_id = self.file_id
        
        if file_id:
            if 'local-' in file_id[0]:
                self.confirmed = False
                file_id = None
            else:
                self.confirmed = True
        return file_id
    
    def mkdir(self, path=None, parents=False, exist_ok=False, kwargs={}):
        '''create a directory using pathlib.Path().mkdir()
        
        Args:
            path(`str` or `Path`): path to create
            parents(`bool`): create parent directories - default false
            exists_ok(`bool`): do not raise error if directory exists
            kwargs: kwargs for pathlib.Path().mkdir()
            
        Returns:
            file_id(`list`)'''
        if not path:
            path = self.path
            logging.debug(f'using self.path: {path}')
        else:
            logging.debug(f'using supplied path: {path}')
            
        if path.is_file():
            raise TypeError(f'{path} is a file')
            
        path = Path(path)
            
        path.mkdir(parents=parents, exist_ok=exist_ok, **kwargs)
        if self.confirm(path):
            file_id = self.get_xattr('user.drive.id', path)
        return self.file_id
    

In [283]:
g = gd_path(path='/Volumes/GoogleDrive/Shared drives/IT Blabla I/xyz-pdq/spam/eggs_ham_spam')
m = gd_path(path='/Volumes/GoogleDrive/My Drive/dev.csv')

In [285]:
g.file_id

INFO:root:/Volumes/GoogleDrive/Shared drives/IT Blabla I/xyz-pdq/spam/eggs_ham_spam does not appear to exist; cannot get attributes


In [286]:
g.confirm()

INFO:root:/Volumes/GoogleDrive/Shared drives/IT Blabla I/xyz-pdq/spam/eggs_ham_spam does not appear to exist; cannot get attributes


In [287]:
g.mkdir(exist_ok=True)

['local-223141']

In [291]:
g.confirm()

['1Qpzp14230uUtd9GD4qjuVadxLK_Z4nXz']

In [210]:
print(g.webview_link)
print(m.webview_link)

DEBUG:root:/Volumes/GoogleDrive/Shared drives/IT Blabla I/xyz-pdq


None
https://drive.google.com/drive/folders/1vF1_x-_4dKRaPme9o0ZGuJO2pCMdTell


In [212]:
g.path

PosixPath('/Volumes/GoogleDrive/Shared drives/IT Blabla I/xyz-pdq')

In [52]:
m.root.is_dir()

True

In [369]:
g.path.mkdir()

FileExistsError: [Errno 17] File exists: '/Volumes/GoogleDrive/Shared drives/IT Blabla I/xyz-pdq/spam/eggs_ham_spam'

In [367]:
def validate_data(csv_list, expected_headers, header_map):
    valid = []
    invalid = []

    for row in csv_list[1:]:
        good_row = True
        for k in expected_headers.keys():
            # test for coercable types
            try:
                test = expected_headers[k](row[header_map[k]])
            except ValueError:
#                 do_exit(f'Bad student.export: {k} contained {row[header_map[k]]}\ncannot continue. Please try running the export again.')
                logging.warning(f'{row}')
                logging.warning(f'Bad student.export: column "{k}" contained "{row[header_map[k]]}"--this should be {(expected_headers[k])}')
                invalid.append(row)
                good_row = False
                break
        if  good_row:
            valid.append(row)
        
    return valid, invalid
            
    

In [368]:
v, i = validate_data(c, e, m)



In [None]:
print(g.get_xattr('user.drive.team_drive_id'))
print(m.get_xattr('user.drive.id'))
# print(m.get_xattr('user.drive.team_drive_id'))

In [293]:
# def create_student_folders(csv_file, drive_path):

In [546]:
def main():
    
#     csv_file = Path('./student.export.csv.text')
    csv_file = Path('./bad.export.csv.text')
    
    drive_path = Path('/Volumes/GoogleDrive/Shared drives/IT Blabla I/Student Cumulative Folders (AKA Student Portfolios)')
    
#     expected_headers = ['LastFirst', 'ClassOf', 'Student_Number']
    expected_headers ={'LastFirst': str, 'ClassOf': int, 'Student_Number': int}
    
    
    ############
    
    logging.debug('starting up')
    
    # check drive path is on a shared drive 
    google_drive = gd_path(drive_path)
    try:
        google_drive.get_xattr('user.drive.id')
    except ChildProcessError as e:
        do_exit(f'The specified Google Drive "{drive_path}" is not a Shared Drive', 1)
    except FileNotFoundError as e:
        do_exit(f'The specified Google Shared Drive {drive_path}" does not exist', 1)
    
    # check if export is OK
    try:
        csv_list = csv_to_list(csv_file)
    except (FileNotFoundError, OSError, IOError) as e:
        logging.error(f'could not read csv file: {csv_file}')
        logging.error(f'{e}')
        do_exit(e, 1)
    
    # check for headers in CSV file
    header_map, missing_headers = map_headers(csv_list, expected_headers.keys())
    
    if len(missing_headers) > 0:
        do_exit(f'{csv_file.name} is missing one or more headers:\n\t{missing_headers}\nprogram cannot continue', 1)
    
    # validate the csv list
    valid_rows, invalid_rows = validate_data(csv_list, expected_headers, header_map)

    
    directories = {'created': [], 'skipped': [], 'invalid': invalid_rows, 'confirmed': []
                   'failed': []}
    for row in valid_rows:
        name = row[header_map['LastFirst']]
        class_of = row[header_map['ClassOf']]
        id_number = row[header_map['Student_Number']]
        
        s_path = student_path(path=drive_path,
                              name=name, 
                              class_of=class_of, 
                              id_number=id_number)
        
        # check if there already exists a directory with the student number
        if s_path.check_similar():
            # flag those that have multiple entries
            if len(s_path.matches) > 1:
                logging.warning(f'multiple directories exist in {class_of} for {id_number}')
                directories['skipped'].append((s_path, 'multiple'))
            # flag those that already exist for auditing purposes
            else:
                logging.info(f'skipped entry for {class_of}/{name} - {id_number}')
                directories['skipped'].append((s_path, 'exists'))

                
        else:
            # create the directory
            try:
                s_path.mkdir(parents=True)
            except FileExistsError as e:
                logging.error(f'{s_path.student_dir_name} exists')
                directories['skipped'].append((s_path, 'exists'))
            else:
                directories['created'].append(s_path)
    
    # double check that drectories were created and properly synced to google drive
    for each in directories['created']:
        # loop over the created directories N times with a longer delay each time
        # check that everything is confirmed uploaded; if it is not after Nth time, 
        # log as 'failed'
        pass
    
    return directories
    
    # cleanup
    # handle invalid_rows
    # check that all directories were created
    


In [547]:
f = main()

DEBUG:root:starting up
DEBUG:__main__:checking for missing headers
DEBUG:root:completed mapping
INFO:root:skipped entry for 2021/Lillie, Cameron - 505590


In [567]:
f['skipped'][1][0].confirm()

['1j83z1VVKtpKdy4zO3m75laUazZgXhMK6']

In [557]:
# notify of students with multiple directories
for each in f['skipped']:
    if each[1] == 'multiple':
        d = each[0].matches
        for k in d:
            print(f'{k}')
            print(f'\t{d[k]}')
            

/Volumes/GoogleDrive/Shared drives/IT Blabla I/Student Cumulative Folders (AKA Student Portfolios)/ClassOf-2023/Wanja, Michelle - 505586
	https://drive.google.com/drive/folders//12osRSWUqTrudjRf7G1oJGrM1y0wkCCNw
/Volumes/GoogleDrive/Shared drives/IT Blabla I/Student Cumulative Folders (AKA Student Portfolios)/ClassOf-2023/Wanja, Michelle-Amanda - 505586
	https://drive.google.com/drive/folders//1R-SMZa_rpe_B_Sg-a_zLgr7VdLUtpdJl


In [562]:
for each in f['skipped']:
    if each[1] == 'exists':
        print(f'"{each[0].name} - {each[0].id_number}" already existed - no action needed')

"Lillie, Cameron - 505590" already existed - no action needed


In [None]:
int(f[1][10])