In [1]:
import os
import re
import cgi
import requests
import mimetypes
import xlrd
import xlsxwriter
import time
import csv
import json
import os.path
import chardet
import xml.etree.ElementTree as ET


In [2]:
class Dao:
    def get_rows(self, rawinput):
        with open(rawinput, encoding = 'utf-8') as csvfile:
            spamreader = csv.reader(csvfile, delimiter = ',')
            next(spamreader)
            models = list()
            for row in spamreader:
                model = dict()
                model['nid'] = row[0]
                model['title'] = row[1]
                model['subOrg'] = row[2]
                model['dataset_link'] = row[3]
                model['field_description'] = row[4]
                model['resource_link'] = row[5]
                model['resource_Writeformat'] = row[6]
                model['resource_WriteCharset'] = row[7]
                model['linkcode'] = None
                model['true_format'] = None
                model['true_Charset'] = None
                model['columnAlike'] = None
                models.append(model)
        return (models)
    

    def get_machine_readable(self):
        machine_readable = [
            'CSV',
            'JSON',
            'XML',
            'VND.GOOGLE-EARTH.KML+XML',
            'XLSX',
            'KMZ',
            'XLS',
            'ODS',
            'VND.MS-EXCEL',
            'VND.MS-EXCEL.12'
        ]
        return machine_readable
  


In [3]:
class HeaderDownloader:
    
    def __init__(self, url):
        self.__url = url
        
    def get_response(self, url):
        try:
            browser_headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko)\
                                Chrome/55.0.2883.75 Safari/537.36'}
            response = requests.get(url, timeout = 3000, headers = browser_headers)
        except Exception as e:
            response = str(e)
        return response 
    
    def get_statuscode(self,response):
        try:
            linkecode = response.status_code
        except:
            linkecode = 'Error'
        return linkecode
    def get_fileformat(self, response):
        try:
            if 'Content-Type' in response.headers:
                typeparams, charset = cgi.parse_header(response.headers.get('Content-Type', ''))
                if typeparams == '':
                    content_type = ''
                else:
                    content_type = (typeparams.split('/')[1])
                if content_type in ('octet-stream','plain',''):
                    try:
                        _, filenameparams = cgi.parse_header(response.headers.get('Content-Disposition',''))
                        filename = filenameparams['filename']
                        extension = os.path.splitext(filename)[1].strip('.')
                        true_format = extension
                    except:
                        m = re.search(r'(\.\w*$)', url)
                        if m:
                            true_format = m.group(1).strip('.')
                        else:
                            true_format = 'Unknow'
                else:
                    true_format = content_type
            else:
                true_format = "NoFileFounded"
        except:
            true_format = "Unknow"
        return true_format

In [4]:
class ContentDownloader:
    def __init__(self, url, column):
        self.__url = url
        self.__column = column

    def get_session(self, url):
        try:
            with requests.Session() as s:
                browser_headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko)\
                Chrome/55.0.2883.75 Safari/537.36'}
                download = s.get(url, timeout = 3000, headers = browser_headers)
                content = download.content
        except Exception as e:
            content = str(e)
        return content
    
    def charset_detect(self,content):
        try:
            charset = chardet.detect(content)['encoding']
        except:
            charset = 'None'
        return charset
    
    def field_description_clean(self,column):
        description = column.replace('\n','').replace('\t','').strip()
        description_list = (re.split('、|,|，',description))
        try:
            for n, col in enumerate(description_list):
                if col.find('(') and col.endswith(')'):
                    m = re.match(r'(.*)\(.*\)',col)
                    description_list[n] = m.group(1)
        except:
            pass
        return description_list

In [5]:
class ColumnMatcher:
    def __init__(self,content,charset,column):
        self.content = content
        self.__charset = charset
        self.__column = column
        
    def csv_column_matching(self,content,charset,column):
        try:
            decoded_content = content.decode(charset)
            cr = csv.reader(decoded_content.splitlines(), delimiter = ',')
            content_list = list(cr)
            headingCol = content_list[0] #讀csv第一欄為標題
            clean_column = [x for x in headingCol if x != '']
            if '\ufeff' in clean_column[0]:
                clean_column[0] = clean_column[0].replace('\ufeff', '')
            if set(column) == set(clean_column):
                Col_matching = 'True'
            else:
                Col_matching = 'False'
        except Exception as e:
            Col_matching = str(e)
        return Col_matching
    def xml_column_matching(self,content,charset,column):
        try:
            XML = content.decode(charset)
            root = ET.fromstring(XML)
            tag_list = list()
            for descendant in root.findall(".//*"):
                if descendant.tag not in tag_list:
                    tag_list.append(descendant.tag)
            for n,tag in enumerate(tag_list):
                if tag.startswith('{') and tag.find('}'):
                    m = re.match(r'({.*})(.*)',tag)
                    tag_list[n] = m.group(2)
            if len(list(set(column).intersection(set(tag_list)))) == len(column):
                Col_matching = 'True'
            else:
                Col_matching = 'False'
        except Exception as e:
            Col_matching = str(e)
        return Col_matching 
    def json_column_matching(self,content,charset,column):
        try:
            decoded_content = content.decode(charset)
            json_obj = json.loads(decoded_content)
            json_key = list()
            for i in json_obj:
                for key in i.keys():
                    if key not in json_key:
                        json_key.append(key)
            if set(column) == set(json_key):
                Col_matching = 'True'
            else:
                Col_matching = 'Flase'
        except Exception as e:
                Col_matching = str(e)
        return Col_matching
    def xls_column_matching(self,content,charset,column):
        try:
            wb = xlrd.open_workbook(file_contents = content, encoding_override = 'cp950')
            table = wb.sheets()[0]
            nrows = table.nrows
            xls_heading = table.row_values(0)  #讀xls第一欄為標題
            if set (xls_heading) == set (column):
                Col_matching = 'True'
            else:
                Col_matching = 'False'
        except Exception as e:
            Col_matching = str(e)
        return Col_matching

In [6]:
class XlsWriters():
    def writer(self, filename,data_list):
        workbook = xlsxwriter.Workbook(filename, {'strings_to_urls': False})
        worksheet = workbook.add_worksheet(u'評鑑範例表')
        row = 0
        col = 0
        worksheet.write(row, col, u"資料集Id")
        worksheet.write(row, col + 1, u"資料集名稱")
        worksheet.write(row, col + 2, u"資料所屬機關")
        worksheet.write(row, col + 3, u"資料集連結")
        worksheet.write(row, col + 4, u"主要欄位說明")
        worksheet.write(row, col + 5, u"資料資源下載連結")
        worksheet.write(row, col + 6, u"詮釋資料檔案格式")
        worksheet.write(row, col + 7, u"詮釋資料文字編碼")
        worksheet.write(row, col + 8, u"連線狀態")
        worksheet.write(row, col + 9, u"資料資源檔案格式")
        worksheet.write(row, col + 10, u"資料資源文字編碼")
        worksheet.write(row, col + 11, u"欄位是否相符")

        row += 1
        for i in data_list:
            nid = i['nid']
            title = i['title']
            subOrg = i['subOrg']
            dataset_link = i['dataset_link']
            field_description = i['field_description']
            resource_link = i['resource_link']
            resource_Writeformat = i['resource_Writeformat']
            resource_WriteCharset = i['resource_WriteCharset']
            linkStatus = i['linkcode']
            TrueFormat = i['true_format']
            TrueCharset = i['true_Charset']
            columnAlike = i['columnAlike']

            worksheet.write(row, col, str(nid))
            worksheet.write(row, col + 1, title)
            worksheet.write(row, col + 2, subOrg)
            worksheet.write(row, col + 3, dataset_link)
            worksheet.write(row, col + 4, field_description)
            worksheet.write(row, col + 5, resource_link)
            worksheet.write(row, col + 6, resource_Writeformat)
            worksheet.write(row, col + 7, resource_WriteCharset)
            worksheet.write(row, col + 8, str(linkStatus))
            worksheet.write(row, col + 9, TrueFormat)
            worksheet.write(row, col + 10, str(TrueCharset))
            worksheet.write(row, col + 11, columnAlike)
            row += 1
        time.sleep(3)
        workbook.close()

In [7]:
Finance = Dao()
#產生input data實例
Finance_data = Finance.get_rows('Minstry of Finance.csv')
#getRows() 取得列 rawdata 列表資料
machine_readable = Finance.get_machine_readable()
#get_machine_readable() 取得定義之機器可讀類型

#iter raw data list
for data in Finance_data[100:200]:
    url = data['resource_link']
    column = data['field_description']
    write_Charset = data['resource_WriteCharset']
    head_downloader = HeaderDownloader(url)
    http_res = head_downloader.get_response(url)
    data['linkcode'] = head_downloader.get_statuscode(http_res)
    data['true_format'] = head_downloader.get_fileformat(http_res)
    if data['true_format'] in ('', 'plain','Unknow'):
        data['true_format'] = data['resource_Writeformat']
    
    if data['true_format'].upper() in machine_readable:
        content = ContentDownloader(url,column)
        content_session = content.get_session(url)
        charset = content.charset_detect(content_session)
        data['true_Charset'] = charset
        description_list = content.field_description_clean(column)
        match_column = ColumnMatcher(content_session,charset,description_list)
        
        if data['true_format'].upper() == 'CSV':
            data['columnAlike'] = match_column.csv_column_matching(content_session,charset,description_list)
        if data['true_format'].upper() == 'XML':
            data['columnAlike'] = match_column.xml_column_matching(content_session,charset,description_list)
        elif data['true_format'].upper() == 'JSON':
            data['columnAlike'] = match_column.json_column_matching(content_session,charset,description_list)
        elif data['true_format'].upper() in ('XLS', 'XLSX', 'VND.MS-EXCEL', 'VND.MS-EXCEL.12'):
            data['columnAlike'] = match_column.xls_column_matching(content_session,charset,description_list)
            
xls_writer = XlsWriters()
xls_writer.writer("sample_test.xls",Finance_data)